02-netty基础-java四种IO模型
1 阻塞IO(Blocking IO)
1.1 工作机制
从应用程序发起调用到内核空间准备好数据、拷贝数据到用户空间,然后将数据返回给应用程序,这期间应用程序这块都是阻塞的,无法响应其他请求。
- 工作机制:在进行 IO 操作时(如读取数据),线程会被挂起,进入等待状态,直到数据准备好并读取完成后才会继续执行后续代码。
- 特点:实现简单,但在等待期间线程无法处理其他任务,导致资源浪费,适用于连接数少且 IO 操作耗时短的场景。
- 示例场景:传统的 Java IO 操作,如
InputStream.read()
方法调用时,如果没有数据可读,线程会一直阻塞。
socket交互的流程可以查看上一篇文章: 01-netty基础-socket-CSDN博客
1.2 代码实现
1.2.1 服务端代码
1.2.1.1 方式一单线程
处理完一个客户端请求,然后在处理下一个客户端请求
package com.bonnie.bio;import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;/*** 阻塞io服务端* 当一个客户端连接上来后,未处理完成,那么其他客户端是无法连接上来的;* 相当于串行执行,前一个执行完成才能轮到下一个执行*/
public class BlockingServer {public static void main(String[] args) throws IOException {// 第一步:首先通过ServerSocket来监听端口,我们知道,每个进程都有一个唯一的端口ServerSocket serverSocket = new ServerSocket(8080);while (true) {try {// 通过accept方法阻塞调用,直到有客户端的连接过来,就会返回SocketSocket socket = serverSocket.accept();// 获取socket的输入流BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));int port = socket.getPort();System.out.println("客户端的端口号:"+ port);// 获取客户端的数据,这个地方是一个阻塞的io,阻塞到直到数据读取完成String cliStr = bufferedReader.readLine();System.out.println("收到客户端的数据:"+ cliStr);// 获取socket的输出流BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));// 给客户端会写数据 这块结尾一定要使用\n,结束标志bufferedWriter.write("ok\n");// 刷新bufferedWriter.flush();} catch (Exception e) {e.printStackTrace();}}}}
1.2.1.2 方式二线程池
来一个客户端的请求,开启一个新线程,从而可以达到同时处理多个请求;
因为accept方法会阻塞等待客户端的连接,导致一个线程只能处理一个连接;如果想要处理多个连接,就要使用线程池来处理连接,但是这个是非常消耗线程的,线程是非常宝贵的资源,除非是机器性能很好,一般不建议采用
package com.bonnie.bio;import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** 阻塞io服务端* 当一个客户端连接上来后,未处理完成,那么其他客户端是无法连接上来的;* 使用多线程,将接收到的客户端请求放入都到线程池中,进而看到多个客户端可以同时连接和处理的现象*/
public class ThreadBlockingServer {static ExecutorService executorService = Executors.newCachedThreadPool();public static void main(String[] args) throws IOException {// 第一步:首先通过ServerSocket来监听端口,我们知道,每个进程都有一个唯一的端口ServerSocket serverSocket = new ServerSocket(8080);while (true) {// 通过accept方法阻塞调用,直到有客户端的连接过来,就会返回SocketSocket socket = serverSocket.accept();// 接收到客户端的请求,将请求放到线程池中,一个客户端一个线程,【创建线程消耗资源消耗时间、线程资源也比较珍贵】executorService.execute(()-> {try {// 获取socket的输入流BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));int port = socket.getPort();System.out.println("客户端的端口号:"+ port);// 获取客户端的数据,这个地方是一个阻塞的io,阻塞到直到数据读取完成String cliStr = bufferedReader.readLine();System.out.println("收到客户端的数据:"+ cliStr);// 获取socket的输出流BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));// 给客户端会写数据bufferedWriter.write("ok\n");// 刷新bufferedWriter.flush();} catch (Exception e) {e.printStackTrace();}});}}}
12.2 客户端代码
package com.bonnie.bio;import java.io.*;
import java.net.Socket;/*** 阻塞io客户端*/
public class BlockingClient {public static void main(String[] args) throws IOException {Socket socket = new Socket("127.0.0.1", 8080);BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));// 这块结尾一定要使用\n,结束标志bufferedWriter.write("你好我是客户端 \n");bufferedWriter.flush();BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));String s = bufferedReader.readLine();System.out.println("服务端写回的数据: " + s);}}
1.2.3 码云位置
git地址: https://gitee.com/huyanqiu6666/netty.git 分支: 250721-io
2 非阻塞IO(No Blocking IO)
2.1 工作机制
阻塞IO:发起系统调用后,直到内核有数据才会返回数据,在这个期间,线程一直阻塞。
非阻塞IO:发起系统调用后,无论内核中数据是否准备好,都不再阻塞应用线程,而是反复轮询直到数据准备好。下图就是描述了非阻塞IO的流程
- 工作机制:线程在发起 IO 操作后会立即返回一个状态值(如
-1
表示数据未准备好),线程不会被阻塞,可以继续执行其他任务。之后线程需要不断轮询检查 IO 操作的状态,直到数据准备好。 - 特点:线程在等待期间可以处理其他任务,提高了资源利用率,但频繁的轮询会消耗 CPU 资源。
- 示例场景:在 Java 中,可以通过设置
socket.setSoTimeout(1000)
将 Socket 设置为非阻塞模式,然后循环调用read()
方法检查数据是否就绪
2.2 代码实现
2.2.1 服务端代码
package com.bonnie.noblocking;import org.apache.commons.compress.utils.Lists;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;/*** 非阻塞IO: 一个线程可以处理多个连接* 定时轮询:询问客户端是否有数据进来,每次都要询问,消耗时间,消耗资源,*/
public class NoBlockingServer {static List<SocketChannel> clients = Lists.newArrayList();public static void main(String[] args) throws IOException {// 得到一个serverSocketChannel管道,这个就等同于serverSocket,只不过这个是支持异步并且可以同时读写ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 我们想要socket为非阻塞,通过设置该值为false就是为非阻塞serverSocketChannel.configureBlocking(Boolean.FALSE);// 绑定端口serverSocketChannel.socket().bind(new InetSocketAddress(8080));while (true) {try {// 接收客户端的请求,调用accept,由于设置成非阻塞了,所以accept将不会阻塞在这里等客户端的连接过来SocketChannel socketChannel = serverSocketChannel.accept();if (socketChannel != null) {// 同时也设置socketChannel为非阻塞,因为原来我们读取数据read方法也是阻塞的socketChannel.configureBlocking(Boolean.FALSE);clients.add(socketChannel);System.out.println("客户端端口:" + socketChannel.socket().getPort());} else {Thread.sleep(3 * 1000);System.out.println("没有连接,请等待!!!");}// 主线程处理多个客户端的连接 假设有10个客户端for (SocketChannel client : clients) {// channel中的数据都是先读取到buffer中,也都先写入到buffer中,所以定义一个ByteBufferByteBuffer byteBuffer = ByteBuffer.allocate(1024);// 数据读取到缓冲区,由于上面设置了非阻塞,此时的read将不会阻塞// 一直循环调用read,看是否有数据存在===> 调用10次read===>就是一次系统调用,10次系统调用, 消耗时间消耗资源int num = client.read(byteBuffer);if (num>0) {System.out.println("收到客户端数据:" + new String(byteBuffer.array(), StandardCharsets.UTF_8));socketChannel.write(ByteBuffer.wrap("你好我是服务端\n".getBytes(StandardCharsets.UTF_8)));} else {System.out.println("等待客户端写数据!!!");}}} catch (Exception e) {e.printStackTrace();}}}}
2.2.2 客户端代码
package com.bonnie.noblocking;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;public class NoBlockingClient {public static void main(String[] args) throws IOException, InterruptedException {SocketChannel socketChannel = SocketChannel.open();// 连接服务器socketChannel.connect(new InetSocketAddress("127.0.0.1",8080));// 发送消息到服务端socketChannel.write(ByteBuffer.wrap("你好我是客户端\n".getBytes(StandardCharsets.UTF_8)));// 接收服务端消息ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int num = socketChannel.read(byteBuffer);if (num>0) {// 设置读取到末尾,并且重置位置byteBuffer.flip();System.out.println("服务端写回的数据: " + new String(byteBuffer.array(), StandardCharsets.UTF_8));}}}
2.3 码云位置
git地址: https://gitee.com/huyanqiu6666/netty.git 分支: 250721-io
2.4 存在的问题
如下图:多个客户端访问服务端,就看到一个服务端的一个线程可以同时处理多个请求,,由于是非阻塞的,所以每个客户端都会去调用read,看数据是否准备好,进而会导致很多无用的系统调用,非常的浪费资源;如果有1W个客户端只有1个客户端准备好了,资源会造成极大的浪费。
2.5 如何解决
可以采用如下图的方式,使用多路复用器,这种方式可以监听到有数据到来的IO,然后触发下一个请求;由原来的轮询所有找出有数据的IO,变成了只监听有数据的IO,性能得到了大大的提升。
多路复用(Multiplexing)是一种让单个实体能同时管理多个资源的技术方案。在 IO 编程的范畴内,多路复用指的是由单个线程借助 Selector(选择器)来监管多个 IO 通道(像网络连接这类),一旦某个通道有 IO 事件(例如数据可读)发生,就能及时对其进行处理。
工作原理
多路复用的运行机制主要包含以下几个步骤:
- 注册通道:把所有需要监控的 IO 通道都注册到 Selector 上,并且为每个通道指定想要监控的事件类型,比如读事件或者写事件。
- 阻塞等待:Selector 会进入阻塞状态,一直等到至少有一个注册的通道发生了 IO 事件。
- 事件分发:当有 IO 事件出现时,Selector 会返回发生事件的通道集合,随后线程会对这些事件进行处理。
应用场景
多路复用技术在以下场景中尤为适用:
- 高并发连接:在需要处理大量并发连接的场景下,比如聊天服务器、Web 服务器等,多路复用技术能够充分发挥其优势。
- 连接活跃度低:当大量连接处于空闲状态,只是偶尔有 IO 操作时,多路复用技术可以高效地管理这些连接。
- 资源受限环境:在系统资源有限的情况下,无法为每个连接都分配一个独立的线程,此时多路复用技术就成为了理想的选择。
3 NIO(New IO)
- 工作机制:基于 Selector(选择器)和 Channel(通道)实现。多个 Channel 可以注册到一个 Selector 上,Selector 会不断轮询这些 Channel,当某个 Channel 有数据就绪时,会通知线程进行处理。
- 特点:单线程可以处理多个连接,避免了频繁创建和销毁线程的开销,适用于连接数多但 IO 操作轻量的场景(如聊天服务器)。
- 示例场景:Java NIO 包中的
Selector
、SocketChannel
和ServerSocketChannel
的组合使用。
3.1 工作机制
3.2 代码实现
3.2.1 服务端代码
package com.bonnie.newio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** 多路复用*/
public class NewIoServer {static Selector selector;public static void main(String[] args) throws IOException {// 得到一个多路复用器selector = Selector.open();// 获取一个管道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 设置为非阻塞serverSocketChannel.configureBlocking(Boolean.FALSE);serverSocketChannel.socket().bind(new InetSocketAddress(8080));/*** 把连接事件注册到多路复用器上,通过注册不同事件处理不同的任务,* 把serverSocketChannel注册到selector上,主要是当连接到来的时候,* 由于一个Accpet事件*/serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 该方法阻塞,只有当有事件到来时就不会阻塞了 === 底层:多路复用selector.select();// 获取所有事件,事件都被封装成SelectionKeySet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 获取到相应的事件keySelectionKey key = iterator.next();// 拿到后要删除,防止再次调用iterator.remove();// 连接事件if (key.isAcceptable()) {handleAccept(key);}// 读的就绪事件else if (key.isReadable()) {handlesRead(key);}}}}private static void handleAccept(SelectionKey selectionKey) throws IOException {// 从selector中获取serverSocketChannel,因为当初把serverSocketChannel注册到selector上,并且注册的accept事件ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();// 能到这里,一定有客户端连接过来,所以一定会连接SocketChannel socketChannel = serverSocketChannel.accept();// 设置为非阻塞socketChannel.configureBlocking(Boolean.FALSE);// 给客户端会写数据socketChannel.write(ByteBuffer.wrap("hello client. newio Server".getBytes()));// 注册read事件,等while循环再次获取read事件,然后读取socketChannel中的数据socketChannel.register(selector, SelectionKey.OP_READ);}private static void handlesRead(SelectionKey selectionKey) throws IOException {// 从selector中获取serverSocketChannelSocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);socketChannel.read(byteBuffer);System.out.println("server receive msg:"+new String(byteBuffer.array()));}}
3.2.1 客户端代码
package com.bonnie.newio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;/*** 多路复用*/
public class NewIoClient {static Selector selector;public static void main(String[] args) throws IOException {selector = Selector.open();SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(Boolean.FALSE);socketChannel.connect(new InetSocketAddress("localhost", 8080));// 连接事件socketChannel.register(selector, SelectionKey.OP_CONNECT);while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();iterator.remove();// 连接事件if (selectionKey.isConnectable()) {handleConnect(selectionKey);}// 读的就绪事件else if (selectionKey.isReadable()) {handleReadable(selectionKey);}}}}private static void handleConnect(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();// 是否完成了连接,没有则建立连接if (socketChannel.isConnectionPending()) {// 建立连接socketChannel.finishConnect();}// 设置为非阻塞socketChannel.configureBlocking(Boolean.FALSE);// 给服务端写数据socketChannel.write(ByteBuffer.wrap("hello server. I am newio client".getBytes()));socketChannel.register(selector, SelectionKey.OP_READ);}private static void handleReadable(SelectionKey selectionKey) throws IOException {SocketChannel socketChannel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);socketChannel.read(byteBuffer);System.out.println("client receive msg:"+new String(byteBuffer.array()));}}
3.3 码云位置
git地址: https://gitee.com/huyanqiu6666/netty.git 分支: 250721-io
4 AIO
无论是否准备好数据,都直接返回,然后可以执行其他的任务,当数据准备完毕后,主动推送数据到应用程序。
- 工作机制:基于事件和回调机制。当发起 IO 操作时,线程会继续执行后续代码,IO 操作完成后会通过回调函数通知线程处理结果。
- 特点:真正的异步 IO,线程不需要关注 IO 操作的过程,只需处理结果,效率最高,适用于连接数多且 IO 操作耗时长的场景(如文件传输)。
- 示例场景:Java 7 引入的
AsynchronousFileChannel
和AsynchronousSocketChannel
。
3.2 代码实现
3.2.1 服务端代码
package com.bonnie.aio;import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;/*** 异步IO-服务端*/
public class AIOServer {public static void main(String[] args) throws Exception {// 创建一个serverChannel并绑定8080端口AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {@Overridepublic void completed(AsynchronousSocketChannel socketChannel, Object attachment) {try {// 打印线程的名字System.out.println("2--"+ Thread.currentThread().getName());System.out.println(socketChannel.getRemoteAddress());ByteBuffer buffer = ByteBuffer.allocate(1024);// socketChannel异步的读取数据到buffer中socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {// 打印线程的名字System.out.println("3--"+ Thread.currentThread().getName());buffer.flip();System.out.println(new String(buffer.array(), 0, result));socketChannel.write(ByteBuffer.wrap("helloClient".getBytes()));}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();;}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});System.out.println("1--"+ Thread.currentThread().getName());Thread.sleep(Integer.MAX_VALUE);}}
3.2.1 客户端代码
package com.bonnie.aio;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;public class AIOClient {private final AsynchronousSocketChannel client;public AIOClient() throws IOException {client = AsynchronousSocketChannel.open();}public static void main(String[] args) throws Exception{new AIOClient().connect("localhost", 8080);}private void connect(String host, int port) {// 客户端向服务端发起连接client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {@Overridepublic void completed(Void result, Object attachment) {try {client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();System.out.println("已发送到服务端");} catch (Exception e) {throw new RuntimeException(e);}}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});final ByteBuffer bb = ByteBuffer.allocate(1024);// 客户端接收服务端的数据,获取的数据写入到bb中client.read(bb, null, new CompletionHandler<Integer, Object>() {@Overridepublic void completed(Integer result, Object attachment) {// 服务端返回数据的长度resultSystem.out.println("I/O操作完成:"+result);System.out.println("获取反馈:"+ new String(bb.array()));}@Overridepublic void failed(Throwable exc, Object attachment) {exc.printStackTrace();}});try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
3.3 码云位置
git地址: https://gitee.com/huyanqiu6666/netty.git 分支: 250721-io