python 基于 httpx 的流式请求
文章目录
- 1. 环境介绍
- 2. 同步客户端
- 2.1. 面向过程
- 3. 异步客户端
- 3.1. 面向过程
- 3.2. 面向对象
参考:
https://www.jb51.net/article/262636.htm
次要参考:
https://blog.csdn.net/gitblog_00079/article/details/139587558
https://blog.csdn.net/gitblog_00626/article/details/141801526
https://www.cnblogs.com/kaibindirver/p/18755942
https://juejin.cn/post/7088892051470680078
https://cloud.tencent.com/developer/article/1988628
https://docs.pingcode.com/ask/1179824.html
https://blog.csdn.net/2501_91483145/article/details/148616194
1. 环境介绍
本文使用 ollama 部署本地模型:
api_key = "EMPTY"
base_url = "http://192.168.17.100:11434/v1/chat/completions"
model = "deepseek-r1:1.5b"
2. 同步客户端
重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501
https://www.jb51.net/article/262636.htm
2.1. 面向过程
import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Client, Timeout


def __chunk(data):
    """Convert one server-sent event into a ``ChatCompletionChunk``.

    Args:
        data: An SSE event object whose payload is read from ``data.data``.

    Returns:
        The parsed ``ChatCompletionChunk``, or ``None`` when the event is
        falsy, carries the ``[DONE]`` terminator, or cannot be parsed.
    """
    if not data:
        return None

    answer_data = data.data
    if answer_data == "[DONE]":
        # The stream ends with a literal "[DONE]" payload, which is not JSON.
        return None

    try:
        # Decoding happens inside the try so that a single malformed event is
        # reported and skipped instead of aborting the whole stream.
        answer_dict = json.loads(answer_data)
        print(type(answer_dict), answer_dict)
        return ChatCompletionChunk(**answer_dict)
    except Exception as ex:
        print(f"__chunk Exception:{str(ex)}")
        return None


def sync_main(base_url, headers, data):
    """POST ``data`` to ``base_url`` and print the reply.

    For a ``text/event-stream`` response every parsed chunk is printed as it
    arrives, followed by the concatenated answer text. Any other response is
    read in full and printed as raw bytes.

    Args:
        base_url: URL of the chat-completions endpoint.
        headers: HTTP headers sent with the request.
        data: JSON-serialisable request payload.
    """
    # 5 s applies to the connect/write/pool phases; reads get 10 s because the
    # first token of a streamed answer can take longer than the default.
    timeout_config = Timeout(5.0, read=10.0)

    with Client() as client:
        try:
            with client.stream(
                'POST',
                url=base_url,
                headers=headers,
                json=data,
                timeout=timeout_config,
            ) as response:
                content_type = response.headers.get('content-type', '').lower()
                print("##############", content_type)

                if 'text/event-stream' in content_type:  # streaming answer
                    answer_parts = []
                    # Use a distinct loop name so the request payload ``data``
                    # is not shadowed by the SSE events.
                    for event in EventSource(response).iter_sse():
                        chunk = __chunk(data=event)
                        if chunk is None:
                            continue
                        print(chunk)
                        # ``choices`` may be empty and ``content`` may be None.
                        if chunk.choices:
                            answer_parts.append(chunk.choices[0].delta.content or "")
                    print("".join(answer_parts))
                else:  # non-streaming answer
                    print(response.read())
        except Exception as e:
            print(e)


if __name__ == "__main__":
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
    }

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]

    data = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    sync_main(base_url=base_url, headers=headers, data=data)
