13.Websocket
java基础完结.最后补充一下 WebSocket
原生使用TCP实现Websocket
Websocket全双工通讯的协议 借助于Http协议进行连接,当客户端连接到服务端的时候会向服务端发送一个类似下面的HTTP报文:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
这是一个HTTP的get请求报文,注意该报文中有一个upgrade首部,它的作用是告诉服务端需要将通信协议切换到websocket。
如果服务端支持websocket协议,那么它就会将自己的通信协议切换到websocket,同时发给客户端类似于以下的一个响应报文头:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
返回的状态码为101,表示同意客户端协议转换请求,并将它转换为websocket协议。以上过程都是利用HTTP通信完成的,称之为websocket协议握手。
实现步骤
阶段一、客户端通过 HTTP 协议发送包含特殊头部的请求,触发协议升级:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
- Upgrade: websocket明确请求升级协议。
- Sec-WebSocket-Key:客户端生成的随机字符串,用于安全验证。
- Sec-WebSocket-Version:指定协议版本(RFC 6455 规定为 13)。
阶段二、服务器端进行响应确认,返回 101 Switching Protocols 响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
- Sec-WebSocket-Accept:服务器将客户端的 Sec-WebSocket-Key 与固定字符串拼接后,计算 SHA-1 哈希并进行 Base64 编码,生成验证令牌。
阶段三、此时 TCP 连接从 HTTP 升级为 WebSocket 协议,后续数据可通过二进制帧进行传输。
阶段四、数据传输,WebSocket是一种全双工通信协议,客户端与服务端可同时发送/接收数据,无需等待对方请求,数据帧是以二进制格式进行传输的。
如下图所示:
- FIN (1 bit):标记是否为消息的最后一个分片。
- Opcode (4 bits):定义数据类型(如文本 0x1、二进制 0x2、关闭连接 0x8、Ping 0x9、Pong 0xA)。
- Mask (1 bit):客户端发送的数据需掩码处理(防止缓存污染攻击),服务端发送的数据无需掩码。
- Payload Length (7 or 7+16 or 7+64 bits):帧内容的长度,支持最大 2^64-1 字节。
- Masking-key(32 bits),掩码密钥,由上面的标志位 MASK 决定的,如果使用掩码就是 4 个字节的随机数,否则就不存在。
- payload data 字段:这里存放的就是真正要传输的数据
阶段五、连接关闭,客户端或服务器端都可以发起关闭。
示例代码
前端代码:
<!DOCTYPE html>
<html>
<body><input type="text" id="messageInput" placeholder="输入消息"><button onclick="sendMessage()">发送</button><div id="messages"></div><script>// 创建 WebSocket 连接const socket = new WebSocket('ws://localhost:8080');// 连接打开时触发socket.addEventListener('open', () => {logMessage('连接已建立');});// 接收消息时触发socket.addEventListener('message', (event) => {logMessage('收到消息: ' + event.data);});// 连接关闭时触发socket.addEventListener('close', () => {logMessage('连接已关闭');});// 错误处理socket.addEventListener('error', (error) => {logMessage('连接错误: ' + error.message);});// 发送消息function sendMessage() {const message = document.getElementById('messageInput').value;socket.send(message);logMessage('发送消息: ' + message);}// 日志输出function logMessage(message) {const messagesDiv = document.getElementById('messages');const p = document.createElement('p');p.textContent = message;messagesDiv.appendChild(p);}</script>
</body>
</html>
java代码:
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class WebSocketServer {private static final int PORT = 8080;private static final String WEBSOCKET_KEY_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";private final ExecutorService threadPool = Executors.newCachedThreadPool();private ServerSocket serverSocket;private boolean running = false;public static void main(String[] args) {WebSocketServer server = new WebSocketServer();server.start();}public void start() {try {serverSocket = new ServerSocket(PORT);running = true;System.out.println("WebSocket服务器已启动,监听端口: " + PORT);while (running) {Socket clientSocket = serverSocket.accept();threadPool.execute(() -> handleClient(clientSocket));}} catch (IOException e) {if (running) {System.err.println("服务器启动失败: " + e.getMessage());}}}public void stop() {running = false;try {if (serverSocket != null && !serverSocket.isClosed()) {serverSocket.close();}threadPool.shutdown();System.out.println("服务器已停止");} catch (IOException e) {System.err.println("关闭服务器时出错: " + e.getMessage());}}private void handleClient(Socket clientSocket) {try (InputStream in = clientSocket.getInputStream();OutputStream out = clientSocket.getOutputStream()) {// 读取HTTP握手请求BufferedReader reader = new BufferedReader(new InputStreamReader(in));StringBuilder request = new StringBuilder();String line;while ((line = reader.readLine()) != null && !line.isEmpty()) {request.append(line).append("\r\n");}System.out.println("收到握手请求:\n" + request);// 提取WebSocket密钥String key = extractWebSocketKey(request.toString());if (key == null) {System.out.println("不是WebSocket握手请求");return;}// 生成响应密钥String responseKey = generateResponseKey(key);// 发送HTTP升级响应String response = "HTTP/1.1 101 Switching Protocols\r\n" +"Upgrade: websocket\r\n" +"Connection: Upgrade\r\n" +"Sec-WebSocket-Accept: " + responseKey + "\r\n\r\n";out.write(response.getBytes(StandardCharsets.UTF_8));out.flush();System.out.println("发送握手响应");// 开始WebSocket通信communicateWebSocket(clientSocket, in, out);} catch (IOException e) {System.err.println("处理客户端时出错: " + e.getMessage());} finally {try {clientSocket.close();} catch (IOException e) {System.err.println("关闭客户端连接时出错: " + e.getMessage());}}}private String extractWebSocketKey(String request) {String[] lines = request.split("\r\n");for (String line : lines) {if (line.startsWith("Sec-WebSocket-Key:")) {return line.substring("Sec-WebSocket-Key:".length()).trim();}}return null;}private String generateResponseKey(String key) {try {String concatenated = key + WEBSOCKET_KEY_MAGIC;MessageDigest sha1 = MessageDigest.getInstance("SHA-1");byte[] hash = sha1.digest(concatenated.getBytes(StandardCharsets.UTF_8));return Base64.getEncoder().encodeToString(hash);} catch (NoSuchAlgorithmException e) {throw new RuntimeException("SHA-1算法不可用", e);}}private void communicateWebSocket(Socket clientSocket, InputStream in, OutputStream out) throws IOException {while (clientSocket.isConnected()) {// 读取WebSocket帧byte[] header = new byte[2];if (in.read(header) != 2) {break;}boolean fin = (header[0] & 0x80) != 0;int opcode = header[0] & 0x0F;boolean masked = (header[1] & 0x80) != 0;long payloadLength = header[1] & 0x7F;if (payloadLength == 126) {byte[] extended = new byte[2];if (in.read(extended) != 2) {break;}payloadLength = ((extended[0] & 0xFF) << 8) | (extended[1] & 0xFF);} else if (payloadLength == 127) {byte[] extended = new byte[8];if (in.read(extended) != 8) {break;}payloadLength = 0;for (int i = 0; i < 8; i++) {payloadLength = (payloadLength << 8) | (extended[i] & 0xFF);}}byte[] maskingKey = new byte[4];if (masked && in.read(maskingKey) != 4) {break;}byte[] payload = new byte[(int) payloadLength];if (in.read(payload) != payloadLength) {break;}// 解掩码if (masked) {for (int i = 0; i < payloadLength; i++) {payload[i] = (byte) (payload[i] ^ maskingKey[i % 4]);}}// 处理控制帧if (opcode == 8) { // 关闭帧System.out.println("收到关闭帧");sendCloseFrame(out);break;} else if (opcode == 9) { // ping帧System.out.println("收到ping帧");sendPong(out, payload);} else if (opcode == 1) { // 文本帧String message = new String(payload, StandardCharsets.UTF_8);System.out.println("收到消息: " + message);sendMessage(out, "服务器收到: " + message);}}}private void sendMessage(OutputStream out, String message) throws IOException {byte[] payload = message.getBytes(StandardCharsets.UTF_8);int payloadLength = payload.length;// 构建WebSocket帧ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=1(文本帧)baos.write(0x81);// 第二个字节: 负载长度if (payloadLength <= 125) {baos.write(payloadLength);} else if (payloadLength <= 65535) {baos.write(126);baos.write((payloadLength >> 8) & 0xFF);baos.write(payloadLength & 0xFF);} else {baos.write(127);for (int i = 7; i >= 0; i--) {baos.write((int) ((payloadLength >> (i * 8)) & 0xFF));}}// 写入负载数据baos.write(payload);// 发送帧out.write(baos.toByteArray());out.flush();}private void sendPong(OutputStream out, byte[] payload) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=10(乒乓帧)baos.write(0x8A);// 第二个字节: 负载长度if (payload.length <= 125) {baos.write(payload.length);} else {throw new IOException("Ping负载太长,不支持");}// 写入负载数据baos.write(payload);// 发送帧out.write(baos.toByteArray());out.flush();}private void sendCloseFrame(OutputStream out) throws IOException {ByteArrayOutputStream baos = new ByteArrayOutputStream();// 第一个字节: FIN=1, opcode=8(关闭帧)baos.write(0x88);// 第二个字节: 负载长度为 0baos.write(0x00);out.write(baos.toByteArray());out.flush();}
}
Tomcat 实现 WebSocket
Tomcat 实现 WebSocket 主要基于 Java WebSocket API(JSR 356),并采用了高效的非阻塞 I/O 模型来处理大量并发连接,而非传统的线程池模型。
1. Tomcat 的 WebSocket 实现原理
1.1 基于 NIO 的非阻塞 I/O 模型
Tomcat 自 7.0 版本后引入了 NIO 连接器(org.apache.coyote.http11.Http11NioProtocol
),并在 8.0 + 版本中默认使用 NIO2(Http11Nio2Protocol
)。这些连接器使用单线程管理多个连接,通过事件驱动的方式处理 I/O 操作,避免了传统线程池的局限性。
- Selector 模式:Tomcat 使用
java.nio.channels.Selector
监听多个SocketChannel
的读写事件,一个线程可以管理数千个连接。 - 异步处理:WebSocket 连接建立后,数据读写通过异步方式进行,不会阻塞线程。
1.2 WebSocket 握手与协议升级
当客户端发起 WebSocket 握手请求(HTTP 1.1 + Upgrade: websocket
头)时,Tomcat 会:
- 解析 HTTP 请求,验证
Sec-WebSocket-Key
等头信息。 - 生成
Sec-WebSocket-Accept
响应头,完成协议升级。 - 将连接从 HTTP 处理器移交到 WebSocket 处理器(如
WsFrameServer
)。
1.3 生命周期管理
Tomcat 为每个 WebSocket 连接创建一个Session
对象,并通过Endpoint
接口管理生命周期:
onOpen()
:连接建立时触发。onMessage()
:收到消息时触发。onClose()
:连接关闭时触发。onError()
:发生错误时触发。
Tomcat 的 WebSocket 实现通过非阻塞 I/O + 少量线程(如 1 个 Acceptor 线程 + N 个 Selector 线程)即可管理大量连接,显著提升吞吐量。
2. Tomcat 的线程模型
Tomcat 的 WebSocket 处理涉及三类线程:
- Acceptor 线程:接收新连接,将其注册到 Selector。
- Selector 线程(I/O 线程):监听所有连接的 I/O 事件,触发回调。
- Worker 线程(可选):处理耗时操作(如业务逻辑),避免阻塞 Selector 线程。
示例配置(server.xml)
<Connector port="8080" protocol="org.apache.coyote.http11.Http11Nio2Protocol"maxThreads="200" <!-- Worker线程数 -->acceptCount="100" <!-- 最大等待连接数 -->selectorThreadCount="2" <!-- Selector线程数 -->maxConnections="8192" <!-- 最大并发连接数 -->
/>
3. 性能优化建议
- 调整 Selector 线程数:根据 CPU 核心数设置,通常为
2~4
。 - 增大
maxConnections
:默认值较低(如 8192),可根据内存调整。 - 异步处理业务逻辑:避免在 Selector 线程中执行耗时操作。
- 使用 WebSocket 注解:通过
@ServerEndpoint
简化开发,Tomcat 会自动优化。
示例:异步处理消息
@ServerEndpoint("/ws")
public class MyWebSocket {@OnMessagepublic void onMessage(Session session, String message) {// 提交到线程池处理业务逻辑,避免阻塞Selector线程ExecutorService executor = Executors.newFixedThreadPool(10);executor.submit(() -> {// 处理消息(如数据库操作、调用外部服务)session.getAsyncRemote().sendText("处理完成");});}
}
4. 与 Netty 的对比
- Tomcat:基于 Servlet 容器,适合与现有 Web 应用集成,开箱即用。
- Netty:纯 NIO 框架,性能更高(约 10%~20%),但需要手动配置。
对于大多数应用,Tomcat 的 WebSocket 实现已足够高效.Tomcat 通过非阻塞 I/O + 事件驱动模型实现 WebSocket,避免了传统线程池的瓶颈,可轻松支持数万并发连接;对于高性能场景(如游戏服务器),可考虑 Netty。