当前位置: 首页 > news >正文

消息中间件(Kafka VS RocketMQ)

 一、概要介绍

  1. Apache Kafka
    1. 分布式、多区、多副本、基于 ZooKeeper 协调的消息系统
    2. 核心组件:ZooKeeper、Broker、Producer、Consumer
    3. 特点:高吞吐(顺序写+零拷贝)、低延迟、易扩展
  2. Apache RocketMQ
    1. 分布式、轻量 NameServer路由、主从多副本、阿里巴巴开源的消息引擎
    2. 核心组件:NameServer、Broker、Producer、Consumer
    3. 特点:原生事务消息、定时/延迟投递、灵活顺序消费

二、架构与原理

1、Kafka

  1. 服务设计
    1. 目标:提供可水平扩展的分布式提交日志服务,实现高吞吐、低延迟、容错与持久化;
    2. 分层:
      1. 存储层:基于 CommitLog 的顺序写入,利用零拷贝提高磁盘 I/O 效率;
      2. 计算层:Producer/Consumer、Kafka Streams、Connect API,支持流处理与数据集成。
  2. 核心组件
    1. ZooKeeper:集群元数据管理、Leader 选举与配置存储;
    2. Broker:Kafka 服务器节点,负责接收、存储、分发消息;
    3. Producer:消息生产方,将数据发送至指定 Topic;
    4. Consumer:消息消费方,通过拉模型按 Offset 拉取数据。
  3. 架构要点
    1. Topic & Partition:Topic 逻辑分组,Partition 物理子分片,支持并行与负载均衡;
    2. Replication:每个 Partition 在多个 Broker 上拥有 Leader/Follower 副本,实现高可用;
    3. Controller:集群内唯一的 Controller Broker,负责分区分配与故障检测。
  4. 工作原理
    1. 生产流程:Producer 从 ZooKeeper 或 Controller 获取 Topic 路由信息 —> 选择 Partition —> 顺序写入 CommitLog —> 返回 Offset;
    2. 复制流程:Leader 接收写入请求 —> 同步写入本地 —> Follower 拉取并写入 —> 保持 ISR(In-Sync-Replicas);
    3. 消费流程:Consumer Group 拉取 Topic 路由 —> 各 Consumer 按分区拉取消息 —> 根据 Offset 进行位点提交与 Rebalance

2、RocketMQ

  1. 服务设计
    1. 目标:提供高可靠、低延迟、灵活多样的消息中间件,强调事务、顺序与延迟消息;
    2. 轻量化:去中心化 NameServer,无状态设计,简化元数据管理。
  2. 核心组件
    1. NameServer:服务发现与路由注册中心,无需持久化,节点间不通信,水平扩展简单
    2. Broker:消息存储与转发节点,分 Master/Slave 部署,顺序写 CommitLog,并维护 ConsumeQueue 索引;
    3. Producer:通过 Netty 连接 NameServer —> 获取路由 —> 发送消息至 Broker,可选择同步/异步/单向模式;
    4. Consumer:支持 Push/Pull 模型,可配置集群消费或广播消费,消费后向 Broker ACK,并可重试或进入私信队列。
  3. 架构要点
    1. Topic & Queue:Topic 逻辑分类,Queue 无力分片;多 Queue 支持并行消费与顺序消费;
    2. Replica & HA:Master/Slave 同步或异步复制,故障自动切换保证可用;
    3. 事务 & 定时:原生事务消息二阶段提交;内置定时/延迟投递机制,无需额外组件。
  4. 工作原理
    1. 注册与心跳:Broker 启动向所有 NameServer 注册并定期心跳;Producer/Consumer 启动向 NameServer 获取路由并心跳;
    2. 生产流程:Producer 获取 Topic-Queue 路由 —> 选择 Broker Master —> 顺序写入 CommitLog —> 更新 ConsumeQueue;
    3. 消费流程:Consumer 拉取路由 —> 对应 Queue 发起 Pull 请求或 Broker 主动 Push —> 处理消息后 ACK —> Broker 更新消费进度。

三、服务部署

在生产环境中,要让 Kafka 和 RocketMQ 都具备“高可用(Availability)”、“高并发(Througput)”和“高可靠(Reliability)”三大能力,就必须在架构设计、集群部署与参数调优三个层面做到精细化:

  1. 架构设计:多节点、多可用区冗余,去中心化或弱中心化元数据服务;
  2. 集群部署:合理分片/队列数、主从副本布局、心跳检测与自动切换;
  3. 参数调优:根据 HA/Throughput/Reliability 三大目标,选择不同维度的核心参数,并理解它们从“不可靠”到“绝对可靠”的逐级演进。

