RocketMQ总结
深入理解RocketMQ三高架构设计
高性能
- 顺序写磁盘 + mmap 零拷贝
- 异步刷盘 + 刷盘策略可配置
- 轻量网络协议 + 长连接复用
高可用
- 主从复制机制、controller、dledger集群
- NameServer 多副本无状态
- 客户端自动切换 Broker
- 消息刷盘机制保障可靠性
高可扩展性
- Broker 水平扩展
- Consumer 分组机制
- Topic/Queue 灵活路由
- 插件式架构设计
快速梳理RocketMQ客户端消息模型
三大核心角色
角色 | 说明 |
---|---|
Producer(生产者) | 发送消息到 Broker。支持同步、异步、单向三种发送方式。 |
Consumer(消费者) | 从 Broker 拉取消息进行消费。支持推模式(Push)和拉模式(Pull)。 |
NameServer | 提供路由发现服务。Producer/Consumer 都通过它查找 Broker 地址。 |
五大关键过程
-
Producer 启动流程
- 初始化 MQClientInstance;
- 向 NameServer 拉取路由信息;
- 建立与 Broker 的连接(Netty 长连接);
- 注册自身到 Topic 路由表。
-
消息发送流程
生产者发送消息时:- 从缓存中查找 Topic 对应的路由信息;
- 按策略选择一个队列(MessageQueue);
- 通过 Netty 将消息发送到对应的 Broker;
- 根据配置选择:
- 同步发送:等待返回确认;
- 异步发送:注册回调函数;
- 单向发送:不关心发送结果,适用于日志类数据。
-
Consumer 启动流程
消费者启动时:- 初始化 MQClientInstance;
- 向 NameServer 拉取 Topic 路由;
- 与 Broker 建立连接;
- 根据消费模式(Push/Pull)拉取消息。
-
消息消费流程
支持两种消费模式:模式 说明 Push 模式(默认) 实际是 Broker 定期向 Consumer 主动推送拉取请求。 Pull 模式 Consumer 主动向 Broker 拉取消息。 消费进度(offset)根据消费模式不同,也有两种:
- 集群模式(Clustering) :队列在多个消费者之间分摊;
- 广播模式(Broadcasting) :每个消费者都消费所有消息。 -
消费确认与重试机制
- 消费成功:Consumer 会定期上报消费进度;
- 消费失败:
- 可自动重试(重投到
RETRY_TOPIC
); - 或转移到死信队列(
DLQ
)。
- 可自动重试(重投到
结合源码理解RocketMQ高性能实现细节
方面 | 实现机制 |
---|---|
消息写入 | 顺序写磁盘 + MappedByteBuffer + 异步刷盘 |
消息读取 | 消费队列(ConsumeQueue)+ 索引文件(IndexFile) |
通信框架 | 高性能 Netty + 自定义轻量协议 |
路由发现 | NameServer 提前缓存路由,无需频繁请求 |
网络效率 | 长连接复用 + 请求压缩 + 线程池模型 |
全面思考RocketMQ的集群架构
RocketMQ 集群核心角色
角色 | 描述 |
---|---|
NameServer | 类似于注册中心,管理路由信息,支持无状态集群部署 |
Broker | 真正存储消息的服务。可部署为主从结构 |
Producer | 消息生产者,连接 NameServer 获取路由,再将消息发送至 Broker |
Consumer | 消息消费者,从 Broker 拉取并消费消息 |
架构特性与设计思想
- NameServer(服务发现)
- 无状态部署,支持多个节点;
- Producer/Consumer 启动时从多个 NameServer 拉取 Broker 路由信息;
- 路由信息是 Broker 主动注册 到 NameServer 的;
- 支持故障容忍(某个 NameServer 掉线不影响整体)。
- Broker(核心)
每个 Broker 有唯一标识(brokerName
+brokerId
):
brokerId = 0
:MasterbrokerId > 0
:Slave
每个 Topic 可以配置多个队列分布在不同的 Broker 上。
主从同步方式:
同步模式 | 描述 |
---|---|
ASYNC_MASTER | 异步同步(默认),写成功不等待 Slave,同步失败不影响写入 |
SYNC_MASTER | 同步刷盘,写消息时等待 Slave 确认,提高可靠性 |
SLAVE | 只做备份,不接收写请求,不参与消费 |
- Producer 工作机制
- 从 NameServer 获取最新 Topic 路由;
- 通过负载均衡策略选择队列(MessageQueue);
- 支持三种发送方式(同步/异步/单向);
- 自动感知路由变化,动态调整发送目标。
- Consumer 工作机制
-
支持两种消费模式:
- 集群模式(Clustering):多个消费者共享消息
- 广播模式(Broadcasting):每个消费者都消费所有消息
-
支持 Push 和 Pull 模式;
-
消费进度保存在 Broker(默认)或本地(广播模式);
-
支持负载均衡重新分配队列(Rebalance)。
集群高可用与容错机制
机制 | 实现 |
---|---|
主从容灾 | Master 挂了,Slave 不自动转正,需人工或运维系统切换 |
NameServer 容灾 | Producer/Consumer 配置多个 NameServer,自动重试 |
消息重试机制 | 消费失败支持自动重试、死信队列 |
刷盘策略保障数据 | 同步刷盘 + SYNC_MASTER 可实现消息 0 丢失(牺牲部分性能) |
生产环境RocketMQ常见问题处理思路
MQ消息零丢失方案总结
各种防止MQ消息丢失的方案,本质上都是以牺牲系统性能和吞吐量为代价的。这种资源消耗必然会导致集群整体效率的下降。在实际业务场景中,我们需要根据具体需求对这些安全方案进行权衡取舍。
- 生产者发送消息如何保证不丢失
- 同步发送+多次尝试(降低吞吐)
- 异步发送(增加生产者客户端负担)
- 事务消息机制(多次网络请求)
- Broker写入数据如何保证不丢失
- 同步刷盘(I/O负担)
- Dledger集群(网络负担)
- 消费者消费消息如何不丢失
- 同步处理消息,再提交offset(无法通过异步提高吞吐)
- 如果MQ服务全部挂了,如何保证不丢失
- 增加临时的降级存储
MQ如何保证消息的顺序性
强调局部有序,而不是全局有序。
- Producer将一组有序的消息写入到同一个MessageQueue中。
- Consumer每次只有单个线程能从一个同一个TopicMessageQueue中拿取消息。
MQ如何保证消息幂等性
-
生产者发送消息到服务端如何保持幂等
Producer发送消息时,如果采用发送者确认的机制,Producer发送消息会等待Broker的响应。若未收到响应,Producer将自动重试发送。然而,这种情况也可能发生在消息已被处理成功处理但确认响应丢失的场景中,从而导致消息重复发送的问题。
RocketMQ的处理方式,是会在发送消息时,给每条消息分配一个唯一的ID。 -
消费者消费消息如何保持幂等、
RocketMQ官网明确做了回答:RocketMQ确保所有消息至少传递一次。在大多数情况下,消息不会重复。
防止重复消费的关键在于确定一个可靠的唯一性标识。RocketMQ为每条消息自动分配了唯一的messageId,消费者可以通过获取这个messageId来实现去重。将已处理的messageId记录下来,就能有效判断消息是否重复消费。数据库的兜底方案则是在某些适用的场景下设置唯一键,插入重复的唯一键自然会报错回滚。
MQ如何快速处理积压的消息
-
消息积压会有哪些问题
RocketMQ和Kafka都具备出色的消息积压处理能力,短期的消息堆积通常不会造成问题。然而需要警惕的是,若积压问题长期得不到解决,当日志文件过期时,系统会自动删除这些过期文件,导致其中未被消费的消息永久丢失。 -
怎么处理大量积压的消息
-
RabbitMQ
如果是Classic Queue经典对列,那么针对同一个Queue的多个消费者,是按照Work Queue的模式,在多个Consuemr之间依次分配消息的。所以这时,如果Consumer消费能力不够,那么直接加更多的Consumer实例就可以了。这里需要注意下的是如果各个Consumer实例他们的运行环境,或者是处理消息的速度有差别。那么可以优化一下每个Consumer的比重(Qos属性),从而尽量大的发挥Consumer实例的性能。 -
RocketMQ和Kafka
因为同一个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建立对应关系,而一个MessageQueue最多只能被一个Consumer消费,因此,增加的Consumer实例最多也只能和Topic下的MessageQueue个数相同。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有MessageQueue去消费的,因此也就没有用了。
如果Topic下的MessageQueue配置本来就不够多的话,那就无法一直增加Consumer节点个数了。
如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把Consumer实例的Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转存到新的Topic中。这个速度明显会比普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添加消费者个数来提高消费速度了。之后再根据情况考虑是否要恢复成正常情况。类似固定级别的延迟消息机制,把消息临时转到一个系统内部的Topic下,处理过后,再转回来。