当前位置: 首页 > backend >正文

SpringBoot3 + Netty + Vue3 实现消息推送(最新)

SpringBoot3 + Netty + Vue3 实现消息推送

  • 1、效果展示
  • 2、依赖引入
  • 3、后端代码
  • 4、后端使用推送
  • 5、前端代码
    • 5.1 websocket.js
    • 5.2 login.vue

1、效果展示

在这里插入图片描述

2、依赖引入

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.1</version>
</parent>

<!-- Netty 核心 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.107.Final</version>
</dependency>

3、后端代码

MessageVO


package vip.xiaonuo.biz.modular.websocket;

import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
 * Payload pushed to WebSocket clients.
 *
 * <p>{@link #toString()} renders the object as JSON, so callers can pass
 * {@code String.valueOf(vo)} / {@code vo.toString()} straight into a text frame.
 *
 * @author 小钟
 * @since 2025/7/30 16:06
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageVO {

    /**
     * Shared serializer. Reused instead of allocating a new {@code Gson} on every
     * {@code toString()} call. Static fields are skipped by Lombok's generated
     * members and by Gson's default exclusion rules, so the JSON output is unchanged.
     */
    private static final Gson GSON = new Gson();

    /** Unread-message count for the receiving user. */
    private Long unread;

    /** Per-user unread counts (user id -> count as a decimal string), used for multi-user pushes. */
    private Map<String, String> unreadByUserList;

    /** Message title. */
    private String title;

    /** Message type; the visible callers use {@code "unreadNumber"} and {@code "message"}. */
    private String type;

    /** Message body. */
    private String content;

    /** Extra data as a JSON string. NOTE(review): schema is not visible here; confirm with the producers. */
    private String extJson;

    /**
     * Serializes this object to JSON.
     *
     * @return the JSON representation of this message
     */
    @Override
    public String toString() {
        return GSON.toJson(this);
    }
}

NettyWebSocketServer

package vip.xiaonuo.biz.modular.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Boots a Netty WebSocket server on the configured port when the Spring bean is
 * initialized, and closes it again when the bean is destroyed.
 *
 * @author 小钟
 * @since 2025/7/30 15:26
 */
@Component
public class NettyWebSocketServer {

    @Autowired
    private WebSocketConfigProperties configProperties;

    @Autowired
    private WebSocketChannelInitializer webSocketChannelInitializer;

    /** The bound server channel; {@code null} until the bind succeeds and after shutdown. */
    private volatile Channel serverChannel;

    /** Set by {@link #stop()} so that a stop issued before the bind completes is still honoured. */
    private volatile boolean stopRequested;

    /**
     * Binds the server and blocks the calling thread until the server channel is closed.
     * Both event loop groups are shut down when this method returns, whatever the reason.
     */
    public void start() {
        int port = configProperties.getPort();
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(webSocketChannelInitializer);
            ChannelFuture f = b.bind(port).sync();
            serverChannel = f.channel();
            System.out.println("Netty WebSocket 启动成功,端口:" + port);
            // stop() may have run while the bind was still in flight and found no channel to close.
            if (stopRequested) {
                f.channel().close();
            }
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            serverChannel = null;
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on a dedicated thread so that the blocking {@link #start()}
     * call does not hold up Spring context initialization.
     */
    @PostConstruct
    public void init() {
        new Thread(this::start, "netty-websocket-server").start();
    }

    /**
     * Closes the server channel on bean destruction. This unblocks {@link #start()},
     * whose {@code finally} block then shuts down the event loop groups. Without this
     * hook the server thread and event loops would outlive the Spring context.
     */
    @PreDestroy
    public void stop() {
        stopRequested = true;
        Channel channel = serverChannel;
        if (channel != null) {
            channel.close();
        }
    }
}

WebSocketChannelInitializer

package vip.xiaonuo.biz.modular.websocket;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @Author 小钟* @Date 2025/7/30 15:26* @PackagePath vip.xiaonuo.biz.modular.websocket* @ClassDescription**/
@Component
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {@Autowiredprivate WebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(65536));pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));pipeline.addLast(webSocketHandler);}
}