- 流式输出
......
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\u5417"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='吗', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": "\uff1f"}, "finish_reason": null}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='？', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason=None, index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
{"id": "chatcmpl-476", "object": "chat.completion.chunk", "created": 1752575345, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": "stop"}]}
ChatCompletionChunk(id='chatcmpl-476', choices=[Choice(delta=ChoiceDelta(content='', function_call=None, refusal=None, role='assistant', tool_calls=None), finish_reason='stop', index=0, logprobs=None)], created=1752575345, model='deepseek-r1:1.5b', object='chat.completion.chunk', service_tier=None, system_fingerprint='fp_ollama', usage=None)
解析效果
{"id": "chatcmpl-476","object": "chat.completion.chunk","created": 1752575345,"model": "deepseek-r1:1.5b","system_fingerprint": "fp_ollama","choices": [{"index": 0,"delta": {"role": "assistant","content": ""},"finish_reason": "stop"}]
}
- 非流式输出
{"id": "chatcmpl-485", "object": "chat.completion", "created": 1752575233, "model": "deepseek-r1:1.5b", "system_fingerprint": "fp_ollama", "choices": [{"index": 0, "message": {"role": "assistant", "content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"}, "finish_reason": "stop"}], "usage": {"prompt_tokens": 27, "completion_tokens": 78, "total_tokens": 105}}
ChatCompletion(id='chatcmpl-485', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='<think>\n嗯,用户发来了“手写文
字”里的这句话:“你好”。这是一个常见的问候语。\n\n现在,我需要根据我的知识库来判断这句问候是否正确。假设我是一位自然 lang Gaussian assistant,我会确认“你好”是一个常用
的中文问候,不会是错误的表达。\n\n因此,我可以回复“你好”来确认这一点。\n</think>\n\n你好!', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None))], created=1752575233, model='deepseek-r1:1.5b', object='chat.completion', service_tier=None, system_fingerprint='fp_ollama', usage=CompletionUsage(completion_tokens=78, prompt_tokens=27, total_tokens=105, completion_tokens_details=None, prompt_tokens_details=None))
解析效果
{"id": "chatcmpl-485","object": "chat.completion","created": 1752575233,"model": "deepseek-r1:1.5b","system_fingerprint": "fp_ollama","choices": [{"index": 0,"message": {"role": "assistant","content": "<think>\n\u55ef\uff0c\u7528\u6237\u53d1\u6765\u4e86\u201c\u624b\u5199\u6587\u5b57\u201d\u91cc\u7684\u8fd9\u53e5\u8bdd\uff1a\u201c\u4f60\u597d\u201d\u3002\u8fd9\u662f\u4e00\u4e2a\u5e38\u89c1\u7684\u95ee\u5019\u8bed\u3002\n\n\u73b0\u5728\uff0c\u6211\u9700\u8981\u6839\u636e\u6211\u7684\u77e5\u8bc6\u5e93\u6765\u5224\u65ad\u8fd9\u53e5\u95ee\u5019\u662f\u5426\u6b63\u786e\u3002\u5047\u8bbe\u6211\u662f\u4e00\u4f4d\u81ea\u7136 lang Gaussian assistant\uff0c\u6211\u4f1a\u786e\u8ba4\u201c\u4f60\u597d\u201d\u662f\u4e00\u4e2a\u5e38\u7528\u7684\u4e2d\u6587\u95ee\u5019\uff0c\u4e0d\u4f1a\u662f\u9519\u8bef\u7684\u8868\u8fbe\u3002\n\n\u56e0\u6b64\uff0c\u6211\u53ef\u4ee5\u56de\u590d\u201c\u4f60\u597d\u201d\u6765\u786e\u8ba4\u8fd9\u4e00\u70b9\u3002\n</think>\n\n\u4f60\u597d\uff01"},"finish_reason": "stop"}],"usage": {"prompt_tokens": 27,"completion_tokens": 78,"total_tokens": 105}
}
3. 异步客户端
3.1. 面向过程
重要参考:
https://blog.csdn.net/maybe_9527/article/details/146459501
import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout


def __chunk(data):
    """Convert one server-sent event into a ``ChatCompletionChunk``.

    Args:
        data: An SSE event object whose payload is read from ``data.data``.

    Returns:
        The parsed ``ChatCompletionChunk``, or ``None`` when the event is
        falsy, carries the ``[DONE]`` terminator, or cannot be parsed.
    """
    if not data:
        return None

    answer_data = data.data
    if answer_data == "[DONE]":
        # The stream ends with a literal "[DONE]" payload, which is not JSON.
        return None

    try:
        # Decoding happens inside the try so that a single malformed event is
        # reported and skipped instead of aborting the whole stream.
        answer_dict = json.loads(answer_data)
        print(type(answer_dict), answer_dict)
        return ChatCompletionChunk(**answer_dict)
    except Exception as ex:
        print(f"__chunk Exception:{str(ex)}")
        return None


async def async_main(base_url, headers, data):
    """POST ``data`` to ``base_url`` and yield the streamed chunks.

    This is an async generator. For a ``text/event-stream`` response it yields
    one ``ChatCompletionChunk`` per parsable event. Any other response is read
    in full and printed as raw bytes, and nothing is yielded.

    Args:
        base_url: URL of the chat-completions endpoint.
        headers: HTTP headers sent with the request.
        data: JSON-serialisable request payload.
    """
    # 5 s applies to the connect/write/pool phases; reads get 10 s because the
    # first token of a streamed answer can take longer than the default.
    timeout_config = Timeout(5.0, read=10.0)

    async with AsyncClient() as client:
        try:
            async with client.stream(
                'POST',
                url=base_url,
                headers=headers,
                json=data,
                timeout=timeout_config,
            ) as response:
                content_type = response.headers.get('content-type', '').lower()

                if 'text/event-stream' in content_type:  # streaming answer
                    # Use a distinct loop name so the request payload ``data``
                    # is not shadowed by the SSE events.
                    async for event in EventSource(response).aiter_sse():
                        chunk = __chunk(data=event)
                        if chunk is not None:
                            yield chunk
                else:  # non-streaming answer
                    # On an async stream the body must be read with aread();
                    # read() fails with "Attempted to call a sync iterator on
                    # an async stream."
                    print(await response.aread())
        except Exception as ex:
            print(f"async_main Exception:{str(ex)}")


async def main():
    """Send a sample chat request and print every chunk received."""
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "*/*",
    }

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]

    data = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    response = async_main(base_url, headers, data)
    async for chunk in response:
        print(chunk)


