

System.IO.Pipelines and "Zero Copy": Building High-Throughput Binary RPC in .NET 🚀


Table of Contents

  • System.IO.Pipelines and "Zero Copy": Building High-Throughput Binary RPC in .NET 🚀
    • 0. TL;DR: Why Pipelines 🎯
    • 1. The Frame Protocol 📦
    • 2. Shared Code: Frame Encoding and Decoding 🧑‍💻
    • 3. Demo A: TCP + Pipelines 🌐
    • 4. Demo B: HTTP/1.1 + Pipelines 🌍
    • 5. Backpressure and Using `AdvanceTo` Correctly 💡
    • 6. Pooling and Zero-Allocation Checklist 🛠
    • 7. Concurrency and Flow Control ⏱
    • 8. Load Testing and Comparison 🧑‍💻
    • 9. Observability and Troubleshooting 🔍
    • 10. Error Handling and Security 🛡️
    • 11. Repository Layout 🗂️
    • 12. When to Choose What 📝


0. TL;DR: Why Pipelines 🎯

  • PipeReader.ReadAsync() returns a ReadOnlySequence<byte>, which naturally buffers partial frames across segments; paired with AdvanceTo(consumed, examined) it gives you backpressure.
  • SequenceReader<byte> can parse protocol fields without copying into managed arrays; BinaryPrimitives works directly on Span/ReadOnlySpan, which is more efficient.
  • Kestrel exposes BodyReader/BodyWriter, so the HTTP flavor gets the benefits of Pipes as well.
  • Combined with memory pooling, write coalescing, and concurrency limiting, it can clearly beat a traditional Stream implementation on all three of throughput, latency, and allocations.

1. The Frame Protocol 📦

Fixed 8-byte header, big-endian (network byte order):

len:uint32   // total length, including the header
type:uint16  // 0=Ping, 1=Echo, 2=Sum, 0xFFFF=Error
flags:uint16 // bit0 = compressed; remaining bits reserved
payload: len-8 bytes
  • Ping: empty payload
  • Echo: returns the payload unchanged (the demo echoes only the first Span segment)
  • Sum: payload is N big-endian int32 values; the response is their sum as a big-endian int32 (see the worked example after this list)
  • Error: carries an error code/message (plain text in this demo)
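
For concreteness, here is a minimal sketch (not part of the repo) that hand-builds one Sum frame with two operands, matching the layout above:

using System;
using System.Buffers.Binary;

// Sum frame with payload [1, 2]: len = 8 (header) + 8 (two int32) = 16 bytes, all big-endian.
byte[] frame = new byte[16];
BinaryPrimitives.WriteUInt32BigEndian(frame.AsSpan(0), 16);   // len (includes the header)
BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(4), 2);    // type = Sum
BinaryPrimitives.WriteUInt16BigEndian(frame.AsSpan(6), 0);    // flags
BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(8), 1);     // operand 1
BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(12), 2);    // operand 2

// Wire bytes: 00 00 00 10 | 00 02 | 00 00 | 00 00 00 01 | 00 00 00 02
Console.WriteLine(Convert.ToHexString(frame));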

2. Shared Code: Frame Encoding and Decoding 🧑‍💻

src/Rpc.Protocol/Frame.cs

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;

namespace Rpc.Protocol;

public static class Frame
{
    public const int HeaderSize = 8;

    public const ushort TypePing  = 0;
    public const ushort TypeEcho  = 1;
    public const ushort TypeSum   = 2;
    public const ushort TypeError = 0xFFFF;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static bool TryParseFrame(
        ref ReadOnlySequence<byte> buffer,
        out ushort type,
        out ushort flags,
        out ReadOnlySequence<byte> payload)
    {
        type = 0; flags = 0; payload = default;
        if (buffer.Length < HeaderSize) return false;

        Span<byte> header = stackalloc byte[HeaderSize];
        buffer.Slice(0, HeaderSize).CopyTo(header);

        uint len = BinaryPrimitives.ReadUInt32BigEndian(header);
        type  = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
        flags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(6));

        if (len < HeaderSize) throw new InvalidOperationException("Invalid length");
        if (buffer.Length < len) return false; // incomplete (partial) frame

        var frame = buffer.Slice(0, len);
        payload = frame.Slice(HeaderSize, len - HeaderSize);
        buffer = buffer.Slice(len);
        return true;
    }

    public static void WriteFrame(PipeWriter writer, ushort type, ushort flags, ReadOnlySpan<byte> payload)
    {
        int len = HeaderSize + payload.Length;
        Span<byte> span = writer.GetSpan(len);

        BinaryPrimitives.WriteUInt32BigEndian(span, (uint)len);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(4), type);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(6), flags);
        payload.CopyTo(span.Slice(HeaderSize));

        writer.Advance(len);
    }
}

Key points

  • Only the header is copied (to the stack) during parsing; the payload stays a slice of the original ReadOnlySequence<byte> (zero copy). A round-trip sketch follows this list.
  • Writing acquires one Span large enough for the whole frame up front, reducing the number of Advance/Flush calls.
  • The lower-bound check on len defends against malformed input.
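
As a usage example, here is a minimal round trip (not in the repo) that writes an Echo frame into an in-memory Pipe and parses it back:

using System;
using System.Buffers;
using System.IO.Pipelines;
using Rpc.Protocol;

var pipe = new Pipe();

// Encode an Echo frame directly into the pipe and flush it.
Frame.WriteFrame(pipe.Writer, Frame.TypeEcho, 0, "hello"u8);
await pipe.Writer.FlushAsync();

// Parse it back: the payload is a slice of the pipe's own buffer, no copy.
var result = await pipe.Reader.ReadAsync();
var buffer = result.Buffer;
if (Frame.TryParseFrame(ref buffer, out var type, out var flags, out var payload))
{
    Console.WriteLine($"type={type} flags={flags} payloadLength={payload.Length}");   // type=1 flags=0 payloadLength=5
}
pipe.Reader.AdvanceTo(buffer.Start, buffer.End);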

3. Demo A: TCP + Pipelines 🌐

src/Rpc.TcpServer/Program.cs

using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading.Channels;
using System.Buffers.Binary;
using Rpc.Protocol;

var listener = new TcpListener(IPAddress.Loopback, 5001);
listener.Start();
Console.WriteLine("TCP RPC listening on 127.0.0.1:5001");

while (true)
{
    var client = await listener.AcceptTcpClientAsync();
    _ = Task.Run(() => Handle(client));
}

static async Task Handle(TcpClient client)
{
    const int MaxInFlight = 32;
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight)
        {
            SingleReader = true,
            SingleWriter = true
        });

    using var _ = client;
    client.NoDelay = true;
    var stream = client.GetStream();

    var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 64 * 1024));
    var networkWriter = PipeWriter.Create(stream, new StreamPipeWriterOptions(MemoryPool<byte>.Shared, 64 * 1024, leaveOpen: true));

    var sendPipe = new Pipe(new PipeOptions(
        pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024,
        resumeWriterThreshold: 128 * 1024));

    var pumpTask = PumpSendAsync(sendPipe.Reader, networkWriter, flushThreshold: 64 * 1024, flushStopwatchMs: 2);
    var workerTask = Task.Run(() => WorkerAsync(workQueue.Reader, sendPipe.Writer));

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync();
            var buf = result.Buffer;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out var flags, out var payload))
                {
                    if (payload.Length > 4 * 1024 * 1024)
                    {
                        Enqueue(workQueue, (Frame.TypeError, new ReadOnlySequence<byte>(BuildErrorPayload("Payload too large"))));
                        continue;
                    }
                    Enqueue(workQueue, (type, payload));
                }
            }
            catch (Exception ex)
            {
                Enqueue(workQueue, (Frame.TypeError, new ReadOnlySequence<byte>(BuildErrorPayload("Bad frame: " + ex.Message))));
            }

            reader.AdvanceTo(buf.Start, buf.End);
            if (result.IsCompleted) break;
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await workerTask;
        await sendPipe.Writer.CompleteAsync();
        await pumpTask;
        await networkWriter.CompleteAsync();
        await reader.CompleteAsync();
        stream.Close();
    }

    static void Enqueue(Channel<(ushort, ReadOnlySequence<byte>)> ch, (ushort, ReadOnlySequence<byte>) item)
    {
        if (!ch.Writer.TryWrite(item))
        {
            // Queue full or closed: reply with an error here to apply backpressure.
        }
    }
}

static async Task WorkerAsync(ChannelReader<(ushort type, ReadOnlySequence<byte> payload)> reader, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in reader.ReadAllAsync())
    {
        try
        {
            if (type == Frame.TypePing) { Frame.WriteFrame(sendWriter, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty); continue; }
            if (type == Frame.TypeEcho) { Frame.WriteFrame(sendWriter, Frame.TypeEcho, 0, payload.FirstSpan); continue; }
            if (type == Frame.TypeSum)
            {
                Span<byte> tmp = stackalloc byte[4];
                int sum = 0;
                var sr = new System.Buffers.SequenceReader<byte>(payload);
                while (sr.Remaining >= 4)
                {
                    if (!sr.TryCopyTo(tmp)) break;
                    sr.Advance(4);
                    sum += BinaryPrimitives.ReadInt32BigEndian(tmp);
                }
                BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
                Frame.WriteFrame(sendWriter, Frame.TypeSum, 0, tmp);
                continue;
            }
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, BuildErrorPayload("Unknown type"));
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, BuildErrorPayload("Handler error: " + ex.Message));
        }
    }
}

static async Task PumpSendAsync(PipeReader from, PipeWriter to, int flushThreshold, int flushStopwatchMs)
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    int pending = 0;
    try
    {
        while (true)
        {
            var r = await from.ReadAsync();
            var buf = r.Buffer;

            foreach (var seg in buf)
            {
                var span = to.GetSpan(seg.Length);
                seg.Span.CopyTo(span);
                to.Advance(seg.Length);
                pending += seg.Length;
            }
            from.AdvanceTo(buf.End);

            bool needFlush = pending >= flushThreshold || r.IsCompleted || sw.ElapsedMilliseconds >= flushStopwatchMs;
            if (needFlush)
            {
                var fr = await to.FlushAsync();
                pending = 0;
                sw.Restart();
                if (r.IsCompleted || fr.IsCompleted) break;
            }
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}

static byte[] BuildErrorPayload(string msg)
{
    return System.Text.Encoding.UTF8.GetBytes(msg);
}
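
For quick manual testing, a throwaway console client might look like the sketch below (not part of the repo; the address and port come from the listing above, and the loop assumes a single Sum reply):

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Net.Sockets;
using Rpc.Protocol;

using var client = new TcpClient();
await client.ConnectAsync("127.0.0.1", 5001);
client.NoDelay = true;
var stream = client.GetStream();

var writer = PipeWriter.Create(stream);
var reader = PipeReader.Create(stream);

// Send one Sum request: 1 + 2 + 3.
byte[] payload = new byte[12];
BinaryPrimitives.WriteInt32BigEndian(payload, 1);
BinaryPrimitives.WriteInt32BigEndian(payload.AsSpan(4), 2);
BinaryPrimitives.WriteInt32BigEndian(payload.AsSpan(8), 3);
Frame.WriteFrame(writer, Frame.TypeSum, 0, payload);
await writer.FlushAsync();

// Read until the whole response frame has arrived.
while (true)
{
    var result = await reader.ReadAsync();
    var buf = result.Buffer;
    if (Frame.TryParseFrame(ref buf, out var type, out _, out var resp))
    {
        byte[] tmp = new byte[4];
        resp.Slice(0, 4).CopyTo(tmp);
        Console.WriteLine($"type={type}, sum={BinaryPrimitives.ReadInt32BigEndian(tmp)}");
        reader.AdvanceTo(buf.Start, buf.End);
        break;
    }
    reader.AdvanceTo(buf.Start, buf.End);
    if (result.IsCompleted) break;
}
await reader.CompleteAsync();
await writer.CompleteAsync();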

4. Demo B: HTTP/1.1 + Pipelines 🌍

src/Rpc.HttpServer/Program.cs

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Hosting;
using Rpc.Protocol;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapPost("/rpc", async (HttpContext ctx) =>
{
    ctx.Response.ContentType = "application/octet-stream";

    var reader = ctx.Request.BodyReader;
    var writer = ctx.Response.BodyWriter;

    var sendPipe = new Pipe(new PipeOptions(
        pool: MemoryPool<byte>.Shared,
        pauseWriterThreshold: 256 * 1024,
        resumeWriterThreshold: 128 * 1024));
    var pump = PumpSendAsync(sendPipe.Reader, writer, flushThreshold: 64 * 1024, flushStopwatchMs: 2);

    const int MaxInFlight = 32;
    var workQueue = Channel.CreateBounded<(ushort type, ReadOnlySequence<byte> payload)>(
        new BoundedChannelOptions(MaxInFlight)
        {
            SingleReader = true,
            SingleWriter = true
        });
    var worker = WorkerAsync(workQueue, sendPipe.Writer);

    try
    {
        while (true)
        {
            var result = await reader.ReadAsync(ctx.RequestAborted);
            var buf = result.Buffer;
            try
            {
                while (Frame.TryParseFrame(ref buf, out var type, out var flags, out var payload))
                {
                    if (payload.Length > 4 * 1024 * 1024)
                    {
                        workQueue.Writer.TryWrite((Frame.TypeError, new ReadOnlySequence<byte>(System.Text.Encoding.UTF8.GetBytes("Payload too large"))));
                        continue;
                    }
                    workQueue.Writer.TryWrite((type, payload));
                }
            }
            catch (Exception ex)
            {
                workQueue.Writer.TryWrite((Frame.TypeError, new ReadOnlySequence<byte>(System.Text.Encoding.UTF8.GetBytes("Bad frame: " + ex.Message))));
            }

            reader.AdvanceTo(buf.Start, buf.End);
            if (result.IsCompleted) break;
        }
    }
    finally
    {
        workQueue.Writer.TryComplete();
        await worker;
        await sendPipe.Writer.CompleteAsync();
        await pump;
        await writer.FlushAsync();
    }
});

app.Run("http://0.0.0.0:5000");

static async Task WorkerAsync(Channel<(ushort type, ReadOnlySequence<byte> payload)> queue, PipeWriter sendWriter)
{
    await foreach (var (type, payload) in queue.Reader.ReadAllAsync())
    {
        try
        {
            if (type == Frame.TypePing) { Frame.WriteFrame(sendWriter, Frame.TypePing, 0, ReadOnlySpan<byte>.Empty); continue; }
            if (type == Frame.TypeEcho) { Frame.WriteFrame(sendWriter, Frame.TypeEcho, 0, payload.FirstSpan); continue; }
            if (type == Frame.TypeSum)
            {
                Span<byte> tmp = stackalloc byte[4];
                int sum = 0;
                var sr = new System.Buffers.SequenceReader<byte>(payload);
                while (sr.Remaining >= 4)
                {
                    if (!sr.TryCopyTo(tmp)) break;
                    sr.Advance(4);
                    sum += BinaryPrimitives.ReadInt32BigEndian(tmp);
                }
                BinaryPrimitives.WriteInt32BigEndian(tmp, sum);
                Frame.WriteFrame(sendWriter, Frame.TypeSum, 0, tmp);
                continue;
            }
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, System.Text.Encoding.UTF8.GetBytes("Unknown type"));
        }
        catch (Exception ex)
        {
            Frame.WriteFrame(sendWriter, Frame.TypeError, 0, System.Text.Encoding.UTF8.GetBytes("Handler error: " + ex.Message));
        }
    }
}

static async Task PumpSendAsync(PipeReader from, PipeWriter to, int flushThreshold, int flushStopwatchMs)
{
    var sw = System.Diagnostics.Stopwatch.StartNew();
    int pending = 0;
    try
    {
        while (true)
        {
            var r = await from.ReadAsync();
            var buf = r.Buffer;

            foreach (var seg in buf)
            {
                var span = to.GetSpan(seg.Length);
                seg.Span.CopyTo(span);
                to.Advance(seg.Length);
                pending += seg.Length;
            }
            from.AdvanceTo(buf.End);

            bool needFlush = pending >= flushThreshold || r.IsCompleted || sw.ElapsedMilliseconds >= flushStopwatchMs;
            if (needFlush)
            {
                var fr = await to.FlushAsync();
                pending = 0;
                sw.Restart();
                if (r.IsCompleted || fr.IsCompleted) break;
            }
        }
    }
    finally
    {
        await from.CompleteAsync();
    }
}
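
For Demo B, a single request can be sent with HttpClient; a minimal sketch (endpoint and port taken from the listing above):

using System;
using System.Buffers.Binary;
using System.Net.Http;

// Build one Ping frame (header only, len = 8) and POST it to /rpc.
byte[] ping = new byte[8];
BinaryPrimitives.WriteUInt32BigEndian(ping, 8);             // len
BinaryPrimitives.WriteUInt16BigEndian(ping.AsSpan(4), 0);   // type = Ping
BinaryPrimitives.WriteUInt16BigEndian(ping.AsSpan(6), 0);   // flags

using var http = new HttpClient();
var content = new ByteArrayContent(ping);
content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/octet-stream");

var response = await http.PostAsync("http://127.0.0.1:5000/rpc", content);
byte[] reply = await response.Content.ReadAsByteArrayAsync();
Console.WriteLine($"Got {reply.Length} bytes back");        // expect an 8-byte Ping frame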

5. Backpressure and Using AdvanceTo Correctly 💡

The read loop follows this flow: read data → is there a complete frame? → if not, keep the partial frame and wait for more; if yes, consume it, advance with AdvanceTo(consumed, examined), and move on to the next frame.
  • Partial frames: when parsing fails because data is incomplete (a half frame), do not advance consumed; set examined to the current batch's buffer.End so the underlying pipe keeps filling, as the sketch after this list shows.
  • Avoid busy-spinning: repeatedly passing the same (consumed, examined) pair makes ReadAsync return immediately, turning the loop into a busy wait.
  • Where the thresholds apply: pauseWriterThreshold/resumeWriterThreshold only take effect on a Pipe you create yourself (used here for the send-aggregation pipe); StreamPipeReaderOptions/StreamPipeWriterOptions are a different kind of configuration (pool, buffer sizes, whether to leave the underlying stream open) and have no backpressure thresholds.
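
Put together, the canonical loop looks roughly like this sketch (it reuses Frame.TryParseFrame from Section 2; the frame handling itself is elided):

using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Rpc.Protocol;

static async Task ReadLoopAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Consume as many complete frames as the buffer currently holds.
        while (Frame.TryParseFrame(ref buffer, out var type, out var flags, out var payload))
        {
            // ... handle the frame ...
        }

        // consumed = start of the unparsed remainder (a partial frame stays buffered);
        // examined = buffer.End, so ReadAsync only returns once new data arrives
        // instead of spinning on the same bytes.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted) break;
    }

    await reader.CompleteAsync();
}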

6. Pooling and Zero-Allocation Checklist 🛠

Two pooling routes: ArrayPool<byte>.Shared with Rent/Return to cut allocations, and MemoryPool<byte>, where IMemoryOwner<byte> ties the buffer's lifetime to a dispose and guarantees it is returned.
  • Prefer slices: walk the original buffer with ReadOnlySequence<byte> + Slice/SequenceReader and avoid ToArray().

  • Pooling strategy (see the sketch after this list):

    • Small objects: ArrayPool<byte>.Shared.Rent/Return.
    • Large blocks and Pipes: MemoryPool<byte>.Shared, with IMemoryOwner<byte> lifetimes guaranteeing the memory goes back to the pool.
  • Grab enough at once: PipeWriter.GetSpan(expected) → fill → Advance(expected); fewer flushes, more coalescing.

  • Return on exception paths: every path that can throw must still return pooled objects and complete the pipes.
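
A minimal sketch of both pooling routes under try/finally and using, so the buffers are returned even if processing throws:

using System;
using System.Buffers;

// Route 1: ArrayPool. Rent at least `size` bytes and always return them.
static void UseArrayPool(int size)
{
    byte[] rented = ArrayPool<byte>.Shared.Rent(size);
    try
    {
        // ... fill and use rented.AsSpan(0, size) ...
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(rented);
    }
}

// Route 2: MemoryPool. The IMemoryOwner ties the lifetime to Dispose.
static void UseMemoryPool(int size)
{
    using IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(size);
    Memory<byte> memory = owner.Memory;   // valid until `owner` is disposed
    // ... fill and use memory.Span ...
}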


7. Concurrency and Flow Control ⏱

  • Per-connection concurrency cap: use a bounded Channel<T> to hand parsed requests to the business handler, so backpressure does not turn into a thread storm.
  • Worker count: the samples use a single consumer (so only one writer ever touches sendPipe.Writer); if you want multiple workers, serialize the writes (for example, add a dedicated send queue or a single write lock).
  • Timeouts and cancellation: wire CancellationToken and timeouts into ReadAsync/FlushAsync and the business handlers as needed to avoid hangs (see the sketch below).
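
One way to add an idle-read timeout, sketched with a linked CancellationTokenSource (the 30-second value in the usage comment is illustrative):

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

static async Task<ReadResult> ReadWithTimeoutAsync(PipeReader reader, TimeSpan timeout, CancellationToken connectionToken)
{
    // Cancel the read if either the connection is aborted or the timeout elapses.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(connectionToken);
    cts.CancelAfter(timeout);
    try
    {
        return await reader.ReadAsync(cts.Token);
    }
    catch (OperationCanceledException) when (!connectionToken.IsCancellationRequested)
    {
        throw new TimeoutException($"No data received within {timeout}.");
    }
}

// Usage inside the per-connection loop:
// var result = await ReadWithTimeoutAsync(reader, TimeSpan.FromSeconds(30), ctx.RequestAborted);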

8. Load Testing and Comparison 🧑‍💻

Lua script: scripts/wrk/rpc.lua

local bit = require("bit")   -- wrk embeds LuaJIT, so use the bit library for bitwise ops
local band, rshift = bit.band, bit.rshift

local function be32(n)
  return string.char(band(rshift(n, 24), 255), band(rshift(n, 16), 255), band(rshift(n, 8), 255), band(n, 255))
end

local function be16(n)
  return string.char(band(rshift(n, 8), 255), band(n, 255))
end

local function build_frame(msg_type, payload)
  local len = 8 + #payload
  return be32(len) .. be16(msg_type) .. be16(0) .. payload
end

wrk.method = "POST"
wrk.headers["Content-Type"] = "application/octet-stream"

request = function()
  if math.random() < 0.5 then
    local p = string.rep("A", 16)            -- Echo
    return wrk.format(nil, "/rpc", nil, build_frame(1, p))
  else
    local N = 8                              -- Sum
    local buf = {}
    for i = 1, N do
      local v = math.random(1, 1000)
      buf[#buf + 1] = be32(v)
    end
    return wrk.format(nil, "/rpc", nil, build_frame(2, table.concat(buf)))
  end
end

Command:
wrk -t8 -c256 -d30s --latency -s scripts/wrk/rpc.lua http://127.0.0.1:5000/rpc

Metrics to watch: RPS, p50/p95/p99 latency, socket errors, CPU, GC (allocation rate / pause time), and flush count (a proxy for syscall volume).

Baseline comparison: implement a "traditional Stream" version (BinaryReader / NetworkStream.ReadExactly) with identical functionality to serve as the control group; a sketch follows.
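
A sketch of what the baseline's read path might look like, using Stream.ReadExactly (.NET 7+) instead of Pipelines; the frame layout matches Section 1:

using System;
using System.Buffers.Binary;
using System.IO;

// Baseline: blocking-style framing over a plain Stream, with one copy per frame.
static (ushort type, ushort flags, byte[] payload) ReadFrame(Stream stream)
{
    Span<byte> header = stackalloc byte[8];
    stream.ReadExactly(header);

    uint len     = BinaryPrimitives.ReadUInt32BigEndian(header);
    ushort type  = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));
    ushort flags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(6));
    if (len < 8) throw new InvalidDataException("Invalid length");

    byte[] payload = new byte[len - 8];   // the allocation the Pipelines version avoids
    stream.ReadExactly(payload);
    return (type, flags, payload);
}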


9. Observability and Troubleshooting 🔍

  • dotnet-counters

    • System.Runtime: GC heap size, Gen0/1/2 collection counts, allocation rate, thread-pool queue length/throughput, etc.
    • Microsoft.AspNetCore.Hosting (for the HTTP flavor)
  • Custom metrics (see the sketch after this list)

    • Active requests per connection, work-queue length, bytes accumulated by the aggregating writer, flush count
    • Parse time / handler time
  • Key log points: Accept → Read → Parse → Enqueue → Handle → Write → Flush, tagged with a ConnectionId
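
Custom metrics can be exposed through System.Diagnostics.Metrics so that dotnet-counters picks them up; a minimal sketch (the meter and instrument names are illustrative, not from the repo):

using System.Diagnostics.Metrics;

public static class RpcMetrics
{
    // dotnet-counters monitor --counters Rpc.Server -p <pid> will show these.
    private static readonly Meter Meter = new("Rpc.Server");

    public static readonly Counter<long> FramesParsed = Meter.CreateCounter<long>("rpc.frames_parsed");
    public static readonly Counter<long> Flushes      = Meter.CreateCounter<long>("rpc.flushes");
    public static readonly Histogram<double> HandleMs = Meter.CreateHistogram<double>("rpc.handle_ms");
}

// Usage at the relevant points:
// RpcMetrics.FramesParsed.Add(1);
// RpcMetrics.HandleMs.Record(stopwatch.Elapsed.TotalMilliseconds);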


10. Error Handling and Security 🛡️

  • Input validation: cap len, whitelist type values; add a checksum or HMAC if needed.
  • DoS protection: limit concurrent connections, queue length, and per-connection rate/byte caps; apply idle and read/write timeouts.
  • TLS: for the TCP flavor, wrap the stream in SslStream (server/client certificates as required; see the sketch after this list); for HTTP, let Kestrel handle it.
  • Cleanup rule: on both normal and exceptional paths, make sure CompleteAsync() runs and pooled objects are returned.
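
A sketch of wrapping the accepted connection in SslStream before creating the pipes (the certificate path and password in the usage comment are placeholders):

using System.IO.Pipelines;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;

static async Task<(PipeReader reader, PipeWriter writer)> WrapWithTlsAsync(TcpClient client, X509Certificate2 serverCert)
{
    var ssl = new SslStream(client.GetStream(), leaveInnerStreamOpen: false);
    await ssl.AuthenticateAsServerAsync(serverCert, clientCertificateRequired: false,
        checkCertificateRevocation: true);

    // The rest of the pipeline code stays unchanged; it only ever sees PipeReader/PipeWriter.
    return (PipeReader.Create(ssl), PipeWriter.Create(ssl));
}

// var cert = new X509Certificate2("server.pfx", "password");   // placeholder path/password
// var (reader, writer) = await WrapWithTlsAsync(client, cert);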

11. Repository Layout 🗂️

pipelines-rpc/
  src/
    Rpc.Protocol/            # frame definition, parser, serialization helpers (Frame.cs)
    Rpc.TcpServer/           # Demo A: TcpListener + Pipelines (+ aggregated writes + concurrency limiting)
    Rpc.HttpServer/          # Demo B: Kestrel + BodyReader/Writer (same protocol, same strategy)
    Rpc.BaselineStream/      # optional: traditional Stream baseline (control group)
  scripts/
    wrk/rpc.lua              # builds binary frames; Echo/Sum mix
    perf-collect.ps1         # dotnet-counters collection script (optional)
  README.md                  # startup/load-test instructions and expected-results template

12. When to Choose What 📝

  • Prefer Pipelines: custom binary protocols, complex framing / multi-segment buffering, high RPS and low latency, gateway or proxy cores.
  • Stick with Stream: moderate throughput needs, cost first, simple protocols and processing.
  • Coexist with gRPC: expose business-facing APIs via gRPC (ecosystem, maintainability); use Pipelines for internal hot paths or proxy cores (maximum performance).