WebSocketConfigProperties

package vip.xiaonuo.biz.modular.websocket;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Settings bound from configuration keys under the {@code websocket} prefix.
 *
 * @author 小钟
 * @since 2025/7/30 18:35
 */
@Data
@Component
@ConfigurationProperties(prefix = "websocket")
public class WebSocketConfigProperties {

    /** Port the WebSocket server binds to (configuration key {@code websocket.port}). */
    private int port;
}

WebSocketHandler

package vip.xiaonuo.biz.modular.websocket;

import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import vip.xiaonuo.biz.modular.oamsmessage.service.OamsDevMessageService;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Handles text frames from WebSocket clients and keeps the user/connection registry.
 *
 * <p>Supported client messages are JSON objects with a {@code type} field:
 * {@code bind} (registers the connection under the given {@code userId}) and
 * {@code ping} (answered with {@code pong}). Anything else is answered with an
 * "unknown type" frame.
 *
 * <p>NOTE(review): the {@code userId} in a {@code bind} message is taken from the
 * client as-is; nothing in this class verifies that the connection is allowed to
 * bind as that user. Confirm that authentication happens elsewhere.
 *
 * <p>NOTE(review): registry keys use {@code ChannelId.asShortText()}, which Netty
 * documents as not globally unique; {@code asLongText()} would avoid collisions but
 * would change the keys seen by any other reader of {@link #channelUserMap}.
 *
 * @author 小钟
 * @since 2025/7/30 15:27
 */
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Autowired
    private OamsDevMessageService oamsDevMessageService;

    /** userId -> all connections currently bound to that user. */
    public static final Map<String, Set<Channel>> userChannelMap = new ConcurrentHashMap<>();

    /** channelId -> userId, used to clean up when a connection goes away. */
    public static final Map<String, String> channelUserMap = new ConcurrentHashMap<>();

    /**
     * Removes the closed connection from both registries.
     *
     * @param ctx context of the channel whose handler is being removed
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String channelId = channel.id().asShortText();
        String userId = channelUserMap.remove(channelId);
        if (userId != null) {
            unregister(userId, channel);
        }
        System.out.println("连接断开:" + channelId);
    }

    /**
     * Dispatches one incoming text frame by its {@code type} field.
     *
     * @param ctx context of the channel that received the frame
     * @param msg the received text frame, expected to contain a JSON object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        JSONObject obj = parse(msg.text());
        // Empty or malformed payloads are answered like any other unrecognised message
        // instead of failing with an exception inside the pipeline.
        String type = obj == null ? null : obj.getString("type");
        if ("bind".equals(type)) {
            bind(ctx.channel(), obj.getString("userId"));
        } else if ("ping".equals(type)) {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("pong"));
        } else {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("未知类型"));
        }
    }

    /** Registers {@code channel} under {@code userId}, acknowledges, and pushes the unread count. */
    private void bind(Channel channel, String userId) {
        if (userId == null || userId.isBlank()) {
            // ConcurrentHashMap rejects null keys; report the problem to the client instead.
            channel.writeAndFlush(new TextWebSocketFrame("绑定失败:缺少 userId"));
            return;
        }
        String previousUserId = channelUserMap.put(channel.id().asShortText(), userId);
        if (previousUserId != null && !previousUserId.equals(userId)) {
            // The connection re-bound as another user: stop delivering the old user's messages to it.
            unregister(previousUserId, channel);
        }
        // compute() makes create-or-add atomic with respect to unregister(), so a channel
        // can never be added to a set that is concurrently being dropped from the map.
        userChannelMap.compute(userId, (key, channels) -> {
            Set<Channel> target = channels;
            if (target == null) {
                target = ConcurrentHashMap.newKeySet();
            }
            target.add(channel);
            return target;
        });
        channel.writeAndFlush(new TextWebSocketFrame("用户 [" + userId + "] 绑定成功"));

        // Push the current unread count to the user.
        // NOTE(review): this service call runs on the thread that delivered the frame; if it
        // blocks on I/O, consider moving it off the event loop.
        Long unreadNumber = oamsDevMessageService.unreadNumber(userId);
        MessageVO pushMessageVO = new MessageVO();
        pushMessageVO.setUnread(unreadNumber);
        pushMessageVO.setType("unreadNumber");
        WebSocketPushUtil.pushToUser(userId, String.valueOf(pushMessageVO));
    }

    /** Atomically removes {@code channel} from the user's set and drops the set once it is empty. */
    private static void unregister(String userId, Channel channel) {
        userChannelMap.computeIfPresent(userId, (key, channels) -> {
            channels.remove(channel);
            return channels.isEmpty() ? null : channels;
        });
    }

    /** Parses a JSON object, returning {@code null} for empty or malformed input. */
    private static JSONObject parse(String json) {
        try {
            return JSONObject.parseObject(json);
        } catch (JSONException ignored) {
            // Malformed client input is not an error condition for the server.
            return null;
        }
    }
}

