当前位置: 首页 > news >正文

19、RocketMQ核⼼编程模型

 、源码环境搭建

1 、主要功能模块


RocketMQ的官方Git仓库地址:https://github.com/apache/rocketmq 可以用git把项目 clone下来或者直接下载代码包。
也可以到RocketMQ的官方网站上下载指定版本的源码: http://rocketmq.apache.org/dowloading/releases/ 源码下很多的功能模块 ,其中大部分的功能模块都是可以见名知义的:
broker: Broker 模块( broke 启动进程)
client :消息客户端 , 包含消息生产者 、消息消费者相关类
example: RocketMQ 例代码
namesrv:NameServer模块
store:消息存储模块
remoting:远程访问模块


2 、源码启动服务

将源码导入IDEA后 ,需要先对源码进行编译 。
编译指令 clean install -Dmaven.test.skip=true

编译完成后就可以开始调试代码了 。调试时需要按照以下步骤:

2.1 启动nameServer

展开namesrv模块  运⾏NamesrvStartup类即可启动NameServer服务。

 

 对这个NamesrvStartup类做—个简单的解读都知道 ,可以通过-c参数指定—个properties配置⽂件 ,并通过-p参数打印出nameserver所有⽣效的参数配置。

 

orderMessageEnable参数默认是false ,通过指定配置⽂件 ,修改成了true 服务端部署时 ,也可以这样调整nameserver的默认参数配置。 

配置完成后 ,去掉-p参数 ,再次执⾏ ,就可以启动nameserver服务了 。启动成功 ,可以在控制台看到这样⽇志 

load config properties file OK, /Users/roykingw/names rv/names rv.properties The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

2.2 启动Broker

类似的  Broker服务的启动⼊⼝在broker模块的BrokerStatup类。

Broker服务  同样可以通过-c参数指定broker.conf⽂件 ,并通过-p或者-m参数打印出⽣效的配置信息。

broker.conf配置⽂件在distribution模块中。

然后重新启动  即可启动Broker

2.3 调⽤客户端

服务启动好了之后 ,就可以使⽤客户端收发消息了。 客户端代码在example模块中 ,具体使⽤⽅式略过。

3 、读源码的⽅法

整个源码环境调试好后 ,接下来就可以开始详细调试源码了 。但是对于RocketMQ的源码 ,不建议打断点调试  因为线程和定时任务太多 ,打断点很难调试到。 RocketMQ的源码有个特点 ,就是 ⼏乎没有注释 。所以开始读源码之前 ,我会给你分享—些读源码的⽅式  以便后续你能更好的跟上我的思路。

1 、带着问题读源码 。如果没有⾃⼰的思考 ,源码不如不读!!!

2 、⼩步快⾛ 。不要觉得—两遍就能读懂源码。这⾥我会分为三个阶段来带你逐步加深对源码的理解。

3 、分步总结 。带上⾃⼰的理解 ,及时总结。对各种扩展功能  尝试验证。对于RocketMQ ,试着去理解源码中的各种单元测试。

 、源码热身阶段

梳理—些重要的服务端核⼼配置  同时梳理—下NameServerBroker有哪些核⼼组件 ,找到—点点读源码的感觉。

1 NameServer的启动过程

1.1关注的问题

RocketMQ集群中  实际记性消息存储 、推送等核⼼功能点额是Broker 。⽽NameServer的作⽤ ,其实和微服务中的注册中⼼⾮常类似 ,他只是提供了Broker 端的服务注册与发现功能。

第—次看源码 ,不要太过陷⼊具体的细节 ,先搞清楚NameServer的⼤体结构。

1.2源码重点

NameServer的启动⼊⼝类是org.apache.rocketmq.namesrv.NamesrvStartup 。其中的核⼼是构建并启动—个NamesrvController。这个Cotroller对象就跟 MVC中的Controller是很类似的 ,都是响应客户端的请求 。只不过 ,他响应的是基于Netty的客户端请求。

另外还启动了—个ControllerManager服务  这个服务主要是⽤来保证服务⾼可⽤的  这⾥暂不解读。

另外 ,他的实际启动过程 ,其实可以配合NameServer的启动脚本进⾏更深⼊的理解 。我们这最先关注的是他的整体结构:

 

解读出以下⼏个重点:

  1. 1、这⼏个配置类就可以⽤来指导如何优化Nameserver的配 。⽐如 ,如何调整nameserver的端口? ⾃⼰试试从源码中找找答案。
  2. 2、在之前的4.x版本当中  Nameserver中是没有ControllerManagerNettyRemotingClient  这意味着现在NameServer现在也需要往外发Netty请求了。
  3. 3 、稍微解读下Nameserver中核⼼组件例如RouteInfoManager的结构 ,可以发现RocketMQ的整体源码⻛格其实就是典型的MVC思想 Controller响应⽹络请  ,各种Manager和其中包含的Service处理业务  内存中的各种Table保存消息。

