基于 Vue 3 和 Spring Boot 框架集成 WebSocket
1.前端
在 utils 目录下新建 websocket.ts
import { ElMessage } from 'element-plus';
import emitter, { messages } from '/@/utils/mitt';
import { useUserInfo } from '/@/stores/userInfo';/*** @Description WebSocket 状态管理类* @Date 2025-09-01* @Author zangqi**/
/**
 * Mutable runtime state kept by the WebSocket manager.
 */
export interface WebSocketState {
  /** Current socket instance; `null` until a connection has been created. */
  ws: WebSocket | null;
  /** Guard flag that blocks overlapping reconnect attempts. */
  lockReconnect: boolean;
  /** Delay, in milliseconds, used by both heartbeat timers. */
  timeout: number;
  /** Handle of the timer that sends the next heartbeat. */
  timeoutObj: number | null;
  /** Handle of the timer that closes the socket when no heartbeat reply arrives. */
  serverTimeoutObj: number | null;
  /** Handle of the pending reconnect timer. */
  timeoutNum: number | null;
}
// WebSocket configuration interface
/**
 * Connection settings accepted by the WebSocket manager.
 */
export interface WebSocketConfig {
  /** Base URL of the WebSocket server; the manager appends `websocket/{userId}` to it. */
  url: string;
  /** Id of the connecting user; becomes the last path segment of the socket URL. */
  userId: string;
  /** Token sent as the `access_token` query parameter. */
  token: string;
  /** Heartbeat delay in milliseconds; the manager falls back to 60000 when omitted. */
  timeout?: number;
}
// WebSocket manager class
/**
 * Manages one WebSocket connection: connect, heartbeat, automatic reconnect
 * and JSON message sending.
 *
 * Incoming text frames are handled as follows:
 * - `"心跳检测"`: heartbeat reply, restarts the heartbeat timers;
 * - `"连接成功"`: connection acknowledgement, only logged;
 * - anything else: parsed as JSON and emitted on `messages.system`.
 *
 * After {@link close} the manager stays closed until {@link init} is called again.
 */
export class WebSocketManager {
  /** Delay before a reconnect attempt, in milliseconds. */
  private static readonly RECONNECT_DELAY_MS = 5000;
  /** Heartbeat delay used when the config does not provide one. */
  private static readonly DEFAULT_TIMEOUT_MS = 60000;

  private state: WebSocketState;
  private config: WebSocketConfig;
  private reconnectCallback?: () => void;
  /** Set by close() so that late socket events no longer trigger a reconnect. */
  private manuallyClosed = false;

  /**
   * @param config connection settings; a missing or `undefined` `timeout`
   *   falls back to 60000 ms.
   */
  constructor(config: WebSocketConfig) {
    // `??` instead of object spread: an explicit `timeout: undefined` must not
    // overwrite the default, otherwise the heartbeat timers fire immediately.
    const timeout = config.timeout ?? WebSocketManager.DEFAULT_TIMEOUT_MS;
    this.config = { ...config, timeout };
    this.state = {
      ws: null,
      lockReconnect: false,
      timeout,
      timeoutObj: null,
      serverTimeoutObj: null,
      timeoutNum: null,
    };
  }

  /** Registers a callback that runs right before every reconnect attempt. */
  public setReconnectCallback(callback: () => void): void {
    this.reconnectCallback = callback;
  }

  /**
   * Opens the connection and wires the socket event handlers.
   * Any socket left over from a previous call is detached and closed first,
   * so the manager never holds two live connections.
   */
  public init(): void {
    console.log("初始化 WebSocket");
    if (!('WebSocket' in window)) {
      ElMessage.error('浏览器不支持 WebSocket');
      return;
    }

    this.manuallyClosed = false;
    this.detachAndCloseSocket();

    try {
      const ws = new WebSocket(this.buildUrl());
      this.state.ws = ws;

      ws.onopen = (event: Event) => {
        console.log('SOCKET 链接:', event);
        this.startHeartbeat();
      };
      ws.onmessage = (event: MessageEvent) => this.handleMessage(event);
      ws.onerror = (event: Event) => {
        console.log('websocket 错误信息:', event);
        this.reconnect();
      };
      ws.onclose = (event: CloseEvent) => {
        console.log('websocket 已关闭连接:', event);
        this.reconnect();
      };
    } catch (error) {
      console.error('WebSocket 初始化失败:', error);
      this.reconnect();
    }
  }

  /**
   * Serializes `message` as JSON and sends it.
   * @returns `true` when the message was handed to an open socket, `false`
   *   when the socket is not open or sending threw.
   */
  public send(message: unknown): boolean {
    const ws = this.state.ws;
    if (!ws || ws.readyState !== WebSocket.OPEN) {
      return false;
    }
    try {
      ws.send(JSON.stringify(message));
      return true;
    } catch (error) {
      console.error('发送消息失败:', error);
      return false;
    }
  }

