Java IO及Netty框架学习小结
Netty
netty官网: Netty
什么是Netty?
- Netty 是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
- Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序(例如协议服务器和客户端)。它极大地简化了 TCP 和 UDP 套接字服务器等网络编程。
Java IO
什么是Java IO?
官方文档:https://docs.oracle.com/javase/tutorial/essential/io/
I/O(Input/Output) 是计算机与外部设备(磁盘、网络、键盘等)进行数据交换的过程
分类
1.BIO (阻塞IO)
每当有一个客户端与服务器进行连接,服务器就会创建一个线程去处理当前连接,当通道没有数据的时候,线程会阻塞等待
@Slf4j
public class BioDemo1 {public static void main(String[] args) throws IOException {ExecutorService executorService = Executors.newCachedThreadPool();// 1.创建服务器监听端口SocketServerSocket socketServer = new ServerSocket(3333);log.info("服务器启动成功");// 2.等待客户端连接while (true) {Socket socket = socketServer.accept();log.info("有客户端连接了" + socket.getRemoteSocketAddress());// 3.创建线程处理客户端连接executorService.execute(() -> {try {handle(socket);} catch (IOException e) {e.printStackTrace();}});}}/*** 处理客户端连接* @param socket*/private static void handle(Socket socket) throws IOException {if (socket.isClosed()) return;// 1.打印线程信息log.info("线程信息:{}", Thread.currentThread().getName());InputStream inputStream = null;try {// 2.读取通道数据inputStream = socket.getInputStream();// 2.1 获取输入流// 2.2 读取数据byte[] bytes = new byte[1024]; // 创建一个缓存数组while (true) {int read = inputStream.read(bytes);if (read == -1) break;log.info("客户端发送的数据:{}", new String(bytes, 0, read));}} catch (IOException e) {e.printStackTrace();}finally {log.info("客户端断开连接" + socket.getRemoteSocketAddress());if (inputStream != null) inputStream.close();socket.close();}}
}
文件传输
public class FileDemoServer {public static void main(String[] args) throws IOException {File file = new File("file/test01.txt");RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");byte[] bytes = new byte[(int) file.length()];randomAccessFile.read(bytes); // 读取文件// 创建服务端 等待客户端连接发送文件ServerSocket server = new ServerSocket(3333);Socket client = server.accept();OutputStream outputStream = client.getOutputStream();outputStream.write(bytes);outputStream.close();}
}
public class FileDemoClient {public static void main(String[] args) throws IOException {Socket socket = new Socket();socket.connect(new InetSocketAddress(3333));InputStream inputStream = socket.getInputStream();byte[] bytes = new byte[1024];inputStream.read(bytes);System.out.println(new String(bytes));}
}
2.NIO (非阻塞IO)
官方文档:https://docs.oracle.com/javase/8/docs/api/java/nio/package-summary.html
- NIO是面向缓冲区或者面向块编程的
- 核心组成是Selector、Channel、Buffer
1.Channel(通道)
- 作用:双向数据传输管道(支持读和写),替代传统 BIO 的流(Stream)。
- 类型:
SocketChannel
:TCP 客户端通道。ServerSocketChannel
:TCP 服务端监听通道。FileChannel
:文件读写通道。
2. Buffer(缓冲区)
-
作用:数据暂存容器,减少直接操作底层数据源的次数。
-
类型:
ByteBuffer
(最常用)、CharBuffer
、IntBuffer
等。 -
关键方法:
put()
:写入数据。get()
:读取数据。flip()
:切换读写模式。clear()
:清空缓冲区(可复用)。
Buffer类中很重要的几个字段:
/*** 标记位置,默认为-1,表示尚未设置标记*/
private int mark = -1;/*** 当前位置,用于指示当前操作的位置*/
private int position = 0;/*** 限制位置,表示可以操作的最大位置*/
private int limit;/*** 容量,表示最大容量*/
private int capacity;
3. Selector(选择器)
- 作用:单线程监听多个通道的 I/O 事件(如连接、读、写),实现多路复用。
- 核心事件:
OP_ACCEPT
:服务端接收新连接。OP_CONNECT
:客户端完成连接。OP_READ
:数据可读。OP_WRITE
:数据可写。
3. I/O 多路复用
- 特点:通过 Selector/Epoll 监控多个 I/O 事件,当某个通道就绪时通知线程处理。
- 优点:单线程高效管理多个连接,减少线程切换开销。
- 核心组件:
Selector
(Java NIO)epoll
(Linux)kqueue
(BSD)
- 应用场景:高并发服务器(如 Netty、Nginx)。
4.AIO
异步 I/O(Asynchronous I/O,AIO)
- 特点:线程发起 I/O 操作后立即返回,内核负责将数据从内核缓冲区拷贝到用户缓冲区,完成后通知线程。
- 优点:完全非阻塞,无轮询或等待。
- 缺点:实现复杂,依赖操作系统支持(如 Linux AIO 不完善)。
- 应用场景:文件操作或高吞吐场景(如 Java
AsynchronousFileChannel
)。
NIO
1.Buffer(缓冲区)
核心属性:
属性 | 描述 | 初始值 | 约束条件 |
---|---|---|---|
capacity | 缓冲区的总容量(元素个数),创建时确定后不可修改。 | 由 allocate() 或 wrap() 确定 | capacity ≥ 0 |
position | 下一个要读/写的索引位置。初始为 0 ,每读/写一个元素递增 1 。 | 0 | 0 ≤ position ≤ limit |
limit | 第一个不能读/写的索引(即读写操作的终点)。初始等于 capacity ,可动态调整。 | capacity | 0 ≤ limit ≤ capacity |
mark | 标记一个临时位置,后续可通过 reset() 将 position 恢复到此值。默认未标记(-1 )。 |
Buffer类是一个抽象类,有很多子类继承其方法完成特定数据的缓冲操作
Buffer类中包括的方法:
(1)核心方法:
这些方法用于管理缓冲区的核心属性:容量(Capacity)、位置(Position)、限制(Limit)。
方法 | 说明 |
---|---|
int capacity() | 返回缓冲区的总容量,创建后不可修改。 |
int position() | 返回当前读写位置(索引)。 |
Buffer position(int p) | 设置当前读写位置,需满足 0 ≤ p ≤ limit 。 |
int limit() | 返回缓冲区的读写限制(position 不能超过此值)。 |
Buffer limit(int l) | 设置读写限制,需满足 0 ≤ l ≤ capacity 。 |
(2)状态切换
用于在读模式和写模式之间切换缓冲区的状态。
方法 | 说明 |
---|---|
Buffer clear() | 重置缓冲区为写模式:position=0 , limit=capacity ,数据未清除,但可被覆盖。 |
Buffer flip() | 切换为读模式:limit=position , position=0 ,通常在写入数据后调用。 |
Buffer rewind() | 重置 position=0 ,保持 limit 不变,用于重新读取数据。 |
Buffer compact() | (子类实现,如 ByteBuffer )压缩缓冲区,将未读数据移到头部,准备继续写入。 |
(3)读写方法
用于向缓冲区写入数据(put
)或从缓冲区读取数据(get
),具体方法由子类实现。
1. 基本读写方法
方法 | 说明 |
---|---|
ByteBuffer put(byte b) | 写入一个字节,position 递增。 |
ByteBuffer put(byte[] src) | 写入字节数组。 |
byte get() | 读取一个字节,position 递增。 |
ByteBuffer get(byte[] dst) | 读取字节到数组。 |
2.批量读写
方法 | 说明 |
---|---|
Buffer put(Buffer src) | 将另一个缓冲区的数据复制到当前缓冲区。 |
Buffer get(byte[] dst, int offset, int length) | 从缓冲区读取数据到数组的指定位置。 |
(4)标记与重置
用于标记和恢复 position
的位置。
方法 | 说明 |
---|---|
Buffer mark() | 标记当前 position ,后续可通过 reset() 恢复到此位置。 |
Buffer reset() | 将 position 重置到之前标记的位置。 |
(5)工具方法
方法 | 说明 |
---|---|
int remaining() | 返回剩余可操作的元素数量:limit - position 。 |
boolean hasRemaining() | 检查是否还有剩余元素可操作(position < limit )。 |
boolean isReadOnly() | 判断缓冲区是否为只读。 |
boolean isDirect() | (如 ByteBuffer )判断是否是直接内存(堆外内存)缓冲区。 |
(6)视图与复制
用于创建缓冲区的视图或副本,共享底层数据但独立维护属性。
方法 | 说明 |
---|---|
Buffer duplicate() | 创建缓冲区的副本,共享数据但独立维护 position 、limit 等属性。 |
Buffer slice() | 创建当前缓冲区的一个子视图,范围从 position 到 limit 。 |
2.Channel
在 Java NIO 中,Channel(通道) 是用于在数据源(如文件、网络套接字)和缓冲区(Buffer
)之间高效传输数据的抽象。它与传统 I/O 的流(InputStream
/OutputStream
)类似,但具有更强大的功能,如支持非阻塞模式、双向读写(部分实现)以及内存映射文件操作。
Channel在Java中是一个接口
public interface Channel extends Closeable {public boolean isOpen();public void close() throws IOException;}
区别(Channel和Stream):
1.Channel可以同时进行读写,流只能读或写
2.通道可以实现异步读写
3。通道可以写数据到缓冲区,也可以从缓冲区读数据
实现子类:
Channel 子类 | 应用场景 | 关键特性 |
---|---|---|
FileChannel | 文件读写 | 内存映射、零拷贝传输 |
SocketChannel | TCP 客户端通信 | 非阻塞模式、Selector 多路复用 |
ServerSocketChannel | TCP 服务端监听 | 接受客户端连接 |
DatagramChannel | UDP 数据报通信 | 无连接、支持广播 |
Pipe.Source/SinkChannel | 线程间通信 | 单向数据传输 |
AsynchronousFileChannel | 异步文件操作 | 回调或 Future 模式 |
AsynchronousSocketChannel | 异步 TCP 通信 | 非阻塞、高并发支持 |
FileChannel类
用于文件的读写、内存映射及零拷贝传输。
方法 | 说明 |
---|---|
读写操作 | |
int read(ByteBuffer dst) | 从通道读取数据到 ByteBuffer ,返回实际读取的字节数(可能为 0 )。 |
int write(ByteBuffer src) | 将 ByteBuffer 中的数据写入通道,返回实际写入的字节数。 |
定位与截断 | |
long position() | 返回当前文件指针的位置。 |
FileChannel position(long newPosition) | 设置文件指针的位置(用于随机读写)。 |
FileChannel truncate(long size) | 截断文件到指定大小(丢弃超出部分)。 |
内存映射与零拷贝 | |
MappedByteBuffer map(MapMode mode, long position, long size) | 将文件映射到内存,返回 MappedByteBuffer 。模式包括:READ_ONLY 、READ_WRITE 、PRIVATE 。 |
long transferTo(long position, long count, WritableByteChannel target) | 将文件数据从 position 开始的 count 字节直接传输到目标通道(零拷贝优化)。 |
long transferFrom(ReadableByteChannel src, long position, long count) | 从源通道读取数据,直接写入文件的指定位置(零拷贝优化)。 |
文件锁 | |
FileLock lock() | 获取文件的独占锁(阻塞直到获取成功)。 |
FileLock tryLock() | 尝试非阻塞获取锁,失败返回 null 。 |
SocketChannel类
(TCP 客户端)
方法 | 说明 |
---|---|
boolean connect(SocketAddress remote) | 连接到服务端地址。在非阻塞模式下可能返回 false ,需后续调用 finishConnect() 完成连接。 |
boolean finishConnect() | 完成非阻塞模式下的连接过程(需循环检查)。 |
boolean isConnected() | 检查是否已成功连接到服务端。 |
非阻塞模式 | |
SocketChannel configureBlocking(boolean block) | 设置阻塞模式(true 为阻塞,默认值)。 |
注册到 Selector | |
SelectionKey register(Selector sel, int ops) | 将通道注册到 Selector ,监听指定事件(如 SelectionKey.OP |
ServerSocketChannel类
(TCP 服务端)
方法 | 说明 |
---|---|
ServerSocketChannel bind(SocketAddress local) | 绑定到指定端口(如 new InetSocketAddress(8080) )。 |
SocketChannel accept() | 接受客户端连接请求,返回对应的 SocketChannel (阻塞模式下会等待连接)。 |
非阻塞模式 | |
configureBlocking(boolean block) | 设置非阻塞模式后,accept() 可能立即返回 null 。 |
DatagramChannel类
(UDP 通信)
方法 | 说明 |
---|---|
DatagramChannel bind(SocketAddress local) | 绑定本地端口接收数据(如 new InetSocketAddress(9090) )。 |
int send(ByteBuffer src, SocketAddress target) | 发送数据包到目标地址。 |
SocketAddress receive(ByteBuffer dst) | 接收数据包到 ByteBuffer ,返回发送方的地址。 |
3.Selector
用一个线程处理多个客户端连接,就会用到Selector,可以检测注册的多个通道中是否有事件发生,只有通道有读写事件发生时才会进行读写操作,不必为每个连接都创建线程,减少系统开销
Selector是一个抽象类
核心特点:
- 多路复用:单线程可管理多个
Channel
,减少线程资源消耗。 - 非阻塞模式:需将
Channel
设置为非阻塞模式(configureBlocking(false)
)才能注册到Selector
。 - 事件驱动:通过监听
SelectionKey
标识的事件(如OP_READ
、OP_WRITE
)触发操作。
常用方法
方法 | 说明 |
---|---|
static Selector open() | 创建一个新的 Selector 实例。 |
int select() | 阻塞等待至少一个已注册的 Channel 就绪事件,返回就绪事件的数量。 |
int select(long timeout) | 阻塞最多 timeout 毫秒,超时返回 0 。 |
int selectNow() | 非阻塞检查就绪事件,立即返回当前就绪数量。 |
Set<SelectionKey> selectedKeys() | 返回已就绪的事件集合(需手动清理已处理的 SelectionKey )。 |
Set<SelectionKey> keys() | 返回所有注册到该 Selector 的 SelectionKey 集合(不可直接修改)。 |
Selector wakeup() | 唤醒因 select() 阻塞的线程。 |
void close() | 关闭 Selector 并释放资源。 |
聊天案例
服务端
package com.itcast.nio.chat;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;public class Server {// 存储当前群聊的所有在线用户private static final Set<SocketChannel> clients = new HashSet<SocketChannel>();public static void main(String[] args) throws IOException {// 1.创建服务器通道并绑定端口ServerSocketChannel server = ServerSocketChannel.open();// 2.绑定端口并设置为非阻塞server.bind(new InetSocketAddress(3333));server.configureBlocking(false);System.out.println("服务器启动成功" + server.getLocalAddress());// 3.创建选择器Selector selector = Selector.open();// 4.注册服务器通道到选择器中server.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 5.等待客户端连接selector.select();// 6.获取选择器中所有注册的通道Set<SelectionKey> selectionKeys = selector.selectedKeys();// 7.遍历选择器中的所有通道Iterator<SelectionKey> iterator = selectionKeys.iterator();// 8.处理每个通道while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove(); // 必须移除已处理的 keyif (key.isAcceptable()) {// 9.处理新连接handleAccept(key, selector);}else if (key.isReadable()) {// 10.处理数据读取handleRead(key);}}}}/*** 处理新连接* @param key* @param selector*/public static void handleAccept(SelectionKey key, Selector selector){try {ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); // 获取服务器通道SocketChannel socketChannel = serverChannel.accept(); // 获取客户端通道clients.add(socketChannel);socketChannel.configureBlocking(false); // 设置为非阻塞socketChannel.register(selector, SelectionKey.OP_READ); // 注册到选择器中System.out.println("客户端连接: " + socketChannel.getRemoteAddress());} catch (IOException e) {e.printStackTrace();}}/*** 处理读写* @param key* @throws IOException*/public static void handleRead(SelectionKey key){SocketChannel clientChannel = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(1024);int bytesRead = 0;try {bytesRead = clientChannel.read(buffer);} catch (IOException e) {e.printStackTrace();closeClient(clientChannel);return;}if (bytesRead == -1) {closeClient(clientChannel);return;}if (bytesRead > 0) {buffer.flip();byte[] data = new byte[buffer.remaining()];buffer.get(data);String message = new String(data).trim();try {System.out.println("客户端收到消息 [" + clientChannel.getRemoteAddress() + "]: " + message);} catch (IOException e) {e.printStackTrace();}// 广播给其他客户端broadcastMessage(message, clientChannel);}}private static void broadcastMessage(String message, SocketChannel clientChannel) {if (clients.isEmpty()) return;ByteBuffer byteBuffer = ByteBuffer.wrap((message + '\n').getBytes());ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes());for (SocketChannel client : clients) {if (client != clientChannel && client.isOpen()) {try {client.write(buffer);buffer.rewind(); // 重置buffer供下次写入} catch (IOException e) {closeClient(client);}}}}private static void closeClient(SocketChannel clientChannel) {clients.remove(clientChannel);try {System.out.println("客户端断开连接: " + clientChannel.getRemoteAddress());clientChannel.close();} catch (IOException e) {e.printStackTrace();}}
}
客户端
package com.itcast.nio.chat;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;public class Client {public static void main(String[] args) throws IOException, InterruptedException {SocketChannel client = SocketChannel.open();client.configureBlocking(false);client.connect(new InetSocketAddress(3333));System.out.println("连接服务器...");// 等待连接完成while (!client.finishConnect()) {// 可以做一些其他处理或者等待Thread.sleep(100);System.out.println("等待连接...");}// 开启线程读取服务端推送的消息// 启动读取线程new Thread(() -> {ByteBuffer buffer = ByteBuffer.allocate(1024);while (true) {try {// 从缓冲区读取数据int bytesRead = client.read(buffer);if (bytesRead > 0) {buffer.flip();System.out.println("[群消息] " + new String(buffer.array(), 0, bytesRead));buffer.clear();}} catch (IOException e) {break;}}}).start();// 循环发送消息// 控制台输入Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.nextLine();// 发送给服务器端client.write(ByteBuffer.wrap(msg.getBytes()));if ("exit".equals(msg)) {break;}}}
}
零拷贝
什么是零拷贝?
零拷贝(Zero-copy)技术指在计算机执行操作时,CPU 不需要先将数据从⼀个内存区域复制到另⼀个内存区域,从⽽可以减少上下⽂切换以及 CPU 的拷贝时间。它的作用是在数据从网络设备到⽤户程序空间传递的过程中,减少数据拷贝次数,减少系统调用,实现 CPU 的零参与,彻底消除 CPU 在这方面的负载。
实现零拷贝用到的最主要技术是 DMA 数据传输技术和内存区域映射技术。
零拷贝机制可以减少数据在内核缓冲区和⽤户进程缓冲区之间反复的 I/O 拷贝操作。零拷贝机制可以减少用户进程地址空间和内核地址空间之间因为上下⽂切换⽽带来的 CPU 开销。在 Java 程序中,常⽤的零拷贝有 mmap(内存映射)和 sendFile。
源自:https://www.cnblogs.com/liconglong/p/15211413.html
传统 I/O 的数据拷贝流程
4 次上下文切换 + 4 次数据拷贝(其中 2 次由 CPU 参与),效率较低。
在传统 I/O 操作中(例如从文件读取数据并发送到网络),数据需要经历多次拷贝和上下文切换:
- 磁盘 → 内核缓冲区:数据从磁盘读取到内核空间的缓冲区(通过 DMA 技术)。
- 内核缓冲区 → 用户缓冲区:数据从内核空间拷贝到用户空间的应用程序缓冲区(需要 CPU 参与)。
- 用户缓冲区 → Socket 缓冲区:应用程序将数据从用户缓冲区拷贝到内核空间的 Socket 缓冲区(再次 CPU 参与)。
- Socket 缓冲区 → 网卡:数据从 Socket 缓冲区发送到网络设备(通过 DMA)。
public class FileDemoServer {public static void main(String[] args) throws IOException {File file = new File("file/test01.txt");RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");byte[] bytes = new byte[(int) file.length()];randomAccessFile.read(bytes); // 读取文件// 创建服务端 等待客户端连接发送文件ServerSocket server = new ServerSocket(3333);Socket client = server.accept();OutputStream outputStream = client.getOutputStream();outputStream.write(bytes);outputStream.close();}
}
public class FileDemoClient {public static void main(String[] args) throws IOException {Socket socket = new Socket();socket.connect(new InetSocketAddress(3333));InputStream inputStream = socket.getInputStream();byte[] bytes = new byte[1024];inputStream.read(bytes);System.out.println(new String(bytes));}
}
零拷贝的两种常见实现
sendfile
系统调用:- 数据直接从文件描述符传输到 Socket 描述符,无需用户态参与。
- 适用于文件到网络的传输(如 HTTP 文件下载)。
- 内存映射文件(
mmap
):- 将文件映射到用户态虚拟内存,用户程序直接操作内存,减少拷贝次数。
- 适用于需要频繁修改文件的场景(如数据库)
mmap 和 sendFile 的区别
1.mmap 适合小数据量读写,sendFile 适合大文件传输。
2.mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
3.sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)。
传统 I/O vs NIO 零拷贝的对比
步骤 | 传统 I/O | NIO + 零拷贝 |
---|---|---|
数据拷贝次数 | 4 次(2 次用户态↔内核态) | 2 次(仅内核态内拷贝) |
CPU 参与次数 | 2 次(用户态↔内核态拷贝) | 0 次(DMA 完成) |
上下文切换次数 | 4 次(读/写各 2 次) | 2 次(系统调用发起和完成) |
典型实现 | FileInputStream.read() + Socket.send() | FileChannel.transferTo() |
DMA和内存映射
技术 | 核心作用 | 应用场景 |
---|---|---|
DMA | 外设与内存直接传输数据,减少 CPU 参与 | 磁盘 I/O、网络通信、GPU 渲染 |
内存区域映射 | 将外设或文件映射到内存,实现零拷贝访问 | 文件高效读写、硬件控制、进程间通信 |
零拷贝性能分析
(服务端向客户端发送大文件)
不使用零拷贝:
OldServer.class
public class OldServer {public static void main(String[] args) throws IOException {ServerSocketChannel server = ServerSocketChannel.open();server.bind(new InetSocketAddress(3333));server.configureBlocking(false);Selector selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);selector.select(); // 阻塞等待客户端连接Set<SelectionKey> selectionKeys = selector.selectedKeys();long t1 = System.currentTimeMillis();for (SelectionKey selectionKey : selectionKeys) {if (selectionKey.isAcceptable()) {handle(selectionKey, selector);}}long t2 = System.currentTimeMillis();System.out.println("耗时:" + (t2 - t1));}/*** 发送文件* @param key* @param selector*/public static void handle(SelectionKey key, Selector selector) throws IOException {File file = new File("file/protoc-3.6.1-win32.zip");ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel client = server.accept();// 注册client.configureBlocking(false);client.register(selector, SelectionKey.OP_READ);// 获取文件输入流FileInputStream fileInputStream = new FileInputStream(file);while (true) {if (fileInputStream.available() == 0) {break;}byte[] bytes = new byte[4096];fileInputStream.read(bytes);int sum = client.write(ByteBuffer.wrap(bytes));System.out.println("发送了:" + sum + "字节");}fileInputStream.close();client.close();}
}
OldClient.class:
public class OldClient {public static void main(String[] args) throws IOException, InterruptedException {SocketChannel client = SocketChannel.open();client.configureBlocking(false);client.connect(new InetSocketAddress(3333));System.out.println("连接服务器...");while (!client.finishConnect()) {// 可以做一些其他处理或者等待Thread.sleep(100);System.out.println("等待连接完成...");}System.out.println("连接完成...");// 从服务器读取数据long t1 = System.currentTimeMillis();while (true) {byte[] bytes = new byte[4096];ByteBuffer buffer = ByteBuffer.wrap(bytes);int read = client.read(buffer);if (read == -1) break;}long t2 = System.currentTimeMillis();System.out.println("耗时:" + (t2 - t1));}
}
使用零拷贝:
NewServer.class:
public class NewServer {public static void main(String[] args) throws IOException {ServerSocketChannel server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(3333));Selector selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();long t1 = System.currentTimeMillis();for (SelectionKey selectionKey : selectionKeys) {if (selectionKey.isAcceptable()) {handle(selectionKey, selector);}}long t2 = System.currentTimeMillis();System.out.println("耗时:" + (t2 - t1));}public static void handle(SelectionKey key, Selector selector) throws IOException {ServerSocketChannel server = (ServerSocketChannel) key.channel();// 1.获得客户端SocketChannel client = server.accept();client.configureBlocking(false);// 2.注册client.register(selector, SelectionKey.OP_READ);// 3.获得文件输出流String filename = "file/protoc-3.6.1-win32.zip";FileChannel channel = new FileInputStream(filename).getChannel();// 4.使用transferTo实现零拷贝传输long sum = channel.transferTo(0, channel.size(), client);System.out.println("发送了" + sum + "字节");}
}
NewClient.class:
public class NewClient {public static void main(String[] args){SocketChannel client = null;try {client = SocketChannel.open();client.configureBlocking(false);client.connect(new InetSocketAddress(3333));System.out.println("连接服务器...");if (!client.finishConnect()) {// 等待连接完成Thread.sleep(100);System.out.println("正在连接中。。。");}ByteBuffer buffer = ByteBuffer.allocate(4096);while (true) {int read = 0;try {read = client.read(buffer);} catch (IOException e) {client.close();}if (read == -1) {System.out.println("服务端已关闭连接");break;}if (read > 0) {System.out.println("收到服务端的消息:" + new String(buffer.array(), 0, read));buffer.clear(); // 清空 buffer 以便下次读取} else {Thread.sleep(100); // 等待数据到达}}client.close();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("连接已关闭");}
}
性能比较:
很显然零拷贝性能要高很多
Reactor模式
什么是Reactor模式?
Reactor 模式是一种事件驱动模型,核心思想是 通过一个或多个线程监听 I/O 事件(如连接、读写),并将事件分发给对应的处理器。常见的 Reactor 变体包括:
- 单 Reactor 单线程:所有操作(连接、I/O)由一个线程完成,简单但性能受限。
- 单 Reactor 多线程:主线程处理连接,I/O 操作交给线程池,但主线程可能成为瓶颈。
- 主从 Reactor 多线程:主 Reactor 处理连接,子 Reactor 处理 I/O,Netty 默认采用此模式。
理解:
reactor可以认为是一种设计模式,用于处理客户端的事件,主要是通过select和dispatch操作来监听事件发生和处理事件
主要逻辑:
1.服务器注册到selector中,为其连接事件分配一个Handler(其实是一个Runnable对象, 名称可以定义为Acceptor)
2.Reactor监听事件发生,当事件发生时,会通过dispatch将时间分发给具体的处理器
3.dispatch通过SelectionKey(可以认为是selector实例对象中的事件id)的attachment来获取具体的Handler
4.执行具体的Handler逻辑
单Reactor单线程
键值存储服务器:
SingleThreadReactorKVStore.class:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;/*** 单线程 Reactor 模式的键值存储服务器*/
public class SingleThreadReactorKVStore {private final Selector selector;private final ServerSocketChannel serverChannel;private final Map<String, String> store = new HashMap<>(); // 内存键值存储/*** 构造函数,初始化服务器通道和选择器,并监听指定端口** @param port 服务器监听的端口号* @throws IOException 如果初始化通道或选择器时发生I/O错误*/public SingleThreadReactorKVStore(int port) throws IOException {selector = Selector.open();serverChannel = ServerSocketChannel.open();serverChannel.socket().bind(new InetSocketAddress(port));serverChannel.configureBlocking(false);// 注册 Accept 事件到 ReactorserverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());}/*** 启动服务器,进入事件循环*/public void start() {System.out.println("Reactor thread: " + Thread.currentThread().getName());try {while (!Thread.interrupted()) {selector.select(); // 阻塞等待事件Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();dispatch(key); // 根据事件类型分发事件到具体的处理器it.remove();}}} catch (IOException e) {e.printStackTrace();} finally {try {serverChannel.close();selector.close();} catch (IOException e) {e.printStackTrace();}}}/*** 分发事件到对应的处理器** @param key 事件的 SelectionKey 对象*/private void dispatch(SelectionKey key) {// 获取与键关联的附件,这里附件是一个Runnable对象Runnable handler = (Runnable) key.attachment();// 检查获取到的handler是否为nullif (handler != null) {handler.run(); // 执行 Handler(Acceptor 或 IOHandler)}}/*** 处理 连接事件 的 Handler*/private class Acceptor implements Runnable {@Overridepublic void run() {System.out.println("Acceptor thread: " + Thread.currentThread().getName());try {SocketChannel clientChannel = serverChannel.accept();System.out.println("客户端新连接 : " + clientChannel.getRemoteAddress());if (clientChannel != null) {new IOHandler(clientChannel); // 处理新连接的 I/O}} catch (IOException e) {e.printStackTrace();}}}/*** 处理 I/O 的 Handler*/private class IOHandler implements Runnable {private final SocketChannel channel;private final ByteBuffer buffer = ByteBuffer.allocate(1024);/*** 构造函数,初始化 SocketChannel 并注册读事件到选择器** @param channel 接收到的客户端通道* @throws IOException 如果配置通道时发生I/O错误*/public IOHandler(SocketChannel channel) throws IOException {this.channel = channel;channel.configureBlocking(false);// 注册读事件到 Reactorchannel.register(selector, SelectionKey.OP_READ, this);}@Overridepublic void run() {System.out.println("IOHandler thread: " + Thread.currentThread().getName());try {// 检查通道是否已经关闭,如果关闭则直接返回if (!channel.isOpen()) return;// 清空缓冲区以准备读取新的数据buffer.clear();// 读取数据到缓冲区int bytesRead = channel.read(buffer);// 如果没有更多数据可读,关闭通道并返回if (bytesRead == -1) {channel.close();return;}// 解析请求并执行业务逻辑(单线程处理)buffer.flip();String request = new String(buffer.array(), 0, buffer.limit()).trim();String response = processCommand(request);// 返回响应ByteBuffer respBuffer = ByteBuffer.wrap((response + "\n").getBytes());channel.write(respBuffer);} catch (IOException e) {// 如果发生IO异常,尝试关闭通道try {channel.close();} catch (IOException ex) {// 如果关闭通道时发生异常,打印异常信息ex.printStackTrace();}}}/*** 业务逻辑处理(单线程执行)** @param request 客户端请求的字符串* @return 处理结果的字符串*/private String processCommand(String request) {System.out.println("Processing command: " + request);String[] parts = request.split(" ");if (parts.length < 2) return "ERROR: Invalid command";String cmd = parts[0].toUpperCase();String key = parts[1];switch (cmd) {case "SET":if (parts.length < 3) return "ERROR: Missing value";store.put(key, parts[2]);return "OK";case "GET":return store.getOrDefault(key, "(nil)");default:return "ERROR: Unknown command";}}}public static void main(String[] args) throws IOException {SingleThreadReactorKVStore server = new SingleThreadReactorKVStore(3333);server.start(); // 单一线程运行所有逻辑}
}
Client.class:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicBoolean;public class Client {private static final AtomicBoolean isConnected = new AtomicBoolean(true);public static void main(String[] args) throws IOException {SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false); // 非阻塞模式socketChannel.connect(new InetSocketAddress("localhost", 3333));// 等待连接完成while (!socketChannel.finishConnect()) {System.out.println("等待连接完成...");}System.out.println("已连接到服务器");// 监听服务器消息(包括连接关闭)Thread readThread = new Thread(() -> {ByteBuffer buffer = ByteBuffer.allocate(1024);try {while (isConnected.get()) {int bytesRead = socketChannel.read(buffer);if (bytesRead == -1) {// 服务器关闭连接System.out.println("[服务器已关闭连接]");isConnected.set(false);break;} else if (bytesRead > 0) {buffer.flip();System.out.println("[服务器消息] " +new String(buffer.array(), 0, bytesRead));buffer.clear();}}} catch (IOException e) {if (isConnected.get()) {System.err.println("连接异常: " + e.getMessage());isConnected.set(false);}} finally {closeChannel(socketChannel);}});readThread.start();// 发送消息循环try (Scanner scanner = new Scanner(System.in)) {while (isConnected.get()) {String line = scanner.nextLine();if ("exit".equals(line)) break;if (!isConnected.get()) {System.out.println("连接已断开,无法发送消息");break;}ByteBuffer buffer = ByteBuffer.wrap(line.getBytes());try {socketChannel.write(buffer);System.out.println("已发送: " + line);} catch (IOException e) {System.err.println("发送失败: " + e.getMessage());isConnected.set(false);break;}}}// 关闭连接closeChannel(socketChannel);}private static void closeChannel(SocketChannel channel) {try {if (channel != null && channel.isOpen()) {channel.close();System.out.println("连接已关闭");}} catch (IOException e) {e.printStackTrace();}}
}
运行结果:
单 Reactor 多线程
简单HTTP 服务器:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 单 Reactor 多线程的 HTTP 服务器*/
public class SingleReactorMultiThreadServer {private final Selector selector;private final ServerSocketChannel serverChannel;private final ExecutorService businessPool = Executors.newFixedThreadPool(4); // 业务线程池public SingleReactorMultiThreadServer(int port) throws IOException {selector = Selector.open();serverChannel = ServerSocketChannel.open();serverChannel.socket().bind(new InetSocketAddress(port));serverChannel.configureBlocking(false);serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());}public void start() {System.out.println("Reactor thread: " + Thread.currentThread().getName());try {while (!Thread.interrupted()) {selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();dispatch(key);it.remove();}}} catch (IOException e) {e.printStackTrace();}}private void dispatch(SelectionKey key) {Runnable handler = (Runnable) key.attachment();if (handler != null) {handler.run();}}/*** 处理新连接*/private class Acceptor implements Runnable {@Overridepublic void run() {try {SocketChannel client = serverChannel.accept();new IOHandler(client); // 注册读事件} catch (IOException e) {e.printStackTrace();}}}/*** 处理 I/O 和业务逻辑*/private class IOHandler implements Runnable {private final SocketChannel channel;private final ByteBuffer buffer = ByteBuffer.allocate(1024);public IOHandler(SocketChannel channel) throws IOException {this.channel = channel;channel.configureBlocking(false);channel.register(selector, SelectionKey.OP_READ, this);}@Overridepublic void run() {try {if (!channel.isOpen()) return;// Reactor 线程处理读事件(非阻塞)buffer.clear();int bytesRead = channel.read(buffer);if (bytesRead == -1) {channel.close();return;}// 解析请求(示例:HTTP GET)buffer.flip();String request = new String(buffer.array(), 0, buffer.limit());if (request.startsWith("GET")) {// 提交耗时任务到线程池businessPool.submit(() -> {try {processRequest(request);} catch (ClosedChannelException e) {e.printStackTrace();}});}} catch (IOException e) {try {channel.close();} catch (IOException ex) {ex.printStackTrace();}}}private void processRequest(String request) throws ClosedChannelException {System.out.println("Business thread: " + Thread.currentThread().getName());try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String response = "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World!";// 传递 channel 到 WriteHandlerchannel.register(selector, SelectionKey.OP_WRITE, new WriteHandler(channel, response));}}/*** 处理 写操作 的 Handler*/private class WriteHandler implements Runnable {private final SocketChannel channel; // 新增成员变量private final String response;public WriteHandler(SocketChannel channel, String response) {this.channel = channel;this.response = response;}@Overridepublic void run() {try {ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());channel.write(buffer);channel.close();} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {SingleReactorMultiThreadServer server = new SingleReactorMultiThreadServer(3333);server.start();}
主从 Reactor 多线程案例
简易Http服务器:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;/*** 主从Reactor模式下的简易Http服务器*/
public class MasterSlaveReactorServer {// 主 Reactor 线程组(单线程)private final Reactor bossReactor;// 从 Reactor 线程组(多线程)private final Reactor[] workerReactors;// 轮询计数器,用于分配连接给 Workerprivate final AtomicInteger workerIndex = new AtomicInteger(0);// 业务线程池(处理耗时任务)private final ExecutorService businessPool = Executors.newFixedThreadPool(4);public MasterSlaveReactorServer(int port, int workerCount) throws IOException {// 初始化主 ReactorbossReactor = new Reactor("Boss");// 初始化从 Reactor 组workerReactors = new Reactor[workerCount];for (int i = 0; i < workerCount; i++) {workerReactors[i] = new Reactor("Worker-" + i);}// 主 Reactor 绑定 ServerSocketChannelServerSocketChannel serverChannel = ServerSocketChannel.open();serverChannel.bind(new InetSocketAddress(port));serverChannel.configureBlocking(false);bossReactor.register(serverChannel, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel));}// 启动主从 Reactorpublic void start() {bossReactor.start();for (Reactor worker : workerReactors) {worker.start();}}// 主 Reactor 的 Acceptor,处理新连接private class Acceptor implements Runnable {private final ServerSocketChannel serverChannel;public Acceptor(ServerSocketChannel serverChannel) {this.serverChannel = serverChannel;}@Overridepublic void run() {System.out.println("主Reactor-thread : " + Thread.currentThread().getName());try {SocketChannel clientChannel = serverChannel.accept();if (clientChannel != null) {// 轮询选择一个 Worker Reactorint index = workerIndex.getAndIncrement() % workerReactors.length;Reactor worker = workerReactors[index];// 将新连接注册到 Worker Reactorworker.registerChannel(clientChannel);}} catch (IOException e) {e.printStackTrace();}}}// Reactor 线程(主和从共用)private class Reactor extends Thread {private final Selector selector;private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();public Reactor(String name) throws IOException {super(name);selector = Selector.open();}// 注册 Channel 到当前 Reactorpublic void registerChannel(SocketChannel channel) {// 异步提交注册任务,避免线程阻塞addTask(() -> {try {channel.configureBlocking(false);SelectionKey key = channel.register(selector, SelectionKey.OP_READ);key.attach(new IOHandler(channel, key));} catch (IOException e) {e.printStackTrace();}});selector.wakeup(); // 唤醒 select() 阻塞}// 添加异步任务(线程安全)public void addTask(Runnable task) {taskQueue.offer(task);}@Overridepublic void run() {System.out.println("从Reactor : " + Thread.currentThread().getName());try {while (!Thread.interrupted()) {selector.select(1000); // 超时 1 秒,避免无法唤醒processSelectedKeys();processPendingTasks();}} catch (IOException e) {e.printStackTrace();}}// 处理已就绪的 SelectionKeyprivate void processSelectedKeys() throws IOException {Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();if (key.isValid()) {Runnable handler = (Runnable) key.attachment();if (handler != null) {handler.run();}}}}// 处理异步任务队列private void processPendingTasks() {Runnable task;while ((task = taskQueue.poll()) != null) {task.run();}}public void register(ServerSocketChannel serverChannel, int opAccept, Acceptor acceptor) {try {serverChannel.configureBlocking(false);serverChannel.register(selector, opAccept, acceptor);} catch (IOException e) {e.printStackTrace();}}}// I/O 处理器(从 Reactor 线程执行)private class IOHandler implements Runnable {private final SocketChannel channel;private final SelectionKey key;private final ByteBuffer buffer = ByteBuffer.allocate(1024);public IOHandler(SocketChannel channel, SelectionKey key) {this.channel = channel;this.key = key;}@Overridepublic void run() {System.out.println("IOHandler-thread : " + Thread.currentThread().getName());try {if (!channel.isOpen()) return;// 处理读事件if (key.isReadable()) {buffer.clear();int bytesRead = channel.read(buffer);if (bytesRead == -1) {channel.close();return;}// 提交业务逻辑到线程池buffer.flip();String request = new String(buffer.array(), 0, buffer.limit());businessPool.submit(() -> processRequest(request));}// 处理写事件(示例省略,实际需注册 OP_WRITE)} catch (IOException e) {try {channel.close();} catch (IOException ex) {ex.printStackTrace();}}}// 业务处理(在线程池执行)private void processRequest(String request) {// 模拟耗时操作try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello Reactor!";// 写回响应(需切换回 Reactor 线程)key.interestOps(SelectionKey.OP_WRITE);key.attach(new WriteHandler(channel, response));key.selector().wakeup(); // 唤醒 Selector}}// 写处理器(由 Reactor 线程执行)private class WriteHandler implements Runnable {private final String response;private final SocketChannel channel;// 修改构造函数,传入 channelpublic WriteHandler(SocketChannel channel, String response) {this.channel = channel;this.response = response;}@Overridepublic void run() {System.out.println("WriteHandler-thread : " + Thread.currentThread().getName());try {ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());channel.write(buffer);channel.close(); // 短连接示例,关闭连接} catch (IOException e) {e.printStackTrace();}}}public static void main(String[] args) throws IOException {// 启动服务器:1 个主 Reactor,4 个从 ReactorMasterSlaveReactorServer server = new MasterSlaveReactorServer(3333, 4);server.start();System.out.println("服务器已启动,主从 Reactor 模式运行中...");}
}
Netty
介绍
- Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序(例如协议服务器和客户端)。它极大地简化了 TCP 和 UDP 套接字服务器等网络编程。
- 它基于 Java NIO(Non-blocking I/O)技术,简化了网络编程的复杂性,广泛应用于实时通信、游戏服务器、分布式系统等领域(如 Dubbo、RocketMQ 等框架的底层通信)。
官方文档:https://netty.io/4.2/api/index.html
图源:https://img2020.cnblogs.com/blog/1708060/202111/1708060-20211110224700852-1182764791.png
Netty启动流程:
图源:https://cloud.tencent.cn/developer/article/2146079
线程模式
Netty 基于 Reactor 模式设计,主要有三种线程模型:
- 单线程模型:所有 IO 操作由一个线程处理
- 多线程模型:Acceptor 和 IO 处理器分离为不同线程组
- 主从多线程模型:Acceptor 也使用线程池处理
核心组件
ServerBootstrap(服务器启动类)
核心作用
- 用于配置和启动 Netty 服务器。
- 管理两个
EventLoopGroup
:BossGroup(接受连接)和 WorkerGroup(处理 IO)。
关键方法
group(EventLoopGroup bossGroup, EventLoopGroup workerGroup)
:设置主从线程组。channel(Class<? extends ServerChannel> channelClass)
:设置服务器通道类型(如NioServerSocketChannel
)。childHandler(ChannelHandler childHandler)
:设置子通道处理器(客户端连接的 Pipeline)。option(ChannelOption option, T value)
:设置服务器通道选项(如SO_BACKLOG
)。childOption(ChannelOption option, T value)
:设置子通道选项(如SO_KEEPALIVE
)。bind(int port)
:绑定端口并启动服务器。
Bootstrap(客户端启动类)
核心作用
- 用于配置和启动 Netty 客户端。
- 只需要一个
EventLoopGroup
处理所有连接和 IO 操作。
关键方法
group(EventLoopGroup group)
:设置线程组。channel(Class<? extends Channel> channelClass)
:设置客户端通道类型(如NioSocketChannel
)。handler(ChannelHandler handler)
:设置通道处理器(客户端 Pipeline)。option(ChannelOption option, T value)
:设置通道选项(如SO_KEEPALIVE
)。connect(String host, int port)
:连接到远程服务器。
EventLoopGroup 与 EventLoop
- EventLoopGroup:线程池,管理多个
EventLoop
。 - EventLoop:单线程执行器,负责处理 IO 事件(连接、读写)和任务队列。
分类
- NioEventLoopGroup:基于 Java NIO,跨平台。
- EpollEventLoopGroup:基于 Linux epoll,性能更高。
Channel
NioSocketChannel
:客户端 TCP 连接。NioServerSocketChannel
:服务器 TCP 监听。NioDatagramChannel
:UDP 连接。
ChannelPipeline
- 入站(Inbound):数据从网络到应用(如
channelRead()
)。 - 出站(Outbound):数据从应用到网络(如
write()
)。
ChannelHandler
- ChannelInboundHandler:处理入站数据。
- ChannelOutboundHandler:处理出站数据。
关键接口/抽象类:
- ChannelInboundHandlerAdapter:入站处理器基类。
- ChannelOutboundHandlerAdapter:出站处理器基类。
- SimpleChannelInboundHandler:自动释放资源的入站处理器。
案例分析(群聊系统):
服务器:
ChatServer.class
服务器启动类
package com.itcast.netty.chat;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class ChatServer {private final int port;public ChatServer(int port) {this.port = port;}public void run() throws InterruptedException {// 1.创建主从线程组// 1.1 处理客户端连接NioEventLoopGroup bossGroup = new NioEventLoopGroup();// 1.2 处理事件NioEventLoopGroup workGroups = new NioEventLoopGroup(5);try {// 2.创建服务器// 创建并配置Netty服务器ServerBootstrap server = new ServerBootstrap();// 设置服务器的BossGroup和WorkGroupserver.group(bossGroup, workGroups);// 指定服务器的通道类型server.channel(NioServerSocketChannel.class);// 1. 主通道处理器(用于ServerSocketChannel,处理连接建立事件)server.handler(new ChannelInitializer<NioServerSocketChannel>() {@Overrideprotected void initChannel(NioServerSocketChannel ch) {// 配置主通道的处理器链(如日志记录、连接限制等)ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));}});// 2. 子通道处理器(用于SocketChannel,处理读写事件)server.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 配置SocketChannel的管道socketChannel.pipeline().addLast(new StringDecoder(), // 字符串解码器new StringEncoder(), // 字符串编码器new ChatServerHandler(), // 自定义处理器new ChatConnectionServerHandler() // 监听客户端状态);}});// 设置服务器的配置项server.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 3.绑定端口并启动服务ChannelFuture serverChannel = server.bind(port).sync();// 4.等待服务器关闭serverChannel.channel().closeFuture().sync();} finally {workGroups.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) {ChatServer chatServer = new ChatServer(3333);try {chatServer.run();} catch (InterruptedException e) {e.printStackTrace();}}
}
ChatServerHandler.class
处理客户端发来的信息
package com.itcast.netty.chat;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.net.SocketAddress;
import java.util.HashSet;public class ChatServerHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/*** 服务器接收的数据* @param channelHandlerContext* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg){SocketAddress socketAddress = channelHandlerContext.channel().remoteAddress();for (Channel channel : ChatConnectionServerHandler.onlineUsers) {if (!channel.remoteAddress().equals(socketAddress)) {channel.writeAndFlush(socketAddress + "说: " + msg);}}}
}
ChatConnectionServerHandler.class
处理客户端连接和断开连接
package com.itcast.netty.chat;import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.HashSet;public class ChatConnectionServerHandler extends ChannelInboundHandlerAdapter {public static HashSet<Channel> onlineUsers = new HashSet<Channel>();/*** 监听信道的活跃状态* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端:" + ctx.channel().remoteAddress() + "上线了");onlineUsers.add(ctx.channel());}/*** 监听信道断开连接* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端:" + ctx.channel().remoteAddress() + "断开连接");onlineUsers.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
客户端:
ChatClient.class
客户端启动类
package com.itcast.netty.chat;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;public class ChatClient {private final String host;private final int port;public ChatClient(String host, int port) {this.port = port;this.host = host;}public void run() throws InterruptedException {NioEventLoopGroup groups = new NioEventLoopGroup();try {Bootstrap client = new Bootstrap();client.group(groups);client.channel(NioSocketChannel.class);client.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringDecoder(),new StringEncoder(),new ChatClientHandler());}});// 启动客户端ChannelFuture clientChannel = client.connect(host, port).sync();// 读取控制台输入 发送给服务器Scanner scanner = new Scanner(System.in);while (true) {String msg = scanner.nextLine();if ("exit".equals(msg)) {break;}clientChannel.channel().writeAndFlush(msg);}// 等待关闭clientChannel.channel().closeFuture().sync();} finally {groups.shutdownGracefully();}}public static void main(String[] args) {ChatClient client = new ChatClient("localhost", 3333);try {client.run();} catch (InterruptedException e) {e.printStackTrace();}}
}
ChatClientHandler.class
处理服务器发来的消息
package com.itcast.netty.chat;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ChatClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
运行结果:
Netty源码剖析
EventLoopGroup 和EventLoop
从EventLoopGroup接口的继承关系图可以从看出来这个接口继承了ExecutorService接口,说明是一个线程池
NioEventLoop
-
处理 I/O 多路复用,监听 Channel 上的事件
-
NioEventLoop
是 Netty 基于 Java NIO 机制构建的事件循环实现 -
NioEvent通过组合方式管理线程和任务。
-
NioEventLoop
→SingleThreadEventLoop
→SingleThreadEventExecutor
→AbstractScheduledEventExecutor
。 -
Netty 采用 单线程模型 处理 Channel:
- 每个
EventLoop
绑定一个 专属线程(Thread
)。 - 该线程负责处理该
EventLoop
管理的 所有 Channel 的 IO 操作(如读、写、连接)。 - 所有 Channel 操作必须在其关联的
EventLoop
线程中执行,否则会导致线程安全问题。
- 每个
重要字段
属性分类 | 关键属性 | 作用与优化点 |
---|---|---|
Selector 优化 | selectedKeys | 使用数组替代 HashSet ,提升选择键处理性能 |
任务调度 | taskQueue 、scheduledTaskQueue | 分离普通任务和定时任务,支持异步执行 |
线程控制 | threadId 、wakenUp | 确保任务在正确线程执行,优化 Selector.wakeup() 调用 |
时间管理 | ioRatio 、lastExecutionTime | 平衡 I/O 操作和任务执行时间,避免任务饥饿 |
状态管理 | state 、needsToSelectAgain | 控制事件循环状态,处理取消的 SelectionKey |
1.Selector相关字段
// Java NIO Selector
private Selector selector;
private final SelectorProvider provider;// 优化后的 Selector(通过反射替换为数组实现)
private SelectedSelectionKeySet selectedKeys;// 原生 Selector(未优化版本)
private final Selector unwrappedSelector;
2.任务队列相关字段
taskQueue
:存储通过execute()
提交的普通任务。scheduledTaskQueue
:存储通过schedule()
提交的定时任务。
// 普通任务队列(无界队列)
private final Queue<Runnable> taskQueue;// 定时任务队列(基于优先级堆)
private final PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
实现:
taskQueue
默认使用MpscUnboundedArrayQueue
(多生产者单消费者队列)。scheduledTaskQueue
使用二叉堆实现,按执行时间排序。
3.父级EventLoopGroup
表示当前EventLoop属于哪个EventLoopGroup
// 父级 EventLoopGroup
private final NioEventLoopGroup parent;
4.线程相关
- 确保任务在正确的线程中执行(通过
inEventLoop()
检查)。 - 控制
Selector
的唤醒机制(避免不必要的wakeup()
调用)
// 线程 ID(用于检查是否在 EventLoop 线程中执行)
private volatile int threadId;// 线程唤醒标志
private final AtomicBoolean wakenUp = new AtomicBoolean();// 线程状态(-1: 未启动, 1: 已启动, 2: 关闭中)
private volatile int state = ST_NOT_STARTED;// 最大待处理任务数(默认 Integer.MAX_VALUE)
private final int maxPendingTasks;// 拒绝策略(当任务队列满时的处理方式)
private final RejectedExecutionHandler rejectedExecutionHandler;
5.事件相关
selectStrategy
:决定何时调用select()
、selectNow()
或跳过选择。needsToSelectAgain
:标记是否需要重新执行select()
。
// 选择策略(控制 select() 行为)
private final SelectStrategy selectStrategy;// 待处理的取消键数量
private int cancelledKeys;// 是否需要重新选择
private boolean needsToSelectAgain;
6. 时间与执行控制属性
ioRatio
:控制 I/O 操作与任务执行的时间分配。lastExecutionTime
:用于计算任务执行超时和延迟调度。
// I/O 操作与任务执行的时间比例(默认 50%)
private final int ioRatio;// 最后一次执行任务的时间戳
private long lastExecutionTime;// 任务执行超时控制
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
线程管理
每个 NioEventLoop 绑定一个独立线程:
- 处理注册到
Selector
上的IO 事件(连接、读写等)。 - 执行用户提交的普通任务(通过
execute(task)
)和定时任务(通过schedule(task, delay, unit)
)。 - 调度后续的任务执行(如定时任务)。
NioEventLoop
内部没有线程池,它是一个单线程的事件循环,所有操作都在同一个线程中完成。
NioEventLoop
会将耗时任务通过NioEventLoopGroup
提交到外部的线程池中
重要方法
run方法:
NioEventLoop通过内部线程执行 run()
方法,通过 execute(Runnable)
提交任务
-
无限循环处理 I/O 事件和任务队列。
-
通过
processSelectedKeys()
处理网络 I/O。 -
通过
runAllTasks()
执行提交的Runnable
任务。 -
calculateStrategy
是 Netty 中用于决定 EventLoop 选择策略的核心方法,它平衡了 I/O 事件处理与任务执行的优先级。该方法由SelectStrategy
接口定义,默认实现为DefaultSelectStrategy
/*** NIO事件循环的主处理方法,负责轮询IO事件和执行任务* 这个方法实现了一个无限循环,不断地选择就绪的通道并处理IO事件,同时兼顾任务执行*/
protected void run() {// 记录select操作的连续唤醒次数,用于检测异常唤醒情况int selectCnt = 0;// 主事件循环,持续运行直到线程关闭while(true) {try {// 计算选择策略:-3/-1表示需要阻塞select,-2表示退出循环,其他正数表示就绪通道数int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {// 处理需要阻塞等待IO事件的情况case -3:case -1:// 获取下一个定时任务的截止时间long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = Long.MAX_VALUE;}nextWakeupNanos.set(curDeadlineNanos);try {// 如果没有待处理任务,进行阻塞select操作if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// 重置唤醒时间,允许其他线程唤醒selectornextWakeupNanos.lazySet(-1L);}break;// 退出事件循环的策略case -2:break outerLoop;}// 增加select计数并重置状态++selectCnt;cancelledKeys = 0;needsToSelectAgain = false;// 根据IO比例配置处理IO事件和任务执行的时间分配int ioRatio = this.ioRatio;boolean ranTasks;// 处理就绪的IO事件if (strategy > 0) {processSelectedKeys();}// 根据IO比例执行任务if (ioRatio == 100) {ranTasks = runAllTasks();} else {long ioStartTime = System.nanoTime();try {// 处理IO事件if (strategy > 0) {processSelectedKeys();}} finally {// 计算IO操作耗时,并按比例执行任务long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);}}// 如果没有执行任务且select操作没有返回就绪通道,检查是否需要重置select计数if (!ranTasks && strategy <= 0) {if (unexpectedSelectorWakeup(selectCnt)) {selectCnt = 0;}}// 记录select操作连续过早返回的情况,可能是JDK选择器实现的问题if (selectCnt > 3 && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);}// 重置计数selectCnt = 0;} catch (IOException e) {// 发生IO异常时重建选择器rebuildSelector0();selectCnt = 0;handleLoopException(e);} catch (CancelledKeyException e) {// 处理已取消键的异常if (logger.isDebugEnabled()) {logger.debug("CancelledKeyException raised by a Selector {} - JDK bug?", selector, e);}} catch (Error e) {// 致命错误直接抛出throw e;} catch (Throwable e) {// 处理其他异常handleLoopException(e);} finally {// 检查是否正在关闭,如果是则关闭所有通道并确认关闭if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}}}
}
select(curDeadlineNanos)
:
deadlineNanos
:下一个定时任务的截止时间Long.MAX_VALUE
表示无定时任务,可永久阻塞deadlineNanos + 995000L
:增加995000 纳秒
(约 1 毫秒)以避免浮点数精度误差。deadlineToDelayNanos()
:计算当前时间到deadlineNanos
的剩余时间(纳秒)
private int select(long deadlineNanos) throws IOException {// 情况1:无截止时间(永久阻塞)if (deadlineNanos == Long.MAX_VALUE) {return this.selector.select(); // 永久阻塞,直到有 I/O 事件就绪} else {// 情况2:有截止时间,计算超时时间long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;// 根据超时时间决定执行方式return timeoutMillis <= 0L ? this.selector.selectNow() // 无超时(立即返回): this.selector.select(timeoutMillis); // 带超时的阻塞}
}
processSelectedKeys
:
private void processSelectedKeys() {if (this.selectedKeys != null) {// 使用优化的数组实现(默认策略)this.processSelectedKeysOptimized();} else {// 回退到 JDK 原生的 HashSet 实现this.processSelectedKeysPlain(this.selector.selectedKeys());}
}
private void processSelectedKeysOptimized() {// 遍历优化后的数组(替代原生 HashSet<SelectionKey>)for (int i = 0; i < this.selectedKeys.size; ++i) {SelectionKey k = this.selectedKeys.keys[i];this.selectedKeys.keys[i] = null; // 清空引用,帮助 GCObject a = k.attachment();if (a instanceof AbstractNioChannel) {// 处理 Channel 相关事件processSelectedKey(k, (AbstractNioChannel) a);} else {// 处理特殊任务(如 NioTask)NioTask<SelectableChannel> task = (NioTask) a;processSelectedKey(k, task);}// 检查是否需要重新选择(如发生 key 取消)if (this.needsToSelectAgain) {this.selectedKeys.reset(i + 1); // 重置数组游标this.selectAgain(); // 重新执行 select()i = -1; // 重新从数组头部开始遍历}}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (!selectedKeys.isEmpty()) {Iterator<SelectionKey> i = selectedKeys.iterator();while (true) {SelectionKey k = i.next();Object a = k.attachment();i.remove(); // 处理后立即从集合中移除,避免重复处理if (a instanceof AbstractNioChannel) {// 处理 Channel 相关事件processSelectedKey(k, (AbstractNioChannel) a);} else {// 处理特殊任务(如 NioTask)NioTask<SelectableChannel> task = (NioTask) a;processSelectedKey(k, task);}// 检查是否需要重新选择if (!i.hasNext()) {break; // 遍历完成,退出循环}if (this.needsToSelectAgain) {selectAgain(); // 重新执行 select()selectedKeys = this.selector.selectedKeys(); // 获取新的就绪 keysif (selectedKeys.isEmpty()) {break; // 没有新的就绪事件,退出循环}i = selectedKeys.iterator(); // 重置迭代器}}}
}
runAllTasks(long timeoutNanos)
:
Netty 使用优先级队列(PriorityQueue)**管理定时任务,队列中的任务按执行时间排序。当调用 runAllTasks()
时,会先将**已到期的任务从定时队列提取到普通任务队列
protected boolean runAllTasks(long timeoutNanos) {// 步骤1:将定时任务队列中已到期的任务转移到普通任务队列this.fetchFromScheduledTaskQueue();// 步骤2:获取第一个任务Runnable task = this.pollTask();if (task == null) {this.afterRunningAllTasks(); // 无任务时执行收尾工作return false;}// 步骤3:计算任务执行的截止时间long deadline = timeoutNanos > 0L ? this.getCurrentTimeNanos() + timeoutNanos : 0L;long runTasks = 0L; // 记录已执行的任务数// 步骤4:循环执行任务,直到超时或队列为空long lastExecutionTime;while(true) {safeExecute(task); // 安全执行任务++runTasks;// 每执行64个任务检查一次超时(批量执行优化)if ((runTasks & 63L) == 0L) {lastExecutionTime = this.getCurrentTimeNanos();if (lastExecutionTime >= deadline) {break; // 超时,退出循环}}// 获取下一个任务task = this.pollTask();if (task == null) {lastExecutionTime = this.getCurrentTimeNanos();break; // 队列为空,退出循环}}// 步骤5:执行收尾工作并记录执行时间this.afterRunningAllTasks();this.lastExecutionTime = lastExecutionTime;return true;
}
NioEventLoopGroup
参考链接:https://www.cnblogs.com/ZhuChangwu/p/11192219.html
NioEventLoopGroup 是 Netty 框架中处理网络 IO 操作的核心组件,它管理一组 NioEventLoop 实例,负责注册 Channel、处理 IO 事件和执行任务
重要字段
1. 线程池相关字段
private final EventExecutor[] children; // 管理的 EventLoop 数组
private final Set<EventExecutor> readonlyChildren; // 只读视图
private final EventExecutorChooserFactory.EventExecutorChooser chooser; // 线程选择器
- children:存储所有 NioEventLoop 实例,负责实际的 IO 操作和任务执行。
- chooser:负责从 children 中选择一个 EventLoop,实现负载均衡。
2. 线程配置字段
private final int nThreads; // 线程数
private final Executor executor; // 任务执行器
private final EventExecutorGroup parent; // 父级 EventLoopGroup
- nThreads:指定线程池大小,默认是 CPU 核心数的两倍(
Runtime.getRuntime().availableProcessors() * 2
)。 - executor:实际执行任务的线程池,默认使用
ThreadPerTaskExecutor
。
3. Selector 配置字段
private final SelectorProvider provider; // JDK Selector 提供者
private final SelectStrategyFactory selectStrategyFactory; // 选择策略工厂
- provider:创建 JDK NIO Selector 的工厂,默认使用系统默认实现。
- selectStrategyFactory:创建选择策略,控制 EventLoop 的 select 行为。
4. 拒绝策略和任务队列字段
private final RejectedExecutionHandler rejectedExecutionHandler; // 任务拒绝策略
private final EventLoopTaskQueueFactory taskQueueFactory; // 任务队列工厂
- rejectedExecutionHandler:当任务队列已满时的拒绝策略,默认使用
AbortPolicy
。
重要方法
1. 构造方法
public NioEventLoopGroup()
public NioEventLoopGroup(int nThreads)
public NioEventLoopGroup(int nThreads, Executor executor)
public NioEventLoopGroup(int nThreads, Executor executor, SelectorProvider provider)
- 初始化 EventLoopGroup,指定线程数、执行器和 Selector 提供者等参数。
2. 线程选择方法
public EventExecutor next()
- 从 children 数组中选择一个 EventLoop,通过
chooser
实现负载均衡。
3. Channel 注册方法
public ChannelFuture register(Channel channel)
- 将 Channel 注册到一个 EventLoop 的 Selector 上,返回异步注册结果。
4. 优雅关闭方法
public Future<?> shutdownGracefully()
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
- 优雅关闭线程池,允许在指定时间内完成未执行的任务。
5. 任务提交方法
public <T> Future<T> submit(Callable<T> task)
public Future<?> execute(Runnable task)
- 向 EventLoop 提交任务,由线程池异步执行。
6. 资源清理方法
protected void cleanup()
- 清理资源,关闭所有 EventLoop 和 Selector。
工作流程
- 初始化阶段:
- 创建并启动指定数量的 NioEventLoop 线程。
- 初始化线程选择器
chooser
,用于负载均衡。
- Channel 注册阶段:
- 调用
register(Channel)
方法将 Channel 注册到一个 EventLoop。 - EventLoop 将 Channel 注册到其管理的 Selector 上。
- 调用
- IO 事件处理阶段:
- EventLoop 不断循环调用
Selector.select()
方法检测就绪事件。 - 处理就绪的 IO 事件(读 / 写),并执行相应的 ChannelHandler。
- EventLoop 不断循环调用
- 任务执行阶段:
- 通过
execute()
或submit()
方法提交的任务在 EventLoop 线程中执行。 - 定时任务由 ScheduledExecutorService 管理和执行。
- 通过
- 关闭阶段:
- 调用
shutdownGracefully()
方法优雅关闭线程池。 - 释放所有资源,包括 Selector 和线程。
- 调用
初始化逻辑
/*** 初始化多线程事件执行器组* * @param nThreads 线程数量,必须为正数* @param executor 用于执行任务的基础线程池,若为null则创建默认实现* @param chooserFactory 执行器选择工厂,用于从多个执行器中选择一个处理任务* @param args 创建子执行器时传递的可选参数*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {// 原子计数器,记录已终止的子执行器数量this.terminatedChildren = new AtomicInteger();// 异步终止承诺,用于通知整个执行器组已完全终止this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);// 校验线程数量为正数ObjectUtil.checkPositive(nThreads, "nThreads");// 若未提供基础线程池,则创建默认实现(为每个任务创建新线程)if (executor == null) {executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());}// 创建指定数量的子执行器数组this.children = new EventExecutor[nThreads];// 循环创建并初始化所有子执行器for(int i = 0; i < nThreads; ++i) {boolean success = false;boolean inTryBlock = false;try {inTryBlock = true;// 创建子执行器实例(由子类实现具体逻辑)this.children[i] = this.newChild((Executor)executor, args);success = true;inTryBlock = false;} catch (Exception e) {// 若创建失败,抛出异常并终止已创建的子执行器throw new IllegalStateException("failed to create a child event loop", e);} finally {// 若在try块中且创建失败,优雅关闭已创建的子执行器if (inTryBlock && !success) {// 发起优雅关闭请求for(int j = 0; j < i; ++j) {this.children[j].shutdownGracefully();}// 等待所有已创建的子执行器完全终止for(int j = 0; j < i; ++j) {EventExecutor e = this.children[j];try {// 等待终止(超时时间设为极大值,等效于无限等待)while(!e.isTerminated()) {e.awaitTermination(2147483647L, TimeUnit.SECONDS);}} catch (InterruptedException ie) {// 恢复中断状态并退出循环Thread.currentThread().interrupt();break;}}}}// 若创建失败(finally块外的双重检查),再次确保资源释放if (!success) {// 与finally块中的逻辑相同,确保资源释放for(int j = 0; j < i; ++j) {this.children[j].shutdownGracefully();}for(int j = 0; j < i; ++j) {EventExecutor e = this.children[j];try {while(!e.isTerminated()) {e.awaitTermination(2147483647L, TimeUnit.SECONDS);}} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}}// 创建执行器选择器,用于从多个子执行器中选择一个处理任务this.chooser = chooserFactory.newChooser(this.children);// 创建终止监听器,当所有子执行器终止时触发FutureListener<Object> terminationListener = new FutureListener<Object>() {public void operationComplete(Future<Object> future) throws Exception {// 原子增加已终止子执行器计数,当所有子执行器终止时,标记整个组终止完成if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);}}};// 为每个子执行器注册终止监听器EventExecutor[] executors = this.children;for(int i = 0; i < executors.length; ++i) {EventExecutor e = executors[i];e.terminationFuture().addListener(terminationListener);}// 创建子执行器的只读集合视图,防止外部修改Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);Collections.addAll(childrenSet, this.children);this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
这个方法主要是:
1.初始化EventLoop数组
根据传递进来的线程数,创建EventLoop
,将EventLoop
存储在一个EventExecutor[]
数组(即children
)中
this.children[i] = this.newChild((Executor)executor, args);
这个newChild
方法调用的是MultithreadEventExecutorGroup
里面的newChild
方法返回的是一个EventExecutor
,EventLoop
接口继承了EventExecutor
,所以这个EventExecutor
其实是一个EventLoop
对象
3.初始化选择器chooser:
这个chooser
是一个EventExecutorChooserFactory.EventExecutorChooser
对象,这个对象提供了一个next
方法,返回的是EventExecutor
对象,所以这个选择器用于从EventLoopGroup
中选取一个EventLoop
this.chooser = chooserFactory.newChooser(this.children);
核心逻辑
当NioEventLoopGroup
接收客户端的连接时,会先分配一个NioEventLoop
给当前的客户端,当前这个线程池会将客户端channel
传递给NioEventLoop
,这个NioEventLoop
会将调用SingleThreadEventLoop
的register
方法将当前的客户端注册到当前的NIOEventLoop
中
SingleThreadEventLoop.register
:
参数检查:确保
promise
不为空。获取 Unsafe 对象:
promise.channel().unsafe()
获取 Channel 的内部 Unsafe 实现。
- Unsafe:是 Netty 内部使用的接口,提供了底层操作的能力,如注册、绑定、读写等。
调用 Unsafe.register ()
- 将当前
NioEventLoopGroup
中的一个 EventLoop(通过next()
方法选择)传递给 Unsafe。- Unsafe 实现会负责实际的注册操作。
返回 Promise:返回原始的 promise 对象,用于异步获取注册结果。
// ChannelPromise promise:一个异步操作的承诺对象,用于跟踪注册操作的完成状态和结果
public ChannelFuture register(ChannelPromise promise) {// 检查 promise 不为空ObjectUtil.checkNotNull(promise, "promise");// 获取 Channel 的 Unsafe 对象并调用其 register 方法promise.channel().unsafe().register(this, promise);return promise;
}
然后这个方法最终会调用AbstractChannel.AbstractUnsafe.register
(处理具体的注册逻辑)
/*** 将 Channel 注册到指定的 EventLoop 上* 此方法是 Channel 注册流程的核心入口,负责初始化 Channel 与 EventLoop 的关联* 并确保注册操作在线程安全的环境中执行* * @param eventLoop 目标 EventLoop,Channel 将注册到该 EventLoop 管理的 Selector 上* @param promise 用于异步通知注册结果的 Promise 对象*/
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 检查参数有效性ObjectUtil.checkNotNull(eventLoop, "eventLoop");// 确保 Channel 尚未注册if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}// 验证 EventLoop 类型兼容性(例如 NioEventLoop 只能处理 NIO 类型的 Channel)if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 将 Channel 与指定的 EventLoop 关联// 这一步确保后续所有操作都在同一个 EventLoop 线程中执行AbstractChannel.this.eventLoop = eventLoop;// 检查当前线程是否是目标 EventLoop 的线程// Netty 要求所有 Channel 操作必须在其关联的 EventLoop 线程中执行if (eventLoop.inEventLoop()) {// 如果当前线程是 EventLoop 线程,直接执行注册逻辑register0(promise);} else {// 如果不是 EventLoop 线程,则将注册任务封装为 Runnable 提交到 EventLoop// 确保注册操作在 EventLoop 线程中执行,保证线程安全try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 处理任务提交失败的情况(例如 EventLoop 已关闭)logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t);// 强制关闭 ChannelcloseForcibly();AbstractChannel.this.closeFuture.setClosed();// 标记 Promise 为失败状态safeSetFailure(promise, t);}}
}/*** 执行实际的注册操作(由 register() 方法调用)* 此方法会完成 JDK NIO Channel 到 Selector 的注册* 并触发 ChannelPipeline 中的注册完成事件*/
private void register0(ChannelPromise promise) {try {// 标记首次注册状态boolean firstRegistration = neverRegistered;// 执行 JDK NIO 底层注册操作// 将 Channel 注册到 EventLoop 的 Selector 上,初始不监听任何事件(ops=0)doRegister();// 更新注册状态neverRegistered = false;registered = true;// 触发 ChannelPipeline 中所有添加的 ChannelHandler 的初始化逻辑// 确保所有 handlerAdded() 方法被调用pipeline.invokeHandlerAddedIfNeeded();// 安全地标记 Promise 为成功状态safeSetSuccess(promise);// 触发 ChannelRegistered 事件,通知所有 Handler 通道已注册pipeline.fireChannelRegistered();// 如果 Channel 已处于活跃状态(例如客户端 Channel 已连接)// 则触发 ChannelActive 事件if (isActive()) {if (firstRegistration) {// 首次注册且已激活,触发 ChannelActive 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// 非首次注册但配置了自动读取,开始读取数据beginRead();}}} catch (Throwable t) {// 处理注册过程中发生的异常// 包括 JDK NIO 注册失败、Handler 初始化异常等// 强制关闭 ChannelcloseForcibly();// 标记关闭状态closeFuture.setClosed();// 安全地标记 Promise 为失败状态safeSetFailure(promise, t);}
}
假设:
EventLoop
绑定的线程是Thread-1
。- 主线程(
main
)调用channel.register()
。
执行流程:
- 主线程进入
register()
方法。 - 检查当前线程(
main
)是否是EventLoop
线程(Thread-1
),发现不是。 - 将
register0()
封装成任务,提交到EventLoop
的任务队列。 Thread-1
从队列中取出任务并执行register0()
。
注册完成后会触发pipeline.fireChannelActive();会调用AbstractChannelHandlerContext.invokeChannelActive
/*** 静态工具方法:确保在正确的事件循环中调用 ChannelActive 事件* 根据执行器所在线程决定是直接调用还是提交异步任务执行* * 执行策略:* - 如果当前线程是处理器的事件循环线程,则直接同步调用* - 否则将调用封装为任务提交到事件循环线程执行* * @param next 要调用的下一个 ChannelHandlerContext* @see EventExecutor#inEventLoop()*/
static void invokeChannelActive(final AbstractChannelHandlerContext next) {// 获取处理器关联的事件执行器EventExecutor executor = next.executor();// 判断当前线程是否为事件循环线程if (executor.inEventLoop()) {// 直接在当前线程调用,避免线程切换开销next.invokeChannelActive();} else {// 提交到事件循环线程异步执行executor.execute(new Runnable() {public void run() {next.invokeChannelActive();}});}
}/*** 触发当前处理器的 ChannelActive 事件处理逻辑* 根据处理器类型调用对应实现,并处理可能的异常* * 执行流程:* 1. 判断当前处理器是否应该处理该事件* 2. 根据处理器类型选择具体的调用方式:* - 头部处理器:直接调用 HeadContext 的实现* - 双向处理器:调用 ChannelDuplexHandler 的实现* - 默认情况:调用 ChannelInboundHandler 的实现* 3. 捕获并处理所有异常,确保事件流不中断* 4. 若当前处理器不处理,则将事件传播到下一个处理器* * @see ChannelHandler#channelActive(ChannelHandlerContext)*/
private void invokeChannelActive() {// 判断是否应由当前处理器处理事件if (this.invokeHandler()) {try {// 获取当前处理器实例ChannelHandler handler = this.handler();// 获取pipeline头部处理器引用DefaultChannelPipeline.HeadContext headContext = this.pipeline.head;// 根据处理器类型选择调用方式if (handler == headContext) {// 头部处理器特殊处理(如网络IO操作)headContext.channelActive(this);} else if (handler instanceof ChannelDuplexHandler) {// 双向处理器支持完整的入站/出站操作((ChannelDuplexHandler)handler).channelActive(this);} else {// 默认作为入站处理器处理((ChannelInboundHandler)handler).channelActive(this);}} catch (Throwable var3) {// 异常处理机制确保事件处理流程不中断this.invokeExceptionCaught(var3);}} else {// 将事件传播到下一个处理器this.fireChannelActive();}
}
头部处理器:
是Netty 底层 I/O 操作的入口和出口,负责处理与物理通道(如 TCP 连接)直接相关的操作,并衔接上层业务处理器、、
AbstractChannel
AbstractChannel
是 Netty 框架中的一个抽象类,它在 Netty 的网络通信中扮演着非常重要的角色,是所有具体通道实现类的基类
AbstractChannel
类是 Netty 框架中实现网络通信的基础,它提供了通道的基本抽象和功能,为上层应用提供了一个统一的、高效的网络编程接口。通过继承 AbstractChannel
,Netty 实现了多种不同类型的通道(如 NioServerSocketChannel
、NioSocketChannel
等
属性字段
字段名称 | 类型 | 核心作用 | 设计要点 / 场景 |
---|---|---|---|
parent | Channel | 父通道引用(如服务器通道对应的客户端通道) | 用于层级管理,父通道关闭时可级联关闭子通道 |
id | ChannelId | 通道全局唯一标识符 | 基于 UUID 生成,用于日志、监控标识特定连接 |
unsafe | Channel.Unsafe | 底层传输操作接口(如 Java NIO 的 Selector 操作) | 解耦 Netty 抽象层与具体 IO 模型(NIO/Epoll),提供 read() 、write() 等底层方法 |
pipeline | DefaultChannelPipeline | 处理器链(ChannelHandler 链表) | 处理入站 / 出站事件,支持动态添加 / 删除处理器,实现业务逻辑与 IO 的解耦 |
closeFuture | CloseFuture | 通道关闭事件的异步通知机制 | 基于 Future-Listener 模式,支持非阻塞式关闭回调(如资源释放) |
eventLoop | EventLoop | 关联的事件循环线程(NioEventLoop 等) | 通道的所有 IO 操作必须在此线程执行,确保线程安全 |
registered | boolean | 标识通道是否已注册到 Selector | true 表示可开始监听 IO 事件(如 OP_READ ),由 register() 方法更新 |
localAddress | SocketAddress | 本地绑定地址(如服务器端口) | 绑定端口后设置(如 0.0.0.0:8080 ) |
remoteAddress | SocketAddress | 远程连接地址(如客户端 IP + 端口) | 客户端连接建立后设置(如 192.168.1.1:50000 ) |
unsafeVoidPromise | VoidChannelPromise | 空操作的 ChannelPromise (占位符) | 用于不需要返回结果的操作(如内部清理),避免创建临时对象 |
closeInitiated | boolean | 标识是否已发起关闭流程 | 防止重复关闭,确保关闭逻辑幂等性 |
initialCloseCause | Throwable | 关闭原因(异常信息) |
方法
方法名称 | 参数 | 返回值 | 核心功能 | 设计要点 / 典型场景 |
---|---|---|---|---|
register(EventLoop, Promise) | EventLoop , ChannelPromise | void | 将通道注册到指定 EventLoop 的 Selector 上 | 线程安全设计:若当前线程不是 EventLoop 线程,通过 execute() 提交任务确保单线程执行 |
bind(SocketAddress, Promise) | SocketAddress , ChannelPromise | ChannelFuture | 绑定本地地址(如服务器端口) | 异步操作:返回 ChannelFuture 监听绑定结果,内部调用 doBind() 实现具体逻辑 |
connect(SocketAddress, Promise) | SocketAddress , ChannelPromise | ChannelFuture | 连接远程地址(客户端模式) | 异步操作:支持超时设置,内部调用 doConnect() 实现具体连接逻辑 |
disconnect(Promise) | ChannelPromise | ChannelFuture | 断开连接(客户端模式) | 通常用于主动关闭连接,释放资源,内部调用 doDisconnect() |
close(Promise) | ChannelPromise | ChannelFuture | 关闭通道(优雅关闭) | 触发 pipeline 的 channelInactive() 事件,确保所有处理器有机会执行清理逻辑 |
write(Object, Promise) | Object (消息), ChannelPromise | ChannelFuture | 将消息写入通道(异步操作) | 消息从 pipeline 的尾部开始流动,最终调用 unsafe.write() 执行底层写操作 |
flush() | void | Channel | 强制刷新缓冲区,将数据发送到网络 | 通常与 write() 配合使用(如 writeAndFlush() ),触发 pipeline 的 flush() 事件 |
read() | void | Channel | 触发通道读取数据(从网络接收数据) | 调用 pipeline 的 read() 事件,最终调用 unsafe.beginRead() 注册 OP_READ 事件 |
pipeline() | void | ChannelPipeline | 获取通道的 ChannelPipeline (处理器链) | 线程安全:每个通道有独立的 pipeline ,支持动态添加 / 删除处理器 |
config() | void | ChannelConfig | 获取通道配置(如 TCP_NODELAY 、SO_KEEPALIVE 等) | 配置参数通过 ChannelOption 设置,影响底层 Socket 行为 |
localAddress() | void | SocketAddress | 获取本地绑定地址 | 绑定后返回 localAddress 字段值,否则返回 null |
remoteAddress() | void | SocketAddress | 获取远程连接地址 | 连接建立后返回 remoteAddress 字段值,否则返回 null |
isActive() | void | boolean | 判断通道是否处于活跃状态(已连接或已绑定) | 通常用于检查通道是否可进行读写操作 |
Netty服务器启动源码剖析
// 创建并配置Netty服务器
ServerBootstrap server = new ServerBootstrap();
// 设置服务器的BossGroup和WorkGroup
server.group(bossGroup, workGroups);
// 指定服务器的通道类型
server.channel(NioServerSocketChannel.class);
// 2.配置服务器的处理器
// 2.1. 主通道处理器(用于ServerSocketChannel,处理连接建立事件)
server.handler(new ChannelInitializer<NioServerSocketChannel>() {@Overrideprotected void initChannel(NioServerSocketChannel ch) {// 配置主通道的处理器链(如日志记录、连接限制等)ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));}
});
// 2.2. 子通道处理器(用于SocketChannel,处理读写事件)
ChannelInitializer<SocketChannel> childHandlers = new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 配置SocketChannel的管道socketChannel.pipeline().addLast(new StringDecoder(), // 字符串解码器new StringEncoder(), // 字符串编码器new ChatServerHandler(), // 自定义处理器new ChatConnectionServerHandler() // 监听客户端状态);}
};
server.childHandler(childHandlers);
// 设置服务器的配置项
server.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 3.绑定端口并启动服务
ChannelFuture serverChannel = server.bind(port).sync();// 4.等待服务器关闭
serverChannel.channel().closeFuture().sync();
- 创建并初始化
ServerSocketChannel
:ServerBootstrap
根据配置创建一个ServerSocketChannel
,用于监听客户端连接。- 配置
ServerSocketChannel
的一些参数,如SO_BACKLOG
(连接队列长度)等。
- 将
ServerSocketChannel
注册到EventLoop
:ServerSocketChannel
会被注册到EventLoop
的Selector
上,以便监听连接事件。- 注册过程是异步的,通过
ChannelFuture
来通知注册结果。
- 绑定端口:
- 调用
ServerSocketChannel
的bind
方法,将其绑定到指定的端口。 - 底层会调用操作系统的
bind
系统调用,将套接字绑定到指定的 IP 地址和端口。
- 调用
- 启动监听:
- 绑定成功后,
ServerSocketChannel
开始监听客户端连接。 - 此时,
ServerSocketChannel
处于OP_ACCEPT
状态,等待客户端连接。
- 绑定成功后,
- 阻塞等待绑定完成:
sync()
方法会阻塞当前线程,直到绑定操作完成。- 如果绑定成功,
ChannelFuture
的isSuccess()
方法返回true
;如果绑定失败,ChannelFuture
的isSuccess()
方法返回false
,并且可以通过cause()
方法获取失败原因。
- 触发
ChannelActive
事件:- 如果
Channel
在绑定后变为活跃状态(之前不活跃),会异步触发ChannelActive
事件。 ChannelActive
事件会被传播到ChannelPipeline
中的所有ChannelHandler
,以便它们可以执行相应的初始化或处理逻辑。
- 如果
1.初始化EventLoopGroup
server.group(bossGroup, workerGroups);
调用ServerBootStrap
的group
方法指定bossGroup
和workerGroup
workerGroup
会传递给ServerBootStrap
类,指定childGroup
为workerGroup
bossGroup
会传递给AbstractBootstrap
这个类,指定这个类的group
属性为传递的bossGroup
(ServerBootStrap
类继承了AbstractBootstrap
类)
一般情况下:
bossGroup
可以认为是主Reactor
线程池,主要负责处理客户端的连接请求(accept
事件),建立与客户端的连接
workerGroup
可以认为是从Reactor
线程池。主要负责处理已建立连接的 Channel
的读写事件(read/write
事件),包括数据的接收、处理和发送
2.初始化channel
主要是设置AbstractBootstrap
里面的channelFactory
属性设置为对应通道类型的工厂类
server.channel(NioServerSocketChannel.class);
调用的是父类AbstractBootstrap
的channel
方法
public B channel(Class<? extends C> channelClass) {return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory((Class)ObjectUtil.checkNotNull(channelClass, "channelClass"))));
}
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {return this.channelFactory((ChannelFactory)channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory");if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");} else {this.channelFactory = channelFactory;return this.self();}
}
ReflectiveChannelFactory
是 Netty 中用于通过反射创建 Channel 实例的工厂类
ReflectiveChannelFactory
通过channelClass
(当前通道的类型)通过反射拿到对应类(当前通道)的无参构造器
ReflectiveChannelFactory
实现了ChannelFactory
接口,所以它是ChannelFactory
的子类
public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");try {this.constructor = clazz.getConstructor();} catch (NoSuchMethodException var3) {throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", var3);}
}
通过channelFactory
方法将当前类的channelFactory
属性初始化为ReflectiveChannelFactory
对象
public B channelFactory(ChannelFactory<? extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory");if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");} else {this.channelFactory = channelFactory;return this.self();}
}
ChannelFactory
接口提供了一个newChannel
方法,这个方法对应的实现类是ReflectiveChannelFactory
,这个方法返回通过反射拿到的构造器创建对应通道类型的实例(即NioServerSocketChannel实例)
public T newChannel() {try {return (Channel)this.constructor.newInstance();} catch (Throwable var2) {throw new ChannelException("Unable to create Channel from class " + this.constructor.getDeclaringClass(), var2);}
}
3.初始化处理器
在 Netty 中,ChannelInitializer
是一个核心组件,用于动态初始化 Channel 的处理器链(ChannelPipeline
)
会在 Channel 注册到EventLoop
后、真正开始处理数据前,动态添加处理器
当 Channel 注册到EventLoop
时,ChannelInitializer
的initChannel()
方法会被触发,会调用AbstractChannel
类的pipeline
方法给当前Channel的ChannelPipeline
对象(即pipeline
)添加处理器链
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {// 配置SocketChannel的管道socketChannel.pipeline().addLast(new StringDecoder(), // 字符串解码器new StringEncoder(), // 字符串编码器new ChatServerHandler(), // 自定义处理器new ChatConnectionServerHandler() // 监听客户端状态);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {ObjectUtil.checkNotNull(handlers, "handlers");ChannelHandler[] var3 = handlers;int var4 = handlers.length;for(int var5 = 0; var5 < var4; ++var5) {ChannelHandler h = var3[var5];if (h == null) {break;}this.addLast(executor, (String)null, h);}return this;
}
初始化主通道处理器(handler())
// 1. 主通道处理器(用于ServerSocketChannel,处理连接建立事件)
server.handler(new ChannelInitializer<NioServerSocketChannel>() {@Overrideprotected void initChannel(NioServerSocketChannel ch) {// 配置主通道的处理器链(如日志记录、连接限制等)ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));}
});
调用入口是AbstractBootstrap类
的handler
方法,将AbstractBootstrap类
的handler属性
设置为传入的处理器,主要用于处理bossGroup
public B handler(ChannelHandler handler) {this.handler = (ChannelHandler)ObjectUtil.checkNotNull(handler, "handler");return this.self();
}
初始化子通道处理器(childHandler())
客户端连接服务器时,为客户端通道添加处理器链
// 2. 子通道处理器(用于SocketChannel,处理读写事件)
ChannelInitializer<SocketChannel> childHandlers = new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 配置SocketChannel的管道socketChannel.pipeline().addLast(new StringDecoder(), // 字符串解码器new StringEncoder(), // 字符串编码器new ChatServerHandler(), // 自定义处理器new ChatConnectionServerHandler() // 监听客户端状态);}
};
server.childHandler(childHandlers);
调用的入口是ServerBootStrap
的childHandler
方法,主要是将ServerBootStrap
的子处理器childHandler
设置为传入的处理器
这个子处理器其实就是用于处理workerGroup
public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = (ChannelHandler)ObjectUtil.checkNotNull(childHandler, "childHandler");return this;
}
4.绑定端口并启动服务
// 绑定端口并启动服务
ChannelFuture serverChannel = server.bind(port).sync();
调用关系图:
// 核心调用链路(简化版)
1. ServerBootstrap.bind(port) // 用户调用入口
2. -> AbstractBootstrap.doBind(localAddress) // 启动器内部逻辑
3. -> Channel.bind(localAddress) // 通道绑定操作
4. -> ChannelPipeline.bind() // 触发 Pipeline 事件
5. -> HeadContext.bind() // Pipeline 头部处理器
6. -> NioMessageUnsafe.bind() // 底层 unsafe 操作
7. -> ServerSocketChannel.bind() // 调用 Java NIO 原生方法
8. -> OS socket.bind() // 操作系统系统调用
9. -> OS socket.listen() // 操作系统系统调用
10. serverChannel.sync() // 阻塞等待绑定完成
入口函数:调用AbstractBootstrap的bind方法
public ChannelFuture bind(int inetPort) {return this.bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {this.validate();return this.doBind((SocketAddress)ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {// 步骤1:初始化Channel并注册到EventLoopfinal ChannelFuture regFuture = this.initAndRegister();final Channel channel = regFuture.channel();// 步骤2:根据注册结果处理绑定if (regFuture.cause() != null) {return regFuture; // 注册失败,直接返回失败结果} else if (regFuture.isDone()) {// 注册已完成,立即执行绑定ChannelPromise promise = channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {// 注册未完成(异步),添加监听器,注册完成后再执行绑定final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();if (cause != null) {promise.setFailure(cause); // 注册失败,传播异常} else {promise.registered();AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); // 注册成功,执行绑定}}});return promise;}
}
4.1.初始化并注册Channel
调用AbstractBootstrap的initAndRegister方法
1.通过channelFactory工厂类,创建通道实例channel
2.通过this.init(channel)初始化当前通道,负责初始化服务器 Channel 的各项参数和处理器链。
3.将 Channel 注册到 EventLoopGroup 中的某个 EventLoop
/*** 初始化 Channel 并将其注册到 EventLoopGroup,为后续的网络通信做准备。* 这是服务器启动流程中的核心步骤,完成后 Channel 开始监听网络事件。* * @return 表示注册操作的异步结果。成功时可通过 Channel 绑定端口,失败时包含具体异常。*/
final ChannelFuture initAndRegister() {Channel channel = null;try {// 1. 通过反射创建 Channel 实例(如 NioServerSocketChannel)// - 此步骤会初始化底层 Java NIO Channel(如 ServerSocketChannel)// - 设置为非阻塞模式并创建对应的 Pipelinechannel = this.channelFactory.newChannel();// 2. 初始化 Channel 配置:// - 设置 TCP 参数(如 SO_BACKLOG、SO_REUSEADDR)// - 添加用户通过 handler() 设置的处理器(如 LoggingHandler)// - 对于 ServerBootstrap,会添加 ServerBootstrapAcceptor 用于处理新连接this.init(channel);} catch (Throwable t) {// 初始化失败时,强制关闭 Channel 并返回失败的 ChannelPromise// 确保资源正确释放,避免内存泄漏if (channel != null) {channel.unsafe().closeForcibly();return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// 若 Channel 创建失败,返回特殊的 FailedChannel 实例return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 3. 将 Channel 注册到 EventLoopGroup 中的某个 EventLoop:// - 对于 ServerBootstrap,config().group() 返回 bossGroup(处理连接接受的线程池)// - 注册后,EventLoop 开始监听 OP_ACCEPT 事件(对于服务器 Channel)// - 注册是异步操作,通过 ChannelFuture 通知结果ChannelFuture regFuture = this.config().group().register(channel);// 4. 注册失败处理:// - 若注册过程中立即发现异常(如 Selector 打开失败)// - 根据 Channel 状态选择优雅关闭或强制关闭if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close(); // 已注册但失败,执行优雅关闭} else {channel.unsafe().closeForcibly(); // 未注册成功,强制关闭}}return regFuture;
}
1.通过channelFactory属性创建对应通道类型的实例(channelFactory这个工厂类是在初始化的时候创建的)
channel = this.channelFactory.newChannel();
2.通过调用ServerBootstrap类的init方法初始化当前通道
this.init(channel);
2.1.配置服务器 Channel:设置选项(如 SO_BACKLOG
)和属性。
2.2.构建 Pipeline:添加主处理器(如 LoggingHandler
)和 ServerBootstrapAcceptor
。
2.3.处理新连接:
ServerBootstrapAcceptor
负责创建子 Channel(SocketChannel
)。- 为子 Channel 配置处理器链(如
HttpServerCodec
)。 - 将子 Channel 注册到
childGroup
的EventLoop
。
void init(Channel channel) {// 1. 设置 Channel 选项(如 TCP 参数)setChannelOptions(channel, this.newOptionsArray(), logger);// 2. 设置 Channel 属性(自定义键值对)setAttributes(channel, this.newAttributesArray());// 3. 获取 Channel 的 Pipeline(处理器链)ChannelPipeline p = channel.pipeline();// 4. 保存 childGroup 和 childHandler 的引用(用于处理客户端连接)final EventLoopGroup currentChildGroup = this.childGroup;final ChannelHandler currentChildHandler = this.childHandler;// 5. 添加一个特殊的 ChannelInitializer 到 Pipelinep.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) {// 5.1 添加用户配置的主处理器(如 LoggingHandler)ChannelHandler handler = ServerBootstrap.this.config.handler();if (handler != null) {pipeline.addLast(handler);}// 5.2 添加 ServerBootstrapAcceptor 到 Pipeline(关键组件)ch.eventLoop().execute(() -> {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs, extensions));});}});// 6. 调用扩展点(如果有)if (!extensions.isEmpty() && channel instanceof ServerChannel) {// 执行扩展逻辑...}
}
ServerBootstrapAcceptor
是一个特殊的 ChannelInboundHandler
,它作为连接管理器存在于 NioServerSocketChannel
的 Pipeline 中,主要负责:
- 接收新连接:当主 Reactor(
bossGroup
)检测到客户端连接请求时,创建对应的NioSocketChannel
。 - 分配 EventLoop:从从 Reactor(
childGroup
)中选择一个EventLoop
分配给新连接。 - 初始化子 Channel:为新连接的
NioSocketChannel
配置处理器链(Pipeline
)和选项。 - 注册到 Selector:将新连接注册到分配的
EventLoop
的Selector
上,开始监听读写事件。
// 5.2 添加 ServerBootstrapAcceptor 到 Pipeline(关键组件)
ch.eventLoop().execute(() -> {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs, extensions));
});
当有新客户端连接时,ServerBootstrapAcceptor
的 channelRead()
方法会被触发
核心功能:
/*** 处理新连接的核心方法。当主 Reactor(bossGroup)接收到客户端连接时,此方法会被触发,* 负责将新连接配置并注册到从 Reactor(workerGroup)。* * @param ctx 上下文对象,用于与 Pipeline 交互* @param msg 新连接的 Channel 对象(如 NioSocketChannel)*/
public void channelRead(ChannelHandlerContext ctx, Object msg) {// 1. 获取新连接的 Channel 对象(代表客户端连接)final Channel child = (Channel) msg;// 2. 配置子 Channel 的 Pipeline:// - 添加用户通过 childHandler() 设置的处理器链(如 HttpServerCodec、业务处理器)// - 每个新连接都会创建独立的 Pipeline 实例,确保线程安全child.pipeline().addLast(new ChannelHandler[]{this.childHandler});// 3. 设置子 Channel 的 TCP 选项:// - 常见选项:TCP_NODELAY(禁用 Nagle 算法)、SO_KEEPALIVE(启用保活机制)// - 这些选项会影响底层 Socket 的行为AbstractBootstrap.setChannelOptions(child, this.childOptions, ServerBootstrap.logger);// 4. 设置子 Channel 的自定义属性:// - 存储与连接相关的元数据(如认证信息、会话ID)// - 示例:channel.attr(AttributeKey.valueOf("sessionId")).set("12345");AbstractBootstrap.setAttributes(child, this.childAttrs);// 5. 执行扩展逻辑(如果有):// - 允许用户在 Channel 初始化后执行自定义逻辑// - 例如:添加额外的处理器、修改 Channel 配置if (!this.extensions.isEmpty()) {Iterator var4 = this.extensions.iterator();while(var4.hasNext()) {ChannelInitializerExtension extension = (ChannelInitializerExtension)var4.next();try {extension.postInitializeServerChildChannel(child);} catch (Exception var8) {ServerBootstrap.logger.warn("Exception thrown from postInitializeServerChildChannel", var8);}}}// 6. 将子 Channel 注册到 workerGroup 的某个 EventLoop:// - 从 workerGroup 中选择一个 EventLoop(线程)// - 注册后,该 EventLoop 将负责处理此连接的所有 IO 操作(读写、编解码等)// - 注册是异步操作,通过 ChannelFuture 监听结果try {this.childGroup.register(child).addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {// 注册失败时,强制关闭连接并记录异常ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());}}});} catch (Throwable var7) {// 注册过程中发生异常,立即关闭连接forceClose(child, var7);}
}
3. 将 Channel 注册到 EventLoopGroup 中的某个 EventLoop:
ChannelFuture regFuture = this.config().group().register(channel);
是 Netty 服务器启动流程中的核心操作,负责将 Channel 注册到 EventLoop 的 Selector,从而建立事件循环机制。这个操作标志着 Channel 开始真正参与网络 IO 处理。
ChannelFuture
封装异步操作结果:
- 立即返回:调用
register(channel)
后,方法会立即返回一个ChannelFuture
,此时注册操作可能尚未完成。 - 异步完成:Netty 会在后台完成实际的注册操作,完成后通过
ChannelFuture
通知结果。
config().group()
config()
:返回AbstractBootstrapConfig
,包含启动配置信息。group()
:返回EventLoopGroup(线程池):- 对于
ServerBootstrap
:
config().group()
返回bossGroup
,负责接受客户端连接。
示例:ServerBootstrap.group(bossGroup, workerGroup)
。 - 对于
Bootstrap
(客户端):
config().group()
返回唯一的EventLoopGroup
,负责处理所有连接的 IO。
- 对于
这个register
方法最终会调用AbstractChannel
类的register方法(AbstractChannel类是所有具体通道实现类的基类,定义了通道的一些通用属性和方法),返回一个ChannelFuture对象,
SingleThreadEventLoop.class:
public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}
AbstractChannel.class
/*** 将 Channel 注册到指定的 EventLoop,建立事件循环机制。* 此方法是线程安全的,确保注册操作在 EventLoop 线程中执行。* * @param eventLoop 目标 EventLoop,负责处理 Channel 的所有 IO 操作* @param promise 用于异步通知注册结果的 Promise 对象*/
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 校验参数:确保 EventLoop 不为空ObjectUtil.checkNotNull(eventLoop, "eventLoop");// 状态检查:防止重复注册if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("Channel 已注册到 EventLoop"));return;}// 兼容性检查:确保 EventLoop 类型与 Channel 兼容(如 NioEventLoop 与 NioChannel)if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("不兼容的 EventLoop 类型: " + eventLoop.getClass().getName()));return;}// 绑定 EventLoop:将 Channel 与指定的 EventLoop 永久关联AbstractChannel.this.eventLoop = eventLoop;// 确保注册操作在 EventLoop 线程中执行(关键!)if (eventLoop.inEventLoop()) {// 当前线程已是 EventLoop 线程,直接执行注册register0(promise);} else {// 当前线程不是 EventLoop 线程,提交任务到 EventLoop 的任务队列try {eventLoop.execute(() -> register0(promise));} catch (Throwable t) {// 任务提交失败处理logger.warn("注册任务被 EventLoop 拒绝,强制关闭 Channel: {}", AbstractChannel.this, t);closeForcibly();AbstractChannel.this.closeFuture.setClosed();safeSetFailure(promise, t);}}
}/*** 实际执行 Channel 注册到 Selector 的核心方法。* 必须在 EventLoop 线程中调用,确保线程安全。* * @param promise 用于异步通知注册结果的 Promise 对象*/
private void register0(ChannelPromise promise) {try {// 1. 标记 Promise 为不可取消,并检查 Channel 是否处于打开状态if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// 2. 记录是否为首次注册boolean firstRegistration = neverRegistered;// 3. 执行底层注册操作(如调用 Java NIO 的 SelectableChannel.register())AbstractChannel.this.doRegister();// 4. 更新注册状态neverRegistered = false;registered = true;// 5. 触发 HandlerAdded 回调(如果有)// 确保所有添加到 Pipeline 的 Handler 有机会执行初始化逻辑pipeline.invokeHandlerAddedIfNeeded();// 6. 标记注册成功safeSetSuccess(promise);// 7. 触发 ChannelRegistered 事件,通知 Pipeline 中的所有 Handlerpipeline.fireChannelRegistered();// 8. 如果 Channel 已处于活跃状态(如客户端已连接),触发相应事件if (isActive()) {if (firstRegistration) {// 首次注册且已活跃,触发 ChannelActive 事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// 非首次注册但配置了自动读取,开始读取数据beginRead();}}} catch (Throwable t) {// 注册过程中发生异常,强制关闭 ChannelcloseForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
4.2 根据注册结果处理绑定
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
调用AbstractBootstrap类的doBind0()
/*** 执行 Channel 绑定操作的核心逻辑。* 确保在 Channel 注册到 EventLoop 成功后,再异步执行绑定操作。* * @param regFuture 注册操作的 Future 对象,用于判断注册是否成功* @param channel 待绑定的 Channel* @param localAddress 要绑定的本地地址* @param promise 绑定操作的 Promise,用于异步通知结果*/
private static void doBind0(final ChannelFuture regFuture,final Channel channel,final SocketAddress localAddress,final ChannelPromise promise) {// 在 Channel 关联的 EventLoop 线程中执行绑定操作channel.eventLoop().execute(() -> {if (regFuture.isSuccess()) {// 注册成功,执行绑定操作,并添加失败自动关闭监听器channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {// 注册失败,将注册失败原因传递给绑定 Promisepromise.setFailure(regFuture.cause());}});
}
channel.bind会去调用AbstractChannel的pipeline对象,并执行对应的bind方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return this.pipeline.bind(localAddress, promise);
}
执行DefaultChannelPipeline类的bind方法
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return this.tail.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return this.tail.bind(localAddress, promise);
}
tail是AbstractChannelHandlerContext
对象,会调用AbstractChannelHandlerContext
类的bind方法
/*** 异步绑定 Channel 到指定的本地地址。* 通过 ChannelPipeline 触发出站事件,最终由底层传输层执行实际绑定操作。* * @param localAddress 要绑定的本地地址(如 InetSocketAddress)* @param promise 用于异步通知绑定结果的 Promise* @return 返回传入的 promise 对象,支持链式调用*/
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {// 校验参数:确保地址不为空ObjectUtil.checkNotNull(localAddress, "localAddress");// 验证 Promise 有效性(如未取消、与当前 Channel 关联)if (this.isNotValidPromise(promise, false)) {return promise; // 无效 Promise,直接返回}// 查找 Pipeline 中下一个支持 bind 操作的出站处理器上下文final AbstractChannelHandlerContext next = this.findContextOutbound(512); // 512 = BIND 操作掩码EventExecutor executor = next.executor();// 确保在 EventLoop 线程中执行绑定操作(关键!)if (executor.inEventLoop()) {// 当前线程已是 EventLoop 线程,直接调用next.invokeBind(localAddress, promise);} else {// 当前线程不是 EventLoop 线程,提交任务到 EventLoop 线程池safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null, false);}return promise; // 返回 Promise 供调用者监听结果
}/*** 在当前 HandlerContext 中触发 bind 事件,调用对应处理器的 bind 方法。* 此方法会根据处理器类型进行不同的调用逻辑。* * @param localAddress 要绑定的本地地址* @param promise 用于异步通知绑定结果的 Promise*/
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {// 检查处理器是否启用(isRemoved() 为 false)if (this.invokeHandler()) {try {ChannelHandler handler = this.handler();DefaultChannelPipeline.HeadContext headContext = this.pipeline.head;// 根据处理器类型选择调用方式if (handler == headContext) {// 头部处理器:直接调用 HeadContext 的 bind 方法(最终触发底层网络绑定)headContext.bind(this, localAddress, promise);} else if (handler instanceof ChannelDuplexHandler) {// 双向处理器:调用其 bind 方法((ChannelDuplexHandler)handler).bind(this, localAddress, promise);} else if (handler instanceof ChannelOutboundHandlerAdapter) {// 出站处理器适配器:调用其 bind 方法(默认实现会传递给下一个处理器)((ChannelOutboundHandlerAdapter)handler).bind(this, localAddress, promise);} else {// 其他类型的出站处理器:强制转换并调用 bind 方法((ChannelOutboundHandler)handler).bind(this, localAddress, promise);}} catch (Throwable t) {// 处理异常:通知出站处理器链发生异常notifyOutboundHandlerException(t, promise);}} else {// 处理器已移除,将事件传递给下一个处理器this.bind(localAddress, promise);}
}
根据handler的类型调用不同的bind方法
以headContext为例:
headContext这个bind方法会调用AbstractChannel的bind方法
/*** 执行 Channel 到指定本地地址的底层绑定操作。* 此方法由 Channel 的 unsafe 接口实现,必须在 EventLoop 线程中调用。* * @param localAddress 要绑定的本地地址(如 InetSocketAddress)* @param promise 用于异步通知绑定结果的 Promise*/
public final void bind(SocketAddress localAddress, ChannelPromise promise) {// 断言当前线程是 Channel 关联的 EventLoop 线程(确保线程安全)this.assertEventLoop();// 标记 Promise 为不可取消,并检查 Channel 是否处于打开状态if (promise.setUncancellable() && this.ensureOpen(promise)) {// 处理广播地址警告(非 root 用户绑定非通配地址可能无法接收广播包)if (Boolean.TRUE.equals(AbstractChannel.this.config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress)localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {logger.warn("非 root 用户绑定非通配地址({})可能无法接收广播包,但仍按请求继续绑定", localAddress);}// 记录绑定前的 Channel 活跃状态boolean wasActive = AbstractChannel.this.isActive();try {// 执行实际的底层绑定操作(由子类实现,如 Java NIO 的 ServerSocketChannel.bind())AbstractChannel.this.doBind(localAddress);} catch (Throwable t) {// 绑定失败处理:标记 Promise 为失败,并在 Channel 已关闭时执行清理this.safeSetFailure(promise, t);this.closeIfClosed();return;}// 若绑定后 Channel 变为活跃状态(之前不活跃),触发 ChannelActive 事件if (!wasActive && AbstractChannel.this.isActive()) {// 异步触发事件,避免在关键路径中执行用户代码this.invokeLater(() -> AbstractChannel.this.pipeline.fireChannelActive());}// 标记 Promise 为成功this.safeSetSuccess(promise);}
}protected abstract void doBind(SocketAddress var1) throws Exception;
调用ServerSocketChannel
的doBind
方法
@SuppressJava6Requirement(reason = "Usage guarded by java version check"
)
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {this.javaChannel().bind(localAddress, this.config.getBacklog());} else {this.javaChannel().socket().bind(localAddress, this.config.getBacklog());}}public abstract ServerSocketChannel bind(SocketAddress local, int backlog)throws IOException;
上面的bind
方法会调用ServerSocketChannelImpl的具体实现方法bind
/*** 将 ServerSocketChannel 绑定到指定的本地地址,并设置最大连接队列长度。* 此方法是 Java NIO 底层网络操作的核心实现,用于创建 TCP 服务器。* * @param local 要绑定的本地地址(如 InetSocketAddress),若为 null 则绑定到通配地址(0.0.0.0)* @param backlog 最大连接队列长度(等待 accept 的连接数),若小于 1 则使用默认值 50* @return 绑定后的 ServerSocketChannel 自身,支持链式调用* @throws IOException 若发生 I/O 错误(如端口已被占用)* @throws ClosedChannelException 若 Channel 已关闭* @throws AlreadyBoundException 若 Channel 已绑定*/
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {// 使用内部锁确保操作的线程安全性(同一时间只能有一个线程执行绑定)synchronized(this.lock) {// 校验 Channel 状态:必须处于打开且未绑定状态if (!this.isOpen()) {throw new ClosedChannelException();} else if (this.isBound()) {throw new AlreadyBoundException();}// 处理地址参数:若为 null 则绑定到通配地址(0.0.0.0)并随机分配端口InetSocketAddress addr = local == null ? new InetSocketAddress(0) : Net.checkAddress(local);// 安全检查:验证当前线程是否有监听指定端口的权限// (例如:非 root 用户尝试监听 1-1023 之间的特权端口会被拒绝)SecurityManager sm = System.getSecurityManager();if (sm != null) {sm.checkListen(addr.getPort());}// 平台相关的 TCP 绑定前准备(如设置 TCP 参数、权限检查等)NetHooks.beforeTcpBind(this.fd, addr.getAddress(), addr.getPort());try {// 1. 执行底层 bind 操作(通过 JNI 调用操作系统的 socket.bind())// 将套接字与指定 IP 地址和端口号关联Net.bind(this.fd, addr.getAddress(), addr.getPort());// 2. 执行底层 listen 操作(通过 JNI 调用操作系统的 socket.listen())// 将套接字转换为监听状态,开始接受客户端连接// backlog 参数指定等待 accept 的最大连接数(TCP 半连接队列长度)Net.listen(this.fd, backlog < 1 ? 50 : backlog);} catch (IOException e) {// 绑定失败时尝试关闭文件描述符,避免资源泄漏try {this.fd.close();} catch (IOException suppressed) {e.addSuppressed(suppressed);}throw e;}// 更新本地地址信息(绑定成功后才能获取实际绑定的地址和端口)// 例如:若绑定到 0.0.0.0:0,这里会获取系统分配的实际端口号synchronized(this.stateLock) {this.localAddress = Net.localAddress(this.fd);}return this;}
}
如有错误,欢迎指正!
部分图源来自网络,侵删!