From 28a1d2052907be513f3fa93c65a100b22487dff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BB=A1=E8=84=B8=E5=B0=8F=E6=98=9F=E6=98=9F?= Date: Tue, 23 Dec 2025 10:53:27 +0800 Subject: [PATCH] commit; --- .gitignore | 21 ++ src/app.py | 35 +- src/tushare_scripts/index_daily_collector.py | 322 +++++++++++++++++++ 3 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 src/tushare_scripts/index_daily_collector.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d57403a --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +.git +.gitignore +__pycache__ +*.pyc +*.pyo +*.pyd +.Python +env +venv +.env +*.log +logs/ +reports/ +.idea +.vscode +*.swp +*.swo +*.log +*.csv +*.xlsx +.history \ No newline at end of file diff --git a/src/app.py b/src/app.py index 722aa2f..ce4a7de 100644 --- a/src/app.py +++ b/src/app.py @@ -74,6 +74,7 @@ from src.tushare_scripts.moneyflow_ths_collector import collect_moneyflow_ths from src.tushare_scripts.chip_distribution_collector import collect_chip_distribution from src.tushare_scripts.stock_factor_collector import collect_stock_factor from src.tushare_scripts.stock_factor_pro_collector import collect_stock_factor_pro +from src.tushare_scripts.index_daily_collector import collect_index_daily # 导入科技主题基本面因子选股策略 from src.quantitative_analysis.tech_fundamental_factor_strategy_v3 import TechFundamentalFactorStrategy @@ -270,6 +271,38 @@ def run_stock_daily_collection1(): "message": "股票日线数据采集完成(V3-Tushare)" }), 200 + +@app.route('/scheduler/indexDaily/collection', methods=['GET']) +def run_index_daily_collection(): + """执行指数日K数据采集任务(使用Tushare数据源) 下午3点四十开始""" + try: + logger.info("开始执行指数日K数据采集(Tushare)") + # 获取当天日期 + today = datetime.now().strftime('%Y-%m-%d') + + # 定义数据库连接地址 + db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' + + collect_index_daily( + db_url=db_url, + tushare_token=TUSHARE_TOKEN, + mode='daily' + ) + + logger.info("指数日K数据采集完成(Tushare)") + except Exception as e: + logger.error(f"启动指数日K数据采集任务失败: {str(e)}") + return jsonify({ + "status": "error", + "message": str(e) + }), 500 + + return jsonify({ + "status": "success", + "message": "指数日K数据采集完成(Tushare)" + }), 200 + + @app.route('/scheduler/stockDailyHK/collection', methods=['GET']) def run_stock_daily_collection2(): """执行港股日线数据采集任务(使用Tushare数据源)下午5点开始""" @@ -464,7 +497,7 @@ def run_chip_distribution_collection(): # mode='full' # ) collect_chip_distribution(db_url=db_url, tushare_token=TUSHARE_TOKEN, mode='full', - start_date='2021-01-03', end_date='2021-09-30') + start_date='2020-01-02', end_date='2020-12-31') # collect_chip_distribution( # db_url=db_url, # tushare_token=TUSHARE_TOKEN, diff --git a/src/tushare_scripts/index_daily_collector.py b/src/tushare_scripts/index_daily_collector.py new file mode 100644 index 0000000..31d76e9 --- /dev/null +++ b/src/tushare_scripts/index_daily_collector.py @@ -0,0 +1,322 @@ +# coding:utf-8 +""" +指数日K数据采集工具 +功能:从 Tushare 获取指数日K数据并落库 +API 文档: https://tushare.pro/document/2?doc_id=95 +说明:接口每日盘后更新 +""" + +import os +import sys +from datetime import datetime, timedelta +from typing import Optional + +import pandas as pd +import tushare as ts +from sqlalchemy import create_engine, text +import time + +# 添加项目根目录到路径,确保能够读取配置 +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(PROJECT_ROOT) + +from src.scripts.config import TUSHARE_TOKEN + + +class IndexDailyCollector: + """指数日K数据采集器""" + + def __init__( + self, + db_url: str, + tushare_token: str, + code_table_name: str = "zs_code_all", + data_table_name: str = "zs_day_data", + ): + """ + Args: + db_url: 数据库连接 URL + tushare_token: Tushare Token + code_table_name: 指数代码表名,默认 zs_code_all + data_table_name: 指数日K数据表名,默认 zs_day_data + """ + self.engine = create_engine( + db_url, + pool_size=5, + max_overflow=10, + pool_recycle=3600, + ) + self.code_table_name = code_table_name + self.data_table_name = data_table_name + + ts.set_token(tushare_token) + self.pro = ts.pro_api() + + print("=" * 60) + print("指数日K数据采集工具") + print(f"指数代码表: {self.code_table_name}") + print(f"指数日K数据表: {self.data_table_name}") + print("=" * 60) + + def fetch_all_index_codes(self) -> list: + """ + 从数据库获取所有需要采集的指数代码 + Returns: + 指数代码列表,格式: [{"ts_code": "399006.SZ", "name": "创业板"}, ...] + """ + try: + query = f"SELECT zs_code, zs_name FROM {self.code_table_name}" + df = pd.read_sql(query, self.engine) + codes = [] + for _, row in df.iterrows(): + codes.append({"ts_code": row["zs_code"], "name": row["zs_name"]}) + print(f"从 {self.code_table_name} 获取到 {len(codes)} 个指数") + return codes + except Exception as e: + print(f"获取指数代码失败: {e}") + return [] + + def fetch_data( + self, + ts_code: str, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + trade_date: Optional[str] = None, + ) -> pd.DataFrame: + """ + 调用 Tushare index_daily 接口获取数据 + Args 参数同接口文档 + """ + try: + if trade_date: + # 单日查询 + df = self.pro.index_daily(ts_code=ts_code, trade_date=trade_date) + else: + # 范围查询 + df = self.pro.index_daily( + ts_code=ts_code, start_date=start_date, end_date=end_date + ) + time.sleep(0.1) # API限频控制 + return df + except Exception as exc: + print(f"Tushare index_daily 调用失败 ({ts_code}): {exc}") + return pd.DataFrame() + + def transform_data(self, df: pd.DataFrame) -> pd.DataFrame: + """ + 将 Tushare 返回的数据转换为数据库入库格式 + """ + if df.empty: + return pd.DataFrame() + + result = pd.DataFrame() + result["symbol"] = df["ts_code"] # 使用 symbol 字段存储 ts_code + result["timestamp"] = pd.to_datetime(df["trade_date"], format="%Y%m%d") + result["open"] = df.get("open") + result["high"] = df.get("high") + result["low"] = df.get("low") + result["close"] = df.get("close") + result["pre_close"] = df.get("pre_close") + result["chg"] = df.get("change") # 涨跌额 + result["percent"] = df.get("pct_chg") # 涨跌幅 + result["volume"] = df.get("vol") # 成交量(手) + result["amount"] = df.get("amount") # 成交额(千元) + + return result + + def save_dataframe(self, df: pd.DataFrame) -> None: + """ + 将数据写入数据库 + """ + if df.empty: + print("无数据需要保存") + return + df.to_sql(self.data_table_name, self.engine, if_exists="append", index=False) + print(f"成功写入 {len(df)} 条记录") + + def delete_by_date(self, date_str: str) -> None: + """ + 删除指定交易日的旧数据 + """ + with self.engine.begin() as conn: + delete_start = f"{date_str} 00:00:00" + delete_end = f"{date_str} 23:59:59" + delete_sql = text( + f"DELETE FROM {self.data_table_name} WHERE timestamp >= :start_time AND timestamp <= :end_time" + ) + affected = conn.execute( + delete_sql, {"start_time": delete_start, "end_time": delete_end} + ) + print(f"已删除 {date_str} 旧数据 {affected.rowcount} 条") + + def run_daily_collection(self, date: Optional[str] = None) -> None: + """ + 获取指定交易日(默认当天)的指数日K数据 + 从 zs_code_all 表遍历所有指数 + """ + # 获取所有指数代码 + index_list = self.fetch_all_index_codes() + if not index_list: + print("未获取到指数代码") + return + + target_date = date or datetime.now().strftime("%Y-%m-%d") + trade_date = datetime.strptime(target_date, "%Y-%m-%d").strftime("%Y%m%d") + + print(f"开始采集 {target_date} ({trade_date}) 指数日K数据") + self.delete_by_date(target_date) + + success_count = 0 + failed_count = 0 + total_records = 0 + + for index_info in index_list: + ts_code = index_info["ts_code"] + name = index_info["name"] + print(f"\n正在采集 {name} ({ts_code})...") + + try: + df = self.fetch_data(ts_code=ts_code, trade_date=trade_date) + if df.empty: + print(f" {name} 接口未返回数据(可能为非交易日)") + failed_count += 1 + continue + + result_df = self.transform_data(df) + if result_df.empty: + print(f" {name} 数据转换失败") + failed_count += 1 + continue + + self.save_dataframe(result_df) + success_count += 1 + total_records += len(result_df) + print(f" {name} 采集成功,{len(result_df)} 条记录") + except Exception as exc: + print(f" {name} 采集失败: {exc}") + failed_count += 1 + continue + + print("\n" + "=" * 60) + print("每日采集完成") + print(f"成功: {success_count} 个指数") + print(f"失败: {failed_count} 个指数") + print(f"累计记录: {total_records} 条") + print("=" * 60) + + def run_historical_collection(self, days: int = 5000) -> None: + """ + 执行历史数据采集(首次使用) + 从 zs_code_all 表遍历所有指数,每个指数获取近 N 天数据 + """ + # 获取所有指数代码 + index_list = self.fetch_all_index_codes() + if not index_list: + print("未获取到指数代码") + return + + print("=" * 60) + print(f"开始执行历史数据采集 - 近{days}天") + print("=" * 60) + + try: + # 计算日期范围 + end_date = datetime.now().strftime("%Y%m%d") + start_date = (datetime.now() - timedelta(days=days)).strftime("%Y%m%d") + + success_count = 0 + failed_count = 0 + total_records = 0 + + for index_info in index_list: + ts_code = index_info["ts_code"] + name = index_info["name"] + print(f"\n正在采集 {name} ({ts_code}) 历史数据...") + + try: + # 先删除该指数的所有历史数据 + with self.engine.begin() as conn: + delete_sql = text( + f"DELETE FROM {self.data_table_name} WHERE symbol = :ts_code" + ) + affected = conn.execute(delete_sql, {"ts_code": ts_code}) + print(f" 已删除 {name} 旧数据 {affected.rowcount} 条") + + # 获取历史数据 + df = self.fetch_data( + ts_code=ts_code, start_date=start_date, end_date=end_date + ) + if df.empty: + print(f" {name} 接口未返回数据") + failed_count += 1 + continue + + result_df = self.transform_data(df) + if result_df.empty: + print(f" {name} 数据转换失败") + failed_count += 1 + continue + + self.save_dataframe(result_df) + success_count += 1 + total_records += len(result_df) + print(f" {name} 采集成功,{len(result_df)} 条记录") + except Exception as exc: + print(f" {name} 采集失败: {exc}") + failed_count += 1 + continue + + print("\n" + "=" * 60) + print("历史数据采集完成") + print(f"采集天数: {days}") + print(f"总指数数: {len(index_list)}") + print(f"成功采集: {success_count}") + print(f"失败: {failed_count}") + print(f"累计记录: {total_records}") + print("=" * 60) + except Exception as exc: + print(f"历史数据采集失败: {exc}") + finally: + self.engine.dispose() + + +def collect_index_daily( + db_url: str, + tushare_token: str, + mode: str = "daily", + date: Optional[str] = None, + days: int = 5000, +): + """ + 采集入口 + mode: + - daily: 指定日期(默认今天)的指数数据 + - historical: 历史数据采集(首次使用) + """ + collector = IndexDailyCollector(db_url, tushare_token) + + if mode == "daily": + collector.run_daily_collection(date) + elif mode == "historical": + collector.run_historical_collection(days) + else: + raise ValueError(f"未知的采集模式: {mode}") + + +if __name__ == "__main__": + DB_URL = "mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj" + # DB_URL = "mysql+pymysql://fac_pattern:Chlry$%.8pattern@192.168.16.150:3306/factordb_mysql" + TOKEN = TUSHARE_TOKEN + + # 示例: + # 1. 采集当日指数数据 + # collect_index_daily(DB_URL, TOKEN, mode="daily") + + # 2. 采集指定日期指数数据 + collect_index_daily(DB_URL, TOKEN, mode="daily", date="2025-11-19") + + # 3. 首次使用:采集历史数据(5000天) + # collect_index_daily(DB_URL, TOKEN, mode="historical", days=8000) + + # collect_index_daily(DB_URL, TOKEN, mode="daily", date="2025-11-18") +