SpringBoot3 + Netty + Vue3 实现消息推送(最新)
SpringBoot3 + Netty + Vue3 实现消息推送
- 1、效果展示
- 2、依赖引入
- 3、后端代码
- 4、后端使用推送
- 5、前端代码
  - 5.1 websocket.js
  - 5.2 login.vue
1、效果展示
2、依赖引入
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.1</version>
</parent>

<!-- Netty 核心 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.107.Final</version>
</dependency>
3、后端代码
MessageVO
package vip.xiaonuo.biz.modular.websocket;import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Map;/*** @Author 小钟* @Date 2025/7/30 16:06* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageVO {

    /**
     * Shared JSON serializer. A single instance is reused instead of building
     * a new one on every {@link #toString()} call; Gson ignores static fields,
     * so this constant never appears in the serialized output.
     */
    private static final Gson GSON = new Gson();

    /** Unread-message count for the single recipient of this payload. */
    private Long unread;

    /** Per-user unread counts (userId -> count as a decimal string), used when pushing to several users. */
    private Map<String, String> unreadByUserList;

    /** Title of the message. */
    private String title;

    /** Message kind; the values used in this module are "unreadNumber" and "message". */
    private String type;

    /** Message body. */
    private String content;

    /** Extra payload carried as a JSON string. NOTE(review): schema is not defined here — confirm with the consumer. */
    private String extJson;

    /**
     * Serializes this object to JSON. This is the exact text that is sent to
     * WebSocket clients when callers pass {@code String.valueOf(vo)} or
     * {@code vo.toString()} to the push utilities.
     *
     * @return the JSON representation of this message
     */
    @Override
    public String toString() {
        return GSON.toJson(this);
    }
}
NettyWebSocketServer
package vip.xiaonuo.biz.modular.websocket;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Author 小钟* @Date 2025/7/30 15:26* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
@Component
public class NettyWebSocketServer {@Autowiredprivate WebSocketConfigProperties configProperties;@Autowiredprivate WebSocketChannelInitializer webSocketChannelInitializer;public void start() {int port = configProperties.getPort();EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(webSocketChannelInitializer);ChannelFuture f = b.bind(port).sync();System.out.println("Netty WebSocket 启动成功,端口:" + port);f.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}@PostConstructpublic void init() {new Thread(this::start).start();}
}
WebSocketChannelInitializer
package vip.xiaonuo.biz.modular.websocket;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Author 小钟* @Date 2025/7/30 15:26* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
@Component
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Autowiredprivate WebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));pipeline.addLast(webSocketHandler);}
}
WebSocketConfigProperties
package vip.xiaonuo.biz.modular.websocket;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @Author 小钟* @Date 2025/7/30 18:35* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
/**
 * Configuration holder bound to the {@code websocket.*} properties.
 */
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketConfigProperties {

    /** Value of {@code websocket.port}; the Netty WebSocket server binds to this port. */
    private int port;
}
WebSocketHandler
package vip.xiaonuo.biz.modular.websocket;import com.alibaba.fastjson2.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import vip.xiaonuo.biz.modular.oamsmessage.service.OamsDevMessageService;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** @Author 小钟* @Date 2025/7/30 15:27* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Autowiredprivate OamsDevMessageService oamsDevMessageService;// userId -> 多个连接public static final Map<String, Set<Channel>> userChannelMap = new ConcurrentHashMap<>();// channelId -> userId(用于断开连接时清理)public static final Map<String, String> channelUserMap = new ConcurrentHashMap<>();@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {Channel channel = ctx.channel();String channelId = channel.id().asShortText();String userId = channelUserMap.remove(channelId);if (userId != null) {Set<Channel> channels = userChannelMap.get(userId);if (channels != null) {channels.remove(channel);if (channels.isEmpty()) {userChannelMap.remove(userId);}}}System.out.println("连接断开:" + channelId);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {String json = msg.text();JSONObject obj = JSONObject.parseObject(json);String type = obj.getString("type");String userId = obj.getString("userId");if ("bind".equals(type)) {Channel channel = ctx.channel();userChannelMap.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(channel);channelUserMap.put(channel.id().asShortText(), userId);channel.writeAndFlush(new TextWebSocketFrame("用户 [" + userId + "] 绑定成功"));// 推送消息提示Long unreadNumber = oamsDevMessageService.unreadNumber(userId);MessageVO pushMessageVO = new MessageVO();pushMessageVO.setUnread(unreadNumber);pushMessageVO.setType("unreadNumber");WebSocketPushUtil.pushToUser(userId, String.valueOf(pushMessageVO));} else if ("ping".equals(type)) {ctx.channel().writeAndFlush(new TextWebSocketFrame("pong"));} else {ctx.channel().writeAndFlush(new TextWebSocketFrame("未知类型"));}}
}
WebSocketPushUtil
package vip.xiaonuo.biz.modular.websocket;import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import java.util.Collection;
import java.util.Map;
import java.util.Set;/*** @Author 小钟* @Date 2025/7/30 15:27* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
public class WebSocketPushUtil {// 推送给一个用户所有连接public static void pushToUser(String userId, String msg) {Set<Channel> channels = WebSocketHandler.userChannelMap.get(userId);if (channels != null) {for (Channel channel : channels) {if (channel.isActive()) {channel.writeAndFlush(new TextWebSocketFrame(msg));}}}}// 推送给多个用户public static void pushToUsers(Collection<String> userIds, MessageVO pushMessageVO) {Map<String, String> unreadByUserList = pushMessageVO.getUnreadByUserList();for (String userId : userIds) {String unreadStr = unreadByUserList.getOrDefault(userId, "0");MessageVO vo = new MessageVO();vo.setType(pushMessageVO.getType());vo.setUnread(Long.parseLong(unreadStr));vo.setExtJson(pushMessageVO.getExtJson());vo.setTitle(pushMessageVO.getTitle());vo.setContent(pushMessageVO.getContent());pushToUser(userId, vo.toString());}}// 广播给所有在线用户public static void broadcast(String msg) {for (Set<Channel> channels : WebSocketHandler.userChannelMap.values()) {for (Channel channel : channels) {if (channel.isActive()) {channel.writeAndFlush(new TextWebSocketFrame(msg));}}}}
}
application.yml
server:
  port: 82
  servlet:
    context-path: /eam

websocket:
  port: 9001
4、后端使用推送
// Example 1: push an "application approved" message to a single user.
// The unread count is set directly on the payload, and the payload is sent
// as the JSON string produced by MessageVO.toString().
// (Examples 1 and 2 are independent snippets: both declare unreadNumber and
// pushMessageVO, so they cannot be pasted into the same scope unchanged.)
Long unreadNumber = messageService.unreadNumber();
MessageVO pushMessageVO = new MessageVO();
pushMessageVO.setUnread(unreadNumber);
pushMessageVO.setTitle("同意 ".concat(apply.getActionType()).concat(" 申请"));
pushMessageVO.setType("message");
pushMessageVO.setContent(htmlContent);
WebSocketPushUtil.pushToUser(apply.getApplicantId(), String.valueOf(pushMessageVO));

// Example 2: push a notification to several users.
// The per-user unread counts (userId -> count string) travel on the template;
// pushToUsers builds one payload per user with that user's own count.
Map<String, String> unreadNumber = oamsDevMessageService.unreadNumber(reviewerIdList);
MessageVO pushMessageVO = new MessageVO();
pushMessageVO.setUnreadByUserList(unreadNumber);
pushMessageVO.setTitle(user.getName().concat(" 提交了 ").concat(apply.getActionType()).concat(" 申请"));
pushMessageVO.setType("message");
pushMessageVO.setContent(htmlContent);
WebSocketPushUtil.pushToUsers(reviewerIdList, pushMessageVO);
5、前端代码
5.1 websocket.js
/*
 * @Date: 2025-07-30 15:45:06
 * @LastEditors: zhong
 * @LastEditTime: 2025-08-20 11:22:11
 * @FilePath: \snowy-admin-web\src\utils\websocket.js
 */
