当前位置: 首页 > backend >正文

基于Spring Boot和WebSocket的实时聊天系统

一、项目架构设计

1. 技术栈组成

组件用途版本
Spring Boot基础框架2.7.x
javax.websocketWebSocket实现JSR-356
Mybatis-Plus数据持久化3.5.x
Redis缓存/消息队列6.2.x
MongoDB聊天记录存储5.0.x
MinIO文件存储8.0.x
Kafka消息分发2.8.x
OAuth2认证授权2.5.x

2. 系统架构图

客户端
WebSocket连接
Spring Boot服务
消息处理器
MySQL: 用户/关系
MongoDB: 聊天记录
Redis: 在线状态
MinIO: 文件存储
Kafka: 消息分发

二、核心功能实现

1. WebSocket配置增强版

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(chatWebSocketHandler(), "/websocket").setAllowedOrigins("*").addInterceptors(new AuthHandshakeInterceptor()).withSockJS();}@Beanpublic WebSocketHandler chatWebSocketHandler() {return new ChatWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);container.setMaxBinaryMessageBufferSize(8192);container.setMaxSessionIdleTimeout(600000L);return container;}
}

2. 消息处理器实现

public class ChatWebSocketHandler extends TextWebSocketHandler {private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {String userId = getUserIdFromSession(session);sessions.put(userId, session);updateOnlineStatus(userId, true);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {try {ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);switch (chatMessage.getType()) {case HEARTBEAT:handleHeartbeat(session);break;case SINGLE_CHAT:handleSingleChat(chatMessage);break;case GROUP_CHAT:handleGroupChat(chatMessage);break;case READ_RECEIPT:handleReadReceipt(chatMessage);break;case FILE_UPLOAD:handleFileUpload(chatMessage);break;}} catch (Exception e) {log.error("消息处理异常", e);}}private void handleHeartbeat(WebSocketSession session) {try {session.sendMessage(new TextMessage("{\\"type\\":\\"HEARTBEAT_RESPONSE\\"}"));} catch (IOException e) {log.error("心跳响应失败", e);}}// 其他处理方法...
}

三、关键业务逻辑实现

1. 消息存储设计

MySQL表结构