1、Kafka

  1. 架构与部署拓扑
    1. ZooKeeper 集群
      1. 节点数:建议 3-5 个(奇数),分别部署在不同机架或可用区
      2. 负责:元数据存储、Controller选举、Broker 配置同步
    2. Broker 集群
      1. 节点数:>=3个,每个 Broker 承载多个 Partition;
      2. Topic 分区:分区数建议 >= 消费实例数*2,以保证并行度
      3. 副本因子:replication.factor = 3(至少三副本)
    3. Controller:随机在 Broker 上选举产生,负责 Rebalance、Leader 选举等
    4. Producer / Consumer:部署于业务服务侧或专用消息网关,配置多 Broker 地址以备切换
  2. 高可用(Availability)
    1. 多副本与 ISR 机制
      1. 只让与 Leader 保持同步的副本(ISR)参与选举,保证切换时读写一致;
      2. 参数要点:
        1. min.insync.replicas=2:至少两个同步副本确认
        2. unclean.leader.election.enable=false:禁止滞后副本选为 Leader
    2. ZooKeeper 心跳调优
      1. tickTime(心跳间隔)、initLimit(Follower 同步超时心跳数)、SyncLimit(常规心跳丢失数)
      2. 调优原则:心跳间隔不宜过大、否则故障感知慢;也不能过小,网络抖动易误判
    3. Controller 冗余选举
      1. Broker 之间任意节点都可成为 Controller,故障后秒级选举
      2. 监控:Controller 变更事件告警
  3. 高并发(Throughput)
    1. Partition 并行:每个 Topic 分区数按消费实例数*2来规划,分区越多,写入/消费并行度越高
    2. 批量与压缩
      1. linger.ms=5ms:最多等待5ms批量发送
      2. batch.size=64KB:单个批次最大消息量
      3. compression.type=snappy:压缩减小网络与磁盘 I/O
    3. 网络 & I/O线程池分离
      1. num.network.threads=3:专门处理网络请求
      2. num.io.threads=8:专门处理磁盘读写
      3. 这样可防止网络波动影响磁盘 I/O,又可提高并发承载
  4. 高可靠(Reliblity)
    1. ACK 进阶演进
      1. acks=0:零等待、最高吞吐、最低延迟,但完全不可靠
      2. acks=1:Leader 确认后返回,延迟极地,但 Leader 宕机前同步未完成时丢失数据
      3. acks=all + min.insync.replicas=2:Leader + 至少一个 Follower 确认后返回,丢失概率极低
    2. 幂等保障:enable.idempotence=true:自动重试无限次、顺序写、Producer ID + 序列号去重,实现 Exactly-once
    3. 事务支持:配置 transactional.id,使用两阶段提交,Broker 维护事务状态并回查,保证业务与消息原子提交
    4. 日志落盘:
      1. log.flush.scheduler.interval.ms:定时将内存写入磁盘
      2. 可选同步刷盘,但一般生产环境依赖多副本确认即可,无需每条同步刷盘

2、RocketMQ

  1. 架构与部署拓扑
    1. NameServer 集群
      1. 节点数:2-3个,无状态、只存内存路由信息
      2. 客户端(Producer/Consumer)配置多地址,失效自动切换
    2. Broker Master/Slave
      1. Master:写入并供 Consumer 拉取;Slave:只读且同步 Master
      2. 建议每个 Master 至少配置一个 Slave,跨机房部署灾备
    3. Producer / Consumer:部署于业务侧或独立网关,长连接 NameServer+Broker,定期心跳
  2. 高可用(Availability)
    1. 无状态 NameServer
      1. 节点间不通信,重启后通过 Broker 心跳自动重建路由
      2. pollNameServerInterval=30000ms:客户端每 30s 拉取一次路由
    2. 主备切换
      1. Master 宕机后,30s内 Slave 自动升级为 Master
      2. heartbeatBrokerInterval = 30000ms:客户端向 Broker 心跳间隔,保证切换及时
    3. 请求阻塞控制:brokerSuspendMaxTimeMillis=15000ms:Broker 最长阻塞时间,防止单请求拖垮线程池
  3. 高并发(Throughput)
    1. Queue 并行:每个 Topic 配置 queueNum=消费线程数*2,提升消费并发
    2. 批量 & 压缩
      1. compressMsgBodyOverHowmuch = 4096 bytes:超大消息自动压缩
      2. 异步发送:retryTimesWhenSendAysncFailed=3 重试
    3. 线程池配置
      1. sendMessageThreadPollNums=8:Broker 接收写入线程
      2. pullMessageThreadPollNums=8:Broker 响应消费线程
      3. consumeThreadMin/Max=20/64:Consumer 端并发消费动态伸缩
  4. 高可靠(Reliablity)
    1. 同步刷盘
      1. flushDiskType=SYNC_FLUSH:Master写入磁盘后才返回,确保零丢失
      2. 可选 ASYNC_FLUSH,牺牲少量可靠换取更高吞吐
    2. 重试与死信:maxReconsumeTims=16:消费失败后最大重试次数,超出转入死心队列
    3. 事务消息:使用 TransactionMQProducer,两阶段提交+Broker回查,保证跨系统事务一致性

