当前位置: 首页 > ds >正文

使用reactor-rabbitmq库监听Rabbitmq

文章目录

      • Reactor RabbitMQ 简介
      • Reactor RabbitMQ核心特性
      • 使用方法
        • 添加依赖
        • 创建连接
        • 发送消息
        • 接收消息
      • 高级配置
        • 消息确认模式
        • 错误处理
        • 集群监听(自动ACK)
        • 集群监听手动ACK
      • 性能优化建议
      • 适用场景

Reactor RabbitMQ 简介

Reactor RabbitMQ 是 Reactor 项目的一部分,旨在提供基于 Project Reactor 的 RabbitMQ 响应式编程支持。它将 RabbitMQ 的消息队列功能与 Reactor 的非阻塞、背压友好特性结合,适用于高吞吐量的异步消息处理场景。
注意: Reactor RabbitMQ 是对原生amqp-client 的封装,同样性能强大简单易用。reactor-rabbitmq是spring-boot-starter-amqp 之外的另外一种选择。

维度AMQP-ClientReactor RabbitMQSpring Boot Starter AMQP
编程模型命令式、手动管理响应式、非阻塞声明式、自动配置
框架依赖ReactorSpring Boot
适用场景轻量级/非 Spring 项目响应式微服务Spring Boot 企业应用
资源管理手动自动自动
功能丰富度基础协议操作背压、高并发优化事务、确认、死信队列等
学习曲线中等(需理解 AMQP)高(需掌握 Reactor)低(Spring 生态友好)

Reactor RabbitMQ核心特性

  • 响应式流支持:基于 Reactor 的 FluxMono 实现消息的发布与订阅。
  • 背压管理:自动处理消费者与生产者之间的速率匹配。
  • 非阻塞 API:避免传统 RabbitMQ 客户端的线程阻塞问题。
  • 声明式配置:支持通过代码或配置文件定义队列、交换机和绑定。

使用方法

添加依赖

在 Maven 项目中添加以下依赖:

<dependency>  <groupId>io.projectreactor.rabbitmq</groupId>  <artifactId>reactor-rabbitmq</artifactId>  <version>1.5.6</version>  
</dependency>  
创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();  
connectionFactory.setHost("localhost");  
Sender sender = RabbitFlux.createSender(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
Receiver receiver = RabbitFlux.createReceiver(  Mono.fromCallable(() -> connectionFactory.newConnection())  
);  
发送消息
sender.send(  Flux.just(new OutboundMessage(  "exchange-name",  "routing-key",  "Hello RabbitMQ".getBytes()  ))  
).subscribe();  
接收消息
receiver.consumeAutoAck("queue-name")  .map(delivery -> new String(delivery.getBody()))  .subscribe(System.out::println);  

高级配置

消息确认模式

支持自动确认(autoAck)和手动确认(manualAck):

receiver.consumeManualAck("queue-name")  .delayUntil(delivery ->  delivery.ack()  .thenReturn(delivery.getBody())  )  .subscribe();  
错误处理

通过 Reactor 的 onError 机制处理异常:

sender.send(messages)  .doOnError(e -> System.err.println("Send failed: " + e))  .retry(3)  .subscribe();  

集群监听(自动ACK)
 // 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 监听队列(自动负载均衡)receiver.consumeAutoAck("queue1") // 队列名(需在集群中预先创建).subscribe(delivery -> {String message = new String(delivery.getBody());System.out.println("收到消息: " + message);},error -> System.err.println("监听错误: " + error));// 保持程序运行Mono.never().block();
集群监听手动ACK
// 1. 配置集群连接ReceiverOptions receiverOptions = new ReceiverOptions().connectionFactory(new ConnectionFactory() {{setUsername("guest");setPassword("guest");}}).connectionSupplier(cf -> cf.newConnection(new Address[]{new Address("localhost", 5672),new Address("localhost", 5673),new Address("localhost", 5674)},"reactive-cluster"));// 2. 创建 ReceiverReceiver receiver = RabbitFlux.createReceiver(receiverOptions);// 消费消息并手动ACKreceiver.consumeManualAck("queue1").flatMap(delivery -> {try {String message = new String(delivery.getBody());log.info("received message:" + message);// 业务逻辑处理...boolean success = false;int i = RandomUtil.randomInt();if (i % 2 == 0) {success = true;}if (success) {log.info("ack success");// 处理成功,手动ACKreturn Mono.fromRunnable(() -> delivery.ack()).thenReturn("ACK");} else {log.info("ack fail");// 处理失败,手动NACK(可选择重试或丢弃)return Mono.fromRunnable(() -> delivery.nack(true)) // false表示不重新入队.thenReturn("NACK");}} catch (Exception e) {// 异常情况,NACK并可选择重试delivery.nack(true); // true表示重新入队return Mono.error(e);}}).subscribe(result -> log.info("Message processed:" + result),error -> log.info("Error:" + error));// 保持程序运行Mono.never().block();

性能优化建议

  • 连接复用:避免频繁创建/关闭连接,使用 Mono 缓存连接。
  • 批量发送:通过 Flux.buffer() 合并多条消息后一次性发送。
  • 线程池调优:自定义 Scheduler 以匹配业务场景的并发需求。

适用场景

  • 微服务间的异步通信。
  • 事件驱动的数据处理流水线。
  • 需要高吞吐量和低延迟的消息系统。

如需进一步功能(如事务、RPC 模式),可参考官方文档或源码示例。

http://www.xdnf.cn/news/14991.html

相关文章:

  • Python 量化交易安装使用教程
  • opencv的颜色通道问题 rgb bgr
  • 如何查看自己电脑的CUDA版本?
  • 【深度解析】Seedance 1.0:重新定义 AI 视频生成的工业级标准
  • 《Java修仙传:从凡胎到码帝》第三章:缩进之劫与函数峰试炼
  • python脚本编程:使用BeautifulSoup爬虫库获取热门单机游戏排行榜
  • PHP从字符串到数值的类型转换
  • 三、jenkins使用tomcat部署项目
  • 服务器间接口安全问题的全面分析
  • 模拟热血三国内城安置建筑物
  • 【wps】 excel 删除重复项
  • 【Spring Boot】HikariCP 连接池 YAML 配置详解
  • Tomcat镜像实战:掌握Dockerfile的编写以及发布项目
  • day47-tomcat
  • 《Spring 中上下文传递的那些事儿》Part 4:分布式链路追踪 —— Sleuth + Zipkin 实践
  • Python 闭包(Closure)实战总结
  • 【PyCharm 2025.1.2配置debug】
  • 分类树查询性能优化:从 2 秒到 0.1 秒的技术蜕变之路
  • 低代码实战训练营教学大纲 (10天)
  • [特殊字符] 电子机械制动(EMB)产业全景分析:从技术演进到千亿市场爆发
  • 网络编程学习路线图
  • Python 爬虫实战 | 国家医保
  • OpenBayes 教程上新丨医疗VLM新突破!HealthGPT对复杂MRI模态理解准确率达99.7%,单一模型可处理多类生成任务
  • 一天两道力扣(1)
  • 高效打字辅助工具,解决符号输入难题
  • 使用pdf box去水印
  • Part 0:射影几何,变换与估计-第三章:3D射影几何与变换
  • 分享|大数据分析师职业技术证书报考指南
  • 推荐系统中如果有一个上古精排模型,后续如何优化?
  • 遇到该问题:kex_exchange_identification: read: Connection reset`的解决办法