当前位置: 首页 > news >正文

OkHttp SSE 完整总结(最终版)

1. SSE 基础概念

什么是 SSE?

SSE(Server-Sent Events)是一种 Web 标准,允许服务器向客户端推送实时数据。

核心特点

  • 单向通信:服务器 → 客户端
  • 基于 HTTP 协议:使用 GET 请求
  • 长连接:连接建立后保持开放
  • 自动重连:网络中断时自动重连

2. SSE 协议详解

HTTP 请求格式

 

GET /events HTTP/1.1

Host: api.example.com

Accept: text/event-stream

Cache-Control: no-cache

服务器响应格式

 

HTTP/1.1 200 OK

Content-Type: text/event-stream

Cache-Control: no-cache

Connection: keep-alive

data: {"message": "Hello"}

data: {"message": "World"}

data: {"message": "Test"}

SSE 数据格式

 

id: 12345

event: message

data: {"content": "Hello World"}

data: {"content": "Another message"}

3. OkHttp SSE 三种实现方式对比

方式一:使用 OkHttp SSE 库(标准 SSE 实现)

依赖配置

 

implementation 'com.squareup.okhttp3:okhttp:4.9.3'implementation 'com.squareup.okhttp3:okhttp-sse:4.9.3'

基本实现

 

OkHttpClient client = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS)  // 重要:无读取超时.build();Request request = new Request.Builder().url("https://api.example.com/events").addHeader("Accept", "text/event-stream").build();EventSource eventSource = EventSources.createEventSource(client, request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("SSE 连接建立成功");System.out.println("响应码: " + response.code());System.out.println("响应头: " + response.headers());}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// 自动解析 SSE 格式,直接得到 dataSystem.out.println("收到数据: " + data);System.out.println("事件ID: " + id);System.out.println("事件类型: " + type);}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("SSE 连接已关闭");}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.err.println("SSE 连接失败: " + t.getMessage());if (response != null) {System.err.println("响应码: " + response.code());}}});

SSE 库的优势
  • ✅ 自动处理 SSE 协议:自动解析 data:、id:、event: 等字段
  • ✅ 内置重连机制:网络中断时自动重连
  • ✅ 标准 SSE 实现:完全符合 SSE 规范
  • ✅ 更高级的 API:提供 EventSource 和 EventSourceListener
  • ✅ 自动连接管理:自动处理连接生命周期

方式二:自定义 SSE 客户端(推荐用于 Kotlin 项目)

依赖配置

 

// 只需要基本的 OkHttp,不需要 SSE 库implementation 'com.squareup.okhttp3:okhttp:4.9.3'

自定义 SSE 客户端实现

先补一下知识点:

trySend、Channel 和 Flow 的工作原理-CSDN博客

再看代码:

