当前位置: 首页 > ops >正文

Robyn高性能Web框架系列06:使用WebSocket实现产品智能助理

使用WebSocket实现产品智能助理

    • WebSocket原理与应用场景
    • Robyn的WebSocket基本使用
      • 1、创建WebSocket服务
      • 2、侦听WebSocket事件
      • 3、向客户端发送消息
      • 4、向客户端广播消息
      • 5、使用查询参数
      • 6、主动关闭连接
    • 示例:简易的产品智能助理
      • 1、产品数据部分
      • 2、产品信息部分
      • 3、智能助理部分


Robyn原生支持WebSocket协议,通过WebSocket协议,客户端(通常是浏览器)和服务器可以随时主动向对方发送数据,这在一定程度上弥补了HTTP协议在实时性上的不足,并带来了一些新的应用场景,本节我们介绍基于Robyn的WebSocket应用开发。

WebSocket原理与应用场景

WebSocket 是一种基于 TCP 的全双工通信协议,允许客户端和服务器之间进行持续、低延迟的双向通信。它是为了解决传统 HTTP 协议的请求-响应限制而设计的,尤其适合实时性要求高的场景,如聊天室、在线游戏、股票行情推送等。
在使用HTTP协议来实现这些实时性要求高的功能时,通常只能通过轮询或长轮询的方式,这就会带来以下问题:

  • 效率低: 大部分轮询请求可能没有新数据(浪费带宽和服务器资源)。
  • 延迟高: 从事件发生到客户端知道,需要等待下一个轮询请求的间隔时间。
  • 开销大: 每个 HTTP 请求都包含完整的HTTP头信息,即使数据体很小。

而WebSocket则通过下列设计特点解决了上述问题:

  • 一次握手,持久连接
    连接开始时,客户端通过一个特殊的 HTTP Upgrade 请求发起 WebSocket 握手。如果服务器支持 WebSocket协议,它会返回一个 101 Switching Protocols 响应。一旦握手成功,底层的 TCP 连接就保持打开状态,用于后续的 WebSocket 数据帧传输,不需要反复建立连接。

  • 全双工通信
    连接建立后,服务器和客户端可以随时、独立地向对方发送数据。服务器不需要等待客户端的请求就能主动推送信息。

  • 轻量级数据帧
    WebSocket 传输的数据单位是“帧”。相比 HTTP 请求/响应的头部,WebSocket 帧的头部开销非常小(通常只有几个字节)。这对于传输小量但频繁的数据(如聊天消息、游戏指令)特别高效。

  • 低延迟
    由于连接是持久的,并且服务器可以主动推送,数据从一端产生到另一端接收的延迟极低,接近网络传输本身的延迟。

下面是WebSocket 基本通信过程:

客户端(浏览器) 服务器 HTTP 请求\n(带 Upgrade: websocket 头) HTTP 101 Switching Protocols\n(协议升级确认) WebSocket 连接建立成功\n进入全双工通信状态 发送消息(如:{"type": "ping"}) 返回消息(如:{"type": "pong"}) 主动推送数据(如:价格更新) 客户端响应 / 再次发送数据 可持续传输,直到任一方关闭连接 关闭连接请求(Close Frame) 确认关闭(Close Frame) WebSocket 连接关闭 客户端(浏览器) 服务器

Robyn的WebSocket基本使用

1、创建WebSocket服务

Robyn可以通过实例化一个WebSocket类,绑定到Robyn主应用中,就能为当前Robyn应用开启WebSocket功能。

from robyn import Robyn, jsonify, WebSocketapp = Robyn(__file__)
websocket = WebSocket(app, "/web_socket")

2、侦听WebSocket事件

通过WebSocket的on方法,可以为不同的WebSocket的connectmessageclose事件编写不同的处理代码。

@websocket.on("message")
def connect():return "Hello world, from ws"@websocket.on("close")
def close():return "Goodbye world, from ws"@websocket.on("connect")
def message():return "Connected to ws"

3、向客户端发送消息

通过WebSocket的sync_send_to方法(同步方式)或async_send_to方法(异步方式)可以向客户端发送消息。

@websocket.on("message")def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idws.sync_send_to(websocket_id, "This is a message to self")# await ws.async_send_to(websocket_id, "This is a message to self")return ""

