当前位置: 首页 > ds >正文

使用Python对接StockTV印度股票数据源的详细教程

使用Python对接StockTV印度股票数据源的详细教程,包含多个接口调用示例和代码说明:
获取key

import requests
import json
import pandas as pd
from websockets import connect
import asyncio# StockTV API基础配置
BASE_URL = "https://api.stocktv.top/stock"
API_KEY = "YOUR_API_KEY"  # 替换为实际获取的API Key# ================== 通用请求函数 ==================
def stocktv_api_request(endpoint, params=None):"""发送API请求的通用函数"""if params is None:params = {}params["key"] = API_KEY  # 添加API Keyurl = f"{BASE_URL}/{endpoint}"try:response = requests.get(url, params=params)response.raise_for_status()  # 检查HTTP错误return response.json()except requests.exceptions.RequestException as e:print(f"API请求失败: {e}")return None# ================== 印度股票接口示例 ==================# 示例1:获取印度市场股票列表
def get_indian_stocks(page=1, page_size=10):"""获取印度股票市场列表参数:countryId=14(印度国家代码)"""params = {"countryId": 14,"page": page,"pageSize": page_size}data = stocktv_api_request("stocks", params)if data and data.get("code") == 200:df = pd.DataFrame(data["data"]["records"])# 数据清洗df["time"] = pd.to_datetime(df["time"], unit="s") return df[["id", "symbol", "name", "last", "chgPct", "volume"]]return None# 示例2:查询特定股票详细信息
def get_stock_detail(stock_pid):"""通过股票PID查询详细信息"""params = {"id": stock_pid}data = stocktv_api_request("queryStocks", params)if data and data.get("code") == 200:return data["data"][0]return None# 示例3:获取股票K线数据
def get_kline_data(stock_pid, interval="PT15M"):"""获取股票K线数据支持间隔:PT5M/PT15M/PT1H/P1D等"""params = {"pid": stock_pid,"interval": interval}data = stocktv_api_request("kline", params)if data and data.get("code") == 200:df = pd.DataFrame(data["data"])df["time"] = pd.to_datetime(df["time"], unit="ms")return dfreturn None# 示例4:WebSocket实时数据订阅
async def stock_websocket():"""WebSocket实时数据订阅示例"""uri = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"async with connect(uri) as websocket:# 心跳保持(每30秒发送心跳)while True:try:# 接收实时数据message = await asyncio.wait_for(websocket.recv(), timeout=30)data = json.loads(message)# 解析数据示例print(f"\n实时更新 [{data['time']}]")print(f"股票ID: {data['pid']}")print(f"最新价: {data['last_numeric']}")print(f"涨跌幅: {data['pcp']}%")# 发送心跳包await websocket.send('{"action":"ping"}')except asyncio.TimeoutError:await websocket.send('{"action":"ping"}')continue# ================== 使用示例 ==================
if __name__ == "__main__":# 获取印度股票列表print("印度股票市场列表:")indian_stocks = get_indian_stocks()print(indian_stocks.head())# 查询NSEI指数(示例PID)print("\nNifty 50指数详情:")nifty_data = get_stock_detail(17940)print(json.dumps(nifty_data, indent=2))# 获取TCS股票的15分钟K线print("\nTCS股票K线数据:")tcs_kline = get_kline_data(7310)  # 假设7310是TCS的PIDprint(tcs_kline.head())# 启动WebSocket监听(需要异步环境)# asyncio.get_event_loop().run_until_complete(stock_websocket())

================== 代码说明 ==================

  1. 通用请求函数

    • 使用requests库处理HTTP请求
    • 自动添加API Key参数
    • 包含错误处理机制
  2. 核心功能模块

    • 市场列表:获取印度市场股票基础信息
    • 股票查询:通过PID获取详细数据
    • K线数据:获取不同时间粒度的历史数据
    • WebSocket:实时行情订阅(支持异步)
  3. 数据处理

    • 使用Pandas进行数据格式化
    • 时间戳转换(秒→日期时间)
    • 关键数据字段筛选
  4. 注意事项

    • 替换YOUR_API_KEY为实际申请的密钥
    • WebSocket需要异步环境运行
    • 建议添加速率限制(默认QPS=5)
    • 生产环境需添加更完善的错误处理

扩展建议

  1. 数据存储:添加数据库存储模块(MySQL/MongoDB)
def save_to_db(data, collection_name):from pymongo import MongoClientclient = MongoClient("mongodb://localhost:27017")db = client["stocktv"]return db[collection_name].insert_many(data)
  1. 可视化:使用Matplotlib绘制K线图
def plot_kline(df):import matplotlib.pyplot as pltplt.figure(figsize=(12,6))plt.plot(df["time"], df["close"], label="Close Price")plt.title("Stock Price Trend")plt.xlabel("Date")plt.ylabel("Price")plt.legend()plt.show()
  1. 报警机制:设置价格预警
def price_alert(pid, threshold):async def check_price():while True:data = get_stock_detail(pid)if float(data["last"]) > threshold:print(f"预警!股票{pid}价格超过{threshold}")await asyncio.sleep(60)  # 每分钟检查一次asyncio.create_task(check_price())

注意事项

  1. API限制

    • 免费版通常有调用频率限制(如5次/秒)
    • 历史数据获取注意时间范围限制
  2. 时区处理

    # 设置印度时区(UTC+5:30)
    df["time"] = df["time"].dt.tz_localize("UTC").dt.tz_convert("Asia/Kolkata")
    
  3. 错误代码处理

ERROR_CODES = {401: "无效API Key",429: "请求过于频繁",500: "服务器内部错误"
}def handle_error(response):if response.status_code in ERROR_CODES:print(f"错误 {response.status_code}: {ERROR_CODES[response.status_code]}")else:print(f"未知错误: {response.text}")

建议在实际使用中结合具体需求调整参数和处理逻辑,并添加日志记录、重试机制等生产级功能。

http://www.xdnf.cn/news/2966.html

相关文章:

  • MiniLLM:大型语言模型的知识蒸馏
  • InnoDB对LRU算法的优化
  • 哪些CAD看图软件适合初学者使用?
  • Jackson 使用方法详解
  • Starrocks导入数据时报错too many versions
  • 网络安全之红队LLM的大模型自动化越狱
  • RAG当知识库非常大导致大语言模型不准确,该如何处理
  • 通过langchain访问大模型并实现简单的查询
  • 操作系统——第四章(文件管理与文件的逻辑结构)
  • power bi获取局域网内共享文件
  • arm设备树基础知识
  • “专精特新”中小企业数字化转型呈现 4 大转型特征
  • 同步时钟与异步时钟
  • 1.24g 雨晨 19045.5796 Windows 10 企业版 x64 极速版
  • pymsql(SQL注入与防SQL注入)
  • Spring反射机制
  • Dijkstra算法的学习
  • cmake qt 项目编译
  • 开源 Agent 框架对比:LangChain vs AutoGen vs CrewAI
  • 牛客:AB1 【模板】栈
  • 天猫TP代运营服务商-品融电商:助力品牌破局增长的专业推手
  • 【HCIA】4种NAT的配置方式
  • AI专题(二)----由浅入深初识LLM
  • 大模型性能测试
  • 数据要素如何驱动的新质IDC一体化运营体系发展?
  • Jtti:nginx服务器如何限制访问频率
  • 在android 系统上qnn sdk转换,运行模型示例
  • MCU低功耗运行模式与唤醒机制解析
  • 数据结构每日一题day12(链表)★★★★★
  • 【AI论文】PHYBench:大型语言模型中物理感知与推理能力的全面评估