Java的Spring Cloud生态中实现SSE(Server-Sent Events)服务端实践
在Java的Spring Cloud生态中实现SSE(Server-Sent Events)服务端推送功能,可以通过Spring Boot原生支持的SseEmitter
类简化开发。以下是详细的实现步骤,包括依赖配置、核心代码及注意事项:
一、依赖配置
SSE的实现基于Spring Web模块,无需额外依赖,只需在pom.xml
中添加基础Web依赖:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
若需异步处理,可引入spring-boot-starter-async
(非必需)
二、服务端实现
1. 控制器类
使用@RestController
定义SSE接口,返回SseEmitter
对象以建立长连接:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@RestController
@RequestMapping("/sse")
public class SseController {@GetMapping(value = "/stream", produces = "text/event-stream")public SseEmitter stream() {SseEmitter emitter = new SseEmitter(0L); // 设置超时时间(0表示无超时)// 使用线程池异步推送数据Executors.newSingleThreadExecutor().execute(() -> {try {for (int i = 0; i < 10; i++) {// 发送SSE格式数据(支持自定义事件类型、ID和重试时间)emitter.send(SseEmitter.event().name("message") // 事件类型.id(String.valueOf(i)) // 事件ID.data("Data " + i) // 消息内容.reconnectTime(5000)); // 重连时间(毫秒)Thread.sleep(1000);}emitter.complete(); // 正常关闭连接} catch (IOException | InterruptedException e) {emitter.completeWithError(e); // 异常关闭}});// 注册回调(处理连接关闭、超时或错误)emitter.onCompletion(() -> System.out.println("SSE连接关闭"));emitter.onTimeout(() -> System.out.println("SSE连接超时"));emitter.onError(ex -> System.out.println("SSE错误: " + ex.getMessage()));return emitter;}
}package test;
import java.util.Random;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping(value = "/test")
public class SEEController {
//设置响应格式@RequestMapping(value = "/push", produces = "text/event-stream;charset=UTF-8")public @ResponseBody String push() {Random r = new Random();try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}
//SSE返回数据格式是固定的以data:开头,以\n\n结束return "data:Testing 1,2,3" + r.nextInt() + "\n\n";}
}
使用原生Servlet实现SSE(异步处理)
import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@WebServlet(urlPatterns = "/sse", asyncSupported = true)
public class SseServlet extends HttpServlet {private final ExecutorService executor = Executors.newCachedThreadPool();@Overrideprotected void doGet(HttpServletRequest request, HttpServletResponse response) {// 设置响应头response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Cache-Control", "no-cache");response.setHeader("Connection", "keep-alive");response.setHeader("Access-Control-Allow-Origin", "*");// 启动异步上下文AsyncContext asyncContext = request.startAsync();asyncContext.setTimeout(0); // 无超时限制// 提交任务到线程池executor.submit(() -> {try (PrintWriter writer = asyncContext.getResponse().getWriter()) {for (int i = 0; i < 10; i++) {if (asyncContext.getResponse().isCommitted()) {// 构建SSE格式数据(注意末尾的两个换行符)String message = "data: Message " + i + "\n\n";writer.write(message);writer.flush(); // 立即发送数据Thread.sleep(1000);} else {break; // 客户端已断开}}} catch (IOException | InterruptedException e) {// 客户端断开连接或发生错误} finally {asyncContext.complete(); // 完成请求}});}
}
package com.example.sse;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@SpringBootApplication
@RestController
public class SseServerApplication {// 存储所有客户端的SseEmitterprivate final List<SseEmitter> emitters = new ArrayList<>();public static void main(String[] args) {SpringApplication.run(SseServerApplication.class, args);}// SSE端点@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamSseMvc() {SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间为60秒// 添加客户端到连接列表emitters.add(emitter);// 设置完成回调emitter.onCompletion(() -> emitters.remove(emitter));// 设置超时回调emitter.onTimeout(() -> emitters.remove(emitter));// 设置错误回调emitter.onError((ex) -> emitters.remove(emitter));// 发送连接成功消息try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).name("message").data("{\"message\": \"连接已建立\"}"));} catch (IOException e) {emitters.remove(emitter);e.printStackTrace();}return emitter;}// 模拟定时发送数据的方法public void startPeriodicUpdates() {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {String data = "{\"timestamp\": " + System.currentTimeMillis() + ", \"message\": \"定期更新\"}";sendToAll(data);}, 0, 5, TimeUnit.SECONDS); // 每5秒发送一次更新}// 向所有客户端发送消息private void sendToAll(String data) {List<SseEmitter> deadEmitters = new ArrayList<>();emitters.forEach(emitter -> {try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).name("data").data(data));} catch (IOException e) {deadEmitters.add(emitter);e.printStackTrace();}});// 移除失效的emitteremitters.removeAll(deadEmitters);}
}
2. 核心要点
- 数据格式:遵循SSE协议,每条消息以
data:
开头,以\n\n
结尾。使用SseEmitter.event()
可自动处理格式45。 - 异步处理:通过线程池或异步任务避免阻塞主线程,确保高并发下的性能69。
- 超时与重连:通过
SseEmitter(long timeout)
设置超时,客户端断开后默认3秒重连(可通过retry
字段调整)36。
三、客户端示例
前端通过EventSource
监听服务端推送:
const eventSource = new EventSource('http://localhost:8080/sse/stream');eventSource.onmessage = (e) => {console.log('收到消息:', e.data);
};eventSource.addEventListener('message', (e) => { // 自定义事件类型console.log('自定义事件:', e.data);
});eventSource.onerror = () => {eventSource.close(); // 错误时关闭连接
};
四、高级配置
1. 跨域支持
若前端与服务端域名不同,需添加跨域配置:
@Configuration
public class CorsConfig implements WebMvcConfigurer {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/sse/**").allowedOrigins("*").allowedMethods("GET");}
}
2. 性能优化
- 线程池管理:避免频繁创建线程,推荐使用
ThreadPoolTaskExecutor
6。 - 连接数限制:浏览器对同一域名SSE连接数有限制(通常6个),需合理设计接口3。
五、SSE与WebSocket对比
特性 | SSE | WebSocket |
协议 | HTTP(单向) | 独立协议(双向) |
实现复杂度 | 简单(无需额外协议) | 较高(需握手协议) |
断线重连 | 原生支持 | 需手动实现 |
适用场景 | 服务端单向推送(如实时通知) | 双向通信(如聊天室) |
选型建议:
- 使用SSE:低频单向推送、兼容性要求高46。
- 使用WebSocket:高频双向交互、低延迟需求910。
六、常见问题
- 客户端收不到消息
-
- 检查数据格式是否正确(必须以
\n\n
结尾)。 - 确保响应头
Content-Type
为text/event-stream
25。
- 检查数据格式是否正确(必须以
- 连接频繁断开
-
- 调整服务端超时时间(如
new SseEmitter(-1L)
)。 - 客户端增加错误处理逻辑,自动重连36。
- 调整服务端超时时间(如
- 高并发性能问题
-
- 使用异步非阻塞模型(如Reactive WebFlux)替代传统Servlet8。
通过上述实现,可快速构建基于Spring Cloud的SSE服务端,适用于实时通知、数据监控等场景。
相关文档:
服务器推送消息SSE以及springmvc后台实现例子_sse 多实例如何区分-CSDN博客