当前位置: 首页 > ops >正文

中间件-MQ常见问题

MQ常见问题

  • 消息丢失
      • 消息会在哪些环节丢失
      • 应对机制
  • 消息的顺序性
  • 消息幂等
  • 消息积压的处理

消息丢失

消息会在哪些环节丢失

  1. 网络传输环节:生产者发送消息到broker,broker中master同步消息给slave,consumer消费消息,这3个环节都是跨网络传输,可能造成消息丢失。
  2. 缓存信息刷盘环节:MQ存盘时都会先写⼊操作系统的缓存pagecache中,然后再由操作系统异步的将消息写⼊硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写⼊硬盘的消息就会丢失。

应对机制

  1. 生产者发送消息环节:消息确认机制。使用同步发送机制,可以收到broker端的发送成功确认。该方法消息安全但是效率低。
  2. 消息同步环节:普通master-slave集群下,master挂掉后slave不会主动切换未master,等master再次启动后,消息不会丢失,但这种方式牺牲了可用性。高可用Dledger集群,消息要写入半数以上节点才会通知客户端写成功,消息安全性比较高,可以认为不会丢失消息。但是在脑裂情况下,也有可能会丢失消息。
  3. 刷盘环节:RocketMQ的Broker提供了⼀个很明确的配置项flushDiskType,可以选择刷盘模式。有两个可选项,SYNC_FLUSH同步刷盘和ASYNC_FLUSH异步刷盘。所谓同步刷盘,是指broker每往⽇志⽂件中写⼊⼀条消息,就调⽤⼀次刷盘操作,其实也是以10毫秒的间隔去调⽤刷盘操作。⽽异步刷盘,则是指broker每隔⼀个固定的时间,才去调⽤⼀次刷盘操作。异步刷盘性能更稳定,但是会有丢消息的可能。⽽同步刷盘的消息安全性就更⾼,但是操作系统的IO压⼒就会⾮常⼤。从理论上来说,也还是会有⾮正常断电造成消息丢失的可能,甚⾄严格意义上来说,任何应⽤程序都不可能完全保证断电消息不丢失。
  4. 消费环节:消费状态确认机制。也就是消费者处理完消息后,需要给Broker⼀个响应,
    表示消息被正常处理了。如果Broker端没有拿到这个响应,不管是因为Consumer没有拿到消息,还是Consumer处理完消息后没有给出相应,Broker都会认为消息没有处理成功。之后,Broker就会向Consumer重复投递这些没有处理成功的消息。RocketMQ是把消费失败的消息方式重试队列里面重新推送。如果Consumer给Broker返回了消费成功,用异步的方式去处理消息,这种情况可能会丢失消息。
  5. 如果MQ服务全挂掉了,想继续保持业务运行,又想不丢失消息,通常的做法是设计⼀个降级缓存。Producer往MQ发消息失败了,就往降级缓存中写,然后,依然正常去进⾏后续的业务。此时,再启动⼀个线程,不断尝试将降级缓存中的数据往MQ中发送。这样,⾄少当MQ服务恢复过来后,这些消息可以尽快进⼊到MQ中,继续往下游Conusmer推送,⽽不⾄于造成消息丢失。
    在这里插入图片描述

消息的顺序性

通常讨论MQ的消息顺序性,其实是在强调局部有序,⽽不是全局有序。比如某个业务流程的消息有序。
RocketMQ的顺序消费机制:Producer通过设置MessageQueueSelector将⼀组有序的消息写⼊到同⼀个MessageQueue中。Consumer使用Orderly的消费方式每个线程集中从⼀个MessageQueue中拿取消息。