2 Broker服务启动过程

2.1关注重点

Broker是整个RocketMQ的业务核⼼ 。所有消息存储 、转发这些重要的业务都是Broker进⾏处理。 这⾥重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核⼼业务流程的起点。

2.2 源码重点

Broker启动的⼊⼝在BrokerStartup这个类 ,可以从他的main⽅法开始调试。

启动过程关键点:重点也是围绕—个BrokerController对象 ,先创建 ,然后再启动。

⾸先: 在BrokerStartup.createBrokerController⽅法中可以看到Broker的⼏个核⼼配置:

  • .  BrokerConfig  Broker服务配置
  • .  MessageStoreConfig : 消息存储配置 。 这两个配置参数都可以在broker.conf⽂件中进⾏配置
  • .  NettyServerConfig Netty服务端占⽤了10911端⼝ 。同样也可以在配置⽂件中覆盖。
  • .  NettyClientConfig  Broker既要作为Netty服务端  向客户端提供核⼼业务能⼒  ⼜要作为Netty客户端  NameServer注册⼼跳。
  • .  AuthConfig:权限相关的配置。

这些配置是我们了解如何优化 RocketMQ 使⽤的关键。

然后: 在BrokerController.start⽅法可以看到启动了—⼤堆Broker的核⼼服务 ,我们挑—些重要的


this.messageStore.start();//启动核⼼的消息存储组件this.timerMessageStore.start(); //时间轮服务 ,主要是处理指定时间点的延迟消息。this.remotingServer.start(); //Netty服务端this.fastRemotingServer.start(); //启动另—个Netty服务端。this.broke rOuterAPI.start();//启动客户端 ,往外发请求this.topicRouteInfoManager.start(); //管理Topic路由信息BrokerController.this.registerBrokerAll:  //向所有依次NameServer注册⼼跳。this.brokerStatsManager.start();//服务状态

我们现在不需要了解这些核⼼组件的具体功能  只要有个⼤概  Broker中有—⼤堆的功能组件负责具体的业务 。后⾯等到分析具体业务时再去深⼊每个服务的 细节。

我们需要抽象出Broker的—个整体结构:

 可以看到Broker启动了两个Netty服务 ,他们的功能基本差不多 。实际上 ,在应⽤中 ,可以通过producer.setSendMessageWithVIPChannel(true) ,让少量⽐ 较重要的producerVIP的通道 。⽽在消费者端 ,也可以通过consumer.setVipChannelEnabled(true) ,让消费者⽀持VIP通道的数据。

三、⼩试⽜⼑阶段

开始理解—些⽐较简单的业务逻辑

1 Netty服务注册框架

1.1关注重点?

RocketMQ实际上是—个复杂的分布式系统  NameServer  Broker Client之间需要有⼤量跨进程的RPC调⽤。这些复杂的RPC请求是怎么管理 ,怎么调⽤的 呢?这是我们去理解RocketMQ底层业务的基础。这—部分的重点就是去梳理RocketMQ的这—整套基于Netty的远程调⽤框架。

需要说明的是  RocketMQ整个服务调⽤框架绝⼤部分是使⽤Netty框架封装的。 所以 ,要看懂这部分代码 ,需要你对Netty架有⾜够的了解。

1.2源码重点

Netty的所有远程通信功能都由remoting模块实现 remoting模块中有两个对象最为重要 。 就是RPC的服务端RemotingServer以及客户端RemotingClient。在 RocketMQ ,涉及到的远程服务⾮常多  同—个服务 ,可能既是RPC的服务端也可以是RPC的客户端 。例如Broker服务 ,对于Client来说 ,他需要作为服务端  响应他们发送消息以及拉取消息等请求 ,所以Broker是需要RemotingServer的。 ⽽另—⽅⾯  Broker需要主动向NameServer发送⼼跳请求  这时  Broker   需要RemotingClient 。因此  Broker既是RPC的服务端⼜是RPC的客户端。

对于这部分的源码 ,就可以从remoting模块中RemotingServerRemotingClient的初始化过程⼊⼿。有以下⼏个重点是需要梳理清楚的:

1 RemotingServerRemotingClient之间是通过什么协议通讯的?

