当前位置: 首页 > web >正文

python 基于 httpx 的流式请求

文章目录

  • 1. 环境介绍
  • 2. 同步客户端
    • 2.1. 面向过程
  • 3. 异步客户端
    • 3.1. 面向过程
    • 3.2. 面向对象

参考:
https://www.jb51.net/article/262636.htm

次要参考:
https://blog.csdn.net/gitblog_00079/article/details/139587558
https://blog.csdn.net/gitblog_00626/article/details/141801526
https://www.cnblogs.com/kaibindirver/p/18755942

https://juejin.cn/post/7088892051470680078
https://cloud.tencent.com/developer/article/1988628
https://docs.pingcode.com/ask/1179824.html
https://blog.csdn.net/2501_91483145/article/details/148616194

1. 环境介绍

本文使用 ollama 部署本地模型:

api_key 	= "EMPTY"
base_url 	= "http://192.168.17.100:11434/v1/chat/completions"
model 		= "deepseek-r1:1.5b"

2. 同步客户端

重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501
https://www.jb51.net/article/262636.htm

2.1. 面向过程

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Client, Timeout


def __chunk(data):
    """Parse one SSE event into a ChatCompletionChunk.

    Returns None for empty events, for the terminal "[DONE]" sentinel,
    and for payloads that fail to validate as a ChatCompletionChunk.
    """
    if not data:
        return None
    answer_data = data.data
    if answer_data == "[DONE]":
        # The server marks end-of-stream with a literal "[DONE]" event.
        return None
    answer_dict = json.loads(answer_data)
    print(type(answer_dict), answer_dict)
    try:
        return ChatCompletionChunk(**answer_dict)
    except Exception as ex:
        print(f"__chunk Exception:{str(ex)}")
    return None


def sync_main(base_url, headers, data):
    """POST `data` to `base_url` and print the (possibly streamed) answer.

    Streaming responses (content-type: text/event-stream) are consumed
    event-by-event; non-streaming responses are printed whole.
    """
    with Client() as client:
        try:
            # Key point: explicit timeout configuration.
            # Overall timeout 5s, but allow up to 10s between reads so a
            # slow first token does not close the stream prematurely.
            timeout_config = Timeout(5.0, read=10.0)
            with client.stream('POST', url=base_url, headers=headers,
                               json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                print("##############", content_type)
                if 'text/event-stream' in content_type:  # streaming answer
                    all_answer = ""
                    # Use a distinct loop variable: the original shadowed
                    # the `data` request parameter here.
                    for event in EventSource(response).iter_sse():
                        chunk = __chunk(data=event)
                        if chunk:
                            # Accumulate the delta text (this was dead,
                            # commented-out code in the original, so
                            # all_answer was always printed empty).
                            content = chunk.choices[0].delta.content
                            if content:
                                all_answer += content
                            print(chunk)
                    print(all_answer)
                else:  # non-streaming answer
                    print(response.read())
        except Exception as e:
            print(e)


if __name__ == "__main__":
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]
    data = {
        "model": model,
        "messages": messages,
        "stream": True,
    }
    sync_main(base_url=base_url, headers=headers, data=data)
  • 流式输出
......
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\u5417"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='吗', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None){"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\uff1f"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='?', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None){"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": "stop"}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason='stop', index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)

解析效果

