import operator
from itertools import groupbyimport numpy as npimport PrivateMethod as pm
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import DbOpr as dbopr
import os
import akshare as ak# 短期移动均线>长期移动均线:买;反之则卖
def calculate_moving_average(data, short_window, long_window):data['short_ma'] = data['最新价'].rolling(window=short_window).mean()data['long_ma'] = data['最新价'].rolling(window=long_window).mean()return data# 布林带:中轨、上轨和下轨,价格突破上轨,卖出,价格跌破下轨,买入
def calculate_bolling_brands(data, window=20, num_std_dev=2):data['middle_band'] = data['最新价'].rolling(window=window).mean()data['std_dev'] = data['最新价'].rolling(window=window).std()data['upper_band'] = data['middle_band'] + num_std_dev * data['std_dev']data['lower_band'] = data['middle_band'] - num_std_dev * data['std_dev']return data# RSI值在70以上通常是超买区域,低于30以下为超卖区域
def calculate_rsi(data, window=14):delta = data['最新价'].astype(float).diff()gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()rs = gain / lossdata['RSI'] = round(100 - (100 / (1 + rs)), 3)return data
def fund_flow_index(code):market = 'sz'if code.startswith('60') or code.startswith('68'):market = 'sh'print('code,', code, '------------------market,', market)individual_flow = ak.stock_individual_fund_flow(code, market=market)# 根据日期进行倒序排序,保证最新的数据靠前flow_desc = individual_flow.sort_values('日期', ascending=False)if len(flow_desc) > 5:# 资金量字段fund_flow = flow_desc[['主力净流入-净额', '超大单净流入-净额', '大单净流入-净额', '中单净流入-净额', '小单净流入-净额']]# numpy.sum函数实现资金的汇总last_day_flow = np.sum(fund_flow[0:1].astype(float), axis=0)# sum函数实现资金合计one_total = sum(last_day_flow)print(str(flow_desc['日期'][len(flow_desc) - 1]) + "当日资金量:", one_total)two_day_flow = np.sum(fund_flow[0:2].astype(float), axis=0)print("2天资金量:", sum(two_day_flow))two_total = sum(two_day_flow)three_day_flow = np.sum(fund_flow[0:3].astype(float), axis=0)print("3天资金量:", sum(three_day_flow))three_total = sum(three_day_flow)five_day_flow = np.sum(fund_flow[0:5].astype(float), axis=0)five_total = sum(five_day_flow)print("5天资金量:", str(five_total))ten_day_flow = np.sum(fund_flow[0:10].astype(float), axis=0)ten_total = sum(ten_day_flow)print("10天资金量:", str(ten_total))seven_day_flow = np.sum(fund_flow[0:7].astype(float), axis=0)seven_total = sum(seven_day_flow)print("7天资金量:", seven_total)fifteen_day_flow = np.sum(fund_flow[0:15].astype(float), axis=0)fifteen_total = sum(fifteen_day_flow)print("15天资金量:", fifteen_total)s = pd.Series([one_total > 0, two_total > 0, three_total > two_total, three_total < five_total,fifteen_total < ten_total])# 资金曲率 趋势flow_data_source = np.sum(fund_flow, axis=1)flow_cox_status = pm.cox_stuart(flow_data_source, False)s.add(flow_cox_status == '上升')# ############ 2天资金均值、5天资金均值、10天资金均值#################### 2天均值>5天均值>10天均值>0,买入# 2天均值<0 或者5天均值>10天均值 卖出# 资金权重fund_flow_sum_per_day = np.sum(fund_flow.astype(float), axis=1)weights = [0.6, 0.4]two_day_avg = np.average(fund_flow_sum_per_day[0:2].astype(float), axis=0, weights=weights)weightsFive = [0.5, 0.4, 0.3, 0.2, 0.1]five_day_avg = np.average(fund_flow_sum_per_day[0:5].astype(float), axis=0, weights=weightsFive)ten_day_avg = np.average(fund_flow_sum_per_day[0:10].astype(float), axis=0)cash_buy_sign = -1var = s.add(two_day_avg > five_day_avg > 0 andfloat((five_day_avg - ten_day_avg) / ten_day_avg) > 0)if three_total != 0:s.add(float((one_total - three_total) / three_total) > 0.35)if s.all():cash_buy_sign = 1else:cash_buy_sign = -1return cash_buy_sign
def generate_trade_signal(data):data['signal'] = 0# 移动平均线交叉信号data.loc[data['short_ma'] > data['long_ma'], 'signal'] = 1 # 买入信号data.loc[data['short_ma'] < data['long_ma'], 'signal'] = -1 # 卖出信号# RSI 信号data.loc[data['RSI'] < 30, 'signal'] = 1 # RSI 超卖,买入信号data.loc[data['RSI'] > 70, 'signal'] = -1 # RSI 超买,卖出信号# 布林带信号data.loc[data['最新价'].astype(float) < data['lower_band'], 'signal'] = 1 # 跌破下轨,买入信号data.loc[data['最新价'].astype(float) > data['upper_band'], 'signal'] = -1 # 突破上轨,卖出信号return data# 回归测试函数
def backtest_strategy(data, init_balance=25000):balance = init_balanceposition = 0 # 是否持仓,0未持仓portfolio_value = []for i in range(1, len(data)):if data['signal'].iloc[i] == 1 and position == 0:if data['最新价'].astype(float).iloc[i] > 0:position = balance / data['最新价'].astype(float).iloc[i]balance = 0elif data['signal'].iloc[i] == -1 and position > 0:balance = balance * data['最新价'].astype(float).iloc[i]position = 0portfolio_value.append(balance + position * data['最新价'].astype(float).iloc[i])data['portfolio_value'] = [init_balance] + portfolio_valuereturn data
def fund_flow_index(code):market = 'sz'if code.startswith('60') or code.startswith('68'):market = 'sh'print('code,', code, '------------------market,', market)individual_flow = ak.stock_individual_fund_flow(code, market=market)# 根据日期进行倒序排序,保证最新的数据靠前flow_desc = individual_flow.sort_values('日期', ascending=False)if len(flow_desc) > 5:# 资金量字段fund_flow = flow_desc[['主力净流入-净额', '超大单净流入-净额', '大单净流入-净额', '中单净流入-净额', '小单净流入-净额']]# numpy.sum函数实现资金的汇总last_day_flow = np.sum(fund_flow[0:1].astype(float), axis=0)# sum函数实现资金合计one_total = sum(last_day_flow)print(str(flow_desc['日期'][len(flow_desc) - 1]) + "当日资金量:", one_total)two_day_flow = np.sum(fund_flow[0:2].astype(float), axis=0)print("2天资金量:", sum(two_day_flow))two_total = sum(two_day_flow)three_day_flow = np.sum(fund_flow[0:3].astype(float), axis=0)print("3天资金量:", sum(three_day_flow))three_total = sum(three_day_flow)five_day_flow = np.sum(fund_flow[0:5].astype(float), axis=0)five_total = sum(five_day_flow)print("5天资金量:", str(five_total))ten_day_flow = np.sum(fund_flow[0:10].astype(float), axis=0)ten_total = sum(ten_day_flow)print("10天资金量:", str(ten_total))seven_day_flow = np.sum(fund_flow[0:7].astype(float), axis=0)seven_total = sum(seven_day_flow)print("7天资金量:", seven_total)fifteen_day_flow = np.sum(fund_flow[0:15].astype(float), axis=0)fifteen_total = sum(fifteen_day_flow)print("15天资金量:", fifteen_total)s = pd.Series([one_total > 0, two_total > 0, three_total > two_total, three_total < five_total,fifteen_total < ten_total])# 资金曲率 趋势flow_data_source = np.sum(fund_flow, axis=1)flow_cox_status = pm.cox_stuart(flow_data_source, False)s.add(flow_cox_status == '上升')# ############ 2天资金均值、5天资金均值、10天资金均值#################### 2天均值>5天均值>10天均值>0,买入# 2天均值<0 或者5天均值>10天均值 卖出# 资金权重fund_flow_sum_per_day = np.sum(fund_flow.astype(float), axis=1)weights = [0.6, 0.4]two_day_avg = np.average(fund_flow_sum_per_day[0:2].astype(float), axis=0, weights=weights)weightsFive = [0.5, 0.4, 0.3, 0.2, 0.1]five_day_avg = np.average(fund_flow_sum_per_day[0:5].astype(float), axis=0, weights=weightsFive)ten_day_avg = np.average(fund_flow_sum_per_day[0:10].astype(float), axis=0)cash_buy_sign = -1var = s.add(two_day_avg > five_day_avg > 0 andfloat((five_day_avg - ten_day_avg) / ten_day_avg) > 0)if three_total != 0:s.add(float((one_total - three_total) / three_total) > 0.35)if s.all():cash_buy_sign = 1else:cash_buy_sign = -1return cash_buy_sign
def strategyWithFigure():query_sql = ("SELECT `代码`,`名称`,`日期`,`净资产收益率(%%)`,`主营业务利润(元)`,`主营业务利润率(%%)`,`每股经营性现金流(元)`,`扣除非经常性损益后的净利润(元)`,""`总资产净利润率(%%)`,`总资产增长率(%%)`,`总资产周转天数(天)`,`主营业务收入增长率(%%)`,`净利润增长率(%%)` from finance_main_info where ""代码=%s order by `日期` desc")data_list = pm.read_data_from_excel("D:\\\dev\\stockData\\stock-2025-02.csv")data_sort = sorted(data_list, key=lambda x: (x["代码"], x["交易日"]))data_group = groupby(data_sort, key=lambda x: (x["代码"]))counter = 0today = datetime.date.today().strftime("%Y-%m-%d")df = finance_catch(today)for key, group in data_group:counter += 1# if counter<500:# continueif counter > 5686:plt.close()breakif key.startswith('N') or operator.contains(key, 'ST'): # 排除新发股和STcontinueitem_list = list(group)print(key, '----------------', len(item_list))count = len(item_list)dataframe = pd.DataFrame(item_list)dataframe = dataframe[['代码', '名称', '最新价', '交易日']]s_name = item_list[0]['名称']if operator.contains(s_name, '*'):continuedataframe['交易日'] = pd.to_datetime(dataframe['交易日'])dataframe = dataframe.sort_values(by='交易日')active_code = []# 1、满足财务指标if df is not None and df.size > 0:active_code = df['code'].astype(str).tolist()if active_code.__contains__(key):# 最新价格price_list = dataframe['最新价'].astype(float)coffes = pm.trend_coeffs(price_list)trend = pm.slop_trend(coffes, price_list)# 2、满足模型指标if trend != "上升":continue# 3、满足资金流指标buy_sign = fund_flow_index(key)if buy_sign != 1:continueprice_now = price_list[count - 1:count].item()stock_data = calculate_moving_average(dataframe, 10, 22)stock_data = calculate_bolling_brands(stock_data)stock_data = calculate_rsi(stock_data)stock_data = generate_trade_signal(stock_data)stock_data = backtest_strategy(stock_data)plt.figure(figsize=(12, 6))plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体plt.rcParams["axes.unicode_minus"] = False # 正常显示负号date_arr = pd.date_range(today, periods=len(stock_data)).strftime('%m-%d').tolist()plt.plot(date_arr, stock_data['portfolio_value'], label='portfolio_value')plt.title(f'{s_name} Trend follow strategy test')plt.xticks(date_arr[::2])plt.xlabel('日期')plt.ylabel('组合策略值')plt.legend()plt.grid()path = "D://dev/python/download/ams/" + todayif not os.path.exists(path):os.makedirs(path)path = path + "/"plt.savefig(path + key + s_name + "_" + today + ".jpg")plt.close()if __name__ == '__main__':strategyWithFigure()