CREATE TABLE `chat_message` (`id` bigint NOT NULL AUTO_INCREMENT,`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',`sender_id` varchar(64) NOT NULL,`receiver_id` varchar(64) NOT NULL,`content` text,`msg_type` tinyint NOT NULL COMMENT '1-文本 2-图片 3-视频',`status` tinyint DEFAULT '0' COMMENT '0-未读 1-已读',`created_at` datetime NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_sender_receiver` (`sender_id`,`receiver_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

MongoDB文档结构

@Document(collection = "chat_messages")
public class ChatMessageDocument {@Idprivate String id;private String msgId;private String senderId;private String receiverId;private String content;private MessageType msgType;private MessageStatus status;private Date createdAt;private List<ReadReceipt> readReceipts;// 嵌套文档public static class ReadReceipt {private String userId;private Date readAt;}
}

2. 消息分发流程

@Service
@RequiredArgsConstructor
public class MessageDispatcher {private final KafkaTemplate<String, String> kafkaTemplate;private final RedisTemplate<String, String> redisTemplate;public void dispatch(ChatMessage message) {// 存储消息storeMessage(message);// 实时推送if (isUserOnline(message.getReceiverId())) {realtimePush(message);} else {// 离线用户通过推送通知pushNotification(message);}// 发往Kafka做后续处理kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));}private boolean isUserOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}private void realtimePush(ChatMessage message) {WebSocketSession session = sessions.get(message.getReceiverId());if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(serialize(message)));} catch (IOException e) {log.error("消息推送失败", e);}}}
}

四、高级功能实现

1. 心跳检测机制

@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {long now = System.currentTimeMillis();sessions.forEach((userId, session) -> {Long lastHeartbeat = heartbeatTimestamps.get(userId);if (lastHeartbeat == null || now - lastHeartbeat > 60000) {try {session.close(CloseStatus.SESSION_NOT_RELIABLE);sessions.remove(userId);updateOnlineStatus(userId, false);} catch (IOException e) {log.error("关闭会话失败", e);}}});
}

2. 消息已读回执

public void handleReadReceipt(ChatMessage message) {// 更新MySQL中的消息状态chatMessageMapper.updateStatusByMsgId(message.getMsgId(), MessageStatus.READ);// 更新MongoDB中的阅读状态Query query = Query.query(Criteria.where("msgId").is(message.getMsgId()));Update update = new Update().push("readReceipts", new ReadReceipt(message.getSenderId(), new Date())).set("status", MessageStatus.READ);mongoTemplate.updateFirst(query, update, ChatMessageDocument.class);// 通知发送方消息已读if (isUserOnline(message.getSenderId())) {realtimePush(new ChatMessage(MessageType.READ_RECEIPT,message.getMsgId(),message.getReceiverId(),message.getSenderId()));}
}

五、性能优化方案

1. 消息批量处理

@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {List<ChatMessage> messages = records.stream().map(record -> deserialize(record.value())).collect(Collectors.toList());// 批量存储MySQLchatMessageMapper.batchInsert(messages);// 批量存储MongoDBList<ChatMessageDocument> documents = messages.stream().map(this::convertToDocument).collect(Collectors.toList());mongoTemplate.insertAll(documents);
}

2. Redis缓存优化

@Service
public class UserStatusService {private final RedisTemplate<String, String> redisTemplate;public boolean isOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}public void setOnline(String userId, boolean online) {if (online) {redisTemplate.opsForValue().set("user:online:" + userId,"1",5, TimeUnit.MINUTES);} else {redisTemplate.delete("user:online:" + userId);}}public List<String> getOnlineUsers(List<String> userIds) {return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {for (String userId : userIds) {connection.exists(("user:online:" + userId).getBytes());}return null;}).stream().map(Object::toString).collect(Collectors.toList());}
}

六、安全防护措施

1. OAuth2认证集成

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {http.authorizeHttpRequests(auth -> auth.antMatchers("/websocket/**").authenticated().anyRequest().permitAll()).oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> jwt.decoder(jwtDecoder()))).sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS));return http.build();
}@Bean
public JwtDecoder jwtDecoder() {return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
}

2. WebSocket安全拦截器

public class AuthHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) {String token = extractToken(request);if (token == null) {return false;}try {Jwt jwt = jwtDecoder.decode(token);attributes.put("userId", jwt.getSubject());return true;} catch (JwtException e) {return false;}}private String extractToken(ServerHttpRequest request) {// 从请求头或参数中提取token}
}

七、部署与监控

1. Docker Compose配置

version: '3.8'services:app:build: .ports:- "8080:8080"depends_on:- redis- mysql- mongodb- kafkaredis:image: redis:6.2ports:- "6379:6379"mysql:image: mysql:8.0environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: chat_dbports:- "3306:3306"mongodb:image: mongo:5.0ports:- "27017:27017"kafka:image: bitnami/kafka:2.8ports:- "9092:9092"environment:KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: "yes"zookeeper:image: bitnami/zookeeper:3.7ports:- "2181:2181"

2. Prometheus监控配置

scrape_configs:- job_name: 'chat-app'metrics_path: '/actuator/prometheus'static_configs:- targets: ['app:8080']labels:service: 'chat-service'- job_name: 'redis'static_configs:- targets: ['redis:9121']- job_name: 'mysql'static_configs:- targets: ['mysql:9104']- job_name: 'kafka'static_configs:- targets: ['kafka:7071']

通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。

http://www.xdnf.cn/news/17410.html

相关文章:

  • go语言运算符
  • 遇到前端导出 Excel 文件出现乱码或文件损坏的问题
  • Linux 管道命令及相关命令练习与 Shell 编程、Tomcat 安装
  • 基于Ubuntu20.04的环境,编译QT5.15.17源码
  • Lua语言元表、协同程序
  • JavaWeb(苍穹外卖)--学习笔记17(Apache Echarts)
  • LightGBM 与 GBDT 在机器学习中的性能与特点比较
  • Graph-R1:一种用于结构化多轮推理的智能图谱检索框架,并结合端到端强化学习
  • 【最后203篇系列】031 构建MCP尝试
  • Docker Compose 部署高可用 MongoDB 副本集集群(含 Keepalived + HAProxy 负载均衡)
  • 从零学习three.js官方文档(二)——图元
  • 去除Edge微软浏览器与Chrome谷歌浏览器顶部出现“此版本的Windows不再支持升级Windows 10”的烦人提示
  • JavaWeb(苍穹外卖)--学习笔记18(Apache POI)
  • 安全引导功能及ATF的启动过程(五)
  • 数据结构:栈和队列(Stack Queue)基本概念与应用
  • AI编程插件对比分析:CodeRider、GitHub Copilot及其他
  • 云服务器最新版MySQL 安装步骤
  • 第4章 程序段的反复执行1 for语句P115练习题(题及答案)
  • Matlab系列(004) 一 Matlab分析正态分布(高斯分布)
  • cuOpt_server错误分析
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘fastai’问题
  • 面试题-----Spring Cloud
  • LLM 的向量的方向表示语义,向量长度表示什么
  • 强化学习笔记:从Q学习到GRPO
  • 1.JavaScript 介绍
  • Linux系统编程Day10 -- 进程管理
  • 分治-快排-面试题 17.14.最小k个数-力扣(LeetCode)
  • 在 Vue 中动态引入SVG图标的实现方案
  • Horse3D引擎研发笔记(三):使用QtOpenGL的Shader编程绘制彩色三角形
  • 第十九天-输入捕获实验