当前位置: 首页 > backend >正文

LLM对话框项目 EventSource封装和MessageServiceClass流式展示封装

你这段代码实现了一个对浏览器原生 EventSource 的封装,主要目的是更方便地管理服务端事件推送(Server-Sent Events,简称 SSE),如自动构建 URL、配置管理、连接控制等。


EventSourceAPI

✅ 面试时介绍建议(逻辑清晰)

面试官您好,我封装了一个 EventSourceWrapper 类,用于统一管理 SSE(Server-Sent Events)连接。这是一个单例模式实现,确保在整个应用生命周期中只有一个 SSE 实例。以下是它的几个核心设计点:
封装了一个 EventSourceWrapper 类,用于统一管理客户端与服务端之间的 SSE(Server-Sent Events)连接。这个类采用单例模式确保全局只存在一个长连接实例,避免重复连接的问题。它支持配置参数的动态更新通过构造函数进行依赖注入),自动构建带查询参数的连接 URL,并对 onopen、onmessage、onerror 和关闭事件做了统一的封装处理。考虑到原生 SSE 的自动重连机制较为有限(不能监听重连次数、不知道是否已经掉线 、不可自定义重连逻辑等),我还设计了一个可选的自定义重连机制,支持最大重试次数、指数退避等策略,使连接管理更加可控和健壮。此外,它还提供了 reconnect 接口,便于业务层在需要时手动触发重连。


1. 单例模式保证全局唯一性

  • 类中通过静态方法 getESWrapperInstance 创建或复用唯一实例。
  • 适用于聊天类应用中长连接唯一的场景,避免资源浪费或重复连接
static getESWrapperInstance(): EventSourceWrapper

2. 配置灵活、支持动态更新

  • 初始配置通过构造函数传入(依赖注入),内部通过 setConfig 实现对原配置的合并更新(特别是 queryParams 的深层合并)。
setConfig(config: Partial<EventSourceConfig>)

3. 自动构建连接 URL

  • 封装了 buildUrl 方法,根据 queryParams 自动生成合法 SSE 请求地址。

4. 事件监听器封装清晰

  • onopen, onmessage, onerror, close 全部通过统一接口回调管理。
  • 错误处理逻辑健壮,连接异常自动关闭,并调用 onError 通知业务层。
  • 提供 reconnect 方法便于手动重连。

5. 易于拓展

  • 可以按需添加 onRetry, onReconnect 等回调,拓展性强。

//utils/EventSourceWrapper.ts
import {EventSourceConfig} from "../type/EventSourceConfig";export class EventSourceWrapper {private eventSource: EventSource | null = null;private config: EventSourceConfig;private isActive: boolean = false;private static instance:EventSourceWrapper|null=null;private retryCount = 0;private reconnectTimer: ReturnType<typeof setTimeout> | null = null;constructor(options: EventSourceConfig) {this.config = {withCredentials: false,...options};}static getESWrapperInstance():EventSourceWrapper{if(!this.instance) {this.instance = new EventSourceWrapper({url: "/dev-api/chatStream",queryParams: {content: "",contentType: "",},onError: (err) => {console.log('Error occurred:', err);}});}return this.instance;}getConfig():EventSourceConfig{return this.config;}//Partial 是 TypeScript 中的一个工具类型,用于创建一个新类型,其中所有属性都是可选的。setConfig(config: Partial<EventSourceConfig>): void {this.config = {...this.config, // 保留原有配置...config,      // 覆盖新配置queryParams: {  // 针对嵌套对象特殊处理...this.config?.queryParams,...config?.queryParams},};}private scheduleReconnect(): void {const {autoReconnect = false,maxRetries = 5,retryInterval = 2000,backoffMultiplier = 2} = this.config;if (!autoReconnect) return;if (this.retryCount >= maxRetries) {console.warn('SSE max retries reached');this.config.onError?.(new Error('Max reconnect attempts reached'));return;}const delay = retryInterval * Math.pow(backoffMultiplier, this.retryCount);console.log(`Retry #${this.retryCount + 1} in ${delay}ms`);this.reconnectTimer = setTimeout(() => {this.retryCount++;this.connect();}, delay);}private buildUrl(): string {const { url, queryParams } = this.config;return queryParams? `${url}?${new URLSearchParams(queryParams)}`: url;}connect(): void {if (this.isActive) return;try {this.isActive = true;const url = this.buildUrl();this.eventSource = new EventSource(url, {withCredentials: this.config.withCredentials});this.eventSource.onopen = (event) => {console.log('SSE connection established', event);};this.eventSource.onmessage = ({ data }) => {this.config.onMessage?.(data);this.retryCount = 0;if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}};//解答完一次为什么都会报错this.eventSource.onerror = (event) => {console.error('SSE connection error:', event);this.close();this.config.onError?.(new Error('Connection failed'));this.scheduleReconnect(); // 自定义重连};this.eventSource.addEventListener('close', () => {this.close();// console.log("close")this.config.onEnd?.("SSE connection close");});} catch (error) {this.close();this.config.onError?.(error instanceof Error ? error : new Error('Connection error'));}}close(): void {if (this.eventSource) {this.eventSource.close();this.eventSource = null;this.isActive = false;}if (this.reconnectTimer) {clearTimeout(this.reconnectTimer);this.reconnectTimer = null;}}// 手动重新连接(如果需要)reconnect(): void {this.close();this.connect();}
}

