当前位置: 首页 > backend >正文

rocketmq 拉取消息

理解netty的回调

对于都在内存里的回调, 是所有的调用栈持有回调对象,所以在当前执行节点执行完成时,可以直接从内存中拿到回调对象,直接回调即可

针对于网络编程,netty, 需要跨机器经历网络传输,所以不可能调用栈里持有回调对象,所以会额外使用map记录回调对象future, 方便在回调时, 通过requestid找到对应的回调对象,进行回调方法的调用;
所以发起调用的是 channalWrite(), 而netty发起回调的又是另外的方法,channelRecieve()方法。
要把来个方法联系起来看;

如果遇到线程start()方法, 不要看start, 而要直接取看run()方法

获取消息的方式

消费者并不是每次要消费一条数据就向Broker获取一条数据的,这样RPC的开销太大了,因此先从Broker获取一批数据到内存中,再进行消费

消费端获取消息通常有三种方式:推送消息、拉取消息、长轮询(推拉结合)

推送消息:消息持久化到Broker后,Broker监听到有新消息,主动将消息推送到对应的消费者

Broker主动推送消息具有很好的实时性,但如果消费端没有流控,推送大量消息时会增加消费端压力,导致消息堆积、吞吐量、性能下降

拉取消息:消费端可以根据自身的能力主动向Broker拉取适量的消息,但不好预估拉取消息的频率,拉取太慢会导致实时性差,拉取太快可能导致压力大、消息堆积

长轮询:在拉取消息的基础上进行改进,如果在broker没拉取到消息,则会等待一段时间,直到消息到达或超时再触发拉取消息

长轮询相当于在拉取消息的同时,通过监听消息到达,增加推送的优点,将拉取、推送的优点结合,但长连接会更占资源,大量长连接会导致开销大

RocketMQ中常用的消费者DefaultMQPushConsumer,虽然从名字看是“推送”的方式,但获取消息用的是长轮询的方式

这种特殊的拉取消息方式能到达实时推送的效果,并在消费者端做好流控(拉取消息达到阈值就延时拉取)以防压力过大

拉取消息原理

在这里插入图片描述
点进去start方法看,我们自定义的listener被注册入 private MessageListener messageListener;,等待拉到消息后被回调(在大的交互流程中,我们的消费是被回调的一部分)

主要涉及两个类 DefaultMQPushConsumer的内部实现DefaultMQPushConsumerImpl有一个MQ客户端实例MQClientInstance 它内部包含的PullMessageService组件,就是用于长轮询拉取消息的

拉取简化的流程为:
从队列取出PullRequest,然后封装请求向Broker异步发送
响应后通过回调将查到的消息放入其内存队列中,方便后续消费
在此期间最终都会将PullRequest放回队列(失败可能延时放回),便于下次拉取该队列的消息
如下图
在这里插入图片描述
在这里插入图片描述
进入PullMessageService的线程类run方法
在这里插入图片描述

  • pullMessage最终会调用DefaultMQPushConsumerImpl.pullMessage,代码虽然很多,但主要流程为校验、获取参数、调用核心方法

  • 进行参数、状态、流控的校验,如果失败会调用executePullRequestLater后续延时50ms将拉取请求重新放回队列中,也就是后续再进行该队列的消息拉取

  • 如果是第一次执行,要获取消费进度的偏移量computePullFromWhereWithException,后续使用PullRequest上的nextOffset(集群模式向Broker获取)

  • 获取消费端相关信息(后续会封装成请求),创建回调,回调在RPC后调用

  • 执行拉取消息的核心方法 pullKernelImpl

public void pullMessage(final PullRequest pullRequest) {//获取内存队列final ProcessQueue processQueue <
http://www.xdnf.cn/news/6286.html

相关文章:

  • 信奥赛-刷题笔记-队列篇-T3-P2058海港和P1886单调队列
  • sip协议栈--sip结构分析
  • 大模型哲学:语言的边界就是世界的边界
  • 并查集算法的学习
  • React学习———useContext和useReducer
  • 香橙派zero3 安卓12 TV,遥控器关机。重启?
  • AD 规则的使能及优先级的设置
  • mybatis plus (sqlserver) 根据条件来获取id最大的,或者是新增的最新的一条记录(同条件可能会有多条出现)
  • 数据 分析
  • AD 局部铺铜
  • 职坐标解析职业规划核心五步骤
  • 谷歌web第三方登录
  • 解锁数据的力量:数据治理的新篇章与未来蓝图“
  • Chrome浏览器实验性API computePressure的隐私保护机制如何绕过?
  • ZYNQ PS VDMA②
  • ElasticSearch高级功能
  • 使用matlab进行数据拟合
  • hghac8008漏洞扫描处理
  • [Java实战]Spring Boot 3整合JWT实现无状态身份认证(二十四)
  • 文章记单词 | 第73篇(六级)
  • 【AI面试秘籍】| 第9期:Transformer架构中的QKV机制深度解析:从原理到实践实现
  • Lord Of The Root: 1.0.1通关
  • 安卓system/文件夹下的哪些文件夹可以修改为别的设备的
  • 【信息系统项目管理师】第5章:信息系统工程 - 36个经典题目及详解
  • Agent Builder API - Agent Smith 扩展的后端服务(开源代码)
  • 【Java学习笔记】toString方法
  • MySQL 数据库基础
  • 右值引用的学习
  • cGAS-STING通路
  • 线程同步机制