当前位置: 首页 > java >正文

rocketMQ

RocketMQ 简介

RocketMQ 是阿里巴巴 2016 年 MQ 中间件,使用 Java 语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。 同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、 快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 具有以下特点: 1. 能够保证严格的消息顺序 2. 提供丰富的消息拉取模式 3. 高效的订阅者水平扩展能力 4. 实时的消息订阅机制 5. 亿级消息堆积能力

为什么要使用 MQ

1,要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦

2,设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流

3,强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

Mq 的作用 削峰限流 异步 解耦合

RocketMQ 重要概念

Producer:消息的发送者,生产者;举例:发件人

Consumer:消息接收者,消费者;举例:收件人

Broker:暂存和传输消息的通道;举例:快递

NameServer:管理 Broker;举例:各个快递公司的管理机构相当于 broker 的注册中心,保留了 broker 的信息

Queue:队列,消息存放的位置,一个 Broker 中可以有多个队列

Topic:主题,消息的分类

ProducerGroup:生产者组

ConsumerGroup:消费者组,多个消费者组可以同时消费一个主题的消息

消息发送的流程是,Producer 询问 NameServer,NameServer 分配一个 broker 然后 Consumer 也要询问 NameServer,得到一个具体的 broker,然后消费消息

在这里插入图片描述

生产和消费理解

在这里插入图片描述

RocketMQ整合SpringBoot

引入依赖

		<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>

生产者

