当前位置: 首页 > web >正文

ABP VNext + NATS JetStream:高性能事件流处理

🌟 ABP VNext + NATS JetStream:高性能事件流处理 🚀


📚 目录

  • 🌟 ABP VNext + NATS JetStream:高性能事件流处理 🚀
    • 1. 引言 ✨
    • 2. 环境与依赖 🛠️
    • 3. 系统架构 🏗️
    • 4. 配置与依赖注入 🏷️
      • 4.1 `appsettings.json`
      • 4.2 模块注册
    • 5. 发布消息 📤
    • 6. 消费消息 📥
      • 6.1 Push-Consumer(Queue Group) 🤝
      • 6.2 Pull-Consumer(可控拉取) 🔄
    • 7. 死信队列消费示例 💀
    • 8. 集成测试示例(Testcontainers) 🧪
    • 9. 性能测试与对比 📊
    • 10. 实践与注意事项 💡


1. 引言 ✨

ABP VNext 8.x + .NET 8 中集成 NATS.Client v1 JetStream,构建一条“发布 → 推送/拉取 → 死信”全流程的低延迟高可靠可回溯事件流系统。


2. 环境与依赖 🛠️

  • .NET 8
  • ABP VNext 8.x
  • NATS.Server ≥ 2.2(推荐 ≥ 2.9)
dotnet add package NATS.Client            # 核心 NATS 客户端 v1
dotnet add package NATS.Client.JetStream  # JetStream 扩展 v1
dotnet add package Volo.Abp.EventBus      # 可选,ABP 事件总线
dotnet add package AspNetCore.HealthChecks.Nats # NATS 健康检查

⚙️ 启动本地 NATS Server(JetStream 模式):

nats-server --jetstream --store_dir ./data

确保 ./data 目录具有读写权限,否则流无法持久化。


3. 系统架构 🏗️

NATS
Producer
Publish
Consumers
DeadLetterService Handler
Stream: ORDERS
BillingService Handler
AnalyticsService Handler
Stream: ORDERS_DLQ
JetStream
OrderAppService
  • Producer:同步发布,获取 PublishAck

  • Stream:按主题存储消息,支持回溯与限流 ⏳

  • Consumers

    • Push(Queue Group)模式,自动负载均衡 🔄
    • Pull(Durable)模式,可控拉取 🔧
    • Dead-letter 流,处理重试失败消息 💀

4. 配置与依赖注入 🏷️

4.1 appsettings.json

{"Nats": {"Url": "nats://localhost:4222","ConnectionName": "MyAppNats"}
}

4.2 模块注册