// src/utils/websocket.js
// websocketClient.js
import { useMessageStore } from "@/store/messageStore.js";
import { notification } from "ant-design-vue";let socket = null;
let reconnectTimer = null;
let isManualClose = false; // true after closeWebSocket(), so close/error events do not trigger a reconnect
let messageListener = null; // optional callback passed to createWebSocket; kept across reconnects

/**
 * Open the WebSocket connection and bind it to the given user.
 *
 * Does nothing when a connection is already open or still being established.
 * After the socket opens, a `{ type: "bind", userId }` message is sent.
 * Incoming messages of type "unreadNumber" or "message" are stored in the
 * message store; "message" additionally shows a notification.
 *
 * @param {string|number} userId id of the logged-in user
 * @param {(msg: object) => void} [onMessage] optional callback invoked with every
 *   handled message; it is remembered and reused when the connection is re-established
 */
export const createWebSocket = (userId, onMessage) => {
  const messageStore = useMessageStore();
  if (typeof onMessage === "function") {
    messageListener = onMessage;
  }

  // A socket that is still connecting counts as existing; otherwise a second
  // call during the handshake would open a duplicate connection.
  const alreadyActive =
    socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING);
  if (alreadyActive) {
    console.log("🟡 WebSocket 已连接,跳过重复连接");
    return;
  }

  const wsUrl = `ws://${import.meta.env.VITE_WEBSOCKET_BASEURL}:${import.meta.env.VITE_WEBSOCKET_PORT}/ws`;
  isManualClose = false;
  const ws = new WebSocket(wsUrl);
  socket = ws;

  ws.onopen = () => {
    console.log("✅ WebSocket 连接成功");
    ws.send(JSON.stringify({ type: "bind", userId }));
  };

  ws.onmessage = (event) => {
    try {
      const msg = JSON.parse(event.data);
      if (msg.type === "unreadNumber" || msg.type === "message") {
        console.log("📨 收到消息:", msg);
        messageStore.addMessage(msg);
        if (msg.type === "message") {
          notification.success({
            message: "新消息提醒",
            description: msg.title || msg.content,
            duration: 4
          });
        }
        if (messageListener) {
          messageListener(msg);
        }
      }
    } catch (e) {
      // Plain-text frames from the server (bind acknowledgement, "pong") also end up here.
      console.warn("❌ 消息解析失败:", event.data);
    }
  };

  ws.onerror = () => {
    console.warn("WebSocket 连接出错");
    // Ignore events of a socket that was closed on purpose or has been replaced.
    if (ws !== socket || isManualClose) return;
    reconnect(userId);
  };

  ws.onclose = (e) => {
    console.warn("WebSocket 已关闭");
    if (ws !== socket || isManualClose) return;
    reconnect(userId);
  };
};

