当前位置: 首页 > news >正文

Java的Spring Cloud生态中实现SSE(Server-Sent Events)服务端实践

在Java的Spring Cloud生态中实现SSE(Server-Sent Events)服务端推送功能,可以通过Spring Boot原生支持的SseEmitter类简化开发。以下是详细的实现步骤,包括依赖配置、核心代码及注意事项:


一、依赖配置

SSE的实现基于Spring Web模块,无需额外依赖,只需在pom.xml中添加基础Web依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

若需异步处理,可引入spring-boot-starter-async(非必需)


二、服务端实现

1. 控制器类

使用@RestController定义SSE接口,返回SseEmitter对象以建立长连接:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;@RestController
@RequestMapping("/sse")
public class SseController {@GetMapping(value = "/stream", produces = "text/event-stream")public SseEmitter stream() {SseEmitter emitter = new SseEmitter(0L); // 设置超时时间(0表示无超时)// 使用线程池异步推送数据Executors.newSingleThreadExecutor().execute(() -> {try {for (int i = 0; i < 10; i++) {// 发送SSE格式数据(支持自定义事件类型、ID和重试时间)emitter.send(SseEmitter.event().name("message")    // 事件类型.id(String.valueOf(i))  // 事件ID.data("Data " + i)      // 消息内容.reconnectTime(5000));  // 重连时间(毫秒)Thread.sleep(1000);}emitter.complete(); // 正常关闭连接} catch (IOException | InterruptedException e) {emitter.completeWithError(e); // 异常关闭}});// 注册回调(处理连接关闭、超时或错误)emitter.onCompletion(() -> System.out.println("SSE连接关闭"));emitter.onTimeout(() -> System.out.println("SSE连接超时"));emitter.onError(ex -> System.out.println("SSE错误: " + ex.getMessage()));return emitter;}
}package test;
import java.util.Random;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping(value = "/test")
public class SEEController {
//设置响应格式@RequestMapping(value = "/push", produces = "text/event-stream;charset=UTF-8")public @ResponseBody String push() {Random r = new Random();try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}
//SSE返回数据格式是固定的以data:开头,以\n\n结束return "data:Testing 1,2,3" + r.nextInt() + "\n\n";}
}

使用原生Servlet实现SSE(异步处理)

import javax.servlet.*;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.*;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@WebServlet(urlPatterns = "/sse", asyncSupported = true)
public class SseServlet extends HttpServlet {private final ExecutorService executor = Executors.newCachedThreadPool();@Overrideprotected void doGet(HttpServletRequest request, HttpServletResponse response) {// 设置响应头response.setContentType("text/event-stream");response.setCharacterEncoding("UTF-8");response.setHeader("Cache-Control", "no-cache");response.setHeader("Connection", "keep-alive");response.setHeader("Access-Control-Allow-Origin", "*");// 启动异步上下文AsyncContext asyncContext = request.startAsync();asyncContext.setTimeout(0); // 无超时限制// 提交任务到线程池executor.submit(() -> {try (PrintWriter writer = asyncContext.getResponse().getWriter()) {for (int i = 0; i < 10; i++) {if (asyncContext.getResponse().isCommitted()) {// 构建SSE格式数据(注意末尾的两个换行符)String message = "data: Message " + i + "\n\n";writer.write(message);writer.flush(); // 立即发送数据Thread.sleep(1000);} else {break; // 客户端已断开}}} catch (IOException | InterruptedException e) {// 客户端断开连接或发生错误} finally {asyncContext.complete(); // 完成请求}});}
}

package com.example.sse;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@SpringBootApplication
@RestController
public class SseServerApplication {// 存储所有客户端的SseEmitterprivate final List<SseEmitter> emitters = new ArrayList<>();public static void main(String[] args) {SpringApplication.run(SseServerApplication.class, args);}// SSE端点@GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamSseMvc() {SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间为60秒// 添加客户端到连接列表emitters.add(emitter);// 设置完成回调emitter.onCompletion(() -> emitters.remove(emitter));// 设置超时回调emitter.onTimeout(() -> emitters.remove(emitter));// 设置错误回调emitter.onError((ex) -> emitters.remove(emitter));// 发送连接成功消息try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).name("message").data("{\"message\": \"连接已建立\"}"));} catch (IOException e) {emitters.remove(emitter);e.printStackTrace();}return emitter;}// 模拟定时发送数据的方法public void startPeriodicUpdates() {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {String data = "{\"timestamp\": " + System.currentTimeMillis() + ", \"message\": \"定期更新\"}";sendToAll(data);}, 0, 5, TimeUnit.SECONDS); // 每5秒发送一次更新}// 向所有客户端发送消息private void sendToAll(String data) {List<SseEmitter> deadEmitters = new ArrayList<>();emitters.forEach(emitter -> {try {emitter.send(SseEmitter.event().id(String.valueOf(System.currentTimeMillis())).name("data").data(data));} catch (IOException e) {deadEmitters.add(emitter);e.printStackTrace();}});// 移除失效的emitteremitters.removeAll(deadEmitters);}
}
2. 核心要点
  • 数据格式:遵循SSE协议,每条消息以data:开头,以\n\n结尾。使用SseEmitter.event()可自动处理格式45。
  • 异步处理:通过线程池或异步任务避免阻塞主线程,确保高并发下的性能69。
  • 超时与重连:通过SseEmitter(long timeout)设置超时,客户端断开后默认3秒重连(可通过retry字段调整)36。

三、客户端示例

前端通过EventSource监听服务端推送:

const eventSource = new EventSource('http://localhost:8080/sse/stream');eventSource.onmessage = (e) => {console.log('收到消息:', e.data);
};eventSource.addEventListener('message', (e) => { // 自定义事件类型console.log('自定义事件:', e.data);
});eventSource.onerror = () => {eventSource.close(); // 错误时关闭连接
};

四、高级配置

1. 跨域支持

若前端与服务端域名不同,需添加跨域配置:

@Configuration
public class CorsConfig implements WebMvcConfigurer {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/sse/**").allowedOrigins("*").allowedMethods("GET");}
}
2. 性能优化
  • 线程池管理:避免频繁创建线程,推荐使用ThreadPoolTaskExecutor6。
  • 连接数限制:浏览器对同一域名SSE连接数有限制(通常6个),需合理设计接口3。

五、SSE与WebSocket对比

特性

SSE

WebSocket

协议

HTTP(单向)

独立协议(双向)

实现复杂度

简单(无需额外协议)

较高(需握手协议)

断线重连

原生支持

需手动实现

适用场景

服务端单向推送(如实时通知)

双向通信(如聊天室)

选型建议

  • 使用SSE:低频单向推送、兼容性要求高46。
  • 使用WebSocket:高频双向交互、低延迟需求910。

六、常见问题

  1. 客户端收不到消息
    • 检查数据格式是否正确(必须以\n\n结尾)。
    • 确保响应头Content-Typetext/event-stream25。
  1. 连接频繁断开
    • 调整服务端超时时间(如new SseEmitter(-1L))。
    • 客户端增加错误处理逻辑,自动重连36。
  1. 高并发性能问题
    • 使用异步非阻塞模型(如Reactive WebFlux)替代传统Servlet8。

通过上述实现,可快速构建基于Spring Cloud的SSE服务端,适用于实时通知、数据监控等场景。

相关文档:

服务器推送消息SSE以及springmvc后台实现例子_sse 多实例如何区分-CSDN博客

http://www.xdnf.cn/news/683029.html

相关文章:

  • YoloV11改进策略:卷积篇-风车卷积-即插即用
  • 代码随想录算法训练营第60期第四十九天打卡
  • day05-常用API(二):Lambda、方法引用详解
  • Python装饰器与异常捕获的高级用法详解
  • 基于 STM32 的农村污水处理控制系统设计与实现
  • @vue/composition-api
  • uniapp-商城-72-shop(5-商品列表,购物车实现回顾)
  • Linux 6.15 内核发布,新功能
  • 【免费】【无需登录/关注】坐标系批量转换与可视化网页工具
  • 31. 自动化测试开发之实现INI配置文件解析
  • 从CPU缓存出发对引用池进行优化
  • C51-指针函数
  • Linux编译器——gcc/g++的使用
  • 基于Python的智能天气提醒助手开发指南
  • ValueError: BuilderConfig ‘xxxx‘ not found. Available:[xxx]
  • Cannot read properties of undefined (reading ‘clearSelection‘)
  • 华为仓颉语言初识:并发编程之线程的基本使用
  • PCB线路板压合工艺难点解析与技术对策
  • NB-IoT NPUSCH(三)-资源映射
  • gdiplus,GDI +为什么2001年发布后几乎没有再更新了
  • 2025 海外短剧 CPS 系统开发:技术驱动下的全球化内容分销新范式
  • SSM整合:Spring+SpringMVC+MyBatis完美融合实战指南
  • 第十二天 区块链在车辆数据存证中的应用
  • Erp系统介绍与业务方案详情
  • 彻底理解一个知识点的具体步骤
  • 【PP】SAP生产订单(创建-下达-发料-报工-入库)全流程及反向流程
  • VectorNet:自动驾驶中的向量魔法
  • 【Agent】MLGym: A New Framework and Benchmark for Advancing AI Research Agents
  • CVPR2022——立体匹配算法Fast-ACVNet复现
  • 藻华自用数据集学习2025.4.28