当前位置: 首页 > ops >正文

13.Websocket

java基础完结.最后补充一下 WebSocket

原生使用TCP实现Websocket

Websocket全双工通讯的协议  借助于Http协议进行连接,当客户端连接到服务端的时候会向服务端发送一个类似下面的HTTP报文: 

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

 这是一个HTTP的get请求报文,注意该报文中有一个upgrade首部,它的作用是告诉服务端需要将通信协议切换到websocket。
如果服务端支持websocket协议,那么它就会将自己的通信协议切换到websocket,同时发给客户端类似于以下的一个响应报文头:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

  返回的状态码为101,表示同意客户端协议转换请求,并将它转换为websocket协议。以上过程都是利用HTTP通信完成的,称之为websocket协议握手。

实现步骤

阶段一、客户端通过 HTTP 协议发送包含特殊头部的请求,触发协议升级:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
  • Upgrade: websocket明确请求升级协议。
  • Sec-WebSocket-Key:客户端生成的随机字符串,用于安全验证。
  • Sec-WebSocket-Version:指定协议版本(RFC 6455 规定为 13)。

阶段二、服务器端进行响应确认,返回 101 Switching Protocols 响应:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
  • Sec-WebSocket-Accept:服务器将客户端的 Sec-WebSocket-Key 与固定字符串拼接后,计算 SHA-1 哈希并进行 Base64 编码,生成验证令牌。

阶段三、此时 TCP 连接从 HTTP 升级为 WebSocket 协议,后续数据可通过二进制帧进行传输。

阶段四、数据传输,WebSocket是一种全双工通信协议,客户端与服务端可同时发送/接收数据,无需等待对方请求,数据帧是以二进制格式进行传输的。

如下图所示:

  • FIN (1 bit):标记是否为消息的最后一个分片。
  • Opcode (4 bits):定义数据类型(如文本 0x1、二进制 0x2、关闭连接 0x8、Ping 0x9、Pong 0xA)。
  • Mask (1 bit):客户端发送的数据需掩码处理(防止缓存污染攻击),服务端发送的数据无需掩码。
  • Payload Length (7 or 7+16 or 7+64 bits):帧内容的长度,支持最大 2^64-1 字节。
  • Masking-key(32 bits),掩码密钥,由上面的标志位 MASK 决定的,如果使用掩码就是 4 个字节的随机数,否则就不存在。
  • payload data 字段:这里存放的就是真正要传输的数据

阶段五、连接关闭,客户端或服务器端都可以发起关闭。

示例代码

前端代码:

<!DOCTYPE html>
<html>
<body><input type="text" id="messageInput" placeholder="输入消息"><button onclick="sendMessage()">发送</button><div id="messages"></div><script>// 创建 WebSocket 连接const socket = new WebSocket('ws://localhost:8080');// 连接打开时触发socket.addEventListener('open', () => {logMessage('连接已建立');});// 接收消息时触发socket.addEventListener('message', (event) => {logMessage('收到消息: ' + event.data);});// 连接关闭时触发socket.addEventListener('close', () => {logMessage('连接已关闭');});// 错误处理socket.addEventListener('error', (error) => {logMessage('连接错误: ' + error.message);});// 发送消息function sendMessage() {const message = document.getElementById('messageInput').value;socket.send(message);logMessage('发送消息: ' + message);}// 日志输出function logMessage(message) {const messagesDiv = document.getElementById('messages');const p = document.createElement('p');p.textContent = message;messagesDiv.appendChild(p);}</script>
</body>
</html>