✅ Mermaid 类图

EventSourceWrapper
-eventSource: EventSource
-config: EventSourceConfig
-isActive: boolean
-retryCount: number
-reconnectTimer: Timeout
-static instance: EventSourceWrapper
+constructor(config: EventSourceConfig)
+connect()
+close()
+reconnect()
+getConfig()
+setConfig(config: Partial)
+static getESWrapperInstance()
-buildUrl()
-scheduleReconnect()
EventSourceConfig
+url: string
+queryParams?: Record<string, string>
+withCredentials?: boolean
+autoReconnect?: boolean
+maxRetries?: number
+retryInterval?: number
+backoffMultiplier?: number
+onMessage?:(data)
+onError?:(error)
+onEnd?:(msg: string)

✅ 面试官可能问的问题及回应建议

面试官提问应答建议
为什么使用单例?保证全局只有一个连接实例,避免多个 SSE 并发浪费资源或产生冲突。
错误处理怎么做的?onerror 中做了关闭连接和触发回调,并可通过 reconnect() 手动重连。
如何支持动态传参?通过 setConfig 合并旧配置,支持 queryParams 动态更新。
线程安全问题?由于前端 JS 是单线程的,不涉及并发修改,设计上已满足需求。

MessageServiceClass

你这段代码是一个非常清晰、职责明确的封装,负责处理“用户发送消息 -> 通过 SSE 获取 AI 响应 -> 流式展示结果”的完整流程。


// src/services/MessageService.ts
import { v4 as uuidv4 } from 'uuid';
import {ContentType} from "../type/Info.ts";
import {EventSourceWrapper} from "./EventSourceWrapper.ts";
import {ChatMessage} from "../type/EventSourceConfig.ts";type MessageHandler = {setHistoryContent: (updater: (prev: ChatMessage[]) => ChatMessage[]) => void;setContent: (content: string) => void;setStreamId: (id: string | null) => void;esInstance: EventSourceWrapper; // SSE连接配置类型需根据实际SDK定义
};export class MessageService {private botMessageId: string | null = null;constructor(private handler: MessageHandler) {}public send = async (content: string) => {if (!this.validateInput(content)) return;this.addUserMessage(content);this.prepareBotResponse();try {await this.establishSSEConnection(content);} catch (error) {this.handleError(error as Error);} finally {this.cleanup();}};private validateInput = (content: string): boolean => {const trimmed = content.trim();if (!trimmed) {console.warn('空消息被拦截');return false;}return true;};private addUserMessage = (content: string) => {this.handler.setHistoryContent(prev => [...prev,{ id: uuidv4(), author: 'User', text: content }]);};private prepareBotResponse = () => {this.botMessageId = uuidv4();this.handler.setHistoryContent(prev => [...prev,{id: this.botMessageId,author: 'Assistance',text: 'AI正在思考中...',isStreaming: true}]);this.handler.setStreamId(this.botMessageId);};private establishSSEConnection = (content: string) => {return new Promise<void>((resolve, reject) => {try {this.handler.esInstance.setConfig({onMessage: (chunk: string) =>{this.handleChunk(chunk);}});this.handler.esInstance.setConfig({queryParams:{content,contentType: ContentType.TXT}});this.handler.esInstance.setConfig({onEnd: (arg) => {console.log(arg)resolve();}})this.handler.esInstance.connect();this.handler.setContent('');} catch (error) {reject(error);}});};private handleChunk = (chunk: string) => {if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? {...msg,text: msg.text === 'AI正在思考中...'? chunk: msg.text + chunk.replace(/\\n/g, '\n')}: msg));};private handleError = (error: Error) => {console.error('消息服务错误:', error);if (!this.botMessageId) return;this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, text: '请求失败,请重试', isStreaming: false }: msg));};private cleanup = () => {this.handler.setStreamId(null);this.handler.setHistoryContent(prev =>prev.map(msg =>msg.id === this.botMessageId? { ...msg, isStreaming: false }: msg));this.botMessageId = null;};
}

