当前位置: 首页 > web >正文

速学 RocketMQ

目录

本地启动&测试&可视化

核心概念

集群

主从 集群

Dledger 集群 

总结

客户端消息确认机制

广播模式

消息过滤机制

顺序消息机制

延迟消息与批量消息

事务消息机制

ACL权限控制体系

RocketMQ客户端注意事项

消息的 ID、Key、Tag

最佳实践

消费者端进行幂等控制

关注错误消息重试

手动处理死信队列

MQ如何保证消息不丢失

1、哪些环节可能丢消息

2、生产者发送消息如何保证不丢

3、Borker写入数据如何保证不丢失

4、Broker主从同步如何保证不丢失

5、消费者消费消息如何不丢失

6、如果MQ服务全部挂了,如何保证不丢失

7、MQ消息零丢失方案总结

MQ如何保证消息的顺序性

MQ如何保证消息幂等性

MQ如何快速处理积压的消息


本地启动&测试&可视化

官网文档及下载地址:https://rocketmq.apache.org/zh/docs/

RocketMQ 也是Java开发的程序,所有需要有JDK环境

1、可以在 bin/runserver.sh 文件中修改初始堆内存、最大堆内存、年轻代初始内存大小,默认都是2g,太大了我的电脑吃不消,调小一点。注意jdk8及之前的是在图片中的 if 修改,后面的版本要在 else 中修改

2、在bin/runbroker.sh中修改

3、配置RocketMQ环境变量:

# RocketMQ 相关环境变量配置,:$PATH注意这里要追加到现有的 PATH 而不是覆盖,否则所有基础命令都会失效
export ROCKETMQ_HOME=/home/app/rocketmq/rocketmq-5.3.1-bin
export PATH=$ROCKETMQ_HOME/bin:$PATH
export NAMESRV_ADDR='localhost:9876'

刷新 source /etc/profile

4、启动NameServer,进入RocketMQ的bin目录运行

 nohup bin/mqnamesrv &

cat nohup.out   查看输出日志

出现The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876 表示成功

5、启动Broker

在/conf/broker.conf后面添加:
brokerIP1 = 服务器ip
autoCreateTopicEnable=true
namesrvAdd=localhost:9876  #(NameServer的地址)

启动命令:nohup mqbroker -c ../conf/broker.conf &

如果报错:Unrecognized VM option 'UseBiasedLocking' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
这个是jdk版本已经不再支持UseBiasedLocking虚拟机,只需要在runbroker 将包含UseBiasedLocking这个一行注释掉即可

查看命令:cat nohup.out 出现The broker[iZ7xvhygbzymeb78axyu9pZ, 172.18.226.151:10911] boot success. serializeType=JSON and name server is localhost:9876 表示成功

6、发送测试消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

7、​消费测试消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

8、RocketMQ0-Dashboard 可视化页面
docker run -d --name rocketmq-dashboard \
  --network docker-net \
  -p 8080:8080 \
  --restart=always \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv_dj:9876" \
  apacherocketmq/rocketmq-dashboard

然后将在容器中的jar拷贝出来:
docker cp abc123:/rocketmq-dashboard.jar /home/app/rocketmq
启动:
nohup java --add-opens java.base/java.nio=ALL-UNNAMED \
           --add-opens java.base/sun.nio.ch=ALL-UNNAMED \
           -Drocketmq.namesrv.addr=localhost:9876 \
           -jar rocketmq-dashboard.jar > rocketmq-dashboard.log 2>&1 &

访问ip:8080 即可

核心概念

Topic:主题是消息的一级分类单元,生产者将消息发送到特定的主题,而消费者订阅该主题以接收消息。一个Topic可以有多个MessageQueue。
MessageQueue(消息队列):每个Topic可以包含一个或多个消息队列。RocketMQ通过在不同的消息队列之间分配消息来实现负载均衡和高吞吐量。MessageQueue专属于某个Topic:一个MessageQueue仅存储其所属Topic的消息,不会跨Topic存储消息
Consumer Group(消费者组):一组逻辑上相同的消费者构成一个消费者组。同一个消费者组内的所有消费者共同消费来自某个Topic的所有消息,但每条消息只会被该组中的一个消费者处理。
Consumer Instance(消费者实例):这是实际运行的消费者程序的一个实例(代码中 new 出来就算一个实例)。一个消费者组可以包含多个消费者实例。