/**
 * Close the connection on purpose (e.g. on logout). Cancels any pending
 * reconnect and forgets the registered message callback.
 */
export const closeWebSocket = () => {
  isManualClose = true;
  if (socket) {
    console.log("🔌 手动关闭 WebSocket");
    socket.close();
    socket = null;
  }
  messageListener = null;
  clearTimeout(reconnectTimer);
};

/**
 * Schedule a single reconnect attempt in 3 seconds; an already scheduled
 * attempt is replaced, so repeated error/close events do not pile up.
 *
 * @param {string|number} userId id to bind again after reconnecting
 */
const reconnect = (userId) => {
  clearTimeout(reconnectTimer);
  reconnectTimer = setTimeout(() => {
    console.log("♻️ WebSocket 尝试重连...");
    createWebSocket(userId);
  }, 3000);
};

/**
 * Describe the current connection state.
 *
 * @returns {string} a human-readable status label
 */
export const getWebSocketStatus = () => {
  if (!socket) return "未连接";
  switch (socket.readyState) {
    case WebSocket.CONNECTING:
      return "连接中";
    case WebSocket.OPEN:
      return "已连接";
    case WebSocket.CLOSING:
      return "正在关闭";
    case WebSocket.CLOSED:
      return "已关闭";
    default:
      return "未知状态";
  }
};
5.2 login.vue
登录成功后需要初始化 WebSocket 连接:
/**
 * Validate the login form, log in, and open the WebSocket connection for the
 * logged-in user. Validation failures are ignored here; a failed login resets
 * the loading flag and, when the captcha is enabled, refreshes it.
 */
const login = async () => {
	// Receives the messages of the WebSocket connection (e.g. to show a notification);
	// a notification component or the message center could be triggered here as well.
	const handleSocketMessage = (msg) => {
		console.log("💬 WebSocket 消息:", msg)
	}

	// Runs only after the form passed validation.
	const submit = async () => {
		loading.value = true
		const payload = {
			account: ruleForm.account,
			// The password is SM2-encrypted so only ciphertext is visible in transit;
			// the backend stores a hash.
			password: smCrypto.doSm2Encrypt(ruleForm.password),
			validCode: ruleForm.validCode,
			validCodeReqNo: ruleForm.validCodeReqNo
		}
		try {
			// Obtain the token and run the post-login steps
			const token = await loginApi.login(payload)
			await afterLogin(token)
			const userId = tool.data.get("USER_INFO")?.id
			console.log(userId);
			if (userId) {
				createWebSocket(userId, handleSocketMessage)
			}
		} catch (err) {
			loading.value = false
			if (captchaOpen.value === 'true') {
				loginCaptcha()
			}
		}
	}

	loginForm.value
		.validate()
		.then(submit)
		.catch(() => { })
}