This commit is contained in:
满脸小星星 2025-12-23 10:53:27 +08:00
parent 112c55fbda
commit 28a1d20529
3 changed files with 377 additions and 1 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
.git
.gitignore
__pycache__
*.pyc
*.pyo
*.pyd
.Python
env
venv
.env
*.log
logs/
reports/
.idea
.vscode
*.swp
*.swo
*.log
*.csv
*.xlsx
.history

View File

@ -74,6 +74,7 @@ from src.tushare_scripts.moneyflow_ths_collector import collect_moneyflow_ths
from src.tushare_scripts.chip_distribution_collector import collect_chip_distribution
from src.tushare_scripts.stock_factor_collector import collect_stock_factor
from src.tushare_scripts.stock_factor_pro_collector import collect_stock_factor_pro
from src.tushare_scripts.index_daily_collector import collect_index_daily
# 导入科技主题基本面因子选股策略
from src.quantitative_analysis.tech_fundamental_factor_strategy_v3 import TechFundamentalFactorStrategy
@ -270,6 +271,38 @@ def run_stock_daily_collection1():
"message": "股票日线数据采集完成V3-Tushare"
}), 200
@app.route('/scheduler/indexDaily/collection', methods=['GET'])
def run_index_daily_collection():
"""执行指数日K数据采集任务使用Tushare数据源 下午3点四十开始"""
try:
logger.info("开始执行指数日K数据采集Tushare")
# 获取当天日期
today = datetime.now().strftime('%Y-%m-%d')
# 定义数据库连接地址
db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
collect_index_daily(
db_url=db_url,
tushare_token=TUSHARE_TOKEN,
mode='daily'
)
logger.info("指数日K数据采集完成Tushare")
except Exception as e:
logger.error(f"启动指数日K数据采集任务失败: {str(e)}")
return jsonify({
"status": "error",
"message": str(e)
}), 500
return jsonify({
"status": "success",
"message": "指数日K数据采集完成Tushare"
}), 200
@app.route('/scheduler/stockDailyHK/collection', methods=['GET'])
def run_stock_daily_collection2():
"""执行港股日线数据采集任务使用Tushare数据源下午5点开始"""
@ -464,7 +497,7 @@ def run_chip_distribution_collection():
# mode='full'
# )
collect_chip_distribution(db_url=db_url, tushare_token=TUSHARE_TOKEN, mode='full',
start_date='2021-01-03', end_date='2021-09-30')
start_date='2020-01-02', end_date='2020-12-31')
# collect_chip_distribution(
# db_url=db_url,
# tushare_token=TUSHARE_TOKEN,

View File