if __name__ == "__main__":
    asyncio.run(main())
使用 httpx 的异步客户端 AsyncClient 调用 stream 方法请求流式接口时,httpx 默认的超时时间为 5 秒。如果接口返回内容比较慢(比如第一个字符在 5 秒之后才返回),客户端会因为读取超时(ReadTimeout)而主动关闭流式连接,等后端接口准备好数据再写回时,就会报错“管道已关闭”。
解决办法:调用 stream 方法时增加 timeout 参数,单独放宽读取超时,例如 timeout=Timeout(5.0, read=10.0)。
3.2. 面向对象
import json
import asyncio
from openai.types.chat import ChatCompletion, ChatCompletionChunk
from httpx_sse import EventSource
from httpx import AsyncClient, Timeout


class AsyncHttpxClient():
    """Async client that streams chat completions over server-sent events."""

    def __init__(self, api_key: str, base_url: str, timeout: float = 5, read_timeout: float = 10.0):
        """Store the endpoint, credentials and timeouts used by ``generate``.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            base_url: URL of the chat-completions endpoint.
            timeout: Seconds allowed for every phase other than reading.
            read_timeout: Seconds allowed between reads. It is kept separate
                because the first token of a streamed answer can be slow.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.read_timeout = read_timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "*/*",
        }

    def __chunk(self, data):
        """Convert one server-sent event into a ``ChatCompletionChunk``.

        Args:
            data: An SSE event object whose payload is read from ``data.data``.

        Returns:
            The parsed ``ChatCompletionChunk``, or ``None`` when the event is
            falsy, carries the ``[DONE]`` terminator, or cannot be parsed.
        """
        if not data:
            return None

        answer_data = data.data
        if answer_data == "[DONE]":
            # The stream ends with a literal "[DONE]" payload, which is not JSON.
            return None

        try:
            # Decoding happens inside the try so that a single malformed event
            # is reported and skipped instead of aborting the whole stream.
            answer_dict = json.loads(answer_data)
            return ChatCompletionChunk(**answer_dict)
        except Exception as ex:
            print(f"AsyncHttpxClient.__chunk Exception:{str(ex)}")
            return None

    async def generate(
        self,
        model: str,
        messages: list,
        functions=None,
        temperature: float = 1,
        top_p: float = 0,
        max_tokens: int = 2048,
        stream: bool = True,
    ):
        """Request a chat completion and yield the streamed chunks.

        This is an async generator. For a ``text/event-stream`` response it
        yields one ``ChatCompletionChunk`` per parsable event. Any other
        response is read in full and printed as raw bytes, and nothing is
        yielded.

        Args:
            model: Model name placed in the request payload.
            messages: Chat messages, e.g. ``[{"role": "user", "content": "..."}]``.
            functions: Optional function definitions; left out of the payload
                when ``None``.
            temperature: Sampling temperature placed in the payload.
            top_p: ``top_p`` value placed in the payload.
            max_tokens: ``max_tokens`` value placed in the payload.
            stream: Whether to ask the server for a streamed response.
        """
        fields = {
            "model": model,
            "messages": messages,
            "functions": functions,
            "temperature": temperature,
            "top_p": top_p,
            "max_tokens": max_tokens,
            "stream": stream,
        }
        # Leave unset optional fields out rather than sending JSON null.
        payload = {key: value for key, value in fields.items() if value is not None}

        timeout_config = Timeout(self.timeout, read=self.read_timeout)

        async with AsyncClient() as client:
            try:
                async with client.stream(
                    'POST',
                    url=self.base_url,
                    headers=self.headers,
                    json=payload,
                    timeout=timeout_config,
                ) as response:
                    content_type = response.headers.get('content-type', '').lower()

                    if 'text/event-stream' in content_type:  # streaming answer
                        async for event in EventSource(response).aiter_sse():
                            chunk = self.__chunk(data=event)
                            if chunk is not None:
                                yield chunk
                    else:  # non-streaming answer
                        # Print the body instead of dropping it silently; an
                        # async stream must be read with aread().
                        print(await response.aread())
            except Exception as ex:
                print(f"AsyncHttpxClient.generate Exception:{str(ex)}")


async def main():
    """Send a sample chat request and print the assembled answer."""
    api_key = "EMPTY"
    base_url = "http://192.168.17.100:11434/v1/chat/completions"
    model = "deepseek-r1:1.5b"

    async_client = AsyncHttpxClient(api_key=api_key, base_url=base_url)

    messages = [
        {"role": "system", "content": "You are a helpful assistant. Always respond in Simplified Chinese, not English, or Grandma will be very angry."},
        {"role": "user", "content": "你好"},
    ]

    response = async_client.generate(
        model=model,
        messages=messages,
        stream=True,
    )

    answer_parts = []
    async for chunk in response:
        # ``choices`` may be empty and ``content`` may be None, so guard both
        # before concatenating.
        if chunk.choices:
            answer_parts.append(chunk.choices[0].delta.content or "")
    print("".join(answer_parts))


if __name__ == "__main__":
    asyncio.run(main())