✅ 逻辑梳理

整个 MessageService 类的工作流程如下:

🌟 1. 用户输入消息后,触发 send(content)

  1. validateInput(content):过滤掉空消息。

  2. addUserMessage(content):将用户消息添加到历史记录。

  3. prepareBotResponse():生成一个唯一 bot 消息 ID,先展示 "AI正在思考中..." 的占位消息,并标记为 isStreaming: true

  4. establishSSEConnection(content):启动 SSE 连接:

    • 设置 onMessage 处理流式片段(chunk)
    • 设置 onEnd 处理结束回调
    • 设置 queryParams 作为请求参数
    • 调用 connect() 启动事件源
  5. handleChunk(chunk):每次服务端发送数据片段,就更新 AI 响应内容。

  6. 若出错,handleError() 会将 bot 消息内容替换为“请求失败,请重试”。

  7. 最后调用 cleanup():关闭流式标记、重置状态。


🧠 一句话向面试官介绍这个类

我封装了一个 MessageService 类,专门用于管理用户消息的发送与 AI 响应的流式展示。它通过注入的 handler 操作 UI 层状态,并结合我封装的 EventSourceWrapper 实现稳定的 SSE 长连接,支持自动重连、状态清理与错误处理等能力。整个消息流转过程被拆分为输入校验、消息插入、连接建立、数据处理、异常兜底、连接回收六个步骤,逻辑清晰、职责单一,有利于维护和扩展。


📈 Mermaid 类图

MessageService
-botMessageId: string
-handler: MessageHandler
+constructor(handler: MessageHandler)
+send(content: string)
-validateInput(content: string)
-addUserMessage(content: string)
-prepareBotResponse()
-establishSSEConnection(content: string)
-handleChunk(chunk: string)
-handleError(error: Error)
-cleanup()
EventSourceWrapper
ChatMessage
+id: string
+author: string
+text: string
+isStreaming?: boolean
MessageHandler
+esInstance: EventSourceWrapper
+setHistoryContent(updater)
+setContent(content)
+setStreamId(id)

在这里插入图片描述

✅ 亮点总结(面试加分点)

亮点说明
状态分离MessageService 不直接操作 UI,而是通过 MessageHandler 依赖注入,解耦逻辑与视图。
渐进式流处理支持服务端分片返回(chunk)消息,实时拼接展示。
可拓展性强错误处理、连接状态、Bot 消息标记都独立封装,便于扩展如中断续传、撤回、重试等功能。
复用性好配合封装的 EventSourceWrapper,可以在多个模块中复用消息推送逻辑。

http://www.xdnf.cn/news/14410.html

相关文章:

  • MFE微前端高级版:Angular + Module Federation + webpack + 路由(Route way)完整示例
  • vue相关爬坑总结
  • [windows工具]OCR多区域识别导出excel工具1.2版本使用教程及注意事项
  • 变幻莫测:CoreData 中 Transformable 类型面面俱到(四)
  • 深度神经网络学习
  • 设计模式-装饰器模式
  • React-router 路由历史的模式和原理
  • AI 神经网略小白学习笔记(一) -- 环境搭建
  • 【1】Redis 缓存穿透原理和解决方案
  • [AAAI Oral] 简单通用的公平分类方法
  • React-router 多类型历史记录栈
  • 《仿盒马》app开发技术分享-- 回收金提现安全锁校验(端云一体)
  • NodeJS中老生代和新生代和垃圾回收机制
  • Arduino入门教程:6、计时与定时
  • 阿帕奇基金会软件授权与公司贡献者许可协议(中英双语版)
  • (笔记)1.web3学习-区块链技术
  • Web3-代币ERC20/ERC721以及合约安全溢出和下溢的研究
  • EXCEL破解VBA密码 ( 仅供学习研究使用)
  • [VSCode] VSCode 设置 python 的编译器
  • 40-Oracle 23 ai Bigfile~Smallfile-Basicfile~Securefile矩阵对比
  • NodeJS里经常用到require,require的模块加载机制是什么
  • lua版的Frpc
  • go.work
  • 车载通信架构 --- IP ECU 在连接被拒绝后的重连机制
  • Spring Cloud Gateway 全面学习指南
  • 论文略读:MLPs Learn In-Context on Regression and Classification Tasks
  • CM工作室发展史 下
  • Python装饰器:优雅增强函数行为的艺术
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年6月14日第108弹
  • Win10安装DockerDesktop踩坑记