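Netty Source Code Analysis: A Summary
Contents
- Introduction to Netty
- How the EventLoop starts scanning for events
- Netty client and server source code analysis
- Netty pitfalls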
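Introduction to Netty
Business execution flow:
Netty is a high-performance, asynchronous, event-driven network application framework. It is mainly used to quickly build reliable, maintainable server and client network programs, and it is particularly well suited to high-concurrency, high-throughput network communication.
Netty source code analysis flow: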
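How the EventLoop starts scanning for events
Classes that NioEventLoopGroup inherits from:
NioEventLoop starts its run() method, which scans for registered events:
SingleThreadEventExecutor->execute()->startThread(). startThread() is called only when the caller is not the EventLoop's own I/O thread.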
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
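Note: why startThread() is called only from a thread other than the EventLoop thread:
1. The thread is started only once
If the calling thread is already the thread bound to this NioEventLoop (inEventLoop() returns true), the thread has been started and is executing run(), so there is nothing left to start.
2. It avoids deadlocks and thread-safety problems
Calling startThread() from inside the EventLoop thread would try to start a thread that is already running, which could lead to deadlock or an inconsistent state.
3. Thread startup is lazy
Netty starts the thread lazily: it is created only when there is actually a task to process, which avoids wasting resources.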
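The lazy, start-once behaviour described above can be sketched in plain Java. This is a toy model, not Netty's SingleThreadEventExecutor: the class name LazySingleThreadExecutor and the threadsStarted counter are hypothetical and exist only for illustration. It shows the two ideas from the notes: a CAS on the state guards the start, and a task submitted from the loop thread itself never triggers a start.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical toy executor illustrating lazy, start-once thread creation. */
final class LazySingleThreadExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only an outside caller can find the loop not yet running.
        if (!inEventLoop()) {
            startThread();
        }
    }

    private void startThread() {
        // The CAS guarantees that exactly one caller wins and starts the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadsStarted.incrementAndGet();
            Thread t = new Thread(this::run, "toy-event-loop");
            t.setDaemon(true);
            thread = t; // published before start(), so the loop thread sees itself
            t.start();
        }
    }

    private void run() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int threadsStarted() {
        return threadsStarted.get();
    }
}
```

No thread exists until the first execute() call, and any number of later calls, from inside or outside the loop, leave the started-thread count at one.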
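How Netty works around the NIO empty-polling problem:
1. Selector rebuild: when the number of consecutive empty polls exceeds a threshold, Netty automatically rebuilds the selector;
2. Task awareness: if there are tasks to run, a select that returns 0 is not counted as an empty poll;
3. wakeup optimization: unnecessary wakeups, which would make select return early, are avoided;
4. Version compatibility: avoid JDK versions known to trigger the epoll empty-polling bug;
5. User-tunable parameter: SELECTOR_AUTO_REBUILD_THRESHOLD is configurable (system property io.netty.selectorAutoRebuildThreshold, default 512) to suit different workloads.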
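The counting logic behind points 1 and 2 can be sketched roughly as follows. This is a simplified model under stated assumptions, not Netty's select() code: the class EmptyPollDetector, its method afterSelect and the Runnable callback are hypothetical names, and the real rebuild (opening a new Selector and re-registering every channel on it) is abstracted behind the callback.

```java
/** Hypothetical sketch of empty-poll detection; the rebuild itself is a callback. */
final class EmptyPollDetector {
    private final int threshold;
    private final Runnable rebuildSelector;
    private int prematureReturns;

    EmptyPollDetector(int threshold, Runnable rebuildSelector) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
        this.rebuildSelector = rebuildSelector;
    }

    /**
     * Call after every select. Returns true if this call triggered a rebuild.
     * A return is legitimate if keys are ready, tasks are pending, or the timeout elapsed.
     */
    boolean afterSelect(int readyKeys, boolean hasTasks, long elapsedNanos, long timeoutNanos) {
        boolean legitimate = readyKeys > 0 || hasTasks || elapsedNanos >= timeoutNanos;
        if (legitimate) {
            prematureReturns = 0;
            return false;
        }
        if (++prematureReturns < threshold) {
            return false;
        }
        // Too many premature returns in a row: assume the selector is spinning.
        rebuildSelector.run();
        prematureReturns = 0;
        return true;
    }

    public static void main(String[] args) {
        int[] rebuilds = {0};
        EmptyPollDetector detector = new EmptyPollDetector(512, () -> rebuilds[0]++);
        for (int i = 0; i < 1024; i++) {
            detector.afterSelect(0, false, 0L, 1_000_000L);
        }
        System.out.println("rebuilds = " + rebuilds[0]);
    }
}
```

Any legitimate return resets the counter, so only an unbroken run of premature returns reaches the threshold.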
2. Start scanning: NioEventLoop->run()->processSelectedKeys()
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
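Note: why there is an ioRatio setting:
ioRatio defaults to 50 (percent). An EventLoop runs two kinds of work: I/O events and the tasks in its taskQueue. If the share of time given to I/O is too high, tasks in the taskQueue starve. If it is too low, most of the time goes to ordinary tasks, and when there are many of them I/O is kept waiting and starves instead.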
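The time budget that run() passes to runAllTasks(...) follows directly from the expression ioTime * (100 - ioRatio) / ioRatio in the source above. A small worked example makes the arithmetic concrete; the class and method names are hypothetical, and ioRatio == 100 is excluded because run() handles it as a special case (all tasks run with no time limit).

```java
/** Worked example of the task time budget derived from ioRatio. */
final class IoRatioBudget {
    /** Same arithmetic as run(): time allowed for queued tasks after an I/O pass. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys()
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget "
                    + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With the default of 50, tasks get as much time as the I/O pass just took; at 80 they get a quarter of it, and at 20 they get four times as much.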
3. The core polling strategy: processSelectedKeys()->processSelectedKeysOptimized():
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // Bypass the JDK's selector.selectedKeys(): slightly faster (about 1%-2%) and less garbage
            processSelectedKeysOptimized();
        } else {
            // Fall back to the JDK's selector.selectedKeys()
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // See https://github.com/netty/netty/issues/2363
            // Null out the array entry so it can be GC'ed once the Channel is closed
            selectedKeys.keys[i] = null;

            // The attachment is set in AbstractNioChannel.doRegister():
            // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            final Object a = k.attachment(); // e.g. a NioSocketChannel

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
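Note: here Netty optimizes the JDK's default Selector. The reasons:
JDK implementation: after each select, the ready keys are exposed through selector.selectedKeys(). This selectedKeys set:
- is a HashSet;
- is comparatively slow to iterate;
- has to be cleaned up by removing entries during iteration;
- creates many temporary objects, which adds GC pressure.
Netty implementation: the SelectedSelectionKeySet object:
- replaces the HashSet with a plain array (SelectionKey[]) that grows when it fills up;
- needs no hashing and no per-entry node objects;
- avoids iterators and entry sets;
- maintains the array index by hand for efficient add/reset;
- is installed in place of the default SelectorImpl.selectedKeys via reflection.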
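The array-backed idea can be illustrated with a small generic class. This is a simplified sketch in the spirit of SelectedSelectionKeySet, not a copy of it: the name ArrayBackedKeySet is hypothetical, the element type is generic instead of SelectionKey, and the initial capacity is 4 only to keep the demo small.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical append-only "set" backed by an array; the selector only ever calls add(). */
final class ArrayBackedKeySet<T> extends AbstractSet<T> {
    Object[] keys = new Object[4];
    int size;

    @Override
    public boolean add(T key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double when full
        }
        keys[size++] = key; // no hashing, no node allocation
        return true;
    }

    @Override
    public boolean remove(Object o) {
        return false; // not needed: the whole array is reset after each pass
    }

    @Override
    public boolean contains(Object o) {
        return false; // the selector never asks
    }

    @Override
    public int size() {
        return size;
    }

    /** Clears all entries so the referenced keys can be garbage collected. */
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (T) keys[idx++];
            }
        };
    }
}
```

The event loop can then walk keys[0..size) with a plain index loop, as processSelectedKeysOptimized() does, instead of allocating an iterator and removing entries one by one.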
4. NioEventLoop->processSelectedKey() dispatches the OP_CONNECT, OP_ACCEPT, OP_READ and OP_WRITE events
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            // Handle reads (including disconnects) or accept new connections
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
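The dispatch order in processSelectedKey() is easy to miss when reading the full method, so here is a minimal sketch that keeps only the bit tests. The class ReadyOpsOrder and the action labels are hypothetical; the masks and their order mirror the method above.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists, in order, what processSelectedKey() would do for a readyOps value. */
final class ReadyOpsOrder {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // 1. Finish the connect first, otherwise reads and writes may fail.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        // 2. Write before read, since flushing may free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // 3. Read or accept; readyOps == 0 is also treated as a read (JDK bug workaround).
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));
    }
}
```

Note that OP_ACCEPT and OP_READ share the same branch: for a server channel unsafe.read() accepts connections, for a socket channel it reads bytes.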
5. A closer look at the OP_WRITE event
AbstractChannel->flush0()->NioSocketChannel->doWrite()
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        // While there is data to write and the socket accepts it, try at most 16 times (the default)
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                // Everything has been written, so there is no need to use up all 16 attempts
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            // Return at most 1024 buffers, keeping the total size within maxBytesPerGatheringWrite where possible
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always use nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) { // a single buffer or a batch?
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: { // single buffer
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    // Remove the bytes already written from the ChannelOutboundBuffer
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: { // batch of buffers
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        // The socket send buffer is full and nothing more can be written: register OP_WRITE
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        // Still not finished after 16 writes: schedule a new flush task instead of registering OP_WRITE
        incompleteWrite(writeSpinCount < 0);
    }
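Note: 1. Why Netty limits OP_WRITE handling to 16 writes
When handling a write, Netty performs at most 16 consecutive writes by default (the writeSpinCount setting). The limit balances throughput against fairness: a single channel with a large backlog cannot monopolize the EventLoop thread.
By default Netty writes only 16 times and then pauses, even if data remains:
- each iteration attempts a write(); if data remains, it tries again;
- after at most 16 attempts it yields the thread, whether or not everything was written;
- the remaining data is written by a flush task that Netty schedules on the EventLoop; if the socket send buffer is full instead, Netty registers OP_WRITE and continues once the socket is writable again.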
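The three ways the write loop can end are easier to see in a stripped-down simulation. This is a sketch under stated assumptions, not Netty code: BoundedWriter, its Outcome values and the IntUnaryOperator standing in for the socket (given the pending byte count, it returns how many bytes it accepted) are all hypothetical.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical simulation of a write loop bounded by a spin count. */
final class BoundedWriter {
    enum Outcome { ALL_WRITTEN, WAIT_FOR_OP_WRITE, RESCHEDULE_FLUSH }

    private int pendingBytes;

    BoundedWriter(int pendingBytes) {
        this.pendingBytes = pendingBytes;
    }

    int pendingBytes() {
        return pendingBytes;
    }

    Outcome flush(IntUnaryOperator socket, int writeSpinCount) {
        int spins = writeSpinCount;
        while (spins > 0) {
            if (pendingBytes == 0) {
                return Outcome.ALL_WRITTEN; // nothing left: stop early
            }
            int written = socket.applyAsInt(pendingBytes);
            if (written <= 0) {
                return Outcome.WAIT_FOR_OP_WRITE; // send buffer full: wait for writability
            }
            pendingBytes -= written;
            spins--;
        }
        // Spin budget used up: yield the thread and continue in a later flush task.
        return pendingBytes == 0 ? Outcome.ALL_WRITTEN : Outcome.RESCHEDULE_FLUSH;
    }

    public static void main(String[] args) {
        IntUnaryOperator tenBytesPerWrite = pending -> Math.min(pending, 10);
        BoundedWriter writer = new BoundedWriter(1000);
        System.out.println(writer.flush(tenBytesPerWrite, 16) + ", pending=" + writer.pendingBytes());
    }
}
```

The distinction matters for latency: a full socket buffer has to wait for the selector, while an exhausted spin budget only waits for other work already queued on the same EventLoop.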
6. The OP_ACCEPT operation
AbstractNioMessageChannel->read()
The core method to note: doReadMessages()
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    // Core call: creates a NioSocketChannel for each new connection
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
NioServerSocketChannel->doReadMessages()
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        // Accept the new connection and obtain its SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                // Wrap it in a new NioSocketChannel and add it to the read buffer
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
AbstractChannelHandlerContext->invokeChannelRead()
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // Core call: hand the message to this context's handler
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
ServerBootstrapAcceptor->channelRead()
This method takes the child channel produced by the boss group's OP_ACCEPT event and registers it with an EventLoop of the worker group.
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        // Configure the handler, options and attributes for the client connection
        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);
        setAttributes(child, childAttrs);

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
MultithreadEventLoopGroup.register() picks an EventLoop from its array and then registers the channel with it:
SingleThreadEventLoop.register() is the core registration method.
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
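How an EventLoop is picked from the array is not shown in the excerpt above. To my understanding Netty's default chooser is a round-robin counter that uses a bit mask when the array length is a power of two and a modulo otherwise. The sketch below illustrates that idea under this assumption; RoundRobinChooser is a hypothetical name and it works on any list of objects, not on real EventLoops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin chooser over a fixed list of event loops. */
final class RoundRobinChooser<T> {
    private final List<T> loops;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooser(List<T> loops) {
        if (loops.isEmpty()) {
            throw new IllegalArgumentException("no event loops");
        }
        this.loops = new ArrayList<>(loops);
        int n = loops.size();
        this.powerOfTwo = (n & -n) == n;
    }

    T next() {
        int n = loops.size();
        int i = idx.getAndIncrement();
        // The mask is cheaper than modulo and stays non-negative after int overflow;
        // the modulo branch needs Math.abs for the same reason.
        return loops.get(powerOfTwo ? i & (n - 1) : Math.abs(i % n));
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2", "loop-3"));
        for (int i = 0; i < 6; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

Because a channel stays on the EventLoop it was registered with, this one-time choice is what spreads connections across the worker threads.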
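Netty client and server source code analysis:
In Netty, Bootstrap and ServerBootstrap are the core helper classes for starting a client and a server. They initialize, configure and start the client Channel and the server Channel respectively.
ServerBootstrap source code analysis:
1. AbstractBootstrap->initAndRegister()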
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 1. Create the server channel (NioServerSocketChannel)
            channel = channelFactory.newChannel();
            // 2. Initialize the server channel (NioServerSocketChannel)
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // 3. Register: pick a NioEventLoop from the boss group and register the channel with it
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
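Netty pitfalls
I. Memory leaks
Causes:
- ByteBuf memory is managed manually, and a missing release() call leaves the memory unreleased.
- References to a ChannelHandlerContext or Channel are cached for a long time.
- An EventLoopGroup is never shut down, or scheduled tasks are never cancelled.
Solutions:
- Use Netty's leak detection, which wraps sampled buffers in leak-aware wrappers such as SimpleLeakAwareByteBuf.
- Enable the most thorough detection level with -Dio.netty.leakDetection.level=PARANOID.
- Make sure manually allocated resources (for example, buffers obtained from a ByteBufAllocator) are released.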
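The release() discipline behind the first cause can be shown with a toy reference-counted object. This is an illustration of the counting contract only: ToyRefCounted and its handle method are hypothetical and are not Netty's ByteBuf or ReferenceCounted, although the retain/release semantics are modelled on them.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted resource: starts at 1, freed when the count reaches 0. */
final class ToyRefCounted {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    ToyRefCounted retain() {
        for (;;) {
            int c = refCnt.get();
            if (c == 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(c, c + 1)) {
                return this;
            }
        }
    }

    /** Returns true if this call dropped the last reference. */
    boolean release() {
        for (;;) {
            int c = refCnt.get();
            if (c == 0) {
                throw new IllegalStateException("released too many times");
            }
            if (refCnt.compareAndSet(c, c - 1)) {
                return c == 1;
            }
        }
    }

    /** The last consumer of a message releases it in finally, even if processing throws. */
    static boolean handle(ToyRefCounted msg) {
        try {
            return msg.refCnt() > 0; // stand-in for real processing
        } finally {
            msg.release();
        }
    }
}
```

The rule of thumb this models: whoever touches the object last is responsible for releasing it, and every retain() needs exactly one matching release().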
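II. Blocking business logic stalls the I/O thread
Cause:
- A blocking operation (database access, file I/O, and so on) runs on a Netty EventLoop.
Solutions:
- Offload time-consuming work to a business thread pool.
- Use a DefaultEventExecutorGroup to run selected handlers on separate threads.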
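The offloading pattern can be sketched with plain JDK executors. This is a minimal model, not Netty code: a single-threaded executor stands in for the EventLoop, and OffloadSketch, slowLookup and daemonPool are hypothetical names. The blocking call runs on the business pool and only the cheap continuation returns to the loop thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: keep blocking work off the (simulated) event loop thread. */
final class OffloadSketch {
    /** Stand-in for a blocking call such as a database query. */
    static String slowLookup(String key) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "@" + Thread.currentThread().getName();
    }

    static CompletableFuture<String> handle(String request, Executor eventLoop, Executor businessPool) {
        return CompletableFuture
                .supplyAsync(() -> slowLookup(request), businessPool)  // blocking part
                .thenApplyAsync(r -> r + "->" + Thread.currentThread().getName(), eventLoop); // back on the loop
    }

    static ExecutorService daemonPool(String name, int threads) {
        return Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        ExecutorService loop = daemonPool("loop", 1);
        ExecutorService biz = daemonPool("biz", 4);
        System.out.println(handle("ping", loop, biz).join()); // prints "ping@biz->loop"
        loop.shutdown();
        biz.shutdown();
    }
}
```

In a real handler the continuation would typically write the response through the channel, which Netty hands back to the channel's own EventLoop.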
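III. Sticky and split packets, a common problem in TCP communication
Causes:
- The application writes too fast: several write() calls are merged into one transmission.
- The receiver drains its buffer slowly: several packets pile up.
- Network buffering: the TCP layer coalesces small writes (Nagle's algorithm) or splits large ones into segments.
- Messages have no boundaries: a raw byte[] carries no length or terminator, so the receiver cannot tell messages apart.
Solutions:
- Use a decoder that Netty provides, such as LengthFieldBasedFrameDecoder.
- Design the protocol with an explicit message length or a dedicated delimiter.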
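The length-field approach can be illustrated without Netty. The sketch below assumes a hypothetical wire format of a 4-byte big-endian length followed by a UTF-8 body; LengthPrefixedDecoder is an illustrative name and is far simpler than LengthFieldBasedFrameDecoder. It buffers incoming chunks and emits a frame only once all of its bytes have arrived, which handles both sticky and split packets.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder for frames of the form [4-byte length][UTF-8 body]. */
final class LengthPrefixedDecoder {
    private ByteBuffer acc = ByteBuffer.allocate(64); // kept in write mode between calls

    /** Feeds one received chunk and returns every frame completed by it. */
    List<String> feed(byte[] chunk) {
        if (acc.remaining() < chunk.length) {
            int needed = acc.position() + chunk.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(acc.capacity() * 2, needed));
            acc.flip();
            bigger.put(acc);
            acc = bigger;
        }
        acc.put(chunk);
        acc.flip(); // switch to read mode

        List<String> frames = new ArrayList<>();
        while (acc.remaining() >= 4) {
            int length = acc.getInt(acc.position()); // peek without consuming
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (acc.remaining() - 4 < length) {
                break; // split packet: wait for the rest of the body
            }
            acc.position(acc.position() + 4);
            byte[] body = new byte[length];
            acc.get(body);
            frames.add(new String(body, StandardCharsets.UTF_8));
        }
        acc.compact(); // keep the unread tail, back to write mode
        return frames;
    }

    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }
}
```

Feeding the two encoded messages "hello" and "netty" as one chunk yields both frames at once, while cutting the same bytes at an arbitrary offset yields nothing for the first part and both frames once the remainder arrives.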
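IV. High memory usage or frequent Full GC
Causes:
- Too much off-heap memory (direct memory) is allocated, which puts pressure on system memory.
- Heap ByteBufs are used and data is copied frequently.
- Scheduled tasks or callbacks are never cleaned up, so objects cannot be garbage collected.
Solutions:
- Keep PooledByteBufAllocator usage under control to avoid leaking large chunks of memory.
- Monitor off-heap memory usage (Netty direct memory) regularly.
- Use jmap and jstack to help track down object leaks.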
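For the monitoring point, one starting place is the JDK's own buffer pool MXBean, sketched below. DirectMemoryProbe is a hypothetical name. One caveat, stated as an assumption: this bean only sees direct buffers allocated through the JDK's ByteBuffer machinery, and depending on configuration Netty may allocate direct memory in ways it does not count, so Netty's own allocator metrics (for example the metric() of PooledByteBufAllocator) are worth checking as well.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

/** Hypothetical probe that reads the JDK's "direct" buffer pool usage. */
final class DirectMemoryProbe {
    /** Returns bytes used by the JDK direct buffer pool, or -1 if the pool is not exposed. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20); // 1 MiB off-heap
        System.out.println("direct bytes used: " + directBytesUsed());
        System.out.println("buffer capacity:   " + buffer.capacity());
    }
}
```

Sampling this value periodically and exporting it to the existing metrics system makes slow off-heap growth visible long before the process runs out of direct memory.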