WebSocketPushUtil

package vip.xiaonuo.biz.modular.websocket;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Collection;
import java.util.Map;
import java.util.Set;

/**
 * Static helpers for pushing text messages to the connections registered in
 * {@link WebSocketHandler#userChannelMap}.
 *
 * @author 小钟
 * @since 2025/7/30 15:27
 */
public class WebSocketPushUtil {

    /**
     * Pushes a message to every active connection of one user.
     *
     * @param userId the receiving user; {@code null} or an unknown user is a no-op
     * @param msg    the text to send
     */
    public static void pushToUser(String userId, String msg) {
        if (userId == null) {
            // The registry is a ConcurrentHashMap, which throws on null keys.
            return;
        }
        sendToActive(WebSocketHandler.userChannelMap.get(userId), msg);
    }

    /**
     * Pushes a per-user copy of a message to several users. Each copy carries that
     * user's own unread count, looked up in {@code pushMessageVO.getUnreadByUserList()};
     * a missing map, missing entry, or non-numeric value counts as {@code 0}.
     *
     * @param userIds       the receiving users; {@code null} elements are skipped
     * @param pushMessageVO the template message (type, title, content, extJson, unread map)
     */
    public static void pushToUsers(Collection<String> userIds, MessageVO pushMessageVO) {
        if (userIds == null || pushMessageVO == null) {
            return;
        }
        Map<String, String> unreadByUserList = pushMessageVO.getUnreadByUserList();
        for (String userId : userIds) {
            if (userId == null) {
                continue;
            }
            MessageVO vo = new MessageVO();
            vo.setType(pushMessageVO.getType());
            vo.setUnread(unreadFor(unreadByUserList, userId));
            vo.setExtJson(pushMessageVO.getExtJson());
            vo.setTitle(pushMessageVO.getTitle());
            vo.setContent(pushMessageVO.getContent());
            pushToUser(userId, vo.toString());
        }
    }

    /**
     * Broadcasts a message to every active connection of every registered user.
     *
     * @param msg the text to send
     */
    public static void broadcast(String msg) {
        for (Set<Channel> channels : WebSocketHandler.userChannelMap.values()) {
            sendToActive(channels, msg);
        }
    }

    /** Writes {@code msg} as a new text frame to each active channel; a {@code null} set is ignored. */
    private static void sendToActive(Set<Channel> channels, String msg) {
        if (channels == null) {
            return;
        }
        for (Channel channel : channels) {
            if (channel.isActive()) {
                // A fresh frame per write: each write consumes the frame it is given.
                channel.writeAndFlush(new TextWebSocketFrame(msg));
            }
        }
    }

    /** Looks up a user's unread count, falling back to 0 so one bad entry cannot abort the whole push. */
    private static long unreadFor(Map<String, String> unreadByUserList, String userId) {
        if (unreadByUserList == null) {
            return 0L;
        }
        String raw = unreadByUserList.get(userId);
        if (raw == null) {
            return 0L;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException ignored) {
            // A malformed count should not prevent the message itself from being delivered.
            return 0L;
        }
    }
}

application.yml

server:
  port: 82
  servlet:
    context-path: /eam

websocket:
  port: 9001

4、后端使用推送

