# Source: stock_fundamentals/src/scripts/stock_daily_data_collector.py
# (482 lines, 21 KiB, Python; copied from a web code viewer with the links Raw | Blame | History.)
#
# Viewer warning: this file contains ambiguous Unicode characters, i.e. characters that
# might be confused with other characters (likely full-width punctuation in the Chinese
# text). If that is intentional it can be ignored; the viewer's Escape button reveals them.

# coding:utf-8
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
from tqdm import tqdm
from src.scripts.config import XUEQIU_HEADERS
from src.scripts.ProxyIP import EnhancedProxyManager
import gc
class StockDailyDataCollector:
    """Collect daily K-line bars from the Xueqiu API into ``gp_day_data``.

    Besides plain collection, the class detects ex-rights events. The request
    asks for ``type=before`` (presumably forward-adjusted) prices, so a stock's
    past closes change after an ex-rights date and stop matching the closes
    stored earlier. Such stocks get their stored history rebuilt from a fresh
    fetch, and the event is logged to ``gp_ex_rights_log``.
    """

    # Number of per-stock DataFrames buffered before one bulk insert.
    BATCH_SIZE = 100
    # Absolute close-price difference treated as "the stored close changed".
    PRICE_TOLERANCE = 0.001
    # Bars re-fetched when a stock's history is rebuilt after ex-rights.
    HISTORY_DAYS = 1800
    # Columns written to gp_day_data (whichever of them the API returned).
    REQUIRED_COLUMNS = (
        'timestamp', 'volume', 'open', 'high', 'low', 'close',
        'chg', 'percent', 'turnoverrate', 'amount', 'symbol', 'pb', 'pe', 'ps',
    )
    # Stored close of one symbol on one calendar day. It is wrapped with text()
    # at the point of use, so defining the class does not need SQLAlchemy.
    _CLOSE_ON_DATE_SQL = (
        "SELECT `close` FROM gp_day_data "
        "WHERE symbol = :symbol AND DATE(`timestamp`) = :date LIMIT 1"
    )

    def __init__(self, db_url):
        """Create the DB engine and the proxy-aware HTTP helper.

        :param db_url: SQLAlchemy database URL of the target database.
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600,  # recycle pooled connections after one hour
        )
        self.headers = XUEQIU_HEADERS
        # Every API request is sent through the proxy manager.
        self.proxy_manager = EnhancedProxyManager()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _resolve_date(date):
        """Return ``(start_datetime, 'YYYY-MM-DD')`` for ``date``; today when None."""
        if date is None:
            now = datetime.now()
            return now, now.strftime('%Y-%m-%d')
        return datetime.strptime(date, '%Y-%m-%d'), date

    @staticmethod
    def _day_start_ms(day):
        """Return epoch milliseconds of midnight on ``day``.

        A naive ``day`` is interpreted in the host's local time zone.
        """
        midnight = day.replace(hour=0, minute=0, second=0, microsecond=0)
        return int(midnight.timestamp() * 1000)

    @staticmethod
    def _rows_for_date(df, date_str):
        """Return the rows of a ``transform_data`` frame dated ``date_str``.

        The comparison uses the frame's Asia/Shanghai timestamps. A frame
        without a ``timestamp`` column yields an empty frame.
        """
        if 'timestamp' not in df.columns:
            return df.iloc[0:0]
        return df[df['timestamp'].dt.strftime('%Y-%m-%d') == date_str]

    @classmethod
    def _prices_differ(cls, stored_close, api_close):
        """Return True when two closes differ by more than ``PRICE_TOLERANCE``."""
        return abs(stored_close - api_close) > cls.PRICE_TOLERANCE

    # ------------------------------------------------------------- collection

    def fetch_all_stock_codes(self, include_all=False):
        """Return the de-duplicated list of symbols to collect.

        Codes are read from gp_code_all, gp_code_zs and gp_code_hk. By default
        only gp_code_zs and gp_code_hk are collected; gp_code_all is read only
        for the count in the log line.

        :param include_all: also collect the codes from gp_code_all.
        :return: list of symbols in first-seen order.
        """
        def read_codes(table):
            # ``table`` only ever comes from the fixed names below.
            return pd.read_sql(f"SELECT gp_code FROM {table}", self.engine)['gp_code'].tolist()

        codes_all = read_codes('gp_code_all')
        codes_zs = read_codes('gp_code_zs')
        codes_hk = read_codes('gp_code_hk')
        selected = codes_zs + codes_hk + (codes_all if include_all else [])
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # processing order is reproducible between runs (set() order is not).
        all_codes = list(dict.fromkeys(selected))
        print(f"获取到股票代码: {len(codes_all)} 个来自gp_code_all, {len(codes_zs)}个来自gp_code_zs, {len(codes_hk)}个来自gp_code_hk, 去重后共{len(all_codes)}")
        return all_codes

    def fetch_daily_stock_data(self, symbol, begin, count=-1):
        """Fetch daily bars for ``symbol`` from the Xueqiu kline endpoint.

        :param symbol: Xueqiu symbol, e.g. 'SH600000'.
        :param begin: reference time in epoch milliseconds.
        :param count: number of bars counted back from ``begin``
            (-1 = latest day, -2 = latest two days, -1800 = latest 1800 days).
        :return: the decoded JSON dict. On any request or decoding failure it
            returns ``{'error_code': -1, 'error_description': <message>}``.
        """
        url = f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&period=day&type=before&count={count}&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance"
        try:
            response = self.proxy_manager.request_with_proxy('get', url, headers=self.headers)
            return response.json()
        except Exception as e:
            # Best effort: callers inspect error_code instead of handling exceptions.
            print(f"Request error for {symbol}: {e}")
            return {'error_code': -1, 'error_description': str(e)}

    def transform_data(self, data, symbol):
        """Turn an API kline payload into a DataFrame ready for gp_day_data.

        Keeps only the ``REQUIRED_COLUMNS`` that are present and adds
        ``symbol``. It also converts ``timestamp`` (epoch ms) to tz-aware
        Asia/Shanghai datetimes.

        :return: the DataFrame, or None when the payload lacks
            ``data.item``/``data.column``.
        """
        try:
            items = data['data']['item']
            columns = data['data']['column']
        except (KeyError, TypeError) as e:
            # TypeError covers payloads such as {'data': None}, which used to
            # crash the calling loop.
            print(f"{type(e).__name__} for {symbol}: {e}")
            return None
        df = pd.DataFrame(items, columns=columns)
        df['symbol'] = symbol
        keep = [col for col in self.REQUIRED_COLUMNS if col in df.columns]
        # .copy() makes the timestamp assignment below write to an independent
        # frame instead of a slice (avoids SettingWithCopyWarning).
        df = df[keep].copy()
        if 'timestamp' in df.columns:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai')
        return df

    def save_batch_to_database(self, batch):
        """Append a list of DataFrames to gp_day_data with one ``to_sql`` call.

        Does nothing when ``batch`` is empty.
        """
        if not batch:
            return
        pd.concat(batch, ignore_index=True).to_sql('gp_day_data', self.engine, if_exists='append', index=False)

    def fetch_data_for_date(self, date=None):
        """Collect one day's bar for every symbol and append it to gp_day_data.

        :param date: 'YYYY-MM-DD' to collect, or None for today (host clock).

        Only bars actually dated ``date`` are stored. ``count=-1`` returns the
        newest bar up to ``begin``. For a suspended stock, or on a non-trading
        day, that is an older bar, and storing it blindly duplicated that day's
        row.

        NOTE: nothing is deleted or upserted first, so running twice for the
        same date still appends duplicate rows.
        """
        start_date, date_str = self._resolve_date(date)
        stock_codes = self.fetch_all_stock_codes()
        begin = self._day_start_ms(start_date)
        batch_data = []
        for symbol in tqdm(stock_codes, desc=f"Fetching and saving daily stock data for {date_str}"):
            data = self.fetch_daily_stock_data(symbol, begin)
            if data.get('error_code') == 0:
                df = self.transform_data(data, symbol)
                if df is not None:
                    day_rows = self._rows_for_date(df, date_str)
                    if not day_rows.empty:
                        batch_data.append(day_rows)
            else:
                print(f"Error fetching data for {symbol} on {date_str}: {data.get('error_description')}")
            if len(batch_data) >= self.BATCH_SIZE:
                self.save_batch_to_database(batch_data)
                batch_data.clear()
                gc.collect()
        # Save the remainder (a no-op when empty).
        self.save_batch_to_database(batch_data)
        gc.collect()
        self.engine.dispose()
        print(f"Daily data fetching and saving completed for {date_str}.")

    # ---------------------------------------------------------------- history

    def delete_stock_history(self, symbol):
        """Delete every stored bar of ``symbol`` from gp_day_data.

        :return: True on success; False if the DELETE raised (the error is printed).
        """
        delete_query = text("DELETE FROM gp_day_data WHERE symbol = :symbol")
        try:
            with self.engine.begin() as conn:
                conn.execute(delete_query, {"symbol": symbol})
            print(f"Deleted history for {symbol}")
            return True
        except Exception as e:
            print(f"Error deleting history for {symbol}: {e}")
            return False

    def refetch_and_save_history(self, symbol, days=1800):
        """Fetch the last ``days`` bars of ``symbol`` and append them to gp_day_data.

        Existing rows are not deleted. To replace a history safely, use
        ``_replace_stock_history``.
        """
        print(f"Refetching last {days} days for {symbol}...")
        begin = int(datetime.now().timestamp() * 1000)
        data = self.fetch_daily_stock_data(symbol, begin, count=-days)
        if data.get('error_code') == 0:
            df = self.transform_data(data, symbol)
            if df is not None and not df.empty:
                self.save_batch_to_database([df])
                print(f"Successfully refetched and saved history for {symbol}.")
            else:
                print(f"No data transformed for {symbol} after refetch.")
        else:
            print(f"Error refetching history for {symbol}: {data.get('error_description')}")

    def _replace_stock_history(self, symbol, days):
        """Replace every stored bar of ``symbol`` with a fresh ``days``-bar fetch.

        The fetch happens before anything is deleted, and the DELETE and the
        INSERT share one transaction. A failed request or insert therefore
        leaves the existing history untouched; the old delete-then-refetch
        sequence could lose it.

        :return: True when the new history was written, False otherwise.
        """
        begin = int(datetime.now().timestamp() * 1000)
        data = self.fetch_daily_stock_data(symbol, begin, count=-days)
        if data.get('error_code') != 0:
            print(f"Error refetching history for {symbol}: {data.get('error_description')}")
            return False
        df = self.transform_data(data, symbol)
        if df is None or df.empty:
            print(f"No data transformed for {symbol} after refetch.")
            return False
        try:
            with self.engine.begin() as conn:
                conn.execute(text("DELETE FROM gp_day_data WHERE symbol = :symbol"), {"symbol": symbol})
                df.to_sql('gp_day_data', conn, if_exists='append', index=False)
        except Exception as e:
            print(f"Error replacing history for {symbol}: {e}")
            return False
        return True

    def fetch_single_stock_history(self, symbol, days=1800):
        """Fetch the last ``days`` bars of ``symbol`` and append them to gp_day_data.

        :param symbol: stock symbol.
        :param days: number of bars to fetch (default 1800).
        :return: True if rows were saved, False otherwise.
        """
        print(f"开始获取 {symbol} 最近 {days} 天的历史数据...")
        begin = int(datetime.now().timestamp() * 1000)
        data = self.fetch_daily_stock_data(symbol, begin, count=-days)
        if data.get('error_code') != 0:
            print(f"获取 {symbol} 数据失败: {data.get('error_description')}")
            return False
        df = self.transform_data(data, symbol)
        if df is None or df.empty:
            print(f"未能转换 {symbol} 的数据")
            return False
        df.to_sql('gp_day_data', self.engine, if_exists='append', index=False)
        print(f"成功保存 {symbol} 的历史数据,共 {len(df)} 条记录")
        return True

    # ------------------------------------------------------------- ex-rights

    def _api_previous_close(self, symbol, begin):
        """Return ``('YYYY-MM-DD', close)`` of the second-newest API bar, or None.

        Returns None when the request fails, fewer than two bars come back, or
        the payload cannot be parsed.
        """
        data = self.fetch_daily_stock_data(symbol, begin, count=-2)
        payload = data.get('data') or {}
        items = payload.get('item') or []
        if data.get('error_code') != 0 or len(items) < 2:
            return None
        try:
            columns = payload['column']
            timestamp_index = columns.index('timestamp')
            close_index = columns.index('close')
            # Items are in ascending time order, so [-2] is the previous trading day.
            prev_bar = items[-2]
            api_timestamp_ms = prev_bar[timestamp_index]
            api_close = prev_bar[close_index]
            # Convert to the 'YYYY-MM-DD' local date used for the DB lookup.
            api_date = pd.to_datetime(api_timestamp_ms, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y-%m-%d')
        except (KeyError, ValueError, IndexError, TypeError, AttributeError) as e:
            print(f"\nError parsing API data for {symbol}: {e}")
            return None
        if api_close is None:
            return None
        return api_date, api_close

    def check_and_fix_ex_rights_data(self):
        """Find stocks whose stored closes no longer match the API, and rebuild them.

        For every symbol, the API's previous-trading-day close is compared
        with the close stored for that same date. Mismatches are logged to
        gp_ex_rights_log (symbol, date, DB close, API close, log time). Each
        mismatching stock's history is then replaced with the last
        ``HISTORY_DAYS`` bars. Symbols with no stored row for that date (new
        listings or earlier failed collections) are skipped.
        """
        all_codes = self.fetch_all_stock_codes()
        close_query = text(self._CLOSE_ON_DATE_SQL)
        # One reference time for the whole scan.
        begin = int(datetime.now().timestamp() * 1000)
        ex_rights_log_data = []
        print("--- Step 1: Checking for ex-rights stocks ---")
        for symbol in tqdm(all_codes, desc="Comparing prices"):
            api_bar = self._api_previous_close(symbol, begin)
            if api_bar is None:
                continue
            api_date, api_close = api_bar
            try:
                with self.engine.connect() as conn:
                    row = conn.execute(close_query, {"symbol": symbol, "date": api_date}).fetchone()
            except Exception as e:
                print(f"\nError getting DB close for {symbol} on {api_date}: {e}")
                continue
            if row is None or row[0] is None:
                continue  # nothing stored for that date, so nothing to compare
            db_close = float(row[0])  # the driver may return Decimal
            if self._prices_differ(db_close, api_close):
                print(f"\nEx-rights detected for {symbol} on {api_date}: DB_close={row[0]}, API_close={api_close}")
                ex_rights_log_data.append({
                    'symbol': symbol,
                    'date': datetime.now().strftime('%Y-%m-%d'),
                    'db_price': db_close,
                    'api_price': api_close,
                    'log_time': datetime.now(),
                })
        if not ex_rights_log_data:
            print("\n--- No ex-rights stocks found. Data is consistent. ---")
            self.engine.dispose()
            return
        # Persist the log before touching any history.
        self.save_ex_rights_log(ex_rights_log_data)
        ex_rights_stocks = [entry['symbol'] for entry in ex_rights_log_data]
        print(f"\n--- Step 2: Found {len(ex_rights_stocks)} stocks to fix: {ex_rights_stocks} ---")
        for symbol in tqdm(ex_rights_stocks, desc="Fixing data"):
            print(f"Refetching last {self.HISTORY_DAYS} days for {symbol}...")
            if self._replace_stock_history(symbol, self.HISTORY_DAYS):
                print(f"Successfully refetched and saved history for {symbol}.")
        self.engine.dispose()
        print("\n--- Ex-rights data fixing process completed. ---")

    def save_ex_rights_log(self, log_data: list):
        """Append ex-rights events to gp_ex_rights_log.

        Errors are printed, not raised.

        :param log_data: dicts with keys symbol/date/db_price/api_price/log_time.
        """
        if not log_data:
            return
        print(f"--- Saving {len(log_data)} ex-rights events to log table... ---")
        try:
            # Map the in-memory keys onto the table's column names.
            df = pd.DataFrame(log_data).rename(columns={
                'symbol': 'stock_code',
                'date': 'change_date',
                'db_price': 'before_price',
                'api_price': 'after_price',
                'log_time': 'update_time',
            })
            df.to_sql('gp_ex_rights_log', self.engine, if_exists='append', index=False)
            print("--- Ex-rights log saved successfully. ---")
        except Exception as e:
            print(f"!!! Error saving ex-rights log: {e}")

    def _collect_symbol_for_date(self, symbol, begin, date_str, prev_close_query, batch, ex_rights_log):
        """Process one symbol for the single-pass pipeline.

        Fetches the last two bars. When the newest bar is dated ``date_str`` it
        either appends that bar to ``batch`` or, if the previous close differs
        from the stored one, records an entry in ``ex_rights_log`` and rebuilds
        the stock's history.

        :return: one of 'normal', 'ex_rights', 'skipped', 'error'.
            Database errors from the close lookup propagate to the caller.
        """
        data = self.fetch_daily_stock_data(symbol, begin, count=-2)
        if data.get('error_code') != 0:
            print(f"获取 {symbol} 数据失败: {data.get('error_description')}")
            return 'error'
        df = self.transform_data(data, symbol)
        if df is None or df.empty:
            print(f"转换 {symbol} 数据失败")
            return 'error'
        df_sorted = df.sort_values('timestamp')  # newest bar last
        latest_row = df_sorted.iloc[-1]
        latest_date = latest_row['timestamp'].strftime('%Y-%m-%d')
        if latest_date != date_str:
            # No bar for the target date (e.g. suspended): nothing to save.
            print(f"股票 {symbol} 最新数据日期 {latest_date} 不是今天 {date_str},跳过")
            return 'skipped'
        if len(df_sorted) < 2:
            # Only the target day exists, so there is no previous close to compare.
            batch.append(df_sorted)
            return 'normal'
        previous_row = df_sorted.iloc[-2]
        previous_close = previous_row['close']
        yesterday_date = previous_row['timestamp'].strftime('%Y-%m-%d')
        with self.engine.connect() as conn:
            result = conn.execute(prev_close_query, {"symbol": symbol, "date": yesterday_date}).fetchone()
        if result is not None:
            db_previous_close = float(result[0])
            if self._prices_differ(db_previous_close, previous_close):
                print(f"发现除权股票: {symbol}, 数据库昨收: {db_previous_close}, API昨收: {previous_close}")
                ex_rights_log.append({
                    'symbol': symbol,
                    'date': date_str,
                    'db_price': db_previous_close,
                    'api_price': previous_close,
                    'log_time': datetime.now(),
                })
                # The rebuilt history already includes the newest bar.
                if self._replace_stock_history(symbol, self.HISTORY_DAYS):
                    print(f"除权股票 {symbol} 历史数据重新获取成功")
                    return 'ex_rights'
                print(f"除权股票 {symbol} 历史数据重新获取失败")
                return 'error'
        # Normal update: store only the newest (target-day) bar.
        batch.append(df_sorted.tail(1))
        return 'normal'

    def fetch_and_check_ex_rights_optimized(self, date=None):
        """Collect the day's bars and check for ex-rights in a single pass.

        :param date: 'YYYY-MM-DD' to collect, or None for today (host clock).

        NOTE: nothing is deleted or upserted first, so re-running for the same
        date appends duplicate rows.
        """
        start_date, date_str = self._resolve_date(date)
        print(f"开始优化版数据采集和除权检查 - {date_str}")
        stock_codes = self.fetch_all_stock_codes()
        begin = self._day_start_ms(start_date)
        prev_close_query = text(self._CLOSE_ON_DATE_SQL)
        counts = dict.fromkeys(('normal', 'ex_rights', 'skipped', 'error'), 0)
        ex_rights_log_data = []
        normal_batch_data = []
        for symbol in tqdm(stock_codes, desc=f"采集和检查除权 {date_str}"):
            try:
                outcome = self._collect_symbol_for_date(
                    symbol, begin, date_str, prev_close_query, normal_batch_data, ex_rights_log_data)
                counts[outcome] += 1
                if len(normal_batch_data) >= self.BATCH_SIZE:
                    # A single to_sql call (pandas runs it in one transaction).
                    # On failure nothing is written and the batch is retried
                    # later, instead of being partially inserted twice.
                    self.save_batch_to_database(normal_batch_data)
                    normal_batch_data.clear()
                    gc.collect()
            except Exception as e:
                print(f"处理股票 {symbol} 时发生错误: {e}")
                counts['error'] += 1
        # Save the remainder (a no-op when empty).
        self.save_batch_to_database(normal_batch_data)
        gc.collect()
        if ex_rights_log_data:
            self.save_ex_rights_log(ex_rights_log_data)
        total_processed = sum(counts.values())
        print("\n=== 采集完成统计 ===")
        print(f"总处理股票数: {total_processed}")
        print(f"正常更新: {counts['normal']}")
        print(f"除权处理: {counts['ex_rights']}")
        print(f"跳过股票: {counts['skipped']} (停牌等原因)")
        print(f"错误处理: {counts['error']}")
        print(f"除权日志: {len(ex_rights_log_data)}")
        self.engine.dispose()
        print(f"优化版数据采集和除权检查完成 - {date_str}")
def collect_stock_daily_data(db_url, date=None):
    """Run the original two-pass pipeline.

    Pass 1 collects the bars for ``date`` (today when None). Pass 2 re-checks
    every symbol for ex-rights adjustments and repairs the affected histories.

    :param db_url: SQLAlchemy database URL.
    :param date: 'YYYY-MM-DD' string, or None.
    """
    daily_collector = StockDailyDataCollector(db_url)
    daily_collector.fetch_data_for_date(date)        # pass 1: collection
    daily_collector.check_and_fix_ex_rights_data()   # pass 2: ex-rights repair
def collect_stock_daily_data_optimized(db_url, date=None):
    """Run the single-pass pipeline: collection and ex-rights check together.

    :param db_url: SQLAlchemy database URL.
    :param date: 'YYYY-MM-DD' string, or None for today.
    """
    StockDailyDataCollector(db_url).fetch_and_check_ex_rights_optimized(date)
if __name__ == "__main__":
    import os

    # Database credentials must not live in source control. Read the SQLAlchemy
    # URL from the environment instead, e.g.
    #   export STOCK_DB_URL='mysql+pymysql://<user>:<password>@<host>:3306/db_gp_cj'
    # The password that used to be hard-coded here should be considered leaked
    # and rotated.
    db_url = os.environ.get("STOCK_DB_URL")
    if not db_url:
        raise SystemExit("STOCK_DB_URL environment variable is not set")
    # --- Usage ---
    # 1. [Recommended] Single pass: collection and ex-rights check together.
    collect_stock_daily_data_optimized(db_url)
    # 2. Original version, two passes (collect first, then check ex-rights).
    # collect_stock_daily_data(db_url)
    # 3. Run only the ex-rights check and repair.
    # collector = StockDailyDataCollector(db_url)
    # collector.check_and_fix_ex_rights_data()
    # 4. Fetch the history of one stock.
    # collector = StockDailyDataCollector(db_url)
    # collector.fetch_single_stock_history('SH600000', 1800)