【Netty】- 入门2
Future & Promise
netty中的Future是继承自jdk中的Future;netty中的Promise接口是继承自netty中的Future接口。
- jdk Future:只能同步等待任务结束(成功 或 失败)才能得到结果
- netty Future:可以同步等待任务结束后得到结果,也可以异步方式得到结果,但是都需要等任务结束
- netty Promise:不仅有netty Future功能,也脱离独立任务存在,
jdkFuture
@Slf4j
public class Test01JdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 线程池ExecutorService service = Executors.newFixedThreadPool(2);// 2. 提交任务Future<Integer> future = service.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}});// 3. 主线程通过future获取结果log.debug("等待结果");log.debug("结果是:{}", future.get()); // 阻塞方法(同步等待)}
}
理解:相当于另一个线程给主线程了一个背包,把这个线程返回的结果装到背包里。(这个过程对于主线程来说是被动的,因为主线程没法装结果,执行线程才能装结果)
nettyFuture
同步方式
@Slf4j
public class Test02NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}}); // 返回netty包下的Futurelog.debug("等待结果");log.debug("结果是:{}", future.get()); // 阻塞方法(同步等待)}
}
主线程获取结果:
异步方式
@Slf4j
public class Test03NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}}); // 返回netty包下的Futurefuture.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.debug("结果是:{}", future.getNow());}});}
}
由执行者(nio线程)获取结果
nettyPromise
@Slf4j
public class Test04NettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 准备EventLoop对象NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();// 2. 可以主动创建DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(()->{// 3. 任意一个线程执行计算,完毕后可以向Promise填充结果System.out.println("开始计算");try {int a = 1 / 0;Thread.sleep(1000);promise.setSuccess(80);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();// 4. 接收结果log.debug("等待结果");log.debug("结果是:{}", promise.get());}
}
- promise.setSuccess():返回的结果由主线程创建,而不是子线程创建
- promise.setFailure():返回主线程错误的结果
Handler & Pipeline
ChannelHandler是用来处理Channel上的各种事件,分为入站、出站。所有的ChannelHandler被连成一串就是PipeLine。
- 入站处理器:ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
- 出站处理器:ChannelOutboundHandlerAdapter的子类,主要对协会结果进行加工
Pipeline
在下边的代码中,
- 入站处理器:h1、h2、h3
- 出站处理器:h4、h5、h6
如果没有写入数据,出站处理器相当于形同虚设,是不会执行的;此时只会处理入站处理器。
只有写入数据后,出站处理器才会执行;出站处理器的执行顺序和入站处理器是反过来的
@Slf4j
public class Test01Pipeline {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 1. 通过channel拿到pipelineChannelPipeline pipeline = ch.pipeline();/*2. 添加处理器:netty默认会添加两个handler:headHandler -> tailHandler把处理器添加到tailHanler之前(headHandler -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tailHandler)*/pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3");super.channelRead(ctx, msg);ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}
}
inBoundHandler(入站处理器)
@Slf4j
public class Test02InBoundHandler {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");ByteBuf buf = (ByteBuf) msg;String name = buf.toString(Charset.defaultCharset());super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {log.debug("2");Student student = new Student(name.toString());super.channelRead(ctx, student);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3,结果:{},class:{}", msg, msg.getClass());ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});}}).bind(8080);}@AllArgsConstructor@Datastatic class Student {private String name;}
}
outBoundHandler(出站处理器)
@Slf4j
public class Test03OutBoundHandler {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}});pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3");/*ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())):从当前处理器往前找ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())):从尾部处理器往前找*/ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));// ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}
}
embeddedChannel(测试channel)
@Slf4j
public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}};EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 模拟入站操作channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));// 模拟出站操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));}
}
ByteBuf
创建ByteBuf
public class Test01ByteBuf {@Testpublic void createByteBuf() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();log(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)StringBuilder sb = new StringBuilder();for(int i = 0; i < 300; ++i) {sb.append("a");}buffer.writeBytes(sb.toString().getBytes()); // 写入log(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)}public void log(ByteBuf buffer) {int length = buffer.readableBytes();int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;StringBuilder buf = new StringBuilder(rows * 80 * 2).append("read index:").append(buffer.readerIndex()).append(" write index:").append(buffer.writerIndex()).append(" capacity:").append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());}
}
netty中的ByteBuf是可以扩容的
直接内存 vs 堆内存
创建堆内存:
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
创建直接内存(默认):
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
直接内存创建和销毁的代价高,但是读写性能高(少一次内存复制)
直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理
池化 vs 非池化
池化的意义:可以重用ByteBuf(类似于连接池)
- 如果没有池化,每次都要创建新的ByteBuf,代价较高
- 如果有池化,可以重用池中的ByteBuf,高并发时池化也会更节约内存,减少内存溢出
池化功能默认开启
组成
ByteBuf由四个变量组成:最大容量、容量、都指针、写指针
这四个变量划分成四个部分:可扩容部分、可写部分、可读部分、废弃部分
最大容量:默认整数最大值
容量和最大容量之间: 可扩容的字节(在容量不够时才去申请更多的内存)
最开始读写指针都在0位置
读指针到写指针之间:可读部分
0位置到都指针:废弃部分
写入
扩容规则
- 如果写入后数据 < 512,则选择下一个16整数倍(例:写入后大小为12,扩容后capacity为16)
- 如果写入后数据 > 512,则选择下一个
2^n
(例:写入后大小为513,扩容后capatity是2^10,因为2^9 = 512已经不够用了
) - 扩容不能超过max capacity,否则会报错
读取
@Test
public void readByteBuf() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)StringBuilder sb = new StringBuilder();for(int i = 0; i < 300; ++i) {sb.append("a");}buffer.writeBytes(sb.toString().getBytes()); // 写入System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 4, widx: 300, cap: 512)
}
retain & release
Netty中堆外内存最好手动释放,而不是等GC垃圾回收
- UnpooledHeapByteBuf:使用JVM内存,等GC回收
- UnpooledDirectByteBuf:直接内存(使用特殊的方法回收内存)
- PooledByteBuf和它的子类:池化机制(更复杂的规则)
引用计数法
Netty使用引用计数法来控制回收内存,每个ByteBuf都实现了RefrenceCounted接口
- 每个ByteBuf对象的初始计数为1
- 调用
release()
方法计数减1,如果计数为0,ByteBuf内存被回收 - 调用
retain()
方法计数加1,为了防止调用者没用完之前,其他handler调用了release()导致回收 - 当计数器为0,底层内存被回收(计数ByteBuf对象还在,其他方法都没法使用)
【总结
】release:谁是最后使用者,谁负责release()
slice
@Test
public void sliceBuf1() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf); // abcdefghij// 在切片过程中没有发生数据的复制ByteBuf f1 = buf.slice(0, 5); // 从下标0开始,切5个ByteBuf f2 = buf.slice(5, 5); // 从下标5开始,切5个log(f1); // abcdelog(f2); // fghijf1.setByte(0, 'z');log(f1); // zbcdelog(buf); // zbcdefghij
}
slice()不会发生数据的复制
不允许往切片后的ByteBuf中写入数据,否则会报错
@Test
public void sliceBuf2() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf); // abcdefghijByteBuf f1 = buf.slice(0, 5);// 释放原有内存buf.release();log(f1); // 这行代码会报错
}
如果释放了原有内存,通过原有内存获取的切片就会报错
@Test
public void sliceBuf3() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf);ByteBuf f1 = buf.slice(0, 5);// 让f1的引用计数 + 1f1.retain();// 释放原有内存buf.release(); // f1的引用计数-1,buf的引用计数也-1log(f1);
}
如果想释放原有内存,但是切片还能用的话,可以使用retain(),先让切片内存+1
composite
@Test
public void compositeByteBuf() {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});/*ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(buf1).writeBytes(buf2); // writeBytes会发生数据复制,这行代码会发生两次数据复制*/CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();buffer.addComponents(true, buf1, buf2); // 避免了内存的复制log(buffer);
}
虽然可以避免内存复制,但是他需要对这两个ByteBuf的读写指针进行重新的计算
ByteBuf优势
- 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出
- 读写指针分离,不需要像ByteBuffer一样切换读写模式
- 自动扩容
- 支持链式调用
- 很多方法都可以支持零拷贝(slice、duplicate、CompositeByteBuf)