// Example 1: push an "application approved" message to a single user (the applicant).
// The payload carries the unread count returned by messageService plus a title and content.
Long unreadNumber = messageService.unreadNumber();
MessageVO pushMessageVO = new MessageVO();
pushMessageVO.setUnread(unreadNumber);
pushMessageVO.setTitle("同意 ".concat(apply.getActionType()).concat(" 申请"));
pushMessageVO.setType("message");
pushMessageVO.setContent(htmlContent);
WebSocketPushUtil.pushToUser(apply.getApplicantId(), String.valueOf(pushMessageVO));

// Example 2: push a notification to multiple users (the reviewers).
// This is a separate snippet: it redeclares unreadNumber and pushMessageVO, so it cannot
// share a scope with example 1. Here the unread counts are supplied per user as a map.
Map<String, String> unreadNumber = oamsDevMessageService.unreadNumber(reviewerIdList);
MessageVO pushMessageVO = new MessageVO();
pushMessageVO.setUnreadByUserList(unreadNumber);
pushMessageVO.setTitle(user.getName().concat(" 提交了 ").concat(apply.getActionType()).concat(" 申请"));
pushMessageVO.setType("message");
pushMessageVO.setContent(htmlContent);
WebSocketPushUtil.pushToUsers(reviewerIdList, pushMessageVO);

5、前端代码

5.1 websocket.js

/*
 * @Date: 2025-07-30 15:45:06
 * @LastEditors: zhong
 * @LastEditTime: 2025-08-20 11:22:11
 * @FilePath: \snowy-admin-web\src\utils\websocket.js
 */
// src/utils/websocket.js
// websocketClient.js
import { useMessageStore } from "@/store/messageStore.js";
import { notification } from "ant-design-vue";

let socket = null; // the current WebSocket, or null when none is held
let reconnectTimer = null; // handle of the pending reconnect timeout, if any
let isManualClose = false; // true once closeWebSocket() has been called deliberately

/**
 * Opens the WebSocket connection and binds it to the given user.
 * Does nothing when a connection is already open or still being established.
 *
 * @param {string|number} userId id sent to the server in the "bind" message
 */
export const createWebSocket = (userId) => {
  const messageStore = useMessageStore();

  // Also skip while CONNECTING; otherwise a second call made before the first
  // handshake finishes would open a duplicate connection.
  if (socket && (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)) {
    console.log("🟡 WebSocket 已连接,跳过重复连接");
    return;
  }

  const wsUrl = `ws://${import.meta.env.VITE_WEBSOCKET_BASEURL}:${import.meta.env.VITE_WEBSOCKET_PORT}/ws`;
  isManualClose = false;

  // Handlers close over `ws` rather than the module-level `socket`, so callbacks of a
  // replaced or closed socket can be recognised and ignored.
  const ws = new WebSocket(wsUrl);
  socket = ws;

  ws.onopen = () => {
    console.log("✅ WebSocket 连接成功");
    ws.send(JSON.stringify({ type: "bind", userId }));
  };

  ws.onmessage = (event) => {
    try {
      const msg = JSON.parse(event.data);
      if (msg.type === "unreadNumber" || msg.type === "message") {
        console.log("📨 收到消息:", msg);
        messageStore.addMessage(msg);
        if (msg.type === "message") {
          notification.success({
            message: "新消息提醒",
            description: msg.title || msg.content,
            duration: 4
          });
        }
      }
    } catch (e) {
      // Frames that are not JSON end up here.
      console.warn("❌ 消息解析失败:", event.data);
    }
  };

  ws.onerror = () => {
    // Only log here. A failed connection also fires `close`, and reconnecting from
    // this handler would bypass the manual-close check below.
    console.warn("WebSocket 连接出错");
  };

  ws.onclose = (e) => {
    console.warn("WebSocket 已关闭");
    // Ignore the close of a socket that is no longer the current one.
    if (ws !== socket) return;
    if (!isManualClose) {
      reconnect(userId);
    }
  };
};

/**
 * Closes the connection on purpose and cancels any pending reconnect.
 */
export const closeWebSocket = () => {
  isManualClose = true;
  if (socket) {
    console.log("🔌 手动关闭 WebSocket");
    socket.close();
    socket = null;
  }
  clearTimeout(reconnectTimer);
};

