当前位置: 首页 > news >正文

【Netty】- NIO基础2

阻塞模式

客户端代码

public class Client {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("localhost", 8080));// sc.write(Charset.defaultCharset().encode("helloworld")); // 可以在debug下发数据System.out.println("waiting...");}
}

服务器代码

@Slf4j
public class Server {public static void main(String[] args) throws IOException {// 使用nio来理解阻塞模式(单线程)// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while(true) {// 4. accept建立与客户端连接,SocketChannel用来与客户端之间通信log.debug("connecting...");SocketChannel sc = ssc.accept(); // 阻塞方法,没有连接建立时,线程停止运行log.debug("connected...{}", sc);channels.add(sc);// 5. 接收客户端发送的数据for(SocketChannel channel : channels) {log.debug("before read..., {}", channel);channel.read(buffer); // 阻塞方法,没有数据发送时,线程停止运行buffer.flip(); // 切换为读模式debugRead(buffer); // 读取数据buffer.clear(); // 切换为写模式log.debug("after read..., {}", channel);}}}
}

ssc.accept() 和 channel.read(buffer) 都是阻塞方法,如果没有建立连接或者没有数据过来时,线程都会阻塞等待。
因此如果同一个客户端再发第二次数据,并不会收到第二次发送的数据,因为此时没有新的连接建立,代码已经被阻塞在ssc.accept()这里了。

非阻塞模式

@Slf4j
public class Server {public static void main(String[] args) throws IOException {// 0. ByteBufferByteBuffer buffer = ByteBuffer.allocate(16);// 1. 创建服务器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false); // 设置ServerSocketChannel为非阻塞模式// 2. 绑定监听端口ssc.bind(new InetSocketAddress(8080));// 3. 连接集合List<SocketChannel> channels = new ArrayList<>();while(true) {// 4. accept建立与客户端连接,SocketChannel用来与客户端之间通信SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,sc是nullif(sc != null) {log.debug("connected...{}", sc);ssc.configureBlocking(false); // 设置SocketChannel为非阻塞模式channels.add(sc);}// 5. 接收客户端发送的数据for(SocketChannel channel : channels) {int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read()返回0if(read > 0) {buffer.flip(); // 切换为读模式debugRead(buffer); // 读取数据buffer.clear(); // 切换为写模式log.debug("after read..., {}", channel);}}}}
}

ssc.configureBlocking(false); // 变为非阻塞模式
非阻塞模式相当于轮询的在检查是否有新的数据、是否有新的连接,这样很消耗系统资源,一般情况下也不会使用非阻塞模式,而是使用selector

Selector(多路复用)

单线程可以配合Selector完成对多个Channel可读写事件的监控

常见的四种事件

  1. accept:在有连接请求时触发
  2. connect:客户端连接建立后触发
  3. read:可读事件
  4. write:可写事件
@Slf4j
public class Server {private static void split(ByteBuffer source) {source.flip();// 切换为读模式for(int i = 0; i < source.limit(); i++) {// 找到一条完整消息if(source.get(i) == '\n') { // get(i)不会改变position的位置int len = i - source.position() + 1;// 把这条消息存入新的ByteBuffer中ByteBuffer target = ByteBuffer.allocate(len);// 从source读,向target中写for(int j = 0; j < len; j++) {target.put(source.get());// get()会改变position的位置}debugAll(target);}}source.compact();}public static void main(String[] args) throws IOException {// 1. 创建selector,管理多个channelSelector selector = Selector.open();ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);// 2. 建立selector和channel的联系(注册)SelectionKey sscKey = ssc.register(selector, 0, null);// 事件发生后,通过它可以知道事件和哪个channel的事件(管理ServerSocketChannel)sscKey.interestOps(SelectionKey.OP_ACCEPT); // 对哪个事件感兴趣(有四种事件)log.debug("register key:{}", sscKey);ssc.bind(new InetSocketAddress(8080));while(true) {// 3. select 方法/*selector():没有事件发生 - 线程阻塞有(感兴趣的)事件发生 - 线程会恢复运行*/selector.select();// 4. 处理事件,selectedKey内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept,readwhile(iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); // 处理key的时候,一定要从SelectionKeys集合中删除,否则下次处理就会有问题log.debug("key:{}", key);if (key.isAcceptable()) { // 如果是accept事件ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);// 附件(attachment)SelectionKey scKey = sc.register(selector, 0, buffer); // 把buffer当成scKey的附属品注册倒scKey上(和channel对应)scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);}else if(key.isReadable()) { // 如果是读事件try {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channelByteBuffer buffer = (ByteBuffer) key.attachment();// 获取scKey上关联的附件int read = channel.read(buffer);if(read == -1) {key.cancel(); // 如果正常断开,read返回值是-1}else {split(buffer);if(buffer.position() == buffer.limit()) { // 扩容ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 扩容buffer.flip(); // 切换读模式newBuffer.put(buffer); // 把旧的buffer放入新的buffer中key.attach(newBuffer); // 重新关联新的buffer到key}}}catch (IOException e) {e.printStackTrace();key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)}}}}}
}

如果selector已经处理过事件,那么下次再来事件时,selector就会认为上一次的事件已经处理过,就会处理新的事件。
但是如果selector没有处理该事件,selector会一直认为上一次的事件还没处理,就还会处理上一次的事件(表现:一直轮询处理上一次的事件)
如果selector不想处理这次事件,可以使用key.cancel()取消事件

事件发生之后,要么处理,要么取消

selector会在发生事件后,向selectionKeys中加入key,但是不会删除

事件如果被处理,我们应该手动移除

处理客户端断开