@ -0,0 +1,322 @@
# coding:utf-8
"""
指数日K数据采集工具
功能 Tushare 获取指数日K数据并落库
API 文档: https://tushare.pro/document/2?doc_id=95
说明接口每日盘后更新
"""
import os
import sys
from datetime import datetime, timedelta
from typing import Optional
import pandas as pd
import tushare as ts
from sqlalchemy import create_engine, text
import time
# 添加项目根目录到路径,确保能够读取配置
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(PROJECT_ROOT)
from src.scripts.config import TUSHARE_TOKEN
class IndexDailyCollector:
"""指数日K数据采集器"""
def __init__(
self,
db_url: str,
tushare_token: str,
code_table_name: str = "zs_code_all",
data_table_name: str = "zs_day_data",
):
"""
Args:
db_url: 数据库连接 URL
tushare_token: Tushare Token
code_table_name: 指数代码表名默认 zs_code_all
data_table_name: 指数日K数据表名默认 zs_day_data
"""
self.engine = create_engine(
db_url,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
)
self.code_table_name = code_table_name
self.data_table_name = data_table_name
ts.set_token(tushare_token)
self.pro = ts.pro_api()
print("=" * 60)
print("指数日K数据采集工具")
print(f"指数代码表: {self.code_table_name}")
print(f"指数日K数据表: {self.data_table_name}")
print("=" * 60)
def fetch_all_index_codes(self) -> list:
"""
从数据库获取所有需要采集的指数代码
Returns:
指数代码列表格式: [{"ts_code": "399006.SZ", "name": "创业板"}, ...]
"""
try:
query = f"SELECT zs_code, zs_name FROM {self.code_table_name}"
df = pd.read_sql(query, self.engine)
codes = []
for _, row in df.iterrows():
codes.append({"ts_code": row["zs_code"], "name": row["zs_name"]})
print(f"{self.code_table_name} 获取到 {len(codes)} 个指数")
return codes
except Exception as e:
print(f"获取指数代码失败: {e}")
return []
def fetch_data(
self,
ts_code: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
trade_date: Optional[str] = None,
) -> pd.DataFrame:
"""
调用 Tushare index_daily 接口获取数据
Args 参数同接口文档
"""
try:
if trade_date:
# 单日查询
df = self.pro.index_daily(ts_code=ts_code, trade_date=trade_date)
else:
# 范围查询
df = self.pro.index_daily(
ts_code=ts_code, start_date=start_date, end_date=end_date
)
time.sleep(0.1) # API限频控制
return df
except Exception as exc:
print(f"Tushare index_daily 调用失败 ({ts_code}): {exc}")
return pd.DataFrame()
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Tushare 返回的数据转换为数据库入库格式
"""
if df.empty:
return pd.DataFrame()
result = pd.DataFrame()
result["symbol"] = df["ts_code"] # 使用 symbol 字段存储 ts_code
result["timestamp"] = pd.to_datetime(df["trade_date"], format="%Y%m%d")
result["open"] = df.get("open")
result["high"] = df.get("high")
result["low"] = df.get("low")
result["close"] = df.get("close")
result["pre_close"] = df.get("pre_close")
result["chg"] = df.get("change") # 涨跌额
result["percent"] = df.get("pct_chg") # 涨跌幅
result["volume"] = df.get("vol") # 成交量(手)
result["amount"] = df.get("amount") # 成交额(千元)
return result
def save_dataframe(self, df: pd.DataFrame) -> None:
"""
将数据写入数据库
"""
if df.empty:
print("无数据需要保存")
return
df.to_sql(self.data_table_name, self.engine, if_exists="append", index=False)
print(f"成功写入 {len(df)} 条记录")
def delete_by_date(self, date_str: str) -> None:
"""
删除指定交易日的旧数据
"""
with self.engine.begin() as conn:
delete_start = f"{date_str} 00:00:00"
delete_end = f"{date_str} 23:59:59"
delete_sql = text(
f"DELETE FROM {self.data_table_name} WHERE timestamp >= :start_time AND timestamp <= :end_time"
)
affected = conn.execute(
delete_sql, {"start_time": delete_start, "end_time": delete_end}
)
print(f"已删除 {date_str} 旧数据 {affected.rowcount}")
def run_daily_collection(self, date: Optional[str] = None) -> None:
"""
获取指定交易日默认当天的指数日K数据
zs_code_all 表遍历所有指数
"""
# 获取所有指数代码
index_list = self.fetch_all_index_codes()
if not index_list:
print("未获取到指数代码")
return
target_date = date or datetime.now().strftime("%Y-%m-%d")
trade_date = datetime.strptime(target_date, "%Y-%m-%d").strftime("%Y%m%d")
print(f"开始采集 {target_date} ({trade_date}) 指数日K数据")
self.delete_by_date(target_date)
success_count = 0
failed_count = 0
total_records = 0
for index_info in index_list:
ts_code = index_info["ts_code"]
name = index_info["name"]
print(f"\n正在采集 {name} ({ts_code})...")
try:
df = self.fetch_data(ts_code=ts_code, trade_date=trade_date)
if df.empty:
print(f" {name} 接口未返回数据(可能为非交易日)")
failed_count += 1
continue
result_df = self.transform_data(df)
if result_df.empty:
print(f" {name} 数据转换失败")
failed_count += 1
continue
self.save_dataframe(result_df)
success_count += 1
total_records += len(result_df)
print(f" {name} 采集成功,{len(result_df)} 条记录")
except Exception as exc:
print(f" {name} 采集失败: {exc}")
failed_count += 1
continue
print("\n" + "=" * 60)
print("每日采集完成")
print(f"成功: {success_count} 个指数")
print(f"失败: {failed_count} 个指数")
print(f"累计记录: {total_records}")
print("=" * 60)
def run_historical_collection(self, days: int = 5000) -> None:
"""
执行历史数据采集首次使用
zs_code_all 表遍历所有指数每个指数获取近 N 天数据
"""
# 获取所有指数代码
index_list = self.fetch_all_index_codes()
if not index_list:
print("未获取到指数代码")
return
print("=" * 60)
print(f"开始执行历史数据采集 - 近{days}")
print("=" * 60)
try:
# 计算日期范围
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=days)).strftime("%Y%m%d")
success_count = 0
failed_count = 0
total_records = 0
for index_info in index_list:
ts_code = index_info["ts_code"]
name = index_info["name"]
print(f"\n正在采集 {name} ({ts_code}) 历史数据...")
try:
# 先删除该指数的所有历史数据
with self.engine.begin() as conn:
delete_sql = text(
f"DELETE FROM {self.data_table_name} WHERE symbol = :ts_code"
)
affected = conn.execute(delete_sql, {"ts_code": ts_code})
print(f" 已删除 {name} 旧数据 {affected.rowcount}")
# 获取历史数据
df = self.fetch_data(
ts_code=ts_code, start_date=start_date, end_date=end_date
)
if df.empty:
print(f" {name} 接口未返回数据")
failed_count += 1
continue
result_df = self.transform_data(df)
if result_df.empty:
print(f" {name} 数据转换失败")
failed_count += 1
continue
self.save_dataframe(result_df)
success_count += 1
total_records += len(result_df)
print(f" {name} 采集成功,{len(result_df)} 条记录")
except Exception as exc:
print(f" {name} 采集失败: {exc}")
failed_count += 1
continue
print("\n" + "=" * 60)
print("历史数据采集完成")
print(f"采集天数: {days}")
print(f"总指数数: {len(index_list)}")
print(f"成功采集: {success_count}")
print(f"失败: {failed_count}")
print(f"累计记录: {total_records}")
print("=" * 60)
except Exception as exc:
print(f"历史数据采集失败: {exc}")
finally:
self.engine.dispose()
def collect_index_daily(
db_url: str,
tushare_token: str,
mode: str = "daily",
date: Optional[str] = None,
days: int = 5000,
):
"""
采集入口
mode:
- daily: 指定日期默认今天的指数数据
- historical: 历史数据采集首次使用
"""
collector = IndexDailyCollector(db_url, tushare_token)
if mode == "daily":
collector.run_daily_collection(date)
elif mode == "historical":
collector.run_historical_collection(days)
else:
raise ValueError(f"未知的采集模式: {mode}")
if __name__ == "__main__":
DB_URL = "mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj"
# DB_URL = "mysql+pymysql://fac_pattern:Chlry$%.8pattern@192.168.16.150:3306/factordb_mysql"
TOKEN = TUSHARE_TOKEN
# 示例:
# 1. 采集当日指数数据
# collect_index_daily(DB_URL, TOKEN, mode="daily")
# 2. 采集指定日期指数数据
collect_index_daily(DB_URL, TOKEN, mode="daily", date="2025-11-19")
# 3. 首次使用采集历史数据5000天
# collect_index_daily(DB_URL, TOKEN, mode="historical", days=8000)
# collect_index_daily(DB_URL, TOKEN, mode="daily", date="2025-11-18")