关系与工作机制
消息队列与消费者的对应关系:
在RocketMQ中,对于任何一个给定的消费者组,它所订阅的Topic下的每个MessageQueue最多只能由该组内的一个Consumer实例进行消费。这种一对一的关系确保了每条消息只会在消费者组内被处理一次,从而避免重复消费的问题。
负载均衡机制:
当有多个消费者实例存在于同一消费者组时,RocketMQ会自动在这几个消费者实例之间平均分配Topic下的所有MessageQueue。
如果增加的消费者实例数量超过了Topic下MessageQueue的数量,则多余的消费者实例将处于空闲状态,因为没有额外的MessageQueue供它们消费。
实际应用场景示例
假设有一个名为OrderTopic的主题,它包含4个MessageQueue,并且有两个消费者实例C1和C2属于同一个消费者组G1:

RocketMQ会将这4个MessageQueue平均分配给C1和C2,例如,C1负责消费两个MessageQueue,C2也负责另外两个MessageQueue。
如果再启动第三个消费者实例C3加入到消费者组G1,由于只有4个MessageQueue,因此其中一个消费者实例将会闲置,不会分配到任何MessageQueue。
反之,如果减少至只有一个消费者实例C1,那么这个实例将会负责消费所有的4个MessageQueue。

集群

主从 集群


修改配置就可以搭建了对应的集群了,slave负责给broker做备份

优点:性能快

缺点,当broker挂了之后 slave 不会自动升级为broker,可用性没有Dledger集群高

如果业务对高可用要求没那么高,对性能要求较高,可选用该种方式搭建集群。

Dledger 集群 

却点:性能没有上一种方式的快

优点,当broker(leader)挂了之后,其他的broker会自动选举leader,高可用

如果业务对性能要求没那么高,对高可用要求较高,可选用该种方式搭建集群。

总结

broker要有多数存活才能从新选举为leader,例如有5个broker,但是只有1个存活,这样就不能选举,而nameserver只要有一个存活就可以正常工作

1、nameServer 命名服务:
nameServer不依赖于任何其他的服务,自己独立就能启动。并且,不管是broker还是客户端,都需要明确指定nameServer的服务地址。以一台电脑为例,nameServer可以理解为是整
个RocketMQ的CPU,整个RocketMQ集群都要在CPU的协调下才能正常工作。
2、broker 核心服务:
broker是RocketMQ中最为娇贵的一个组件。RockeMQ提供了各种各样的重要设计来保护broker的安全。同时broker也是RocketMQ中配置最为繁琐的部分。同样以电脑为例,broker就是整个
RocketMQ中的硬盘、显卡这一类的核心硬件。RocketMQ最核心的消息存储、传递、查询等功能都要由broker提供。
3、client 客户端
Client包括消息生产者和消息消费者。同样以电脑为例,Client可以认为是RocketMQ中的键盘、鼠标、显示器这类的输入输出设备。鼠标、键盘输入的数据需要传输到硬盘、显卡等硬件才能进行处理。但是键盘、鼠标是不能
直接将数据输入到硬盘、显卡的,这就需要CPU进行协调。通过CPU,鼠标、键盘就可以将输入的数据最终传输到核心的硬件设备中。经过硬件设备处理完成后,再通过CPU协调,显示器这样的输出设备就能最终从核心硬件设备中获取到输出的数据。

