WebSocket:实时通信的新时代
在现代Web应用中,实时通信变得越来越重要。传统的HTTP协议虽然能够满足基本的请求-响应模式,但在需要频繁更新数据的场景下,其效率和性能显得捉襟见肘。WebSocket协议应运而生,它提供了一种在单个TCP连接上进行全双工通信的机制,使得服务器能够主动向客户端推送数据,从而极大地提高了实时性。
官网:WebSockets handbook | WebSocket.org
1. WebSocket的基本概念
1.1 什么是WebSocket?
WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单、高效。WebSocket协议在2011年被IETF定为标准RFC 6455,并被现代浏览器广泛支持。
单工、半双工和全双工区别:
- 单工模式(Simplex):通信方向完全单向且不可逆。例如广播、遥控器,发送端无法接收数据。
- 半双工(Half-Duplex):双向通信但需分时切换方向,如对讲机,一方说话时另一方需沉默。
- 全双工(Full-Duplex):同时双向传输,需两条独立信道或技术分割(如频分复用),如电话、网络通信。
1.2 主要特点
- 全双工通信:客户端和服务器可以同时发送和接收数据。
- 单一连接:只需要一次握手即可建立连接,后续的数据传输不需要额外的握手。
- 低开销:数据帧头较小,减少了数据传输的开销。
- 支持二进制和文本数据:可以传输文本和二进制数据。
- 跨域支持:支持跨域通信,无需额外配置。
1.3 工作原理
1.3.1 握手过程
WebSocket连接的建立是通过HTTP协议进行的,具体步骤如下:
1. 客户端发起请求:客户端通过HTTP协议向服务器发送一个特殊的请求,请求头中包含Upgrade: websocket (升级协议)和Connection: Upgrade(想升级成websocket协议)字段,并且会携带 Sec-WebSocket-Key(随机生成的base64码)头。
2. 服务器响应:如果服务器支持WebSocket协议,它会返回一个101状态码(协议切换),并在响应头中包含Upgrade: websocket和Connection: Upgrade字段,同时提供一个Sec-WebSocket-Accept 字段来验证请求。
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: sGmI5fWZaDZ0JxQV7MkKpR4lXhA=
Sec-WebSocket-Accept 的计算方法如下:
将 Sec-WebSocket-Key 与固定的 GUID(258EAFA5-E914-47DA-95CA-C5AB0DC85B11)拼接后进行 SHA-1 哈希运算,并将结果进行 Base64 编码。即
base64(hsa1(sec-websocket-key + 258EAFA5-E914-47DA-95CA-C5AB0DC85B11))
如果这个 Sec-WebSocket-Accept 计算错误浏览器会提示:Sec-WebSocket-Accept dismatch
3. 建立连接:一旦握手成功,HTTP连接就升级为WebSocket连接,双方可以开始全双工通信。
1.3.2 数据传输
一旦WebSocket连接建立,客户端和服务器可以随时发送数据帧。数据帧可以是文本或二进制数据,每个数据帧都有一个固定的格式,包含帧头和负载数据。
1.3.3 关闭阶段
WebSocket连接的关闭阶段涉及客户端和服务器之间的握手过程,确保连接能够安全地关闭。以下是关闭阶段的详细步骤:
1. 发送关闭帧:
- 任何一方(客户端或服务器)都可以发起关闭连接的过程。
- 发起关闭的一方会发送一个关闭帧(Close Frame),该帧包含一个状态码和一个可选的关闭原因字符串。
2. 接收关闭帧:
- 接收到关闭帧的一方会解析状态码和关闭原因字符串。
- 接收方会发送一个关闭帧作为响应,确认关闭请求。
3. 关闭连接:
- 一旦双方都发送了关闭帧并确认了关闭请求,连接就会被关闭。
- 关闭帧的状态码用于指示关闭的原因,常见的状态码包括:
- 1000:正常关闭。
- 1001:端点离开,例如用户导航到另一个页面或关闭浏览器。
- 1002:协议错误。
- 1003:不支持的数据类型。
- 1006:连接异常关闭(例如,没有收到关闭帧)。
- 1007:接收到的数据无法处理。
- 1008:违反协议。
- 1009:接收到的数据太大。
- 1010:客户端期望的扩展未协商。
- 1011:服务器端错误。
- 1012:服务重启。
- 1013:临时重定向。
- 1014:坏的请求。
- 1015:TLS握手失败。
4. 清理资源:关闭连接后,双方需要清理与该连接相关的所有资源,包括会话、缓冲区等。
1.4 WebSocket 的帧
WebSocket 帧(Frame)是 WebSocket 协议中用于在客户端和服务器之间传输数据的基本单位。WebSocket 协议将数据划分成一个或多个帧进行传输,每个帧都有特定的格式和用途。
1.4.1 WebSocket 帧的作用
帧的设计使得 WebSocket 能够高效地处理不同类型的数据,并支持:
- 文本消息与二进制消息的区分
- 控制帧用于管理连接状态(如关闭、心跳)
- 分片传输大消息
- 加密与掩码保护(尤其在客户端发送时)
1.4.2 WebSocket 帧的结构
WebSocket 的帧结构定义在 RFC 6455 中,每个帧包含以下字段(从左到右按位排列):
字段 | 长度 | 说明 |
FIN | 1 bit | 是否为消息的最后一帧(1 表示结束) |
RSV1, RSV2, RSV3 | 各 1 bit | 保留位,通常为 0,用于扩展协议 |
Opcode | 4 bits | 操作码,表示帧类型 |
Mask | 1 bit | 是否使用掩码(客户端发送必须为 1) |
Payload Length | 7/7+16/7+64 bits | 负载长度(可变长) |
Masking Key (可选) | 32 bits | 掩码密钥,仅当 Mask=1 时存在 |
Payload Data | 可变长度 | 实际传输的数据 |
1.4.3 WebSocket 帧的类型(Opcode)
Opcode 是操作码,决定了该帧的类型。常见的 Opcode 如下:
Opcode | 类型 | 描述 |
0x0 | continuation frame | 消息的延续帧 |
0x1 | text frame | UTF-8 编码的文本数据 |
0x2 | binary frame | 二进制数据 |
0x8 | close frame | 关闭连接 |
0x9 | ping frame | 心跳请求 |
0xA | pong frame | 心跳响应 |
2. WebSocket 的优点
- 全双工通信:客户端和服务器可以同时发送和接收数据,实现真正的双向通信。
- 低延迟:无需重复建立连接,避免了 HTTP 请求/响应的往返延迟,适合实时应用。
- 减少网络开销:握手后不再携带大量 HTTP 头信息,数据传输更高效。
- 保持长连接:连接一旦建立,即可持续通信,避免频繁连接释放带来的性能损耗。
- 兼容性好:支持主流浏览器(现代 Web 浏览器均支持),并可通过 WSS 实现加密通信(WebSocket Secure)。
- 适用于多种数据格式:可以传输文本(如 JSON)、二进制数据(如图像、音频流等)。
3. WebSocket 的缺点
- 连接维持成本高:每个连接都需要服务器长期维护,对服务器资源消耗较大,需合理使用连接池或连接复用。
- 不适用于所有场景:如果只是简单的请求-响应模型(如获取静态资源),HTTP 更加轻量高效。
- 需要额外的开发与维护成本:需要处理连接断开重连、心跳机制、消息编码解码等问题。
- 代理和防火墙限制:某些老旧的中间设备可能不支持 WebSocket 协议,导致连接失败。
- 安全性要求更高:需要防范 WebSocket 相关攻击(如跨站 WebSocket 劫持、消息注入等),建议配合 WSS 和 Token 认证。
- 负载均衡复杂度高:常规负载均衡策略难以保证客户端始终连接到同一个后端节点,通常需要引入 sticky session 或分布式状态管理。
4. 适用场景
- 在线聊天室 / IM
- 实时协作工具(如协同文档编辑)
- 实时游戏
- 股票行情推送
- 物联网设备远程控制
- 视频弹幕系统
- 实时音视频传输(结合其他协议)
5. 不推荐使用 WebSocket 的场景
- 简单的数据拉取(如分页加载)
- SEO 敏感页面(搜索引擎无法抓取 WebSocket 数据)
- 对实时性要求不高的业务逻辑
6. WebSocket 客户端
转载地址:https://juejin.cn/post/7111132777394733064
6.1 基于 HTML/JavaScript 简单示例
<!DOCTYPE HTML>
<html>
<head><meta charset="UTF-8"><title>WebSocket Demo</title>
</head>
<body>
<input id="text" type="text"/>
<button onclick="send()">发送消息</button>
<button onclick="closeWebSocket()">关闭连接</button>
<div id="message">
</div>
</body>
<script type="text/javascript">var websocket = null;var clientId = Math.random().toString(36).substr(2);//判断当前浏览器是否支持WebSocketif ('WebSocket' in window) {//连接WebSocket节点websocket = new WebSocket("ws://localhost:8080/ws/" + clientId);} else {alert('Not support websocket')}//连接发生错误的回调方法websocket.onerror = function () {setMessageInnerHTML("error");};//连接成功建立的回调方法websocket.onopen = function () {setMessageInnerHTML("连接成功");}//接收到消息的回调方法websocket.onmessage = function (event) {setMessageInnerHTML(event.data);}//连接关闭的回调方法websocket.onclose = function () {setMessageInnerHTML("close");}//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。window.onbeforeunload = function () {websocket.close();}//将消息显示在网页上function setMessageInnerHTML(innerHTML) {document.getElementById('message').innerHTML += innerHTML + '<br/>';}//发送消息function send() {var message = document.getElementById('text').value;websocket.send(message);}//关闭连接function closeWebSocket() {websocket.close();}
</script>
</html>
6.2 基于 javax.websocket(标准 API)
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 定义 WebSocket 客户端组件
import lombok.extern.slf4j.Slf4j;import javax.websocket.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;@Slf4j
@ClientEndpoint
public class JavaxWebSocketClientEndpoint extends Endpoint {@OnOpen@Overridepublic void onOpen(Session session, EndpointConfig config) {//连接建立}@OnClosepublic void onClose(Session session, CloseReason reason) {//连接关闭}@OnMessagepublic void onMessage(Session session, String message) {//接收文本消息}@OnMessagepublic void onMessage(Session session, PongMessage message) {//接收pong消息//解析示例ByteBuffer buffer = message.getApplicationData();if (buffer == null || buffer.remaining() <= 0) {log.info("客户端接收到pong消息: <empty>")}// 以UTF-8为例String text = StandardCharsets.UTF_8.decode(buffer).toString();log.info("客户端接收到pong消息:{}", text);}@OnMessagepublic void onMessage(Session session, ByteBuffer message) {//接收二进制消息}@OnErrorpublic void onError(Session session, Throwable e) {//异常处理}
}
其中:
- @OnOpen:
- 作用:标记的方法会在 WebSocket 连接建立时被调用。
- 适用方法签名:
public void onOpen(Session session)
或包含更多参数:
public void onOpen(Session session, EndpointConfig config)
- @OnClose:
- 作用: 标记的方法会在 WebSocket 连接关闭时被调用。
- 适用方法签名:
public void onClose(Session session, CloseReason closeReason)
- @OnError:
- 作用: 标记的方法会在 WebSocket 发生异常时被调用。
- 适用方法签名:
public void onError(Session session, Throwable throwable)
- @OnMessage:
- 作用: 标记的方法会在收到 WebSocket 消息时被调用。支持文本、二进制、Pong 等消息类型。
- 适用方法签名:
接收文本消息:
public void onTextMessage(String message)
接收二进制消息:
public void onBinaryMessage(ByteBuffer data)
接收 Pong 消息(响应 Ping):
public void onPongMessage(PongMessage message)
- @ClientEndpoint 支持四个参数:
- subprotocols:
- 类型:String[]
- 默认值:空数组
- 说明:声明客户端支持的子协议列表,在握手阶段与服务端协商使用哪个协议。
- 支持类型:
- chat:简单文本聊天协议
- graphql-ws:Apollo、GraphQL 订阅
- wamp:实时通信、RPC、发布/订阅
- soap:Web 服务调用
- mqtt:轻量级物联网消息协议
- 自定义命名协议版本
- ...
- 握手流程中的行为:(以subprotocols = {"chat", "superchat"}为例)
- 客户端在握手请求中携带:Sec-WebSocket-Protocol: chat, superchat
- 服务端从自己的支持列表中查找是否有匹配项,如果有,就返回选中的协议,例如:Sec-WebSocket-Protocol: chat
- 连接建立后,双方将基于选中的协议格式进行通信。
- decoders:
- 类型:Class<? extends Decoder>[]
- 默认值:空数组
- 说明:指定该端点支持的消息解码器类,用于将接收到的原始消息(如文本或二进制)转换为 Java 对象。
- encoders:
- 类型:Class<? extends Encoder>[]
- 默认值:空数组
- 说明:指定该端点支持的消息编码器类,用于将 Java 对象转换为发送的原始格式(如 JSON 字符串)。
- configurator:
- 类型:Class<? extends ClientEndpointConfig.Configurator>
- 默认值:ClientEndpointConfig.Configurator.class
- 说明:指定自定义的配置类,用于在建立连接时进行自定义配置,例如添加请求头等。
- subprotocols:
以添加请求头为例:
1. 创建 ClientEndpointConfig.Configurator 的子类。
import org.springframework.http.HttpHeaders;import javax.websocket.ClientEndpointConfig;
import java.util.Collections;
import java.util.List;
import java.util.Map;public class ClientWebSocketConfigurator extends ClientEndpointConfig.Configurator {@Overridepublic void beforeRequest(Map<String, List<String>> headers) {headers.put(HttpHeaders.AUTHORIZATION, Collections.singletonList("Bearer <your-token>"));}@Overridepublic void afterResponse(HandshakeResponse handshakeResponse) {super.afterResponse(handshakeResponse);}
}
2. 在 @ClientEndpoint 的参数上添加 configurator = ClientWebSocketConfigurator.class
注意:
- JavaxWebSocketClientEndpoint 一定是空参构造器,不要用带参构造器!
- javax.websocket库中定义了PongMessage而没有PingMessage。所有的WebSocket包括前js自带的,都实现了自动回复。也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息。
- 不支持通配符。
说明:
- Session 参数可以省略;
- JavaxWebSocketClientEndpoint 可以不用继承 Endpoint ,如果不继承则可以省略@OnOpen 的 EndpointConfig 参数。
3. 客户端调用
- 方式一:
//服务端地址
String uri = "ws://localhost:8080/ws/gasijg";
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, URI.create(uri));//方式二:创建 WebSocket 配置,会覆盖 @ClientEndpoint 的参数
// ClientEndpointConfig config = ClientEndpointConfig.Builder.create()
// //WebSocket 扩展可以用来提供额外的功能
// .extensions(Arrays.asList(
// new ExtensionImpl("permessage-deflate")
// ))
// //WebSocket 协议
// .preferredSubprotocols(Arrays.asList("chat"))
// //WebSocket 消息解码器
// .decoders(Arrays.asList(MyDecoder.class))
// //WebSocket 消息编码器
// .encoders(Arrays.asList(MyEncoder.class))
// //WebSocket 配置器
// .configurator(new ClientEndpointConfig.Configurator() {
// @Override
// public void beforeRequest(Map<String, List<String>> headers) {
// //配置鉴权
// headers.put("Authorization", Collections.singletonList("Bearer <your-token>"));
// }
// @Override
// public void afterResponse(HandshakeResponse handshakeResponse) {
// super.afterResponse(handshakeResponse);
// }
// })
// .build();
// Session session = container.getContainer().connectToServer(JavaxWebSocketClientEndpoint.class, config, URI.create(uri));RemoteEndpoint.Async remote = session.getAsyncRemote();
//发送文本
remote.sendText("Text message");
//发送二进制文本
remote.sendBinary(ByteBuffer.wrap("Binary message".getBytes()));
//发送pong
remote.sendPong(ByteBuffer.wrap("Pong message".getBytes()));
//发送ping
remote.sendPing(ByteBuffer.wrap("Ping message".getBytes()));
//发送对象消息,会尝试使用Encoder编码
remote.sendObject("Object message");
- 方式二:
1.实现 ServletContextAware 类
import lombok.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.context.ServletContextAware;import javax.servlet.ServletContext;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;@Component
public class JavaxWebSocketContainer implements ServletContextAware {private volatile WebSocketContainer container;public WebSocketContainer getContainer() {if (container == null) {synchronized (this) {if (container == null) {container = ContainerProvider.getWebSocketContainer();}}}return container;}@Overridepublic void setServletContext(@NonNull ServletContext servletContext) {if (container == null) {container = (WebSocketContainer) servletContext.getAttribute("javax.websocket.server.ServerContainer");}}
}
2. WebSocket 客户端发送消息
@Autowired
private JavaxWebSocketContainer container;//服务端地址
String uri = "ws://localhost:8080/ws/1";
Session session = this.container.getContainer().connectToServer(JavaxWebSocketClientEndpoint.class, URI.create(uri));//方式二:创建 WebSocket 配置,会覆盖 @ClientEndpoint 的参数
// ClientEndpointConfig config = ClientEndpointConfig.Builder.create()
// //WebSocket 扩展可以用来提供额外的功能
// .extensions(Arrays.asList(
// new ExtensionImpl("permessage-deflate")
// ))
// //WebSocket 协议
// .preferredSubprotocols(Arrays.asList("chat"))
// //WebSocket 消息解码器
// .decoders(Arrays.asList(MyDecoder.class))
// //WebSocket 消息编码器
// .encoders(Arrays.asList(MyEncoder.class))
// //WebSocket 配置器
// .configurator(new ClientEndpointConfig.Configurator() {
// @Override
// public void beforeRequest(Map<String, List<String>> headers) {
// //配置鉴权
// headers.put("Authorization", Collections.singletonList("Bearer <your-token>"));
// }
// @Override
// public void afterResponse(HandshakeResponse handshakeResponse) {
// super.afterResponse(handshakeResponse);
// }
// })
// .build();
// Session session = container.getContainer().connectToServer(JavaxWebSocketClientEndpoint.class, config, URI.create(uri));RemoteEndpoint.Async remote = session.getAsyncRemote();
//发送文本
remote.sendText("Text message");
//发送二进制文本
remote.sendBinary(ByteBuffer.wrap("Binary message".getBytes()));
//发送pong
remote.sendPong(ByteBuffer.wrap("Pong message".getBytes()));
//发送ping
remote.sendPing(ByteBuffer.wrap("Ping message".getBytes()));
//发送对象消息,会尝试使用Encoder编码
remote.sendObject("Object message");
注意:
- 调用时传的 ClientEndpointConfig 会覆盖 @ClientEndpoint 注解上的参数。
- JavaxWebSocketClientEndpoint 不归 Spring 管理,无法注入 Bean。可以通过该方式获取Bean:
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;@Component public class SpringBeanUtils implements ApplicationContextAware {private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeanUtils.applicationContext = applicationContext;}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}public static Object getBean(String beanName) {return applicationContext.getBean(beanName);} }
通过调用 SpringBeanUtils.getBean() 方法获取。
6.3 基于 Spring WebMVC
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 定义 WebSocket 客户端组件
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;// 如果不想被 Spring 管理,@Component 可以省略
@Component
public class ServletWebSocketClientHandler implements WebSocketHandler {private WebSocketSession session;public WebSocketSession getSession() {return session;}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立后的回调方法// 可在此处添加连接成功后的初始化逻辑,例如发送登录消息或订阅特定主题this.session = session;}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {// 收到服务端发送的消息时触发// 参数 message 包含了消息内容,可通过 message.getPayload() 获取实际数据// 可在此处添加对不同类型消息的处理逻辑}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {// WebSocket 通信过程中发生传输错误时调用(如网络中断)// 可记录日志、尝试重连或通知用户}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {// WebSocket 连接关闭后触发// 参数 closeStatus 提供了关闭原因,可用于判断是否异常断开// 可用于清理资源或触发自动重连机制}@Overridepublic boolean supportsPartialMessages() {// 返回当前处理器是否支持接收部分消息(即分片消息)// 默认返回 false 表示不处理分片消息// 若需处理文本或二进制的大消息分片,可返回 true 并实现 PartialMessageHandler 接口return false;}
}
3. WebSocket 客户端发送消息
@Autowired
private ServletWebSocketClientHandler handler;// WebSocket服务器的URI
String uri = "ws://localhost:8080/ws/1?key=value";
// 创建WebSocket客户端实例,可以换成 WebSocketClient 其他实现类或自己实现
WebSocketClient client = new StandardWebSocketClient();
// 可以注入,也可以创建WebSocket处理器实例,用于处理WebSocket连接的各种事件
//ServletWebSocketClientHandler handler = new ServletWebSocketClientHandler();
// 创建WebSocket连接管理器,用于管理WebSocket连接的生命周期
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
// 创建并设置HTTP头信息,这里设置了Authorization头,用于认证
//MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
//map.put(HttpHeaders.AUTHORIZATION, Collections.singletonList("Bearer <your-token>"));
//manager.setHeaders(new HttpHeaders(map));
// 设置WebSocket连接的源,这里设置为通配符"*",表示允许任何源
//manager.setOrigin("*");
// 设置子协议
//manager.setSubProtocols(Arrays.asList("chat"));
// 设置自动启动WebSocket连接
//manager.setAutoStartup(true);
// 设置WebSocket连接的阶段
// Integer.MIN_VALUE:最早启动,在所有组件之前启动
// 1000:在 phase 小于 1000 的组件之后启动
// Integer.MAX_VALUE:最晚启动,在所有组件之后启动
// 0:在默认阶段启动(通常用于普通 Bean)
//manager.setPhase(0);
// 启动WebSocket连接,异步非阻塞
manager.start();// 发送消息
WebSocketSession session = handler.getSession();
// 发送文本
session.sendMessage(new TextMessage("Text message"));
// 发送二进制文本
session.sendMessage(new BinaryMessage(ByteBuffer.wrap("Binary message".getBytes())));
// 发送pong
session.sendMessage(new PongMessage(ByteBuffer.wrap("Pong message".getBytes())));
// 发送ping
session.sendMessage(new PingMessage(ByteBuffer.wrap("Ping message".getBytes())));
注意:
- manager.start(); 为异步非阻塞方法,一定要确保连接后再发送消息。
6.4 基于 Spring WebFlux
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 定义 WebSocket 客户端组件
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;// 如果不想被 Spring 管理,@Component 可以省略
@Component
public class ReactiveWebSocketClientHandler implements WebSocketHandler {private FluxSink<WebSocketMessage> sink;private WebSocketSession session;public FluxSink<WebSocketMessage> getSink() {return sink;}public WebSocketSession getSession() {return session;}@Overridepublic Mono<Void> handle(WebSocketSession session) {this.session = session;Mono<Void> send = session.send(Flux.create(sink -> {this.sink = sink;})).doOnError(it -> {//异常处理});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//异常处理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnNext(closeStatus -> {// 连接关闭,closeStatus:关闭状态码}).doOnError(it -> {//异常处理}).subscribe(it -> {//连接关闭});return Mono.zip(send, receive).then();}
}
3. WebSocket 客户端发送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.FluxSink;import java.net.URI;@Autowired
private ReactiveWebSocketClientHandler handler;// 构建WebSocket连接的URI
String uri = "ws://localhost:8080/ws/1";
// 创建WebSocket客户端实例, 按需选择 WebSocketClient 的实现类
WebSocketClient client = new ReactorNettyWebSocketClient();
// 创建处理WebSocket消息的处理器实例
// 直接创建不能注入、依赖Spring。
//ReactiveWebSocketClientHandler handler = new ReactiveWebSocketClientHandler();
// 使用客户端执行WebSocket连接,并在连接建立后开始处理消息,异步非阻塞
client.execute(URI.create(uri), handler)// 重试次数
// .retry(5)// 自定义重试逻辑
/* .retryWhen(new Retry() {@Overridepublic Publisher<?> generateCompanion(Flux<RetrySignal> flux) {return flux.zipWith(Flux.range(1, 5), (error, count) -> count) // 重试次数.flatMap(retryCount -> Mono.delay(Duration.ofSeconds(retryCount))); // 增加重试间隔}})*/
// .doOnTerminate(() -> {
// // 关闭连接
// }).subscribe();
// 情况二:携带请求头
//HttpHeaders headers = new HttpHeaders();
//headers.add(HttpHeaders.AUTHORIZATION, "Bearer <your-token>");
//client.execute(URI.create(uri), headers, handler).subscribe();// 获取FluxSink对象,用于向WebSocket连接发送消息
FluxSink<WebSocketMessage> sink = handler.getSink();
// 获取WebSocketSession对象,用于创建WebSocket消息
WebSocketSession session = handler.getSession();
// 发送文本类型WebSocket消息
sink.next(new WebSocketMessage(WebSocketMessage.Type.TEXT, session.bufferFactory().wrap("Text message".getBytes())));
// 发送二进制类型WebSocket消息
sink.next(new WebSocketMessage(WebSocketMessage.Type.BINARY, session.bufferFactory().wrap("Binary message".getBytes())));
// 发送Ping类型WebSocket消息
sink.next(new WebSocketMessage(WebSocketMessage.Type.PING, session.bufferFactory().wrap("Ping message".getBytes())));
// 发送Pong类型WebSocket消息
sink.next(new WebSocketMessage(WebSocketMessage.Type.PONG, session.bufferFactory().wrap("Pong message".getBytes())));
其中,WebSocketSession 常用方法的详细介绍:
- Mono<Void> send(Publisher<WebSocketMessage> messages);
- 功能:向客户端或服务端发送 WebSocket 消息(文本、二进制、ping/pong)。
- 参数说明:messages: 一个 Publisher<WebSocketMessage>,可以是单条消息(Mono)或多条消息流(Flux)。
- 示例:session.send(Flux.just(session.textMessage("Hello Client!"))).subscribe();
- Flux<WebSocketMessage> receive();
- 功能:返回一个 Flux<WebSocketMessage>,用于监听从对端发来的 WebSocket 消息。
- 可获取的信息:消息类型:message.getType(),包括:
- WebSocketMessage.Type.TEXT
- WebSocketMessage.Type.BINARY
- WebSocketMessage.Type.PING
- WebSocketMessage.Type.PONG
- 消息内容:通过 message.getPayload() 获取 DataBuffer,可转换为字符串或字节数组
- 示例:
session.receive().doOnNext(message -> {if (message.getType() == WebSocketMessage.Type.TEXT) {String text = message.getPayloadAsText();System.out.println("收到文本消息:" + text);}}).subscribe();
- Mono<CloseStatus> closeStatus();
- 功能:返回一个 Mono<CloseStatus>,表示该连接关闭的原因(例如正常关闭、异常关闭等)。
- 示例:
session.closeStatus().doOnNext(closeStatus -> {System.out.println("连接关闭原因: " + closeStatus);}).subscribe();
- CloseStatus 常见值 (详见 org.springframework.web.reactive.socket.CloseStatus):
状态码 | 含义 |
1000 | 正常关闭 |
1001 | 对端离开(如服务器关闭) |
1002 | 协议错误 |
1003 | 不接受的消息类型 |
1005 | 没有状态码收到 |
1006 | 异常中断(如网络断开) |
1007 | 收到的消息数据不符合预期格式 |
1011 | 服务器内部错误 |
- String getId();
- 功能:获取当前 WebSocket 连接的唯一 ID(String 类型)。
- 示例:System.out.println("连接ID: " + session.getId());
- HandshakeInfo getHandshakeInfo();
- 功能:获取握手信息,包括请求头、URI、子协议等。
- 示例:
WebSocketSession.HandshakeInfo handshakeInfo = session.getHandshakeInfo();
System.out.println("请求地址: " + handshakeInfo.getUri());
System.out.println("用户代理: " + handshakeInfo.getHeaders().getFirst("User-Agent"));
System.out.println("使用的子协议: " + handshakeInfo.getSubProtocols());
- DataBufferFactory bufferFactory();
- 功能:获取 DataBufferFactory,用于创建 DataBuffer 实例,用于构造 WebSocket 消息。
- 示例:DataBuffer buffer = session.bufferFactory().wrap("Hello".getBytes(StandardCharsets.UTF_8));
注意:
- client.execute(URI.create(uri), handler).subscribe(); 为异步非阻塞方法,一定要确保连接后再发送消息。
7. WebSocket 服务端
7.1 基于 javax.websocket(标准 API)
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 定义 WebSocket 服务端组件
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;@Slf4j
@Component
@ServerEndpoint("/ws/{sid}")
public class JavaxWebSocketServerEndpoint {//用于存储会话的映射,键为客户端标识符,值为对应的会话对象private static final Map<String, Collection<Session>> sessionMap = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session, EndpointConfig endpointConfig, @PathParam("sid") String sid) {//连接建立//将客户端标识符与对应的会话对象存入映射,以便后续管理sessionMap.computeIfAbsent(sid, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(session);}@OnClosepublic void onClose(Session session, CloseReason reason, @PathParam("sid") String sid){//连接关闭//从sessionMap中移除对应的会话对象Collection<Session> sessions = sessionMap.get(sid);if (sessions != null) {sessions.remove(session);if (sessions.isEmpty()) {sessionMap.remove(sid);}}}@OnErrorpublic void onError(Session session, Throwable e, @PathParam("sid") String sid) {//异常处理}@OnMessagepublic void onMessage(Session session, String message, @PathParam("sid") String sid) {//接收文本信息}@OnMessagepublic void onMessage(Session session, PongMessage message, @PathParam("sid") String sid) {//接收pong信息//解析示例ByteBuffer buffer = message.getApplicationData();if (buffer == null || buffer.remaining() <= 0) {log.info("收到来自客户端:{} 的ByteBuffer消息: <empty>", sid);return;}String text = StandardCharsets.UTF_8.decode(buffer).toString();log.info("收到来自客户端:{} 的pong消息: {}", sid, text);}@OnMessagepublic void onMessage(Session session, ByteBuffer message, @PathParam("sid") String sid) {//接收二进制信息,也可以用byte[]接收,解析类似于PongMessage示例}/*** 向所有客户端发送消息** @param message 要发送的消息内容*/public static void sendToAllClient(String message) {// 获取当前所有客户端的会话for (Collection<Session> sessions : sessionMap.values()) {for (Session session : sessions) {try {// 服务器向客户端发送消息session.getBasicRemote().sendText(message);} catch (Exception e) {// 打印异常信息,以便调试和错误追踪log.error("向客户端发送消息失败:", e);}}}}
}
3. 定义配置类,注册WebSocket的服务端组件
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** WebSocket配置类,用于注册WebSocket的Bean*/
@Configuration
public class WebSocketConfiguration {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}
其中:
- @OnOpen:
- 作用:标记的方法会在 WebSocket 连接建立时被调用。
- 适用方法签名:
public void onOpen(Session session)
或包含更多参数:
public void onOpen(Session session, EndpointConfig config)
- @OnClose:
- 作用: 标记的方法会在 WebSocket 连接关闭时被调用。
- 适用方法签名:
public void onClose(Session session, CloseReason closeReason)
- @OnError:
- 作用: 标记的方法会在 WebSocket 发生异常时被调用。
- 适用方法签名:
public void onError(Session session, Throwable throwable)
- @OnMessage:
- 作用: 标记的方法会在收到 WebSocket 消息时被调用。支持文本、二进制、Pong 等消息类型。
- 适用方法签名:
接收文本消息:
public void onTextMessage(String message)
接收二进制消息:
public void onBinaryMessage(ByteBuffer data)
接收 Pong 消息(响应 Ping):
public void onPongMessage(PongMessage message)
- @ClientEndpoint 支持四个参数:
- subprotocols:
- 类型:String[]
- 默认值:空数组
- 说明:声明客户端支持的子协议列表,在握手阶段与服务端协商使用哪个协议。
- 支持类型:
- chat:简单文本聊天协议
- graphql-ws:Apollo、GraphQL 订阅
- wamp:实时通信、RPC、发布/订阅
- soap:Web 服务调用
- mqtt:轻量级物联网消息协议
- 自定义命名协议版本
- ...
- 握手流程中的行为:(以subprotocols = {"chat", "superchat"}为例)
- 客户端在握手请求中携带:Sec-WebSocket-Protocol: chat, superchat
- 服务端从自己的支持列表中查找是否有匹配项,如果有,就返回选中的协议,例如:Sec-WebSocket-Protocol: chat
- 连接建立后,双方将基于选中的协议格式进行通信。
- decoders:
- 类型:Class<? extends Decoder>[]
- 默认值:空数组
- 说明:指定该端点支持的消息解码器类,用于将接收到的原始消息(如文本或二进制)转换为 Java 对象。
- encoders:
- 类型:Class<? extends Encoder>[]
- 默认值:空数组
- 说明:指定该端点支持的消息编码器类,用于将 Java 对象转换为发送的原始格式(如 JSON 字符串)。
- configurator:
- 类型:Class<? extends ServerEndpointConfig.Configurator>
- 默认值:ServerEndpointConfig.Configurator.class
- 说明:指定自定义的配置类,在 WebSocket 端点实例化和握手过程中进行自定义控制。
- subprotocols:
以配置 configurator 为例:
1. 使用的工具类(为了让 JavaxWebSocketServerEndpoint 可以注入、依赖 Spring Bean):
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;@Component
public class SpringContext implements ApplicationContextAware {private static ApplicationContext context;public static <T> T getBean(Class<T> beanClass) {return context.getBean(beanClass);}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {context = applicationContext;}
}
2. 创建 ServerEndpointConfig.Configurator 子类
import com.zjp.websocketdemo.utils.SpringContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.http.HttpHeaders;import javax.websocket.Extension;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.util.List;@Slf4j
public class WebSocketConfigurator extends ServerEndpointConfig.Configurator {@Overridepublic String getNegotiatedSubprotocol(List<String> supported, List<String> requested) {// 重写目的:强制使用某种协议(如 JSON、STOMP)、支持多协议并选择最优解、记录子协议协商日志、拒绝某些不安全或不兼容的协议return super.getNegotiatedSubprotocol(supported, requested);}@Overridepublic List<Extension> getNegotiatedExtensions(List<Extension> installed, List<Extension> requested) {// 重写目的:决定客户端和服务端使用哪些 WebSocket 扩展进行通信。return super.getNegotiatedExtensions(installed, requested);}@Overridepublic boolean checkOrigin(String originHeaderValue) {// 重写目的:防止跨域 WebSocket 请求攻击(CSRF 防护),控制哪些域名/来源可以连接你的 WebSocket 服务。return super.checkOrigin(originHeaderValue);}@Overridepublic <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {// 重写目的:控制 WebSocket 端点(如 WebSocketServer)的实例化方式。默认是通过反射创建实例的,不能注入、依赖 Spring Bean。try {// 从 Spring 获取 Beanreturn SpringContext.getBean(clazz);} catch (BeansException e) {// 如果没有注册为 Spring Bean,则 fallback 到默认方式创建return super.getEndpointInstance(clazz);}}@Overridepublic void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {// 重写目的:在 WebSocket 客户端和服务端进行握手时,可以读取请求头、设置用户属性、添加响应头等。log.info("secWebSocketProtocol:{}", request.getHeaders().get(HandshakeRequest.SEC_WEBSOCKET_PROTOCOL));// 获取请求头log.info("Authorization:{}", request.getHeaders().get(HttpHeaders.AUTHORIZATION));// 获取请求参数,用=连接log.info("queryString:{}", request.getQueryString());// 获取请求路径log.info("uri:{}", request.getRequestURI());// 获取参数,map格式,包含请求参数及路径参数log.info("parameterMap:{}", request.getParameterMap());}
}
3. @ServerEndpoint 添加 configurator = WebSocketConfigurator.class
注意:
- JavaxWebSocketServerEndpoint 一定是空参构造器,不要用带参构造器!
- 触发 @OnError 会自动断开连接。
- JavaxWebSocketServerEndpoint 默认是通过反射创建的实例,不能注入、依赖 Spring Bean,只有配置 configurator 并重写 getEndpointInstance 方法才能被 Spring 管理;
- javax.websocket库中定义了PongMessage而没有PingMessage。所有的WebSocket包括前js自带的,都实现了自动回复。也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息。
说明:
- Session 、EndpointConfig 参数可以省略。
7.2 基于 Spring WebMVC
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2. 定义 WebSocket 服务端组件
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;// 不想被 Spring 管理可以省略 @Component
@Component
public class ServletWebSocketServerHandler implements WebSocketHandler {//用于存储会话的映射,键为客户端标识符,值为对应的会话对象private static final Map<String, Collection<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {// WebSocket 连接建立后的回调方法// 可在此处添加连接成功后的初始化逻辑,例如发送登录消息或订阅特定主题//将客户端标识符与对应的会话对象存入映射,以便后续管理String path = Objects.requireNonNull(session.getUri()).getPath();String sid = path.substring(path.lastIndexOf('/') + 1);sessionMap.computeIfAbsent(sid, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())).add(session);}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {// 收到服务端发送的消息时触发// 参数 message 包含了消息内容,可通过 message.getPayload() 获取实际数据// 可在此处添加对不同类型消息的处理逻辑;}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {// WebSocket 通信过程中发生传输错误时调用(如网络中断)// 可记录日志、尝试重连或通知用户}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {// WebSocket 连接关闭后触发// 参数 closeStatus 提供了关闭原因,可用于判断是否异常断开// 可用于清理资源或触发自动重连机制//从sessionMap中移除对应的会话对象String path = Objects.requireNonNull(session.getUri()).getPath();String sid = path.substring(path.lastIndexOf('/') + 1);Collection<WebSocketSession> sessions = sessionMap.get(sid);if (sessions != null) {sessions.remove(session);if (sessions.isEmpty()) {sessionMap.remove(sid);}}}@Overridepublic boolean supportsPartialMessages() {// 返回当前处理器是否支持接收部分消息(即分片消息)// 默认返回 false 表示不处理分片消息// 若需处理文本或二进制的大消息分片,可返回 true 并实现 PartialMessageHandler 接口return false;}/*** 向所有客户端发送消息** @param message 要发送的消息内容*/public static void sendToAllClient(String message) {// 获取当前所有客户端的会话for (Collection<WebSocketSession> sessions : sessionMap.values()) {for (WebSocketSession session : sessions) {try {// 服务器向客户端发送消息session.sendMessage(new TextMessage(message));} catch (Exception e) {// 打印异常信息,以便调试和错误追踪log.error("向客户端发送消息失败:", e);}}}}
}
3. 定义配置类,注册 WebSocket 的服务端组件
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {private final ServletWebSocketServerHandler handler;@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry// 注册 WebSocket 处理器// 如果不想被 Spring 管理,则直接创建对象
// .addHandler(new ServletWebSocketServerHandler(), "/ws/{sid}").addHandler(handler, "/ws/{sid}")// 添加拦截器.addInterceptors(new ServletWebSocketServerInterceptor())// 允许跨域.setAllowedOrigins("*");}
}
4. 创建拦截器
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;@Slf4j
public class ServletWebSocketServerInterceptor implements HandshakeInterceptor {/*** 在WebSocket握手之前执行的回调方法* 该方法用于在握手前记录一些重要的请求信息,如URI、Authorization头、本地地址和远程地址** @param request ServerHttpRequest对象,表示客户端的请求* @param response ServerHttpResponse对象,表示服务端的响应* @param wsHandler WebSocketHandler对象,表示处理WebSocket消息的处理器* @param attributes 一个Map对象,用于在握手过程中存储属性* @return 返回true,表示握手过程可以继续* @throws Exception 如果记录信息时发生错误,可能抛出异常*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {// 记录请求的URIlog.info("uri: {}", request.getURI());// 记录请求的Authorization头信息log.info("Authorization:{}", request.getHeaders().get(HttpHeaders.AUTHORIZATION));// 记录本地地址信息log.info("localAddress:{}", request.getLocalAddress());// 记录远程地址信息log.info("remoteAddress:{}", request.getRemoteAddress());// 允许握手过程继续return true;}/*** 在WebSocket握手完成后调用的方法* 该方法用于在握手过程结束后进行一些自定义的处理操作,无论握手是否成功** @param request ServerHttpRequest对象,表示客户端发起的请求* @param response ServerHttpResponse对象,表示服务器返回的响应* @param wsHandler WebSocketHandler对象,表示处理WebSocket消息的处理器* @param exception 握手过程中可能发生的异常,如果没有异常,则为null*/@Overridepublic void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {}
}
注意:
- 触发 @OnError 会自动断开连接。
- 不支持通配符。
7.3 基于 Spring WebFlux
1. 添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 定义 WebSocket 服务端组件
同 WebSocket 客户端组件
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;import java.nio.charset.StandardCharsets;// 如果不想被 Spring 管理,@Component 可以省略
@Component
public class ReactiveWebSocketServerHandler implements WebSocketHandler {private FluxSink<WebSocketMessage> sink;private WebSocketSession session;public FluxSink<WebSocketMessage> getSink() {return sink;}public WebSocketSession getSession() {return session;}@Overridepublic Mono<Void> handle(WebSocketSession session) {this.session = session;Mono<Void> send = session.send(Flux.create(sink -> {this.sink = sink;})).doOnError(it -> {//异常处理});Mono<Void> receive = session.receive().doOnNext(it -> {//接收消息}).doOnError(it -> {//异常处理}).then();@SuppressWarnings("all")Disposable disposable = session.closeStatus().doOnNext(closeStatus -> {// 连接关闭,closeStatus:关闭状态码}).doOnError(it -> {//异常处理}).subscribe(it -> {//连接关闭});return Mono.zip(send, receive).then();}
}
3. 定义配置类,注册 WebSocket 的服务端组件
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;import java.util.HashMap;
import java.util.Map;@EnableWebFlux
@Configuration
@RequiredArgsConstructor
public class ReactiveWebSocketConfiguration {private final ReactiveWebSocketServerHandler handler;@Beanpublic HandlerMapping webSocketHandlerMapping() {Map<String, WebSocketHandler> map = new HashMap<>();map.put("/ws/**", handler);SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();handlerMapping.setUrlMap(map);handlerMapping.setOrder(0); // 值越小优先级越高,默认为Integer.MAX_VALUE,容易被覆盖return handlerMapping;}@Beanpublic WebSocketHandlerAdapter webSocketHandlerAdapter() {return new WebSocketHandlerAdapter();}
}
4. 配置拦截器(如果需要鉴权等操作配置,可选)
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;@Slf4j
@Component
public class MyWebFluxFilter implements WebFilter {/*** 自定义过滤器方法* 该方法用于处理所有传入的Web请求,并根据请求的URI路径进行特定的处理** @param exchange 服务器Web交换对象,包含请求和响应的所有信息* @param chain Web过滤链对象,用于将当前过滤器融入到过滤器链中* @return Mono<Void> 返回一个Mono类型的Void对象,表示异步处理完成*/@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {// 检查请求URI的路径是否以"/ws/"开头,用于识别WebSocket请求或其他特定请求if (exchange.getRequest().getURI().getPath().startsWith("/ws/")) {// 记录请求的URI,用于调试和监控log.info("URI: {}", exchange.getRequest().getURI());// 记录请求的Authorization头信息,用于验证和授权log.info("Authorization: {}", exchange.getRequest().getHeaders().get(HttpHeaders.AUTHORIZATION));}// 继续过滤器链中的下一个过滤器,确保请求继续被处理return chain.filter(exchange);}
}
注意:
以下两个依赖会导致 @EnableWebFlux 循环依赖,不要同时出现:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId> </dependency>
8. Nginx 代理 WebSocket 服务端
在使用 Nginx 作为反向代理服务器时,需要进行一些特殊配置才能正确地代理 WebSocket 连接。WebSocket 协议在客户端和服务器之间建立一个长期运行的 TCP 连接,用于实现全双工通信。Nginx 从 1.3.13 版本开始支持代理 WebSocket 连接。
http {# 其他配置(此处省略)server {listen 80; # 监听端口server_name localhost; # 服务名# 其他配置(此处省略)# websocket 配置以 /ws/{sid} 为例location /ws/ {proxy_pass http://localhost:8080; # 将请求代理到上游服务器proxy_http_version 1.1; # 使用 HTTP/1.1proxy_set_header Upgrade $http_upgrade; # 设置 Upgrade 头proxy_set_header Connection "upgrade"; # 设置 Connection 头proxy_set_header Host $host; # 设置 Host 头proxy_set_header X-Real-IP $remote_addr; # 设置真实 IPproxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # 设置转发的 IPproxy_set_header X-Forwarded-Proto $scheme; # 设置转发的协议 proxy_connect_timeout 60s; # 设置连接超时(可根据需要调整)proxy_read_timeout 86400s; # 设置读取超时,避免长时间不活动断开连接proxy_send_timeout 86400s; # 设置发送超时proxy_buffering off; # 禁用缓冲区以确保实时传输}}
}
说明:
proxy_connect_timeout 60s; # 设置连接超时(可根据需要调整) proxy_read_timeout 86400s; # 设置读取超时,避免长时间不活动断开连接 proxy_send_timeout 86400s; # 设置发送超时
这几个配置影响 websocket 的连接时长,设置太短客户端会自动断开连接,服务端并报错:
java.io.EOFException: nullat org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.fillReadBuffer(NioEndpoint.java:1340) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.read(NioEndpoint.java:1227) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.websocket.server.WsFrameServer.onDataAvailable(WsFrameServer.java:75) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]at org.apache.tomcat.websocket.server.WsFrameServer.doOnDataAvailable(WsFrameServer.java:183) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]at org.apache.tomcat.websocket.server.WsFrameServer.notifyDataAvailable(WsFrameServer.java:162) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]at org.apache.tomcat.websocket.server.WsHttpUpgradeHandler.upgradeDispatch(WsHttpUpgradeHandler.java:157) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]at org.apache.coyote.http11.upgrade.UpgradeProcessorInternal.dispatch(UpgradeProcessorInternal.java:60) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:59) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789) ~[tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.68.jar:9.0.68]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.68.jar:9.0.68]at java.lang.Thread.run(Thread.java:750) [na:1.8.0_441]