当前位置: 首页 > web >正文

消息队列:Redis Stream到RabbitMQ的转换

文章目录

  • 消息队列:Redis Stream到RabbitMQ的转换
    • Redis Stream
    • RabbitMQ
      • 第一步 安装Erlang
      • 第二步 安装RabbitMQ
      • 第三步 引入RabbitMQ依赖
      • 第四步 配置RabbitMQ
      • 第五步 定义RabbitMQ配置类
      • 第六步 发送消息(原来的 Redis Stream 写入逻辑替换)
      • 第七步 消费消息(代替 Redis Stream 轮询线程)
      • 第八步 消息序列化配置
    • 迁移后的优势
    • 注意事项

消息队列:Redis Stream到RabbitMQ的转换

消息队列是一个使用队列来进行通信的组件,可以达到模块间的解耦、异步和削峰的功能。使用消息队列的时候我们最重要的就是要保证这个消息队列不会丢失消息,也就是可靠性问题。

Redis Stream

Redis是一个NoSQL数据库,因为它是将数据保存在内存中的,速度很快,所以常用来做缓存。在新版本的Redis中,推出了一个Stream数据结构用来做消息队列。提供了消费者组来消费一个队列,也有pending-list和ack确认机制来保证消息不丢失,也实现了持久化来预防redis宕机导致的消息丢失。

Redis Stream的特点:

  • 优点:轻量级、低延迟、与Redis生态集成良好
  • 缺点:需要应用层轮询消费,没有原生的事件驱动机制

在我们的项目中,使用轮询阻塞读取方式消费消息:

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstruct
private void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有消息,继续下一次循环continue;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判断订单信息是否为空if (list == null || list.isEmpty()) {// 如果为null,说明没有异常消息,结束循环break;}// 解析数据MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);}}}
}

这种轮询方式的问题是通过while循环不断使用XREADGROUP指令来读队列中的消息,会消耗一定的CPU资源,没有使用事件驱动的监听+回调机制。

RabbitMQ

RabbitMQ是一个专业的消息队列中间件,支持多种消息模式,具有更完善的可靠性保证和监控能力。我们使用RabbitMQ来替换Redis Stream作为项目中要使用到的消息队列。

项目为Spring Boot单体架构项目。

第一步 安装Erlang

RabbitMQ是使用Erlang语言编写的消息队列组件,它本身的安装包里没有Erlang环境,所以我们要先安装Erlang。

https://erlang.org/download/otp_versions_tree.html

进入网页找到最新的版本,下载后双击安装即可。

第二步 安装RabbitMQ

https://www.rabbitmq.com/install-windows.html

进入下载地址,找到对应版本的文件点击下载,然后进行安装。

安装完成后,开启RabbitMQ服务(RabbitMQ Service - start)。然后进入命令行窗口(RabbitMQ Command Prompt),输入 rabbitmqctl.bat status 可确认 RabbitMQ 的启动状态。

也可以在命令行中输入 rabbitmq-plugins enable rabbitmq_management 来开启Web管理界面,之后在浏览器中输入 http://localhost:15672/,就可以进入管理端页面。

第三步 引入RabbitMQ依赖

引入Spring为AMQP协议的消息中间件支持的框架。SpringBoot会自动配置连接到RabbitMQ,提供消息监听容器,提供RabbitTemplate工具类。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第四步 配置RabbitMQ

application.yml 中添加:

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest# 开启手动ack模式listener:simple:acknowledge-mode: manual# 设置消费者数量concurrency: 1max-concurrency: 3# 关闭Spring层面的自动重试,我们手动控制retry:enabled: false# 设置预取数量,避免消息积压prefetch: 1

第五步 定义RabbitMQ配置类

让Spring在启动的时候配置好要使用的RabbitMQ,包括普通队列、死信队列等。