topic是逻辑结构,而真正的物理存储结构是MessageQueue队列 
最小位点是当前队列中最开始的消息编号,如果删掉了前面的消息,那么最小位点就往前移了
最大位点最小位点 + 消息数(前提是位点连续,消息被删除或者由于消费失败而进行重试时,可能会导致位点不连续

消息模型图:

客户端消息确认机制

消息确认机制:生产者向broker发送一个消息,broker会个生产者响应一个确认消息

单向发送:生产者只负责发送消息,不关心是否发送成功,也不等待服务器响应。这种方式适用于对消息可靠性要求不高的场景。

同步发送确认
生产者发送消息后,​同步等待 Broker 的响应。若 Broker 返回 SEND_OK 状态码(如 SendStatus.SEND_OK),说明消息已持久化到 CommitLog 并被主从同步。若超时未收到响应或返回错误,生产者会触发重试(默认重试 2 次)。

// 示例:生产者同步发送消息(网页4)
SendResult sendResult = defaultMQProducer.send(message);
System.out.println("发送结果:" + sendResult.getSendStatus());

优点消息传递可靠性很强:适用于需严格保证消息送达的场景(如订单创建)。

缺点:需要阻塞性的等待broker的响应,吞吐量受限,频繁的同步等待限制并发性能。

适用场景:金融交易、关键业务通知(如支付结果)

异步发送确认
生产者通过回调函数异步处理 Broker 的响应,适用于高吞吐场景。发送线程不阻塞,通过 SendCallback 接收成功或失败通知:

producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) { /* 处理成功逻辑 */ }@Overridepublic void onException(Throwable e) { /* 处理失败并重试 */ }
});

优点低延迟:响应时间短,提升系统整体性能,消息传递可靠性相对较强

缺点:​业务层处理可能的重复或丢失消息(如幂等设计),producer主线程需要一直开启,比较消耗服务器资源

适用场景:日志采集、实时监控等允许少量数据丢失的高吞吐场景

广播模式

广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。
集群模式:一条消息只会被分配给同一个消费者组中的一个消费者实例进行处理。在此模式下,broker会在内部以消费者组的概念来维护一个消费者位。

广播模式:每条消息都会被发送到每个订阅了该主题的消费者实例。在此模式下,各自的Consumer内部(消费者客户端本地电脑offset.json文件中,该文件目前无法在Windows上创建)自行维护消费者位点,如果该文件丢失会自动创建并从MessageQueue的最后一条开始消费;消费失败无法重试

消息过滤机制

1、Tag过滤:生产者在发送消息的时候不仅可以指定Topic,还可以知道Tag,进一步细分消息类型,那么在消费者端就可以通过Topic + Tag 拿到指定的消息,但是这种过滤方式只能简单的进行字符匹配,无法进行复杂匹配,例如匹配字符数字大于3的所有Tag。

2、SQL过滤:RocketMQ还支持基于SQL92表达式的高级消息过滤功能。通过这种方式,可以根据消息属性进行更复杂的过滤操作,如数值比较、字符串匹配等。使用方法:在发送消息时,除了基本的消息体外,还可以添加一些自定义属性(当消息非常多的时候,不建议使用自定义属性来过滤)。订阅消息时,使用SQL表达式来描述过滤条件。例如,

