当前位置: 首页 > news >正文

【Netty】- 入门2

Future & Promise

netty中的Future是继承自jdk中的Future;netty中的Promise接口是继承自netty中的Future接口。

  • jdk Future:只能同步等待任务结束(成功 或 失败)才能得到结果
  • netty Future:可以同步等待任务结束后得到结果,也可以异步方式得到结果,但是都需要等任务结束
  • netty Promise:不仅有netty Future功能,也脱离独立任务存在,

jdkFuture

@Slf4j
public class Test01JdkFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 线程池ExecutorService service = Executors.newFixedThreadPool(2);// 2. 提交任务Future<Integer> future = service.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}});// 3. 主线程通过future获取结果log.debug("等待结果");log.debug("结果是:{}", future.get()); // 阻塞方法(同步等待)}
}

理解:相当于另一个线程给主线程了一个背包,把这个线程返回的结果装到背包里。(这个过程对于主线程来说是被动的,因为主线程没法装结果,执行线程才能装结果)

nettyFuture

同步方式

@Slf4j
public class Test02NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}}); // 返回netty包下的Futurelog.debug("等待结果");log.debug("结果是:{}", future.get()); // 阻塞方法(同步等待)}
}

主线程获取结果:
在这里插入图片描述

异步方式

@Slf4j
public class Test03NettyFuture {public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.debug("执行计算");Thread.sleep(1000);return 50;}}); // 返回netty包下的Futurefuture.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.debug("结果是:{}", future.getNow());}});}
}

由执行者(nio线程)获取结果
在这里插入图片描述

nettyPromise

@Slf4j
public class Test04NettyPromise {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 准备EventLoop对象NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();// 2. 可以主动创建DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);new Thread(()->{// 3. 任意一个线程执行计算,完毕后可以向Promise填充结果System.out.println("开始计算");try {int a = 1 / 0;Thread.sleep(1000);promise.setSuccess(80);} catch (Exception e) {e.printStackTrace();promise.setFailure(e);}}).start();// 4. 接收结果log.debug("等待结果");log.debug("结果是:{}", promise.get());}
}
  • promise.setSuccess():返回的结果由主线程创建,而不是子线程创建
  • promise.setFailure():返回主线程错误的结果

Handler & Pipeline

ChannelHandler是用来处理Channel上的各种事件,分为入站、出站。所有的ChannelHandler被连成一串就是PipeLine。

  • 入站处理器:ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果
  • 出站处理器:ChannelOutboundHandlerAdapter的子类,主要对协会结果进行加工

Pipeline

在下边的代码中,

  • 入站处理器:h1、h2、h3
  • 出站处理器:h4、h5、h6

如果没有写入数据,出站处理器相当于形同虚设,是不会执行的;此时只会处理入站处理器。
只有写入数据后,出站处理器才会执行;出站处理器的执行顺序和入站处理器是反过来的
在这里插入图片描述

@Slf4j
public class Test01Pipeline {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 1. 通过channel拿到pipelineChannelPipeline pipeline = ch.pipeline();/*2. 添加处理器:netty默认会添加两个handler:headHandler -> tailHandler把处理器添加到tailHanler之前(headHandler -> h1 -> h2 -> h3 -> h4 -> h5 -> h6  -> tailHandler)*/pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3");super.channelRead(ctx, msg);ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}
}

inBoundHandler(入站处理器)

@Slf4j
public class Test02InBoundHandler {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");ByteBuf buf = (ByteBuf) msg;String name = buf.toString(Charset.defaultCharset());super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {log.debug("2");Student student = new Student(name.toString());super.channelRead(ctx, student);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3,结果:{},class:{}", msg, msg.getClass());ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});}}).bind(8080);}@AllArgsConstructor@Datastatic class Student {private String name;}
}

outBoundHandler(出站处理器)