  /**
   * Closes the connection for good: cancels every timer, releases the
   * reconnect lock and detaches the socket handlers so that the resulting
   * `close` event cannot schedule a reconnect.
   */
  public close(): void {
    this.manuallyClosed = true;
    this.clearHeartbeatTimers();
    this.clearReconnectTimer();
    this.state.lockReconnect = false;
    this.detachAndCloseSocket();
  }

  /** Returns the socket's `readyState`, or `WebSocket.CLOSED` when no socket exists. */
  public getReadyState(): number {
    return this.state.ws ? this.state.ws.readyState : WebSocket.CLOSED;
  }

  /** Returns `true` only while the socket exists and is open. */
  public isConnected(): boolean {
    return this.state.ws !== null && this.state.ws.readyState === WebSocket.OPEN;
  }

  /** Builds the endpoint URL, tolerating a base URL without a trailing slash. */
  private buildUrl(): string {
    const { url, userId, token } = this.config;
    const base = url.endsWith('/') ? url : `${url}/`;
    // Encode both values so characters such as '+', '&' or spaces survive the trip.
    return `${base}websocket/${encodeURIComponent(userId)}?access_token=${encodeURIComponent(token)}`;
  }

  /** Dispatches one incoming frame according to the protocol described on the class. */
  private handleMessage(event: MessageEvent): void {
    console.log('SOCKET 接收信息:', event);
    const content = event.data;

    if (content === "心跳检测") {
      console.log("websocket 接收到服务器信息,心跳重置");
      this.reset();
      return;
    }
    if (content === "连接成功") {
      console.log("websocket ping...");
      return;
    }
    try {
      const json = JSON.parse(content);
      console.log("发送事件");
      emitter.emit(messages.system, json);
    } catch (e) {
      console.error('解析 WebSocket 消息失败:', e);
    }
  }

  /**
   * Arms the heartbeat: after `timeout` ms a heartbeat message is sent, and a
   * second timer closes the socket if no reply resets it within another
   * `timeout` ms.
   */
  private startHeartbeat(): void {
    this.clearHeartbeatTimers();
    this.state.timeoutObj = window.setTimeout(() => {
      this.state.timeoutObj = null;
      const ws = this.state.ws;
      if (!ws || ws.readyState !== WebSocket.OPEN) {
        console.log("websocket 断线重连中...");
        this.reconnect();
        return;
      }

      console.log("websocket 心跳检测...");
      const userInfoStore = useUserInfo();
      const userInfos = userInfoStore.userInfos;
      // The key is spelled "form" on purpose: the server reads exactly that key.
      const message = {
        "form": userInfos.id,
        "username": userInfos.username,
        "content": "心跳检测",
        "to": userInfos.id,
      };
      try {
        ws.send(JSON.stringify(message));
      } catch (error) {
        console.error('发送心跳失败:', error);
        this.reconnect();
        return;
      }

      // No reply within `timeout` ms: close the socket; its close event reconnects.
      this.state.serverTimeoutObj = window.setTimeout(() => {
        this.state.serverTimeoutObj = null;
        this.state.ws?.close();
      }, this.state.timeout);
    }, this.state.timeout);
  }

  /** Restarts the heartbeat after a heartbeat reply from the server. */
  private reset(): void {
    this.startHeartbeat();
  }

  /**
   * Schedules a single reconnect attempt. Calls made while an attempt is
   * pending, or after close(), are ignored.
   */
  private reconnect(): void {
    if (this.manuallyClosed || this.state.lockReconnect) {
      return;
    }
    this.state.lockReconnect = true;
    // The old connection is gone; its heartbeat timers must not fire any more.
    this.clearHeartbeatTimers();
    this.clearReconnectTimer();

    this.state.timeoutNum = window.setTimeout(() => {
      this.state.timeoutNum = null;
      // Release the lock BEFORE init(): a synchronous failure inside init()
      // calls reconnect() again, which would be dropped while still locked,
      // and a throwing callback must not leave the lock set forever.
      this.state.lockReconnect = false;
      if (this.reconnectCallback) {
        this.reconnectCallback();
      }
      this.init();
    }, WebSocketManager.RECONNECT_DELAY_MS);
  }

  /** Removes the handlers from the current socket and closes it if still active. */
  private detachAndCloseSocket(): void {
    const ws = this.state.ws;
    if (!ws) {
      return;
    }
    ws.onopen = null;
    ws.onmessage = null;
    ws.onerror = null;
    ws.onclose = null;
    if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
      ws.close();
    }
  }

  /** Cancels both heartbeat timers. */
  private clearHeartbeatTimers(): void {
    if (this.state.timeoutObj !== null) {
      clearTimeout(this.state.timeoutObj);
      this.state.timeoutObj = null;
    }
    if (this.state.serverTimeoutObj !== null) {
      clearTimeout(this.state.serverTimeoutObj);
      this.state.serverTimeoutObj = null;
    }
  }

  /** Cancels the pending reconnect timer, if any. */
  private clearReconnectTimer(): void {
    if (this.state.timeoutNum !== null) {
      clearTimeout(this.state.timeoutNum);
      this.state.timeoutNum = null;
    }
  }
}
// Factory function that creates a WebSocket manager instance
/**
 * Creates a WebSocketManager for the given user.
 *
 * @param userId id of the connecting user
 * @param token access token passed on to the manager
 * @param url optional base URL; when falsy, `import.meta.env.VITE_SOCKET_URL` is used
 * @returns a new, not yet initialized manager
 */