Message message = new Message("TopicTest", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
message.putUserProperty("a", "3"); // 添加自定义属性
SendResult sendResult = producer.send(message);
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 3 and b = 'abc'"));

表示只消费那些属性a大于3且属性b等于"abc"的消息。**但是这种功能需要手动开启:在broker.conf文件中添加 enablePropertyFilter=true 即可。

性能影响:虽然SQL92表达式过滤提供了更大的灵活性,但它可能会对消息队列的性能产生一定影响,因为它需要更多的计算资源来进行条件判断。相比之下,Tag过滤由于其实现较为简单直接,性能开销较小(甚至可以忽略不记)。用SQL92表达式过滤时,要注意合理设置过滤条件,避免过于复杂的查询导致性能下降。

顺序消息机制

保证局部有序(不保证全局有序),好比每一笔订单中的每一个步骤都是有序的(局部有序),但是不保证所有的订单都按照创建时间来依次处理(全局无序,因为全局有序是没有意义的)

原理就是将一笔订单中的所有步骤都发送到同一个massegeQueue,然后broker按照massegeQueue中的消息顺序依次推送给消费者 就可以消息的顺序了,因为在不同的massegeQueue去拉去消息会存在网络因素导致顺序不一致,所有所有步骤都发送到同一个massegeQueue就可以保证了。也就是说如果把所有的消息都存储到用一个massegeQueue中就可以保证全局消息的顺序性了,但是这样队列就会有性能问题。因此我们在存储消息的时候尽可能大散在不同的队列中。
费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代,用于指示当前消息队列暂停片刻再尝试消费;例如一个队列中的步骤1成功,步骤2失败,那么就会将整个队列暂时阻塞,一段时间(基于内部实现中的退避算法,通常是几秒到几十秒不等)后在从步骤2开始重试来保证顺序

延迟消息与批量消息

延迟消息:

Message message = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延迟级别为3,即表示延迟10秒(根据默认的延迟级别表)
message.setDelayTimeLevel(3);
producer.send(message);

RocketMQ内置有一些延迟时间,可以直接使用,但是不支持在代码中自定义延迟时间,如果想在代码中设置延迟,可以考虑spring的Task等内部实现定时然后发送到topic;要自定义延迟消息的时间间隔,您需要编辑Broker的配置文件broker.conf,并调整messageDelayLevel参数。这个参数允许您定义一系列以逗号分隔的延迟级别,每个级别表示一个从消息发送到可消费的时间间隔。

messageDelayLevel=1s 5s 10s 30s 1m

批量消息:

每次发送消息都要进行一次网络的io,所有可以批量发送消息减少网络io,但是一次发送的消息量不宜过多,具体多少需要自行在客户端写逻辑判断。批量消息也有限制,如不能做延迟消息,要求所有消息都是同一个topic等。

事务消息机制

** 事务控制是在生产者端的 **,rockermq的事务和我们常规的事务不一样,rockermq的事务是保证某个事件与发送消息组成原子性

事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。以电商为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景,非常适合使用RocketMQ的解耦功能来进行串联。

1.发送方向 MQ 服务端发送事务消息(就和正常的消息一样,只不过消费者不可见,实现原理是将消息存储到系统的另一个特殊的topic中);
2.MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功(回调),此时消息为半消息。如果ACK 失败,那么客户端就重试,当超过最大重试次数就可以做告警了,比如邮箱告警等。
3.发送方开始执行本地事务逻辑。
4.发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或 Rollback 或),MQ Server 收到 Commit 状态则将半消息标记为可投递(将消息转存到正常的topic),订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
5.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。如果会查状态是UNKNOWN(待检测),那么broker就会隔一段时间重试,如果超过重试次数就会放到死信队列中。
6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 
7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 假设这里是您的本地事务逻辑System.out.println("Executing local transaction for message: " + new String(msg.getBody()));return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务状态System.out.println("Checking local transaction for message: " + new String(msg.getBody()));return RocketMQLocalTransactionState.UNKNOWN;}
}
@Service
public class TransactionService {private final RocketMQTemplate rocketMQTemplate;public TransactionService(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}public void sendMessageInTransaction() {Message<String> message = MessageBuilder.withPayload("Hello RocketMQ").setHeader("tag", "TagA").build();rocketMQTemplate.sendMessageInTransaction("TopicTest:TagA", message, null);}
}

对于事务消息还有一个使用场景---充当定时任务:

订单创建等待支付,比如订单要在15分钟内支付,否则关闭订单,常规做法是使用Xxl-Job等类似的定时任务中间件,比如每个30秒一次查询用户是否支付。

更优雅的做法是使用 rockermq 事务消息充当定时任务:
当订单创建时向 broker 发送一个 UNKNOWN 状态的事务消息,broker 就会自动的向客户端发送回调检测状态,这个就可以在回调方法里面查询用户15分钟之内是否支付成功,如果支付失败就将优惠卷状态恢复等等并返回Rollback,成功就返回Commit,消费者就可以看到这个消息,就可以通知到后面的物流系统等;如果支付成功了,但是消费者执行失败了(例如库存扣减失败),那么broker就会进行重试,会在某一次执行成功,达到最终一致性。

ACL权限控制体系

1、是否允许自动创建 topic : autoCreateTopicEnable=true 表示允许,在生产环境一般不允许

2、topic权限:

perm 代表权限,对生产者和消费者做限权

  • 0 (0000):无权限(Neither Read nor Write)
  • 2 (0010):仅读权限(Only Read)
  • 4 (0100):仅写权限(Only Write)
  • 6 (0110):读写权限(Both Read and Write)

当消费者消费失败会创建一个perm为6的重试队列,当重试次数达到一定次数就会创建一个perm为2死信队列。要处理死信队列只能手动去修改权限后再处理。

也可以在 /conf/plain_acl.yml文件中配置其他权限,例如:

