当前位置: 首页 > backend >正文

Rocket客户端消息确认机制

消息确认机制

RocketMQ要⽀持互联⽹⾦融场景,那么消息安全是必须优先保障的。⽽消息安全有两⽅⾯的要求,⼀⽅⾯是⽣产者要能确保将消息发送到
Broker上。另⼀⽅⾯是消费者要能确保从Broker上争取获取到消息。

  1. 消息⽣产端采⽤消息确认加多次重试的机制保证消息正常发送到RocketMQ
  • 第⼀种称为单向发送
    单向发送⽅式下,消息⽣产者只管往Broker发送消息,⽽全然不关⼼Broker端有没有成功接收到消息。
  • 第⼆种称为同步发送
    同步发送⽅式下,消息⽣产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。
  • 第三种称为异步发送
    异步发送机制下,⽣产者在向Broker发送消息时,会同时注册⼀个回调函数。接下来⽣产者并不等待Broker的响应。当Broker端有响应数据过来时,⾃动触发回调函数进⾏对应的处理。
  1. 消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息
    换成了Broker等待消费者返回消息处理状态。
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}})

这个返回值是⼀个枚举值,有两个选项 CONSUME_SUCCESSRECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息⾃然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过⼀段时间再发起消息重试。

为了要兼容重试机制的成功率和性能,RocketMQ设计了⼀套⾮常完善的消息重试机制,从⽽尽可能保证消费者能够正常处理⽤户的订单信息。

  • Broker不可能⽆限制的向消费失败的消费者推送消息。如果消费者⼀直没有恢复,Broker显然不可能⼀直⽆限制的推送,这会浪费集群很多的性能。所以,Broker会记录每⼀个消息的重试次数。如果⼀个消息经过很多次重试后,消费者依然⽆法正常处理,那么Broker会将这个消息推⼊到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。你可以⼈⼯介⼊对死信Topic中的消息进⾏补救,也可以直接彻底删除这些消息。RocketMQ默认的最⼤重试次数是16次
  • 为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是⼀个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产⽣阻塞,让其他正常的消息⽆法处理。RocketMQ的做法是给每个消费者组⾃动⽣成⼀个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了⾃⼰单独的队列,就不会影响到Topic下的其他消息了。
  • RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意⼀个实例推送即可。这是消息重试机制能够正常运⾏的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出⼀些订阅主题和消费逻辑完全不同的消费者服务,共同组成⼀个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就⽆法保持⼀致了。这会给业务带来很⼤的麻烦。这是在实际应⽤时需要注意的地⽅。
  • Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。⾄于消费者组⾃⼰的业务执⾏是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使⽤同步实现⽅式,保证在⾃⼰业务处理完成之后再向Broker端返回状态。⽽应该尽量避免异步的⽅式处理业务逻辑。
  1. 消费者组也可以⾃⾏指定起始消费位点
    Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset。但是,这⾥还是会造成⼀种分裂,消息最终是由Consumer来处理,但是消息却是由Broker推送过来的

使⽤消息队列要如何解决这样的问题呢?这时,就可以创建另外⼀个新的消费者组,并通过ConsumerFromWhere属性指定这个消费者组的消费起点,从⽽让这个新的消费者组去消费之前发送过的历史消息。⽽这个ConsumerFromWhere属性并不是直接指定Offset的数值,因为客户端也不知道Broker端记录的Offset数值是多少。RocketMQ就提供了⼀个枚举值。

public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET, //从对列的最后⼀条消息开始消费 
CONSUME_FROM_FIRST_OFFSET, //从对列的第⼀条消息开始消费 
CONSUME_FROM_TIMESTAMP; //从某⼀个时间点开始重新消费 
}
http://www.xdnf.cn/news/12027.html

相关文章:

  • JAVA:String类详解
  • 华为云Flexus+DeepSeek征文|Flexus云服务器单机部署+CCE容器高可用部署快速搭建生产级的生成式AI应用
  • (LeetCode 每日一题)3403. 从盒子中找出字典序最大的字符串 I (贪心+枚举)
  • 12.3Swing控件1
  • Java并发编程实战 Day 7:并发集合类详解
  • Docker轻松搭建Neo4j+APOC环境
  • 《PyTorch Hub:解锁深度学习模型的百宝箱》
  • 物流数据接口新玩法:跨境订单处理效率提升200%
  • echarts显示/隐藏标签的同时,始终显示饼图中间文字
  • 简数采集技巧之快速获取特殊链接网址URL方法
  • Playwright 测试框架 - Python
  • 软件工程专业的本科生应该具备哪些技能
  • 【Bluedroid】蓝牙启动之gatt_init 流程源码解析
  • DrissionPage爬虫包实战分享
  • 汽车加气站操作工证考试重点
  • 文献阅读|基于PSMA PET/CT与mpMRI多模态深度学习预测前列腺癌的不良病变
  • Spring AI 之工具调用
  • cpp多线程学习
  • 无人机光纤FC接口模块技术分析
  • [华为eNSP] 在eNSP上实现IPv4地址以及IPv4静态路由的配置
  • 融智学的数学基础,通过微分几何的纤维丛结构,构建理论框架模型包含生物层、动物层、心智层、人造物层和人格层五个维度
  • C++算法训练营 Day7 哈希表及双指针
  • 聊聊FlaUI:让Windows UI自动化测试优雅起飞!
  • Deepin 安装 Nginx
  • (eNSP)配置WDS手拉手业务
  • .NET 生态中的 MCP 项目及技术分析
  • 那些Java 线程中断的实现方式
  • 单锁与分布式锁
  • AI工程师的武器库:核心技术与实战案例
  • MCP与检索增强生成(RAG):AI应用的强大组合