java代码:

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class WebSocketServer {private static final int PORT = 8080;private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";private final ExecutorService threadPool = Executors.newCachedThreadPool();private ServerSocket serverSocket;private boolean running = false;public static void main(String[] args) {WebSocketServer server = new WebSocketServer();server.start();}public void start() {try {serverSocket = new ServerSocket(PORT);running = true;System.out.println("WebSocket服务器已启动,监听端口: " + PORT);while (running) {Socket clientSocket = serverSocket.accept();threadPool.execute(() -> handleClient(clientSocket));}} catch (IOException e) {if (running) {System.err.println("服务器启动失败: " + e.getMessage());}}}public void stop() {running = false;try {if (serverSocket != null && !serverSocket.isClosed()) {serverSocket.close();}threadPool.shutdown();System.out.println("服务器已停止");} catch (IOException e) {System.err.println("关闭服务器时出错: " + e.getMessage());}}private void handleClient(Socket clientSocket) {try (InputStream in = clientSocket.getInputStream();OutputStream out = clientSocket.getOutputStream()) {// 读取HTTP握手请求BufferedReader reader = new BufferedReader(new InputStreamReader(in));StringBuilder request = new StringBuilder();String line;while ((line = reader.readLine()) != null && !line.isEmpty()) {request.append(line).append("\r\n");}System.out.println("收到握手请求:\n" + request);// 提取WebSocket密钥String key = extractWebSocketKey(request.toString());if (key == null) {System.out.println("不是WebSocket握手请求");return;}// 生成响应密钥String responseKey = generateResponseKey(key);// 发送HTTP升级响应String response = "HTTP/1.1 101 Switching Protocols\r\n" +"Upgrade: websocket\r\n" +"Connection: Upgrade\r\n" +"Sec-WebSocket-Accept: " + responseKey + "\r\n\r\n";out.write(response.getBytes(StandardCharsets.UTF_8));out.flush();System.out.println("发送握手响应");// 开始WebSocket通信communicateWebSocket(clientSocket, in, out);} catch (IOException e) {System.err.println("处理客户端时出错: " + e.getMessage());} finally {try {clientSocket.close();} catch (IOException e) {System.err.println("关闭客户端连接时出错: " + e.getMessage());}}}private String extractWebSocketKey(String request) {String[] lines = request.split("\r\n");for (String line : lines) {if (line.startsWith("Sec-WebSocket-Key:")) {return line.substring("Sec-WebSocket-Key:".length()).trim();}}return null;}private String generateResponseKey(String key) {try {String concatenated = key + WEBSOCKET_KEY_MAGIC;MessageDigest sha1 = MessageDigest.getInstance("SHA-1");byte[] hash = sha1.digest(concatenated.getBytes(StandardCharsets.UTF_8));return Base64.getEncoder().encodeToString(hash);} catch (NoSuchAlgorithmException e) {throw new RuntimeException("SHA-1算法不可用", e);}}private void communicateWebSocket(Socket clientSocket, InputStream in, OutputStream out) throws IOException {while (clientSocket.isConnected()) {// 读取WebSocket帧byte[] header = new byte[2];if (in.read(header) != 2) {break;}boolean fin = (header[0] & 0x80) != 0;int opcode = header[0] & 0x0F;boolean masked = (header[1] & 0x80) != 0;long payloadLength = header[1] & 0x7F;if (payloadLength == 126) {byte[] extended = new byte[2];if (in.read(extended) != 2) {break;}payloadLength = ((extended[0] & 0xFF) << 8) | (extended[1] & 0xFF);} else if (payloadLength == 127) {byte[] extended = new byte[8];if (in.read(extended) != 8) {break;}payloadLength = 0;for (int i = 0; i < 8; i++) {payloadLength = (payloadLength << 8) | (extended[i] & 0xFF);}}byte[] maskingKey = new byte[4];if (masked && in.read(maskingKey) != 4) {break;}byte[] payload = new byte[(int) payloadLength];if (in.read(payload) != payloadLength) {break;}// 解掩码if (masked) {for (int i = 0; i < payloadLength; i++) {payload[i] = (byte) (payload[i] ^ maskingKey[i % 4]);}}// 处理控制帧if (opcode == 8) { // 关闭帧System.out.println("收到关闭帧");sendCloseFrame(out);break;} else if (opcode == 9) { // ping帧System.out.println("收到ping帧");sendPong(out, payload);} else if (opcode == 1) { // 文本帧String message = new String(payload, StandardCharsets.UTF_8);System.out.println("收到消息: " + message);sendMessage(out, "服务器收到: " + message);}}}private void sendMessage(OutputStream out, String message) throws IOException {byte[] payload = message.getBytes(StandardCharsets.UTF_8);int payloadLength = payload.length;// 构建WebSocket帧ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=1(文本帧)baos.write(0x81);// 第二个字节: 负载长度if (payloadLength <= 125) {baos.write(payloadLength);} else if (payloadLength <= 65535) {baos.write(126);baos.write((payloadLength >> 8) & 0xFF);baos.write(payloadLength & 0xFF);} else {baos.write(127);for (int i = 7; i >= 0; i--) {baos.write((int) ((payloadLength >> (i * 8)) & 0xFF));}}// 写入负载数据baos.write(payload);// 发送帧out.write(baos.toByteArray());out.flush();}private void sendPong(OutputStream out, byte[] payload) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=10(乒乓帧)baos.write(0x8A);// 第二个字节: 负载长度if (payload.length <= 125) {baos.write(payload.length);} else {throw new IOException("Ping负载太长,不支持");}// 写入负载数据baos.write(payload);// 发送帧out.write(baos.toByteArray());out.flush();}private void sendCloseFrame(OutputStream out) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=8(关闭帧)baos.write(0x88);// 第二个字节: 负载长度为 0baos.write(0x00);out.write(baos.toByteArray());out.flush();}
}    

Tomcat 实现 WebSocket

Tomcat 实现 WebSocket 主要基于 Java WebSocket API(JSR 356),并采用了高效的非阻塞 I/O 模型来处理大量并发连接,而非传统的线程池模型。

1. Tomcat 的 WebSocket 实现原理

1.1 基于 NIO 的非阻塞 I/O 模型

Tomcat 自 7.0 版本后引入了 NIO 连接器(org.apache.coyote.http11.Http11NioProtocol),并在 8.0 + 版本中默认使用 NIO2(Http11Nio2Protocol)。这些连接器使用单线程管理多个连接,通过事件驱动的方式处理 I/O 操作,避免了传统线程池的局限性。

  • Selector 模式:Tomcat 使用java.nio.channels.Selector监听多个SocketChannel的读写事件,一个线程可以管理数千个连接。
  • 异步处理:WebSocket 连接建立后,数据读写通过异步方式进行,不会阻塞线程。
