当前位置: 首页 > ds >正文

WebSocket功能完整解析

WebSocket功能完整解析

概述

使用WebSocket实现实时双向通信。整个WebSocket系统设计得非常完善,包含了连接管理、消息处理、状态同步等完整功能。

架构设计

1. 分层架构

┌─────────────────┐
│   页面组件层     │  (chat/[id].tsx)
├─────────────────┤
│   状态管理层     │  (stores/chat)
├─────────────────┤
│   Hook封装层     │  (hooks/useWebSocket)
├─────────────────┤
│   工具类层       │  (utils/websocket.ts)
├─────────────────┤
│   WebSocket API  │  (浏览器原生API)
└─────────────────┘

核心组件详解

1. WebSocket管理器 (src/utils/websocket.ts)

这是整个WebSocket系统的核心底层,负责与浏览器原生WebSocket API交互。

主要功能:
export class WebSocketManager {private ws: WebSocket | null = nullprivate options: WebSocketOptions// 连接WebSocketconnect(): Promise<void>// 断开连接disconnect(): void// 发送消息send(data: any): void// 获取连接状态get isConnected(): boolean// 设置各种回调函数setMessageCallback(callback: (message: any) => void): voidsetConnectCallback(callback: () => void): voidsetDisconnectCallback(callback: () => void): void
}
关键特性:
  • Promise化连接:使用Promise包装连接过程,便于异步处理
  • 自动JSON解析:接收消息时自动解析JSON格式
  • 错误处理:完善的错误处理和日志记录
  • 状态管理:实时跟踪连接状态

2. WebSocket Hook (src/hooks/useWebSocket.ts)

这是Vue 3 Composition API的封装层,将WebSocket管理器与Vue组件生命周期结合。

主要功能:
export function useWebSocket(options: WSOptions) {const socketManager = ref<WebSocketManager | null>(null)// 连接WebSocketconst connect = async (params?: WSOptions) => {// 检查tokenif (!useCookie('token').value) return// 创建连接管理器socketManager.value = createWebSocketManager(typeof innerParmas === 'function' ? innerParmas() : innerParmas)// 建立连接await socketManager.value?.connect()// 设置断开连接回调socketManager.value?.setDisconnectCallback(() => {const { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')})}// 发送消息const sendMessage = <T>(data: T) => {if (!socketManager.value?.isConnected) {const { message } = useGlobalComponent()message.error('网络连接断开,请刷新页面')}socketManager.value?.send(data)}// 断开连接const disconnect = () => {socketManager.value?.disconnect()}// 检查连接状态const isConnected = () => socketManager.value?.isConnected// 组件卸载时自动断开连接onUnmounted(disconnect)return { sendMessage, connect, disconnect, isConnected }
}
关键特性:
  • 自动生命周期管理:组件卸载时自动断开连接
  • Token验证:连接前检查用户token
  • 错误提示:连接断开时自动显示错误信息
  • 类型安全:完整的TypeScript类型支持

3. 聊天状态管理 (src/stores/chat/methods.ts)

这是业务逻辑层,使用Pinia管理聊天相关的状态和操作。

