使用Python对接StockTV印度股票数据源的详细教程
使用Python对接StockTV印度股票数据源的详细教程,包含多个接口调用示例和代码说明:
获取key
import requests
import json
import pandas as pd
from websockets import connect
import asyncio# StockTV API基础配置
BASE_URL = "https://api.stocktv.top/stock"
API_KEY = "YOUR_API_KEY" # 替换为实际获取的API Key# ================== 通用请求函数 ==================
def stocktv_api_request(endpoint, params=None):"""发送API请求的通用函数"""if params is None:params = {}params["key"] = API_KEY # 添加API Keyurl = f"{BASE_URL}/{endpoint}"try:response = requests.get(url, params=params)response.raise_for_status() # 检查HTTP错误return response.json()except requests.exceptions.RequestException as e:print(f"API请求失败: {e}")return None# ================== 印度股票接口示例 ==================# 示例1:获取印度市场股票列表
def get_indian_stocks(page=1, page_size=10):"""获取印度股票市场列表参数:countryId=14(印度国家代码)"""params = {"countryId": 14,"page": page,"pageSize": page_size}data = stocktv_api_request("stocks", params)if data and data.get("code") == 200:df = pd.DataFrame(data["data"]["records"])# 数据清洗df["time"] = pd.to_datetime(df["time"], unit="s") return df[["id", "symbol", "name", "last", "chgPct", "volume"]]return None# 示例2:查询特定股票详细信息
def get_stock_detail(stock_pid):"""通过股票PID查询详细信息"""params = {"id": stock_pid}data = stocktv_api_request("queryStocks", params)if data and data.get("code") == 200:return data["data"][0]return None# 示例3:获取股票K线数据
def get_kline_data(stock_pid, interval="PT15M"):"""获取股票K线数据支持间隔:PT5M/PT15M/PT1H/P1D等"""params = {"pid": stock_pid,"interval": interval}data = stocktv_api_request("kline", params)if data and data.get("code") == 200:df = pd.DataFrame(data["data"])df["time"] = pd.to_datetime(df["time"], unit="ms")return dfreturn None# 示例4:WebSocket实时数据订阅
async def stock_websocket():"""WebSocket实时数据订阅示例"""uri = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"async with connect(uri) as websocket:# 心跳保持(每30秒发送心跳)while True:try:# 接收实时数据message = await asyncio.wait_for(websocket.recv(), timeout=30)data = json.loads(message)# 解析数据示例print(f"\n实时更新 [{data['time']}]")print(f"股票ID: {data['pid']}")print(f"最新价: {data['last_numeric']}")print(f"涨跌幅: {data['pcp']}%")# 发送心跳包await websocket.send('{"action":"ping"}')except asyncio.TimeoutError:await websocket.send('{"action":"ping"}')continue# ================== 使用示例 ==================
if __name__ == "__main__":# 获取印度股票列表print("印度股票市场列表:")indian_stocks = get_indian_stocks()print(indian_stocks.head())# 查询NSEI指数(示例PID)print("\nNifty 50指数详情:")nifty_data = get_stock_detail(17940)print(json.dumps(nifty_data, indent=2))# 获取TCS股票的15分钟K线print("\nTCS股票K线数据:")tcs_kline = get_kline_data(7310) # 假设7310是TCS的PIDprint(tcs_kline.head())# 启动WebSocket监听(需要异步环境)# asyncio.get_event_loop().run_until_complete(stock_websocket())
================== 代码说明 ==================
-
通用请求函数
- 使用
requests
库处理HTTP请求 - 自动添加API Key参数
- 包含错误处理机制
- 使用
-
核心功能模块
- 市场列表:获取印度市场股票基础信息
- 股票查询:通过PID获取详细数据
- K线数据:获取不同时间粒度的历史数据
- WebSocket:实时行情订阅(支持异步)
-
数据处理
- 使用Pandas进行数据格式化
- 时间戳转换(秒→日期时间)
- 关键数据字段筛选
-
注意事项
- 替换
YOUR_API_KEY
为实际申请的密钥 - WebSocket需要异步环境运行
- 建议添加速率限制(默认QPS=5)
- 生产环境需添加更完善的错误处理
- 替换
扩展建议
- 数据存储:添加数据库存储模块(MySQL/MongoDB)
def save_to_db(data, collection_name):from pymongo import MongoClientclient = MongoClient("mongodb://localhost:27017")db = client["stocktv"]return db[collection_name].insert_many(data)
- 可视化:使用Matplotlib绘制K线图
def plot_kline(df):import matplotlib.pyplot as pltplt.figure(figsize=(12,6))plt.plot(df["time"], df["close"], label="Close Price")plt.title("Stock Price Trend")plt.xlabel("Date")plt.ylabel("Price")plt.legend()plt.show()
- 报警机制:设置价格预警
def price_alert(pid, threshold):async def check_price():while True:data = get_stock_detail(pid)if float(data["last"]) > threshold:print(f"预警!股票{pid}价格超过{threshold}")await asyncio.sleep(60) # 每分钟检查一次asyncio.create_task(check_price())
注意事项
-
API限制:
- 免费版通常有调用频率限制(如5次/秒)
- 历史数据获取注意时间范围限制
-
时区处理:
# 设置印度时区(UTC+5:30) df["time"] = df["time"].dt.tz_localize("UTC").dt.tz_convert("Asia/Kolkata")
-
错误代码处理:
ERROR_CODES = {401: "无效API Key",429: "请求过于频繁",500: "服务器内部错误"
}def handle_error(response):if response.status_code in ERROR_CODES:print(f"错误 {response.status_code}: {ERROR_CODES[response.status_code]}")else:print(f"未知错误: {response.text}")
建议在实际使用中结合具体需求调整参数和处理逻辑,并添加日志记录、重试机制等生产级功能。