4、向客户端广播消息

通过WebSocket的sync_broadcast方法(同步方式)或async_broadcast方法(异步方式)可以向客户端广播消息。

  @websocket.on("message")def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idws.sync_broadcast("This is a message to self")return ""

5、使用查询参数

在WebSocket路由函数中同样也能处理查询参数,WebSocket提供了query_params集合用于存储来自客户端的查询参数。

  @websocket.on("message")async def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idif (ws.query_params.get("name") == "gordon" and ws.query_params.get("desg") == "commissioner"):ws.sync_broadcast("Gordon authorized to login!")return ""

6、主动关闭连接

通过WebSocket的close方法,可以主动关闭与客户端的连接,并将该客户端从WebSocket 注册表中删除。

@websocket.on("message")
def message(ws, msg):if msg == "disconnect":ws.close()return "Closing connection"return "Message received"

示例:简易的产品智能助理

下面我们通过一个综合实例来进一步熟悉一下Robyn的WebSocket编程。在实例中,我们模拟一个产品智能助理程序,用户可以通过与产品智能助理的交流来了解一款产品的基本信息。整个程序结构大体如下:

产品信息页面
产品数据库
产品智能助理

1、产品数据部分

产品数据部分主要为产品信息页面产品智能助理提供基础数据,它可以是来自Mysql等数据库,这里用dict做简单的模拟。

product_list={  "1": {"id":"1","name": "iPhone 14 Pro", "price": 999, "color": "黑色、银色", "size": "6.1英寸", "delivery": "2-3天"},  "2": {"id":"2","name": "Samsung Galaxy S22", "price": 799, "color": "白色、黑色、红色", "size": "6.2英寸", "delivery": "3-5天"},  "3": {"id":"3","name": "Google Pixel 6", "price": 599, "color": "蓝色、白色", "size": "6.4英寸", "delivery": "1-2天"}  
}

2、产品信息部分

产品信息部分用于展示产品基本信息,它是一个普通的网页,这里我们使用了Robyn的模板功能,结合使用Jinja2生成一个动态的产品网页。
1)初始化Jinja2模板引擎

import json  
import os  
import pathlib  
from robyn import Robyn, WebSocket  
from robyn.templating import JinjaTemplate  app = Robyn(__file__)  
current_file_path = pathlib.Path(__file__).parent.resolve()  
jinja_template = JinjaTemplate(os.path.join(current_file_path, "templates"))

2)编写html模板文件

<!DOCTYPE html>  
<html lang="en">  
<head>  <meta charset="UTF-8">  <title>{{name}}</title>  
</head>  
<body>  
<h2>Robyn Product Assistant</h2>  
<hr />  
<div id="product_info">  <h3>{{name}}</h3>  
</div>
</body>  
</html>

3)编写产品信息路由函数

@app.get("/product/:product_id")  
async def product_info(request,path_params):  product_id = path_params["product_id"]  if product_id in product_list:  product = product_list[product_id]  return jinja_template.render_template("product.html", **product)  else:  return "404 Not Found"

3、智能助理部分

智能助理部分包括嵌入在产品信息页面中的Javascript websocket 客户端和利用Robyn的WebSocket技术编写的服务端两个部分。
1)智能助理服务端帮助函数

# 用于根据产品信息产生咨询结果,这里可以通过连接大模型获得更好的咨询结果
def gen_response(msg, product):  if '价格' in msg:  return f"{product['name']}现在只需要{product['price']}美元哦。"  elif '颜色' in msg:  return f"{product['name']}有多种颜色,包括{product['color']}。"  elif '尺寸' in msg:  return f"{product['name']}的屏幕尺寸是{product['size']}。"  elif '配送' in msg:  return f"{product['name']}的配送时间大约是{product['delivery']}。"  else:  return "抱歉,我不太明白您的意思。"  # 用于统计当前的咨询人数  
def update_customer_count(count, global_dependencies):  customer_count = global_dependencies.get("customer_count", 0)  new_count = count + customer_count  global_dependencies["customer_count"] = new_count  return new_count

2)智能助理服务端
我们在客户端建立和关闭连接时广播一次咨询人数,并通过使用query_params来判断当前用户所咨询的产品信息,最后根据用户发送的消息来生成咨询响应结果。