@Slf4j
public class Test03OutBoundHandler {public static void main(String[] args) {new ServerBootstrap().group(new NioEventLoopGroup()).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}});pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}});pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}});pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("3");/*ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())):从当前处理器往前找ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())):从尾部处理器往前找*/ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));// ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));}});pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("5");super.write(ctx, msg, promise);}});pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("6");super.write(ctx, msg, promise);}});}}).bind(8080);}
}

在这里插入图片描述

embeddedChannel(测试channel)

@Slf4j
public class TestEmbeddedChannel {public static void main(String[] args) {ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("1");super.channelRead(ctx, msg);}};ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("2");super.channelRead(ctx, msg);}};ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("3");super.write(ctx, msg, promise);}};ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {log.debug("4");super.write(ctx, msg, promise);}};EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);// 模拟入站操作channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));// 模拟出站操作channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));}
}

ByteBuf

创建ByteBuf

public class Test01ByteBuf {@Testpublic void createByteBuf() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();log(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)StringBuilder sb = new StringBuilder();for(int i = 0; i < 300; ++i) {sb.append("a");}buffer.writeBytes(sb.toString().getBytes()); // 写入log(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)}public void log(ByteBuf buffer) {int length = buffer.readableBytes();int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;StringBuilder buf = new StringBuilder(rows * 80 * 2).append("read index:").append(buffer.readerIndex()).append(" write index:").append(buffer.writerIndex()).append(" capacity:").append(buffer.capacity()).append(NEWLINE);appendPrettyHexDump(buf, buffer);System.out.println(buf.toString());}
}

netty中的ByteBuf是可以扩容的

直接内存 vs 堆内存

创建堆内存:

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

创建直接内存(默认):

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

直接内存创建和销毁的代价高,但是读写性能高(少一次内存复制)
直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理

池化 vs 非池化

池化的意义:可以重用ByteBuf(类似于连接池)

  • 如果没有池化,每次都要创建新的ByteBuf,代价较高
  • 如果有池化,可以重用池中的ByteBuf,高并发时池化也会更节约内存,减少内存溢出

池化功能默认开启

组成

ByteBuf由四个变量组成:最大容量、容量、都指针、写指针
这四个变量划分成四个部分:可扩容部分、可写部分、可读部分、废弃部分
在这里插入图片描述

最大容量:默认整数最大值
容量和最大容量之间: 可扩容的字节(在容量不够时才去申请更多的内存)
最开始读写指针都在0位置
读指针到写指针之间:可读部分
0位置到都指针:废弃部分

写入

在这里插入图片描述

扩容规则

  1. 如果写入后数据 < 512,则选择下一个16整数倍(例:写入后大小为12,扩容后capacity为16)
  2. 如果写入后数据 > 512,则选择下一个2^n(例:写入后大小为513,扩容后capatity是2^10,因为2^9 = 512已经不够用了
  3. 扩容不能超过max capacity,否则会报错

读取

@Test
public void readByteBuf() {ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)StringBuilder sb = new StringBuilder();for(int i = 0; i < 300; ++i) {sb.append("a");}buffer.writeBytes(sb.toString().getBytes()); // 写入System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer.readByte());System.out.println(buffer); // PooledUnsafeDirectByteBuf(ridx: 4, widx: 300, cap: 512)
}

retain & release

Netty中堆外内存最好手动释放,而不是等GC垃圾回收

  • UnpooledHeapByteBuf:使用JVM内存,等GC回收
  • UnpooledDirectByteBuf:直接内存(使用特殊的方法回收内存)
  • PooledByteBuf和它的子类:池化机制(更复杂的规则)

引用计数法

Netty使用引用计数法来控制回收内存,每个ByteBuf都实现了RefrenceCounted接口

  • 每个ByteBuf对象的初始计数为1
  • 调用release()方法计数减1,如果计数为0,ByteBuf内存被回收
  • 调用retain()方法计数加1,为了防止调用者没用完之前,其他handler调用了release()导致回收
  • 当计数器为0,底层内存被回收(计数ByteBuf对象还在,其他方法都没法使用)