export function createWebSocketManager(userId: string, token: string, url?: string): WebSocketManager {
  const resolvedUrl = url || import.meta.env.VITE_SOCKET_URL;
  return new WebSocketManager({ url: resolvedUrl, userId, token });
}

// Default export
export default WebSocketManager;
2.使用
import { onBeforeUnmount } from 'vue';
import WebSocketManager, { createWebSocketManager } from '/@/utils/websocket';
const { userInfos } = storeToRefs(stores);
let websocketManager: WebSocketManager | null = null;

/**
 * Opens the page-level WebSocket for the current user.
 * Does nothing while no user id is available. A manager left over from an
 * earlier call is closed first, so repeated calls never keep two sockets alive.
 */
const initWebsocket = () => {
  const userId = userInfos.value?.id;
  if (!userId) {
    return;
  }

  websocketManager?.close();
  websocketManager = createWebSocketManager(userId, Session.get('token'));
  // Reconnect callback
  websocketManager.setReconnectCallback(() => {
    console.log('WebSocket 重连中...');
  });
  websocketManager.init();
};

// Connect once the page is mounted
onMounted(() => {
  initWebsocket();
});

// Release the socket and its timers when the page goes away; otherwise the
// connection, heartbeat and reconnect loop outlive the component.
onBeforeUnmount(() => {
  websocketManager?.close();
  websocketManager = null;
});
3.后端
1.新建一个配置类 WebSocketConfig
/**
 * Spring configuration that declares the {@link ServerEndpointExporter} bean.
 * NOTE(review): per the Spring documentation this exporter is what registers
 * {@code @ServerEndpoint}-annotated beans with the WebSocket container; confirm
 * it is only needed when running on an embedded container.
 */
@Configuration
public class WebSocketConfig {

    /**
     * @return a new {@link ServerEndpointExporter} instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
2.新建一个服务类 WebSocketServer
package cn.codesys.notification.center.service;import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.idev.excel.util.StringUtils;
import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** @author zangqi* @Description* @data 2025/9/1 15:32*/
@Slf4j
@ServerEndpoint(value = "/websocket/{userId}")
@Component
public class WebSocketServer {/*** concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。*/private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();/*** 与某个客户端的连接会话,需要通过它来给客户端发送数据*/private Session session;/*** 接收userId*/private String userId = "";/*** 连接建立成* 功调用的方法*/@OnOpenpublic void onOpen(Session session, @PathParam("userId") String userId) {this.session = session;this.userId = userId;if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);//加入set中webSocketMap.put(userId, this);} else {//加入set中webSocketMap.put(userId, this);}log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());sendMessage("连接成功");}/*** 连接关闭* 调用的方法*/@OnClosepublic void onClose() {if (webSocketMap.containsKey(userId)) {webSocketMap.remove(userId);}log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());}/*** 收到客户端消* 息后调用的方法** @param message 客户端发送过来的消息**/@OnMessagepublic void onMessage(String message, Session session) {log.info("用户消息:" + userId + ",报文:" + message);JSONObject jsonObject = JSONUtil.parseObj(message);String content = (String) jsonObject.get("content");String form = (String) jsonObject.get("form");sendMsgByUserId(content, form);}/*** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());error.printStackTrace();}/*** 实现服务* 器主动推送*/public void sendMessage(String message) {try {this.session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}}/*** 发送自定义消息***/public static void sendMsgByUserId(String message, String userId) {log.info("发送消息到:" + userId + ",报文:" + message);if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {webSocketMap.get(userId).sendMessage(message);} else {log.error("用户" + userId + ",不在线!");}}/*** 群发自定义消息***/public static void sendMsgByUserIdList(String message, List<String> userList) {userList.forEach(userId -> {log.info("发送消息到:" + userId + ",报文:" + message);if 
(StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {webSocketMap.get(userId).sendMessage(message);} else {log.error("用户" + userId + ",不在线!");}});}/*** 获得此时的* 在线人数** @return*/public static synchronized int getOnlineCount() {return webSocketMap.size();}
}
3.推送消息
// sendMsgByUserIdList is static, so call it through the class rather than an instance.
WebSocketServer.sendMsgByUserIdList(JSONUtil.toJsonStr(ncNotify), userIds);