【Netty4核心原理⑧】【揭开Bootstrap的神秘面纱 - 服务端Bootstrap❶】
文章目录
- 一、前言
- 二、流程分析
- 1. 创建 EventLoopGroup
- 2. 指定 Channel 类型
- 2.1 Channel 的创建
- 2.2 Channel 的初始化
- 3. 配置自定义的业务处理器 Handler
- 3.1 ServerBootstrap#childHandler
- 3.2 handler 与 childHandler 的区别
- 4. 绑定端口服务启动
- 三、bossGroup 与 workerGroup
- 1. EventLoopGroup 的指定
- 2. ServerBootstrap#bind
- 2.1 AbstractBootstrap#initAndRegister
- 2.1.1 反射创建 Channel
- 2.1.2 ServerBootstrap#init
- 2.1.3 Channel 注册
- 2.2 AbstractBootstrap#doBind0
- 四、ServerBootstrapAcceptor
- 1. ServerBootstrapAcceptor#channelRead
- 1.1 child.pipeline().addLast(childHandler)
- 1.2 childGroup.register(child)
- 1.3 ChannelInitializer#initChannel 的触发逻辑
- 2. ServerBootstrapAcceptor#channelRead 的触发时机
- 3. 总结
- 五、 服务端 Selector 事件轮询
- 1. SingleThreadEventExecutor#inEventLoop
- 2. SingleThreadEventExecutor#addTask
- 3. SingleThreadEventExecutor#startThread
- 3.1 SingleThreadEventExecutor#doStartThread
- 4. SingleThreadEventExecutor#wakeup
- 六、总结
- 1. 服务端启动
- 2. 客户端连接
- 七、参考内容
一、前言
本系列虽说本意是作为 《Netty4 核心原理》一书的读书笔记,但在实际阅读记录过程中加入了大量个人阅读的理解和内容,因此对书中内容存在大量删改。
本篇涉及内容 :第七章 揭开Bootstrap的神秘面纱
本系列内容基于 Netty 4.1.73.Final 版本,如下:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>
系列文章目录:
【Netty4核心原理】【全系列文章目录】
在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 内容中我们对 客户端Bootstrap 进行了分析,本篇来对 服务端 Bootstrap 进行分析。
(核心流程的分析比较琐碎和混乱,我尽力了,改了删,删了改一周,感觉也没办法讲清楚 )
二、流程分析
我们以下面的例子来分析:
public static class ChatServer {public void start(int port) throws InterruptedException {// 1. 创建 boss 和 worker 线程NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup)// 2. 指定 NioServerSocketChannel 类型.channel(NioServerSocketChannel.class)// 3. 配置自定义的业务处理器 Handler.childHandler(new ChannelInitializer<SocketChannel>() {// 客户端初始化@Overrideprotected void initChannel(SocketChannel client) throws Exception {...}})// 针对主线程配置 : 分配线程数量最大 128.option(ChannelOption.SO_BACKLOG, 128)// 针对子线程配置 保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true);// 4. 绑定端口服务启动ChannelFuture channelFuture = bootstrap.bind(port).sync();System.out.println("服务启动成功, 端口 : " + port);// 阻塞主线程,防止直接执行 finally 中语句导致服务关闭,当有关闭事件到来时才会放行channelFuture.channel().closeFuture().sync();} finally {// 关闭线程池bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
从上面的可以的代码可以看出,服务端基本写法与客户端基本相同,基本上也是进行如下几个部分的初始化。
- 创建 EventLoopGroup :无论是服务端还是客户端,都必须指定 EventLoopGroup。在上面的代码中,指定了 NioEventLoopGroup ,表示一个 NIO 的 EventLoopGroup,不过服务端需要指定两个EventLoopGroup,一个是 bossGroup,用于处理客户端的连接请求,另一个是 workerGroup ,用于处理与各个客户端连接的 IO 操作。
- 指定 Channel 类型。服务端使用 NioServerSocketChannel(客户端使用的是 NioSocketChannel 类型)。在后面会通过反射来创建对应类型的 Channel。
- 配置自定义的业务处理器 Handler。
- 绑定端口服务启动。
我们按照上面的注释的部分来对每一步具体分析:
1. 创建 EventLoopGroup
在服务端初始化时,我们指定了两个 NioEventLoopGroup 对象,一个是 bossGroup ,另一个是 workerGroup。bossGroup 只用于服务端的 accept 事件,也就是用于处理客户端新连接接入的请求,而 workerGroup 负责客户端连接通道的 IO操作。(具体过程在下面【bossGroup 与 workerGroup】部分进行详细分析。)
2. 指定 Channel 类型
2.1 Channel 的创建
Channel 是对 Java 底层 Socket 连接的抽象,在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 的 【NioSocketChannel 的创建】部分我们分析过客户端的 NioSocketChannel 的创建过程,这里 NioServerSocketChannel 的创建也基本与之相同:简单来说就是通过 ReflectiveChannelFactory#newChannel 反射创建 NioServerSocketChannel 实例,这里不再过多赘述。
2.2 Channel 的初始化
当我们通过反射创建 NioServerSocketChannel 实例时,在 NioServerSocketChannel 的构造函数中还存在一定的初始化逻辑,因此下面我们来看下 NioServerSocketChannel 的初始化过程,NioServerSocketChannel 的继承层次结构如下:
NioServerSocketChannel 存在几个重载的构造函数,不过最终都会调到如下方法:
public NioServerSocketChannel(ServerSocketChannel channel) {// 调用父类构造函数 AbstractChannel#AbstractChannelsuper(null, channel, SelectionKey.OP_ACCEPT);config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
在这个方法中,调用父类构造函数时的入参是 SelectionKey.OP_ACCEPT,而客户端传入的参数则是 SelectionKey.OP_READ。因为服务端在启动后需要监听客户端的连接请求,因此这里监听的时间类型是 SelectionKey.OP_ACCEPT,也就是告知 Selector 当前服务对客户端连接的事件感兴趣。
上面调用的父类构造函数 AbstractChannel#AbstractChannel,其代码如下:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {// 调用父类构造函数 AbstractChannel#AbstractChannelsuper(parent);this.ch = ch;// 保存兴趣事件this.readInterestOp = readInterestOp;try {// 设置为非阻塞(NIO)模式ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {...}throw new ChannelException("Failed to enter non-blocking mode.", e);}}
这里又调用了父类构造函数 AbstractChannel#AbstractChannel, 代码如下:
protected AbstractChannel(Channel parent) {this.parent = parent;id = newId();// 1. Unsafe 属性的初始化:服务端这里的类型是 NioMessageUnsafe,客户端的类型是 NioByteUnsafeunsafe = newUnsafe();// 2. ChannelPipeline 的初始化pipeline = newChannelPipeline();}
上面的方法中主要有两个方面:
unsafe = newUnsafe();
:Unsafe 属性的初始化pipeline = newChannelPipeline();
:ChannelPipeline 的初始化
这里的逻辑基本与客户端的初始化相同,具体逻辑不再赘述(如有需要可详参 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】的 【AbstractNioByteChannel#AbstractNioByteChannel】部分 )。
这里我们提一下 unsafe
的类型:newUnsafe()
是个抽象方法,而客户端这里的 Channel 类型是 NioSocketChannel,而 服务端这边的类型是 NioServerSocketChannel。这两个类的 newUnsafe
方法并不相同,如下:
-
NioSocketChannel#newUnsafe 方法如下:
@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();}
-
NioServerSocketChannel#newUnsafe 方法如下:
@Overrideprotected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();}
因此,这里可以知道,客户端创建的 Unsafe 类型是 NioSocketChannelUnsafe,而服务端创建的则是 NioMessageUnsafe 类型。
客户端只有一种情况:
- 客户端启动时会与服务端创建连接,过程中会创建 NioSocketChannel 类型的 Channel,此时创建的 Unsafe 类型为 NioSocketChannelUnsafe。此时的兴趣事件是 OP_READ
服务端有两种情况:
- 服务端自身启动时,绑定端口时会创建 NioServerSocketChannel 类型,此时会创建 NioMessageUnsafe 类型。此时的兴趣事件是 OP_ACCEPT
- 当服务端启动后,客户端连接服务端时,服务端会为每个客户端创建 Channel,此时连接的客户端的 Channel 类型是 NioSocketChannel,因此此时会为每个客户端 Channel 创建的 Unsafe 类型为 NioSocketChannelUnsafe。(在下面【ServerBootstrapAcceptor#channelRead 的触发时机】部分会说明这个场景)此时的兴趣事件是 OP_READ
综上:NioMessageUnsafe 用来处理 OP_ACCEPT 事件,NioSocketChannelUnsafe 用来处理 OP_READ 事件
3. 配置自定义的业务处理器 Handler
3.1 ServerBootstrap#childHandler
在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 我们有分析过 【Handler 添加过程】,在客户端中是调用 AbstractBootstrap#handler 方法添加 Handler, 而在服务端是调用 ServerBootstrap#childHandler 添加,除此之外基本逻辑相同,因此也不在赘述。
3.2 handler 与 childHandler 的区别
在客户端时我们调用的是 Bootstrap#handler 来添加处理器(这里 Bootstrap#handler 实际上是 AbstractBootstrap#handler 方法) ,而服务端这里则是调用的 ServerBootstrap#childHandler,下面我们来介绍下这两个方法:
-
方法所属类及用途概述
- AbstractBootstrap#handler(ChannelHandler):AbstractBootstrap 是 Bootstrap(客户端引导类)和 ServerBootstrap(服务器引导类)的父类。handler 方法可用于客户端和服务器,它设置的处理器会应用于 Bootstrap 或 ServerBootstrap 自身的 Channel。
- ServerBootstrap#childHandler(ChannelHandler):ServerBootstrap 专门用于创建和配置 Netty 服务器。childHandler 方法设置的处理器会应用于服务器接受的新连接所创建的子 Channel。
-
作用对象
- AbstractBootstrap#handler(ChannelHandler):对于客户端而言,该处理器应用于客户端连接服务器的 Channel,也就是客户端用于与服务器通信的通道。
对于服务器端,该处理器应用于负责接受新连接的 ServerSocketChannel。这个 Channel 的主要任务是监听端口并接受新的客户端连接。 - ServerBootstrap#childHandler(ChannelHandler):仅适用于服务器端,该处理器应用于服务器接受新连接后创建的子 Channel(即 SocketChannel)。这些子 Channel 用于与各个客户端进行实际的数据通信。
- AbstractBootstrap#handler(ChannelHandler):对于客户端而言,该处理器应用于客户端连接服务器的 Channel,也就是客户端用于与服务器通信的通道。
-
使用场景
- AbstractBootstrap#handler(ChannelHandler):客户端:可以用于设置一些与客户端连接建立过程相关的处理器,例如在连接建立前进行一些初始化操作,或者对连接状态进行监控。
服务器端:可以用于处理服务器的启动和关闭事件,或者对新连接的接受过程进行一些额外的处理。 - ServerBootstrap#childHandler(ChannelHandler):主要用于处理服务器与客户端之间的实际数据交互。可以添加编解码器、业务逻辑处理器等,对客户端发送的数据进行解码、处理,并将处理结果编码后返回给客户端。
- AbstractBootstrap#handler(ChannelHandler):客户端:可以用于设置一些与客户端连接建立过程相关的处理器,例如在连接建立前进行一些初始化操作,或者对连接状态进行监控。
4. 绑定端口服务启动
在 ServerBootstrap 做好准备工作后会调用 ServerBootstrap#bind 来绑定服务端口,在本文下面【ServerBootstrap#bind】中我们进行了详细介绍。
三、bossGroup 与 workerGroup
在服务端初始化时,我们指定了两个 NioEventLoopGroup 对象,一个是 bossGroup ,另一个是 workerGroup。bossGroup 只用于服务端的 accept 事件,也就是用于处理客户端新连接接入的请求(在 Netty 的单端口模式下,bossGroup 线程池只会使用其中一个线程处理连接请求,其他线程处于空闲状态),而 workerGroup 负责客户端连接通道的 IO操作。如下图:
这里思想是 多 Reactor 多线程模型的思想,在 【Netty4核心原理⑩】【大名鼎鼎的 EventLoop】 的【Reactor 线程模型】中详细介绍。
简单来说:服务端的 bossGroup 会不断监听是否有客户端连接,当发现有一个新的客户端连接到来时, bossGroup 就会为此连接初始化各项资源;然后从 workerGroup 中选出一个 EventLoop 绑定到此客户端连接中;接下来服务端与客户端的交互过程将全部在此分配的 EventLoop 中完成。
根据上面的内容,我们可以看到与 bossGroup 和 workerGroup 相关的有两部分:
- EventLoopGroup 的指定 :通过
bootstrap.group(bossGroup, workerGroup)
将 bossGroup 和 workerGroup 绑定到 ServerBootstrap 中 - ServerBootstrap#bind :通过
bootstrap.bind(port).sync()
启动服务,当接收到客户端消息时,会根据规则分配给 bossGroup 或 workerGroup 来处理。
下面我们来详细看看这两部分的内容。
1. EventLoopGroup 的指定
在 ServerBootstrap 初始化时会调用 bootstrap.group(bossGroup, workerGroup)
方法设置了 workerGroup 和 bossGroup 两个 EventLoopGroup,因此这里我们来看下ServerBootstrap#group的实现,代码如下:
// io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {// 调用父类 AbstractBootstrap#group(io.netty.channel.EventLoopGroup) 构造函数,执行 this.group = group 的操作super.group(parentGroup);if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}// 保存到 ServerBootstrap 的 childGroup 属性中this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");return this;}
AbstractBootstrap#group(io.netty.channel.EventLoopGroup) 代码如下:
public B group(EventLoopGroup group) {ObjectUtil.checkNotNull(group, "group");if (this.group != null) {throw new IllegalStateException("group set already");}// 保存到 ServerBootstrap 的 group 属性中this.group = group;return self();}
这里可以看到 ServerBootstrap#group 就是将 bossGroup 和 workerGroup 保存到 ServerBootstrap 的 group 和 childGroup 属性中。
需要注意的是:在 Netty 的单端口模式下,bossGroup 线程池只会使用其中一个线程处理连接请求,其他线程处于空闲状态。这个在我们下面源码分析过程中也可以看到。
- 每个监听的端口对应一个 NioServerSocketChannel,该 Channel 在初始化时从 bossGroup 中通过轮询算法(next() 方法)选择一个 EventLoop(线程)绑定,负责监听该端口的 TCP 连接事件。即使 bossGroup 配置了多个线程,单端口场景下仅需一个线程处理 accept 事件,其余线程不会参与工作
- 官方建议在单端口场景中将 bossGroup 的线程数设为 1(如 new NioEventLoopGroup(1)),避免资源浪费。多线程配置的 bossGroup 仅用于容错(如线程异常终止后重启)或多端口监听场景,单端口模式下无实际作用
2. ServerBootstrap#bind
在上面我们提到服务启动时会调用 bootstrap.bind(port)
方法,因此这里我们来看下ServerBootstrap#bind 方法(这里调用的是父类 AbstractBootstrap#bind 方法)。而 AbstractBootstrap#bind 方法直接调用了AbstractBootstrap#doBind 方法,因此我们这里来看 AbstractBootstrap#doBind 的实现:
// doBind 方法主要负责执行 Channel 的绑定操作,即将 Channel 绑定到指定的本地地址(如本地的 IP 地址和端口号)上。// 它在整个过程中需要先处理 Channel 的初始化与注册相关事宜,然后根据注册结果来决定如何进行后续的绑定操作,同时要妥善处理各种成功、失败以及异步等待的情况。private ChannelFuture doBind(final SocketAddress localAddress) {// 1.初始化并注册 Channelfinal ChannelFuture regFuture = initAndRegister();// 获取创建的 Channel, 并判断创建过程中是否出现异常final Channel channel = regFuture.channel();if (regFuture.cause() != null) {return regFuture;}// 当 regFuture.isDone() 返回 true,表明 Channel 的初始化和注册操作已经完成。// 此时创建一个新的 ChannelPromise(用于表示绑定操作的结果),然后调用 doBind0 方法来执行实际的绑定操作,传递注册相关的 ChannelFuture(regFuture)、Channel 实例、要绑定的本地地址以及刚创建的 ChannelPromise。最后返回这个 ChannelPromise,以便调用者后续可以通过它来跟踪绑定操作的完成情况及获取结果。if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise = channel.newPromise();// 2. 如果注册完成则调用 doBind0 方法doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case it's not.// 当 regFuture.isDone() 返回 false,也就是注册操作尚未完成时,创建一个 PendingRegistrationPromise 实例(它也是 ChannelPromise 的一种实现,用于处理这种延迟完成的情况)。final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause);} else {promise.registered();// 2. 如果注册完成则调用 doBind0 方法doBind0(regFuture, channel, localAddress, promise);}}});return promise;}}
上面注释写的比较清楚,核心就注释中标注的两个方法,我们按照注释顺序来看下面两个方法:
- AbstractBootstrap#initAndRegister :该方法内部会完成 Channel 的初始化以及将其注册到对应的 EventLoop 上,返回的 ChannelFuture(regFuture)用于表示这个初始化和注册操作的结果。通过 regFuture.channel可以获取到与之关联的 Channel 实例。
- AbstractBootstrap#doBind0 :该方法主要用于服务端启动时将 Channel 绑定到指定本地地址的具体操作。
关于 ChannelFuture 和 ChannelPromise 的介绍详参 【Netty4核心原理⑫】【异步处理双子星 Future 与 Promise】
下面我们来详细看看这两个方法:
2.1 AbstractBootstrap#initAndRegister
AbstractBootstrap#initAndRegister 方法的内容我们在 【Netty4核心原理⑦】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❷】 一文中有过详细分析,当时分析的是客户端(Bootstrap)初始化的过程,而我们这里是服务端(ServerBootstrap)的初始化过程。不过逻辑基本类似,这里简单来看下:
final ChannelFuture initAndRegister() {Channel channel = null;try {// 1. 反射创建 Channel ,这里是 NioServerSocketChannel 类型channel = channelFactory.newChannel();// 2. ServerBootstrap 重写了该方法,这里调用的是 ServerBootstrap#init // 对新创建的 Channel 进行初始化。init 方法通常用于配置 Channel 的一些属性,以及向 ChannelPipeline 中添加 ChannelHandlerinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}...// 3. 将 Channel 注册到 Selector 上,这里 config().group() 返回的是 EventLoop 是 bossGroup ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}...return regFuture;}
2.1.1 反射创建 Channel
channelFactory.newChannel()
通过反射创建 Channel,这里已经在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 中做了详细介绍,这里不再赘述。
这里与客户端不同的点在于:服务端这里创建的类型是 NioServerSocketChannel,而客户端的类型是 NioSocketChannel。
2.1.2 ServerBootstrap#init
这里的方法被 ServerBootstrap 重写了,因此这里调用的是 ServerBootstrap#init。该方法主要负责对 Channel 进行初始化配置。这个初始化过程涵盖了设置 Channel 的选项、属性,以及向 Channel 的 ChannelPipeline 中添加特定的 ChannelHandler,特别是添加了一个 ChannelInitializer,它会在后续进一步完善 ChannelPipeline 的配置,为服务器端接收客户端连接并处理相关事务做准备。
具体实现如下:
// channel 是 服务端的 channel void init(Channel channel) {// 为给定的 Channel 设置一系列的选项参数,这些选项可能涉及网络连接的各种配置,比如缓冲区大小、超时时间等,同时会通过传入的 logger 记录相关的配置信息setChannelOptions(channel, newOptionsArray(), logger);// 给 Channel 设置一些自定义的属性,这些属性可以在后续的代码中用于识别、区分或者传递与 Channel 相关的特定信息。setAttributes(channel, newAttributesArray());// 获取 Channel 对应的 ChannelPipeline,后续将向这个管道中添加各种 ChannelHandler,用于处理不同阶段和类型的事件。ChannelPipeline p = channel.pipeline();// 准备相关配置参数final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);// 添加 ChannelInitializer 到 ChannelPipelinep.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {// 首先获取当前 Channel(ch)的 ChannelPipelinefinal ChannelPipeline pipeline = ch.pipeline();// 尝试从配置(config)中获取一个 ChannelHandlerChannelHandler handler = config.handler();if (handler != null) {// 如果不为 null,则将其添加到 pipeline 中,这个 ChannelHandler 可能是用于处理服务器端一些通用的业务逻辑或者预处理操作等。pipeline.addLast(handler);}// 通过 ch.eventLoop().execute 将添加 ServerBootstrapAcceptor 的操作提交到 Channel 的事件循环(EventLoop)中执行。// 这样做确保了添加操作在合适的线程上下文中进行,避免了多线程并发问题。ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// ServerBootstrapAcceptor 是一个关键的 ChannelHandler,它主要用于在服务器端接受客户端连接后,对新建立的客户端连接对应的 Channel(子 Channel)进行进一步的配置和处理,// 比如将其关联到指定的 EventLoopGroup,并添加相应的 ChannelHandler 等,使用之前准备好的 currentChildGroup、currentChildHandler、currentChildOptions 和 currentChildAttrs 等参数来完成这些配置pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});}
上面的注释写的比较清楚,具体不在赘述。需要注意上面代码中有下面一句:
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
这里的代码是将 ServerBootstrapAcceptor 添加到服务端 Channel 的 ChannelPipeline 中。关于 ServerBootstrapAcceptor 的内容我们下面 【ServerBootstrapAcceptor】部分会详细说明。
2.1.3 Channel 注册
上述代码中通过 config().group().register(channel);
完成了 Channel 的注册,在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 有过对客户端的 Channel 注册分析,服务端与客户端在代码实现的关键逻辑上都是一致的,因此这里不再赘述。
客户端与服务端的不同之处:
config()
:客户端的config()
的实现在 Bootstrap#config,返回的类型是 BootstrapConfig,而服务端的是在 ServerBootstrap#config,返回的类型是 ServerBootstrapConfigconfig().group()
: BootstrapConfig#group() 和 ServerBootstrap#group() 都会调用 io.netty.bootstrap.AbstractBootstrap#group() 方法,该方法都会返回 AbstractBootstrap#group属性,因此从后续代码执行的逻辑上来说,服务端和客户端config().group().register(channel);
都会调用 EventLoopGroup#register(io.netty.channel.ChannelPromise) 方法(实现类是 MultithreadEventLoopGroup#register(Channel)),因此二者代码执行上并无区别。
2.2 AbstractBootstrap#doBind0
AbstractBootstrap#doBind0 方法主要用于执行实际的 Channel 绑定操作,即将 Channel 绑定到指定的本地地址(例如本地 IP 地址和端口号)上。若注册成功则进行绑定,若注册失败则相应地设置绑定操作的 ChannelPromise 为失败状态,以此来处理 Channel 绑定这一关键步骤,并通过合适的异步机制确保操作在 Channel 的事件循环线程中执行。
AbstractBootstrap#doBind0 具体实现如下:
// 入参// regFuture : 保存了 Channel 注册结果// channel : 注册的 服务端channel// localAddress : channel 将要绑定的本机地址// promise :保存 channel 与 localAddress 的绑定结果private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 下面的注释指出该方法在 channelRegistered() 被触发之前调用,目的是给用户自定义的处理器(user handlers)提供机会,使其能够在 channelRegistered() 方法实现中对 ChannelPipeline 进行相关设置。// 这体现了方法调用在 Channel 生命周期中的相对位置以及与其他相关操作的关联性,确保整个流程的连贯性和灵活性,方便开发者基于 Channel 生命周期的不同阶段进行定制化处理。// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.// 将操作放在 EventLoop 中执行,能够保证操作在 Channel 对应的正确线程上下文中进行,避免多线程并发带来的一些诸如数据不一致、资源竞争等问题,遵循了 Netty 的事件驱动和线程模型设计原则。channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 1. 如果Channel 注册操作成功,则尝试进行绑定操作,将 Channel 绑定到指定的本地地址 localAddress,并传入 ChannelPromise(promise)用于跟踪绑定操作的结果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {// Channel 注册失败,直接将失败结果保存等待返回promise.setFailure(regFuture.cause());}}});}
上面代码可以看到,如果Channel 注册操作成功,则会调用 channel.bind(localAddress, promise)
方法进行绑定操作(而这个执行线程是我们上面提到的 Channel 绑定的 NioEventLoop 对应的本地线程,下面 【服务端 Selector 事件轮询】部分详细说明),而这里 channel.bind(localAddress, promise)
调用的是 AbstractChannel#bindSocketAddress, ChannelPromise) 方法,内部会调用 DefaultChannelPipeline#bind(SocketAddress, ChannelPromise) 方法,如下:
@Overridepublic final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {// 调用 AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)return tail.bind(localAddress, promise);}
这里 tail 是我们之前提到过的 ChannelPipeline内部双向链表的尾节点 TailContext 实例:
- 在 DefaultChannelPipeline 中维护了一个以 AbstractChannelHandlerContext 为节点元素的双向链表,而 head 和 tail 分别指向双向链表的头尾节点。
- 这里 TailContext#bind 是出站方法,会从双向链表的尾部Tail 节点往 头部Head 节点传播。
tail.bind(localAddress, promise)
实际调用的是 AbstractChannelHandlerContext#bind(SocketAddress, ChannelPromise) ,其实现如下:
// 将 Channel 绑定到指定的本地地址(localAddress)@Overridepublic ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {ObjectUtil.checkNotNull(localAddress, "localAddress");// 校验是否合法if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}// 1. 查找用于处理 bind 操作的 AbstractChannelHandlerContext// MASK_BIND 是一个掩码,用于确定查找的是与连接操作相关的出站处理器。这个处理器将负责实际的连接逻辑。final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();// 如果当前线程是 EventExecutor 的事件循环线程(executor.inEventLoop()),则直接调用 next.invokeBind(localAddress, promise) 方法if (executor.inEventLoop()) {// 2. 调用绑定操作next.invokeBind(localAddress, promise);} else {// 如果当前线程不是 EventExecutor 的事件循环线程,通过 safeExecute 方法将连接操作提交到 EventExecutor 中执行。// safeExecute 方法会确保任务在 EventExecutor 中安全执行,并在执行过程中处理可能出现的异常。safeExecute(executor, new Runnable() {@Overridepublic void run() {next.invokeBind(localAddress, promise);}}, promise, null, false);}return promise;}
关于 Pipeline 的传播过程,在 【Netty4核心原理⑪】【Netty 大动脉 Pipeline】 中我们有详细介绍,简单来说就是从 双向链表的头或尾节点开始遍历,找到能处理当前事件的 ChannelHandler 进行处理,篇幅所限这里不做过多介绍。这里我们简单来看两点:
-
findContextOutbound(MASK_BIND);
:该方法调用是 AbstractChannelHandlerContext#findContextOutbound ,其作用简单来说就是从 DefaultChannelPipeline 内部的双向链表的 Tail 开始,不断向前找到第一个出站且支持指定操作的 ChannelHandlerContext AbstractChannelHandlerContext,然后调用它的 invokeBind 方法。例如,当执行 connect 操作时,会传入与 connect 操作对应的掩码 MASK_CONNECT。该方法会在 ChannelPipeline 中从当前位置开始向后查找,找到第一个出站且支持 connect 操作的 ChannelHandlerContext。这个找到的 ChannelHandlerContext 对应的 ChannelHandler 将负责处理 connect 操作的具体逻辑。
通过这种方式,Netty 能够在 ChannelPipeline 中灵活地定位到合适的 ChannelHandler 来处理各种出站操作,实现了强大的可扩展性和定制性。而从 双向链表的Tail 节点开始查找到第一个支持 bind 出站事件的 ChannelHandler 就是 Head 节点,因此这里调用的就是 HeadContext#invokeBind
-
next.invokeBind(localAddress, promise)
:该方法的实现是 AbstractChannelHandlerContext#invokeBind,其实现如下:private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {// 确定是否需要调用当前 Handler 来处理 channelRegistered 事件。// 这个方法会检查 Handler 的状态 以决定是否执行该 Handler 的逻辑if (invokeHandler()) {try {// 将 handler 转换为 ChannelOutboundHandler 并调用其 bind 方法进行绑定操作// 这里 handler() 返回的是 HeadContext, 即双向链表的 头节点((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {// 如果发生异常,通知异常情况notifyOutboundHandlerException(t, promise);}} else {// 如果 invokeHandler() 返回 false,表示不需要当前 Handler 处理 bind事件。// 此时,调用 bind(localAddress, promise) 方法将 bind事件传递给 ChannelPipeline 中的下一个 Handler 继续处理。bind(localAddress, promise);}}
上面可以看到核心在
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
中,而 HeadContext#handler 返回的是自身,也就是 HeadContext,因此这里调用的就是 HeadContext#bind 方法,如下:
@Overridepublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {// 交由 AbstractChannel.AbstractUnsafe#bindunsafe.bind(localAddress, promise);}
AbstractUnsafe#bind 会调用 AbstractUnsafe#doBind 来完成绑定,如下:
protected void doBind(SocketAddress localAddress) throws Exception {// 判断 Java 版本,调用不同的 APIif (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}}
至此完成了服务端口的绑定,也即代表着 服务端可以正式提供服务了。
四、ServerBootstrapAcceptor
在上面【ServerBootstrap#init】部分我们提到了 ServerBootstrap#init 中会创建一个 ServerBootstrapAcceptor 添加到服务端 Channel 的 pipeline中, 如下:
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
ServerBootstrapAcceptor 是一个关键的 ChannelHandler,它主要用于在服务器端接受客户端连接后,对新建立的客户端连接对应的 Channel(子 Channel)进行进一步的配置和处理。也就是说,当有新的客户端 Channel 连接到服务器后,ServerBootstrapAcceptor 会对其做一定的处理(通过 ServerBootstrapAcceptor#channelRead 方法)。
ServerBootstrapAcceptor 继承结构如下:
这里我们可以看到 ServerBootstrapAcceptor 继承了 ChannelInboundHandlerAdapter。实际上 ServerBootstrapAcceptor 还重写了 ChannelInboundHandlerAdapter 的 channelRead 方法。
ServerBootstrapAcceptor#channelRead 是当 Channel 从网络读取到数据时触发,数据会被传递给 ChannelPipeline 中的入站处理器进行处理,简单来说就是当客户端发起连接时会触发该方法。
ChannelInboundHandlerAdapter 是 ChannelInboundHandler 接口的实现类,在 【Netty4核心原理⑦】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❷】== 的 【入站 和 出站】部分我们详细介绍了 ChannelInboundHandler 各个方法的调用时机和作用。
下面我们详细来看这个 ServerBootstrapAcceptor#channelRead 方法。
1. ServerBootstrapAcceptor#channelRead
ServerBootstrapAcceptor#channelRead 方法在新的客户端连接被服务器接受时会被调用:当服务器监听到新的连接请求时,会接受这个连接,并创建一个新的 Channel 来表示这个连接。这个新的 Channel 会被封装成一个消息对象,通过 ChannelPipeline 进行传递,当这个新连接的消息对象传递到 ServerBootstrapAcceptor 时,channelRead 方法会被调用。
ServerBootstrapAcceptor#channelRead 实现如下:
/*** 重写的 channelRead 方法,当有新的通道连接事件触发时会被调用。* 该方法主要用于处理新连接的通道,为其添加处理器、设置通道选项和属性,并将其注册到子事件循环组中。** @param ctx 通道处理器上下文对象,提供了与通道和处理器相关的操作方法* @param msg 接收到的消息对象,这里表示新连接的通道*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 将接收到的消息对象强制转换为 Channel 类型,代表新的客户端的连接通道final Channel child = (Channel) msg;// 1.为新连接的通道的通道处理器链(pipeline)添加 处理器childHandler// childHandler 是通过 ServerBootstrap#childHandler 方法指定的处理器,因此这里是将我们自定义的 ChannelInitializer 添加到 客户端的 Channel 上child.pipeline().addLast(childHandler);// 为新连接的通道设置通道选项(如 TCP_NODELAY 等)// childOptions 是一个包含通道选项的集合// logger 用于记录操作过程中的日志信息setChannelOptions(child, childOptions, logger);// 为新连接的通道设置属性// childAttrs 是一个包含通道属性的集合setAttributes(child, childAttrs);try {// 2. 将新连接的通道注册到子事件循环组(childGroup)中// 注册操作是异步的,返回一个 ChannelFuture 对象表示操作的未来结果// childGroup 是 构造时传入的 currentChildGroup,也就是 workerGroup 对象// childGroup.register(child) 将 workerGroup 中的某个 EventLoop 与 NioSocketChannel 进行关联childGroup.register(child).addListener(new ChannelFutureListener() {/*** 当通道注册操作完成时,该方法会被调用。** @param future 表示通道注册操作的未来结果* @throws Exception 可能抛出的异常*/@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 检查通道注册操作是否成功if (!future.isSuccess()) {// 如果注册操作失败,调用 forceClose 方法强制关闭该通道// 并传入失败的原因(通过 future.cause() 获取)forceClose(child, future.cause());}}});} catch (Throwable t) {// 如果在注册通道的过程中发生异常,调用 forceClose 方法强制关闭该通道// 并传入捕获到的异常对象forceClose(child, t);}
}
上面代码注释比较清楚,我们按照注释来看其中两点:
1.1 child.pipeline().addLast(childHandler)
child.pipeline().addLast(childHandler)
: 将我们创建的 匿名类 ChannelInitializer ( childHandler
)添加到客户端的 Channel 的 ChannelPipeline 中。
其中 child
就是 新连接的客户端的连接通道 Channel, childHandler
则是我们通过 ServerBootstrap#childHandler 方法指定的处理器,也就是我们添加的一开始的匿名类 ChannelInitializer 。如下图:
也就是说,这一步是会为新连接客户端的 Channel 的 Pipeline 上添加了我们自定义的 childHandler,也就是说此时这个客户端的 Channel 的 ChannelPipeline 结构如下:
1.2 childGroup.register(child)
childGroup.register(child)
:将 workerGroup 中的某个 EventLoop 与 NioSocketChannel 进行关联。当这个 NioSocketChannel 有事件发生时,可以直接交由当前绑定的 EventLoop 来处理(EventLoop 可以简单认为是一个本地线程的封装。其中 childGroup
是调用 ServerBootstrap#group 方法时传入的 currentChildGroup
,也就是示例中的 workerGroup
对象,实际类型是 NioEventLoopGroup;而 child
是 NioSocketChannel 的实例。
所以这里调用的是 NioEventLoopGroup#register,根据调用链路会调用到 SingleThreadEventLoop#register 方法,该方法在 【Netty4核心原理⑦】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❷】 一文的【SingleThreadEventLoop#register 】部分有过详细分析,这里不再赘述。)
在 Netty 中,NioEventLoop 和 NioSocketChannel 是一对多的绑定关系:
- NioEventLoop 职责:NioEventLoop 本质上是一个事件循环,其主要工作是负责监听多个 Channel 上的 I/O 事件,像连接、读写等操作。它借助 Java NIO 的 Selector 来实现多路复用,能够同时处理多个 Channel 的事件。
- NioSocketChannel 职责:NioSocketChannel 代表的是一个基于 Java NIO 的 TCP 套接字通道,用于网络数据的读写。
1.3 ChannelInitializer#initChannel 的触发逻辑
结合上面,我们可以得知:
-
当有新客户端连接时,会触发 ServerBootstrapAcceptor#channelRead 方法,该方法中会执行
child.pipeline().addLast(childHandler)
逻辑,将创建的 匿名类 ChannelInitializer (childHandler
) 添加到客户端的 Channel 的 ChannelPipeline 中。此时 ChannelPipeline 双向链表的结构如下图所示:
-
随后会调用
childGroup.register(child)
中会调用到 AbstractChannel.AbstractUnsafe#register0 方法,在该方法中会调用 DefaultChannelPipeline#invokeHandlerAddedIfNeeded 方法,而该方法会触发 客户端 Channel 中的 ChannelPipeline 每个 ChannelHandler 节点的 handlerAdded 方法。 -
因此这里触发 匿名类 ChannelInitializer (
childHandler
)的 handlerAdded方法,而 ChannelInitializer#handlerAdded 中会调用 ChannelInitializer#initChannel 方法,也就是我们自己自定义的逻辑,从而将自定义的 ChannelHandler 添加到 客户端的 ChannelPipeline 中,并且会将自身(ChannelInitializer)从 Pipeline 中移除,此时 ChannelPipeline 双向链表的结构如下图所示:(详细分析在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 的 【Handler 添加过程】部分有过介绍)
2. ServerBootstrapAcceptor#channelRead 的触发时机
上面我们介绍了 ServerBootstrapAcceptor#channelRead 方法,知道了当有新的客户端连接时会触发该方法,下面我们来详细看看 ServerBootstrapAcceptor#channelRead 的触发时机,如下:
-
当有新的客户端连接请求到达服务器时,Selector 会检测到 OP_ACCEPT(连接接受)事件接着就会调用 NioServerSocketChannel#doReadMessages 方法,该方法的作用是 :尝试从底层的 Java NIO 通道中读取新连接的客户端信息,并将其封装为 Netty 中的 NioSocketChannel 对象,添加到传入的 List 中,以此来表示接收到了新的客户端连接。具体实现如下:
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {// 获取客户端新连接的 SocketChannel 对象SocketChannel ch = SocketUtils.accept(javaChannel());try {if (ch != null) {// this 即 NioServerSocketChannel 对象,ch 是与客户端新创建的连接通道// 这里会创建一个新的 NioSocketChannel 对象添加到 buf 中buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {...}return 0;}
-
随后会利用 Netty 的 ChannelPipeline 机制,将读取事件逐级发送到各个 Handler 中,于是就会触发ServerBootstrapAcceptor#channelRead 方法。
ChannelPipeline 是一个由多个 ChannelHandler 组成的管道。当 NioEventLoop 触发OP_ACCEPT事件处理时,会从 ChannelPipeline 的头部开始,依次调用每个 ChannelHandler 的channelRead方法(如果 ChannelHandler 实现了 channelRead 接口),在 【Netty4核心原理⑪】【Netty 大动脉 Pipeline】 一文中详细介绍。
-
ServerBootstrapAcceptor 作为 ChannelPipeline中的一个 ChannelHandler,在这个事件传播过程中,当轮到它处理时,其 channelRead方法就会被触发,完成 NioEventLoop 和 NioSocketChannel 的绑定。
3. 总结
这里我们先简单总结下 ServerBootstrapAcceptor 的作用:
-
在服务端启动时会创建一个 Channel 代表本机与监听端口的通道, 在 ServerBootstrap#init 中会为这个 Channel 添加一个 ServerBootstrapAcceptor 的 ChannelHandler,代表当这个 Channel 有事件发生时会触发 ServerBootstrapAcceptor 的对应方法。
-
而作为服务端关系的事件是 ON_ACCEPT ,即客户端连接事件。所以当有客户端与服务端建立连接时,服务端会为每个客户端连接创建一个 客户端 Channel,随后会触发 ServerBootstrapAcceptor#channelRead。
调用链路简化后如下:
- AbstractNioMessageChannel.NioMessageUnsafe#read -> ChannelPipeline#fireChannelRead -> AbstractChannelHandlerContext#invokeChannelRead -> AbstractChannelHandlerContext#invokeChannelRead -> ChannelInboundHandler#channelRead -> ChannelHandlerContext#fireChannelRead
-
ServerBootstrapAcceptor#channelRead 中会在 客户端 Channel 的 Pipeline 上注册
childHandler
,并且会从childGroup
中选择一个 NioEventLoop 来负责这个 客户端 Channel 的相关事件的处理,这样就完成 NioEventLoop 和 NioSocketChannel 的绑定。
五、 服务端 Selector 事件轮询
服务端在启动时会调用 ServerBootstrap#bind 来完成 端口监听与事件处理,上面【AbstractBootstrap#doBind0 】部分我们看到了如下代码:
channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 如果服务端注册操作成功,则尝试进行绑定操作,将 Channel 绑定到指定的本地地址 localAddress,并传入 ChannelPromise(promise)用于跟踪绑定操作的结果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
这里 channel.eventLoop()
会返回一个 EventLoop (实际是 bossGroup 池中的一个 NioEventLoop 实例),所以这里的 ExecutoLoop#execute 方法实际是 NioEventLoop#execute。
在上面 【ServerBootstrapAcceptor#channelRead】部分我们提到 Netty 会为将 EventLoop 与 NioSocketChannel 进行关联,因此每个 Channel 都有一个自己的绑定的 EventLoop(但一个 EventLoop 可能对应多个 Channel)。
NioEventLoop#execute 实现在父类 SingleThreadEventExecutor#execute 中, SingleThreadEventExecutor#execute 的代码如下:
/*** 执行给定的任务。该方法会根据当前线程是否在事件循环中,以及事件循环的状态,来决定如何处理任务。** @param task 要执行的任务,以 Runnable 接口的形式表示* @param immediate 一个布尔值,指示是否立即唤醒事件循环来执行任务*/private void execute(Runnable task, boolean immediate) {// 1. 检查当前线程是否在事件循环中boolean inEventLoop = inEventLoop();// 2. 将任务添加到任务队列中addTask(task);// 如果当前线程不是 EventLoop 线程if (!inEventLoop) {// 3. 尝试启动一个 EventLoop线程,交由这个线程来处理任务。startThread();// 判断线程状态是否已经关闭,如果关闭则移除任务if (isShutdown()) {boolean reject = false;try {// 尝试从任务队列中移除刚刚添加的任务if (removeTask(task)) {// 如果移除成功,标记任务需要被拒绝reject = true;}} catch (UnsupportedOperationException e) {// 当任务队列不支持移除操作时,捕获该异常// 这种情况下,我们只能继续执行,期望在任务完全终止前能够处理它// 在最坏的情况下,我们会在终止时记录日志}// 如果任务需要被拒绝if (reject) {// 调用拒绝任务的方法:抛出 RejectedExecutionException 异常 reject();}}}// 如果添加任务不会自动唤醒事件循环,并且 immediate 参数为 trueif (!addTaskWakesUp && immediate) {// 4. 唤醒事件循环,使其能够立即处理任务wakeup(inEventLoop);}}
我们这里来简单梳理下逻辑:
- 通过
inEventLoop()
来获取当前线程的标识 ,该方法会检查 当前线程是否是 SingleThreadEventExecutor#thread 线程。- Netty 采用单线程模型来处理 Channel 的 I/O 操作和相关任务。每个 EventExecutor 通常对应一个线程,这个线程负责执行注册到该 EventExecutor 上的 Channel 的 I/O 事件处理逻辑以及任务队列中的任务。
- 在 SingleThreadEventExecutor 类中,定义了一个
private volatile Thread thread
成员变量,用于存储与该 EventExecutor 关联的线程对象。 inEventLoop()
为 false 的情况有两种:- SingleThreadEventExecutor#thread 为空 :这种情况说明 SingleThreadEventExecutor 绑定的线程还没初始化,下面会调用
startThread()
来开启一个新线程 - SingleThreadEventExecutor#thread 不为空,但是并不等于当前线程:这种情况说明当前调用线程并非是 SingleThreadEventExecutor#thread 线程,
- SingleThreadEventExecutor#thread 为空 :这种情况说明 SingleThreadEventExecutor 绑定的线程还没初始化,下面会调用
- 将当前需要处理的任务通过
addTask(task)
方法添加到任务队列(SingleThreadEventExecutor#taskQueue) 中。- SingleThreadEventExecutor 内部存在一个任务队列 taskQueue 属性
- 无论 inEventLoop() 返回什么结果, 都会将 task 添加到 SingleThreadEventExecutor 的任务队列中。因此在某些情况下可能会存在将任务添加到错误的 SingleThreadEventExecutor 队列中。
- SingleThreadEventExecutor(实际类型是 NioEventLoop) 会通过一个循环不停的从 taskQueue 中获取任务执行。
- 如果当前线程并不是 SingleThreadEventExecutor#thread 线程中,则调用
startThread()
启动一个新的 EventLoop 线程。startThread()
并不一定会启动一个新线程,如果 SingleThreadEventExecutor 已经绑定了线程(SingleThreadEventExecutor#thread 不为空)则不会做任何处理
- 接着通过
isShutdown()
方法检查事件循环是否已关闭,若已关闭,尝试从任务队列中移除该任务。若移除成功,调用reject()
方法拒绝该任务。 - 唤醒事件循环:如果添加任务不会自动唤醒事件循环,且 immediate 参数为 true,则调用
wakeup()
方法唤醒事件循环,使其能立即处理任务。
下面我们我们针对上面注释的几点来进行详细分析:
1. SingleThreadEventExecutor#inEventLoop
SingleThreadEventExecutor#inEventLoop 用于判断当前调用线程是否是 Channel 绑定的线程。Netty 会为每个 Channel 绑定一个 NioEventLoop,而每个 NioEventLoop 绑定一个 本地线程 Thread,因此 每个 Channel 都会绑定一个线程,这里就是判断:调用该方法的当前线程是否是 该 Channel 绑定的线程。
SingleThreadEventExecutor#inEventLoop 调用的是其父类 AbstractEventExecutor#inEventLoop() 方法,实现如下:
// io.netty.util.concurrent.AbstractEventExecutor#inEventLoop@Overridepublic boolean inEventLoop() {// 接口方法,由其子类 SingleThreadEventExecutor 实现, 所以这里调用的是 SingleThreadEventExecutor#inEventLoopreturn inEventLoop(Thread.currentThread());}
而 SingleThreadEventExecutor#inEventLoop 实现如下:
@Overridepublic boolean inEventLoop(Thread thread) {// 判断传入的线程是否是 SingleThreadEventExecutor 所持有的线程。return thread == this.thread;}
2. SingleThreadEventExecutor#addTask
SingleThreadEventExecutor#addTask 是将任务添加到队列中等待执行 ,实现如下:
可以看到无论 inEventLoop() 返回什么结果, 都会将 task 添加到 SingleThreadEventExecutor 的任务队列中。因此在某些情况下可能会存在将任务添加到错误的 SingleThreadEventExecutor 队列中。
// 添加任务到任务队列中protected void addTask(Runnable task) {ObjectUtil.checkNotNull(task, "task");if (!offerTask(task)) {// 如果添加失败则拒绝任务reject(task);}}final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}// 将任务添加到任务队列return taskQueue.offer(task);}// 拒绝任务protected final void reject(Runnable task) {rejectedExecutionHandler.rejected(task, this);}
逻辑比较简单:就是将任务添加到 taskQueue 队列中, 当任务添加失败时会触发拒绝策略。
3. SingleThreadEventExecutor#startThread
SingleThreadEventExecutor#startThread 的调用前提是 SingleThreadEventExecutor#inEventLoop 返回 false。每个 SingleThreadEventExecutor 都会绑定一个本地线程,并且只会初始化一次,所以在 SingleThreadEventExecutor#startThread中会通过 state
判断 SingleThreadEventExecutor 是否进行过初始化,如果没有则会调用 doStartThread()
方法进行初始化。
- STATE_UPDATER 是 SingleThreadEventExecutor 内部维护的一个属性,他的作用是标识当前 Thread 的状态。在初始的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用 startThread 方法时会进入 if 语句中,便会调用 doStartThread() 方法。
- 因为可能会存在多个 Channel 注册到同一个 NioEventLoop上,所以存在 NioEventLoop 在当前 Channel 注册前就已经被其他 Channel 注册的情况,此时 NioEventLoop 的 thread 就已经初始化过了,这里便不会再调用 doStartThread() 方法。
SingleThreadEventExecutor#startThread 具体代码如下:
private void startThread() {// CAS 确保原子性if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {// 里面会调用 SingleThreadEventExecutor.this.run(),这个 this 就是 NioEventLoop 对象doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}}
下面我们来看 SingleThreadEventExecutor#doStartThread 的实现。
3.1 SingleThreadEventExecutor#doStartThread
SingleThreadEventExecutor#doStartThread 方法是 SingleThreadEventExecutor 类中的一个关键方法,其主要功能是启动一个新线程来执行事件循环逻辑。该方法会将事件循环的执行封装在一个 Runnable 任务中,并通过 executor 来执行这个任务。在执行过程中,会处理线程中断、事件循环的执行、状态更新、任务清理以及线程资源的释放等操作,确保事件循环的正常启动和优雅关闭。其代码如下:
private void doStartThread() {// 使用 assert 语句确保 thread 为 null,即当前线程还未启动。这是一个防御性编程的检查,防止线程被重复启动。assert thread == null;// 交给线程池执行一个任务executor.execute(new Runnable() {@Overridepublic void run() {// 将当前正在执行的线程赋值给 thread 变量,以便后续对该线程进行管理。thread = Thread.currentThread();// 如果 interrupted 标志为 true,则中断当前线程if (interrupted) {thread.interrupt();}boolean success = false;// 更新最后执行时间,用于记录事件循环的执行时间updateLastExecutionTime();try {// 调用 SingleThreadEventExecutor 类的 run 方法,开始执行事件循环逻辑。run 方法通常包含了事件处理、任务执行等核心逻辑。SingleThreadEventExecutor.this.run();// 如果事件循环执行成功,将 success 标志设置为 truesuccess = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 对状态更新和任务清理....}});}
这里我们来看下面两点 :
-
executor.execute
: 在 NioEventLoopGroup 构建时默认会创建一个 ThreadPerTaskExecutor,它会为每个任务创建一个新线程,线程工厂使用 newDefaultThreadFactory() 创建(在 【Netty4核心原理⑥】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❶】 【创建工作线程】部分有提及) -
SingleThreadEventExecutor.this.run()
:当线程创建完成后会调用 SingleThreadEventExecutor#run 方法,该方法是抽象方法,实现在 NioEventLoop#run 中。在 NioEventLoop#run 内部有一个无限循环,在循环内部中 NioEventLoop 会做两件事:- 会通过 Java NIO 的 Selector 监听注册在其上的 Channel 的 I/O 事件,当 Selector 检测到有 Channel 发生 I/O 事件时,NioEventLoop 会调用相应的 Channel 处理器来处理这些事件;
- 执行任务队列中积累的任务,这些任务可以是用户提交的自定义任务,也可以是内部产生的系统任务。
因为这是个无限循环,这也就意味着,这个线程将 “终生” 陷入这个循环中,这样也就完成了这个这个 NioEventLoop 所谓的事件循环(EventLoop)
Java NIO 的 Selector 允许一个线程同时监听多个 Channel 的 IO 事件,因此当多个 Channel 注册到同一个 NioEventLoop 上时会同时注册兴趣事件到 NioEventLoop#selector 属性上。
关于 NioEventLoop#run 的具体内容,本篇篇幅所限,详参 【Netty4核心原理⑨】【揭开Bootstrap的神秘面纱 - 服务端Bootstrap❷】中 【NioEventLoop#run】部分。
4. SingleThreadEventExecutor#wakeup
上面代码在执行最后会尝试唤醒任务,如下:
// 如果添加任务不会自动唤醒事件循环,并且 immediate 参数为 trueif (!addTaskWakesUp && immediate) {// 4. 唤醒事件循环,使其能够立即处理任务wakeup(inEventLoop);}
这里我们看下调用前的两个条件:
addTaskWakesUp
:由 SingleThreadEventExecutor 构造函数入参传入,在 NioEventLoop 中默认为 false。immediate
:取值来自于!(task instanceof LazyRunnable) && wakesUpForTask(task)
,默认为 true- task 类型为 Runnable,所以
!(task instanceof LazyRunnable)
为 true wakesUpForTask(task)
方法没有逻辑,直接返回 true。
- task 类型为 Runnable,所以
综上,我们这里是必定会调用 wakeup(inEventLoop)
,其实现为 NioEventLoop#wakeup,其主要作用是唤醒正在阻塞的 Selector。
在 NioEventLoop#run 中会判断如果任务队列中没有任务,则会调用 Selector#select 来等待 NIO 事件发生,所以这里需要再将任务添加后唤醒 Selector。关于NioEventLoop#run的内容我们在 【Netty4核心原理⑨】【揭开Bootstrap的神秘面纱 - 服务端Bootstrap❷】 中会详细介绍。
如下:
@Overrideprotected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {// 唤醒 Selectorselector.wakeup();}}
其中 nextWakeupNanos.getAndSet(AWAKE) != AWAKE
:
nextWakeupNanos
是一个 AtomicLong 类型的变量,用于记录下一次唤醒 Selector 的时间。getAndSet(AWAKE)
方法会先获取 nextWakeupNanos 的当前值,然后将其设置为 AWAKE。!= AWAKE
表示当前 nextWakeupNanos 的值不是 AWAKE,即 Selector 处于阻塞状态,需要被唤醒。
六、总结
上面的分析有点零碎,我们这里来一个完整总结。
1. 服务端启动
-
服务端启动引导(ServerBootstrapbootstrap )创建与配置
-
创建 ServerBootstrapbootstrap 对象:
- 首先会创建一个Bootstrap实例,它是 Netty 服务端的启动引导类。例如:
ServerBootstrapbootstrap = new ServerBootstrap();。
- 首先会创建一个Bootstrap实例,它是 Netty 服务端的启动引导类。例如:
-
设置线程模型(EventLoopGroup):
- 为 ServerBootstrap 配置 EventLoopGroup。通常来说,我们基于 主从 Reactor 多线程模型 会指定两个 NioEventLoopGroup (bossGroup 和 workerGroup) 来处理服务端网络事件和客户端连接后的事件。
NioEventLoopGroup 内部会根据指定的线程数量通过 NioEventLoopGroup#newChild 方法来循环创建 NioEventLoop。每个 NioEventLoop 内部都与一个 JVM 本地线程绑定,因此 NioEventLoop实际上是一个单线程的执行器(Executor),它会在一个循环中不断地获取和处理事件。这个循环被称为事件循环(Event Loop),这也是NioEventLoop名字的由来。
- 为 ServerBootstrap 配置 EventLoopGroup。通常来说,我们基于 主从 Reactor 多线程模型 会指定两个 NioEventLoopGroup (bossGroup 和 workerGroup) 来处理服务端网络事件和客户端连接后的事件。
-
指定通道类型(Channel):
-
通过
bootstrap.channel(NioServerSocketChannel.class)
指定服务端使用的通道类型为 NioServerSocketChannel(基于 NIO 的套接字通道)。不同的通道类型适用于不同的网络协议和传输方式。bootstrap.channel(NioServerSocketChannel.class) 是记录当前使用的 Channel 类型,在 Bootstrap#bind方法中会通过反射创建 NioServerSocketChannel对象。
-
-
添加处理器(ChannelHandler)到ChannelPipeline:
-
使用handler方法添加一个ChannelInitializer,它是一个特殊的ChannelHandler,用于在Channel初始化时向ChannelPipeline添加其他ChannelHandler。例如:
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 在这里添加各种ChannelHandler,如编解码器、业务逻辑处理器等ch.pipeline().addLast(new SomeDecoder());ch.pipeline().addLast(new MyBusinessHandler());ch.pipeline().addLast(new SomeEncoder());} });
ChannelInitializer#initChannel 方法只会执行一遍,当方法执行结束后会将其从 ChannelPipeline 中 移除,通过 ChannelPipeline#addLast 方法将用户添加的自定义的 ChannelHandler 会在 ChannelPipeline内部形成一个双向链表。
-
ChannelPipeline是一个由多个ChannelHandler组成的管道,用于处理Channel上的事件。每个ChannelHandler负责处理特定类型的事件或者对数据进行特定的操作,数据会按照添加的顺序在这些ChannelHandler之间流动。
ChannelPipeline#addLast 方法会将添加的 ChannelHandler 包装成 AbstractChannelHandlerContext,并会根据其实现的方法判断当前 Handler 支持处理哪些事件,当对应事件来临时会找到可以处理该事件的 Handler 并调用。
-
-
-
在上面对 ServerBootstrap 的基础配置完成后会调用
bootstrap.bind(port)
方法完成服务端的端口绑定与监听,在这个方法首先会调 AbstractBootstrap#initAndRegister 通过反射创建一个 NioServerSocketChannel 实例,并且会通过config().group().register(channel)
将NioServerSocketChannel 注册到 NioEventLoop 上。如下:// 省略部分逻辑final ChannelFuture initAndRegister() {...// 反射创建 Channelchannel = channelFactory.newChannel();// 该方法的实现在 ServerBootstrap 中init(channel);...// 将 Channel 注册到一个 NioEventLoop 上ChannelFuture regFuture = config().group().register(channel);...return regFuture;}
channelFactory.newChannel();
会通过工厂类通过反射创建一个 Channel,Channel 的类型是我们通过 AbstractBootstrap#channel 方法指定的类型,即 NioServerSocketChannel 类型。init(channel)
方法的实现在 ServerBootstrap 中,目的是完成 Channel创建后的一些初始化操作,在这里会往当前服务端 Channel 的 Pipeline 添加一个 ServerBootstrapAcceptor 对象,此时 Channel 的 Pipeline 链表指向是 :Head -> ServerBootstrapAcceptor -> Tail
config().group()
获取到的是bossGroup
,config().group().register(channel)
调用的是 NioEventLoopGroup#register 方法,该方法会从bossGroup
中根据规则选择出一个 NioEventLoop ,因此这里config().group().register(channel)
调用的就是 NioEventLoop#register。
-
NioEventLoop#register 中会调用到 AbstractChannel.AbstractUnsafe#register(在 【Netty4核心原理⑦】【揭开Bootstrap的神秘面纱 - 客户端Bootstrap ❷】== 中的【 AbstractChannel.AbstractUnsafe#register】有所提及)AbstractChannel.AbstractUnsafe#register 中存在如下逻辑:
// AbstractChannel.AbstractUnsafe#register 部分代码... if (eventLoop.inEventLoop()) {register0(promise);} else {...// 调用 SingleThreadEventExecutor#executeeventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});...}...
- AbstractChannel.AbstractUnsafe#register 方法会通过
eventLoop.inEventLoop()
判断 当前线程是否是 SingleThreadEventExecutor#thread 线程。由于我们这里是服务端的启动过程,所以 SingleThreadEventExecutor#thread 为空,因此eventLoop.inEventLoop()
为 false- Netty 采用单线程模型来处理 Channel 的 I/O 操作和相关任务。每个 EventExecutor 通常对应一个线程,这个线程负责执行注册到该 EventExecutor 上的 Channel 的 I/O 事件处理逻辑以及任务队列中的任务。
- 在 SingleThreadEventExecutor 类中,定义了一个
private volatile Thread thread
成员变量,用于存储与该 EventExecutor 关联的线程对象。 inEventLoop()
为 false 的情况有两种:- SingleThreadEventExecutor#thread 为空 :这种情况说明 SingleThreadEventExecutor 绑定的线程还没初始化。
- SingleThreadEventExecutor#thread 不为空,但是并不等于当前线程:这种情况说明当前调用线程并非是 SingleThreadEventExecutor#thread 线程,
- 当
eventLoop.inEventLoop()
为 false 时会调用会调用eventLoop.execute
。eventLoop.execute
方法会将register0(promise);
封装成一个 Runnable 添加到 SingleThreadEventExecutor#taskQueue 任务队列中。然后判断 NioEventLoop#thread 是否已经初始化(我们这个场景没有初始化),如果没有初始化则会通过 SingleThreadEventExecutor#doStartThread 新建一个线程thread
并启动,然后将其赋值给 NioEventLoop#thread,确保 NioEventLoop#thread 只会初始化一次。
// SingleThreadEventExecutor#doStartThread 简化代码如下private void doStartThread() {assert thread == null;// 通过线程工厂创建一个线程executor.execute(new Runnable() {@Overridepublic void run() {...// 赋值给 SingleThreadEventExecutor#threadthread = Thread.currentThread();...// 调用 NioEventLoop#run 方法SingleThreadEventExecutor.this.run();...});}
- 新建的
thread
启动时会执行SingleThreadEventExecutor.this.run();
来调用 NioEventLoop#run 方法,在 NioEventLoop#run 方法中存在一个死循环(事件循环),循环中的逻辑是监听 Channel 事件 和 处理 SingleThreadEventExecutor#taskQueue 任务队列中的任务。也就是说 NioEventLoop#thread 创建就 “终身” 陷在 “事件循环” 中不停的执行如下逻辑 :
- 会通过 Java NIO 的 Selector 监听注册在其上的 Channel 的 I/O 事件,当 Selector 检测到有 Channel 发生 I/O 事件时,NioEventLoop 会调用相应的 Channel 处理器来处理这些事件;
- 执行任务队列 taskQueue 中积累的任务,这些任务可以是用户提交的自定义任务,也可以是内部产生的系统任务。
- 在 NioEventLoop#thread 处理任务队列任务的时候,会将
register0(promise);
任务从队列中取出,并在 NioEventLoop#thread 线程内部执行。这既是 Netty 的 无锁化的串行理念。register0(promise);
中不仅会完成将当前的服务端 Channel 注册到 Java NIO 的 Selector 上。在注册前后会调用一些前后扩展逻辑。如在注册后会调用 DefaultChannelPipeline#fireChannelRegistered 触发 Channel 的 ChannelPipeline 中的 ChannelInboundHandler#channelRegistered 方法。
- AbstractChannel.AbstractUnsafe#register 方法会通过
-
至此,我们知道了 AbstractBootstrap#initAndRegister 会创建一个 Channel ,并注册到 NioEventLoop 上,而在注册过程中会为 NioEventLoop 创建一个线程与 NioEventLoop 绑定,并这个线程在 无限循环 中完成 监听处理 Selector 上的 Channel 事件和处理任务队列中的任务两件事。
-
回到 AbstractBootstrap#doBind 中,当 AbstractBootstrap#initAndRegister 执行结束后会调用 AbstractBootstrap#doBind0 方法完成方法绑定,而在 AbstractBootstrap#doBind0 方法中则是调用
channel.eventLoop().execute
来执行channel.bind(localAddress, promise)
逻辑。 最终会会调用到 io.netty.channel.Channel.Unsafe#bind 方法来完成端口的绑定。- 这里的
channel.eventLoop().execute
调用的也是 SingleThreadEventExecutor#execute,方法,而在 AbstractBootstrap#initAndRegister 方法中我们已经对 channel 绑定的 NioEventLoop 做了线程初始化,这里调用的 SingleThreadEventExecutor#execute 方法中并不会再创建新的线程。 - 这里也是将
channel.bind(localAddress, promise)
封装成一个 Runnable 任务添加到任务队列 taskQueue 中,等待channel.eventLoop()
在 “事件循环” 中从任务队列中将其取出并执行。 - 这里执行
channel.bind(localAddress, promise)
的线程是当前 Channel 绑定的 NioEventLoop 持有的线程。
- 这里的
// 入参// regFuture : 保存了 Channel 注册结果// channel : 注册的 channel// localAddress : channel 将要绑定的本机地址// promise :保存 channel 与 localAddress 的绑定结果private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// 下面的注释指出该方法在 channelRegistered() 被触发之前调用,目的是给用户自定义的处理器(user handlers)提供机会,使其能够在 channelRegistered() 方法实现中对 ChannelPipeline 进行相关设置。channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {// 1. 如果Channel 注册操作成功,则尝试进行绑定操作,将 Channel 绑定到指定的本地地址 localAddress,并传入 ChannelPromise(promise)用于跟踪绑定操作的结果channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {// Channel 注册失败,直接将失败结果保存等待返回promise.setFailure(regFuture.cause());}}});}
- 至此,服务端的启动过程大体就已经完成,可以得知:服务端在启动时会创建了一个 ServerSocketChannel 并在注册到 bossGroup 中的某个 NioEventLoop 上,这个 NioEventLoop 会将其绑定到指定的端口上。绑定成功后,ServerSocketChannel 开始监听客户端的连接请求。当有新的连接到来时,bossGroup 中的线程会接受这个连接,并将新连接的 SocketChannel 注册到 workerGroup 中的某个 EventLoop 上。
2. 客户端连接
服务端启动后,客户端就可以发起连接客户端的请求了,当客户端发起连接请求时会与服务端端口建立 TCP 连接,我们在上面提到过,服务端在启动后会将服务端的 NioServerSocketChannel 注册到一个 NioEventLoop上,这个 NioEventLoop 会调用 run 方法,在这个方法中会通过 Java NIO 的 Selector 不断监听事件。因此,当客户端发起连接时,NioEventLoop#run 方法中就会监听到。
-
在 NioEventLoop#run 的事件循环过程中,服务端监听到事件后后会通过 NioEventLoop#processSelectedKeys 来处理兴趣事件,随后会调用到 NioEventLoop#processSelectedKey 方法来处理事件,在这个方法中会判断事件类型(OP_CONNECT、OP_WRITE、OP_READ、OP_ACCEPT),我们在上面分析代码时说过服务端这边关心的事件类型是 OP_ACCEPT,而这里遇到 OP_ACCEPT 事件后会调用AbstractNioMessageChannel.NioMessageUnsafe#read 来处理。
客户端只有一种情况:
- 客户端启动时会与服务端创建连接,过程中会创建 NioSocketChannel 类型的 Channel,此时创建的 Unsafe 类型为 NioSocketChannelUnsafe。此时的兴趣事件是 OP_READ
服务端有两种情况:
- 服务端自身启动时,绑定端口时会创建 NioServerSocketChannel 类型,此时会创建 NioMessageUnsafe 类型。此时的兴趣事件是 OP_ACCEPT
- 当服务端启动后,客户端连接服务端时,服务端会为每个客户端创建 Channel,此时连接的客户端的 Channel 类型是 NioSocketChannel,因此此时会为每个客户端 Channel 创建的 Unsafe 类型为 NioSocketChannelUnsafe。
综上:NioMessageUnsafe 用来处理 OP_ACCEPT 事件,NioSocketChannelUnsafe 用来处理 OP_READ 事件。(
关于 NioMessageUnsafe 和 NioSocketChannelUnsafe 的分析详参 【Netty4核心原理⑨】【揭开Bootstrap的神秘面纱 - 服务端Bootstrap❷】 的【unsafe.read()】部分) -
AbstractNioMessageChannel.NioMessageUnsafe#read 中会通过 AbstractNioMessageChannel#doReadMessages 方法读取连接消息,同时会获取客户端连接的 Channel ,并封装成一个 NioSocketChannel 对象, 如下:
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {...// 获取客户端连接 ChannelSocketChannel ch = SocketUtils.accept(javaChannel());...// 封装成 NioSocketChannel保存到 buf 中供后面获取if (ch != null) {buf.add(new NioSocketChannel(this, ch));return 1;}...return 0;}
-
之后 AbstractNioMessageChannel.NioMessageUnsafe#read 会通过
pipeline.fireChannelRead(readBuf.get(i));
触发 ChannelInboundHandler 的 ChannelRead 方法,这里便会触发到 服务端的 ServerBootstrap.ServerBootstrapAcceptor#channelRead 方法到目前为止还是处于服务端的逻辑,执行线程都是服务端 Channel 绑定的 NioEventLoop 所持有的 Thread。
-
ServerBootstrap.ServerBootstrapAcceptor#channelRead 方法中存在如下逻辑:
- 代码
child.pipeline().addLast(childHandler);
会将childHandler
(自定义的 ChannelInitializer )添加到当前 Channel (客户端 Channel)的 Pipeline 中 ,此时客户端 Channel 的 ChannelPipeline 双向链表的结构是Head -> ChannelInitializer -> Tail
- 代码
childGroup.register(child)
会从 workerGroup 中选择一个 NioEventLoop 并将 客户端 Channel 注册到这个 NioEventLoop 中,而在注册过程中会调用 AbstractChannel.AbstractUnsafe#register,这里就与服务端 Channel 注册的逻辑相同了,简单来说就是会初始化 NioEventLoop 中的 thread (如果需要的话),并启动这个线程并调用SingleThreadEventExecutor.this.run();
使得这个线程陷入 “事件循环”中。这样这个 NioEventLoop 就会在 “事件循环” 中不断监听当前客户端 Channel 的事件并分发处理以及处理 NioEventLoop 中的任务一个 NioEventLoop 可能会对应多个 Channel,因此如果这个 NioEventLoop 已经绑定过 Channel ,那么说明这个 NioEventLoop 的 thread 已经初始化完成了,那么此时就不会再初始化 thread,那么当前所需要做的就是将 客户端 Channel 注册到 NioEventLoop 的 Selector 上,因为Java NIO Selector 允许一个线程监听多个 Channel,所以一个 NioEventLoop 可以处理多个Channel。
- 这样就完成了 客户端 Channel 注册代 NioEventLoop 的逻辑,这个 NioEventLoop 会一直关注 客户端 Channel 的信息,当这个 Channel 有事件发生时 NioEventLoop 便可以及时响应处理,并分发到 ChannelPipeline 中对应事件的 ChannelHandler 中处理。
这里客户端类型是 NioSocketChannel 类型,持有的 Unsafe 类型是 NioSocketChannelUnsafe,关心的事件类型是 OP_READ,因此当客户端发送消息时,OP_READ 事件会就绪,NioEventLoop 中的事件循环会感知到并调用 NioSocketChannelUnsafe#read 来处理, NioSocketChannelUnsafe#read 与 NioMessageUnsafe#read 的逻辑并不相同,NioSocketChannelUnsafe#read 会从SocketChannel 中读取数据并进行相应的处理和事件通知。
- 代码
七、参考内容
- 《Netty4核心原理》
- 豆包