当前位置: 首页 > news >正文

中间件-RocketMQ

RocketMQ

  • 基本架构
  • 消息模型
  • 消费者消费消息模式
  • 顺序消息机制
  • 延迟消息
  • 批量消息
  • 事务消息
  • 消息重试
  • 最佳实践

基本架构

在这里插入图片描述

nameServer: 维护broker列表信息,客户端连接时只需要连接nameServer。可配置成集群。
broker:broker分为master和slave,master负责消息的发送和消费,slave是master的备份。master-slaver的集群方式,master挂掉时候slave不能主动转换为master提供服务(5.X版本后可以通过配置实现mater挂掉后slave转为master提供服务)。
leader-follower的集群方式,即高可用集群,各个broker是对等的,通过选举产生leader(在dashboart中显示为master),如果leader挂掉,在剩下的follower(显示为slave)中选举再产生新的leader。注意,只有超过半数的几点存活,才能选举出leader。

消息模型

在这里插入图片描述
⽣产者和消费者都可以指定⼀个Topic发送消息或者拉取消息。⽽Topic是⼀个逻辑概念。
Topic中的消息会分布在后⾯多个MessageQueue当中。这些MessageQueue会分布到⼀个或者多个broker中。

消费者消费消息模式

广播模式:所有关注topic的消费者都收到消息。广播模式下消息队列的消费位点由客户端自己维护,消费失败服务端不会重发。
集群模式:同一个消费者组只有一个成员收到消息。集群模式下消费点位由服务端维护,消费者组的所有成员共用一个位点,消费失败服务端会重发。

顺序消息机制

  1. ⽣产者只有将⼀批有顺序要求的消息,放到同⼀个MesasgeQueue上,通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象,
    那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。
  2. 消费者需要实现MessageListenerOrderly接⼝,实际上在服务端,处理MessageListenerOrderly时,会给⼀个MessageQueue加锁,拿到MessageQueue上所有的消息,然后再去读取下⼀个MessageQueue的消息。
  3. 消费消息失败时,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。因为消费者端只进⾏有限次数的重试。如果⼀条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进⾏重试。但是,如果消费者⼀直处理失败,超过最⼤重试次数,那么RocketMQ就会跳过这⼀条消息,处理后⾯的消息,这会造成消息乱序。

延迟消息

  1. 定固定的延迟级别:对于指定固定延迟级别的延迟消息,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息,进⾏延迟操作就可以了。
  2. 指定消息发送时间:RocketMQ是通过时间轮算法实现。

批量消息

⽣产者要发送的消息⽐较多时,可以将多条消息合并成⼀个批量消息,⼀次性发送出去。这样可以减少⽹络IO,提升消息发送的吞吐量。同⼀批消息的Topic必须相同,另外,不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M,如果太⼤就需要⾃⾏分割。

事务消息

在这里插入图片描述

  1. ⽣产者将消息发送⾄ApacheRocketMQ服务端。
  2. ApacheRocketMQ服务端将消息持久化成功之后,向⽣产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. ⽣产者开始执⾏本地事务逻辑。
  4. ⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:⼆次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。⼆次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断⽹或者是⽣产者应⽤重启的特殊情况下,若服务端未收到发送者提交的⼆次确认结果,或服务端收到的⼆次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。
  6. ⽣产者收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  7. ⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤4对半事务消息进⾏处理。

消息重试

RocketMQ的消费者端,如果处理消息失败了,Broker是会将消息重新进⾏投送的。⽽在重试时,RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个“%RETRY%”+ConsumeGroup的队列中。
RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:
在这里插入图片描述
如果消息重试16次后仍然失败,消息将不再投递,转为进⼊死信队列。重试次数可以通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后,消息的重试时间间隔均为2⼩时。
如果消息超过最⼤重试次数,RocketMQ会将消息发送到死信队列。⼀个死信队列对应⼀个消费组。死信队列的默认权限为2(禁读)。如果需要处理死信队列的消息,需要把权限修改为6(可读可写后)消费该Topic的消息进行处理。队列中超过有效期(默认3天)的消息会被删除,不管有没有消费。

最佳实践

  1. ⼀个应⽤尽可能⽤⼀个Topic,⽽消息⼦类型则可以⽤tags来标识。tags过滤消息的性能很高,相当于索引。
  2. 消费端幂等控制:RocketMQ的每条消息都有⼀个唯⼀的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是⽆法保证全局唯⼀的,也会有冲突的情况。所以在⼀些对幂等性要求严格的场景,最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。
http://www.xdnf.cn/news/315973.html

相关文章:

  • k8s | Kubernetes 服务暴露:NodePort、Ingress 与 YAML 配置详解
  • 【代码优化篇】强缓存和协商缓存
  • ABP-Book Store Application中文讲解 - 前期准备 - Part 2:创建Acme.BookStore + Angular
  • 【ArcGIS Pro微课1000例】0068:Pro原来可以制作演示文稿(PPT)
  • 理解与清理 Docker 中的悬空镜像(Dangling Images)
  • 8.12 GitHub Sentinel企业级进化:容器化优化×AI监控,效率提升300%实战
  • HarmonyOS运动开发:如何集成百度地图SDK、运动跟随与运动公里数记录
  • 实践004-Gitlab CICD部署应用
  • 小刚说C语言刷题—1331 做彩纸花边
  • 五、Hadoop集群部署:从零搭建三节点Hadoop环境(保姆级教程)
  • Spark和Hadoop之间的联系
  • JDK Version Manager (JVMS)
  • 【论文阅读】在调制分类中针对对抗性攻击的混合训练时和运行时防御
  • Web 架构之动静分离:原理、实践与优化
  • WHAT - Rust 智能指针
  • 【PostgreSQL】数据库主从库备份与高可用部署
  • 探索智能体开发新边界:Cangjie Magic开源平台体验与解析
  • maven基本介绍
  • Nginx+Lua+Redis实现灰度发布
  • spring4.x详解介绍
  • 一个电平转换电路导致MCU/FPGA通讯波形失真的原因分析
  • Go语言八股之channel详解
  • LeetCode 热题 100 64. 最小路径和
  • 明远智睿SD2351核心板:工业AIoT时代的创新引擎
  • 大数据、物联网(IoT)、平台架构与设计重构大模型应用
  • 轻松管理房间预约——启辰智慧预约小程序端使用教程
  • 软考 系统架构设计师系列知识点 —— 黑盒测试与白盒测试(2)
  • Linux中的`export` 设置的环境变量是临时的吗?如何永久生效?
  • 使用 AI 如何高效解析视频内容?生成思维导图或分时段概括总结
  • AI驱动的Kubernetes管理:kubectl-ai 如何简化你的云原生运维