基于大模型与异步技术的股票分析系统实现
在金融量化分析领域,高效的数据获取与智能的策略决策是核心竞争力。本文结合异步数据抓取技术与大模型工具集成,构建一套完整的股票分析系统,实现从海量数据采集到智能信息查询的全流程自动化。
一、量化分析的数据基石:异步高效数据抓取
1. 量化策略对数据的核心需求
量化交易策略通过将人类交易经验转化为程序逻辑,依赖海量历史数据克服人性弱点(如追涨杀跌)。例如,分析股票日K线数据时,需获取数千只A股的历史行情,传统同步抓取效率低下,无法满足实时策略迭代需求。
2. 异步技术提升数据抓取效率
利用Python的asyncio
库实现多协程并发抓取,将同步I/O操作(如网络请求)转换为异步任务,显著降低开销。核心实现代码如下:
完整异步抓取模块(async_data_fetcher.py
)
import asyncio
import pandas as pd
import akshare as ak# 异步加载单只股票数据
async def load_single_stock(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:loop = asyncio.get_running_loop()# 将同步API放入线程池执行,避免阻塞事件循环df = await loop.run_in_executor(None,lambda: ak.stock_zh_a_hist(symbol=symbol,period="daily",start_date=start_date,end_date=end_date,adjust="qfq"))# 数据清洗:删除无效列df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量']]df.rename(columns={'日期': 'date', '开盘': 'open', '最高': 'high','最低': 'low', '收盘': 'close', '成交量': 'volume'}, inplace=True)df['symbol'] = symbol # 添加股票代码标识return df# 并发抓取多只股票数据
async def batch_fetch_stocks(stock_codes: list, start_date: str, end_date: str) -> pd.DataFrame:tasks = [asyncio.create_task(load_single_stock(code, start_date, end_date))for code in stock_codes]results = await asyncio.gather(*tasks)# 合并所有股票数据return pd.concat(results, ignore_index=True)# 主函数:执行抓取并保存数据
def main():stock_codes = ["sh600000", "sz000001", "sh600519"] # 浦发银行、平安银行、贵州茅台start_date, end_date = "20230101", "20250501"# 异步事件循环asyncio.run(asyncio.wait_for(batch_fetch_stocks(stock_codes, start_date, end_date),timeout=600 # 设置10分钟超时)).to_csv(f"stock_data_{start_date}_{end_date}.csv",index=False,encoding="utf-8-sig")print("数据抓取并保存完成")if __name__ == "__main__":main()
3. 效率对比与优势
异步抓取相比传统同步方式,时间开销大幅降低:
- 同步抓取:100只股票×3年数据≈300秒(每只3秒,串行执行)
- 异步抓取:100只股票×3年数据≈30秒(并发度50,耗时为单只最大耗时+调度开销)
实现10倍效率提升,满足高频数据更新需求。
二、大模型赋能:智能股票信息查询系统
1. 工具化封装:让大模型具备专业能力
通过langchain_core.tools
的@tool
装饰器,将股票查询逻辑封装为工具,供大模型调用。完整工具代码如下:
工具模块(stock_tools.py
)
from langchain_core.tools import tool
import akshare as ak
import pandas as pd@tool
def get_stock_info(code: str = "",name: str = ""
) -> str:"""根据股票代码或名称查询实时行情信息Args:code: 股票代码(格式如"sh600000",上交所;"sz000001",深交所)name: 股票名称(支持模糊查询,如"平安银行")Returns:符合条件的股票实时信息(JSON格式列表)"""# 获取全市场实时行情df = ak.stock_zh_a_spot_em()# 条件筛选filter_conditions = []if code:filter_conditions.append(df['代码'] == code)if name:filter_conditions.append(df['名称'].str.contains(name))if not filter_conditions:return "请提供股票代码或名称"result_df = df[pd.concat(filter_conditions, axis=1).any(axis=1)]return result_df[['代码', '名称', '最新价', '涨跌幅', '成交量']].to_dict(orient='records')
2. 大模型与工具的绑定与调用
模型初始化与工具绑定(llm_agent.py
)
from langgraph import DeepSeekLLM # 假设DeepSeek为自定义LLM类
from stock_tools import get_stock_info# 初始化大模型
llm = DeepSeekLLM(model_name="deepseek-1.0",temperature=0.1,max_tokens=512
)# 绑定工具
tools = [get_stock_info]
llm_with_tools = llm.bind_tools(tools)
Function Calling 执行流程
from langchain_core.messages import HumanMessage, SystemMessagedef run_llm_query(query: str):messages = [SystemMessage(content="""你是专业的股票助手,仅通过调用get_stock_info工具回答问题。工具调用格式必须为标准function call,每次最多返回1个工具调用。若工具返回空结果,需告知用户"未找到相关股票"。"""),HumanMessage(content=query)]response = llm_with_tools.invoke(messages)# 处理工具返回if response.tool_calls:tool_result = get_stock_info.invoke(response.tool_calls[0].args)return f"查询结果:{tool_result}"return response.content
3. 工作流设计:节点驱动的智能交互
通过**状态图(StateGraph)**实现条件循环控制,完整代码如下:
状态图构建模块(workflow_builder.py
)
from langgraph import StateGraph, START, END, MessagesState
from llm_agent import llm_with_tools
from stock_tools import get_stock_info# 定义节点函数
def llm_call_node(state: MessagesState):messages = state["messages"]response = llm_with_tools.invoke(messages)return {"messages": messages + [response]}def tool_execute_node(state: MessagesState):last_msg = state["messages"][-1]if not last_msg.tool_calls:return statetool = get_stock_info # 假设仅一个工具results = []for call in last_msg.tool_calls:result = tool.invoke(call.args)results.append(f"工具返回:{result}")return {"messages": state["messages"] + [SystemMessage(content="\n".join(results))]}# 条件判断函数
def should_continue(state: MessagesState) -> str:last_msg = state["messages"][-1]return "TOOL" if last_msg.tool_calls else "END"# 构建状态图
def build_workflow():builder = StateGraph(MessagesState)# 添加节点builder.add_node("LLM_CALL", llm_call_node)builder.add_node("TOOL_EXECUTE", tool_execute_node)# 添加边builder.add_edge(START, "LLM_CALL")builder.add_conditional_edges("LLM_CALL",should_continue,{"TOOL": "TOOL_EXECUTE","END": END})builder.add_edge("TOOL_EXECUTE", "LLM_CALL") # 工具执行后回到LLM节点return builder.compile()# 生成可视化流程图
def visualize_workflow(agent):graph = agent.get_graph(xray=True)graph.draw_mermaid_file("stock_agent_workflow.mmd") # 生成Mermaid脚本# 可通过在线工具(如Mermaid Live Editor)渲染为流程图
三、系统整合与效果验证
1. 完整调用示例
场景1:代码反查名称
agent = build_workflow()
query = "300750是哪只股票?"
state = {"messages": [HumanMessage(content=query)]}
result = agent.invoke(state)
# 输出:
# 工具返回:[{"代码": "300750", "名称": "宁德时代", ...}]
# 最终回答:300750是宁德时代的股票代码
场景2:批量数据分析
# 1. 获取宁德时代代码
code_query = "宁德时代的股票代码是多少?"
code_result = run_llm_query(code_query) # 返回"300750"# 2. 抓取历史数据
asyncio.run(batch_fetch_stocks(["300750"], "20230101", "20231231"
)).describe() # 输出2023年股价统计特征
2. 技术架构图
四、完整项目结构
stock_analysis_system/
├─ data/ # 数据存储
│ ├─ stock_data_2023-2025.csv
├─ src/ # 核心代码
│ ├─ async_data_fetcher.py # 异步数据抓取
│ ├─ stock_tools.py # 工具定义
│ ├─ llm_agent.py # 大模型交互
│ ├─ workflow_builder.py # 状态图构建
├─ utils/ # 辅助工具
│ ├─ visualization.py # 流程图生成
├─ tests/ # 测试用例
│ ├─ test_fetcher.py
│ ├─ test_llm.py
├─ requirements.txt # 依赖清单
│ akshare==1.4.7
│ langchain-core==0.2.0
│ pandas==2.1.3
五、依赖安装与运行
1. 环境配置
# 安装依赖
pip install akshare langchain-core pandas asyncio# 可选:安装绘图依赖(用于状态图可视化)
pip install graphviz mermaid-js
2. 启动流程
- 运行异步数据抓取:
python src/async_data_fetcher.py
- 测试大模型查询:
python src/llm_agent.py "查询贵州茅台的实时股价"
- 生成工作流图:
python src/workflow_builder.py visualize
六、总结与展望
本文实现的系统通过异步IO提升数据抓取效率,通过大模型工具化扩展专业能力,通过状态图实现智能流程控制,形成完整的股票分析技术栈。未来可扩展方向:
- 模型增强:接入ChatGPT/GPT-4等更强大LLM,优化复杂问题解析能力
- 数据扩展:增加港股、美股数据接口,支持多市场分析
- 策略集成:将抓取数据直接输入量化回测框架(如Backtrader),实现“数据-策略-验证”闭环
通过技术整合,该系统可成为金融从业者的高效工具,推动数据驱动的智能投资决策落地。
说明
- 代码块使用CSDN支持的Markdown语法,可直接复制粘贴
- Mermaid流程图需在CSDN编辑器中开启「Markdown语法」并确保渲染支持
- 依赖版本可根据实际环境调整,建议使用
requirements.txt
统一管理