如果再这个配置中配置了账号和密码,那么在客户端声明RocketMQ实例的时候要去指明即可

RocketMQ 作为一个内部服务,并不需要对外,所有权限控制很少使用。

RocketMQ客户端注意事项

消息的 ID、Key、Tag

这里有个小细节需要注意,producer生产者端发送的是Message对象,而Consumer消费端处理的却是MessageExt对象。也就是说,虽然都是传递消息,但是Consumer端拿到的信息会比Producer端发送的消息更多,也就有几个重点的参数需要理解。那就是Messageld,Key和Tag

Messageld是RocketMQ内部给每条消息分配的唯一索引:
Producer发送的Message对象是没有msgld属性的。Broker端接收到Producer发过来的消息后,会给每条消息单独分配一个唯一的msgld。这个msgID可以作为消息的唯一主键来使用。但是需要注意,对于客户端来说,毕竟是不知道这个msgld是如何产生的。实际上,在RocketMQ内部,也会针对批量消息、事务消息等特殊的消息机制,有特殊的msgld分配机制(当使用某些框架的时候,可能会导致Messageld不唯一)。因此,在复杂业务场景下,不建msgld来作为消息的唯一索引l,而建议采用下面的key属性自行指定业务层面上的唯一索引。例如订单消息就将订单ID设置为key。

最佳实践

一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("tags")
Kafka的一大问题是Topic过多,会造成Partition文件过多,影响性能。而RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多,还是会加大RocketMQ的元数据维护的性能消耗。所以,在使用时,还是需要对Topic进行合理的分配。使用Tag区分消息时,尽量直接使用Tag过滤,不要使用复杂的SQL过滤。因为消息过滤机制虽然可以减少网络IO,但是毕竟会加大Broker端的消息处理压力。所以,消息过滤的逻辑,还是越简单越好。

消费者端进行幂等控制

官方回答中说道:RocketMQ确保所有消息至少传递一次。在大多数情况下,消息不会重复。

消息幂等的必要性:
在互联网应用中,尤其在网络不稳定的情况下,消息队列RocketMQ的消息有可能会出现重复,这个重复简单可以概括为以下情况:
1、发送时消息重复:
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 MessageID 也相同的消息。
2、投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且MessageID 也相同的消息。
3、负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及订阅方应用重启)
当消息队列RocketMQ的Broker或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式
从上面的分析中知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的Messageld,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个Messageld来作为判断幂等的关键依据。但是,这个Messageld是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

关注错误消息重试

如果消费者返回的状态是 RECONSUME_LATER(稍后重试),那么这个消息会被放到重试队列中,以消费者组的规则和分配策略重新推送给消费者,这个重试队列是系统自动创建的,一般来说,有了重试队列就代表消费者处理有异常,那么我们可以监控是否有重试队列以此来错告警。

手动处理死信队列

死信队列的特征:
1、一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
2、如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
3、一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
4、死信队列中的消息不会再被消费者正常消费。
5、死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
 

MQ如何保证消息不丢失

1、哪些环节可能丢消息

其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。
然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存pagecache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。

2、生产者发送消息如何保证不丢

生产者发送消息之所以可能会丢消息,都是因为网络。因为网络的不稳定性,容易造成请求丢失。怎么解决这样的问题呢?其实一个统一的思路就是生产者确认。简单来说,就是生产者发出消息后,给生产者一个确定的通知,这个消息在Broker端是否写入完成了。就好比打电话,不确定电话通没通,那就互相说个“喂”,具体确认一下。只不过基于这个同样的思路,各个MQ产品有不同的实现方式。

1、通过RockerMQ客户端的消息确认机制保证消息不丢失

2、通过发送事务消息来保证息不丢失

3、Borker写入数据如何保证不丢失


broker接收到消息并不会马上写到磁盘上,而是先写到操作系统的pagecache缓存页中,过一段时间才才写到磁盘冲。以Linux为例,用户态的应用程序,不管是什么应用程序,想要写入磁盘文件时,都只能调用操作系统提供的write系统调用,申请写磁盘。至于消息如何经过PageCache再写入到磁盘中,这个过程,这个过程是在内核态执行的,也就是操作系统自己执行的,应用程序无法干预。这个过程中,应用系统唯一能够干预的,就是调用操作系统提供的sync系统调用,申请一次刷盘操作,主动将PageCache中的数据写入到磁盘。

