基于Spring Boot和WebSocket的实时聊天系统
一、项目架构设计
1. 技术栈组成
组件 | 用途 | 版本 |
---|---|---|
Spring Boot | 基础框架 | 2.7.x |
javax.websocket | WebSocket实现 | JSR-356 |
Mybatis-Plus | 数据持久化 | 3.5.x |
Redis | 缓存/消息队列 | 6.2.x |
MongoDB | 聊天记录存储 | 5.0.x |
MinIO | 文件存储 | 8.0.x |
Kafka | 消息分发 | 2.8.x |
OAuth2 | 认证授权 | 2.5.x |
2. 系统架构图
二、核心功能实现
1. WebSocket配置增强版
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(chatWebSocketHandler(), "/websocket").setAllowedOrigins("*").addInterceptors(new AuthHandshakeInterceptor()).withSockJS();}@Beanpublic WebSocketHandler chatWebSocketHandler() {return new ChatWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);container.setMaxBinaryMessageBufferSize(8192);container.setMaxSessionIdleTimeout(600000L);return container;}
}
2. 消息处理器实现
public class ChatWebSocketHandler extends TextWebSocketHandler {private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();private final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {String userId = getUserIdFromSession(session);sessions.put(userId, session);updateOnlineStatus(userId, true);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {try {ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);switch (chatMessage.getType()) {case HEARTBEAT:handleHeartbeat(session);break;case SINGLE_CHAT:handleSingleChat(chatMessage);break;case GROUP_CHAT:handleGroupChat(chatMessage);break;case READ_RECEIPT:handleReadReceipt(chatMessage);break;case FILE_UPLOAD:handleFileUpload(chatMessage);break;}} catch (Exception e) {log.error("消息处理异常", e);}}private void handleHeartbeat(WebSocketSession session) {try {session.sendMessage(new TextMessage("{\\"type\\":\\"HEARTBEAT_RESPONSE\\"}"));} catch (IOException e) {log.error("心跳响应失败", e);}}// 其他处理方法...
}
三、关键业务逻辑实现
1. 消息存储设计
MySQL表结构
CREATE TABLE `chat_message` (`id` bigint NOT NULL AUTO_INCREMENT,`msg_id` varchar(64) NOT NULL COMMENT '消息唯一ID',`sender_id` varchar(64) NOT NULL,`receiver_id` varchar(64) NOT NULL,`content` text,`msg_type` tinyint NOT NULL COMMENT '1-文本 2-图片 3-视频',`status` tinyint DEFAULT '0' COMMENT '0-未读 1-已读',`created_at` datetime NOT NULL,PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_sender_receiver` (`sender_id`,`receiver_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
MongoDB文档结构
@Document(collection = "chat_messages")
public class ChatMessageDocument {@Idprivate String id;private String msgId;private String senderId;private String receiverId;private String content;private MessageType msgType;private MessageStatus status;private Date createdAt;private List<ReadReceipt> readReceipts;// 嵌套文档public static class ReadReceipt {private String userId;private Date readAt;}
}
2. 消息分发流程
@Service
@RequiredArgsConstructor
public class MessageDispatcher {private final KafkaTemplate<String, String> kafkaTemplate;private final RedisTemplate<String, String> redisTemplate;public void dispatch(ChatMessage message) {// 存储消息storeMessage(message);// 实时推送if (isUserOnline(message.getReceiverId())) {realtimePush(message);} else {// 离线用户通过推送通知pushNotification(message);}// 发往Kafka做后续处理kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));}private boolean isUserOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}private void realtimePush(ChatMessage message) {WebSocketSession session = sessions.get(message.getReceiverId());if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(serialize(message)));} catch (IOException e) {log.error("消息推送失败", e);}}}
}
四、高级功能实现
1. 心跳检测机制
@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {long now = System.currentTimeMillis();sessions.forEach((userId, session) -> {Long lastHeartbeat = heartbeatTimestamps.get(userId);if (lastHeartbeat == null || now - lastHeartbeat > 60000) {try {session.close(CloseStatus.SESSION_NOT_RELIABLE);sessions.remove(userId);updateOnlineStatus(userId, false);} catch (IOException e) {log.error("关闭会话失败", e);}}});
}
2. 消息已读回执
public void handleReadReceipt(ChatMessage message) {// 更新MySQL中的消息状态chatMessageMapper.updateStatusByMsgId(message.getMsgId(), MessageStatus.READ);// 更新MongoDB中的阅读状态Query query = Query.query(Criteria.where("msgId").is(message.getMsgId()));Update update = new Update().push("readReceipts", new ReadReceipt(message.getSenderId(), new Date())).set("status", MessageStatus.READ);mongoTemplate.updateFirst(query, update, ChatMessageDocument.class);// 通知发送方消息已读if (isUserOnline(message.getSenderId())) {realtimePush(new ChatMessage(MessageType.READ_RECEIPT,message.getMsgId(),message.getReceiverId(),message.getSenderId()));}
}
五、性能优化方案
1. 消息批量处理
@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {List<ChatMessage> messages = records.stream().map(record -> deserialize(record.value())).collect(Collectors.toList());// 批量存储MySQLchatMessageMapper.batchInsert(messages);// 批量存储MongoDBList<ChatMessageDocument> documents = messages.stream().map(this::convertToDocument).collect(Collectors.toList());mongoTemplate.insertAll(documents);
}
2. Redis缓存优化
@Service
public class UserStatusService {private final RedisTemplate<String, String> redisTemplate;public boolean isOnline(String userId) {return redisTemplate.opsForValue().get("user:online:" + userId) != null;}public void setOnline(String userId, boolean online) {if (online) {redisTemplate.opsForValue().set("user:online:" + userId,"1",5, TimeUnit.MINUTES);} else {redisTemplate.delete("user:online:" + userId);}}public List<String> getOnlineUsers(List<String> userIds) {return redisTemplate.executePipelined((RedisCallback<Object>) connection -> {for (String userId : userIds) {connection.exists(("user:online:" + userId).getBytes());}return null;}).stream().map(Object::toString).collect(Collectors.toList());}
}
六、安全防护措施
1. OAuth2认证集成
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {http.authorizeHttpRequests(auth -> auth.antMatchers("/websocket/**").authenticated().anyRequest().permitAll()).oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> jwt.decoder(jwtDecoder()))).sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS));return http.build();
}@Bean
public JwtDecoder jwtDecoder() {return NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
}
2. WebSocket安全拦截器
public class AuthHandshakeInterceptor implements HandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request,ServerHttpResponse response,WebSocketHandler wsHandler,Map<String, Object> attributes) {String token = extractToken(request);if (token == null) {return false;}try {Jwt jwt = jwtDecoder.decode(token);attributes.put("userId", jwt.getSubject());return true;} catch (JwtException e) {return false;}}private String extractToken(ServerHttpRequest request) {// 从请求头或参数中提取token}
}
七、部署与监控
1. Docker Compose配置
version: '3.8'services:app:build: .ports:- "8080:8080"depends_on:- redis- mysql- mongodb- kafkaredis:image: redis:6.2ports:- "6379:6379"mysql:image: mysql:8.0environment:MYSQL_ROOT_PASSWORD: rootMYSQL_DATABASE: chat_dbports:- "3306:3306"mongodb:image: mongo:5.0ports:- "27017:27017"kafka:image: bitnami/kafka:2.8ports:- "9092:9092"environment:KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: "yes"zookeeper:image: bitnami/zookeeper:3.7ports:- "2181:2181"
2. Prometheus监控配置
scrape_configs:- job_name: 'chat-app'metrics_path: '/actuator/prometheus'static_configs:- targets: ['app:8080']labels:service: 'chat-service'- job_name: 'redis'static_configs:- targets: ['redis:9121']- job_name: 'mysql'static_configs:- targets: ['mysql:9104']- job_name: 'kafka'static_configs:- targets: ['kafka:7071']
通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。