当前位置: 首页 > backend >正文

RocketMQ简介

生产与消费流程:

e4eea55ec4e1400882586f8f3e7018bf.png

消费者会启动一个后台线程,不断地从Broker拉取消息,并放入到本地的队列中。消费者通过长轮询(Long Polling)机制来优化轮询效率和新消息的实时性。在没有新消息可拉取时,Broker会挂起消费者的请求,并在有新消息到达时立即返回给消费者,或者在请求超时后返回空结果。

 

消息确认机制

确定消息生产并接收。生产者->broker

单向发送

消费者直接给broker,不关心broker是否接收到。producer.sendOneway(message);

同步发送

同步发送⽅式下,消息⽣产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。producer.send(msg);

异步发送

⽣产者在向Broker发送消息时,会同时注册⼀个回调函数。接下来⽣产者并不等待Broker的
响应。当Broker端有响应数据过来时,⾃动触发回调函数进⾏对应的处理。

producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}
});

确定消息被消费。broker->消费者

broker等待消费者返回接收到的信号。如果等不到,broker也会重新发,默认重发16次失败后,消息会进入到死信队列。这个等待返回信息和重发的操作是异步的,不会阻塞其他信息的推送。

 

广播模式和集群模式

⼴播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。集群模式下,⼀个消息,只会被⼀个消费者组中的多个消费者实例共同处理⼀次。⼴播模式下,⼀个消息,则会推送给所有消费者实例处理,不再关⼼消费者组

 

顺序消息

4a68c81adb9d4e98a3f23d2cf06b0129.png

用MessageQueueSelector确定发送的队列,路由是自己代码写

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg,Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}
}, orderId);

 

延迟消息

生产者msg.setDelayTimeLevel(3);

2ec09a914936418280d59cf41a5a4fa2.png

延迟消息的难点其实是性能,需要不断进⾏定时轮训。全部扫描所有消息是不可能的,RocketMQ的实现⽅式是预设⼀个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟队列。然后每次只针对这18个队列⾥的消息进⾏延迟操作,这样就不⽤⼀直扫描所有的消息了。

 

批量消息

    List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "Tag", "OrderID001", "Hello world
0".getBytes()));messages.add(new Message(topic, "Tag", "OrderID002", "Hello world
1".getBytes()));messages.add(new Message(topic, "Tag", "OrderID003", "Hello world
2".getBytes()));producer.send(messages);

 

过滤消息

订阅过滤

6f5405f300c146b4af1747a2b21f7023.png

sql过滤15b9762d072241ac8a9f5a2f5d92ee0c.png

 

事务——两阶段确认

下单问题。订单状态成功改变之后才能执行清空购物车、清空支付单等一系列操作。业务侧发消息给rocketMQ,rocketMQ把消息标记为“暂不投递”状态(第一阶段),rocketMQ返回ack给业务侧,业务侧执行sql,确定订单状态已经入库,业务侧发消息(commit/rollback)给rocketMQ(第二阶段),rocketMQ把消息标记为ok状态,这时候的消息才能被消费者消费。

 

权限

RocketMQ针对每个Topic,就有完整的权限控制。⽐如,在控制平台中,就可以很⽅便的给每个Topic配置权限。是否可写、是否可订阅。

url白名单

 

核心源码

@RocketMQMessageListener注解方式创建消费者,是如何创建的呢?⼊⼝在rocketmq-spring-boot-2.2.2.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接⼝,当Spring容器当中所有⾮懒加载的实例加载完成后,就会触发他的afterSingletonsInstantiated⽅法进⾏初始化。在这个⽅法中会去扫描所有带有注解@RocketMQMessageListener注解的类,将他注册到内部⼀个Container容器当中。重点看afterSingletonsInstantiated方法。

 

最佳使用

topic要少,一个应用最好一个topic。tags可以多,tag对应具体的业务。

因为topic过多,管理起来比较麻烦,性能消耗大。尽量用tag进行过滤,不要用复杂sql进行过滤。

key最好唯一。

  • Key主要用于消息的查询和定位,而Tag则主要用于消息的过滤和分类。
  • Key是消息的一个关键属性,可以设置为唯一值(不设置的话rocketMQ会自动生成);而Tag则是消息的一个标签,可以包含多个值。

一是因为可以根据key来过滤消息。

二是broker会对每个消息的key创建索引,要尽可能减少key的hash冲突

人工定时关注消息重试队列和死信队列。

⼀个死信队列对应⼀个ConsumGroup,⽽不是对应某个消费者实例。

对错误原因进行排查,减少潜在隐患。重试队列太多占空间。

 

幂等性

在MQ系统中,对于消息幂等有三种实现语义:
at most once 最多⼀次:每条消息最多只会被消费⼀次
at least once ⾄少⼀次:每条消息⾄少会被消费⼀次
exactly once 刚刚好⼀次:每条消息都只会确定的消费⼀次

rockeMQ中,at most once是最好保证的。RocketMQ中可以直接⽤异步发送、sendOneWay等⽅式就可以保证。

⽽at least once这个语义,RocketMQ也有同步发送、事务消息等很多⽅式能够保证。

exactly once保证不了,需要业务端自己实现,如根据messageId或者业务id

 

 

http://www.xdnf.cn/news/11648.html

相关文章:

  • Maven详解
  • MD5加密算法
  • TIM(定时器中断)寄存器及库函数配置
  • Wireshark抓包工具的使用
  • anaconda下载与安装
  • SEO实战干货:利用高权重网站借力操作关键词快速排名!
  • 单细胞RNA测序(scRNA-seq)细胞分离与扩增
  • 深入理解程序从编译到运行
  • 深度学习之---yolov1,v2,v3详解及实践
  • 2020年10月国产数据库排行:GoldenDB跃升异军突起 PolarDB和GaussDB云化融合
  • Apache Spark 基础知识总结及应用示例
  • 顺序程序设计
  • js aes加密 md5摘要 base64编码 ras加密
  • 视频编码fmpeg 常用命令汇总
  • python创意编程作品集,python创意小作品代码
  • 巧用Superset大数据分析平台搞定各类图表
  • 电子邮箱:连接你我,申请注册全攻略解析
  • 渗透测试工程师面试题大全(164道)
  • 新手站长个人经验谈几点SEO优化技巧
  • 宿舍|学生宿舍管理小程序|基于微信小程序的学生宿舍管理系统设计与实现(源码+数据库+文档)
  • 210、基于STM32单片机无线LORA通信多功能电能表电压电流功率用电量等多参数电能表上位机显示设计
  • 第三方支付平台的优缺点分别是什么?
  • 2018年四大爬虫代理IP提供商对比
  • marginRight 不起作用解决方法
  • “时间的朋友”2017跨年演讲全回顾
  • 文件无法删除:windows下文件名太长无法删除的问题 / 无法删除目录层次太深、文件名或者扩展名太长的问题
  • python贪吃蛇最简单代码,简单代码编程 贪吃蛇
  • 使用MSXML2解析XML文件
  • 2024年PHP从基础到高级详细教程(完整版)_php高级教程(1),2024年最新GitHub标星1w的Golang架构师必备技能
  • 5个常见运维场景,用这几个Python脚本就够了!