四、消息

1、顺序消息

  1. Kafka的顺序消息只能在单个分区内保证严格顺序,通过消息key或者自定义分区器将相关消息路由到同一个分区,再由消费者单线程按 Offset 顺序消费;
  2. RocketMQ除了类似的“单队列”顺序外,还通过 MessageQueueSelector 实现“分区顺序”(局部有序),并在 Broker 端对队列级别加锁,保证每个队列在同一时刻只有一个线程消费。

(1)Kafka

全局顺序(Global Order)

  1. 实现:将 topic 配置为只有一个 partition,并且只有1个消费者实例,即可在全局上保持顺序
  2. 性能折中:吞吐与并行度大幅下降,因为只有单分区、单线程在跑

分区内顺序(Partition Order)

  1. 实现原理:Kafka 只在单个 Partition 内保证顺序,所有写入该 Partition 的消息按 Offset 严格有序
  2. 消息路由:
    1. Key分区:ProducerRecord 构造时带 key,DefaultPartitioner 会把相同 key 的消息路由到同一分区
    2. 自定义分区器:实现 Partitioner 接口,覆盖 partition() 方法
// Producer
// 构造 ProducerRecord(topic, key, value) → 客户端调用 send(record)。
// DefaultPartitioner.partition(topic, keyBytes, ...) 内部做:
int numPartitions = partitionsFor(topic).size();
return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;// 自定义分区器 实现 org.apache.kafka.clients.producer.Partitioner 接口:
public class MyPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {// 比如按用户ID取模return Math.abs(key.hashCode()) % cluster.partitionsForTopic(topic).size();}
}// 客户端配置:
partitioner.class=com.example.MyPartitioner// 这样所有 producer.send() 时都会走你的 partition() 逻辑,在发送前就决定好了分区。
// Producer 的路由逻辑在调用 send() 时就完成了,消费者不会再“路由”——它只会拉取自己所属的分区。// Comsuner
// 分区分配:Consumer Group 启动后,协调出每个实例各自负责哪些分区(由协调器 / rebalance 算法决定)。
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 这里 record.topic(), record.partition(), record.offset() 是固定的}consumer.commitSync();
}// 总结:Producer 决定“放到哪个分区”;Consumer 决定“从哪些分区拉”。中间没有二次路由。

(2)RocketMQ

全局顺序(Global Order)

  1. 实现:Topic 只能配置1个队列,1个消费者实例;所有消息都发送到同一队列,单线程消费