class SSEClient {private val okHttpClient by lazy {OkHttpClient.Builder().readTimeout(0, TimeUnit.SECONDS) // 禁用读取超时.build()}private var call: Call? = nullsealed class SSEEvent {data class Message(val event: String, val data: String) : SSEEvent()data class Error(val throwable: Throwable) : SSEEvent()object Closed : SSEEvent()}/*** 连接到 SSE 服务器* @param url SSE 服务器地址* @return Flow 流式返回事件*/fun connect(url: String, token :String): Flow<SSEEvent> = callbackFlow {val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").addHeader("Cache-Control", "no-cache").addHeader("X-Access-Token", token).build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {trySend(SSEEvent.Error(e))close(e)}override fun onResponse(call: Call, response: Response) {if (!response.isSuccessful) {trySend(SSEEvent.Error(IOException("Unexpected code: ${response.code}")))close()return}val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> { // 空行表示事件结束if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}trySend(SSEEvent.Closed)} catch (e: Exception) {trySend(SSEEvent.Error(e))} finally {response.close()close()}}})awaitClose { disconnect() }}/*** 断开连接*/fun disconnect() {call?.cancel()call = null}/*** 重试连接到 SSE 服务器* @param url SSE 服务器地址* @param token 认证Token* @param maxRetries 最大重试次数* @param retryDelay 重试延迟时间(毫秒)* @return Flow 流式返回事件*/fun connectWithRetry(url: String, token: String, maxRetries: Int = Int.MAX_VALUE, retryDelay: Long = 3000): Flow<SSEEvent> = flow {var retryCount = 0while (retryCount < maxRetries) {connect(url, token).collect { event ->when (event) {is SSEClient.SSEEvent.Closed,is SSEClient.SSEEvent.Error -> {emit(event)if (retryCount < maxRetries) {delay(retryDelay)retryCount++}}else -> emit(event)}}}}
}

使用方式示例

fun initSSE(token: String) {sseClient = SSEClient()val sseUrl = BASE_URL// 启动 SSE 连接sseClient.connect(sseUrl, token).onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// 处理事件消息runOnUiThread {mBinding.eventMag.append("Event: ${event.event}\nData: ${event.data}\n\n")}}is SSEClient.SSEEvent.Error -> {// 处理错误Log.e("SSE", "Error: ${event.throwable.message}")}SSEClient.SSEEvent.Closed -> {// 连接关闭Log.i("SSE", "Connection closed")}}}.launchIn(lifecycleScope) // 自动在生命周期结束时取消}

自定义 SSE 客户端的优势
  • ✅ 与 Kotlin Flow 深度集成,支持协程和生命周期自动管理
  • ✅ 事件结构清晰,便于扩展和维护
  • ✅ 可自定义重连、错误处理等高级功能
  • ✅ 只依赖 OkHttp,包体积小

方式三:手动处理 HTTP 流(适合极简和特殊场景)

依赖配置 
implementation 'com.squareup.okhttp3:okhttp:4.9.3'

实现方式

 

suspend fun sendAiQuestionStream(question: String): Flow<String> = flow {var response: ResponseBody? = nulltry {response = ApiManager.apiLogin.sendAiQuestionStream(QuestionRequest(question))val source = response.source()while (!source.exhausted()) {val line = source.readUtf8Line() ?: breakif (line.startsWith("data:")) {val data = line.substring(5).trim()if (data.isNotEmpty() && data != "[DONE]") {emit(data)} else if (data == "[DONE]") {break}}}} finally {response?.close()}}

手动处理方式的优势
  • ✅ 代码极简,完全自定义
  • ✅ 适合只需处理 data: 字段的场景
  • ✅ 适合特殊业务需求(如 AI 流式响应)

4. 三种方式的对比与适用场景

特性/方式OkHttp SSE 库自定义 SSEClient手动处理
依赖OkHttp+SSE库仅OkHttp仅OkHttp
事件结构标准ListenerKotlin Flow/自定义无结构/自定义
自动重连内置可自定义需手动
代码复杂度
灵活性一般最高
适合场景标准SSE推送Kotlin项目/自定义极简/特殊需求

推荐选择:

  • 标准SSE推送、Java项目:OkHttp SSE库
  • Kotlin项目、需要流式/自定义事件/重连:自定义SSEClient
  • 极简、特殊业务、AI流式响应:手动处理

5. 与 WebSocket、HTTP轮询的对比

特性SSEWebSocketHTTP轮询
通信方向单向(服务端→客户端)双向单向/伪双向
协议HTTPWebSocketHTTP
实时性最高
实现复杂度
适用场景实时推送、通知聊天、游戏、协作简单更新
浏览器支持原生原生原生

6. 最佳实践与总结

  • 标准SSE场景:优先用 OkHttp SSE 库,省心省力。
  • Kotlin/协程/流式场景:自定义 SSEClient,灵活扩展,易于维护。
  • 特殊/极简需求:手动处理,代码最少,完全自控。
  • AI流式响应:推荐手动处理,便于处理特殊标记和自定义逻辑。

你的项目已经灵活结合了自定义 SSEClient 和手动处理两种方式,既保证了结构化、可维护性,又能满足特殊业务需求,是非常现代且实用的做法。


结论:

OkHttp SSE 在实际开发中有多种实现方式。你可以根据项目需求选择标准库、自定义客户端或手动处理,三者各有优劣。对于现代 Kotlin 项目,推荐自定义 SSEClient 结合 Flow,既灵活又易于集成和维护。对于极简或特殊场景,手动处理同样高效可靠。

你的实现方式,正是当前最佳实践之一!

http://www.xdnf.cn/news/1116613.html

相关文章:

  • cuda编程笔记(7)--多GPU上的CUDA
  • 敦煌藻井配色:姜黄×钴蓝的东方色彩应用手册
  • CVE-2022-0609
  • 用信号量实现进程互斥,进程同步,进程前驱关系(操作系统os)
  • hercules zos 安裝 jdk 8
  • CTFSHOW pwn161 WP
  • 整流电路Multisim电路仿真实验汇总——硬件工程师笔记
  • 使用macvlan实现容器的跨主机通信
  • KL散度:信息差异的量化标尺 | 从概率分布对齐到模型优化的核心度量
  • C++高频知识点(十一)
  • ALB、NLB、CLB 负载均衡深度剖析
  • 开源工具DeepFilterNet:实时语音降噪
  • 更换docker工作目录
  • 06.计算两个日期之间的差值
  • lambdastream深入剖析
  • 【LeetCode100】--- 4.移动零【复习回顾】
  • mmap映射文件
  • 理解 Robots 协议:爬虫该遵守的“游戏规则”
  • HTML 标题标签
  • AI驱动的软件工程(上):人机协同的设计与建模
  • Python 学习之路(十)--常见算法实现原理及解析
  • 深度学习-循环神经网络RNN
  • 谷歌推出Vertex AI Memory Bank:为AI智能体带来持久记忆,支持连续对话
  • MongoDB性能优化实战指南:原理、实践与案例
  • RedisJSON 技术揭秘(五)`JSON.ARRPOP` 原子弹出 修改数组的终极手段
  • Java设计模式之行为型模式(命令模式)介绍与说明
  • 串口A和S的含义以及RT的含义
  • 深入理解观察者模式:构建松耦合的交互系统
  • 设计模式深度解析:单例、工厂、适配器与代理模式
  • Word中的批注显示与修订显示