pa = WebSocket(app, "/pa")  @pa.on("connect")  
def connect(ws, global_dependencies):  new_count = update_customer_count(1, global_dependencies)  ws.sync_broadcast(json.dumps({  "type": "customer_count",  "msg": new_count  }))  @pa.on("close")  
def close(ws, global_dependencies):  new_count = update_customer_count(-1, global_dependencies)  ws.sync_broadcast(json.dumps({  "type": "customer_count",  "count": new_count  }))  @pa.on("message")  
def message(ws, msg, global_dependencies):  product_id = ws.query_params.get("id",None)  if product_id and len(msg)>0:  ws.sync_broadcast(json.dumps({  "type": "response",  "msg": gen_response(msg, product_list.get(product_id))  }))

3)Javascript websocket 客户端
我们在产品信息的html页面中加入以下代码作为智能助理的客户端部分。

<div>  <span id="online_count">0</span>人正在咨询。  <textarea id="msg" placeholder="请输入您需要了解的内容,如颜色、尺寸、价格、配送方式等。" cols="50" rows="8"></textarea><br />  <button onclick="send()">发送消息</button>  <hr />  <h3>消息记录</h3>  <div id="messages" style="list-style-type: none; padding: 0;"></div>  
</div>  
<script>  const ws = new WebSocket("ws://0.0.0.0:8080/pa?id={{id}}");  ws.onmessage = (event) => {  const rep = JSON.parse(event.data)  if (rep.type === 'customer_count') {  document.getElementById("online_count").innerText = rep.msg;  }else{  addMessage('Robyn', rep.msg);  }  };  function send() {  const input = document.getElementById("msg");  ws.send(input.value);  addMessage('You', input.value);  input.value = "";  }  function addMessage(user,message) {  const msg_div = document.createElement("div");  msg_div.innerHTML = `<strong>${user}:</strong><br /> ${message}`;  msg_list = document.getElementById("messages");  msg_list.insertBefore(msg_div, msg_list.firstChild);  }  
</script>

这样一个基于Robyn WebSocket的简易产品智能助理就完成了,在实际的应用中可以通过使用连接池以及Redis分布式推送以提升其性能,通过与大模型API进行整合以提升其智能,从而形成一个真正的智能助理。

http://www.xdnf.cn/news/14647.html

相关文章:

  • SQL学习笔记3
  • 图像质量对比感悟
  • 智表ZCELL产品V3.2 版发布,新增拖动调整行列功能,修复了插件引用相对路径等问题
  • 【C++11】右值引用和移动语义
  • Hive3.1.3加载paimon-hive-connector-3.1-1.1.1.jar报错UnsatisfiedLinkError
  • 解决uniapp vue3版本封装组件后:deep()样式穿透不生效的问题
  • 【攻防篇】解决:阿里云docker 容器中自动启动xmrig挖矿
  • 超实用AI工具分享——ViiTor AI视频配音功能教程(附图文)
  • php项目部署----------酒店项目
  • 知攻善防应急靶机 Windows web 3
  • LVS-DR负载均衡群集深度实践:高性能架构设计与排障指南
  • 笔记02:布线-差分对的设置与添加
  • Liunx操作系统笔记2
  • 《解锁前端潜力:自动化流程搭建秘籍》
  • Boosting:从理论到实践——集成学习中的偏差征服者
  • linux-修改文件命令(补充)
  • Jenkins Pipeline 与 Python 脚本之间使用环境变量通信
  • 数的三次方根
  • 【深度学习新浪潮】空间计算的医疗应用技术分析(简要版)
  • TCP/UDP协议深度解析(二):TCP连接管理全解,三次握手四次挥手的完整流程
  • Linux docker拉取镜像报错解决
  • 空间理解模型 SpatialLM 正式发布首份技术报告
  • 数据结构 顺序表与链表
  • 一步部署APache编译安装脚本
  • 基于SSM框架+mysql实现的监考安排管理系统[含源码+数据库+项目开发技术手册]
  • 使用VIVADO合并FPGA bit文件和Microblaze elf
  • SQL学习笔记2
  • 【大厂机试题解法笔记】可以组成网络的服务器
  • 使用亮数据网页抓取API自动获取Tiktok数据
  • Windows下安装zookeeper