RocketMQ如何调用fsync的?
RocketMQ的Broker提供了一个很明确的配置项flushDiskType,可以选择刷盘模式。有两个可选项,SYNC_FLUSH同步刷盘ASYNC_FLUSH异步刷盘
所谓同步刷盘,是指broker每往日志文件中写入一条消息,就调用一次刷盘操作。而异步刷盘,则是指broker每隔一个固定的时间,才去调用一次刷盘操作。异步刷盘性能更稳定,但是会有丢消息的可能。而同步刷盘的消息安全性就更高,但是操作系统的IO压力就会非常大。
在RocketMQ中,就算是同步刷盘,其实也并不是真的写一次消息就刷盘一次,这在海量消息的场景下,操作系统是撑不住的。所以,我们在之前梳理RocketMQ核心源码的过程中看到,RocketMQ的同步刷盘的实现方式其实也是以10毫秒的间隔去调用刷盘操作。从理论上来说,也还是会有非正常断电造成消息丢失的可能,甚至严格意义上来说,任何应用程序都不可能完全保证断电消息不丢失。但是,RocketMQ的这一套同步刷盘机制,却可以
通过绝大部分业务场景的验证。这其实就是一种平衡。

4、Broker主从同步如何保证不丢失

在这种集群机制下,消息的安全性还是比较高的。但是有一种极端的情况需要考虑。因为消息需要从Master往Slave同步,这个过程是跨网络的,因此也是有时间延迟的。所以,如果Master出现非正常崩溃,那么就有可能有一部分数据是已经写入到了Master但是还来得及同步到Slave。这一部分未来得及同步的数据,在RocketMQ的这种集群机制下,就会一直记录在Master节点上。等到Master重启后,就可以继续同步了。另外由于Slave并不会主动切换成Master,所以Master服务崩溃后,也不会有新的消息写进来,因此也不会有消息冲突的问题。所以,只要Mater的磁盘没有坏,那么在这种普通集群下,主从同步通常不会造成消息丢失。

他优先保证的是集群内的数据一致性,而并不是保证不丢失。在某些极端场景下,比如出现网络分区情况时,也会丢失一些未经过集群内确认的消息。不过,基于RocketMQ的使用场景,这种丢失消息的可能性非常小。另外,这种服务端无法保证消息安全的问题,其实结合客户端的生产者确认机制,是可以得到比较好的处理的。因此,在RocketMQ中使用Dledger集群的话,数据主从同步这个过程,数据安全性还是比较高的。基本可以认为不会造成消息丢失。

5、消费者消费消息如何不丢失

消费者消费消息的过程中,需要从Broker上拉取消息,这些消息也是跨网络的,所以拉取消息的请求也可能丢失。这时,会不会有丢消息的可能呢?
几乎所有的MQ产品都设置了消费状态确认机制。也就是消费者处理完消息后,需要给Broker一个响应,表示消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这些没有处理成功的消息(如果超过重试次数机会放到死信队列)。RocketMQ和Kafka是根据Offset机制重新投递,而RabbitMQ的ClassicQueue经典对列,则是把消息重新入队。因此,正常情况下,Consumer消费消息这个过程,是不会造成消息丢失的,相反,可能需要考虑下消息幂等的问题。

6、如果MQ服务全部挂了,如何保证不丢失

最后有一种小概率的极端情况,就是MQ的服务全部挂掉了,这时,要如何保证业务能够继续稳定进行,同时业务数据不会丢失呢?
通常的做法是设计一个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进行后续的业务。此时,再启动一个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,至少当MQ服务恢复过来后,这些消息可以尽快进入到MQ中,继续往下游Conusmer推送,而不至于造成消息丢失。

7、MQ消息零丢失方案总结

最后要注意到,这里讨论到的各种MQ消息防止丢失的方案,其实都是以增加集群负载,降低吞吐为代价的。这必然会造成集群效率下降。因此,这些保证消息安全的方案通常都需要根据业务场景进行灵活取舍,而不是一股脑的直接用上。
这些消息零丢失方案,其实是没有最优解的。因为如果有最优解,那么这些MQ产品,就不需要保留各种各样的设计了。这和很多面试八股文是有冲突的。面试八股文强调标准答案,而实际业务中,这个问题是没有标准答案的,一切,都需要根据业务场景去调整。