@Configuration
@Slf4j
public class RabbitMQConfig {// 普通队列配置public static final String ORDER_QUEUE = "order.queue";public static final String ORDER_EXCHANGE = "order.exchange";public static final String ORDER_ROUTING_KEY = "order.routing";// 死信队列配置public static final String ORDER_DLX_QUEUE = "order.dlx.queue";public static final String ORDER_DLX_EXCHANGE = "order.dlx.exchange";public static final String ORDER_DLX_ROUTING_KEY = "order.dlx.routing";// 普通队列(绑定死信交换机)@Beanpublic Queue orderQueue() {return QueueBuilder.durable(ORDER_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DLX_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DLX_ROUTING_KEY).withArgument("x-message-ttl", 1800000) // 30分钟TTL.build();}// 普通交换机@Beanpublic DirectExchange orderExchange() {return new DirectExchange(ORDER_EXCHANGE);}// 普通队列绑定@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);}// 死信队列@Beanpublic Queue orderDlxQueue() {return QueueBuilder.durable(ORDER_DLX_QUEUE).build();}// 死信交换机@Beanpublic DirectExchange orderDlxExchange() {return new DirectExchange(ORDER_DLX_EXCHANGE);}// 死信队列绑定@Beanpublic Binding orderDlxBinding() {return BindingBuilder.bind(orderDlxQueue()).to(orderDlxExchange()).with(ORDER_DLX_ROUTING_KEY);}
}

第六步 发送消息(原来的 Redis Stream 写入逻辑替换)

seckillVoucher() 中将 Redis Stream 写入替换成 RabbitMQ 的消息发送:

@Resource
private RabbitTemplate rabbitTemplate;@Override
public Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}// 构造订单对象VoucherOrder order = new VoucherOrder();order.setId(orderId);order.setUserId(userId);order.setVoucherId(voucherId);try {// 发送消息到 RabbitMQrabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE,RabbitMQConfig.ORDER_ROUTING_KEY,order, // 直接发送对象,让Spring自动序列化message -> {// 设置消息属性message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);message.getMessageProperties().setExpiration("1800000"); // 30分钟过期return message;});log.info("订单消息发送成功,orderId: {}", orderId);} catch (Exception e) {log.error("订单消息发送失败,orderId: {}", orderId, e);return Result.fail("系统繁忙,请稍后重试");}return Result.ok(orderId);
}

第七步 消费消息(代替 Redis Stream 轮询线程)

创建消息消费者类,支持手动ack和重试机制:

@Component
@Slf4j
public class VoucherOrderMQListener {@Resourceprivate IVoucherOrderService voucherOrderService;@Resourceprivate RedisTemplate<String, Object> redisTemplate;private static final String RETRY_COUNT_KEY = "order:retry:count:";private static final int MAX_RETRY_COUNT = 3;@RabbitListener(queues = RabbitMQConfig.ORDER_QUEUE)public void handleVoucherOrder(VoucherOrder order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {String retryCountKey = RETRY_COUNT_KEY + order.getId();try {// 业务处理voucherOrderService.createVoucherOrder(order);// 处理成功,删除重试计数redisTemplate.delete(retryCountKey);// 手动确认消息channel.basicAck(deliveryTag, false);log.info("订单处理成功,orderId: {}", order.getId());} catch (Exception e) {log.error("处理订单消息失败,orderId: {}", order.getId(), e);try {// 获取当前重试次数Integer retryCount = (Integer) redisTemplate.opsForValue().get(retryCountKey);if (retryCount == null) {retryCount = 0;}retryCount++;if (retryCount <= MAX_RETRY_COUNT) {// 还没达到最大重试次数,记录重试次数并重新入队redisTemplate.opsForValue().set(retryCountKey, retryCount, Duration.ofHours(1));log.warn("订单处理失败,第{}次重试,orderId: {}", retryCount, order.getId());channel.basicReject(deliveryTag, true); // 重新入队} else {// 达到最大重试次数,拒绝消息进入死信队列redisTemplate.delete(retryCountKey);log.error("订单处理失败,达到最大重试次数,进入死信队列,orderId: {}", order.getId());channel.basicReject(deliveryTag, false); // 不重新入队,进入死信队列}} catch (IOException ioException) {log.error("消息确认失败", ioException);}}}@RabbitListener(queues = RabbitMQConfig.ORDER_DLX_QUEUE)public void handleDeadLetterOrder(VoucherOrder order,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.error("收到死信消息,orderId: {}", order.getId());try {// 记录失败的订单信息recordFailedOrder(order);// 确认死信消息channel.basicAck(deliveryTag, false);log.info("死信消息处理完成,orderId: {}", order.getId());} catch (Exception e) {log.error("死信消息处理异常,orderId: {}", order.getId(), e);try {channel.basicAck(deliveryTag, false);} catch (IOException ioException) {log.error("死信消息确认失败", ioException);}}}private void recordFailedOrder(VoucherOrder order) {log.error("订单处理最终失败,需要人工介入,订单信息: {}", order);// 这里可以写入失败订单表、发送告警等}
}

第八步 消息序列化配置

为了更好的性能和兼容性,配置消息序列化方式:

@Configuration
public class RabbitMQSerializationConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);// 设置JSON序列化template.setMessageConverter(new Jackson2JsonMessageConverter());// 设置发送确认template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.debug("消息发送成功");} else {log.error("消息发送失败: {}", cause);}});// 设置返回确认template.setReturnsCallback(returned -> {log.error("消息被退回: {}", returned.getMessage());});return template;}
}

迁移后的优势