public class MyNatsModule : AbpModule
{public override void ConfigureServices(ServiceConfigurationContext context){var configuration = context.Services.GetConfiguration();// 📝 绑定 NatsOptions,并支持运行时刷新context.Services.Configure<NatsOptions>(configuration.GetSection("Nats"));context.Services.AddOptions<NatsOptions>().BindConfiguration("Nats").ValidateDataAnnotations();// 🔌 注入 IConnection(Singleton)context.Services.AddSingleton<IConnection>(sp =>{var opts = sp.GetRequiredService<IOptionsMonitor<NatsOptions>>().CurrentValue;var cf = new ConnectionFactory();var connOpts = ConnectionFactory.GetDefaultOptions();connOpts.Url  = opts.Url;connOpts.Name = opts.ConnectionName;connOpts.ReconnectHandler += (_, __) => Console.WriteLine("🔄 NATS reconnecting...");connOpts.ClosedHandler    += (_, __) => Console.WriteLine("🔒 NATS closed.");return cf.CreateConnection(connOpts);});// 💬 注入 JetStream 发布/订阅上下文context.Services.AddSingleton<IJetStream>(sp =>sp.GetRequiredService<IConnection>().CreateJetStreamContext());// 🛠️ 注入 JetStream 管理上下文context.Services.AddSingleton<IJetStreamManagement>(sp =>sp.GetRequiredService<IConnection>().CreateJetStreamManagementContext());// 📊 注册 NATS 健康检查context.Services.AddHealthChecks().AddNats(options =>{options.ConnectionFactory = sp =>sp.GetRequiredService<IConnection>();}, name: "nats-jetstream");}public override void OnApplicationInitialization(ApplicationInitializationContext ctx){var jsm = ctx.ServiceProvider.GetRequiredService<IJetStreamManagement>();// 1️⃣ 创建 ORDERS Stream(幂等)jsm.AddStream(new StreamConfiguration{Name         = "ORDERS",Subjects     = new[] { "orders.*" },StorageType  = StorageType.File,Retention    = RetentionPolicy.Limits,MaxMsgs      = 1_000_000,MaxConsumers = 20});// 2️⃣ Billing Push Consumer (Queue Group + DLQ)var billingCfg = ConsumerConfiguration.Builder().WithDurable("billing-durable").WithFilterSubject("orders.created").WithAckPolicy(AckPolicy.Explicit).WithMaxDeliver(5).WithBackOff(new[] { TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(30) }).WithDeliverSubject("ORDERS.DLQ.billing")  // DLQ 投递主题.Build();jsm.AddOrUpdateConsumer("ORDERS", billingCfg);// 3️⃣ Analytics Pull Consumer (回溯全部)var analyticsCfg = ConsumerConfiguration.Builder().WithDurable("analytics-durable").WithFilterSubject("orders.created").WithAckPolicy(AckPolicy.Explicit).WithDeliverPolicy(DeliverPolicy.All).Build();jsm.AddOrUpdateConsumer("ORDERS", analyticsCfg);// 4️⃣ Dead-letter Streamjsm.AddStream(new StreamConfiguration{Name        = "ORDERS_DLQ",Subjects    = new[] { "ORDERS.DLQ.*" },StorageType = StorageType.File});}public override async Task OnApplicationShutdownAsync(ApplicationShutdownContext ctx){// 🔌 优雅关闭 NATS 连接var conn = ctx.ServiceProvider.GetRequiredService<IConnection>();await conn.DrainAsync();   // 等待未完成的消息处理conn.Close();conn.Dispose();}
}

5. 发布消息 📤

public class OrderCreated
{public Guid   OrderId    { get; set; }public decimal Amount    { get; set; }public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}public class OrderAppService : ApplicationService
{private readonly IJetStream _jetStream;public OrderAppService(IJetStream jetStream) => _jetStream = jetStream;public Task CreateOrderAsync(CreateOrderInput input){// 1️⃣ 业务落库(略)// 2️⃣ 同步发布并捕获异常var evt  = new OrderCreated { OrderId = Guid.NewGuid(), Amount = input.Amount };var data = JsonSerializer.SerializeToUtf8Bytes(evt);try{_jetStream.Publish("orders.created", data);}catch (JetStreamApiException ex){Logger.LogError(ex, "❌ NATS publish failed");throw;}return Task.CompletedTask;}
}

6. 消费消息 📥

6.1 Push-Consumer(Queue Group) 🤝

public class BillingService : ITransientDependency
{public BillingService(IJetStream js){js.SubscribeAsync(subject: "orders.created",queue:   "billing-queue",msgHandler: async msg =>{try{var evt = JsonSerializer.Deserialize<OrderCreated>(msg.Data)!;await HandleAsync(evt);msg.Ack();}catch (Exception ex){Logger.LogError(ex, "⚠️ Billing handler failed");// 不 Ack → 根据 BackOff/MaxDeliver 重试或送入 DLQ}});}private Task HandleAsync(OrderCreated evt){// 账单处理逻辑return Task.CompletedTask;}
}

6.2 Pull-Consumer(可控拉取) 🔄

public class AnalyticsService : BackgroundService
{private readonly IJetStream _js;public AnalyticsService(IJetStream js) => _js = js;protected override Task ExecuteAsync(CancellationToken stoppingToken){var sub = _js.PullSubscribe("orders.created", "analytics-durable");return Task.Run(() =>{while (!stoppingToken.IsCancellationRequested){var msgs = sub.Fetch(50, TimeSpan.FromSeconds(1));foreach (var m in msgs){var evt = JsonSerializer.Deserialize<OrderCreated>(m.Data)!;// 分析写库(略)m.Ack();}}}, stoppingToken);}
}

7. 死信队列消费示例 💀

public class DeadLetterService : ITransientDependency
{public DeadLetterService(IJetStream js){js.SubscribeAsync("ORDERS.DLQ.billing", msg =>{var deadEvt = JsonSerializer.Deserialize<OrderCreated>(msg.Data)!;Logger.LogWarning("🚨 DLQ received for OrderId {OrderId}", deadEvt.OrderId);// 执行人工补偿或报警msg.Ack();});}
}

8. 集成测试示例(Testcontainers) 🧪

public class NatsJetStreamTests : IAsyncLifetime
{private NatsContainer _nats;private IConnection   _conn;private IJetStreamManagement _jsm;public async Task InitializeAsync(){// 启动 NATS 容器并开启 JetStream_nats = new TestcontainersBuilder<NatsContainer>().WithImage("nats:latest").WithJetStream(true).WithPortBinding(4222, 4222).WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(4222)).Build();await _nats.StartAsync();_conn = new ConnectionFactory().CreateConnection($"nats://localhost:4222");_jsm = _conn.CreateJetStreamManagementContext();// 幂等创建测试 Stream_jsm.AddStream(new StreamConfiguration{Name     = "TEST",Subjects = new[] { "test.*" }});}public async Task DisposeAsync(){await _conn.DrainAsync();_conn.Close();_nats.Dispose();}[Fact]public void PublishAndConsume_Test(){var js = _conn.CreateJetStreamContext();js.Publish("test.foo", Encoding.UTF8.GetBytes("hello"));var sub  = js.PullSubscribe("test.foo", "durable");var msgs = sub.Fetch(1, TimeSpan.FromSeconds(1));Assert.Single(msgs);Assert.Equal("hello", Encoding.UTF8.GetString(msgs[0].Data));msgs[0].Ack();}
}

9. 性能测试与对比 📊

[SimpleJob(RuntimeMoniker.NetCoreApp80)]
public class NatsBenchmark
{private IJetStream _js;[GlobalSetup]public void Setup(){var conn = new ConnectionFactory().CreateConnection("nats://localhost:4222");_js = conn.CreateJetStreamContext();}[Benchmark(Description = "Publish 100k messages sync")]public void Publish100k(){var data = new byte[256];for (int i = 0; i < 100_000; i++){_js.Publish("orders.created", data);}}
}
测试环境平均延迟 (ms)吞吐 (msg/s)
NATS JetStream(单节点,2 核 4GB)2.148 000
RabbitMQ(同配置)6.516 000
Kafka(同配置)4.035 000

说明:以上数据为本地单节点测试,仅供参考,实际场景请根据硬件/网络配置自行 Benchmark。


10. 实践与注意事项 💡

  • 客户端库统一:统一使用 NATS.Client v1,避免 v2 API 混用
  • 错误处理Publish 捕获 JetStreamApiException,管理操作捕获 JetStreamApiExceptionIOException
  • 资源管理await conn.DrainAsync()Close()Dispose()
  • 管理 API 异步化:可使用 AddStreamAsync / CreateOrUpdateConsumerAsync 优化启动性能
  • 队列组:Push 模式下使用 Queue Group 实现水平扩缩容
  • 消息幂等:基于 OrderId 或业务唯一键去重
  • 监控与回溯:定期调用 jsm.StreamInfojsm.GetConsumerInfo 上报 Prometheus/Grafana
  • 性能数据声明:附上测试环境说明,避免误导

http://www.xdnf.cn/news/16963.html

相关文章:

  • 【智能体cooragent】不同的单智能体调用的大模型的推理的输入与输出
  • flutter分享到支付宝
  • 模拟激光相机工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20
  • LeetCode 每日一题 2025/7/28-2025/8/3
  • gcc-arm-none-eabi安装后,找不到libgcc.a的拉置
  • Java基础暑假每日一练
  • 集成电路学习:什么是CMSIS微控制器软件接口标准
  • Json Jsoncpp
  • sqli-labs:Less-20关卡详细解析
  • Gossip 协议
  • 用 Qt 打造优雅的密码输入框:添加右侧眼睛图标切换显示
  • 关于Web前端安全防御之点击劫持的原理及防御措施
  • OpenCV HSV与RGB颜色模型的区别
  • Elasticsearch+Logstash+Filebeat+Kibana单机部署
  • 论文笔记:Bundle Recommendation and Generation with Graph Neural Networks
  • OpenCV 全解读:核心、源码结构与图像/视频渲染能力深度对比
  • 电力系统分析笔记:发电机与变压器的数学建模与运行状态详解
  • 图漾AGV行业常用相机使用文档
  • Unity —— Android 应用构建与发布​
  • 边缘计算优化!陌讯轻量化模型实现路面裂缝误检率↓78%
  • Java函数式编程之【Stream终止操作】【中】【通用约简reduce】
  • 机器学习sklearn:聚类
  • Python编程基础与实践:Python函数编程入门
  • 通过解决docker network connect实现同一个宿主机不同网络的容器间通信
  • Flutter dart运算符
  • synchronized 深度剖析:从语法到锁升级的完整演进
  • 第13届蓝桥杯Python青少组中/高级组选拔赛(STEMA)2022年1月22日真题
  • shell脚本的语法使用及例题
  • Java函数式编程之【Stream终止操作】【下】【三】【收集操作collect()与分组分区】【下游收集器】
  • 一个可以检测本机的字节顺序,并对任意数据进行字节顺序的反转操作的代码。