MQ如何保证消息的顺序性

这里首先需要明确的是,通常讨论MQ的消息顺序性,其实是在强调局部有序,而不是全局有序。就好比QQ和微信的聊天消息,通常只要保证同一个聊天窗口内的消息是严格有序的。至于不同窗口之间的消息,顺序出了点偏差,其实是无所谓的。所谓全局有序,通常在业务上没有太多的使用场景。在RocketMQ和Kafka中把Topic的分区数设置成1,这类强行保证消息全局有序的方案,纯属思维体操。

这个机制需要两个方面的保障。

1、Producer将一组有序的消息写入到同一个MessageQueue中。

2、Consumer每次集中从一个MessageQueue中拿取消息。

在Producer端,RocketMQ和Kafka都提供了分区计算机制,可以让应用程序自己决定消息写入到哪一个分区。所以这一块,是由业务自己决定的。只要通过定制数据分片算法,把一组局部有序的消息发到同一个对列当中,就可以通过对列的FIFO特性,保证消息的处理顺序。对于RabbitMQ,则可以通过维护Exchange与Queue之间的绑定关系,将这一组局部有序的消息转发到同一个对列中,从而保证这一组有序的消息,在RabbitMQ内部保存时,是有序的。在Conusmer端RocketMQ是通过让Consumer注入不同的消息监听器来进行区分的。而具体的实现机制,核心是通过Consumer的消费线程进行并发控制,来保证消息的消费顺序的。类比到Kafka呢。Kafka中并没有这样的并发控制。而实际上,Kafka的Consumer对某一个Partition拉取消息时,天生就是单线程的,所以,参照RocketMQ的顺序消费模型,Kafka的Consumer天生就是能保证局部顺序消费的。

至于RabbitMQ,以他的ClassicQueue经典对列为例,他的消息被一个消费者从队列中拉取后,就直接从队列中把消息删除了。所以,基本不存在资源竞争的问题。那就简单的是一个队列只对应一

个Consumer,那就是能保证顺序消费的。如果一个队列对应了多个Consumer,同一批消息,可能会进入不同的Consumer处理,所以也就没法保证消息的消费顺序

MQ如何保证消息幂等性

通用解决方案

1. 使用唯一消息标识

在发送消息时,为每条消息生成一个全局唯一的标识符(如UUID、订单号等)。在消费端,通过检查这个唯一标识来判断消息是否已被处理过。

实现步骤

  • 发送端在发送消息时,将唯一标识作为消息的Keys或属性。
  • 消费端在接收到消息后,首先检查这个唯一标识是否已存在于数据库中。
  • 如果存在,则说明消息已被处理过,直接跳过;如果不存在,则进行业务处理,并将唯一标识存入数据库。

2. 引入消息去重表或者布隆过滤器

在数据库中创建一个消息去重表,用于记录已处理消息的唯一标识。消费端在处理消息前,先查询去重表,判断消息是否已被处理。或者使用布隆过滤器来判断是否消费过了。

实现步骤

  • 定义一个消息去重表,包含唯一标识和消息状态等字段。
  • 消费端在处理消息前,先插入一条记录到去重表中(使用唯一标识作为主键,以处理并发插入时的冲突)。
  • 如果插入成功,则说明消息是新的,进行业务处理;如果插入失败(主键冲突),则说明消息已被处理过,直接跳过。

3. 利用RocketMQ的消息ID

虽然RocketMQ的消息ID在大多数情况下是唯一的,但不建议直接依赖它来实现消息的幂等性,因为存在生产者手动重发相同消息(但Message ID不同)的情况。

然而,在某些场景下,可以结合业务唯一标识和消息ID来辅助实现幂等性。例如,在数据库中同时记录业务唯一标识和RocketMQ的消息ID,通过这两个字段的组合来确保消息的唯一性。

4. 引入分布式锁

对于需要严格保证幂等性的场景,可以考虑在消费消息前引入分布式锁。通过分布式锁来确保同一时间只有一个消费者能处理某条消息。