RocketMQ  RemotingServer是—个接⼝ ,在这个接⼝下 ,提供了两个具体的实现类  NettyRemotingServerMultiProtocolRemotingServer 。他们都是基 Netty框架封装的  只不过处理数据的协议不—样 。也就是说  RocketMQ可以基于不同协议实现RPC访问 。其实这也就为RocketMQ提供多种不同语⾔的客   户端打下了基础。

2 、哪些组件需要Netty服务端?哪些组件需要Netty客户端?

之间简单梳理过  NameServerBroker的服务内部都是既有RemotingServerRemotingClient的。 那么作为客户端的ProducerConsumer ,是不是就只需 RemotingClient呢?其实也不是 ,事务消息的Producer也需要响应Broker的事务状态回查 ,他也是需要NettyServer的。

这⾥需要注意的是  Netty框架是基于Channel⻓连接发起的RPC通信 。只要⻓连接建⽴了 ,那么数据发送是双向的。 也就是说 Channel⻓连接建⽴完成后, NettyServer服务端也可以向NettyClient客户端发送请求 ,所以服务端和客户端都需要对业务进⾏处理。

3 Netty框架最核⼼的部分是如何构架处理链  RocketMQ是如何构建的呢?

服务端构建处理链的核⼼代码:

// org.apache.rocketmq.remoting.netty.NettyRemotingServer
protected ChannelPipeline configChannel(SocketChannel ch) {return ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler()).addLast(defaultEventExecutorGroup, encoder, //请求编码器new NettyDecoder(), //请求解码器   distributionHandler, //请求计数器 new                 IdleStateHandler(0, 0,nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //⼼跳管理器connectionManageHandler, //连接管理器serverHandler //核⼼的业务处理器);
}

我们这⾥主要分析业务请求如何管理 。分两个部分来看:

1 、请求参数:

从请求的编解码器可以看出  RocketMQ的所有RPC请求数据都封装成RemotingCommand对象 RemotingCommand对象中有⼏个重要的属性:

private int code; //响应码 ,表示请求处理成功还是失败
private int opaque = requestId.getAndIncrement(); //服务端内部会构建唯—的请求ID。
private transient CommandCustomHeader customHeader; //⾃定义的请求头 。⽤来区分不同的业务请求 private transient byte [] body; //请求参数体
private int flag = 0; //参数类型 ,   默认0表示请求 , 1表示响应

2 、处理逻辑

所有核⼼的业务请求都是通过—个NettyServerHandler进⾏统—处理 。他处理时的核⼼代码如下:

 

@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { //统—处理所有业务请求
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) {
int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress());
NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort);
if (localPort != -1 && remotingAbstract != null) {
remotingAbstract.processMessageReceived(ctx, msg); //核⼼处理请求的⽅法
return;
}
// The related remoting server has been shutdown, so close the connected channel
RemotingHelper.closeChannel(ctx.channel());
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
//调整channel的读写属性
}
}

*2.1、在最核⼼的处理请求的processMessageReceived⽅法中 ,会将请求类型分为 REQUEST__COMMAND 和 RESPONSE_COMMAND来处理 。**为什么会 有两种不同类型的请求呢?

这是因为客户端的业务请求会有两种类型:—种是客户端发过来的业务请求 ,另—种是客户上次发过来的业务请求 ,可能并没有同步给出相应。这时就需要客 户端再发—个response类型的请求 ,获取上—次请求的响应。这也就能⽀持异步的RPC调⽤ 

2.2 、如何处理request类型的请求?

服务端和客户端都会维护—个processorTable。这是个HashMap,key是服务码 ,也就对应RemotingCommandcode value是对应的运⾏单元

Pair<NettyRequestProcessor, ExecutorService> 。包含了执⾏线程的线程池和具体处理业务的Processor 。 ⽽这些Processor ,是由业务系统⾃⾏注册的。 也就是说 ,想要看每个服务具体有哪些业务能⼒ ,就只要看他们注册了哪些Processor就知道了。

Broker服务注册 ,详⻅ BrokerController.registerProcssor()⽅法。

NameServer的服务注册⽅法 ,重点如下:private void registerProcessor() {
if (names rvConfig.isClusterTest()) { //是否测试集群模式 ,默认是false。也就是说现在阶段不推荐。
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this,
names rvConfig.getProductEnvName()), this.defaultExecutor);
} else {
// Support get route info only temporarily
ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);
this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC,
clientRequestProcessor, this.clientRequestExecutor);
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);
}
}

另外  NettyClient也会注册—个⼤的ClientRemotingProcessor ,统—处理所有请求 。注册⽅法⻅ org.apache.rocketmq.client.impl.MQClientAPIImpl类的构 造⽅法 。也就是说  只要⻓连接建⽴完成了  NettyClient⽐如Producer ,也可以处理NettyServer发过来的请求。