1.2 WebSocket 握手与协议升级

当客户端发起 WebSocket 握手请求(HTTP 1.1 + Upgrade: websocket头)时,Tomcat 会:

  1. 解析 HTTP 请求,验证Sec-WebSocket-Key等头信息。
  2. 生成Sec-WebSocket-Accept响应头,完成协议升级。
  3. 将连接从 HTTP 处理器移交到 WebSocket 处理器(如WsFrameServer)。
1.3 生命周期管理

Tomcat 为每个 WebSocket 连接创建一个Session对象,并通过Endpoint接口管理生命周期:

  • onOpen():连接建立时触发。
  • onMessage():收到消息时触发。
  • onClose():连接关闭时触发。
  • onError():发生错误时触发。

Tomcat 的 WebSocket 实现通过非阻塞 I/O + 少量线程(如 1 个 Acceptor 线程 + N 个 Selector 线程)即可管理大量连接,显著提升吞吐量。

2. Tomcat 的线程模型

Tomcat 的 WebSocket 处理涉及三类线程:

  1. Acceptor 线程:接收新连接,将其注册到 Selector。
  2. Selector 线程(I/O 线程):监听所有连接的 I/O 事件,触发回调。
  3. Worker 线程(可选):处理耗时操作(如业务逻辑),避免阻塞 Selector 线程。
示例配置(server.xml)
<Connector port="8080" protocol="org.apache.coyote.http11.Http11Nio2Protocol"maxThreads="200"           <!-- Worker线程数 -->acceptCount="100"          <!-- 最大等待连接数 -->selectorThreadCount="2"    <!-- Selector线程数 -->maxConnections="8192"      <!-- 最大并发连接数 -->
/>

3. 性能优化建议

  1. 调整 Selector 线程数:根据 CPU 核心数设置,通常为2~4
  2. 增大maxConnections:默认值较低(如 8192),可根据内存调整。
  3. 异步处理业务逻辑:避免在 Selector 线程中执行耗时操作。
  4. 使用 WebSocket 注解:通过@ServerEndpoint简化开发,Tomcat 会自动优化。
示例:异步处理消息
@ServerEndpoint("/ws")
public class MyWebSocket {@OnMessagepublic void onMessage(Session session, String message) {// 提交到线程池处理业务逻辑,避免阻塞Selector线程ExecutorService executor = Executors.newFixedThreadPool(10);executor.submit(() -> {// 处理消息(如数据库操作、调用外部服务)session.getAsyncRemote().sendText("处理完成");});}
}

4. 与 Netty 的对比

  • Tomcat:基于 Servlet 容器,适合与现有 Web 应用集成,开箱即用。
  • Netty:纯 NIO 框架,性能更高(约 10%~20%),但需要手动配置。

对于大多数应用,Tomcat 的 WebSocket 实现已足够高效.Tomcat 通过非阻塞 I/O + 事件驱动模型实现 WebSocket,避免了传统线程池的瓶颈,可轻松支持数万并发连接;对于高性能场景(如游戏服务器),可考虑 Netty。

http://www.xdnf.cn/news/12901.html

相关文章:

  • WebRTC(一):整体架构
  • 【STM32】G030单片机开启超过8个ADC通道的方法
  • mongodb源码分析session执行handleRequest命令find过程
  • [ linux-系统 ] 进程控制
  • UNECE R79——解读自动驾驶相关标准法规
  • C++中vector类型的介绍和使用
  • 生成对抗网络(GAN)损失函数解读
  • 使用MFC中的CEvent实现两个线程之间的交替打印
  • 【Linux系统】Linux环境变量:系统配置的隐形指挥官
  • Gemini 2.5 Pro (0605版本) 深度测评与体验指南
  • MySQL 8.0 OCP 英文题库解析(十二)
  • Rust 学习笔记:共享状态并发
  • 三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
  • 从零手写Java版本的LSM Tree (三):MemTable 内存表
  • 图表类系列各种样式PPT模版分享
  • 高性能低功耗之道:全志A133在智能硬件中的全面应用
  • 设计模式-抽象工厂模式
  • CSS3 常用功能详细使用指南
  • App Trace技术解析:传参安装、一键拉起与快速安装
  • 【Linux】Linux安装并配置RabbitMQ
  • Maven 多仓库治理与发布策略深度实践
  • Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?
  • 大模型的LoRa通讯详解与实现教程
  • 时序数据库IoTDB在工业物联网时序数据管理中的应用
  • Ray框架:分布式AI训练与调参实践
  • WEB3全栈开发——面试专业技能点P4数据库
  • 数据结构-文件
  • Unity3D SM节点式动画技能编辑器实现
  • AIGC(AI Generated Content)测试结合自动化工具与人工评估
  • 在 Windows 11 上创建新本地用户账户