当前位置: 首页 > backend >正文

reactor模型

文章目录

    • BIO
      • BioServer
      • BioClient
    • NIO
      • NioServer
      • NioClient

BIO

BioServer

@Slf4j
public class BioServer {public static void main(String[] args) {//由Acceptor线程负责监听客户端的连接ServerSocket serverSocket = null;try {// 执行完,服务器启动成功(在此阻塞,直到启动成功或抛出异常)serverSocket = new ServerSocket(8888);System.out.println("服务端启动监听.......");while (true) {//Acceptor线程接收到客户端连接请求之后为每个客户端创建一个新的线程进行业务处理(在此阻塞,直到接收到1个客户端连接或抛出异常)Socket socket = serverSocket.accept();System.out.println("成功接收一个客户端连接:"+socket.getInetAddress());new Thread(new ServerHandler(socket)).start();}} catch (IOException e) {log.error("服务端发生异常", e);}finally {if (serverSocket!=null) {try {serverSocket.close();} catch (IOException e) {e.printStackTrace();}}}}
}@Slf4j
class ServerHandler implements  Runnable{private final Socket socket;public ServerHandler(Socket socket) {this.socket = socket;}public void run() {BufferedReader in = null;BufferedWriter out = null;try {//获取客户端的输入流in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));out = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream()));System.out.println("准备接收来自客户端:"+this.socket.getInetAddress()+"的数据");//读取客户端发送过来的数据while (true) {// 当客户端调用socket.close()时,line会为null;// 但当客户端突然关闭(比如代码执行完,客户端直接退出),此时readLine()会抛出异常String line = in.readLine();if (line == null) {log.info("读取内容是null");break;}System.out.println("成功接收来自客户端的数据:"+ line);//进行业务处理//给客户端响应数据out.write("success! i am server \n");out.flush();}} catch (IOException e) {if (in != null) {try {in.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (out != null) {try {out.close();} catch (IOException ioException) {ioException.printStackTrace();}}}}
}

BioClient

public class BioClient {public static void main(String[] args) {Socket socket = null;BufferedReader in = null;BufferedWriter out = null;try {// 当 new Socket("127.0.0.1", 8080) 构造函数成功返回时,意味着客户端Socket已经与服务器成功建立了TCP连接socket = new Socket("127.0.0.1",8888);in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));System.out.println("准备向服务端写数据!");//向服务端写数据out.write("hello server , i am client ! \n");//注意别丢 \n 因为服务端是readLineout.flush();//接收来自服务端的数据String line = in.readLine();System.out.println("成功接收到来自服务端的数据:"+line);// 可以选择在此处给服务端1个友好的关闭信号// socket.close();} catch (IOException e) {if (in != null) {try {in.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (out != null) {try {out.close();} catch (IOException ioException) {ioException.printStackTrace();}}if (socket != null) {try {socket.close();} catch (IOException ioException) {ioException.printStackTrace();}}}}
}

NIO

NioServer

public class NioServer {/*** 基于 Channel开发** @param args*/public static void main(String[] args) {try {//1、打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道(代表客户端连接的管道都是通过它创建的)ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//2、绑定监听端口,设置连接为非阻塞模式serverSocketChannel.socket().bind(new InetSocketAddress(8888));serverSocketChannel.configureBlocking(false);//3、创建多路复用器SelectorSelector selector = Selector.open();//4、将ServerSocketChannel注册到selector上,监听客户端连接事件ACCEPTserverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服务端已成功启动,可以接收连接!");//5、创建 Reactor线程,让多路复用器在 Reactor 线程中执行多路复用程序new Thread(new SingleReactor(selector)).start();} catch (IOException e) {e.printStackTrace();}}
}@Slf4j
class SingleReactor implements Runnable {private final Selector selector;public SingleReactor(Selector selector) {this.selector = selector;}public void run() {//6、selector轮询准备就绪的事件while (true) {try {selector.select(1000);// 获取到所查询到的感兴趣的事件集Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {// 关于这个SelectionKey有几个需要理解的点// 1. select()时,当注册到selector的channel中发生了感兴趣的事件时,就会返回代表channel和selector注册关系的selectionKey,//    这个selectionKey就是channel注册到selector时返回的,是同一对象。//    通过selectionKey可以直到是何种感兴趣的事件,这个事件有可能是多个。// 2. 既然产生了感兴趣的事件,那么这个事件就必须得到处理,否则下次select()查询时,由于仍然有感兴趣的事件,所以不会阻塞住,必须处理掉这个事件。// 3. 必须将当前处理的key从key集合中移除掉,假设处理了事件,但并不移除这个key,那么下次select()查询会阻塞,但当其它channel上发生的感兴趣事件时,//    此时就不阻塞了,然后再调用selector.selectedKeys()方法,会把上次没有移除的这个key也给返回在key集合中,但是那个对应的事件实际上已经被处理了。//    所以这个selectedKeys集合,在jdk底层是不会帮我们自动移除的,它只会在注册的channel发生了感兴趣的事件时,会把这个channel对应的selectionKey放入到selectedKeys集合SelectionKey selectionKey = iterator.next();iterator.remove();try {processKey(selectionKey);} catch (IOException e) {log.error("处理selectionKey发生异常", e);selectionKey.cancel();SelectableChannel channel = selectionKey.channel();if (channel != null) {channel.close();}}}} catch (IOException e) {log.info("服务端发生异常", e);}}}private void processKey(SelectionKey key) throws IOException {if (key.isValid()) {//7、根据准备就绪的事件类型分别处理if (key.isAcceptable()) { //客户端请求连接事件就绪//7.1、接收一个新的客户端连接,创建对应的SocketChannel,ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();// 此处accept方法会以非阻塞方式执行SocketChannel socketChannel = serverSocketChannel.accept();//7.2、设置socketChannel的非阻塞模式,并将其注册到Selector上,监听读事件// 只有非阻塞模式的socketChannel才能注册到selector上socketChannel.configureBlocking(false);// 此处注册时,可以指定携带1个附件socketChannel.register(this.selector, SelectionKey.OP_READ);}if (key.isReadable()) {//读事件准备继续//7.1、读客户端发送过来的数据SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);// 1、将socketChannel的数据读到readBuffer中// 2、read仍然以非阻塞方式执行// 3、如果1次没有读完接收缓冲区中的数据,则下次仍然会触发可读事件,可以接着读// 4、如果接收缓冲区中没有数据,则立即返回0(比如说,我先一次全部读完了,我又去调用这个方法去读一遍,此时缓冲区中已经没有数据可读了)// 5、当客户端给了1个关闭的信号时,此时会返回-1// 6、如果客户端没给关闭信号,就直接退出了,这时去读就会抛出异常(比如客户端执行完所有代码就退出了)int readBytes = socketChannel.read(readBuffer);//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数if (readBytes > 0) {readBuffer.flip();//读写模式切换byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String msg = new String(bytes, "utf-8");//进行业务处理String response = doService(msg);//给客户端响应数据System.out.println("服务端开始向客户端响应数据");byte[] responseBytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(responseBytes.length);writeBuffer.put(responseBytes);writeBuffer.flip();// 1、以非阻塞方式写出数据给客户端,意思就是发送缓冲区中当前能写多少量,反正不可能超过这个量// 2、这里不一定能够一次就能把writeBuffer中的数据全部写给客户端// 3、返回的int表示写了多少// 4、所以如果没写完,还需监听可写事件,然后将未写完的数据继续写出去socketChannel.write(writeBuffer);} else if (readBytes < 0) {//值为-1表示链路通道已经关闭key.cancel();socketChannel.close();} else {//没读取到数据,忽略log.warn("没读取到数据,忽略");}}}}private String doService(String msg) {System.out.println("成功接收来自客户端发送过来的数据:" + msg);return msg + "---from nioserver";}}

NioClient

public class NioClient {public static void main(String[] args) {try {//1、窗口客户端SocketChannel,绑定客户端本地地址(不选默认随机分配一个可用地址)SocketChannel socketChannel = SocketChannel.open();//2、设置非阻塞模式,socketChannel.configureBlocking(false);//3、创建SelectorSelector selector = Selector.open();//3、创建Reactor线程new Thread(new SingleReactorClient(socketChannel, selector)).start();} catch (IOException e) {e.printStackTrace();}}
}@Slf4j
class SingleReactorClient implements Runnable {private final SocketChannel socketChannel;private final Selector selector;public SingleReactorClient(SocketChannel socketChannel, Selector selector) {this.socketChannel = socketChannel;this.selector = selector;}public void run() {try {//连接服务端doConnect(socketChannel, selector);} catch (IOException e) {e.printStackTrace();System.exit(1);}//5、多路复用器执行多路复用程序while (true) {try {selector.select(1000);Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey selectionKey = iterator.next();processKey(selectionKey);iterator.remove();}} catch (IOException e) {e.printStackTrace();}}}private void doConnect(SocketChannel sc, Selector selector) throws IOException {System.out.println("客户端成功启动,开始连接服务端");//3、连接服务端boolean connect = sc.connect(new InetSocketAddress("127.0.0.1", 8888));//4、将socketChannel注册到selector并判断是否连接成功,连接成功监听读事件,没有继续监听连接事件System.out.println("connect=" + connect);if (connect) {sc.register(selector, SelectionKey.OP_READ);System.out.println("客户端成功连上服务端,准备发送数据");//开始进行业务处理,向服务端发送数据doService(sc);} else {sc.register(selector, SelectionKey.OP_CONNECT);}}private void processKey(SelectionKey key) throws IOException {if (key.isValid()) {//6、根据准备就绪的事件类型分别处理if (key.isConnectable()) {//服务端可连接事件准备就绪SocketChannel sc = (SocketChannel) key.channel();if (sc.finishConnect()) {//6.1、向selector注册可读事件(接收来自服务端的数据)sc.register(selector, SelectionKey.OP_READ);//6.2、处理业务 向服务端发送数据doService(sc);} else {//连接失败,退出System.exit(1);}}if (key.isReadable()) {//读事件准备继续//6.1、读服务端返回的数据SocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBufer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBufer);//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数if (readBytes > 0) {readBufer.flip();//读写模式切换byte[] bytes = new byte[readBufer.remaining()];readBufer.get(bytes);String msg = new String(bytes, "utf-8");//接收到服务端返回的数据后进行相关操作doService(msg);} else if (readBytes < 0) {//值为-1表示链路通道已经关闭key.cancel();sc.close();} else {//没读取到数据,忽略}}}}private static void doService(SocketChannel socketChannel) throws IOException {System.out.println("客户端开始向服务端发送数据:");//向服务端发送数据byte[] bytes = "hello nioServer,i am nioClient !".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();socketChannel.write(writeBuffer);}private String doService(String msg) {System.out.println("成功接收来自服务端响应的数据:" + msg);return "";}
}
http://www.xdnf.cn/news/13794.html

相关文章:

  • 支持 CHI 协议的 NOC的错误注入和边界条件测试
  • Kubernetes微服务发布治理与Java容器化终极实践指南
  • SM3算法Python实现(无第三方库)
  • 运行springboot
  • 本地内网搭建网址需要外部网络连接怎么办?无公网ip实现https/http站点外网访问
  • 动态多目标进化算法:TrRMMEDA求解CEC2018(DF1-DF14),提供完整MATLAB代码
  • SpringBoot集成ActiveMQ
  • 3D 展示崛起:科技赋能的新变革
  • 【力扣 简单 C】83. 删除排序链表中的重复元素
  • 英一真题阅读单词笔记 10年
  • c语言接口设计模式之抽象算法,以冒泡排序为例
  • @Validation 的使用 Spring
  • Matlab图像清晰度评价指标
  • 如何在网页里填写 PDF下拉框
  • STM32 开发 - 中断案例(中断概述、STM32 的中断、NVIC 嵌套向量中断控制器、外部中断配置寄存器组、EXTI 外部中断控制器、实例实操)
  • Spring Boot 项目中Http 请求如何对响应体进行压缩
  • [C++][设计模式] : 单例模式(饿汉和懒汉)
  • php列表头部增加批量操作按钮,多选订单数据批量微信退款(含微信支付SDK)
  • 洛谷-P3375 【模板】KMP
  • 前端导出PDF(适配ios Safari浏览器)
  • 常见的网络协议有哪些
  • 图像匹配算法 笔记2025
  • 【从零学习JVM|第七篇】快速了解直接内存
  • Qt QTcpSocket的write无法发送数据【已解决】
  • 打卡day52
  • UE5制作与云渲染配置不足?3090/4090显卡云端解放创作力
  • 基于sample_aiisp例子,创建3路编码流,记录
  • 奥威BI:用AI重新定义数据分析,中小企业数字化转型的智能引擎
  • 力扣HOT100之技巧:31. 下一个排列
  • CMS软件以及常见分类