/**
 * Schedules a single reconnect attempt in 3 seconds, replacing any attempt already pending.
 *
 * @param {string|number} userId id to bind once the new connection opens
 */
const reconnect = (userId) => {
  clearTimeout(reconnectTimer);
  reconnectTimer = setTimeout(() => {
    console.log("♻️ WebSocket 尝试重连...");
    createWebSocket(userId);
  }, 3000);
};

/**
 * Returns a human-readable label for the current connection state.
 *
 * @returns {string} the state label
 */
export const getWebSocketStatus = () => {
  if (!socket) return "未连接";
  switch (socket.readyState) {
    case WebSocket.CONNECTING:
      return "连接中";
    case WebSocket.OPEN:
      return "已连接";
    case WebSocket.CLOSING:
      return "正在关闭";
    case WebSocket.CLOSED:
      return "已关闭";
    default:
      return "未知状态";
  }
};

5.2 login.vue

登录时需要初始化 websocket 连接

// Validates the login form, logs in, and then opens the WebSocket connection
// for the logged-in user.
const login = async () => {
  loginForm.value
    .validate()
    .then(async () => {
      loading.value = true
      const loginData = {
        account: ruleForm.account,
        // The password is SM2-encrypted so only ciphertext is visible in transit; the backend stores a hash
        password: smCrypto.doSm2Encrypt(ruleForm.password),
        validCode: ruleForm.validCode,
        validCodeReqNo: ruleForm.validCodeReqNo
      }
      // Obtain the token
      try {
        const loginToken = await loginApi.login(loginData)
        await afterLogin(loginToken)
        const userId = tool.data.get("USER_INFO")?.id
        console.log(userId);
        if (userId) {
          // createWebSocket takes only the user id; incoming messages are handled
          // inside websocket.js, so no callback is passed here.
          createWebSocket(userId)
        }
      } catch (err) {
        loading.value = false
        if (captchaOpen.value === 'true') {
          loginCaptcha()
        }
      }
    })
    .catch(() => { })
}
http://www.xdnf.cn/news/19638.html

相关文章:

  • 食品分类案例
  • 码住!辉芒微MCU型号规则详细解析
  • Kafka 架构详解
  • 动子注册操作【2025.9.2学习记录】
  • MVP架构深层剖析-从六大设计原则的实现角度到用依赖注入深度解耦
  • Elasticsearch 核心知识与常见问题解析
  • MCU上跑AI—实时目标检测算法探索
  • 【 HarmonyOS 6 】HarmonyOS智能体开发实战:Function组件和智能体创建
  • 空间不足将docker挂载到其他位置
  • 03_网关ip和端口映射(路由器转发)操作和原理
  • 梯度消失问题:深度学习中的「记忆衰退」困境与解决方案
  • React 学习笔记4 Diffing/脚手架
  • 2025了,你知道electron-vite吗?
  • 网络原理——HTTP/HTTPS
  • ImageMagick命令行图片工具:批量实现格式转换与压缩,支持水印添加及GIF动态图合成
  • 2条命令,5秒安装,1秒启动!Vite项目保姆级上手指南
  • 鸿蒙NEXT界面交互全解析:弹出框、菜单、气泡提示与模态页面的实战指南
  • 开源的聚合支付系统源码/易支付系统 /三方支付系统
  • Erlang 利用 recon 排查热点进程
  • 人工智能之数学基础:分布函数对随机变量的概率分布情况进行刻画
  • 微信小程序 navigateTo 栈超过多层后会失效
  • 在 Delphi 5 中获取 Word 文档页数的方法
  • 小程序蓝牙低功耗(BLE)外围设备开发指南
  • 365 天技术创作手记:从一行代码到四万同行者的相遇
  • C++多线程编程:std::thread, std::async, std::future
  • Jenkins Pipeline 语法
  • 第 12 篇:网格边界安全 - Egress Gateway 与最佳实践
  • python中的zip() 函数介绍及使用说明
  • 基于Spark的新冠肺炎疫情实时监控系统_django+spider
  • HTML第三课:特殊元素