当前位置: 首页 > news >正文

Netty 的 Select/Poll 机制核心实现主要在 NioEventLoop 的事件循环

Netty 的 Select/Poll 机制核心实现主要在 NioEventLoop 的事件循环中,其设计结合了 I/O 事件处理、任务调度和资源管理。以下是关键机制分析:


1. 事件循环核心流程 (run())

java

for (;;) {int strategy = selectStrategy.calculateStrategy(...);switch (strategy) {case SelectStrategy.SELECT:long deadline = nextScheduledTaskDeadlineNanos();if (!hasTasks()) {strategy = select(deadline); // 阻塞式 Select}nextWakeupNanos.lazySet(AWAKE); // 重置唤醒状态break;// ... 其他策略处理}// I/O 事件与任务处理if (ioRatio == 100) {processSelectedKeys();   // 处理 I/O 事件runAllTasks();           // 运行所有任务} else {long ioStartTime = System.nanoTime();processSelectedKeys();long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按比例运行任务}// 异常与关闭处理if (isShuttingDown()) {closeAll();if (confirmShutdown()) return;}
}
  • Select 策略

    • SELECT:无任务时阻塞 Select,超时时间取最近定时任务的截止时间(deadline),避免错过定时任务。

    • BUSY_WAIT:Netty 未实现(NIO 不支持)。

    • CONTINUE:跳过本次循环(内部状态触发)。

  • I/O 与任务平衡

    • ioRatio 控制 I/O 事件与任务执行的时间比例(默认 50)。

    • 若 ioRatio=100,先处理所有 I/O 事件再处理任务。

    • 否则按比例分配任务执行时间:任务时间 = I/O 时间 * (100 - ioRatio) / ioRatio


2. Select 优化机制

2.1 唤醒与阻塞控制
  • nextWakeupNanos:记录下一次唤醒时间,用于避免无效的 Selector.wakeup() 调用。

  • 阻塞释放:Select 结束后设置 nextWakeupNanos=AWAKE,标识线程已唤醒。

2.2 重建 Selector

java

catch (IOException e) {rebuildSelector0(); // 重建 SelectorselectCnt = 0;continue;
}
  • 当捕获 IOException 时(如 JDK Selector 空轮询 Bug),重建 Selector 并迁移 Channel。

2.3 规避无效唤醒

java

if (unexpectedSelectorWakeup(selectCnt)) {selectCnt = 0; // 重置连续 Select 次数
}
  • selectCnt:记录连续 Select 返回的次数。

  • MIN_PREMATURE_SELECTOR_RETURNS:阈值(默认 256),超过则判定为无效唤醒,可能触发 Selector 重建。


3. I/O 事件处理 (processSelectedKeys)

3.1 优化 SelectedKeys 遍历

java

private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {SelectionKey k = selectedKeys.keys[i];selectedKeys.keys[i] = null; // 显式置空加速 GCObject a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);}// ... 处理 NioTask}
}
  • 优化点:使用数组 (SelectedSelectionKeySet) 替代 JDK 的 HashSet,减少迭代器开销。

  • 显式置空:避免 Channel 关闭后内存滞留(Netty 修复 JDK 内存泄漏问题)。

3.2 事件分发逻辑 (processSelectedKey)

java

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {k.interestOps(ops & ~SelectionKey.OP_CONNECT); // 移除 CONNECT 监听unsafe.finishConnect(); // 完成连接
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush(); // 处理写事件
}
if ((readyOps & (OP_READ | OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read(); // 处理读/接受连接事件
}
  • 优先级:先处理 OP_CONNECT > OP_WRITE > OP_READ/OP_ACCEPT

  • 特殊 case:处理 JDK Bug(readyOps=0 时仍可能触发读事件)。


4. 读事件处理 (NioByteUnsafe.read())

java

do {byteBuf = allocHandle.allocate(allocator);int bytesRead = doReadBytes(byteBuf);if (bytesRead <= 0) {if (bytesRead < 0) close = true; // EOF 关闭连接break;}pipeline.fireChannelRead(byteBuf); // 触发事件
} while (allocHandle.continueReading());
  • 自适应缓冲区:通过 RecvByteBufAllocator 动态调整缓冲区大小(如 AdaptiveRecvByteBufAllocator)。

  • 循环读取:持续读取直到无数据或达到上限(避免饿死其他 Channel)。

  • 异常处理

    • 读异常时释放缓冲区,触发 exceptionCaught

    • 若需关闭,根据配置决定半关闭或全关闭。


5. 任务调度与线程模型

5.1 任务提交 (execute())

java

public void execute(Runnable task) {execute(task, wakesUpForTask(task));
}
  • 唤醒控制:若任务需立即执行(非 LazyRunnable),调用 wakeup() 唤醒阻塞的 Selector。

5.2 线程生命周期 (doStartThread())
  • 单线程执行:通过 executor.execute() 启动唯一事件循环线程。

  • 优雅关闭

    1. 标记状态为 ST_SHUTTING_DOWN

    2. 执行剩余任务和关闭钩子。

    3. 拒绝新任务(状态置为 ST_SHUTDOWN)。

    4. 清理资源(如 FastThreadLocal),通知终止 Future。


总结:Netty Select/Poll 机制特点

  1. 高效事件循环

    • 融合 I/O 事件、任务调度、定时任务。

    • 按 ioRatio 平衡 I/O 与任务执行时间。

  2. Select 优化

    • 超时时间绑定定时任务。

    • 规避 JDK Selector 空轮询(重建机制)。

    • 优化 SelectedKeys 遍历(数组 + 显式置空)。

  3. 健壮性设计

    • 异常处理分离(I/O 异常 vs 循环异常)。

    • 连接/读写事件分层处理。

  4. 资源管理

    • 读缓冲区自适应分配。

    • Channel 生命周期绑定事件循环。

    • 优雅关闭确保任务不丢失。

此机制使 Netty 在高并发下保持低延迟,同时避免传统 NIO 的常见陷阱(如空轮询、资源泄漏)。 

##源码

@Overrideprotected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {// This update is just to help block unnecessary selector wakeups// so use of lazySet is ok (no race condition)nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Let's rebuild// the selector and retry. https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)selectCnt = 0;}} catch (CancelledKeyException e) {// Harmless exception - log anywayif (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = state;if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {if (logger.isErrorEnabled()) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +"be called before run() implementation terminates.");}}try {// Run all remaining tasks and shutdown hooks. At this point the event loop// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for// graceful shutdown with quietPeriod.for (;;) {if (confirmShutdown()) {break;}}// Now we want to make sure no more tasks can be added from this point. This is// achieved by switching the state. Any new tasks beyond this point will be rejected.for (;;) {int oldState = state;if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {break;}}// We have the final set of tasks in the queue now, no more can be added, run all remaining.// No need to loop here, this is the final pass.confirmShutdown();} finally {try {cleanup();} finally {// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify// the future. The user may block on the future and once it unblocks the JVM may terminate// and start unloading classes.// See https://github.com/netty/netty/issues/6596.FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();int numUserTasks = drainTasks();if (numUserTasks > 0 && logger.isWarnEnabled()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + numUserTasks + ')');}terminationFuture.setSuccess(null);}}}}});}private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}}private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];// null out entry in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.keys[i] = null;final Object a = k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GC'ed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys.reset(i + 1);selectAgain();i = -1;}}}private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {// If the channel implementation throws an exception because there is no event loop, we ignore this// because we are only trying to determine if ch is registered to this event loop and thus has authority// to close ch.return;}// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is// still healthy and should not be closed.// See https://github.com/netty/netty/issues/5125if (eventLoop == this) {// close the channel if the key is not valid anymoreunsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise// the NIO JDK channel implementation may throw a NotYetConnectedException.if ((readyOps & SelectionKey.OP_CONNECT) != 0) {// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking// See https://github.com/netty/netty/issues/924int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) {// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to writech.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}}protected class NioByteUnsafe extends AbstractNioUnsafe {private void closeOnRead(ChannelPipeline pipeline) {if (!isInputShutdown0()) {if (isAllowHalfClosure(config())) {shutdownInput();pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);} else {close(voidPromise());}} else {inputClosedSeenErrorOnRead = true;pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);}}private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,RecvByteBufAllocator.Handle allocHandle) {if (byteBuf != null) {if (byteBuf.isReadable()) {readPending = false;pipeline.fireChannelRead(byteBuf);} else {byteBuf.release();}}allocHandle.readComplete();pipeline.fireChannelReadComplete();pipeline.fireExceptionCaught(cause);if (close || cause instanceof IOException) {closeOnRead(pipeline);}}@Overridepublic final void read() {final ChannelConfig config = config();if (shouldBreakReadReady(config)) {clearReadPending();return;}final ChannelPipeline pipeline = pipeline();final ByteBufAllocator allocator = config.getAllocator();final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf = null;boolean close = false;try {do {byteBuf = allocHandle.allocate(allocator);allocHandle.lastBytesRead(doReadBytes(byteBuf));if (allocHandle.lastBytesRead() <= 0) {// nothing was read. release the buffer.byteBuf.release();byteBuf = null;close = allocHandle.lastBytesRead() < 0;if (close) {// There is nothing left to read as we received an EOF.readPending = false;}break;}allocHandle.incMessagesRead(1);readPending = false;pipeline.fireChannelRead(byteBuf);byteBuf = null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete();if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {// Check if there is a readPending which was not processed yet.// This could be for two reasons:// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method//// See https://github.com/netty/netty/issues/2254if (!readPending && !config.isAutoRead()) {removeReadOp();}}}}@Overridepublic void execute(Runnable task) {ObjectUtil.checkNotNull(task, "task");execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));}  

http://www.xdnf.cn/news/1312615.html

相关文章:

  • 同创物流学习记录1
  • 【论文阅读】Multimodal Graph Contrastive Learning for Multimedia-based Recommendation
  • 从冒泡到快速排序:探索经典排序算法的奥秘(二)
  • 如果构建企业本地的ERP智能ai系统,让先进的大模型数据处理ERP的各类数据,更加轻松智能,准确?从企业资源计划ERP变成企业资源智能EPA的升级
  • 基本电子元件:金属氧化膜电阻器
  • 玩转tokenizer
  • vscode中用python调用matlab的函数(环境安装)
  • SpringSecurity(一)入门
  • Winsows系统去除右键文件显示的快捷列表
  • 【完整源码+数据集+部署教程】高尔夫球追踪与识别系统源码和数据集:改进yolo11-LAWDS
  • Hexo 双分支部署指南:从原理到 Netlify 实战
  • C# 应用特性的更多内容:多维度解析与深度拓展
  • 启发式合并 + 莫队 恋恋的心跳大冒险
  • 设计索引的原则有哪些?
  • 八、SpringBoot项目热部署
  • 嵌入式硬件篇---电源电路
  • pwn定时器,ARM定时delay 外部中断用函数指针(统一)day55,56
  • 19.3 Transformers量化模型极速加载指南:4倍推理加速+75%显存节省实战
  • 头文件包含和前置声明
  • 什么是微前端?
  • 超越Transformer:大模型架构创新的深度探索
  • 数据结构:二叉平衡树
  • OpenCV 图像处理基础操作指南(二)
  • ClickHouse的学习与了解
  • 概率论基础教程第3章条件概率与独立性(三)
  • Linux sar命令详细使用指南
  • Qt 动态属性(Dynamic Property)详解
  • Qt 关于QString和std::string数据截断的问题- 遇到\0或者0x00如何处理?
  • 【经典上穿突破】副图/选股指标,双均线交叉原理,对价格波动反应灵敏,适合捕捉短期启动点
  • [1Prompt1Story] 注意力机制增强 IPCA | 去噪神经网络 UNet | U型架构分步去噪