  1. 如果客户端异常断开,此时会抛出异常,需要catch去捕获异常。
  2. 如果客户端正常断开,read返回值是-1,此时不会抛出异常。
try {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channelByteBuffer buffer = ByteBuffer.allocate(16);int read = channel.read(buffer);if(read == -1) {key.cancel(); // 如果正常断开,read返回值是-1}else {buffer.flip();debugRead(buffer);}}catch (IOException e) {e.printStackTrace();key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)}

处理消息边界

在这里插入图片描述

ByteBuffer的大小分配:

  • 每个channel都要记录可能被切割的消息,因为ByteBuffer不能被多个channel使用,需要每个channel都维护一个独立的ByteBuffer(附件attachment的形式)
  • ByteBuffer不能太大,因此需要设计大小可变的ByteBuffer:分配一个比较小的ByteBuffer,如果数据不够,再扩容。
while(true) {// 3. select 方法/*selector():没有事件发生 - 线程阻塞有(感兴趣的)事件发生 - 线程会恢复运行*/selector.select();// 4. 处理事件,selectedKey内部包含了所有发生的事件Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept,readwhile(iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); // 处理key的时候,一定要从SelectionKeys集合中删除,否则下次处理就会有问题log.debug("key:{}", key);if (key.isAcceptable()) { // 如果是accept事件ServerSocketChannel channel = (ServerSocketChannel) key.channel();SocketChannel sc = channel.accept();sc.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);// 附件(attachment)SelectionKey scKey = sc.register(selector, 0, buffer); // 把buffer当成scKey的附属品注册倒scKey上(和channel对应)scKey.interestOps(SelectionKey.OP_READ);log.debug("{}", sc);}else if(key.isReadable()) { // 如果是读事件try {SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channelByteBuffer buffer = (ByteBuffer) key.attachment();// 获取scKey上关联的附件int read = channel.read(buffer);if(read == -1) {key.cancel(); // 如果正常断开,read返回值是-1}else {split(buffer);if(buffer.position() == buffer.limit()) { // 扩容ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 扩容buffer.flip(); // 切换读模式newBuffer.put(buffer); // 把旧的buffer放入新的buffer中key.attach(newBuffer); // 重新关联新的buffer到key}}}catch (IOException e) {e.printStackTrace();key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)}}}}

服务器写入过多内容处理

基础代码

服务器:

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);// 1. 向客户端发送大量数据StringBuilder sb = new StringBuilder();for(int i = 0; i < 30000000; ++i) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());while(buffer.hasRemaining()) {int write = sc.write(buffer);// 返回实际写入次数System.out.println(write);}}}}}
}

客户端:

public class WriteClient {public static void main(String[] args) throws IOException {SocketChannel sc = SocketChannel.open();sc.connect(new InetSocketAddress("127.0.0.1", 8080));// 2. 接收数据int count = 0;while(true) {ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);count += sc.read(buffer);System.out.println(count);}}
}

这样虽然可以写大量的数据,但是效率并不高,因为发送端只要内容没发满,就会一直循环,相当于卡在当前的SocetChannel上。
改进】:发送缓冲区还没满的话,可以进行读操作;缓冲区满再写。

改进

public class WriteServer {public static void main(String[] args) throws IOException {ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector selector = Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));while(true) {selector.select();Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);SelectionKey sckey = sc.register(selector, 0, null);sckey.interestOps(SelectionKey.OP_READ);// 1. 向客户端发送大量数据StringBuilder sb = new StringBuilder();for(int i = 0; i < 3000000; ++i) {sb.append("a");}ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());int write = sc.write(buffer);// 返回实际写入次数System.out.println(write);if(buffer.hasRemaining()) {// 2. 关注可写事件sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);// 3. 把未写完的数据挂到sckey上sckey.attach(buffer);}}else if(key.isWritable()) {ByteBuffer buffer = (ByteBuffer) key.attachment();SocketChannel sc = (SocketChannel) key.channel();int write = sc.write(buffer);// 返回实际写入次数System.out.println(write);// 4. 清理操作if(!buffer.hasRemaining()) { // buffer为空key.attach(null); // 清除bufferkey.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不去关注可写事件}}}}}
}

利用多线程优化

@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {Thread.currentThread().setName("boss");ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);Selector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, 0, null);bossKey.interestOps(SelectionKey.OP_ACCEPT);ssc.bind(new InetSocketAddress(8080));// 1. 创建固定数量的worker并初始化Worker[] workers = new Worker[2];for(int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}AtomicInteger idx = new AtomicInteger(0);while(true) {boss.select();Iterator<SelectionKey> iter = boss.selectedKeys().iterator();while(iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {SocketChannel sc = ssc.accept();sc.configureBlocking(false);log.debug("connected...{}", sc.getRemoteAddress());// 2. 关联selectorlog.debug("before register...{}", sc.getRemoteAddress());// 负载均衡算法workers[idx.getAndIncrement() % workers.length].register(sc); // 初始化selector(boss调用)log.debug("after register...{}", sc.getRemoteAddress());}}}}static class Worker implements Runnable {private Thread thread;private Selector selector;private String name;private volatile boolean start = false; // 还未初始化private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); // 线程间传递队列public Worker(String name) {this.name = name;}// 初始化线程和selectorpublic void register(SocketChannel sc) throws IOException {if(!start) {thread = new Thread(this, name);selector = Selector.open();thread.start();}// 向队列中添加任务,但是任务并没有执行queue.add(()->{try {sc.register(selector, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});selector.wakeup(); // 唤醒selector}@Overridepublic void run() {while(true) {try {selector.select();Runnable task = queue.poll();if(task != null) {task.run(); // 执行任务里的代码}Iterator<SelectionKey> iter = selector.selectedKeys().iterator();while(iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isReadable()) {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel channel = (SocketChannel) key.channel();log.debug("read...{}", channel.getRemoteAddress());channel.read(buffer);buffer.flip();debugAll(buffer);}}} catch (IOException e) {throw new RuntimeException(e);}}}}
}

NIO vs BIO

stream vs channel

  • stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区
  • stream只支持阻塞API,channel同时支持阻塞、非阻塞API,channel可以配合selector实现多路复用
  • 二者均为全双工,读写可以同时进行

IO模型

同步阻塞、同步非阻塞、多路复用(本质也是同步的)、异步阻塞、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,由其他线程送结果(至少两个线程)

异步阻塞是错误的

AIO

AIO用来解决数据复制阶段的阻塞问题

@Slf4j
public class AioFileChannel {public static void main(String[] args) {try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {/*参数1:ByteBuffer参数2:读取的起始位置参数3:附件参数4:回调对象*/ByteBuffer buffer = ByteBuffer.allocate(16);log.debug("read begin...");channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Override // read成功public void completed(Integer result, ByteBuffer attachment) {log.debug("read completed...");attachment.flip();debugAll(attachment);}@Override // read失败public void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});log.debug("read end...");} catch (IOException e) {e.printStackTrace();}}
}
http://www.xdnf.cn/news/555661.html

相关文章:

  • 易境通海外仓系统PDA蓝牙面单打印:解锁库内作业新姿势
  • 【MySQL成神之路】运算符总结
  • day 31
  • STM32之定时器(TIMER)与脉冲宽度调制(PWM)
  • Glasgow Smile: 2靶场渗透
  • PostGIS栅格数据类型解析【raster】
  • 【深入理解索引扩展—1】提升智能检索系统召回质量的3大利器
  • 详解ip地址、子网掩码、网关、广播地址
  • 系统编程的标准IO
  • 【LINUX操作系统】日志系统——自己实现一个简易的日志系统
  • 容器环境渗透测试工具(docker渗透测试工具、kubernetes)
  • 一文掌握vue3基础,适合自学入门案例丰富
  • FreeRTOS学习笔记【11】-----任务列表
  • 第40天-Python开发音乐播放器完整指南
  • 左右边界策略
  • 前端读取本地项目中 public/a.xlsx 文件中的数据 vue3
  • Linux管道工具
  • 全能签软件的由来和介绍
  • MRVG-Net论文精读
  • Linux周测(一)
  • 龙虎榜——20250520
  • vue3+elementPlus穿梭框拖拽
  • MONA:5%参数微调超越全量微调,CVPR2025新型视觉适配器
  • Linux学习心得问题整理(二)
  • 工业智能网关在工业锅炉安全监控中的组网配置指南
  • C++ QT 与 win32 窗口可以互操作
  • HarmonyOS5云服务技术分享--ArkTS开发函数
  • SpringBootDay1|面试题
  • 环特生物董事长汤珣、执行总经理张勇赴白云美湾国际化妆品研究院集群考察调研
  • ES6核心特性与语法