实现步骤

  • 在处理消息前,尝试获取分布式锁(以消息的唯一标识作为锁键)。
  • 如果获取成功,则进行业务处理;如果获取失败,则说明有其他消费者正在处理该消息,直接跳过。

注意事项

  • 性能考虑:在实现幂等性时,要注意避免引入过多的数据库操作或分布式锁,以免影响系统的整体性能。
  • 容错性:要确保幂等性实现方案具有容错性,能够在各种异常情况下正确运行。
  • 业务逻辑适配:幂等性实现应紧密结合业务逻辑,确保在复杂业务场景下的有效性和正确性。

MQ如何快速处理积压的消息

1、消息积压会有哪些问题。
对RocketMQ和Kafka来说,他们的消息积压能力本来就是很强的,因此,短时间的消息积压,是没有太多问题的。但是需要注意,如果消息积压问题一直得不到解决,RocketMQ和Kafka在日志文件过期后,就会直接删除过期的日志文件。而这些日志文件上未消费的消息,就会直接丢失。
 
2、怎么处理大量积压的消息
产生消息积压的根本原因还是Consumer处理消息的效率太低,所以最核心的目标还是要提升Consumer消费消息的效率。如果不能从业务上提升Consumer消费消息的性能,那么最直接的办法就是针对处理消息比较慢的消费者组,增加更多的Consumer实例。但是这里需要注意一下,增加Consumer实例是不是会有上限。因为同一个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建立对应关系,而一个MessageQueue最多只能被一个Consumer消费,因此,增加的Consumer实例最多也只能和Topic下的MessageQueue个数相同。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有MessageQueue去消费的,因此也就没有用了。
这时,如果Topic下的MessageQueue配置本来就不够多的话,那就无法一直增加Consumer节点个数了。这时怎么处理呢?如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把Consumer实例的Topic转向新的Topic(在消费者代码中发消息),并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转存到新的Topic中。这个速度明显会比普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添加消费者个数来提高消费速度了。之后再根据情况考虑是否要恢复成正常情况。其实这种思路和RocketMQ内部很多特殊机制的处理方式是一样的。例如固定级别的延迟消息机制,也是把消息临时转到一个系统内部的Topic下,处理过后,再转回来。
http://www.xdnf.cn/news/15214.html

相关文章:

  • 基于定制开发开源AI智能名片与S2B2C商城小程序的旅游日志创新应用研究
  • FPGA实现SDI转LVDS视频发送,基于GTX+OSERDES2原语架构,提供2套工程源码和技术支持
  • Maui劝退:用windows直接真机调试iOS,无须和Mac配对
  • leetcode:518. 零钱兑换 II[完全背包]
  • Python 类型注解实战:`Optional` 与安全数据处理的艺术
  • 静态路由综合实验
  • GitHub敏感信息收集与防御指南
  • 人大金仓下载安装教程总结
  • 时间显示 蓝桥云课Java
  • 安卓应用启动崩溃的问题排查记录
  • P1722 矩阵 II 题解 DFS深度优先遍历与卡特兰数(Catalan number)解
  • 【实战】使用 ELK 搭建 Spring Boot Docker 容器日志监控系统
  • 【三维生成】FlashDreamer:基于扩散模型的单目图像到3D场景
  • 力扣-54.螺旋矩阵
  • “Datawhale AI夏令营”基于带货视频评论的用户洞察挑战赛
  • 敏捷测试中的质量闸门如何设置?
  • 【RL-VLM-F】算法框架图绘图学习笔记
  • 【PyTorch】PyTorch中的数据预处理操作
  • Java 与 MySQL 性能优化:MySQL连接池参数优化与性能提升
  • 7月10号总结 (1)
  • HTTP核心基础详解(附实战要点)
  • Android开发中几种scope的对比
  • 【TCP/IP】12. 文件传输协议
  • 力扣-73.矩阵置零
  • 如何安装python以及jupyter notebook
  • Rust中Option和Result详解
  • Unity WebGL文本输入
  • 【世纪龙科技】汽车信息化综合实训考核平台(机电方向)-学测
  • ClickHouse JSON 解析
  • Java代码块