核心状态:
const state = reactive<ChatState>({list: [],           // 消息列表title: '',          // 会话标题isFirst: false,     // 是否首次会话agentState: {       // Agent状态content: 'switch Agent can complete the selection process',show: false,modelType: 'model',thinking: false,network: false,},
})
WebSocket集成:
const { sendMessage, connect, disconnect, isConnected } = useWebSocket(() => ({// WebSocket服务器地址url: `ws://192.168.201.49:8088/api/websocket?token=${useCookie('token').value}`,// 消息处理回调onMessage: msg => {// 处理接收到的消息state.list = onMessage(msg, state.list)// 处理会话完成逻辑if (state.isFirst && [NotificationType.FINISH, NotificationType.END].includes(msg?.data?.type)) {getChatSummary(msg.data.session_id).then(res => {state.title = resstate.isFirst = false})}},})
)
发送消息功能:
const send = (data: string, id: number) => {const lastMsg = state.list[state.list.length - 1]let callerInstanceId = ''// 获取调用者实例IDif (lastMsg && 'caller' in lastMsg) {callerInstanceId = (lastMsg?.caller as { instance_id: string })?.instance_id}// 创建用户消息和响应消息const msg = createUserMessage(data, id, callerInstanceId)const question = createResponseMessage(data, id, callerInstanceId)// 更新消息列表if (state.list.length) {if (state.list[state.list.length - 1]?.session_id === id) {state.list = [...state.list, question]} else {state.list = [question]state.isFirst = true}} else {state.list = [question]state.isFirst = true}// 发送WebSocket消息sendMessage(msg)return question
}
路由监听:
watch(() => route.path,() => {if (import.meta.server) return// 重置状态setTitle('')state.list = []// 检查tokenif (!useCookie('token').value) {return disconnect()}// 如果已连接则跳过if (isConnected()) return// 建立连接connect()},{ immediate: true }
)

4. 消息处理工具 (src/stores/chat/utils.ts)

这是消息处理的核心逻辑,负责解析、转换和更新消息。

消息创建函数:

创建用户消息:

export function createUserMessage(content: string,session_id: number,instance_id?: string
): ChatMessageWrapper {return {direction: ChatDirection.INPUT,type: ChatType.CHAT,data: {session_id,type: 'user_input',content: [{ id: 0, step: 0, type: 'text', content }],agents: [{agent_id: 'agent-commander',custom_id: 'agent-commander',published_version: '1.0.0',url: 'local',type: '',agent_provider_id: 2106919896481796,description: '',name: '',}],tools: [],caller: { type: 'user', instance_id: instance_id || '' },},}
}

创建响应消息:

export function createResponseMessage(content: string,session_id: number,instance_id?: string
): ResponseMessageData {return {id: dayjs().valueOf(),type: NotificationType.QUESTION,content: [{ id: 0, type: 'text', content, step: 0 }],timestamp: Date.now(),read: false,session_id,event_id: '',task_id: 0,agent_id: '',agent_instance_id: 0,parent_agent_instance_id: 0,step: 0,artifact: [],action: {},ext: {},call_batch_id: '',caller_type: 'user',caller_instance_id: instance_id || '',user_id: '',stream_uuid: '',tree: [],has_todo: false,}
}
核心消息处理函数:
export function onMessage(msg: ChatMessageWrapper,messages: ResponseMessageData[]
) {const data = msg.data as ResponseMessageDataif (msg.type === ChatType.CHAT && data.type) {if (data.type === NotificationType.START) {// 开始新会话return [...messages, { ...data, tree: [] }]} else {// 更新现有消息const targetIndex = messages.findIndex(i => i.event_id === data.event_id)if (targetIndex === -1) return messagesconst newMessages = [...messages]const targetData = newMessages[targetIndex]!const target = [...targetData.content]// 处理内容更新(关键:累加内容)if (data.type !== NotificationType.AGENT_START) {data.content.forEach(item => {const targetContentItemIndex = target.findIndex(i =>i.id === data.agent_instance_id &&i.type === item.type &&i.step === data.step)if (targetContentItemIndex !== -1) {// 累加内容(实现流式渲染)const currentItem = target[targetContentItemIndex]!target[targetContentItemIndex] = {...currentItem,content: currentItem.content + item.content,}} else {// 添加新内容项target.push({id: data.agent_instance_id,type: item.type,content: item.content,step: data.step,})}})}// 处理附件data.artifact.forEach(item => {// 处理附件状态更新逻辑})// 更新树形结构createTree(targetData.tree, data)const newData = { ...data, tree: [...targetData.tree], content: target }newMessages[targetIndex] = newDatareturn newMessages}}return messages
}

5. 树形结构管理 (src/stores/chat/utils.ts)

负责管理AI Agent的执行流程树形结构。

export function createTree(tree: ResponseMessageData['tree'],data: ResponseMessageData
) {const node: (typeof tree)[0] = {plan_id: data.agent_instance_id,type: 'base',loading: true,parent_plan_id: data.parent_agent_instance_id,child_plan_ids: [],content: [],status: 'start',label: data.agent_id,timestamp: data.timestamp,}// 处理不同类型的消息switch (data.type) {case NotificationType.AGENT_START:// Agent开始执行node.content = data.content.reduce((acc, item) => {if (item.type === 'text') {acc.push({id: data.agent_instance_id,type: item.type,content: item.content,step: data.step,})}return acc}, [] as (typeof tree)[0]['content'])tree.push(node)breakcase NotificationType.AGENT_END:// Agent执行结束target.loading = falsetarget.status = 'end'breakcase NotificationType.ARTIFACT_START:// 附件处理开始tree.push(node)breakcase NotificationType.ARTIFACT_END:// 附件处理结束target.loading = falsetarget.status = 'end'breakcase NotificationType.ARTIFACT_RESULT:// 附件处理结果target.content = data.content.map(item => ({id: data.agent_instance_id,type: 'artifact',content: JSON.stringify(item),step: data.step,status: 'end',}))break}// 更新父子关系tree.forEach((item, index) => {if (!index) {item.child_plan_ids = []return}if (item.parent_plan_id === -1) {tree[0]!.child_plan_ids.push(item.plan_id)}item.child_plan_ids = tree.reduce((acc, curr) => {if (curr.parent_plan_id === item.plan_id) {acc.push(curr.plan_id)}return acc}, [] as number[])})
}

消息类型系统

1. 基础枚举类型 (src/types/chat.ts)

// WebSocket消息类型
export enum ChatType {CHAT = 'chat',           // 聊天消息HEADRBEAT = 'heartbeat', // 心跳CONNECTED = 'connected', // 连接FINISH = 'finish',       // 正常结束END = 'end',             // SSE断开结束START = 'start',         // 开始
}// 通知类型(业务消息类型)
export enum NotificationType {QUESTION = 'question',           // 用户问题START = 'start',                 // 开始MODEL_START = 'model_start',     // 模型开始MODEL_RESULT = 'model_result',   // 模型结果MODEL_END = 'model_end',         // 模型结束AGENT_START = 'agent_start',     // Agent开始AGENT_RESULT = 'agent_result',   // Agent结果AGENT_END = 'agent_end',         // Agent结束ARTIFACT_START = 'artifact_start', // 附件开始ARTIFACT_RESULT = 'artifact_result', // 附件结果ARTIFACT_END = 'artifact_end',   // 附件结束FINISH = 'finish',               // 完成ERROR = 'error',                 // 错误END = 'end',                     // 结束
}

2. 消息数据结构

// 响应消息数据
export interface ResponseMessageData {has_todo: booleanid: numbertype: NotificationTypecontent: ContentItem[]           // 消息内容数组timestamp: numberread: booleansession_id: numberevent_id: stringtask_id: numberagent_id: stringagent_instance_id: numberparent_agent_instance_id: numberstep: numberartifact: Artifact[]            // 附件数组action: {}ext: {}call_batch_id: stringcaller_type: 'user' | 'agent' | 'event_source' | 'unknown'caller_instance_id: stringuser_id: stringstream_uuid: stringtree: WSTreeNode[]              // 树形结构
}// 内容项
export interface ContentItem {id: numbertype: 'text' | 'think' | 'artifact' | 'image_url' | 'image_base64'status?: 'start' | 'end'content: stringread?: booleanstep: number
}// 树节点
export interface WSTreeNode {plan_id: numbertype: 'start' | 'base'loading: booleanparent_plan_id: number | nullchild_plan_ids: number[]content: ContentItem[]status: 'start' | 'end'label: stringtimestamp: number
}

页面集成

1. 聊天页面 (src/pages/chat/[id].tsx)

// 发送消息回调
const sendCallback = (val: string) => {console.log('发送消息123123:', val)// 检查WebSocket连接状态if (!chatStore.isConnected()) {return message.error('WebSocket未连接, 无法发送消息')}if (!route.params.id) return// 发送消息并更新UIchatStore.value.list = [...chatStore.value.list,chatStore.send(val, Number(route.params.id)),]
}// 监听消息变化,更新画布
watch(() => chatStore.value.list,val => {if (val[val.length - 1]?.tree) {if (timer) returntimer = setTimeout(() => {timer = nullconst { tree } = chatStore.value.list[chatStore.value.list.length - 1]!state.data = generateVueFlowData(tree)  // 生成画布数据state.loading = false}, 300)  // 防抖处理}}
)

SSE支持

项目还支持Server-Sent Events (SSE)作为备选方案:

1. SSE连接类 (src/utils/sse.ts)

export class SSEConnection {private eventSource: EventSource | null = nullprivate messageCallback?: (data: any) => void// 连接SSEconnect(): Promise<void>// 断开连接disconnect(): void// 设置消息回调setMessageCallback(callback: (data: any) => void): void// 获取连接状态get isConnected(): boolean
}

2. SSE API (src/api/chat/method.ts)

// 创建SSE连接
export function createChatSSE(sessionId: string,options?: {onMessage?: (data: any) => void}
): SSEConnection {const baseURL = '/api'const sseUrl = `${baseURL}/events/${sessionId}`return createSSEConnection(sseUrl, options)
}

完整工作流程

1. 连接建立流程

1. 用户访问聊天页面
2. 检查用户token
3. 创建WebSocket连接管理器
4. 建立WebSocket连接
5. 设置消息处理回调
6. 设置断开连接回调

2. 消息发送流程

1. 用户在输入框输入消息
2. 点击发送按钮
3. 检查WebSocket连接状态
4. 创建用户消息对象
5. 创建响应消息对象
6. 更新本地消息列表
7. 通过WebSocket发送消息
8. 触发UI更新

3. 消息接收流程

1. WebSocket接收到服务器消息
2. 解析JSON数据
3. 调用onMessage处理函数
4. 根据消息类型进行不同处理
5. 更新消息列表状态
6. 触发Vue响应式更新
7. 更新UI显示
8. 如果是流式消息,累加内容实现打字机效果

4. 流式渲染机制

1. 服务器发送部分内容
2. 前端接收并累加到现有内容
3. 触发Vue响应式更新
4. TextChunk组件检测内容变化
5. 启动打字机动画
6. 逐字显示内容
7. 动画完成后停止
8. 等待下一批内容

关键特性总结

1. 实时双向通信

  • WebSocket提供全双工通信
  • 支持实时消息发送和接收

2. 流式渲染

  • 内容累加机制实现流式显示
  • 打字机效果提升用户体验

3. 状态管理

  • Pinia管理全局状态
  • 响应式更新确保UI同步

4. 错误处理

  • 完善的连接错误处理
  • 自动重连和用户提示

5. 生命周期管理

  • 自动连接和断开
  • 组件卸载时清理资源

6. 类型安全

  • 完整的TypeScript类型定义
  • 编译时错误检查

7. 扩展性

  • 支持多种消息类型
  • 模块化设计便于扩展

这个WebSocket系统设计得非常完善,既保证了功能的完整性,又考虑了性能和用户体验。通过分层架构和模块化设计,代码结构清晰,易于维护和扩展。

http://www.xdnf.cn/news/19192.html

相关文章:

  • 疯狂星期四文案网第54天运营日记
  • 【web3】十分钟了解web3是什么?
  • golang接口详细解释
  • Maya绑定:连接编辑器的简单使用
  • HGDB全文检索/中文分词的使用
  • 机器人电源电感的认证和认证细节,知多少?
  • Web网络开发 -- 常见CSS属性
  • 使用Docker搭建StackEdit在线MarkDown编辑器
  • 【论文阅读】CLIP: 从自然语言监督中学习可迁移的视觉模型
  • 【Depth与RGB对齐算法(D2C)】
  • 首次创建Django项目初始化
  • 沙箱操作工具
  • 计算机组成原理3-3-1:无符号数乘法运算的硬件逻辑实现
  • 学习做动画6.瞄准偏移
  • JavaScript初识:给小白的第一堂编程课
  • 大数据毕业设计选题推荐-基于大数据的痴呆症预测数据可视化分析系统-Spark-Hadoop-Bigdata
  • openEuler常用操作指令
  • AT_abc407_e [ABC407E] Most Valuable Parentheses
  • 客户案例 | 国际知名内衣品牌x甄知科技,领航IT服务新征程
  • 算法题打卡力扣第15题:三数之和(mid)
  • 用 PyTorch 搭建 CNN 实现 MNIST 手写数字识别
  • QT:【第一个QT程序】【信号和槽】
  • 2025通用证书研究:方法论、岗位映射与四证对比
  • 腾讯云重保流程详解:从预案到复盘的全周期安全防护
  • 【练习九】Java实现加油站支付小程序:存款与消费
  • 大数据原生集群 (Hadoop3.X为核心) 本地测试环境搭建三
  • Unity游戏打包——Android打包环境(Mac下)
  • 0828 C++基础
  • PhotoshopImageGenerator:基于Photoshop的自动化图像数据集生成工具
  • BGP路由协议(一):基本概念