21、MQ常见问题梳理
目录
⼀ 、MQ如何保证消息不丢失
1 、哪些环节可能会丢消息
2 、⽣产者发送消息如何保证不丢失
2.1、⽣产者发送消息确认机制
2.2、Rocket MQ的事务消息机制
2.3 、Broker写⼊数据如何保证不丢失
2.3.1** ⾸先需要理解操作系统是如何把消息写⼊到磁盘的**。
2.3.2然后来看MQ是如何调⽤fsync的
2.4 、Broker主从同步如何保证不丢失
2.5 、消费者消费消息如何不丢失
2.6 、如果MQ服务全部挂了, 如何保证不丢失
2.7 、MQ消息零丢失⽅案总结
面试题:说说你的项目RocketMQ如何保证消息不丢失?
⼆ 、MQ如何保证消息的顺序性
三、MQ如何保证消息幂等性
1 、⽣产者发送消息到服务端如何保持幂等
2 、消费者消费消息如何保持幂等
四、MQ如何快速处理积压的消息
1 、消息积压会有哪些问题。
2 、怎么处理⼤量积压的消息
请问RocketMQ消息积压一般产生原因是什么?如何解决消息积压问题呢?
五、Rocket MQ课程总结
⼀ 、MQ如何保证消息不丢失
1 、哪些环节可能会丢消息
⾸先分析下MQ的整个消息链路中,有哪些步骤是可能会丢消息的
其中, 1, 2 ,4三个场景都是跨⽹络的,⽽跨⽹络就肯定会有丢消息的可能。
然后关于3这个环节,通常MQ存盘时都会先写⼊操作系统的缓存page cache中,然后再由操作系统异步的将 消息写⼊硬盘 。这个中间有个时间差,就可能会造成消息丢失 。如果服务挂了,缓存中还没有来得及写⼊硬盘 的消息就会丢失。
2 、⽣产者发送消息如何保证不丢失
⽣产者发送消息之所以可能会丢消息,都是因为⽹络 。因为⽹络的不稳定性,容易造成请求丢失 。怎么解决这 样的问题呢?其实—个统—的思路就是⽣产者确认 。简单来说,就是⽣产者发出消息后,给⽣产者—个确定的 通知, 这个消息在Broker端是否写⼊完成了 。就好⽐打电话,不确定电话通没通,那就互相说个“ 喂 ”, 具体确认—下 。只不过基于这个同样的思路,各个MQ产品有不同的实现⽅式。
2.1、⽣产者发送消息确认机制
在Rocket MQ中,提供了三种不同的发送消息的⽅式:
异步发送, 不需要Broker确认 。效率很⾼ ,但是会有丢消息的可能。
// 异步发送, 不需要Broker确认 。效率很⾼ ,但是会有丢消息的可能。
producer.sendOneway(msg);
// 同步发送, ⽣产者等待Broker的确认 。消息最安全 ,但是效率很低。
SendResult sendResult = producer.send(msg, 20 * 1000);
// 异步发送, ⽣产者另起—个线程等待Broker确认, 收到Broker确认后直接触发回调⽅法 。消息安全和效率之间⽐较均 衡 ,但是会加⼤客户端的负担。
producer.send(msg, new SendCallback() { @Overridepublic void onSuccess(SendResult sendResult) {// do something}@Overridepublic void onException(Throwable e) {// do something}
});
与之类似的, Kafka也同样提供了这种同步和异步的发送消息机制。
//直接send发送消息, 返回的是—个Future。这就相当于是异步调⽤ 。
Future<RecordMetadata> future = producer.send(record)
//调⽤future的get⽅法才会实际获取到发送的结果 。⽣产者收到这个结果后, 就可以知道消息是否成功发到broker了。 这个过程就变成了—个同步的过程。
RecordMetadata recordMetadata = producer.send(record).get();
⽽在RabbitMQ中,则是提供了—个Publisher Confirms⽣产者确认机制 。其思路也是Publiser收到Broker的响 应后再出发对应的回调⽅法。
//获取channel
Channel ch = ...;
//添加两个回调, —个处理ack响应, —个处理nack响应
ch.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)
这些各种各样不同API的背后,都是—个统—的思路,就是给⽣产者响应,让⽣产者知道消息有没有发送成功 。如果没有发送成功,也由⽣产者⾃⾏进⾏补救 。可以重发,也可以向业务抛异常 。都由⽣产者⾃⾏处理。
2.2、Rocket MQ的事务消息机制
Rocket MQ提出了事务消息机制,其实也是保证⽣产者安全发送消息的利器 。事务消息机制的基本流程如下:
其实整体上来看, Rocket MQ的事务消息机制, 还是基于⽣产者确认构建的—种实现机制 。其核⼼思想, 还是 通过Broker主动触发⽣产者的回调⽅法,从⽽确认消息是否成功发送到了Broker 。只不过, 这⾥将—次确认变 成了多次确认 。在多次确认的过程中, 除了确认消息的安全性, 还给了⽣产者“反悔”的机会 。另外,事务消息 机制进—步将⽣产者确认与⽣产者的本地事务结合到了—起,从⽽让⽣产者确认这个机制有了更多的业务属性。
例如, 以最常⻅的电商订单场景为例,就可以在下完订单后,等到⽤户⽀付的过程中使⽤事务消息机制 。这样 可以保证本地下单和第三⽅⽀付平台⽀付这两个业务是事务性的,要么同时成功,就往下游发订单消息 。要么 就同时失败,不往下游发订单消息。
2.3 、Broker写⼊数据如何保证不丢失
接下来, Producer把消息发送到Broker上了之后, Broker是不是能够保证消息不丢失呢?这⾥也有—个核⼼的问题,那就是PageCache缓存。
数据会优先写⼊到缓存,然后过—段时间再写⼊到磁盘 。但是缓存中的数据有个特点,就是断电即丢失,所 以,如果服务器发⽣⾮正常断电, 内存中的数据还没有写⼊磁盘, 这时就会造成消息丢失。
怎么解决这个问题呢?
2.3.1** ⾸先需要理解操作系统是如何把消息写⼊到磁盘的**。
以Linux为例, ⽤户态的应⽤程序,不管是什么应⽤程序, 想要写⼊磁盘⽂件时,都只能调⽤操作系统提供的 write系统调⽤, 申请写磁盘 。⾄于消息如何经过PageCache再写⼊到磁盘中, 这个过程, 这个过程是在内核 态执⾏的,也就是操作系统⾃⼰执⾏的,应⽤程序⽆法⼲预 。这个过程中,应⽤系统唯—能够⼲预的,就是调 ⽤操作系统提供的sync系统调⽤, 申请—次刷盘操作, 主动将PageCache中的数据写⼊到磁盘。
>> man 2 write
WRITE(2) Linux Programmer 's
Manual
NAME
write - write to a file descriptor>> man 2 fsync
FSYNC(2) Linux Programmer 's
Manual
NAME
fsync, fdatasync - synchronize a file 's in-core state with storage device
2.3.2然后来看MQ是如何调⽤fsync的
先来看Rocket MQ:
Rocket MQ的Broker提供了—个很明确的配置项flush DiskType ,可以选择刷盘模式 。有两个可选项, SYNC_FLUSH 同步刷盘和ASYNC_FLUSH 异步刷盘。
所谓同步刷盘, 是指broker每往⽇志⽂件中写⼊—条消息,就调⽤—次刷盘操作 。⽽异步刷盘,则是指broker 每隔—个固定的时间,才去调⽤—次刷盘操作 。异步刷盘性能更稳定,但是会有丢消息的可能 。⽽同步刷盘的 消息安全性就更⾼ ,但是操作系统的IO压⼒就会⾮常⼤ 。
在Rocket MQ中,就算是同步刷盘,其实也并不是真的写—次消息就刷盘—次, 这在海量消息的场景下,操作 系统是撑不住的 。所以,我们在之前梳理Rocket MQ核⼼源码的过程中看到, Rocket MQ的同步刷盘的实现⽅ 式其实也是以10毫秒的间隔去调⽤刷盘操作 。从理论上来说,也还是会有⾮正常断电造成消息丢失的可能, 甚 ⾄严格意义上来说,任何应⽤程序都不可能完全保证断电消息不丢失 。但是, Rocket MQ的这—套同步刷盘机 制,却可以通过绝⼤部分业务场景的验证 。这其实就是—种平衡。
然后来看Kafka:
Kafka中并没有明显的同步刷盘和异步刷盘的区别,不过他暴露了—系列的参数,可以管理刷盘的频率。
flush.ms : 多⻓时间进⾏—次强制刷盘。
log.flush.interval.messages:表示当同—个Partiton的消息条数积累到这个数量时, 就会申请—次刷盘操作 。默 认是Long.MAX。
log.flush.interval.ms: 当—个消息在内存中保留的时间, 达到这个数量时, 就会申请—次刷盘操作 。他的默认值是 空 。如果这个参数配置为空 ,则⽣效的是下—个参数。
log.flush.scheduler.interval.ms:检查是否有⽇志⽂件需要进⾏刷盘的频率 。默认也是Long.MAX。
其实在这⾥⼤家可以思考下,对kafka来说,把log.flush.interval.messages参数设置成1 ,就是每写⼊—条消 息就调⽤—次刷盘操作, 这不就是所谓的同步刷盘了吗?
最后来看RabbitMQ:
关于消息刷盘问题, RabbitMQ官⽹给了更明确的说法 。那就是对于Classic经典对列, 即便声明成了持久化对 列, RabbitMQ的服务端也不会实时调⽤fsync, 因此⽆法保证服务端消息断电不丢失 。对于Stream流式对列, 则更加直接, RabbitMQ明确不会主动调⽤fsync进⾏刷盘,⽽是交由操作系统⾃⾏刷盘。
⾄于怎么办呢?他明确就建议了,如果对消息安全性有更⾼的要求,可以使⽤Publisher Confirms机制来进— 步保证消息安全 。这其实也是对Kafka和Rocket MQ同样适⽤的建议。
2.4 、Broker主从同步如何保证不丢失
对于Broker来说,通常Slave的作⽤就是做—个数据备份 。当Broker服务宕机了, 甚⾄是磁盘都坏了时,可以 从Slave上获取数据记录 。但是,如果主从同步失败了,那么Broker的这—层保证就会失效 。因此, 主从同步 也有可能造成消息的丢失。
我们这⾥重点来讨论—下, Rocket MQ的普通集群以及Dledger⾼可⽤集群。
先来看Rocket MQ的普通集群⽅案,在这种⽅案下,可以指定集群中每个节点的角⾊, 固定的作为Master或者 Slave。
在这种集群机制下,消息的安全性还是⽐较⾼的 。但是有—种极端的情况需要考虑 。因为消息需要从Master往 Slave同步, 这个过程是跨⽹络的, 因此也是有时间延迟的 。所以,如果Master出现⾮正常崩溃,那么就有可 能有—部分数据是已经写⼊到了Master但是还来得及同步到Slave 。这—部分未来得及同步的数据,在Rocket MQ的这种集群机制下,就会—直记录在Master节点上 。等到Master重启后,就可以继续同步了 。另外 由于Slave并不会主动切换成Master ,所以Master服务崩溃后,也不会有新的消息写进来, 因此也不会有消息 冲突的问题 。所以, 只要Mater的磁盘没有坏,那么在这种普通集群下, 主从同步通常不会造成消息丢失。
与之形成对⽐的是Kafka的集群机制 。在Kafka集群中,如果Leader Partition的服务崩溃了,那么,那些Follower Partition就会选举产⽣—个新的Leadr Partition 。⽽集群中所有的消息,都以Leader Partition的为准 。即便旧的Leader Partition重启了,也是作为Follower Partition启动, 主动删除掉⾃⼰的HighWater之后的 数据,然后从新的Leader Partition上重新同步消息 。这样,就会造成那些已经写⼊旧的Leader Partition但是 还没来得及同步的消息,就彻底丢失了。
Rocket MQ和Kafka之间的这种差异,其实还是来⾃于他们处理MQ问题的初衷不同 。Rocket MQ诞⽣于阿⾥的 ⾦融体系,天⽣对消息的安全性⽐较敏感 。⽽Kafka诞⽣于LinkedIn的⽇志收集体系,天⽣对服务的可⽤性要 求更⾼ 。这也体现了不同产品对业务的取舍。
然后来看下Rocket MQ的Dledger⾼可⽤集群 。在Rocket MQ中, 直接使⽤基于Raft协议的Dledger来保存 CommitLog消息⽇志 。也就是说他的消息会通过Dledger的Raft协议,在主从节点之间同步。
⽽关于Raft协议, 之前章节做给分析,他是—种基于两阶段的多数派同意机制 。每个节点会将客户端的治指令 以Entry的形式保存到⾃⼰的Log⽇志当中 。此时Entry是uncommited状态 。当有多数节点统统保存了Entry后,就可以执⾏Entry中的客户端指令,提交到StateMachine状态机中 。此时Entry更新为commited状态。
他优先保证的是集群内的数据—致性,⽽并不是保证不丢失 。在某些极端场景下, ⽐如出现⽹络分区情况时, 也会丢失—些未经过集群内确认的消息 。不过,基于Rocket MQ的使⽤场景, 这种丢失消息的可能性⾮常⼩ 。 另外, 之前也提到过, 这种服务端⽆法保证消息安全的问题,其实结合客户端的⽣产者确认机制, 是可以得到 ⽐较好的处理的 。因此,在Rocket MQ中使⽤Dledger集群的话,数据主从同步这个过程,数据安全性还是⽐ 较⾼的 。基本可以认为不会造成消息丢失。
2.5 、消费者消费消息如何不丢失
最后,消费者消费消息的过程中, 需要从Broker上拉取消息, 这些消息也是跨⽹络的,所以拉取消息的请求也 可能丢失 。这时,会不会有丢消息的可能呢?
⼏乎所有的MQ产品都设置了消费状态确认机制 。也就是消费者处理完消息后, 需要给Broker—个响应,表示 消息被正常处理了 。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息, 还是Consumer 处理完消息后没有给出相应, Broker都会认为消息没有处理成功 。之后, Broker就会向Consumer重复投递这 些没有处理成功的消息 。Rocket MQ和Kafka是根据Offset机制重新投递,⽽RabbitMQ的Classic Queue经典 对列,则是把消息重新⼊队 。因此,正常情况下, Consumer消费消息这个过程, 是不会造成消息丢失的,相 反,可能需要考虑下消息幂等的问题。
但是, 这也并不是说消费者消费消息不可能丢失 。例如像下⾯这种情况, Consumer异步处理消息,就有可能 造成消息丢失。
consumer.registerMessageListener(new MessageListenerConcurrently{ @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {new Thread(){public void run(){//处理业务逻辑System.out.printf("%s Receive New Messages: &s %n", Thread.currentThread() .getN }};return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
这⾥需要注意的是,通常在开发过程中,不太会这么直⽩的使⽤多线程异步机制去处理问题 。但是,很有可能 在处理业务时,使⽤—些第三⽅框架来处理消息 。他们是不是使⽤的多线程异步机制,那就不能确定了 。所以,线程并发,在任何业务场景下,都是必不可少的基本功。
2.6 、如果MQ服务全部挂了, 如何保证不丢失
最后有—种⼩概率的极端情况,就是MQ的服务全部挂掉了, 这时,要如何保证业务能够继续稳定进⾏, 同时 业务数据不会丢失呢?
通常的做法是设计—个降级缓存 。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。
此时,再启动—个线程,不断尝试将降级缓存中的数据往MQ中发送 。这样, ⾄少当MQ服务恢复过来后, 这些 消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
2.7 、MQ消息零丢失⽅案总结
最后要注意到, 这⾥讨论到的各种MQ消息防⽌丢失的⽅案,其实都是以增加集群负载, 降低吞吐为代价的。
这必然会造成集群效率下降 。因此, 这些保证消息安全的⽅案通常都需要根据业务场景进⾏灵活取舍,⽽不是 —股脑的直接⽤上。
这⾥希望你能够理解到, 这些消息零丢失⽅案,其实是没有最优解的 。因为如果有最优解,那么这些MQ产品,就不需要保留各种各样的设计了 。这和很多⾯试⼋股⽂是有冲突的 。⾯试⼋股⽂强调标准答案,⽽实际业 务中, 这个问题是没有标准答案的,—切,都需要根据业务场景去调整
面试题:说说你的项目RocketMQ如何保证消息不丢失?
RocketMQ通过多层面的机制来确保消息的可靠性,包括生产者端、broker端和消费者端。
1. 生产者端保证
a. 同步发送
同步发送是最可靠的发送方式,它会等待broker的确认响应。
b. 异步发送 + 重试机制
异步发送通过回调来处理发送结果,并可以设置重试次数。
2.Broker端保证
a. 同步刷盘,通过配置broker.conf文件,可以启用同步刷盘:
brokerRole = SYNC_MASTER
3. 消费者端保证
a. 手动提交消费位移,使用手动提交可以确保消息被正确处理后再提交位移。
b. 幂等性消费,在消费端实现幂等性处理,确保重复消费不会导致业务问题。
通过这些机制的组合,RocketMQ能够在各个环节保证消息的可靠性,极大地降低了消息丢失的风险。在实际应用中,可以根据业务需求选择合适的配置和实现方式,以在可靠性和性能之间取得平衡。
⼆ 、MQ如何保证消息的顺序性
这⾥⾸先需要明确的是,通常讨论MQ的消息顺序性,其实是在强调局部有序,⽽不是全局有序 。就好⽐QQ和 微信的聊天消息,通常只要保证同—个聊天窗⼝内的消息是严格有序的 。⾄于不同窗口之间的消息,顺序出了点偏差,其实是⽆所谓的 。所谓全局有序,通常在业务上没有太多的使⽤场景 。在Rocket MQ和Kafka中把Topic的分区数设置成1, 这类强⾏保证消息全局有序的⽅案,纯属思维体操。
那么怎么保证消息局部有序呢?最典型的还是Rocket MQ的顺序消费机制。
这个机制需要两个⽅⾯的保障。
- 1 、Producer将—组有序的消息写⼊到同—个MessageQueue中。
- 2 、Consumer每次集中从—个MessageQueue中拿取消息。
在Producer端, Rocket MQ和Kafka都提供了分区计算机制,可以让应⽤程序⾃⼰决定消息写⼊到哪—个分区 。所以这—块, 是由业务⾃⼰决定的 。只要通过定制数据分⽚算法,把—组局部有序的消息发到同—个对列 当中,就可以通过对列的FI FO特性,保证消息的处理顺序 。对于RabbitMQ ,则可以通过维护Exchange与Queue之间的绑定关系,将这—组局部有序的消息转发到同—个对列中,从⽽保证这—组有序的消息,在 RabbitMQ内部保存时, 是有序的。
在Conusmer端, Rocket MQ是通过让Consumer注⼊不同的消息监听器来进⾏区分的 。⽽具体的实现机制,在 之前章节分析过,核⼼是通过对Consumer的消费线程进⾏并发控制,来保证消息的消费顺序的 。类⽐到Kafka 呢 。Kafka中并没有这样的并发控制 。⽽实际上, Kafka的Consumer对某—个Partition拉取消息时,天⽣就是 单线程的,所以,参照Rocket MQ的顺序消费模型, Kafka的Consumer天⽣就是能保证局部顺序消费的。
⾄于RabbitMQ, 以他的Classic Queue经典对列为例,他的消息被—个消费者从队列中拉取后,就直接从队列 中把消息删除了 。所以,基本不存在资源竞争的问题 。那就简单的是—个队列只对应—个Consumer ,那就是 能保证顺序消费的 。如果—个队列对应了多个Consumer, 同—批消息,可能会进⼊不同的Consumer处理,所以也就没法保证消息的消费顺序
三、MQ如何保证消息幂等性
1 、⽣产者发送消息到服务端如何保持幂等
Producer发送消息时,如果采⽤发送者确认的机制,那么Producer发送消息会等待Broker的响应 。如果没有 收到Broker的响应, Producer就会发起重试 。但是, Producer没有收到Broker的响应,也有可能是Broker已 经正常处理完了消息, 只不过发给Producer的响应请求丢失了 。这时候Producer再次发起消息重试,就有可能造成消息重复。
Rocket MQ的处理⽅式, 是会在发送消息时,给每条消息分配一个唯一的ID。
//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
//for MessageBatch,ID has been set in the generating processif ( !(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}public static void setUniqID(final Message msg) {if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null){msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,createUniqID());}
}
通过这个ID ,就可以判断消息是否重复投递。
⽽对于Kafka ,则会通过他的幂等性配置, 防⽌⽣产者重复投递消息造成的幂等性问题。
在Kafka中, 需要打开idempotence幂等性控制后(默认是打开的,但是如果其他配置有冲突,会影响幂等性配 置) 。Kafka为了保证消息发送的Exactly-once语义,增加了⼏个概念:
- . PID:每个新的Producer在初始化的过程中就会被分配—个唯—的PID 。这个PID对⽤户是不可⻅的。
- . Sequence Numer: 对于每个PID, 这个Producer针对Partition会维护—个sequenceNumber 。这是—个从 0开始单调递增的数字 。当Producer要往同—个Partition发送消息时, 这个Sequence Number就会加1 。然后会随着消息—起发往Broker。
- . Broker端则会针对每个<PID,Partition>维护—个序列号( SN), 只有当对应的SequenceNumber = SN+1 时, Broker才会接收消息, 同时将SN更新为SN+1 。否则,SequenceNumber过⼩就认为消息已经写⼊了,不需要再重复写⼊ 。⽽如果SequenceNumber过⼤ ,就会认为中间可能有数据丢失了 。对⽣产者就会 抛出—个OutOfOrderSequenceException。
2 、消费者消费消息如何保持幂等
这⾥以Rocket MQ来讨论如何防⽌消费者多次重复消费同—条消息。
⾸先,关于消息会如何往消费者投递 。Rocket MQ官⽹明确做了回答:
也就是说,在⼤多数情况下,不需要单独考虑消息重复消费的问题 。但是, 同样, 这个回答⾥也说明了,存在 —些⼩概率情况, 需要单独考虑消费者的消息幂等问题。
⾄于有哪些⼩概率情况呢?最典型的情况就是⽹络出现波动的时候 。Rocket MQ是通过消费者的响应机制来推进offset的,如果consumer从broker上获取了消息,正常处理之后,他要往broker返回—个响应,但是如果⽹ 络出现波动,consumer从broker上拿取到了消息,但是等到他向broker发响应时,发⽣⽹络波动, 这个响应 丢失了,那么就会造成消息的重复消费 。因为broker没有收到响应,就会向这个Consumer所在的Group重复投递消息。
然后,消费者如何防⽌重复消费呢?
防⽌重复消费, 最主要是要找到—个唯—性的指标 。在Rocket MQ中, Producer发出—条消息后, Rocket MQ 内部会给每—条消息分配—个唯—的messageId 。⽽这个messageId在Consumer中是可以获取到的 。所以⼤ 多数情况下, 这个messageId就是—个很好的唯—性指标 。Consumer只要将处理过的messageId记录下来, 就可以判断这条消息之前有没有处理过。
但是同样也有—些特殊情况 。如果Producer是采⽤批量发送,或者是事务消息机制发送,那么这个messageId 就没有那么好控制了 。所以,如果在真实业务中,更建议根据业务场景来确定唯—指标 。例如,在电商下单的 场景,订单ID就是—个很好的带有业务属性的唯—指标 。在使⽤Rocket MQ时,可以使⽤message的key属性 来传递订单ID 。这样Consumer就能够⽐较好的防⽌重复消费。
最后,对于幂等性问题, 除了要防⽌重复消费外, 还需要防⽌消费丢失 。也就是Consumer—直没有正常消费 消息的情况。
在Rocket MQ中, 重复投递的消息,会单独放到以消费者组为维度构建的重试对列中 。如果经过多次重试后还 是⽆法被正常消费,那么最终消息会进⼊到消费者组对应的死信对列中 。也就是说,如果Rocket MQ中出现了 死信对列,那么就意味着有—批消费者的逻辑是—直有问题的, 这些消息始终⽆法正常消费 。这时就需要针对 死信对列, 单独维护—个消费者,对这些错误的业务消息进⾏补充处理 。这⾥需要注意—下的是, Rocket MQ 中的死信对列,默认权限是⽆法消费的, 需要⼿动调整权限才能正常消费。
四、MQ如何快速处理积压的消息
1 、消息积压会有哪些问题。
对Rocket MQ和Kafka来说,他们的消息积压能⼒本来就是很强的, 因此,短时间的消息积压, 是没有太多问题 的 。但是需要注意,如果消息积压问题—直得不到解决, Rocket MQ和Kafka在⽇志⽂件过期后,就会直接删除 过期的⽇志⽂件 。⽽这些⽇志⽂件上未消费的消息,就会直接丢失。
⽽对RabbitMQ来说, Classic Queue经典对列和Quorum Queue仲裁对列,如果有⼤量消息积压,未被消费,就会严重影响服务端的性能, 因此需要重点关注 。⽽⾄于Stream Queue流式对列,整体的处理机制已经 和Rocket MQ与Kafka ⽐较相似了,对消息积压的承受能⼒就会⽐较强 。但是还是需要注意和Rocket MQ与Kafka相同的问题。
2 、怎么处理⼤量积压的消息
产⽣消息积压的根本原因还是Consumer处理消息的效率太低,所以最核⼼的⽬标还是要提升Consumer消费消息的效率 。如果不能从业务上提升Consumer消费消息的性能,那么最直接的办法就是针对处理消息⽐较慢 的消费者组,增加更多的Consumer实例 。但是这⾥需要注意—下,增加Consumer实例是不是会有上限。
对于RabbitMQ ,如果是Classic Queue经典对列,那么针对同—个Queue的多个消费者, 是按照Work Queue 的模式,在多个Consuemr之间依次分配消息的 。所以这时,如果Consumer消费能⼒不够,那么直接加更多 的Consumer实例就可以了 。这⾥需要注意下的是如果各个Consumer实例他们的运⾏环境,或者是处理消息 的速度有差别 。那么可以优化—下每个Consumer的⽐重(Qos属性) ,从⽽尽量⼤的发挥Consumer实例的性能。
⽽对于Rocket MQ, 因为同—个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建⽴对应关 系,⽽—个MessageQueue最多只能被—个Consumer消费, 因此,增加的Consumer实例最多也只能和Topic 下的MessageQueue个数相同 。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有 MessageQueue去消费的, 因此也就没有⽤了。
这时,如果Topic下的MessageQueue配置本来就不够多的话,那就⽆法—直增加Consumer节点个数了 。这时 怎么处理呢?如果要快速处理积压的消息,可以创建—个新的Topic ,配置⾜够多的MessageQueue 。然后把 Consumer实例的Topic转向新的Topic ,并紧急上线—组新的消费者, 只负责消费旧Topic中的消息,并转存到 新的Topic中 。这个速度明显会⽐普通Consumer处理业务逻辑要快很多 。然后在新的Topic上,就可以通过添 加消费者个数来提⾼消费速度了 。之后再根据情况考虑是否要恢复成正常情况。
其实这种思路和Rocket MQ内部很多特殊机制的处理⽅式是—样的 。例如固定级别的延迟消息机制,也是 把消息临时转到—个系统内部的Topic下,处理过后,再转回来。
⾄于Kafka ,也可以采⽤和Rocket MQ相似的处理⽅式。
请问RocketMQ消息积压一般产生原因是什么?如何解决消息积压问题呢?
一般消息出现堆积原因有:
● 消费者消息处理逻辑异常,导致消息无法正常消费。
● 消息生产应用出现突发流量,消息生产速度远大于消费速度,消息来不及消费出现堆积。
● 消费者依赖的下游服务耗时变长,消费线程阻塞等。
● 消费线程不够,消费并发度较小,消费速度跟不上生产速度。
解决方案有:
(1)确认消息的消费耗时是否合理,通过打印堆栈日志信息分析如果是消费耗时较长,可以参考出来解决方案:
1. 分析和优化业务逻辑
● 简化逻辑:仔细分析业务逻辑,去除不必要的步骤和复杂计算。
● 分解任务:将复杂的任务分解为多个简单的子任务,逐步处理。
● 异步处理:对于不需要立即完成的任务,考虑使用异步处理,将其放到后台执行。
2. 使用并行和并发技术
● 多线程处理:在消费者内部使用多线程来并行处理消息。
● 批量处理:如果业务允许,合并多条消息进行批量处理,减少处理次数。
3. 优化I/O操作
● 数据库优化:优化数据库查询,使用索引、减少查询次数或使用批量操作。
● 缓存使用:对于频繁访问的数据,使用缓存来减少数据库或外部服务的访问次数。
● 网络优化:减少网络请求的次数和延迟,使用更高效的协议或配置
(2)如果消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决。
(3)设置消息过期时间
在消息发送时设置TTL,消息在队列中超过一定时间后自动过期并被丢弃。这样可以确保系统不会处理过期的消息。这个要看具体的业务场景。
五、Rocket MQ课程总结
所有的MQ ,其实处理的都是—类相似的问题 。但是, 互联⽹却诞⽣了不下⼏⼗种MQ的产品 。为什么都做着差 不多的功能,但是却有这么多不同的产品呢?这就需要对MQ的业务场景进⾏逐步的深度挖掘 。把业务问题理 解得越深刻,那么对这些不同产品的理解才会更深刻, ⽇后处理各种各样的业务问题,也才会有更多的可选⽅ 案,或者,换种说法,就是经验 。这才是程序员最⼤的价值。