总结】release:谁是最后使用者,谁负责release()

slice

@Test
public void sliceBuf1() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf); // abcdefghij// 在切片过程中没有发生数据的复制ByteBuf f1 = buf.slice(0, 5); // 从下标0开始,切5个ByteBuf f2 = buf.slice(5, 5); // 从下标5开始,切5个log(f1); // abcdelog(f2); // fghijf1.setByte(0, 'z');log(f1); // zbcdelog(buf); // zbcdefghij
}

slice()不会发生数据的复制
不允许往切片后的ByteBuf中写入数据,否则会报错

@Test
public void sliceBuf2() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf); // abcdefghijByteBuf f1 = buf.slice(0, 5);// 释放原有内存buf.release();log(f1); // 这行代码会报错
}

如果释放了原有内存,通过原有内存获取的切片就会报错

@Test
public void sliceBuf3() {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});log(buf);ByteBuf f1 = buf.slice(0, 5);// 让f1的引用计数 + 1f1.retain();// 释放原有内存buf.release(); // f1的引用计数-1,buf的引用计数也-1log(f1); 
}

如果想释放原有内存,但是切片还能用的话,可以使用retain(),先让切片内存+1

composite

@Test
public void compositeByteBuf() {ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});/*ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();buffer.writeBytes(buf1).writeBytes(buf2); // writeBytes会发生数据复制,这行代码会发生两次数据复制*/CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();buffer.addComponents(true, buf1, buf2); // 避免了内存的复制log(buffer);
}

虽然可以避免内存复制,但是他需要对这两个ByteBuf的读写指针进行重新的计算

ByteBuf优势

  1. 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出
  2. 读写指针分离,不需要像ByteBuffer一样切换读写模式
  3. 自动扩容
  4. 支持链式调用
  5. 很多方法都可以支持零拷贝(slice、duplicate、CompositeByteBuf)
http://www.xdnf.cn/news/566425.html

相关文章:

  • dify基于文本模型实现微调Fine-tune语料构造工作流
  • 在 Ubuntu 下通过 C APP程序实现串口发送数据并接收返回数据
  • OSCP备战-Stapler靶场详细步骤
  • 用java实现内网通讯,可多开客户端链接同一个服务器
  • 离线服务器算法部署环境配置
  • 深度解析 Element Plus
  • Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
  • naive-ui切换主题
  • 基于RT-Thread的STM32F4开发第六讲——PWM输出(CH1和CH1N)
  • DevOps学习回顾03-ops三部曲之配置管理(CM)
  • C++核心编程_初始化列表
  • Unity3D序列化机制详解
  • 云计算与大数据进阶 | 28、存储系统如何突破容量天花板?可扩展架构的核心技术与实践—— 分布式、弹性扩展、高可用的底层逻辑(下)
  • 游戏盾功能与技术解析
  • 电力设备制造企业数字化转型路径研究:从生产优化到生态重构
  • SpringBoot3+Vue3(2)-前端基本页面配置-登录界面编写-Axios请求封装-后端跨越请求错误
  • 【Java高阶面经:微服务篇】4.大促生存法则:微服务降级实战与高可用架构设计
  • 使用计算机视觉实现目标分类和计数!!超详细入门教程
  • uni-app(2):页面
  • 用python实现汉字转拼音工具
  • 【AI News | 20250521】每日AI进展
  • 【Java高阶面经:微服务篇】9.微服务高可用全攻略:从架构设计到自动容灾
  • Ajax快速入门教程
  • OpenCV CUDA模块特征检测与描述------用于创建一个最大值盒式滤波器(Max Box Filter)函数createBoxMaxFilter()
  • PostgreSQL日志维护
  • 阿里云合集(不定期更新)
  • 适合初学者的 Blender 第二部分
  • 1.4 C++之运算符与表达式
  • leetcode hot100刷题日记——8.合并区间
  • java综合交易所13国语言,股票,区块链,外汇,自带客服系统运营级,有测试