当前位置: 首页 > news >正文

SpringBoot整合SSE,基于okhttp

一、引入依赖

<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp</artifactId><version>4.10.0</version>
</dependency>
<dependency><groupId>com.squareup.okhttp3</groupId><artifactId>okhttp-sse</artifactId><version>4.10.0</version>
</dependency>

二、创建 SSE 客户端服务类

import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.springframework.stereotype.Service;import java.util.concurrent.TimeUnit;@Service
public class SseClientService {private final OkHttpClient okHttpClient;private EventSource eventSource;public SseClientService() {this.okHttpClient = new OkHttpClient.Builder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS) // 0表示不超时.writeTimeout(10, TimeUnit.SECONDS).build();}public void connectToSseServer(String url) {Request request = new Request.Builder().url(url).build();EventSource.Factory factory = EventSources.createFactory(okHttpClient);this.eventSource = factory.newEventSource(request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("SSE连接已建立");}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {System.out.printf("收到事件: id=%s, type=%s, data=%s%n", id, type, data);// 在这里处理收到的数据}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("SSE连接已关闭");}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.err.println("SSE连接失败: " + t.getMessage());// 可以在这里实现重连逻辑reconnect(url);}});}private void reconnect(String url) {try {Thread.sleep(5000); // 5秒后重连connectToSseServer(url);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}public void closeConnection() {if (eventSource != null) {eventSource.cancel();}okHttpClient.dispatcher().executorService().shutdown();}
}

三、 创建控制器测试

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/sse-client")
public class SseClientController {private final SseClientService sseClientService;public SseClientController(SseClientService sseClientService) {this.sseClientService = sseClientService;}@GetMapping("/connect")public String connect() {// 连接到SSE服务器(可以是另一个Spring Boot应用的SSE端点)sseClientService.connectToSseServer("http://localhost:8080/sse-server/subscribe");return "SSE客户端已启动";}@GetMapping("/disconnect")public String disconnect() {sseClientService.closeConnection();return "SSE客户端已关闭";}
}

四、 高级功能实现

1. 自定义事件处理
// 在EventSourceListener中添加更多事件处理
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {switch (type) {case "message":handleMessageEvent(data);break;case "system-alert":handleSystemAlert(data);break;default:handleDefaultEvent(data);}
}
2. 添加认证头
public void connectToSseServerWithAuth(String url, String token) {Request request = new Request.Builder().url(url).header("Authorization", "Bearer " + token).build();// 其余代码与基础实现相同
}
3. 心跳检测
@Override
public void onEvent(EventSource eventSource, String id, String type, String data) {if ("heartbeat".equals(type)) {System.out.println("收到心跳: " + data);return;}// 处理其他事件
}

五、OkHttpConfig单独配置

import okhttp3.OkHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.TimeUnit;@Configuration
public class OkHttpConfig {@Beanpublic OkHttpClient okHttpClient() {return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS) // 0表示不超时.writeTimeout(15, TimeUnit.SECONDS).retryOnConnectionFailure(true).build();}
}
http://www.xdnf.cn/news/95113.html

相关文章:

  • Java面试:探索Spring Boot与微服务的深度挑战
  • 01 ubuntu中wps桌面快捷键无法使用
  • C++23 新特性:令声明顺序决定非静态类数据成员的布局 (P1847R4)
  • C++学习:六个月从基础到就业——STL算法(一) 基础与查找算法
  • JS通过GetCapabilities获取wms服务元数据信息并在SuperMap iClient3D for WebGL进行叠加显示
  • C++语言速成,语法及示例宝典汇总整理
  • 状态模式(State Pattern)详解
  • Hooks的使用限制及原因
  • 单例模式:确保唯一实例的设计模式
  • mall-cook 本地运行
  • 基于MTF的1D-2D-CNN-LSTM-Attention时序图像多模态融合的故障识别,适合研究学习(Matlab完整源码和数据),附模型研究报告
  • VUE Element-ui Message 消息提示组件自定义封装
  • Android Cordova 开发 - Cordova 解读初始化项目(index.html meta、Cordova.js、config.xml)
  • 【PCB工艺】运放电路中的负反馈机制
  • 2025.04.23华为机考第三题-300分
  • 零基础入门 Verilog VHDL:在线仿真与 FPGA 实战全流程指南
  • 力扣-第645题《错误的集合》
  • 咖啡机语音芯片方案-WTN6040FP-14S直接驱动4欧/3W喇叭-大功率输出
  • 每日一练(4~23):特别数的和
  • label studio的安装
  • docker底层原理简述
  • 解析虚拟机与Docker容器化服务的本质差异及Docker核心价值
  • 大语言模型(LLM)的Prompt Engineering:从入门到精通
  • Godot学习-3D基本环境设置以及3D角色移动
  • 力扣DAY63-67 | 热100 | 二分:搜索插入位置、搜索二维矩阵、排序数组查找元素、搜索旋转排序数组、搜索最小值
  • 如何预约VMware VCP线下考试?
  • 【Java后端】MyBatis 与 MyBatis-Plus 如何防止 SQL 注入?从原理到实战
  • Kotlin 协程在 LiveData 中的完美封装:CoroutineLiveData 全解
  • Spring Boot 项目:如何在 JAR 运行时读取外部配置文件
  • Ubuntu启动SMB(Samba)服务步骤