当前位置: 首页 > ai >正文

基于大模型与异步技术的股票分析系统实现

在金融量化分析领域,高效的数据获取与智能的策略决策是核心竞争力。本文结合异步数据抓取技术大模型工具集成,构建一套完整的股票分析系统,实现从海量数据采集到智能信息查询的全流程自动化。

一、量化分析的数据基石:异步高效数据抓取

1. 量化策略对数据的核心需求

量化交易策略通过将人类交易经验转化为程序逻辑,依赖海量历史数据克服人性弱点(如追涨杀跌)。例如,分析股票日K线数据时,需获取数千只A股的历史行情,传统同步抓取效率低下,无法满足实时策略迭代需求。

2. 异步技术提升数据抓取效率

利用Python的asyncio库实现多协程并发抓取,将同步I/O操作(如网络请求)转换为异步任务,显著降低开销。核心实现代码如下:

完整异步抓取模块(async_data_fetcher.py
import asyncio
import pandas as pd
import akshare as ak# 异步加载单只股票数据
async def load_single_stock(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:loop = asyncio.get_running_loop()# 将同步API放入线程池执行,避免阻塞事件循环df = await loop.run_in_executor(None,lambda: ak.stock_zh_a_hist(symbol=symbol,period="daily",start_date=start_date,end_date=end_date,adjust="qfq"))# 数据清洗:删除无效列df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量']]df.rename(columns={'日期': 'date', '开盘': 'open', '最高': 'high','最低': 'low', '收盘': 'close', '成交量': 'volume'}, inplace=True)df['symbol'] = symbol  # 添加股票代码标识return df# 并发抓取多只股票数据
async def batch_fetch_stocks(stock_codes: list, start_date: str, end_date: str) -> pd.DataFrame:tasks = [asyncio.create_task(load_single_stock(code, start_date, end_date))for code in stock_codes]results = await asyncio.gather(*tasks)# 合并所有股票数据return pd.concat(results, ignore_index=True)# 主函数:执行抓取并保存数据
def main():stock_codes = ["sh600000", "sz000001", "sh600519"]  # 浦发银行、平安银行、贵州茅台start_date, end_date = "20230101", "20250501"# 异步事件循环asyncio.run(asyncio.wait_for(batch_fetch_stocks(stock_codes, start_date, end_date),timeout=600  # 设置10分钟超时)).to_csv(f"stock_data_{start_date}_{end_date}.csv",index=False,encoding="utf-8-sig")print("数据抓取并保存完成")if __name__ == "__main__":main()

3. 效率对比与优势

异步抓取相比传统同步方式,时间开销大幅降低:

  • 同步抓取:100只股票×3年数据≈300秒(每只3秒,串行执行)
  • 异步抓取:100只股票×3年数据≈30秒(并发度50,耗时为单只最大耗时+调度开销)
    实现10倍效率提升,满足高频数据更新需求。

二、大模型赋能:智能股票信息查询系统

1. 工具化封装:让大模型具备专业能力

通过langchain_core.tools@tool装饰器,将股票查询逻辑封装为工具,供大模型调用。完整工具代码如下:

工具模块(stock_tools.py
from langchain_core.tools import tool
import akshare as ak
import pandas as pd@tool
def get_stock_info(code: str = "",name: str = ""
) -> str:"""根据股票代码或名称查询实时行情信息Args:code: 股票代码(格式如"sh600000",上交所;"sz000001",深交所)name: 股票名称(支持模糊查询,如"平安银行")Returns:符合条件的股票实时信息(JSON格式列表)"""# 获取全市场实时行情df = ak.stock_zh_a_spot_em()# 条件筛选filter_conditions = []if code:filter_conditions.append(df['代码'] == code)if name:filter_conditions.append(df['名称'].str.contains(name))if not filter_conditions:return "请提供股票代码或名称"result_df = df[pd.concat(filter_conditions, axis=1).any(axis=1)]return result_df[['代码', '名称', '最新价', '涨跌幅', '成交量']].to_dict(orient='records')

2. 大模型与工具的绑定与调用

模型初始化与工具绑定(llm_agent.py
from langgraph import DeepSeekLLM  # 假设DeepSeek为自定义LLM类
from stock_tools import get_stock_info# 初始化大模型
llm = DeepSeekLLM(model_name="deepseek-1.0",temperature=0.1,max_tokens=512
)# 绑定工具
tools = [get_stock_info]
llm_with_tools = llm.bind_tools(tools)
Function Calling 执行流程
from langchain_core.messages import HumanMessage, SystemMessagedef run_llm_query(query: str):messages = [SystemMessage(content="""你是专业的股票助手,仅通过调用get_stock_info工具回答问题。工具调用格式必须为标准function call,每次最多返回1个工具调用。若工具返回空结果,需告知用户"未找到相关股票"。"""),HumanMessage(content=query)]response = llm_with_tools.invoke(messages)# 处理工具返回if response.tool_calls:tool_result = get_stock_info.invoke(response.tool_calls[0].args)return f"查询结果:{tool_result}"return response.content

3. 工作流设计:节点驱动的智能交互

通过**状态图(StateGraph)**实现条件循环控制,完整代码如下:

状态图构建模块(workflow_builder.py
from langgraph import StateGraph, START, END, MessagesState
from llm_agent import llm_with_tools
from stock_tools import get_stock_info# 定义节点函数
def llm_call_node(state: MessagesState):messages = state["messages"]response = llm_with_tools.invoke(messages)return {"messages": messages + [response]}def tool_execute_node(state: MessagesState):last_msg = state["messages"][-1]if not last_msg.tool_calls:return statetool = get_stock_info  # 假设仅一个工具results = []for call in last_msg.tool_calls:result = tool.invoke(call.args)results.append(f"工具返回:{result}")return {"messages": state["messages"] + [SystemMessage(content="\n".join(results))]}# 条件判断函数
def should_continue(state: MessagesState) -> str:last_msg = state["messages"][-1]return "TOOL" if last_msg.tool_calls else "END"# 构建状态图
def build_workflow():builder = StateGraph(MessagesState)# 添加节点builder.add_node("LLM_CALL", llm_call_node)builder.add_node("TOOL_EXECUTE", tool_execute_node)# 添加边builder.add_edge(START, "LLM_CALL")builder.add_conditional_edges("LLM_CALL",should_continue,{"TOOL": "TOOL_EXECUTE","END": END})builder.add_edge("TOOL_EXECUTE", "LLM_CALL")  # 工具执行后回到LLM节点return builder.compile()# 生成可视化流程图
def visualize_workflow(agent):graph = agent.get_graph(xray=True)graph.draw_mermaid_file("stock_agent_workflow.mmd")  # 生成Mermaid脚本# 可通过在线工具(如Mermaid Live Editor)渲染为流程图

三、系统整合与效果验证

1. 完整调用示例

场景1:代码反查名称
agent = build_workflow()
query = "300750是哪只股票?"
state = {"messages": [HumanMessage(content=query)]}
result = agent.invoke(state)
# 输出:
# 工具返回:[{"代码": "300750", "名称": "宁德时代", ...}]
# 最终回答:300750是宁德时代的股票代码
场景2:批量数据分析
# 1. 获取宁德时代代码
code_query = "宁德时代的股票代码是多少?"
code_result = run_llm_query(code_query)  # 返回"300750"# 2. 抓取历史数据
asyncio.run(batch_fetch_stocks(["300750"], "20230101", "20231231"
)).describe()  # 输出2023年股价统计特征

2. 技术架构图

需要工具
无需工具
用户查询
大模型决策
调用get_stock_info
异步数据接口
数据清洗整合
返回工具结果
直接回答
输出结果

四、完整项目结构

stock_analysis_system/
├─ data/                # 数据存储
│  ├─ stock_data_2023-2025.csv
├─ src/                 # 核心代码
│  ├─ async_data_fetcher.py  # 异步数据抓取
│  ├─ stock_tools.py       # 工具定义
│  ├─ llm_agent.py         # 大模型交互
│  ├─ workflow_builder.py  # 状态图构建
├─ utils/               # 辅助工具
│  ├─ visualization.py   # 流程图生成
├─ tests/               # 测试用例
│  ├─ test_fetcher.py
│  ├─ test_llm.py
├─ requirements.txt     # 依赖清单
│     akshare==1.4.7
│     langchain-core==0.2.0
│     pandas==2.1.3

五、依赖安装与运行

1. 环境配置

# 安装依赖
pip install akshare langchain-core pandas asyncio# 可选:安装绘图依赖(用于状态图可视化)
pip install graphviz mermaid-js

2. 启动流程

  • 运行异步数据抓取:
    python src/async_data_fetcher.py
    
  • 测试大模型查询:
    python src/llm_agent.py "查询贵州茅台的实时股价"
    
  • 生成工作流图:
    python src/workflow_builder.py visualize
    

六、总结与展望

本文实现的系统通过异步IO提升数据抓取效率,通过大模型工具化扩展专业能力,通过状态图实现智能流程控制,形成完整的股票分析技术栈。未来可扩展方向:

  1. 模型增强:接入ChatGPT/GPT-4等更强大LLM,优化复杂问题解析能力
  2. 数据扩展:增加港股、美股数据接口,支持多市场分析
  3. 策略集成:将抓取数据直接输入量化回测框架(如Backtrader),实现“数据-策略-验证”闭环

通过技术整合,该系统可成为金融从业者的高效工具,推动数据驱动的智能投资决策落地。

说明

  1. 代码块使用CSDN支持的Markdown语法,可直接复制粘贴
  2. Mermaid流程图需在CSDN编辑器中开启「Markdown语法」并确保渲染支持
  3. 依赖版本可根据实际环境调整,建议使用requirements.txt统一管理
http://www.xdnf.cn/news/5069.html

相关文章:

  • 在 Flink + Kafka 实时数仓中,如何确保端到端的 Exactly-Once
  • Stable Diffusion进阶之Controlnet插件使用
  • python连接sqllite数据库工具类
  • 二维旋转矩阵:让图形动起来的数学魔法 ✨
  • 操作系统 第2章节 进程,线程和作业
  • 移动设备常用电子屏幕类型对比
  • 互联网大厂Java求职面试:基于RAG的智能问答系统设计与实现-1
  • 驱动-信号量
  • 【Day 23】HarmonyOS开发实战:从AR应用到元宇宙交互
  • 容联云孔淼:AI Agent应深耕垂直场景,从效率提效向价值挖掘升级
  • Godot4.3类星露谷游戏开发之【昼夜循环】
  • 【大模型】LLM概念相关问题(上)
  • C++面向对象特性之多态篇
  • 如何解决按钮重复点击
  • 第十七章,反病毒---防病毒网管
  • MOS关断时波形下降沿振荡怎么解决
  • C语言实现:打印素数、最大公约数
  • gradle3.5的安装以及配置环境变量
  • 进行性核上性麻痹饮食指南:科学膳食守护神经健康
  • OpenMagnetic的介绍与使用
  • Redis 存储原理与数据模型(三)
  • 基于RAG+MCP开发【企文小智】企业智能体
  • (强连通分量)洛谷 P2812 校园网络(加强版)题解
  • 【强化学习】强化学习算法 - 马尔可夫决策过程
  • ROS动态参数 - dynamic reconfigure 动态配置参数
  • JDK21之虚拟线程
  • 在Mathematica中加速绘制图形(LibraryLink)
  • Vue3项目中如何实现网页加载进度条。
  • 专题练习1
  • 图像移动图像归类代码