消息幂等

  1. 消息的重复发送:Producer发送消息时,如果采⽤发送者确认的机制,那么Producer发送消息会等待Broker的响应。如果没有收到Broker的响应,Producer就会发起重试。但是,Producer没有收到Broker的响应,也有可能是Broker已经正常处理完了消息,只不过发给Producer的响应请求丢失了。这时候Producer再次发起消息重试,就有可能造成消息重复。RocketMQ的处理⽅式,是会在发送消息时,给每条消息分配⼀个唯⼀的ID。重试时broker可以根据msgId判断是否已经处理。
  2. 消息的重复消费:RocketMQ是通过消费者的响应机制来推进offset的,如果consumer从broker上获取了消息,正常处理之后,他要往broker返回⼀个响应,但是如果⽹络出现波动,consumer从broker上拿取到了消息,但是等到他向broker发响应时,发⽣⽹络波动,这个响应丢失了,那么就会造成消息的重复消费。因为broker没有收到响应,就会向这个Consumer所在的Group重复投递消息。Comsumer端可以使用msgId进行唯一性控制,或者更严格的可以使⽤message的key属性写入业务的唯一属性来控制。

消息积压的处理

产⽣消息积压的根本原因还是Consumer处理消息的效率低于消息产生的速度。所以处理消息积压第一个可以优化Consumer处理消息的效率,第二个可以增加Consumer实例的个数。但是增加Consumer实例个数是有上限的。RocketMQ中一个Topic下的MessageQueue只能由一个消费者绑定,因此如果消费者实例数最多增加到等于MessageQueue的个数。如果此时再继续增加Consumer的实例,那么就会有些Consumer实例是没有MessageQueue去消费的,因此也就没有⽤了。
如果要快速处理积压的消息,可以创建⼀个新的Topic,配置⾜够多的MessageQueue。
然后把Consumer实例的Topic转向新的Topic,并紧急上线⼀组新的消费者,只负责消费旧Topic中的消息,并转存到新的Topic中。这个速度明显会⽐普通Consumer处理业务逻辑要快很多。然后在新的Topic上,就可以通过添加消费者个数来提⾼消费速度了。之后再根据情况考虑是否要恢复成正常情况。

http://www.xdnf.cn/news/6246.html

相关文章:

  • TCP实现安全传输的核心机制 + TCP的报文讲解(全程图文讲解)
  • UEFI Spec 学习笔记---33 - Human Interface Infrastructure Overview(1)
  • 量化交易 - 网格交易策略实现与原理解析
  • 随机森林(Random Forest)
  • EasyExcel详解
  • 用户态和内核态
  • 2、ubantu系统配置OpenSSH | 使用vscode或pycharm远程连接
  • 用MCP往ppt文件里插入系统架构图
  • Servlet原理
  • 获取淘宝商品图片的完整指南
  • 3D曲面上的TSP问题(一):曲面上点集距离求解
  • EdgeShard:通过协作边缘计算实现高效的 LLM 推理
  • 华为Watch的ECG功能技术分析
  • SQLMesh 模型管理指南:从创建到验证的全流程解析
  • 部署安装jenkins.war(2.508)
  • Golang
  • CSS 溢出内容处理、可见性控制与盒类型设置深度解析
  • 多链互操作性标准解析:构建下一代区块链互联生态
  • 从AlphaGo到ChatGPT:AI技术如何一步步改变世界?
  • 【现代深度学习技术】注意力机制07:Transformer
  • AI时代的弯道超车之第十四章:AI与生活和生命的改变
  • 主流快递查询API横向对比:快递100快递鸟菜鸟物流接口差异解析
  • 《数字分身进化论:React Native与Flutter如何打造沉浸式虚拟形象编辑》
  • 蓝桥杯12届国B 完全日期
  • IP地址查询助力业务增长
  • 【MySQL】mysql/bin目录下程序介绍
  • Python训练营打卡——DAY25(2025.5.14)
  • Python对于可变对象和不可变对象的理解(主要理解代码中的注释)
  • Unity 小提示与小技巧
  • 【GESP真题解析】第 4 集 GESP 一级 2023 年 3 月编程题 1:每月天数