2.3 、如何处理response类型的请求?

NettyServer处理完request请求后 ,会先缓存到responseTable ,等NettyClient下次发送response类型的请求 ,再来获取。这样就不⽤阻塞Channel ,提升 请求的吞吐量 。优雅的⽀持了异步请求。

** 2.4 、关于RocketMQ的同步结果推送与异步结果推送**

RocketMQRemotingServer服务端 ,会维护—个responseTable  这是—个线程同步的Map结构 key为请求的ID value是异步的消息结果。 ConcurrentMap<Integer /* opaque */, ResponseFuture>

处理同步请求(NettyRemotingAbstract#invokeSyncImpl) ,处理的结果会存⼊responseTable ,通过ResponseFuture提供—定的服务端异步处理⽀持 ,提升 服务端的吞吐量 。 请求返回后 ,⽴即从responseTable中移除请求记录。

实际上  同步也是通过异步实现的。

//org.apache.rocketmq.remoting.netty.ResponseFuture
//发送消息后 ,通过countDownLatch阻塞当前线程 ,造成同步等待的效果。
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
//等待异步获取到消息后 ,再通过countDownLatch释放当前线程。
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;
this.countDownLatch.countDown();
}

 

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl) ,处理的结果依然会存⼊responsTable ,等待客户端后续再来请求结果 。但是他保存的依然是— ResponseFuture ,也就是在客户端请求结果时再去获取真正的结果。

另外 ,在RemotingServer启动时 ,会启动—个定时的线程任务 ,不断扫描responseTable ,将其中过期的response清除掉。

//org.apache.rocketmq.remoting.netty.NettyRemotingServer
TimerTask timerScanResponseTable = new TimerTask() {
@Override
public void run(Timeout timeout) {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
} finally {
timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);
}
}
};
this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);

整体RPC框架流程如下图:

可以看到  RocketMQ基于Netty框架实现的这—套基于服务码的服务注册机制  即可以让各种不同的组件都按照⾃⼰的需求注册⾃⼰的服务⽅法  ⼜可以以—  种统—的⽅式同时⽀持同步请求和异步请求 。所以这—套框架 ,其实是⾮常简洁易⽤的。在使Netty框架进⾏相关应⽤开发时 ,都可以借鉴他的这—套服务注 册机制。 例如开发—个⼤型的IM项⽬ ,要添加好友、发送⽂本、发送图⽚、发送附件 、甚⾄还有表情 、红包等等各种各样的请求。这些请求如何封装 ,就可以  参考这—套服务注册框架。 

http://www.xdnf.cn/news/1071199.html

相关文章:

  • AI助力基因数据分析:用Python玩转生命密码的秘密
  • 像素之外的智慧:Adobe AI在动态影像与云端协作中的进阶应用
  • LLM驱动开发:正在重塑软件工程的下一场革命
  • Re:从零开始的文件结构(融合线性表来理解 考研向)
  • 自动化保护 AWS ECS Fargate 服务:使用 Prisma Cloud 实现容器安全
  • uiautomation控制计算器,不动鼠标(界面控制)
  • Nuxt.js基础(配置)
  • 【Elasticsearch】Linux环境下安装Elasticsearch
  • 论特定领域软件架构
  • 《Opto-Electronic Advances》热点论文速览(2025)
  • 汽车涂胶车间的“通信桥梁”:PROFIBUS DP转ETHERNET/IP网关的应用实践
  • word中如何保存高清图片,并保存为高质量的pdf文件(图像不失真)
  • 多张图片生成PDF每张图片生成pdf的一页
  • lxd 容器内的深度学习服务器环境配置
  • sql server 将nvarchar长度设置成max有什么隐患
  • VSCode中创建和生成动态库项目
  • 时序数据库全面解析与对比
  • TCP/IP协议简要概述
  • 小型软件开发的三重境界:从混沌编码到结构化设计
  • Stable Diffusion入门-ControlNet 深入理解 第二课:ControlNet模型揭秘与使用技巧
  • 基于残差神经网络的垃圾分类
  • Maven生命周期与阶段扩展深度解析
  • 嵌入式项目:基于QT与Hi3861的物联网智能大棚集成控制系统
  • jenkins中执行python脚本导入路径错误
  • Chrome浏览器访问https提示“您的连接不是私密连接”问题解决方案
  • 【C++特殊工具与技术】固有的不可移植的特性(3)::extern“C“
  • 力扣第455场周赛
  • MATLAB 4D作图
  • Hyperledger Fabric 入门笔记(二十)Fabric V2.5 测试网络进阶之Tape性能测试
  • OpenCV模版匹配方法的衡量指标比较