Netty's Select/Poll mechanism is implemented mainly in NioEventLoop's event loop, whose design combines I/O event handling, task scheduling, and resource management. The key mechanisms are analyzed below.
1. Core event-loop flow (run())

```java
for (;;) {
    int strategy = selectStrategy.calculateStrategy(...);
    switch (strategy) {
    case SelectStrategy.SELECT:
        long deadline = nextScheduledTaskDeadlineNanos();
        if (!hasTasks()) {
            strategy = select(deadline); // blocking select
        }
        nextWakeupNanos.lazySet(AWAKE); // reset the wakeup state
        break;
    // ... other strategies
    }

    // I/O event and task processing
    if (ioRatio == 100) {
        processSelectedKeys(); // handle I/O events
        runAllTasks();         // run all pending tasks
    } else {
        long ioStartTime = System.nanoTime();
        processSelectedKeys();
        long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // proportional task budget
    }

    // exception and shutdown handling
    if (isShuttingDown()) {
        closeAll();
        if (confirmShutdown()) {
            return;
        }
    }
}
```

(Simplified; the unabridged source is reproduced in the Source code section at the end.)
Select strategies:
- `SELECT`: when there are no pending tasks, block in select; the timeout is the deadline of the nearest scheduled task, so scheduled tasks are not missed.
- `BUSY_WAIT`: not supported with NIO, so it falls through to `SELECT`.
- `CONTINUE`: skip the rest of this loop iteration (triggered by internal state).
I/O vs. task balance:
- `ioRatio` controls the time ratio between I/O event handling and task execution (default 50).
- If `ioRatio == 100`, all I/O events are processed first and then all pending tasks are run.
- Otherwise the task budget is allocated proportionally: `taskTime = ioTime * (100 - ioRatio) / ioRatio`; with the default of 50, tasks get as much time as the I/O phase just used. A tuning sketch follows.
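As a usage sketch (assuming Netty 4.1; the group construction and the value 80 are only illustrative), the ratio can be tuned for a whole group via `NioEventLoopGroup.setIoRatio`:

```java
import io.netty.channel.nio.NioEventLoopGroup;

public class IoRatioSketch {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        // With ioRatio = 80, after the I/O phase of each loop iteration the
        // task phase is capped at ioTime * (100 - 80) / 80 = ioTime / 4.
        group.setIoRatio(80);
        group.shutdownGracefully();
    }
}
```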
2. Select optimizations
2.1 Wakeup and blocking control

- `nextWakeupNanos`: records when the loop is due to wake up next and is used to avoid redundant `Selector.wakeup()` calls.
- Releasing the block: when select returns, `nextWakeupNanos` is set back to `AWAKE` to mark the thread as already awake; see the sketch below.
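A simplified, paraphrased sketch of the idea (not Netty's exact code; field names follow the source reproduced at the end): only the first waker that flips the state away from `AWAKE` pays for the `Selector.wakeup()` syscall.

```java
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the wakeup latch used by the event loop.
final class WakeupLatchSketch {
    private static final long AWAKE = -1L;
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    private final Selector selector;

    WakeupLatchSketch(Selector selector) {
        this.selector = selector;
    }

    // Called by the event loop right before blocking in select(timeout).
    void armFor(long deadlineNanos) {
        nextWakeupNanos.set(deadlineNanos);
    }

    // Called by other threads when a task is submitted.
    void wakeup(boolean inEventLoop) {
        // Only wake the Selector if it might actually be blocked, and make
        // sure concurrent submitters trigger at most one Selector.wakeup().
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }
}
```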
2.2 Rebuilding the Selector

```java
catch (IOException e) {
    rebuildSelector0(); // rebuild the Selector
    selectCnt = 0;
    continue;
}
```

When an IOException is caught, the Selector is assumed to be broken: it is rebuilt and all registered Channels are migrated to the new instance. (The JDK empty-spin Selector bug is detected separately via the wakeup counter described in 2.3.) A usage sketch follows.
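As a side note (a sketch assuming Netty 4.1), the same rebuild can also be forced from application code, since `NioEventLoopGroup` exposes it publicly:

```java
import io.netty.channel.nio.NioEventLoopGroup;

public class RebuildSelectorsSketch {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        // Replaces the Selector of every child NioEventLoop and re-registers
        // all of its Channels on the new Selector.
        group.rebuildSelectors();
        group.shutdownGracefully();
    }
}
```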
2.3 Guarding against spurious wakeups

```java
if (unexpectedSelectorWakeup(selectCnt)) {
    selectCnt = 0; // reset the consecutive premature-return counter
}
```

- `selectCnt`: counts consecutive loop iterations in which select returned without any I/O being processed or tasks being run.
- `MIN_PREMATURE_SELECTOR_RETURNS` only gates a debug log for premature returns; the rebuild decision inside `unexpectedSelectorWakeup` uses `SELECTOR_AUTO_REBUILD_THRESHOLD` (default 512, configurable via the `io.netty.selectorAutoRebuildThreshold` system property). Once `selectCnt` exceeds that threshold, the wakeups are treated as the JDK empty-spin bug and the Selector is rebuilt; see the configuration sketch below.
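A minimal configuration sketch (512 simply restates the documented default): the threshold is read once during class initialization, so it must be set before any Netty class is loaded, e.g. on the JVM command line or at the very top of main.

```java
public class AutoRebuildThresholdSketch {
    public static void main(String[] args) {
        // Equivalent to -Dio.netty.selectorAutoRebuildThreshold=512.
        // Very small values disable the automatic Selector rebuild entirely.
        System.setProperty("io.netty.selectorAutoRebuildThreshold", "512");

        // ... bootstrap Netty afterwards so NioEventLoop picks the value up ...
    }
}
```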
3. I/O event processing (processSelectedKeys)
3.1 Optimized selected-key traversal

```java
private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null; // null out eagerly so the entry can be GC'ed
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        }
        // ... NioTask handling
    }
}
```

- Optimization: the JDK's HashSet of selected keys is replaced with an array-backed `SelectedSelectionKeySet`, which removes iterator and hash-bucket overhead. A simplified sketch of the idea follows this list.
- Explicit nulling: each array slot is cleared after use so a closed Channel does not linger in memory (Netty's workaround for a JDK-side retention problem).
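To illustrate the idea only (this is not Netty's `SelectedSelectionKeySet`, which is additionally injected into the JDK Selector via reflection), an array-backed set can look like this: add() appends to a flat array, so traversal is a plain index loop.

```java
import java.nio.channels.SelectionKey;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;

// Illustrative array-backed Set in the spirit of SelectedSelectionKeySet.
final class ArrayBackedKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys = new SelectionKey[1024];
    int size;

    @Override
    public boolean add(SelectionKey key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // grow, never shrink
        }
        keys[size++] = key; // no hashing, no bucket lookup
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        return Arrays.asList(Arrays.copyOf(keys, size)).iterator();
    }
}
```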
3.2 Event dispatch (processSelectedKey)

```java
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    k.interestOps(ops & ~SelectionKey.OP_CONNECT); // stop watching OP_CONNECT
    unsafe.finishConnect();                        // complete the connection attempt
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    ch.unsafe().forceFlush(); // handle the write event
}
if ((readyOps & (OP_READ | OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read(); // handle read / accept events
}
```

- Ordering: `OP_CONNECT` is handled first (otherwise the JDK channel may throw `NotYetConnectedException` on read/write), then `OP_WRITE` (flushing early can free queued write buffers), then `OP_READ`/`OP_ACCEPT`.
- Special case: `readyOps == 0` still triggers a read, working around a JDK bug that could otherwise lead to a spin loop.
4. Read handling (NioByteUnsafe.read())

```java
do {
    byteBuf = allocHandle.allocate(allocator);
    int bytesRead = doReadBytes(byteBuf);
    if (bytesRead <= 0) {
        if (bytesRead < 0) {
            close = true; // EOF: close the connection
        }
        break;
    }
    pipeline.fireChannelRead(byteBuf); // fire the inbound event
} while (allocHandle.continueReading());
```

- Adaptive buffers: `RecvByteBufAllocator` sizes each read buffer dynamically (e.g. `AdaptiveRecvByteBufAllocator` adjusts its next guess based on how much was actually read).
- Read loop: keeps reading until no data is left or the per-readiness limit is reached, so one busy Channel cannot starve the others on the same event loop.

Exception handling:
- On a read error the pending buffer is released (any readable bytes are still delivered first) and `exceptionCaught` is fired on the pipeline.
- If the channel has to be closed, the configuration decides between a half-close (input shutdown) and a full close.

A configuration sketch for the receive-buffer allocator follows.
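As a configuration sketch (assuming Netty 4.1; the bounds 64/1024/65536 and the LoggingHandler are illustrative), the allocator driving this read loop can be chosen per bootstrap via `ChannelOption.RCVBUF_ALLOCATOR`:

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;

public class RecvBufAllocatorSketch {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler())
                    // minimum, initial and maximum per-read buffer size in bytes;
                    // the handle adjusts its next guess from lastBytesRead().
                    .option(ChannelOption.RCVBUF_ALLOCATOR,
                            new AdaptiveRecvByteBufAllocator(64, 1024, 65536));
            // bootstrap.connect(...) omitted; this only shows the configuration surface.
        } finally {
            group.shutdownGracefully();
        }
    }
}
```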
5. Task scheduling and the threading model
5.1 Task submission (execute())

```java
public void execute(Runnable task) {
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
```

Wakeup control: if the task should run promptly (i.e. it is not a `LazyRunnable`), `wakeup()` is called so that a Selector blocked in select() is woken up. A submission sketch follows.
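From the caller's side (a minimal sketch using the public EventLoop API), a plain execute() may wake a Selector blocked in select(), while schedule() feeds the deadline that bounds the select timeout described in section 1:

```java
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.TimeUnit;

public class SubmitTaskSketch {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        EventLoop loop = group.next();

        // Runs on the event-loop thread; submitting it may trigger wakeup().
        loop.execute(() -> System.out.println("task on " + Thread.currentThread().getName()));

        // Scheduled tasks define the next deadline, i.e. the select timeout.
        loop.schedule(() -> System.out.println("scheduled task"), 1, TimeUnit.SECONDS);

        Thread.sleep(1500);
        group.shutdownGracefully();
    }
}
```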
5.2 Thread lifecycle (doStartThread())

- Single-threaded execution: the single event-loop thread is started through `executor.execute()`.
- Graceful shutdown:
  1. Mark the state as `ST_SHUTTING_DOWN`.
  2. Run the remaining tasks and shutdown hooks.
  3. Reject new tasks (the state moves to `ST_SHUTDOWN`).
  4. Clean up resources (e.g. `FastThreadLocal`) and complete the termination Future.

A caller-side shutdown sketch follows.
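From the application's point of view this lifecycle is driven by shutdownGracefully(); a minimal sketch (the quiet-period and timeout values are illustrative):

```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        // ... register channels, submit tasks ...

        // quietPeriod = 2s: the loop keeps accepting and running tasks while it stays quiet;
        // timeout = 15s: hard upper bound before the loop is forced down.
        group.shutdownGracefully(2, 15, TimeUnit.SECONDS).syncUninterruptibly();
    }
}
```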
Summary: key characteristics of Netty's Select/Poll mechanism

Efficient event loop:
- Unifies I/O events, submitted tasks, and scheduled tasks in a single loop.
- Balances I/O versus task time via `ioRatio`.

Select optimizations:
- The select timeout is bound to the next scheduled-task deadline.
- The JDK Selector empty-spin bug is worked around by rebuilding the Selector.
- Selected-key traversal is optimized (array backing plus explicit nulling).

Robustness:
- I/O exceptions and loop exceptions are handled separately.
- Connect, write, and read/accept events are handled in distinct stages.

Resource management:
- Receive buffers are allocated adaptively.
- Each Channel's lifecycle is bound to its event loop.
- Graceful shutdown ensures no submitted task is lost.

These mechanisms let Netty keep latency low under high concurrency while avoiding classic NIO pitfalls such as empty spinning and resource leaks.
## Source code (NioEventLoop, SingleThreadEventExecutor, AbstractNioByteChannel.NioByteUnsafe)
```java
@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks. At this point the event loop
                    // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                    // graceful shutdown with quietPeriod.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }

                    // Now we want to make sure no more tasks can be added from this point. This is
                    // achieved by switching the state. Any new tasks beyond this point will be rejected.
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                            break;
                        }
                    }

                    // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                    // No need to loop here, this is the final pass.
                    confirmShutdown();
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        int numUserTasks = drainTasks();
                        if (numUserTasks > 0 && logger.isWarnEnabled()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + numUserTasks + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

protected class NioByteUnsafe extends AbstractNioUnsafe {

    private void closeOnRead(ChannelPipeline pipeline) {
        if (!isInputShutdown0()) {
            if (isAllowHalfClosure(config())) {
                shutdownInput();
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());
            }
        } else {
            inputClosedSeenErrorOnRead = true;
            pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
    }

    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            RecvByteBufAllocator.Handle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) {
            closeOnRead(pipeline);
        }
    }

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

@Override
public void execute(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
```