  1. 事件驱动:使用@RabbitListener注解将方法注册到Spring AMQP提供的消息监听容器中,避免了Redis Stream中的while(true)轮询消费
  2. 自动重试:RabbitMQ支持配置化的自动重试机制,失败的消息会自动重新投递
  3. 死信处理:多次重试失败的消息会进入死信队列,可以进行人工干预或特殊处理
  4. 更好的监控:RabbitMQ提供了丰富的管理界面和监控指标
  5. 消息持久化:更完善的消息持久化机制,保证服务重启后消息不丢失

注意事项

  1. 幂等性:消费者需要保证幂等性,避免重复消费造成的问题

有时会因为网络波动导致消费者传给队列的ack消息丢失或者一直没到达,队列就认为这个消息没被消费导致再次传给消费者,这就会出现消息被重复消费的情况。可以使用全局唯一ID+消息幂等性存储。在生产者生产消息的时候为每个消息都生成一个全局唯一ID,然后在消费端每次消费这个消息前都判断一下这个消息是否被消费了,如果被消费了就不会再消费,如果没有消费则会继续消费。

  1. 消息顺序:如果需要保证消息顺序,需要使用单一队列和单一消费者
  2. 资源监控:注意监控队列长度、消费速率等指标,避免消息积压
  3. 错误处理:合理设计重试次数和死信处理逻辑,避免无限重试
http://www.xdnf.cn/news/14816.html

相关文章:

  • MongoDB06 - MongoDB 地理空间
  • PyQt5—QPushButton 功能 API 学习笔记
  • Zynq7020 Linux更新启动分区文件导致文件大小为0的处理方式
  • 力扣第84题-柱状图中最大的矩形
  • Webpack中的Loader详解
  • 用户行为序列建模(篇六)-【阿里】DSIN
  • 实战篇----利用 LangChain 和 BERT 用于命名实体识别-----完整代码
  • flask使用-链接mongoDB
  • Python爬虫-爬取汽车之家全部汽车品牌及车型数据
  • ListExtension 扩展方法增加 转DataTable()方法
  • Lua现学现卖
  • DOP数据开放平台(真实线上项目)
  • 电商返利APP架构设计:如何基于Spring Cloud构建高并发佣金结算系统
  • OpenLayers 下载地图切片
  • 解决cursor无法下载插件等网络问题
  • vue-29(创建 Nuxt.js 项目)
  • 从用户到权限:解密 AWS IAM Identity Center 的授权之道
  • 给定一个没有重复元素的数组,写出生成这个数组的MaxTree的函数
  • TDengine 如何使用 MQTT 采集数据?
  • lambda、function基础/响应式编程基础
  • [论文阅读] 软件工程 | 微前端在电商领域的实践:一项案例研究的深度解析
  • NLP中的同义词替换及我踩的坑
  • 创客匠人视角:创始人 IP 打造为何成为知识变现的核心竞争力
  • 【算法深练】单调栈:有序入栈,及时删除垃圾数据
  • 鸿蒙5:组件监听和部分状态管理V2
  • 为何需要防爆平板?它究竟有何能耐?
  • 【龙泽科技】新能源汽车故障诊断仿真教学软件【吉利几何G6】
  • 学习使用dotnet-dump工具分析.net内存转储文件(2)
  • vue-28(服务器端渲染(SSR)简介及其优势)
  • 舵机在不同类型机器人中的应用