{"id": "chatcmpl-476","object": "chat.completion.chunk","created": 1752575345,"model": "deepseek-r1:1.5b","system_fingerprint": "fp_ollama","choices": [{"index": 0,"delta": {"role": "assistant","content": ""},"finish_reason": "stop"}]
}
  • 非流式输出
{"id": "chatcmpl-485", "object": "chat.completion", "created": 1752575233, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "message": {"role": "assistant", "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 27, "completion_tokens": 78, "total_tokens": 105}}
ChatCompletion(id='chatcmpl-485', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='<think>\n嗯,用户发来了“手写文
字”里的这句话:“你好”。这是一个常见的问候语。\n\n现在,我需要根据我的知识库来判断这句问候是否正确。假设我是一位自然 lang Gaussian assistant,我会确认“你好”是一个常用
的中文问候,不会是错误的表达。\n\n因此,我可以回复“你好”来确认这一点。\n</think>\n\n你好!', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None))], created=1752575233, model='deepseek-r1:1.5b', object='chat.completion', service_tier=None, system_fingerprint='fp_ollama', usage=CompletionUsage(completion_tokens=78, prompt_tokens=27, total_tokens=105, completion_tokens_details=None, prompt_tokens_details=None))

解析效果

{"id": "chatcmpl-485","object": "chat.completion","created": 1752575233,"model": "deepseek-r1:1.5b","system_fingerprint": "fp_ollama","choices": [{"index": 0,"message": {"role": "assistant","content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"},"finish_reason": "stop"}],"usage": {"prompt_tokens": 27,"completion_tokens": 78,"total_tokens": 105}
}

3. 异步客户端

3.1. 面向过程

重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout


def __chunk(data):
    """Parse one SSE event into a ChatCompletionChunk.

    Returns None for empty events, for the terminal "[DONE]" sentinel,
    and for payloads that fail to validate as a ChatCompletionChunk.
    """
    if not data:
        return None
    answer_data = data.data
    if answer_data == "[DONE]":
        # The server marks end-of-stream with a literal "[DONE]" event.
        return None
    answer_dict = json.loads(answer_data)
    print(type(answer_dict), answer_dict)
    try:
        return ChatCompletionChunk(**answer_dict)
    except Exception as ex:
        print(f"__chunk Exception:{str(ex)}")
    return None


async def async_main(base_url, headers, data):
    """Async generator: POST `data` to `base_url` and yield each
    ChatCompletionChunk parsed from the SSE stream.

    Non-streaming responses are ignored: calling the sync `response.read()`
    on an async stream raises "Attempted to call a sync iterator on an
    async stream" — use `await response.aread()` if that path is needed.
    """
    async with AsyncClient() as client:
        try:
            # Key point: explicit timeout configuration.
            # Overall timeout 5s, but allow up to 10s between reads so a
            # slow first token does not close the stream prematurely.
            timeout_config = Timeout(5.0, read=10.0)
            async with client.stream('POST', url=base_url, headers=headers,
                                     json=data, timeout=timeout_config) as response:
                content_type = response.headers.get('content-type', '').lower()
                if 'text/event-stream' in content_type:  # streaming answer
                    # Use a distinct loop variable: the original shadowed
                    # the `data` request parameter here.
                    async for event in EventSource(response).aiter_sse():
                        chunk = __chunk(data=event)
                        if chunk:
                            yield chunk
                else:  # non-streaming answer: intentionally skipped (see docstring)
                    pass
        except Exception as ex:
            print(f"async_main Exception:{str(ex)}")


async def main():
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
        # "Accept": "text/event-stream"
    }
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]
    data = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    response = async_main(base_url, headers, data)
    all_answer = ""
    async for chunk in response:
        # Accumulate the delta text; guard against the final chunk whose
        # delta content may be empty or None.
        content = chunk.choices[0].delta.content
        if content:
            all_answer += content
        print(chunk)
    print(all_answer)


if __name__ == "__main__":
    asyncio.run(main())

使用 httpx 的异步请求 AsyncClient 调用 stream 方法请求流式接口时,如果接口返回首个内容比较慢(比如第一个字符返回用时 5s 以上),会触发 httpx 默认的 5 秒读取超时,客户端因此主动关闭流式通道;当后端接口随后准备好数据并写入时,就会报错“管道已关闭”。
解决办法:调用 stream 方法时增加 timeout 参数,显式调大读取超时(read timeout)。

3.2. 面向对象

import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout


class AsyncHttpxClient:
    """Minimal async client for an OpenAI-compatible chat-completions API
    (here: a local ollama server), streaming answers over SSE."""

    def __init__(self, api_key: str, base_url: str, timeout: int = 5):
        self.api_key = api_key
        self.base_url = base_url
        # Overall/connect timeout in seconds; read timeout is fixed at 10s
        # in generate() so a slow first token does not close the stream.
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "*/*",
            # "Accept": "text/event-stream"
        }

    def __chunk(self, data):
        """Parse one SSE event into a ChatCompletionChunk.

        Returns None for empty events, for the terminal "[DONE]" sentinel,
        and for payloads that fail to validate as a ChatCompletionChunk.
        """
        if not data:
            return None
        answer_data = data.data
        if answer_data == "[DONE]":
            # The server marks end-of-stream with a literal "[DONE]" event.
            return None
        answer_dict = json.loads(answer_data)
        try:
            return ChatCompletionChunk(**answer_dict)
        except Exception as ex:
            print(f"AsyncHttpxClient.__chunk Exception:{str(ex)}")
        return None

    async def generate(self, model: str, messages: list, functions=None,
                       temperature: float = 1, top_p: float = 0,
                       max_tokens: int = 2048, stream: bool = True):
        """Async generator yielding ChatCompletionChunk objects for one
        chat-completion request.

        Parameters mirror the OpenAI chat-completions payload; only
        text/event-stream responses produce chunks.
        """
        data = {
            "model": model,
            "messages": messages,
            "functions": functions,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "stream": stream,
        }
        async with AsyncClient() as client:
            try:
                # Key point: explicit timeout configuration
                # (connect/overall from self.timeout, read capped at 10s).
                timeout_config = Timeout(self.timeout, read=10.0)
                async with client.stream('POST', url=self.base_url, headers=self.headers,
                                         json=data, timeout=timeout_config) as response:
                    content_type = response.headers.get('content-type', '').lower()
                    if 'text/event-stream' in content_type:  # streaming answer
                        # Distinct loop variable: the original shadowed the
                        # `data` request payload here.
                        async for event in EventSource(response).aiter_sse():
                            chunk = self.__chunk(data=event)
                            if chunk:
                                yield chunk
            except Exception as ex:
                print(f"AsyncHttpxClient.generate Exception:{str(ex)}")