// 发送端:只定义一个队列
producer.send(message, new MessageQueueSelector() {public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {return mqs.get(0); // 始终选择第一个队列}
}, null);
// 消费端:使用顺序监听器
consumer.registerMessageListener(new MessageListenerOrderly() { ... });

分区顺序(Partition Order)

  1. 实现:根据业务的 Sharding Key 选队列,同一 Key 的消息走同一队列;消费者可多实例,每个实例独占若干队列。
producer.send(msg, (mqs, msg, key) -> {int index = key.hashCode() % mqs.size();return mqs.get(index);
}, orderId);
consumer.registerMessageListener(new MessageListenerOrderly() { ... });

队列级锁:

Broker 在消费每个队列前会加锁,保证同一时刻只有一个线程消费该队列,顺序不会被打乱。

总结:

  1. 全局顺序:topic 只配1个队列,并且消费者实例数=1;所有消息都跑到同一个队列;
  2. 分区顺序:topic配多个队列;Producer 根据 Sharding Key 选定队列;多个消费者实例,各自独占若干队列;
  3. 队列级锁
    1. Broker端:收到消费者订阅请求后,为每个 MessageQueue(队列)维护一把分布式锁;
    2. 加锁时机:当消费者实例调用 MessageListenerOnderly 顺序消费时,客户端会向 Broker 请求“对某个队列(MessageQueue)加锁”。加锁成功后,该实例再整个批次(ConsumeOrderlyBatchMaxSize)乃至单条消息逐个处理完毕并显示提交ACK之前,不会释放锁;
    3. 锁的生命周期:
      1. 获取锁 —> 开始 pull 并依次交给 listener 处理
      2. 处理完成所有拉到的消息后 —> 主动释放锁
        1. 如果在 consumeOrderlyTimeout 时间内未能完成处理并释放锁,Broker 会强制过期这把锁,然后其他实例才能继续抢锁消费
      3. Broker 再将队列分配给下一个有空闲线程的消费者实例
  4. 消费模式:一条一条拉取 vs 批量拉取
    1. 批量拉取:默认 RocketMQ 顺序消费每次 pull 一批消息(可配置),但客户端内部会将这批消息逐条调用 MessageListenerOrderLy
    2. 单条消费:你的 listener 也可以每次只处理一条,处理完再 ack。只要队列锁未释放,就会继续拉取下一条。

2、消息过滤

  1. RocketMQ 原生支持服务端过滤(Tag过滤、SQL92属性过滤)和消费端多虑,可以在Broker上就把不感兴趣的消息“丢掉”,降低网络与客户端压力;
  2. Kafka本身不支持Broker层的消息内容过滤,只能靠消费端过滤或借助 Kafka Streams/KSQL 等流处理层来实现,真正的筛选总是在消费者一侧完成。

(1)Kafka

消费端过滤

消费者在 poll() 后,根据消息内容自行判断;无用的消息也会进入消费端

Kafka Streams / KSQL 过滤(附加层)

如果需要在 Broker 与下游业务之间做过滤“类似”服务端效果,可部署 Kafka Streams 或 ksqlDB:

// Kafka Streams 示例
KStream<String, Order> orders = builder.stream("TopicTest");
KStream<String, Order> filtered = orders.filter((k, v) -> v.getStatus().equals("CREATED"));
filtered.to("TopicCreatedOrders");

将过滤结果写入新的 Topic,消费者再只订阅 TopicCreatedOrders。

缺点:需要额外部署流处理集群,消息经过二次写入才可消费。

(2)RocketMQ

Tag过滤(最轻量)

原理:给消息打上一个或多个Tag(标签),Broker 为每个队列维护 Tag 到消息位置的简单索引;

服务端:

// Producer 发送时指定 Tag
Message msg = new Message("TopicTest", "TagA", "OrderID001", body);
producer.send(msg);

Broker 在存储时,会同时在消费队列的索引中记录该消息的 Tag;当消费者订阅 TagA 时,只扫描索引取出带有 TagA 的消息。

消费端:

// Consumer 订阅时指定 Tag 过滤
consumer.subscribe("TopicTest", "TagA || TagB");

消费者只拉取包含 A 或 B 的消息,不会接收到其它 Tag 的数据。

SQL92 属性过滤(精细化)

原理:消息可附带任意用户属性(key-value),Broker在存储时把这些属性信息也写入 ConsumeQueue 的二级索引;订阅时可传入 SQL 表达式,Broker 会对索引做简单的条件匹配,只返回符合条件的消息。

生产端:

Message msg = new Message("TopicTest", // topic"TagA",      // tagbody);
msg.putUserProperty("orderStatus", "CREATED");
producer.send(msg);

消费端:

// 服务端过滤
consumer.subscribe("TopicTest",MessageSelector.bySql("orderStatus = 'CREATED' AND price > 100"));

这样只有属性条件满足的消息才会被拉取到客户端,极大减少网络流量与客户端CPU消耗。

3、延迟消息

  1. RocketMQ 原生支持延迟消息,通过将消息先存入内部特殊的调度主题 SCHEDULE_TOPIC_XXXX,并由18个定时任务定期扫描、判断、转发到目标 Topic,来实现任意一级延迟;
  2. Kafka 核心并不内置延迟消息能力,仅能借助外部定时重投、Kafka Streams 定时器或延迟队列 Topic 等手段来模拟延迟发送。

(1)Kafka

Kafka API 中没有类似 RocketMQ 的“延迟等级”或“调度主题”机制;Producer 的 send() 方法只能指定目标 Topic、Partition、Key、Timestamp 等字段,无法内置延迟投递。

(2)RocketMQ

延迟等级与配置:

  1. 默认 18 个等级:从 1s、5s、... 到 2h;
  2. 可在 broker.conf 中通过 messageDelayLevel=... 自定义对应的延迟时间,并在 Broker 启动时加载到 DelayLevelTable

消息流转:

  1. 生产者发送带延迟级别的消息:
    Message msg = new Message(topic, tags, body);
    // 发送时指定延迟级别 3(10s)
    SendResult res = producer.send(msg, 3);
  2. 存储到调度主题:Broker 不直接存入目标 Topic,而是写入内部主题 SCHEDULE_TOPIC_XXXX 对应的第3号队列(延迟级别队列)
  3. 定时调度:RocketMQ启动后,会为每个延迟等级启动一个定时任务(默认每100ms执行一次),执行:
    for each delayLevel in DelayLevelTable:queueId = delayLevel.index - 1nextOffset = topicScheduleMessageService.fetchOffset(queueId)msgExt = scheduleMessageStore.getMessage(queueId, nextOffset)if msgExt.storeTimestamp + delayTime(delayLevel) <= now:// 时间到了,投递到原 TopicbrokerController.getMessageStore().putMessage(clone msg with real Topic)scheduleMessageStore.updateOffset(queueId, nextOffset + 1)
  4. 转发到真实Topic:调度任务将消息“透传”到原本的 Topic 和队列,消费者就像正常消费普通消息一样接收。

关键点:

  1. 调度主题不可见:客户端无法直接订阅 SCHEDULE_TOPIC_XXXX;
  2. 定时队列指针:每个等级维护偏移量,下次从上次结束处继续扫描;
  3. 并发调度:18个任务并行处理不同等级,互不影响;
  4. 可调整精度:通过修改定时任务间隔(20ms -> 100ms)与延迟级别时间,可在毫秒级到小时级自由配置。

4、事务消息

  1. RocketMQ 原生支持事务消息,通过“半消息+本地事务+回查”三步走,实现了本地事务与消息投递的原子性,保证“订单写入成功 <=> 消息投递成功”同步一致。
  2. Kafka 从0.11版本起也提供了事务API,支持 Producer 端在多个分区和多个 Topic 上的 Exactly-Once 语义,但其实现机制与 RcoketMQ 略有差异。

(1)Kafka

  1. 核心机制:事务协调器(Transaction Coordinator)
    1. 幂等生产者:开启 enable.idempotence=true 后,Kafka 客户端自动为每条消息分配序列号,Broker 去重;
    2. 事务 API:
      1. initTransactions() 初始化与协调器的连接;
      2. beginTransaction() 开启事务;
      3. 多次 send() 到同一个或多个 Topic/Partition;
      4. 成功阶段:commitTransaction() —> 协调器写入事务完成标记;
      5. 失败阶段:abortTransaction() —> 丢弃事务内消息;
    3. Exactly-Once:消费者需配置 isolation.level=read_committed,只读已提交事务消息。
  2. 实现细节与限制
    1. 事务协调器:部署在 Kafka Broker 上,每个事务由协调器管理;
    2. 事务范围:同一 Producer 实例内,多个 Topic/Patition;
    3. 幂等与事务耦合:Producer 自动在幂等基础上开启事务,确保无重复并原子提交;
    4. 性能考量:
      1. 事务内消息在提交前对消费者不可见,需额外的协调开销;
      2. 事务跨分区会触发协调器同步,增加延迟。

(2)RocketMQ

  1. 基本模型:二阶段提交
    1. Prepare(半消息):Producer调用 sendMessageInTransaction(msg, arg),Broker 接收并保存为“半消息”(不可见给消费者);
    2. 执行本地事务:Producer 在本地执行业务(如下单、扣库存),代码在 executeLoacalTransaction() 中完成,返回 COMMIT 或 ROLLBACK;
    3. Commit / Rollback:Producer 调用 endTransaction() 通知 Broker:
      1. COMMIT_MESSAGE —> Broker 将半消息变为可投递消息;
      2. ROLLBACK_MESSAGE —> Broker 删除半消息,不投递。
    4. 事务回查:若 Producer 在超时时间内未调用 endTransaction() (网络异常、进程崩溃等),Broker 会定期发起回查请求,Producer 的 checkLocalTransaction(MessageExt msg) 负责读本地状态(持久化的事务记录)并返回状态,最终保证“悬挂”事务决断。
  2. 实现细节与关键点
    1. 半消息存储:写入到内部主题 RMQ_SYS_TRANS_HALF_TOPIC,并带上唯一 transactionId、msg.getKeys() 或 userProprety,用于回查定位;
    2. 事务状态持久化:强烈建议将本地事务状态(如 orderId —> SUCCESS/FAILED)持久化到数据库,避免 JVM 崩溃后丢失状态;
    3. 幂等与安全:
      1. checkLoaclTransaction() 可能被多次或并发调用,务必保证幂等、安全;
      2. 回查返回 UNKNOW 可让 Broker 继续重试,以防业务系统暂时无法判断。
    4. 回查频率:Broker 后台线程定期(默认每30s)检查未决事务,直到收到明确状态;
    5. 性能影响:
      1. 半消息写两次磁盘:一次 Prepare,一次 Commit;
      2. 持久化事务记录与幂等回查的额外 DB 访问。

5、广播消息

  1. RocketMQ原生支持“广播消息”模式,只要将消费者的 MessageModel 设置为 BROADCASTING,就能让同一消费组内的所有实例都各自收到并消费每一条消息;
  2. Kafka 不支持同组内广播。

(1)Kafka

Kafka的 Consumer Group 设计目标就是“同组内消息分发到单实例”,不支持组内广播。

(2)RocketMQ

原理剖析

  1. 消费模式切换

    1. 默认是集群模式(CLUSTERING):同组内消息只会被一个实例消费;
    2. 广播模式(BROADCASTING):同组内每个实例都能消费每条消息。
  2. 消费位点存储
    1. 集群模式:位点(offset)由 Broker 统一维护;
    2. 广播模式:位点由客户端本地保存(通常是本地文件或内存,默认是 ${user.home}/rocketmq_offsets/broadcast_group/),每次拉取后会持久化
  3. 消息分发:Broker 在推送消息时,不检查该组中其他实例的消费状态,只要实例在线并订阅,就逐条下发;
  4. 故障与重启:
    1. 如果本地已有保存的 offset,消费者重启后会从该 offset 继续消费(而非从0开始)
    2. 如果本地尚无任何 offset(首次启动或数据目录被清空),将根据 consumeFromWhere 配置决定起点(如果没有显示的配置 consumeFromWhere,RMQ的广播消费者在首次启动时会使用默认策略  CONSUME_FROM_LAST_OFFSET,只从最新消息之后开始消费,不会重跑历史消息)
      // 默认为 CONSUME_FROM_LAST_OFFSET
      consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
      // 可选 CONSUME_FROM_FIRST_OFFSET 或 CONSUME_FROM_TIMESTAMP
    3. 所以只要不删除本地 offset 文件,每次重启都能接着上次进度继续。

使用方式

// 1. 创建 Consumer 并设置为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 切换为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 订阅 Topic
consumer.subscribe("TopicBroadcast", "*");
// 注册消息监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 逐条处理System.out.printf("实例[%s] 收到消息: %s%n",InetAddress.getLocalHost().getHostName(),new String(msg.getBody(), StandardCharsets.UTF_8));}// 手动 Ackreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
  1. 手动 ACK:广播模式下,Broker 不等待 ACK,但客户端仍可通过返回 CONSUME_SUCCESS 或抛出异常来触发本地重试/死信机制;
  2. 重试与DLQ:当消费失败时,客户端可自行重试,或配置 maxReconsumeTimes 后转入死信队列,保证至少一次投递;
  3. 位点回溯:可在客户端启动时调用 seek() 或设置 consumeFromWhere 为 CONSUME_FROM_TIMESTAMP,回溯到指定时间点。
    1. 广播模式默认不会自动回溯N条,只在首次无位点时根据 consumeFromWhere 决定起点
    2. 若需“回溯N条”,手动读取并修改本地 offset;
    3. 对于集群模式,推荐使用 mqadmin resetOffsetByTime 或 updateConsumerOffset 工具按时间或按条数统一重置

确保消息不丢的最佳实践

  1. 冗余发送:在 Broker 端开启主从同步刷盘(SYNC_FLUSH),避免挂掉导致消息丢失;
  2. 定期 pull:消费者使用 pull 模式定期拉取,降低网络抖动带来的漏拉消费;
  3. 本地记录:消费端可维护“已消费消息ID”列表,配合去重,防止重复或漏消费;
  4. 重平衡回溯:在实例掉线后重启,主动拉取未消费消息,或回溯N条,确保每条都被消费过

五、高吞吐

在高吞吐领域,Kafka 和 RocketMQ 都通过顺序I/O、索引结构、批量处理和并行化等手段提升性能,但两者的实现思路和侧重点有所差异。Kafka凭借操作系统零拷贝、Page Cache 深度利用、超大分区并行和批量压缩等优化,使其在同等硬件下吞吐能力往往高于 RocketMQ。

1、Kafka

  1. 顺序读写与 Page Cache
    1. 顺序写入 CommitLog:所有消息追加到分区对应的文件(Segment)末尾,避免随机 I/O;
    2. 操作系统 Page Cache:Kafka 不在 JVM 堆内维护大型缓存,而是让 OS 把磁盘数据缓存到内核空间,读写几乎在内存完成,降低 GC 影响;
  2. 零拷贝(Zero-Copy):sendfile() 系统调用:在消费端,数据从 Page Cache 直接推送到网络 Socket,跳过用户态缓冲区复制,减少上下文切换和内存拷贝次数;
  3. 分区与分段(Partition & Segment)
    1. 多分区并行:每个 Topic 可以拥有成百上千个分区,不同分区并行处理,线上集群可轻松横向扩展;
    2. Segment切片:每个分区又按固定大小(如1GB)切分为多个 Segment,提供文件滚动和索引机制,保证单个文件可控切快速查找。
  4. 批量与压缩
    1. 批量发送/拉取:Producer linger.ms + batch.size,Consumer fetch.min.bytes + fetch.max.wait.ms,都鼓励打包处理,提高 I/O 效率;
    2. 压缩算法:Snappy、LZ4 等实时压缩,在保证低延迟到同时显著降低网络贷款与磁盘吞吐压力。
  5. 索引与快速定位:Offset索引:每个 Segment 维护内存映射的 Offset 索引,消费者仅需根据 Offset 直接定位文件偏移,无需扫描。

2、RocketMQ

  1. CommitLog + 内存映射(mmap)
    1. 单一 CommitLog:所有消息顺序写入同一组文件,简化存储逻辑并提升写入吞吐;
    2. 内存映射文件:Broker 端通过 Java MappedByteBuffer 将文件映射到内存,写入与刷盘性能接近本地内存操作。
  2. 消费索引:ConsumeQueue 与 IndexFile
    1. ConsumeQueue:为每个 Topic-Queue 维护轻量级索引文件,记录物理偏移量、消息大小等,消费者拉取时只查索引定位,无需遍历主日志;
    2. IndexFile:可选的二级索引,允许按消息 Key 快速检索,适合少量热点查询。
  3. 批量与刷盘策略
    1. 批量写入:Producer 支持批量发送,Broker 端累积后一次持久化;
    2. 灵活刷盘:同步(SYNC_FLUSH)与异步(ASYNC_FLUSH)可选,允许在可靠性和吞吐之间做权衡。
  4. 分区并行(Queue)
    1. 多队列并行:每个 Topic 可有多个队列,消费者实例可以并行拉取不同队列,加大并发吞吐;
    2. 消息路由灵活:通过 MessageQueueSelector 能根据业务 Key 按需分区,既保证顺序又并行。

3、关键差异对比

优化点KafkaRocketMQ
内存缓存完全依赖 OS Page Chache,JVM对外,降低GC影响依赖 MappedByteBuffer(对外),写入同时映射到内存
网络拷贝sendfile() 零拷贝默认通过 Netty ByteBuffer,可选 FileRegion(部分版本支持零拷贝)
批量处理上层 Producer/Consumer 强制批量配置,如 linger.ms、fetch.min.bytesProducer 批量 API+Broker 端批量刷盘
分区并行分区数可达上千,横向扩容几乎无限队列数受限于 Broker 配置与硬件资源,可并行但通常低于 Kafka 分区数
索引结构基于内存映射 Offset 索引,直接文件定位ConsumeQueue + IndexFile 二级索引,定位稍多一步
刷盘模式可选定时/同步刷盘(多副本确认替代同步刷盘)同步或异步刷盘,功能更直观
延迟与吞吐网络零拷贝 + 大批量 + 高并行 = 毫秒级延迟 + 百万级 TPSmmap + 顺序写 + 批量刷盘 = 低延迟 + 十万级 TPS

4、为何 Kafka 吞吐更大?

  1. OS 层零拷贝 vs Netty复制:
    1. Kafka 原生调用 sendfile(),将 Page Cache 中数据直接推送到 Socket 缓冲,避免用户态 <=> 内核态多次拷贝与上下文切换。
    2. RocketMQ 默认使用 Netty ByteBuf,数据从 MappedByteBuffer —> JVM堆外 —> Netty 缓冲区,多了一次内核—>用户空间的复制。
      1. 虽然可以通过 Netty 的 FileRegion 插件启用零拷贝,但这需要外配置,不如 Kafka 原生开箱即用。
  2. 分区并行 vs 队列并行:
    1. Kafka 分区(Partition)可随时在线添加,且能将分区数调至数百、上千,极大提升并行度;
    2. RocketMQ 队列(Queue)数在 Topic 创建时确定,扩容需重建 Topic 或滚动重启 Broker,难以像 Kafka 那样无中断大幅增队列。
  3. 批量策略灵活度:
    1. Kafka Producer/Consumer 端提供 linger.ms、batch.size、fetch.min.bytes、fetch.max.wait.ms 等多维度参数,可在微秒级动态拼包;
    2. RocketMQ 虽支持批量发送 API(send(List<Message>))和批量拉取,但缺少类似 Kafka 的“时间阈值 + 最小字节”自适应批量拼包机制,需要应用自行控制调用时机。
  4. 索引结构轻量 vs 双级索引:
    1. Kafka 只保留 Offset 索引,在内存映射文件中直接定位偏移,即查即读,无额外索引结构开销。
    2. RocketMQ 采用 ConsumeQueue + IndexFile 双级索引
      1. ConsumeQueue 存放 CommitLog 偏移、消息大小
      2. IndexFile 支持 Key 查询
        1. 虽然功能更强,但读写时需额外维护和解析索引文件,相对多了 I/O 和 CPU 开销。
  5. 副本同步开销 vs 拉模型复制:
    1. Kafka 默认异步复制(Broker 拉取),但在 acks=all 下写入 Leader 时不阻塞后端拷贝,Follower 会后台拉取,避免写路径的阻塞;
      1. Kafka本身是异步拉取复制架构,但通过 acks=all + min.insync.replicas 配置,可以实现“效果上”的同步复制,Leader 在写入本地后,会等待所有在ISR(同步副本组)内的 Follower 都成功写入并 ACK,才向 Producer 返回成功;
      2. acks=all:Producer 要求等待所有 ISR 内的副本(包括 Leader 本身)确认写入后,才算写入成功;
      3. min.insync.replicas=2:意味着写入请求必须至少有2个副本在ISR(可同步的副本集)内确认,否则抛出 NotEnoughReplicas 异常
      4. 虽然 Follower 依旧是“拉”,但 Leader 会阻塞在本地等待这些 ISR Follower 的ACK(其拉到并写入后会回报给 Leader),再向 Producer 返回,这相当于“同步风格”,保证了至少 min.insync.replicas 个节点上的数据一致。
    2. RocketMQ 主从复制可选同步刷盘或异步刷盘
      1. 同步刷盘模式下,Master 必须等待 Slave 磁盘写入返回后才 ACK,增加写延迟
      2. 异步刷盘虽然速度快,但可靠性下滑,不宜用于高可靠场景。
      3. RMQ的复制流程
        1. Master 写入 CommitLog
          1. 根据 flushDiskType
            1. ASYNC_FLUSH:Master 在本地内存提交后异步刷盘,立即返回 ACK。
            2. SYNC_FLUSH:Master 在本地内存提交后同步写磁盘再返回 ACK;
        2. Slave 拉取同步
          1. Slave 持续以 Pull 方式向 Master 请求新消息(CommitLog),拉到后写本地并刷盘。
          2. 无论 Master 是同步刷盘还是异步刷盘,Slave 拉取机制都不变,Master 并不等待 Slave 完成拉取。
        3. Producer ACK 时机:Producer 在 Master 本地写(并根据 flushDiskType 刷盘)成功后,就会收到 ACK,与 Slave 是否已拉取无关。

六、高可用

http://www.xdnf.cn/news/1133407.html

相关文章:

  • UDP和TCP的主要区别是什么?
  • 单片机(STM32-中断)
  • 构建足球实时比分APP:REST API与WebSocket接入方案详解
  • 比特币技术简史 第二章:密码学基础 - 哈希函数、公钥密码学与数字签名
  • 主机安全---开源wazuh使用
  • OCR 与 AI 图像识别:协同共生的智能双引擎
  • 从0开始学习R语言--Day48--Calibration Curves 评估模型
  • 预训练模型:大规模数据预学习范式——定义、原理与演进逻辑
  • 360安全卫士硬盘写入问题解析
  • 了解一下Unity Object的内存管理机制
  • 使用JS编写一个购物车界面
  • C# --- 单例类错误初始化 + 没有释放资源导致线程泄漏
  • 实训十一——网络通信原理
  • WP Force SSL Pro – HTTPS SSL Redirect Boost Your Website‘s Trust in Minutes!
  • ByteToMessageDecoder详解
  • 神经网络常见激活函数 13-Softplus函数
  • Linux4:线程
  • 7.16 Java基础 | 集合框架(上)
  • SM3算法工程中添加bouncycastle.bcprov.jdk15on库
  • 从函数调用到进程通信:Linux下的多语言协作实践
  • MySQL 8.0 OCP 1Z0-908 题目解析(27)
  • 解决“Windows 无法启动服务”问题指南
  • 论文导读--PQ3D:通过分段级分组实现多模态特征融合和 MTU3D:在线查询表示学习与动态空间记忆
  • C# 8.0 创建一个简单的控制台应用程序
  • 使用 CrewAI 进行股票分析:自动化投资决策的新途径
  • YAML 自动化用例中 GET vs POST 请求的参数写法差异
  • 剑指offer64_圆圈中最后剩下的数字
  • 分块(chunked) vs 滑动窗口(windowed)
  • 力扣面试150(31/150)
  • Python爬虫实战:研究PyYAML库相关技术