【RocketMQ NameServer】- NameServer 启动源码
文章目录
- 1. 前言
- 2. RocketMQ 通信架构
- 3. NameServer 启动流程
- 3.1 创建 NameServerController
- 3.2 启动 NameServerController
- 3.3 NamesrvController#initialize
- 3.3.1 Netty 通信的整体流程
- 3.3.2 创建 NettyRemotingServer
- 3.4 this.remotingServer.start()
- 3.4.1 this.remotingServer.start()
- 3.4.2 NettyEventExecutor.start
- 3.5 shutdown 关闭 NameServer
- 4. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
前面我们介绍了消息的发送和消费的相关例子,可以看这个目录下面的文章《【RocketMQ 生产者消费者】- 同步、异步、单向发送消费消息》【RocketMQ】- 源码系列目录,这篇文章我们就深入源码,首先来看下 NameServer 的启动流程,为什么是 Nameserver 启动流程呢?因为在上一篇文章中创建生产者和消费者的时候可以看到只设置了 NameServer 的地址,但是消息最终存储和拉取都是从 broker 来拉取的,所以 生产者、消费者、broker、NameServer 这几个组件之间肯定有某种联系,所以我们就从最基础的 NameServer 来逐步深入。
2. RocketMQ 通信架构
上面就是 RocketMQ 官方给出来的通信架构图了,也很好解释了上面的问题,NameServer 是一个比较简单的 Topic 路由注册中心,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现,其中主要有两个功能:broker 管理和路由信息管理。
首先是 broker 管理,每一个 broker 启动或者通过定时任务都会往 NameServer 上报 broker 路由信息,生产者和消费者启动就会定时向 NameServer 拉取 topic 的路由信息,而 topic 路由信息里面就包括 broker 的集群信息,从而进行消息的投递和消费,由于 broker 是向每一台 NameServer 注册自己的路由信息,所以每一个NameServer 实例上面都保存一份完整的路由信息,当一台 NameServer 下线之后 broker 一样可以向其他 NameServer 注册,生产者和消费者一样可以拉到最新的消息。
关于这一部分的介绍,大家可以把源码拉下来,然后在 architecture.md 里面找到对应的信息。
3. NameServer 启动流程
NameServer 启动流程在 NamesrvStartup 这个类的 main 方法,在启动之前如果是 windows 启动的话需要配置一下 ROCKETMQ_HOME,否则启动不了。
下面就是 main 方法,main 方法里面就一行代码,使用 main0 来启动 NameServer。
public static void main(String[] args) {// 启动 NameServermain0(args);
}public static NamesrvController main0(String[] args) {try {// 创建 NamesrvControllerNamesrvController controller = createNamesrvController(args);// 启动 NamesrvControllerstart(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}
其中比较核心的两个方法,创建 NamesrvController 以及启动 NamesrvController,下面先来看下创建 NamesrvController 的逻辑。
3.1 创建 NameServerController
/*** 创建 NamesrvController* @param args* @return* @throws IOException* @throws JoranException*/
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {// 1. 设置系统变量 rocketmq.remoting.version, 也就是当前 RocketMQ 的版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 解析命令行参数commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {// 如果找不到参数, 直接返回 -1, 退出 NameServer 的启动System.exit(-1);return null;}final NamesrvConfig namesrvConfig = new NamesrvConfig();// 创建 Netty 服务端配置, 同时设置监听端口为 9876, 所以 NameServer 启动监听端口就是 9876final NettyServerConfig nettyServerConfig = new NettyServerConfig();nettyServerConfig.setListenPort(9876);// 首先看下命令行里面有没有 -c 的参数, -c 就是指定配置文件if (commandLine.hasOption('c')) {// 获取 -c 参数后面的文件路径String file = commandLine.getOptionValue('c');// 读取配置文件里面的配置信息到 properties 中if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);// 将配置文件里面的信息设置到 NamesrvConfig 和 NettyServerConfig 中MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);// 设置配置文件namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}// 如果命令行包含参数 p, p 就是用来打印日志的if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 同时启动变量里面需要设置 ROCKETMQ_HOME, 如果不设置就直接退出if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 初始化 logback 配置, 读取 ${ROCKETMQ_HOME}/conf/logback_namesrv.xml 配置文件LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);// 构建 NamesrvControllerfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// 将配置注册到 allConfigs 中, 防止丢失controller.getConfiguration().registerConfig(properties);return controller;
}
上面是创建 NameServerController 的逻辑,里面的逻辑主要就是解析命令行参数,比如通过命令 nohup sh bin/mqnamesrv &
进行启动,而启动的时候可以设置一些参数,比如 -c xxx,这个方法就是会解析启动命令,解析里面的配置,比如 -c 后面跟的就是配置文件,-p 就是用于日志打印。
同时这个方法中会创建 Netty 服务端配置, 同时设置监听端口为 9876, 所以 NameServer 启动监听端口就是 9876,RocketMQ 的底层通信依赖的就是 Netty。其余的可以看下上面的源码,因为源码中的注释也能理解这一部分的内容是干什么的。
3.2 启动 NameServerController
public static NamesrvController start(final NamesrvController controller) throws Exception {// 创建失败, 抛出异常if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化方法boolean initResult = controller.initialize();if (!initResult) {// 如果初始化失败, 直接退出controller.shutdown();System.exit(-3);}// 添加一个关闭的钩子方法, 当 NameServer 关闭的时候会回调里面的 call 方法, 调用 shutdownRuntime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));// 启动 NameServer 监听连接请求controller.start();return controller;
}
启动流程会调用 controller.initialize()
来初始化 NamesrvController,同时添加一个关闭的钩子方法, 当 NameServer 关闭的时候会回调里面的 call 方法, 调用 shutdown,最后启动 NameServer 监听连接请求。
3.3 NamesrvController#initialize
这个方法就是核心的初始化逻辑,下面来解析里面的源码。
// 首先加载 ${user.home}/namesrv/kvConfig.json 配置文件里面的配置到 KVConfigManager#configTable 中
this.kvConfigManager.load();
首先第一步就是加载 ${user.home}/namesrv/kvConfig.json
配置文件里面的配置到 KVConfigManager#configTable
集合中,至于这个 KV 配置是干什么的,后面有时间再梳理下吧,目前还没看到这配置有什么用。
// 初始化 NettyRemotingServer, 也就是 Netty 服务端, 用于监听 broker、producer、consumer 的请求进行通信
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// Netty 远程通信处理的连接池, 核心线程数设置成了 8, 专门用于处理远程请求
this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
然后初始化 NettyRemotingServer
,这个服务就是用来处理远程连接的,NameServer 作为服务端需要启动一个 Netty 服务端专门用于监听来自 broker、producer、consumer 的请求进行通信处理。remotingExecutor
是处理连接通信的线程池,核心线程数设置成了 8。
// 注册默认的请求处理器, 这个请求处理器 DefaultRequestProcessor 里面就会针对不同的请求做不同的处理, 比如 broker 注册请求、topic 配置查询请求等等
this.registerProcessor();
这个方法就是专门用于注册处理器的,RocketMQ 中用于通信的对象就是 RemotingCommand,这个是 RocketMQ 客户端和服务端通信的请求响应协议,发送方发送请求的时候会设置一个 CODE,而 NameServer 或者 Broker 接受到请求之后就会使用处理器去针对这个 CODE 去走不同的处理逻辑,上面 registerProcessor
就是用于注册 NameServer 的请求处理器,下面可以进入这里面看下。
/*** 注册请求处理器*/
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),this.remotingExecutor);} else {// 注册默认的请求处理器, 将 DefaultRequestProcessor 和处理线程池设置到 NettyRemotingServer#defaultRequestProcessor 中this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);}
}
可以看到,默认就是注册了一个 DefaultRequestProcessor,这个 DefaultRequestProcessor 里面会用来处理 NameServer 接收到的请求,根据不同 Code 走不同的逻辑,当然处理逻辑的流程需要放在线程池 remotingExecutor 中执行,源码我就不一一解析了,这里截一下图来简单看下。
回到 initialize 方法,继续往下看,接下来就是初始化两个定时任务。
// 创建定时任务, 初始化 5s 之后开始执行, 之后每 10s 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 扫描不活跃的 broker, 关闭跟这个 broker 的连接通道, 从 brokerLiveTable 中删掉这个 brokerNamesrvController.this.routeInfoManager.scanNotActiveBroker();}
}, 5, 10, TimeUnit.SECONDS);// 创建定时任务, 初始化 1s 之后开始执行, 之后每 10s 执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 打印 kvConfigManager 里面的 configTable, 也就是 KV 配置NamesrvController.this.kvConfigManager.printAllPeriodically();}
}, 1, 10, TimeUnit.MINUTES);
这两个定时任务一个是用来扫描不活跃的 broker, 关闭跟这个 broker 的连接通道, 从 brokerLiveTable 中删掉这个 broker,我们之前说过 NameServer 会存储所有上报过来的 broker,存储的集合就是 brokerLiveTable ,但是有一些 broker 是不活跃的,这部分 broker 就需要从 NameServer 的 brokerLiveTable 集合中删掉。
第二个定时任务就是用于打印 kvConfigManager 里面的 configTable, 也就是 KV 配置。
好了,上面就是这两个定时任务,继续回到 initialize 源码,还剩最后一点,注册监听器监听 TLS 相关证书文件的变化,一旦证书文件发生变化,就会重新加载服务器的 SSL 上下文,以确保服务器使用的是最新的证书。
// 注册监听器监听 TLS相关证书文件的变化, 一旦证书文件发生变化, 就会重新加载服务器的 SSL 上下文, 以确保服务器使用的是最新的证书
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");}
}
3.3.1 Netty 通信的整体流程
在 initialize 方法中初始化了 NettyRemotingServer,我们上面也说过 RocketMQ 底层依靠 Netty 进行远程通信,实际上官方文档中也是说明了这一点,在官方源码的 design.md
文件中有记录,把源码下载下来之后在 docs/cn
目录下面就能看到了。
这部分跟 Netty 通信相关了,大家如果有兴趣我比较推荐看这位博主的博客,里面对 Netty 的体系架构都有一个详细的介绍:bin的技术小屋。
RocketMQ 底层也是用的一主多从架构,主 Reactor
用于接收 Accept 事件,也就是处理客户端的连接,为每一个客户端连接构建一个 NioSocketChannel 注册到 Selector 上,接着注册 Read 事件,然后由 从 Reactor
去接收 Read 请求来处理。
当服务端接受到读事件后会把消息丢给上面图中的 Worker 线程池
去处理,处理的时候需要经过 SSL验证、编解码、空闲检查、网络连接管理
这些步骤,这些其实就是注册的入站出站处理器,这些入站出站 Handler 处理器的最后一个就是 NettyServerHandler
,经过了前面这四个步骤之后在 NettyServerHandler 中会把具体的业务请求提交到 processor 线程池中去处理,也就是上面说的根据请求的 Code 来决定走什么逻辑。
前面的流程中设计几个线程池,下面先贴出来,如果在源码中看到也能提前有个印象,下面是线程的前缀:
- NettyBoss_%d: Reactor 主线程
- NettyServerNIOSelector_%d_%d : Reactor 从线程,当然如果支持 epoll 就会创建 NettyServerEPOLLSelector_%d_%d 线程前缀的线程池
- NettyServerCodecThread_%d: Worker线程池
- RemotingExecutorThread_%d: 业务 processor 处理线程池
3.3.2 创建 NettyRemotingServer
好了,上面我们说完整体流程之后下面就来进入正题,首先来看下 NettyRemotingServer 的构造方法。
/*** 创建 NettyServer 服务* @param nettyServerConfig* @param channelEventListener*/
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,final ChannelEventListener channelEventListener) {// 首先设置服务器单向、异步发送请求的信号量, 这个是为了防止同一时间发送太多单向请求或者异步请求super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());// 创建 ServerBootstrapthis.serverBootstrap = new ServerBootstrap();// 设置 Netty 服务端的配置this.nettyServerConfig = nettyServerConfig;// 设置 Netty 时间监听器, 比如在里面可以监听连接事件然后做一些特殊处理this.channelEventListener = channelEventListener;// 公开线程池 publicExecutor 的核心线程数, 默认就是 4int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();if (publicThreadNums <= 0) {publicThreadNums = 4;}// 创建一个默认线程数为 4 的线程池, 这个线程池可以用于处理异步请求回调this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());}});// 如果使用了 Epoll, 如果是 linux 服务器并且 linux 支持 epoll, 创建 EpollEventLoopGroupif (useEpoll()) {this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});} else {// 正常来说 NIO 就是走的这里, 创建 NioEventLoopGroupthis.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));}});this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);private int threadTotal = nettyServerConfig.getServerSelectorThreads();@Overridepublic Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));}});}loadSslContext();
}
上面的构造方法首先设置服务器单向、异步发送请求的信号量, 这个是为了防止同一时间发送太多单向请求或者异步请求,设置了信号量限制之后同一时间内发送请求数就不会超过上面这个值,默认 OneWay 是 256
,异步是 64
。
接下来就是创建 ServerBootstrap
,有用过 Netty 的朋友应该都知道,这玩意就是创建出来用来绑定端口然后开始监听的,相当于 Netty 服务端的启动器了。
然后就是设置 Netty 的配置 nettyServerConfig
和设置通道事件监听器 channelEventListener = BrokerHousekeepingService
,这个监听器大家先记着,后面有用到再说。
然后就是创建了一个核心线程数是 4
的线程池 publicExecutor
,这个线程池是用来处理异步请求回调的,因为对于异步请求,发送之后需要将 ResponseFuture 丢到一个集合中,然后启动定时任务去扫描这个 table,找出过期的请求来回调,又或者正常发送回调等等,回调就是通过这个线程池来回调的,当然了 NameServer 有没有发送异步目前还不知道,先知道有这么个东西,其实一般来说生产者发送异步消息的请求回调会用的比较多。
继续回到构造器的源码,最后就是创建了主 NioEventLoopGroup 和 从 NioEventLoopGroup,分别是 1 个线程和 3 个线程。
3.4 this.remotingServer.start()
在 3.2
小结最后通过 controller.start()
启动 NettyServer 服务去监听请求到了,那么下面我们就来看下这部分的源码。
/*** 启动 NameServer* @throws Exception*/
public void start() throws Exception {// 首先启动 NettyServer 服务this.remotingServer.start();// 然后启动 TLS 文件监听服务if (this.fileWatchService != null) {this.fileWatchService.start();}
}
3.4.1 this.remotingServer.start()
还是按照惯例,下面先给出所有的源码,然后再去分析里面做了什么,因为里面涉及到大量跟 Netty 相关的参数,说实话 Netty 源码这块很久都没有看了,所以这部分还是变看边用 AI 搜的。
/*** 启动 Netty 服务端*/
@Override
public void start() {// 创建默认的事件处理组, 专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作, 不管是接收到消息还是发送消息(入站或者出站处理器)// 都可以通过这个线程池去做处理, 这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 8 个线程大小的线程池this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {// 创建出来的线程是 NettyServerCodecThread_0、NettyServerCodecThread_1 ...return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 处理共享的处理器, 也就是 @ChannelHandler.Sharable 注解的处理器prepareSharableHandlers();// 创建 ServerBootstrapServerBootstrap childHandler =// 设置连接事件处理器 eventLoopGroupBoss(单线程) 以及 I/O 事件处理器(线程数 3)this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置连接通道, 默认就是 NioServerSocketChannel.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 控制 TCP 连接请求队列的大小.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())// 连接关闭后处于 "TIME - WAIT" 状态的 Socket 能否被新进程的连接 Socket 重用(重用地址和端口), 这里就是可以被重用.option(ChannelOption.SO_REUSEADDR, true)// 用于 TCP 连接心跳检测, 检测 TCP 连接是否活跃, 这里可能是 Netty 自己已经存在更精细的 IdleStateHandler 处理器就行了.option(ChannelOption.SO_KEEPALIVE, false)// 禁用 Nagle 算法, Nagle 算法是说当应用程序发送的数据量较小时,TCP 不会立即将这些数据发送出去,而是会等待一段时间,将后续的数据合并// 成一个较大的数据包后再发送。这样可以减少网络上的数据包数量,降低网络拥塞的可能性,提高传输效率.childOption(ChannelOption.TCP_NODELAY, true)// 配置本机地址, 监听端口是 9876.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))// 设置 NioSocketChannel(和客户端连接) 的 ChannelHandler.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 往流水线里面添加一些入站出站处理器ch.pipeline()// 处理 TLS 握手的处理器.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,// 编码处理器encoder,// 解码处理器new NettyDecoder(),// 维护心跳连接的处理器, 指定的三个参数是值读空闲时间、写空闲时间、总空闲时间, 如果服务端一定时间内没有读写就会出发不同的事件,这三个参数对应的// 事件分别是: IdleStateEvent.READER_IDLE、IdleStateEvent.WRITER_IDLE、IdleStateEvent.ALL_IDLE, 总之这玩意就是用来检测有没有长时间不// 读写的, 这样可以判断一个连接是不是空闲连接new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),// 连接管理器, 主要是管理连接的活跃、非活跃、注册、取消注册 ... 事件connectionManageHandler,// 服务请求处理器, Netty 的请求会封装成 RemotingCommand, 这个处理器里面会处理请求和响应的业务, 但是不管是哪一种业务// 本质上都是通过 RemotingCommand 里面的请求 CODE 找到对应的处理器, 然后使用相应的处理器来处理这些请求, 再返回请求结果serverHandler);}});// 设置 Netty 服务端的消息接收缓冲区if (nettyServerConfig.getServerSocketSndBufSize() > 0) {log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());}// 设置 Netty 服务端的消息发送缓冲区if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());}// 写缓冲区的低水位和高水位if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));}// 缓冲区分配器, PooledByteBufAllocatorif (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 绑定端口, 开始监听ChannelFuture sync = this.serverBootstrap.bind().sync();// 获取连接通道的本地地址InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 获取端口号this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// 上层传入的 BrokerHousekeepingService, 里面就是从 eventQueue 获取不同的连接事件然后做不同的处理, eventQueue 集合里面的 NettyEvent// 就是上面注册的 connectionManageHandler 在处理连接请求的时候往里面写入的, 比如当处理请求异常的时候就会往里面写入一个 NettyEventType.EXCEPTION 事件if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 启动定时任务, 初始化之后 3s 开始执行, 之后每隔 1s 执行一次this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 扫描 responseTable, 将超时的 ResponseFuture 删掉, 然后执行回调逻辑NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);
}
首先创建默认的事件处理组, 专门用于处理一些执行前的编解码、连接空闲状态检测、网络连接管理等操作, 不管是接收到消息还是发送消息(入站或者出站处理器) 都可以通过这个线程池去做处理, 这样就能避免 I/O 线程浪费资源在这些状态检测上, 这里默认创建的是 8 个线程大小的线程池,这个线程的作用在上面 3.3.1
小节也说过了。
接下来就是处理共享的处理器 prepareSharableHandlers
,共享处理器意思是使用 @ChannelHandler.Sharable
注解的 Handler,这些 Handler 可以注册到不同的 Pipeline 共享。
/*** 创建 @ChannelHandler.Sharable 处理器, 意思是这些处理器可以设置到多个 ChannelPipeline 中*/
private void prepareSharableHandlers() {// 创建 TLS 握手处理器handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);// 创建 Netty 编码处理器encoder = new NettyEncoder();// 创建 Netty 连接管理器, 主要是管理连接的活跃、非活跃、注册、取消注册 ... 事件connectionManageHandler = new NettyConnectManageHandler();// 创建 Netty 连接处理器serverHandler = new NettyServerHandler();
}
接下来就是重点了,配置 serverBootstrap
,这段代码大家可以去熟悉下 Netty 的创建流程之后再回过来看就会好理解很多,中间 .option
就是设置一些配置,这些配置控制 linux 底层 TCP 连接的一些行为,比如 连接请求队列大小
、是否重用 "TIME - WAIT" 状态的 Socket
、TCP 连接心跳检测是否打开
、是否禁用 Nagle 算法
,最后在 childHandler
的 initChannel
方法中设置了流水线上的处理器。
- HandshakeHandler: TLS 握手处理器(入站处理器)
- NettyEncoder: 编码处理器(出站处理器)
- NettyDecoder: 解码处理器(入站处理器)
- IdleStateHandler: 空闲连接状态处理器(入站处理器、出战处理器)
- NettyConnectManageHandler: RocketMQ 自己维护的连接管理器(入站处理器、出战处理器)
- NettyServerHandler: 业务请求处理器,核心处理器,处理客户端请求(入站处理器)
上面就是这些处理器的入站和出站处理,这里的入站出站意思是接收消息、发送消息,而这些处理器是通过 .childHandler
添加进去的,所以当有客户端来请求连接,服务端接受到 Accept 请求之后处理请求建立连接 NioSocketChannel,这些处理器就是注册在这些 channel 的 pipeline 上的,有关这里面的逻辑可以去看下 Netty 的处理逻辑,这块就不细讲了。
当创建出 childHandler 之后,会设置 Netty 服务端的消息接收缓冲区大小、Netty 服务端的消息发送缓冲区大小、写缓冲区的内存低水位和高水位线、缓冲区对象池内存分配 PooledByteBufAllocator,最后绑定端口(9876)开始监听请求。
到这里 Netty 服务就启动起来了,下面启动 nettyEventExecutor 这个线程服务,在启动之前判断 channelEventListener 如果不为空才启动,这个 channelEventListener 就是传入的 BrokerHousekeepingService,肯定是不为空的。
最后再启动一个定时任务,初始化之后 3s 开始执行, 之后每隔 1s 执行一次,扫描 responseTable, 将超时的 ResponseFuture 删掉, 然后执行回调逻辑,这部分逻辑到时候讲生产者发送消息的时候再详细说,现在就先不深入了,因为现在也没有具体的场景。
3.4.2 NettyEventExecutor.start
这个方法就是启动 Netty 连接事件监听器,然后在里面处理 Netty 事件消息,不过里面涉及到的代码比较多,就放到下一篇文章讲了,这里先不讲,大家先记住。
3.5 shutdown 关闭 NameServer
当调用 NamesrvController#initialize 初始化失败或者 NameServer 关闭的时候就会调用 shutdown 方法,不过 NamesrvController#initialize
这个方法里面倒是没看到有哪个地方返回 false 的,总而言之还是来看下 shutdown 这个方法的源码。
/*** NameServer 的 shutdown 方法, 当 NameServer 关闭的时候就会调用*/
public void shutdown() {// 关闭 Netty 远程通信服务this.remotingServer.shutdown();// 关闭请求处理的线程池this.remotingExecutor.shutdown();// 关闭定时任务服务this.scheduledExecutorService.shutdown();// 关闭 TLS 文件监听服务if (this.fileWatchService != null) {this.fileWatchService.shutdown();}
}
这个方法中其实就是关闭了前面初始化时候创建出来的线程池以及 Netty 连接。
@Override
public void shutdown() {try {// 关闭定时扫描 ResponseFuture 集合的任务if (this.timer != null) {this.timer.cancel();}// 关闭 Netty 相关的连接池this.eventLoopGroupBoss.shutdownGracefully();this.eventLoopGroupSelector.shutdownGracefully();// 关闭连接事件监听器if (this.nettyEventExecutor != null) {this.nettyEventExecutor.shutdown();}// 关闭通道 pipeline 请求处理组if (this.defaultEventExecutorGroup != null) {this.defaultEventExecutorGroup.shutdownGracefully();}} catch (Exception e) {log.error("NettyRemotingServer shutdown exception, ", e);}// 关闭异步回调处理线程池if (this.publicExecutor != null) {try {this.publicExecutor.shutdown();} catch (Exception e) {log.error("NettyRemotingServer shutdown exception, ", e);}}
}
4. 小结
好了,这篇文章我们介绍了 NameServer 的启动流程,并且介绍了 NettyServer 的创建和初始化,不过上面还遗留了一个问题,就是 3.4.2 NettyEventExecutor 是如何处理 NettyEvent 的,下一篇文章再来说。
如有错误,欢迎指出!!!!