Robyn高性能Web框架系列06:使用WebSocket实现产品智能助理
使用WebSocket实现产品智能助理
- WebSocket原理与应用场景
- Robyn的WebSocket基本使用
- 1、创建WebSocket服务
- 2、侦听WebSocket事件
- 3、向客户端发送消息
- 4、向客户端广播消息
- 5、使用查询参数
- 6、主动关闭连接
- 示例:简易的产品智能助理
- 1、产品数据部分
- 2、产品信息部分
- 3、智能助理部分
Robyn原生支持WebSocket协议,通过WebSocket协议,客户端(通常是浏览器)和服务器可以随时主动向对方发送数据,这在一定程度上弥补了HTTP协议在实时性上的不足,并带来了一些新的应用场景,本节我们介绍基于Robyn的WebSocket应用开发。
WebSocket原理与应用场景
WebSocket 是一种基于 TCP 的全双工通信协议,允许客户端和服务器之间进行持续、低延迟的双向通信。它是为了解决传统 HTTP 协议的请求-响应限制而设计的,尤其适合实时性要求高的场景,如聊天室、在线游戏、股票行情推送等。
在使用HTTP协议来实现这些实时性要求高的功能时,通常只能通过轮询或长轮询的方式,这就会带来以下问题:
- 效率低: 大部分轮询请求可能没有新数据(浪费带宽和服务器资源)。
- 延迟高: 从事件发生到客户端知道,需要等待下一个轮询请求的间隔时间。
- 开销大: 每个 HTTP 请求都包含完整的HTTP头信息,即使数据体很小。
而WebSocket则通过下列设计特点解决了上述问题:
-
一次握手,持久连接
连接开始时,客户端通过一个特殊的 HTTP Upgrade 请求发起 WebSocket 握手。如果服务器支持 WebSocket协议,它会返回一个 101 Switching Protocols 响应。一旦握手成功,底层的 TCP 连接就保持打开状态,用于后续的 WebSocket 数据帧传输,不需要反复建立连接。 -
全双工通信
连接建立后,服务器和客户端可以随时、独立地向对方发送数据。服务器不需要等待客户端的请求就能主动推送信息。 -
轻量级数据帧
WebSocket 传输的数据单位是“帧”。相比 HTTP 请求/响应的头部,WebSocket 帧的头部开销非常小(通常只有几个字节)。这对于传输小量但频繁的数据(如聊天消息、游戏指令)特别高效。 -
低延迟
由于连接是持久的,并且服务器可以主动推送,数据从一端产生到另一端接收的延迟极低,接近网络传输本身的延迟。
下面是WebSocket 基本通信过程:
Robyn的WebSocket基本使用
1、创建WebSocket服务
Robyn可以通过实例化一个WebSocket类,绑定到Robyn主应用中,就能为当前Robyn应用开启WebSocket功能。
from robyn import Robyn, jsonify, WebSocketapp = Robyn(__file__)
websocket = WebSocket(app, "/web_socket")
2、侦听WebSocket事件
通过WebSocket的on
方法,可以为不同的WebSocket的connect
、message
、close
事件编写不同的处理代码。
@websocket.on("message")
def connect():return "Hello world, from ws"@websocket.on("close")
def close():return "Goodbye world, from ws"@websocket.on("connect")
def message():return "Connected to ws"
3、向客户端发送消息
通过WebSocket的sync_send_to
方法(同步方式)或async_send_to
方法(异步方式)可以向客户端发送消息。
@websocket.on("message")def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idws.sync_send_to(websocket_id, "This is a message to self")# await ws.async_send_to(websocket_id, "This is a message to self")return ""
4、向客户端广播消息
通过WebSocket的sync_broadcast
方法(同步方式)或async_broadcast
方法(异步方式)可以向客户端广播消息。
@websocket.on("message")def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idws.sync_broadcast("This is a message to self")return ""
5、使用查询参数
在WebSocket路由函数中同样也能处理查询参数,WebSocket提供了query_params
集合用于存储来自客户端的查询参数。
@websocket.on("message")async def message(ws, msg, global_dependencies) -> str:websocket_id = ws.idif (ws.query_params.get("name") == "gordon" and ws.query_params.get("desg") == "commissioner"):ws.sync_broadcast("Gordon authorized to login!")return ""
6、主动关闭连接
通过WebSocket的close
方法,可以主动关闭与客户端的连接,并将该客户端从WebSocket 注册表中删除。
@websocket.on("message")
def message(ws, msg):if msg == "disconnect":ws.close()return "Closing connection"return "Message received"
示例:简易的产品智能助理
下面我们通过一个综合实例来进一步熟悉一下Robyn的WebSocket编程。在实例中,我们模拟一个产品智能助理程序,用户可以通过与产品智能助理的交流来了解一款产品的基本信息。整个程序结构大体如下:
1、产品数据部分
产品数据部分主要为产品信息页面和产品智能助理提供基础数据,它可以是来自Mysql等数据库,这里用dict做简单的模拟。
product_list={ "1": {"id":"1","name": "iPhone 14 Pro", "price": 999, "color": "黑色、银色", "size": "6.1英寸", "delivery": "2-3天"}, "2": {"id":"2","name": "Samsung Galaxy S22", "price": 799, "color": "白色、黑色、红色", "size": "6.2英寸", "delivery": "3-5天"}, "3": {"id":"3","name": "Google Pixel 6", "price": 599, "color": "蓝色、白色", "size": "6.4英寸", "delivery": "1-2天"}
}
2、产品信息部分
产品信息部分用于展示产品基本信息,它是一个普通的网页,这里我们使用了Robyn的模板功能,结合使用Jinja2
生成一个动态的产品网页。
1)初始化Jinja2模板引擎
import json
import os
import pathlib
from robyn import Robyn, WebSocket
from robyn.templating import JinjaTemplate app = Robyn(__file__)
current_file_path = pathlib.Path(__file__).parent.resolve()
jinja_template = JinjaTemplate(os.path.join(current_file_path, "templates"))
2)编写html模板文件
<!DOCTYPE html>
<html lang="en">
<head> <meta charset="UTF-8"> <title>{{name}}</title>
</head>
<body>
<h2>Robyn Product Assistant</h2>
<hr />
<div id="product_info"> <h3>{{name}}</h3>
</div>
</body>
</html>
3)编写产品信息路由函数
@app.get("/product/:product_id")
async def product_info(request,path_params): product_id = path_params["product_id"] if product_id in product_list: product = product_list[product_id] return jinja_template.render_template("product.html", **product) else: return "404 Not Found"
3、智能助理部分
智能助理部分包括嵌入在产品信息页面中的Javascript websocket 客户端和利用Robyn的WebSocket技术编写的服务端两个部分。
1)智能助理服务端帮助函数
# 用于根据产品信息产生咨询结果,这里可以通过连接大模型获得更好的咨询结果
def gen_response(msg, product): if '价格' in msg: return f"{product['name']}现在只需要{product['price']}美元哦。" elif '颜色' in msg: return f"{product['name']}有多种颜色,包括{product['color']}。" elif '尺寸' in msg: return f"{product['name']}的屏幕尺寸是{product['size']}。" elif '配送' in msg: return f"{product['name']}的配送时间大约是{product['delivery']}。" else: return "抱歉,我不太明白您的意思。" # 用于统计当前的咨询人数
def update_customer_count(count, global_dependencies): customer_count = global_dependencies.get("customer_count", 0) new_count = count + customer_count global_dependencies["customer_count"] = new_count return new_count
2)智能助理服务端
我们在客户端建立和关闭连接时广播一次咨询人数,并通过使用query_params
来判断当前用户所咨询的产品信息,最后根据用户发送的消息来生成咨询响应结果。
pa = WebSocket(app, "/pa") @pa.on("connect")
def connect(ws, global_dependencies): new_count = update_customer_count(1, global_dependencies) ws.sync_broadcast(json.dumps({ "type": "customer_count", "msg": new_count })) @pa.on("close")
def close(ws, global_dependencies): new_count = update_customer_count(-1, global_dependencies) ws.sync_broadcast(json.dumps({ "type": "customer_count", "count": new_count })) @pa.on("message")
def message(ws, msg, global_dependencies): product_id = ws.query_params.get("id",None) if product_id and len(msg)>0: ws.sync_broadcast(json.dumps({ "type": "response", "msg": gen_response(msg, product_list.get(product_id)) }))
3)Javascript websocket 客户端
我们在产品信息的html页面中加入以下代码作为智能助理的客户端部分。
<div> <span id="online_count">0</span>人正在咨询。 <textarea id="msg" placeholder="请输入您需要了解的内容,如颜色、尺寸、价格、配送方式等。" cols="50" rows="8"></textarea><br /> <button onclick="send()">发送消息</button> <hr /> <h3>消息记录</h3> <div id="messages" style="list-style-type: none; padding: 0;"></div>
</div>
<script> const ws = new WebSocket("ws://0.0.0.0:8080/pa?id={{id}}"); ws.onmessage = (event) => { const rep = JSON.parse(event.data) if (rep.type === 'customer_count') { document.getElementById("online_count").innerText = rep.msg; }else{ addMessage('Robyn', rep.msg); } }; function send() { const input = document.getElementById("msg"); ws.send(input.value); addMessage('You', input.value); input.value = ""; } function addMessage(user,message) { const msg_div = document.createElement("div"); msg_div.innerHTML = `<strong>${user}:</strong><br /> ${message}`; msg_list = document.getElementById("messages"); msg_list.insertBefore(msg_div, msg_list.firstChild); }
</script>
这样一个基于Robyn WebSocket的简易产品智能助理就完成了,在实际的应用中可以通过使用连接池以及Redis分布式推送以提升其性能,通过与大模型API进行整合以提升其智能,从而形成一个真正的智能助理。