rocketmq:name-server: localhost:9876producer:group: boot-producer-group
public interface MqConstant {String TOPIC = "bootTopic";String TOPIC_TAG = "bootTopic:tagA";
}
@SpringBootTest
class BRocketmqBootProducerApplicationTests {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 重试机制/*** 重试的时间间隔 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";* 默认重试16次* 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的? 是一个死信消息 则会放在一个死信主题中去 主题的名称: %DLQ%rocketmq-consumer-group* --------* 再实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理*//*** 发送的是同步消息* rocketMQTemplate.syncSend()* rocketMQTemplate.send()* rocketMQTemplate.convertAndSend()* 这三种发送消息的方法,底层都是调用syncSend*//*** 测试发送简单的消息*/@Testpublic void testSimpleMsg() {SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.TOPIC, "我是一个同步简单消息");System.out.println(sendResult.getSendStatus());System.out.println(sendResult.getMsgId());System.out.println(sendResult.getMessageQueue());}/*** 测试发送对象消息*/@Testpublic void testObjectMsg() {Person person = new Person(IdUtil.simpleUUID(), "lxx", 24);rocketMQTemplate.syncSend(MqConstant.TOPIC, person);}/*** 测试发送集合消息*/@Testpublic void testCollectionMsg() {List<Person> list = new ArrayList<>();Person person1 = new Person(IdUtil.simpleUUID(), "lxx", 24);Person person2 = new Person(IdUtil.simpleUUID(), "sanhao", 18);list.add(person1);list.add(person2);rocketMQTemplate.syncSend(MqConstant.TOPIC, list);}/*** 发送异步消息* rocketMQTemplate.asyncSend()*//*** 测试异步发送消息** @throws Exception*/@Testpublic void testAsyncSend() throws Exception {// 发送异步消息,发送完以后会有一个异步通知rocketMQTemplate.asyncSend(MqConstant.TOPIC, "发送一个异步消息", new SendCallback() {/*** 成功的回调** @param sendResult*/@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功");}/*** 失败的回调** @param throwable*/@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败");}});// 测试一下异步的效果System.out.println("谁先执行");// 挂起jvm 不让方法结束System.in.read();}/*** 测试单向消息*/@Testpublic void testOnWay() {// 发送单向消息,没有返回值和结果rocketMQTemplate.sendOneWay(MqConstant.TOPIC, "这是一个单向消息");}/*** 测试延迟消息*/@Testpublic void testDelay() {// 构建消息对象Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();// 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费SendResult sendResult = rocketMQTemplate.syncSend(MqConstant.TOPIC, message, 2000, 4);System.out.println(sendResult.getSendStatus());}/*** 测试顺序消费* mq会根据hash的值来存放到一个队列里面去* <p>* 消费者监听类:** @RocketMQMessageListener(topic = "bootTopic", consumerGroup =* "boot-consumer-group", consumeMode = ConsumeMode.ORDERLY)* consumeMode 指定消费类型 :* 1. CONCURRENTLY 并发消费 (默认值)* 2. ORDERLY 顺序消费 messages orderly. one queue, one thread*/@Testpublic void testOrderly() {List<Order> orders = Arrays.asList(new Order(IdUtil.simpleUUID(), "张三的下订单", null, null, null, 1),new Order(IdUtil.simpleUUID(), "张三的发短信", null, null, null, 1),new Order(IdUtil.simpleUUID(), "张三的物流", null, null, null, 1),new Order(IdUtil.simpleUUID(), "张三的签收", null, null, null, 1),new Order(IdUtil.simpleUUID(), "李四的下订单", null, null, null, 2),new Order(IdUtil.simpleUUID(), "李四的发短信", null, null, null, 2),new Order(IdUtil.simpleUUID(), "李四的物流", null, null, null, 2),new Order(IdUtil.simpleUUID(), "李四的签收", null, null, null, 2));// 我们控制流程为 下订单->发短信->物流->签收  hash的值为seq,也就是说 seq相同的会放在同一个队列里面,顺序消费orders.forEach(order -> {rocketMQTemplate.syncSendOrderly(MqConstant.TOPIC, order, String.valueOf(order.getSeq()));});}/*** 发送一个带tag的消息** @throws Exception* @RocketMQMessageListener(topic = "bootTopic", consumerGroup =* "boot-consumer-group", selectorType = SelectorType.TAG, selectorExpression = "tagA || tagB")* <p>* selectorType = SelectorType.TAG,属性指定消费的选择类型为Tag,这个类型也是selectorType属性的默认值(也可以使用sql92 需要在配置文件broker.conf中开启enbalePropertyFilter=true)* selectorExpression = "tagA || tagB"属性来选择消费的Tag。默认是*,支持"tag1 || tag2 || tag3"* <p>*/@Testpublic void testTagMsg() {rocketMQTemplate.syncSend(MqConstant.TOPIC_TAG, "我是一个带tag的消息");}/*** 发送一个带key的消息*/@Testpublic void testKeyMsg() {// key是写在消息头的Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "lxx666").build();rocketMQTemplate.syncSend(MqConstant.TOPIC, message);}}

消费者

server:port: 8081rocketmq:name-server: localhost:9876
@Component
@RocketMQMessageListener(topic = "bootTopic",consumerGroup = "boot-consumer-group",consumeMode = ConsumeMode.CONCURRENTLY, // 消费类型 CONCURRENTLY 并发消费 ORDERLY 顺序消费messageModel = MessageModel.CLUSTERING) // 消费模式 CLUSTERING 负载均衡模式 BROADCASTING 广播模式
public class BaseConsumerListener implements RocketMQListener<MessageExt> {/*** 消费消息的方法*/@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(new String(messageExt.getBody()));System.out.println(messageExt.getKeys());}
}

RocketMQ 消息重复消费问题

为什么

broadcast(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。 clustering(负载均衡)模式下,如果一个 topic 被多个 consumerGroup 消费,也会重复消费。 即使是在 clustering模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者, 看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交 offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

那么如果在 clustering(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的key-业务唯一键(订单号),这样就可以去重了

怎么做

使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯 一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。

1、可以选择布隆过滤器(BloomFilter)
2、可以设计去重表使用mysql唯一性索引,每次消费处理业务逻辑之前插入自定义唯一的key到去重表中,成功即处理业务逻辑,失败则为重复消息。

RocketMQ 消息堆积解决方案

1、生产消息太快了

  • 生产方可以做业务限流
  • 增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量 (根据IO密集型 ( 2n ) / CPU密集型 ( n+1 ) )
  • 动态扩容队列数量,从而增加消费者数量

2、消费者消费出现问题

  • 排查消费者程序的问题

RocketMQ 如何确保消息不丢失

RocketMQ 通过多种机制在消息生命周期的各个阶段(生产、存储、消费)保障消息不丢失,具体方案如下:

生产者

同步发送 + 重试机制 : 生产者使用 send() 同步发送,等待 Broker 返回写入确认(ACK)。若发送失败或超时,自动重试(默认重试 2 次,可配置)。

Broker

  • 刷盘模式
    • 同步刷盘: 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。优点:机器宕机消息也会被保留,缺点:写入速度慢,吞吐量小。
    • 异步刷盘: 在返回写成功状态时,消息只是被写入了内存的PAGECACHE,这样的好处是写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入,缺点是一旦机器宕机消息可能会丢失
  • 复制模式
    • 同步复制
      生产者发送消息后,Master 接收到存储消息请求,将消息数据同步给 Slave ,直到master和slave节点都写入成功(写入磁盘或者内存,看刷盘方式),才将成功结果返回给生产者。同步复制模式下,发送消息会有一定延迟,系统吞吐量也会降低。
    • 异步复制
      生产者发送消息后,Master 接收到存储消息请求,将消息写入成功后(写入磁盘或者内存,看刷盘方式),直接将成功结果返回给生产者。 Master 和 Slave 再通过异步的方式同步数据,这种复制模式具有较小的延迟,可以实现比较高的吞吐量。若 Master 出现故障,有些数据可能未写入 Slave ,未同步的数据可能丢失。
# Broker 配置
flushDiskType=SYNC_FLUSH # 同步刷盘
brokerRole=SYNC_MASTER # 同步复制

消费者

1、生产者使用同步发送模式 ,收到mq的返回确认以后 ,顺便往自己的数据库里面写msgId+status(1)+time
2、消费者消费以后, 修改数据这条消息的状态 = 2
3、写一个定时任务,间隔两天去查询数据 ,如果有status = 1 AND time < NOW() - INTERVAL 2 DAY,查出来进行补发,同时消费者需要做好幂等性控制重复消费。
4、定时器+状态表

在这里插入图片描述

http://www.xdnf.cn/news/6541.html

相关文章:

  • Unity:延迟执行函数:Invoke()
  • 2024 睿抗机器人开发者大赛CAIP-编程技能赛-本科组(国赛) | 珂学家
  • 深入理解二叉树:遍历、存储与算法实现
  • LIIGO ❤️ RUST 12 YEARS
  • Milvus(24):全文搜索、文本匹配
  • STM32的ADC模块中,**采样时机(Sampling Time)**和**转换时机(Conversion Time),获取数据的时机详解
  • 【leetcode】144. 二叉树的前序遍历
  • Rust 数据结构:String
  • iOS SwiftUI的具体运用实例(SwiftUI库的运用)
  • 深入解析ZAB协议:ZooKeeper的分布式一致性核心
  • YOLOv3深度解析:多尺度特征融合与实时检测的里程碑
  • 【图像生成1】Latent Diffusion Models 论文学习笔记
  • Java注解详解:从入门到实战应用篇
  • Graph Representation Learning【图最短路径优化/Node2vec/Deepwalk】
  • 开源鸿蒙北向源码开发: 5.0kit化相关sdk编译
  • YOLOv8在单目向下多车辆目标检测中的应用
  • 协议不兼容?Profinet转Modbus TCP网关让恒压供水系统通信0障碍
  • 当 PyIceberg 和 DuckDB 遇见 AWS S3 Tables:打造 Serverless 数据湖“开源梦幻组合”
  • 【数据结构】手撕AVL树(万字详解)
  • 部署docker上的redis,idea一直显示Failed to connect to any host resolved for DNS name
  • 制造业工厂的三大核心系统:ERP+PLM+MES
  • 滑动窗口之二(优先队列)
  • 关于PID的几种整定方法
  • 【Fifty Project - D26】
  • 第四章:文件内容查看
  • 使用nps配置内网穿透加域名解析
  • 中国版 Cursor?腾讯推出 AI 编程助手 CodeBuddy,重新定义编程体验
  • 项目变更管理
  • 怎样用 esProc 实现连续区间的差集运算
  • 2023年河南CCPC->F题