OkHttp SSE 完整总结(最终版)
1. SSE 基础概念
什么是 SSE?
SSE(Server-Sent Events)是一种 Web 标准,允许服务器向客户端推送实时数据。
核心特点
- 单向通信:服务器 → 客户端
- 基于 HTTP 协议:使用 GET 请求
- 长连接:连接建立后保持开放
- 自动重连:网络中断时自动重连
2. SSE 协议详解
HTTP 请求格式
GET /events HTTP/1.1
Host: api.example.com
Accept: text/event-stream
Cache-Control: no-cache
服务器响应格式
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: {"message": "Hello"}
data: {"message": "World"}
data: {"message": "Test"}
SSE 数据格式
id: 12345
event: message
data: {"content": "Hello World"}
data: {"content": "Another message"}
3. OkHttp SSE 三种实现方式对比
方式一:使用 OkHttp SSE 库(标准 SSE 实现)
依赖配置
implementation 'com.squareup.okhttp3:okhttp:4.9.3'implementation 'com.squareup.okhttp3:okhttp-sse:4.9.3'
基本实现
OkHttpClient client = new OkHttpClient.Builder().connectTimeout(30, TimeUnit.SECONDS).readTimeout(0, TimeUnit.SECONDS) // 重要:无读取超时.build();Request request = new Request.Builder().url("https://api.example.com/events").addHeader("Accept", "text/event-stream").build();EventSource eventSource = EventSources.createEventSource(client, request, new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("SSE 连接建立成功");System.out.println("响应码: " + response.code());System.out.println("响应头: " + response.headers());}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// 自动解析 SSE 格式,直接得到 dataSystem.out.println("收到数据: " + data);System.out.println("事件ID: " + id);System.out.println("事件类型: " + type);}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("SSE 连接已关闭");}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.err.println("SSE 连接失败: " + t.getMessage());if (response != null) {System.err.println("响应码: " + response.code());}}});
SSE 库的优势
- ✅ 自动处理 SSE 协议:自动解析 data:、id:、event: 等字段
- ✅ 内置重连机制:网络中断时自动重连
- ✅ 标准 SSE 实现:完全符合 SSE 规范
- ✅ 更高级的 API:提供 EventSource 和 EventSourceListener
- ✅ 自动连接管理:自动处理连接生命周期
方式二:自定义 SSE 客户端(推荐用于 Kotlin 项目)
依赖配置
// 只需要基本的 OkHttp,不需要 SSE 库implementation 'com.squareup.okhttp3:okhttp:4.9.3'
自定义 SSE 客户端实现
先补一下知识点:
trySend、Channel 和 Flow 的工作原理-CSDN博客
再看代码:
class SSEClient {private val okHttpClient by lazy {OkHttpClient.Builder().readTimeout(0, TimeUnit.SECONDS) // 禁用读取超时.build()}private var call: Call? = nullsealed class SSEEvent {data class Message(val event: String, val data: String) : SSEEvent()data class Error(val throwable: Throwable) : SSEEvent()object Closed : SSEEvent()}/*** 连接到 SSE 服务器* @param url SSE 服务器地址* @return Flow 流式返回事件*/fun connect(url: String, token :String): Flow<SSEEvent> = callbackFlow {val request = Request.Builder().url(url).addHeader("Accept", "text/event-stream").addHeader("Cache-Control", "no-cache").addHeader("X-Access-Token", token).build()call = okHttpClient.newCall(request)call?.enqueue(object : Callback {override fun onFailure(call: Call, e: IOException) {trySend(SSEEvent.Error(e))close(e)}override fun onResponse(call: Call, response: Response) {if (!response.isSuccessful) {trySend(SSEEvent.Error(IOException("Unexpected code: ${response.code}")))close()return}val body = response.body ?: returntry {var currentEvent = "message"val dataBuffer = StringBuilder()while (true) {val line = body.source().readUtf8Line() ?: breakwhen {line.startsWith("event:") -> currentEvent = line.substring(6).trim()line.startsWith("data:") -> dataBuffer.append(line.substring(5).trim()).append("\n")line.isEmpty() -> { // 空行表示事件结束if (dataBuffer.isNotEmpty()) {val data = dataBuffer.toString().trim()trySend(SSEEvent.Message(currentEvent, data))dataBuffer.clear()}}}}trySend(SSEEvent.Closed)} catch (e: Exception) {trySend(SSEEvent.Error(e))} finally {response.close()close()}}})awaitClose { disconnect() }}/*** 断开连接*/fun disconnect() {call?.cancel()call = null}/*** 重试连接到 SSE 服务器* @param url SSE 服务器地址* @param token 认证Token* @param maxRetries 最大重试次数* @param retryDelay 重试延迟时间(毫秒)* @return Flow 流式返回事件*/fun connectWithRetry(url: String, token: String, maxRetries: Int = Int.MAX_VALUE, retryDelay: Long = 3000): Flow<SSEEvent> = flow {var retryCount = 0while (retryCount < maxRetries) {connect(url, token).collect { event ->when (event) {is SSEClient.SSEEvent.Closed,is SSEClient.SSEEvent.Error -> {emit(event)if (retryCount < maxRetries) {delay(retryDelay)retryCount++}}else -> emit(event)}}}}
}
使用方式示例
fun initSSE(token: String) {sseClient = SSEClient()val sseUrl = BASE_URL// 启动 SSE 连接sseClient.connect(sseUrl, token).onEach { event ->when (event) {is SSEClient.SSEEvent.Message -> {// 处理事件消息runOnUiThread {mBinding.eventMag.append("Event: ${event.event}\nData: ${event.data}\n\n")}}is SSEClient.SSEEvent.Error -> {// 处理错误Log.e("SSE", "Error: ${event.throwable.message}")}SSEClient.SSEEvent.Closed -> {// 连接关闭Log.i("SSE", "Connection closed")}}}.launchIn(lifecycleScope) // 自动在生命周期结束时取消}
自定义 SSE 客户端的优势
- ✅ 与 Kotlin Flow 深度集成,支持协程和生命周期自动管理
- ✅ 事件结构清晰,便于扩展和维护
- ✅ 可自定义重连、错误处理等高级功能
- ✅ 只依赖 OkHttp,包体积小
方式三:手动处理 HTTP 流(适合极简和特殊场景)
依赖配置
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
实现方式
suspend fun sendAiQuestionStream(question: String): Flow<String> = flow {var response: ResponseBody? = nulltry {response = ApiManager.apiLogin.sendAiQuestionStream(QuestionRequest(question))val source = response.source()while (!source.exhausted()) {val line = source.readUtf8Line() ?: breakif (line.startsWith("data:")) {val data = line.substring(5).trim()if (data.isNotEmpty() && data != "[DONE]") {emit(data)} else if (data == "[DONE]") {break}}}} finally {response?.close()}}
手动处理方式的优势
- ✅ 代码极简,完全自定义
- ✅ 适合只需处理 data: 字段的场景
- ✅ 适合特殊业务需求(如 AI 流式响应)
4. 三种方式的对比与适用场景
特性/方式 | OkHttp SSE 库 | 自定义 SSEClient | 手动处理 |
---|---|---|---|
依赖 | OkHttp+SSE库 | 仅OkHttp | 仅OkHttp |
事件结构 | 标准Listener | Kotlin Flow/自定义 | 无结构/自定义 |
自动重连 | 内置 | 可自定义 | 需手动 |
代码复杂度 | 低 | 中 | 低 |
灵活性 | 一般 | 高 | 最高 |
适合场景 | 标准SSE推送 | Kotlin项目/自定义 | 极简/特殊需求 |
推荐选择:
- 标准SSE推送、Java项目:OkHttp SSE库
- Kotlin项目、需要流式/自定义事件/重连:自定义SSEClient
- 极简、特殊业务、AI流式响应:手动处理
5. 与 WebSocket、HTTP轮询的对比
特性 | SSE | WebSocket | HTTP轮询 |
---|---|---|---|
通信方向 | 单向(服务端→客户端) | 双向 | 单向/伪双向 |
协议 | HTTP | WebSocket | HTTP |
实时性 | 高 | 最高 | 低 |
实现复杂度 | 低 | 中 | 低 |
适用场景 | 实时推送、通知 | 聊天、游戏、协作 | 简单更新 |
浏览器支持 | 原生 | 原生 | 原生 |
6. 最佳实践与总结
- 标准SSE场景:优先用 OkHttp SSE 库,省心省力。
- Kotlin/协程/流式场景:自定义 SSEClient,灵活扩展,易于维护。
- 特殊/极简需求:手动处理,代码最少,完全自控。
- AI流式响应:推荐手动处理,便于处理特殊标记和自定义逻辑。
你的项目已经灵活结合了自定义 SSEClient 和手动处理两种方式,既保证了结构化、可维护性,又能满足特殊业务需求,是非常现代且实用的做法。
结论:
OkHttp SSE 在实际开发中有多种实现方式。你可以根据项目需求选择标准库、自定义客户端或手动处理,三者各有优劣。对于现代 Kotlin 项目,推荐自定义 SSEClient 结合 Flow,既灵活又易于集成和维护。对于极简或特殊场景,手动处理同样高效可靠。
你的实现方式,正是当前最佳实践之一!