async def main():
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    async_client = AsyncHttpxClient(api_key=api_key, base_url=base_url)
    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]
    response = async_client.generate(model=model, messages=messages, stream=True)

    all_answer = ""
    async for chunk in response:
        # Guard: the final chunk's delta content can be empty or None;
        # concatenating None would raise TypeError.
        content = chunk.choices[0].delta.content
        if content:
            all_answer += content
    print(all_answer)


if __name__ == "__main__":
    asyncio.run(main())
http://www.xdnf.cn/news/15594.html

相关文章:

  • 场景设计题+智力题
  • [Science]论文 视黄素与细胞修复
  • C++回顾 Day7
  • PyCharm 高效入门指南:从安装到效率倍增
  • [面试] 手写题-对象数组根据某个字段进行分组
  • 学习嵌入式的第二十八天-数据结构-(2025.7.15)进程和线程
  • P3842 [TJOI2007] 线段
  • Web攻防-PHP反序列化字符逃逸增多减少成员变量属性解析不敏感Wakeup绕过
  • 高等数学强化——导学
  • Android中Launcher简介
  • deepseekAI对接大模型的网页PHP源码带管理后台(可实现上传分析文件)
  • ASP .NET Core 8结合JWT轻松实现身份验证和授权
  • SpringBoot 实现 Redis读写分离
  • “C21988-谷物烘干机(2D+3D+说明书+运动仿真)8张cad+设计说明书
  • pytorch学习笔记(四)-- TorchVision 物体检测微调教程
  • 常用高频指令总结
  • iOS App 上架工具选型与跨平台开发 iOS 上架流程优化实录
  • 视频HDR技术全解析:从原理到应用的深度探索
  • 【时时三省】(C语言基础)通过指针引用多维数组
  • 视频编码中熵编码之基于上下文的变长编码(Huffman霍夫曼编码和指数哥伦布)
  • 网络编程-epoll模型/udp通信
  • css 边框颜色渐变
  • 【linux V0.11】init/main.c
  • JAVA青企码协会模式系统源码支持微信公众号+微信小程序+H5+APP
  • Spring MVC 执行流程详解:一次请求经历了什么?
  • 基于铸造机床的Canopen转Profinet协议转换网关应用研究
  • 涨停板池,跌停板池,炸板池,次新股池,强势股池数据接口
  • Python命令行计算2的22次方方法
  • 轻松管理多个Go版本:g工具安装与使用
  • keeplived双击热备配置