This commit is contained in:
满脸小星星 2025-09-22 12:04:42 +08:00
parent 326835967d
commit 4e4f4c8e4a
12 changed files with 2096 additions and 122 deletions

View File

@ -38,7 +38,6 @@ def get_database_config():
"""获取数据库配置"""
config = load_config()
print(config)
print("----------------------------------------------------------")
return config.get('database', {})
def get_redis_config():

View File

@ -314,6 +314,11 @@ class DatabaseManager:
if results:
# 更新现有持仓
current_quantity, current_cost_price = results[0]
# 确保类型转换,避免 Decimal 和 float 混合运算
current_cost_price = float(current_cost_price) if current_cost_price is not None else 0.0
price = float(price)
if is_buy:
# 买入:增加持仓
new_quantity = current_quantity + quantity_change
@ -324,7 +329,7 @@ class DatabaseManager:
else:
# 卖出:减少持仓
new_quantity = max(0, current_quantity - quantity_change)
new_cost_price = current_cost_price if new_quantity > 0 else 0
new_cost_price = current_cost_price if new_quantity > 0 else 0.0
if new_quantity > 0:
sql_update = """
@ -345,7 +350,7 @@ class DatabaseManager:
(stock_code, total_quantity, cost_price, create_time, update_time)
VALUES (%s, %s, %s, NOW(), NOW())
"""
self.execute_update(sql_insert, (stock_code, quantity_change, price))
self.execute_update(sql_insert, (stock_code, quantity_change, float(price)))
self.logger.info(f"持仓已更新: {stock_code} {'买入' if is_buy else '卖出'} {quantity_change}")
return True
@ -355,18 +360,23 @@ class DatabaseManager:
def insert_trading_log(self, log_data: Dict) -> bool:
"""插入交易日志"""
import json
sql = """
INSERT INTO trading_log
(order_id, stock_code, log_type, log_level, message, create_time)
VALUES (%s, %s, %s, %s, %s, %s)
(order_id, stock_code, log_type, log_level, message, extra_data, create_time)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
try:
# 将extra_data转换为JSON字符串
extra_data_json = json.dumps(log_data.get('extra_data'), ensure_ascii=False) if log_data.get('extra_data') else None
params = (
log_data.get('order_id'),
log_data.get('stock_code'),
log_data['log_type'],
log_data['log_level'],
log_data['message'],
extra_data_json,
log_data['create_time']
)
self.execute_update(sql, params)

View File

@ -87,36 +87,92 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
trade_direction = "买入" if is_buy else "卖出"
self.logger.info(f"识别交易方向: {trade_direction}")
# 更新内存和数据库持仓状态
update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, trade.traded_price, self.logger)
# 更新内存和数据库持仓状态确保价格类型为float
traded_price = float(trade.traded_price)
update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, traded_price, self.logger)
# 确保 order_id 是字符串类型
order_id_str = str(trade.order_id)
# 更新数据库订单状态
self.db_manager.update_order_status(
order_id_str,
'filled',
trade.traded_volume,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
# 1. 先查询当前订单的已成交数量和委托数量
current_filled = self._get_current_filled_quantity(order_id_str)
order_quantity = self._get_order_quantity(order_id_str)
# 2. 计算新的累计成交数量
new_total_filled = current_filled + trade.traded_volume
# 3. 判断订单是否完全成交
is_order_completed = new_total_filled >= order_quantity
# 4. 更新数据库订单状态和成交数量
if is_order_completed:
# 完全成交
self.db_manager.update_order_status(
order_id_str,
'completed',
new_total_filled,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
self.logger.info(f"订单完全成交: {order_id_str} 累计成交 {new_total_filled}/{order_quantity}")
# 从在途订单中移除
remove_pending_order(trade.stock_code, self.logger)
else:
# 部分成交
self.db_manager.update_order_status(
order_id_str,
'filled',
new_total_filled,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
self.logger.info(f"订单部分成交: {order_id_str} 累计成交 {new_total_filled}/{order_quantity}")
# 5. 记录详细交易日志(包含成交信息)
trade_detail = {
'trade_id': getattr(trade, 'trade_id', ''),
'traded_price': traded_price,
'traded_volume': int(trade.traded_volume),
'traded_amount': float(traded_price * trade.traded_volume),
'trade_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total_filled': new_total_filled,
'order_quantity': order_quantity,
'is_completed': is_order_completed,
'trade_direction': trade_direction
}
# 记录交易日志
log_data = {
'order_id': order_id_str,
'stock_code': trade.stock_code,
'log_type': 'trade_filled',
'log_level': 'INFO',
'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}',
'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}元 (累计: {new_total_filled}/{order_quantity})',
'extra_data': trade_detail,
'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.db_manager.insert_trading_log(log_data)
# 从在途订单中移除
remove_pending_order(trade.stock_code, self.logger)
except Exception as e:
self.logger.error(f"更新持仓状态失败: {str(e)}")
    def _get_current_filled_quantity(self, order_id):
        """Return the cumulative filled quantity recorded for an order.

        The order is looked up by either the internal order_id or the
        broker-side qmt_order_id. Returns 0 when no row matches or the
        query fails (best-effort: failures are logged, not raised).
        """
        try:
            # COALESCE guards against NULL filled_quantity on fresh orders.
            sql = "SELECT COALESCE(filled_quantity, 0) FROM trading_order WHERE order_id = %s OR qmt_order_id = %s"
            result = self.db_manager.execute_query(sql, (order_id, order_id))
            return int(result[0][0]) if result else 0
        except Exception as e:
            self.logger.warning(f"获取已成交数量失败: {str(e)}")
            return 0
    def _get_order_quantity(self, order_id):
        """Return the original委托 (ordered) quantity for an order.

        Looks up by internal order_id or broker-side qmt_order_id.
        Returns 0 when the order is unknown or the query fails.
        """
        try:
            sql = "SELECT order_quantity FROM trading_order WHERE order_id = %s OR qmt_order_id = %s"
            result = self.db_manager.execute_query(sql, (order_id, order_id))
            return int(result[0][0]) if result else 0
        except Exception as e:
            self.logger.warning(f"获取委托数量失败: {str(e)}")
            return 0
def on_order_error(self, order_error):
"""

View File

@ -25,6 +25,9 @@ from src.stock_analysis_v2 import run_backtest
# 导入PE/PB估值分析器
from src.valuation_analysis.pe_pb_analysis import ValuationAnalyzer
# 导入美股PE估值分析器
from src.quantitative_analysis.us_valuation_analyzer import us_valuation_analyzer
# 导入行业估值分析器
from src.valuation_analysis.industry_analysis import IndustryAnalyzer
@ -237,6 +240,27 @@ def run_stock_daily_collection2():
}), 200
@app.route('/scheduler/usStockDaily/collection', methods=['GET'])
def run_us_stock_daily_collection():
    """Trigger the US-stock daily data collection task.

    Returns HTTP 200 with {"status": "success"} when the collection ran,
    and HTTP 500 with {"status": "error", ...} when it failed.
    (Bug fix: the original returned "success"/200 even after an exception.)
    """
    try:
        logger.info("开始执行美股日线数据采集")
        # NOTE(review): credentials are hard-coded in source — move this
        # DSN to configuration / environment variables.
        db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
        # Local import keeps the heavy collector module out of app startup.
        from src.quantitative_analysis.us_stock_daily_data_collector_v2 import collect_us_stock_daily_data_v2
        collect_us_stock_daily_data_v2(db_url)
        return jsonify({
            "status": "success"
        }), 200
    except Exception as e:
        logger.error(f"启动美股日线数据采集任务失败: {str(e)}")
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500
@app.route('/scheduler/rzrq/collection', methods=['GET'])
def run_rzrq_initial_collection1():
"""执行融资融券数据更新采集 下午7点开始"""
@ -2436,6 +2460,141 @@ def get_stock_price_range():
"message": f"服务器错误: {str(e)}"
}), 500
@app.route('/api/us_stock/price_range', methods=['GET'])
def get_us_stock_price_range():
    """Compute a theoretical price range for a US stock from PE quartiles.

    Given the current PE and its historical Q1/Q3 quartiles, the theoretical
    price at quartile Z is derived as: price_at_Z = current_price * (Z / current_PE).

    Query parameters:
      - stock_code: required, US ticker (e.g. AAPL, GOOGL)
      - start_date: optional, defaults to one year ago

    Response shape (success):
    {
      "status": "success",
      "data": {
        "stock_code": "AAPL", "stock_name": "Apple Inc.",
        "current_price": 150.25, "current_date": "2023-12-01",
        "pe": {"current": ..., "q1": ..., "q3": ...,
               "q1_price": ...,  # theoretical price when PE == Q1
               "q3_price": ...}  # theoretical price when PE == Q3
      }
    }
    """
    try:
        # Required ticker parameter.
        stock_code = request.args.get('stock_code')
        if not stock_code:
            return jsonify({
                "status": "error",
                "message": "缺少必要参数: stock_code"
            }), 400
        # Normalize US tickers to upper case.
        stock_code = stock_code.strip().upper()
        # Default lookback window: one year.
        default_start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
        start_date = request.args.get('start_date', default_start_date)
        # Historical PE/close series from the US valuation analyzer.
        pe_data = us_valuation_analyzer.get_us_historical_data(stock_code, start_date)
        if pe_data.empty:
            return jsonify({
                "status": "error",
                "message": f"未找到美股 {stock_code} 的历史数据"
            }), 404
        # PE quartiles (current / q1 / q3).
        pe_percentiles = us_valuation_analyzer.calculate_us_percentiles(pe_data, 'pe')
        if not pe_percentiles:
            return jsonify({
                "status": "error",
                "message": f"无法计算美股 {stock_code} 的PE分位数"
            }), 500
        # Latest close and its date from the last row of the series.
        current_price = None
        current_date = None
        if not pe_data.empty:
            current_price_raw = pe_data.iloc[-1].get('close')
            # Coerce to float; DB may hand back Decimal/str.
            try:
                current_price = float(current_price_raw) if current_price_raw is not None else None
            except (ValueError, TypeError):
                current_price = None
            current_date = pe_data.iloc[-1].get('timestamp').strftime('%Y-%m-%d') if 'timestamp' in pe_data.columns else None
        if current_price is None:
            return jsonify({
                "status": "error",
                "message": f"无法获取美股 {stock_code} 的当前股价"
            }), 500
        current_pe = pe_percentiles.get('current')
        pe_q1 = pe_percentiles.get('q1')
        pe_q3 = pe_percentiles.get('q3')
        # Invert the valuation: if PE X maps to price Y, PE Z maps to Y * (Z / X).
        pe_q1_price = None
        pe_q3_price = None
        # Guard: all inputs present and current PE strictly positive
        # (truthiness also skips PE == 0, avoiding division by zero).
        if current_pe and current_pe > 0 and pe_q1 and pe_q3 and current_price:
            try:
                pe_q1_price = current_price * (pe_q1 / current_pe)
                pe_q3_price = current_price * (pe_q3 / current_pe)
            except (TypeError, ValueError) as e:
                logger.error(f"计算理论股价时发生错误: {e}")
                pe_q1_price = None
                pe_q3_price = None
        stock_name = us_valuation_analyzer.get_us_stock_name(stock_code)
        response = {
            "status": "success",
            "data": {
                "stock_code": stock_code,
                "stock_name": stock_name,
                "current_price": current_price,
                "current_date": current_date,
                "pe": {
                    "current": current_pe,
                    "q1": pe_q1,
                    "q3": pe_q3,
                    "q1_price": round(pe_q1_price, 2) if pe_q1_price is not None else None,
                    "q3_price": round(pe_q3_price, 2) if pe_q3_price is not None else None
                }
            }
        }
        return jsonify(response)
    except Exception as e:
        logger.error(f"计算美股价格区间异常: {str(e)}")
        return jsonify({
            "status": "error",
            "message": f"服务器错误: {str(e)}"
        }), 500
@app.route('/api/fear_greed/data', methods=['GET'])
def get_fear_greed_data():
"""获取恐贪指数数据

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
东方财富财务数据采集器 V2.0
东方财富财务数据采集器 V2.0--每次季度财报更新之后执行这个脚本
适配2025年新版接口
从东方财富网自动采集A股上市公司的财务报表数据包括

View File

@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
"""
科技主题基本面因子选股策略
科技主题基本面因子选股策略--这里就是入口--请执行这个文件
整合企业生命周期财务指标和平均距离因子分析
"""
@ -725,4 +725,4 @@ def main():
if __name__ == "__main__":
main()
main()

View File

@ -0,0 +1,293 @@
import requests
import pandas as pd
from datetime import datetime
import sys
import os
import redis
import json
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 添加项目根目录到路径便于导入scripts.config
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
# 读取雪球headers和Redis配置
try:
from src.scripts.config import XUEQIU_HEADERS
from src.valuation_analysis.config import REDIS_CONFIG
except ImportError:
XUEQIU_HEADERS = {
'User-Agent': 'Mozilla/5.0',
'Cookie': '', # 需要填写雪球cookie
}
REDIS_CONFIG = {
'host': 'localhost',
'port': 6379,
'db': 0,
'password': None
}
REDIS_KEY = 'xq_us_stock_changes_latest' # 存放美股行情的主键
# 条件导入代理管理器
proxy_manager = None
try:
from src.scripts.ProxyIP import EnhancedProxyManager
proxy_manager = EnhancedProxyManager()
except ImportError:
print("代理管理器导入失败,将使用直接请求模式")
def get_redis_conn():
    """Build a Redis client from REDIS_CONFIG, backed by a connection pool.

    decode_responses=True so hget/hgetall return str instead of bytes.
    """
    return redis.Redis(
        connection_pool=redis.ConnectionPool(
            host=REDIS_CONFIG['host'],
            port=REDIS_CONFIG['port'],
            db=REDIS_CONFIG.get('db', 0),
            password=REDIS_CONFIG.get('password', None),
            decode_responses=True,
        )
    )
def fetch_and_store_us_stock_data(page_size=90, max_workers=10, use_proxy=False):
    """Collect the latest quotes for all US stocks from Xueqiu and cache them
    in a Redis hash (REDIS_KEY, field = symbol, value = JSON row).

    Pages are fetched in parallel with a thread pool.

    :param page_size: rows per page requested from the screener API
    :param max_workers: thread-pool size
    :param use_proxy: route requests through the proxy manager when available
    :return: DataFrame of all collected rows (empty DataFrame on failure)
    """
    base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json'
    headers = XUEQIU_HEADERS
    all_data = []
    data_lock = threading.Lock()  # guards all_data across worker threads
    def fetch_page_data(page):
        """Fetch one page and append its rows to all_data; return row count."""
        params = {
            'page': page,
            'size': page_size,
            'order': 'desc',
            'order_by': 'market_capital',
            'market': 'US',
            'type': 'us',
            'is_delay': 'true'
        }
        try:
            # Proxy is optional — fall back to a direct request.
            if use_proxy and proxy_manager:
                response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params)
            else:
                response = requests.get(base_url, headers=headers, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                page_data = data['data']['list']
                # Thread-safe append of this page's rows.
                with data_lock:
                    all_data.extend(page_data)
                print(f"成功采集美股第 {page} 页数据,获取 {len(page_data)} 条记录")
                return len(page_data)
            else:
                print(f"请求美股数据第 {page} 页失败,状态码:{response.status_code}")
                return 0
        except Exception as e:
            print(f"请求美股数据第 {page} 页异常:{e}")
            return 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        # Probe page 1 first to learn the total row count / page count.
        params = {
            'page': 1,
            'size': page_size,
            'order': 'desc',
            'order_by': 'market_capital',
            'market': 'US',
            'type': 'us',
            'is_delay': 'true'
        }
        try:
            if use_proxy and proxy_manager:
                response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params)
            else:
                response = requests.get(base_url, headers=headers, params=params, timeout=10)
            if response.status_code != 200:
                print(f"请求美股数据失败,状态码:{response.status_code}")
                return pd.DataFrame()
            data = response.json()
            total_count = data['data']['count']
            total_pages = (total_count // page_size) + 1
            print(f"开始采集美股数据,共 {total_pages} 页,总计 {total_count} 条记录")
            # Submit one task per page (page 1 is fetched again by a worker).
            for page in range(1, total_pages + 1):
                future = executor.submit(fetch_page_data, page)
                futures.append(future)
        except Exception as e:
            print(f"获取美股总页数失败:{e}")
            return pd.DataFrame()
        # Drain the pool, reporting progress as pages complete.
        print(f"正在并行采集美股数据,使用 {max_workers} 个线程...")
        start_time = time.time()
        completed_count = 0
        for future in as_completed(futures):
            completed_count += 1
            try:
                result = future.result()
                if result > 0:
                    print(f"进度: {completed_count}/{len(futures)} 页完成")
            except Exception as e:
                print(f"采集任务异常:{e}")
        end_time = time.time()
        print(f"美股数据采集完成,耗时: {end_time - start_time:.2f} 秒")
    df = pd.DataFrame(all_data)
    if not df.empty:
        df['fetch_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Cache in Redis: hash keyed by symbol, JSON-serialized row values.
        r = get_redis_conn()
        pipe = r.pipeline()
        # Drop the previous snapshot before writing the new one.
        r.delete(REDIS_KEY)
        print(f"正在将 {len(df)} 条美股记录写入Redis...")
        for _, row in df.iterrows():
            symbol = row.get('symbol')
            if not symbol:
                continue
            # Full row is stored; trim fields here if payload size matters.
            value = row.to_dict()
            pipe.hset(REDIS_KEY, symbol, json.dumps(value, ensure_ascii=False))
        pipe.execute()
        print(f"成功将美股数据写入Redis哈希 {REDIS_KEY},共{len(df)}条记录。")
        return df
    else:
        print("未获取到任何美股数据。")
        return pd.DataFrame()
def format_us_stock_code(stock_code):
    """Normalize a US ticker and return it in both formats used here.

    US tickers need no market prefix, so the Xueqiu format and the Redis
    hash-field format are identical.

    :param stock_code: ticker such as 'AAPL' or ' googl '
    :return: (xueqiu_code, redis_code) — both the upper-cased ticker
    """
    # strip() added for consistency with the HTTP handlers, which accept
    # user input and normalize with .strip().upper().
    stock_code = stock_code.strip().upper()
    return stock_code, stock_code
def get_us_stock_realtime_info_from_redis(stock_code):
    """Read one US stock's cached quote from Redis and map it to the
    project's standard realtime-info dict.

    :param stock_code: US ticker, e.g. AAPL / GOOGL
    :return: dict with the normalized keys below, or None when the symbol
             is absent from the hash or its value is not valid JSON
    """
    _, redis_code = format_us_stock_code(stock_code)
    r = get_redis_conn()
    value = r.hget(REDIS_KEY, redis_code)
    if not value:
        return None
    try:
        data = json.loads(value)
    except Exception:
        return None
    # Target structure expected by downstream consumers.
    result = {
        "code": None,
        "crawlDate": None,
        "marketValue": None,
        "maxPrice": None,
        "minPrice": None,
        "nowPrice": None,
        "pbRate": None,
        "rangeRiseAndFall": None,
        "shortName": None,
        "todayStartPrice": None,
        "ttm": None,
        "turnoverRate": None,
        "yesterdayEndPrice": None
    }
    # Field mapping from the Xueqiu payload; 52-week values are used as a
    # fallback when the intraday high/low keys are missing.
    result["code"] = data.get("symbol")
    result["crawlDate"] = data.get("fetch_time")
    result["marketValue"] = data.get("market_capital")
    result["maxPrice"] = data.get("high") if "high" in data else data.get("high52w")
    result["minPrice"] = data.get("low") if "low" in data else data.get("low52w")
    result["nowPrice"] = data.get("current")
    result["pbRate"] = data.get("pb")
    result["rangeRiseAndFall"] = data.get("percent")
    result["shortName"] = data.get("name")
    result["todayStartPrice"] = data.get("open")
    result["ttm"] = data.get("pe_ttm")
    result["turnoverRate"] = data.get("turnover_rate")
    result["yesterdayEndPrice"] = data.get("last_close") if "last_close" in data else data.get("pre_close")
    # NOTE(review): the two fallbacks below can never change the result —
    # when "high"/"low" exist they were already used above (and data["high"]
    # is what data.get("high") returned). Effectively dead code; kept as-is.
    if result["maxPrice"] is None and "high" in data:
        result["maxPrice"] = data["high"]
    if result["minPrice"] is None and "low" in data:
        result["minPrice"] = data["low"]
    return result
def fetch_and_store_us_stock_data_optimized(page_size=90, max_workers=15, use_proxy=False):
    """Convenience wrapper around fetch_and_store_us_stock_data that prints
    a configuration banner and a result summary.

    :param page_size: rows per page
    :param max_workers: thread-pool size (10-20 recommended)
    :param use_proxy: route requests through the proxy manager
    :return: the collected DataFrame, or None when the collection raised
    """
    banner = (
        f"开始批量采集美股数据...",
        f"配置: 每页 {page_size} 条记录,最大线程数 {max_workers}",
        f"代理模式: {'启用' if use_proxy else '禁用'}",
        f"预计采集: 美股所有股票数据",
        "-" * 50,
    )
    for line in banner:
        print(line)
    try:
        result = fetch_and_store_us_stock_data(page_size, max_workers, use_proxy)
        if result.empty:
            print("美股采集完成,但未获取到数据")
        else:
            print(f"美股采集完成!共获取 {len(result)} 只股票的数据")
            print(f"数据已保存到Redis键: {REDIS_KEY}")
    except Exception as e:
        print(f"美股采集过程中发生错误: {e}")
        return None
    return result
if __name__ == '__main__':
    # Tune page_size / max_workers / use_proxy as needed, e.g.:
    # fetch_and_store_us_stock_data_optimized(page_size=100, max_workers=15, use_proxy=True)
    fetch_and_store_us_stock_data_optimized(use_proxy=False)  # direct requests by default

View File

@ -0,0 +1,472 @@
# coding:utf-8
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
from tqdm import tqdm
from src.scripts.config import XUEQIU_HEADERS
from src.scripts.ProxyIP import EnhancedProxyManager
import gc
class StockDailyDataCollector:
    """Daily k-line collector for US stocks (writes the us_day_data table)."""

    def __init__(self, db_url):
        """Create a pooled SQLAlchemy engine and the shared HTTP helpers.

        :param db_url: SQLAlchemy URL of the target MySQL database
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        self.headers = XUEQIU_HEADERS
        # All Xueqiu requests are routed through this proxy manager.
        self.proxy_manager = EnhancedProxyManager()
def fetch_all_stock_codes(self):
# 从us_code_all获取股票代码
query_all = "SELECT gp_code FROM gp_code_us"
df_all = pd.read_sql(query_all, self.engine)
codes_all = df_all['gp_code'].tolist()
# 合并去重
# all_codes = list(set(query_all))
print(f"获取到股票代码: {len(codes_all)} 个来自gp_code_us去重后共{len(codes_all)}")
return codes_all
    def fetch_daily_stock_data(self, symbol, begin, count=-1):
        """Fetch daily kline JSON for one symbol from Xueqiu.

        :param symbol: ticker, e.g. AAPL
        :param begin: millisecond epoch used as the query anchor
        :param count: negative window: -1 latest day, -2 last two days,
                      -1800 last 1800 days
        :return: decoded JSON dict; on request failure a dict with
                 error_code=-1 and the error text, matching the API's
                 error shape so callers can handle both uniformly
        """
        url = f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&period=day&type=before&count={count}&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance"
        try:
            # Requests go through the proxy manager (direct call kept for reference).
            # response = requests.get(url, headers=self.headers, timeout=20)
            response = self.proxy_manager.request_with_proxy('get', url, headers=self.headers)
            return response.json()
        except Exception as e:
            print(f"Request error for {symbol}: {e}")
            return {'error_code': -1, 'error_description': str(e)}
def transform_data(self, data, symbol):
try:
items = data['data']['item']
columns = data['data']['column']
except KeyError as e:
print(f"KeyError for {symbol}: {e}")
return None
df = pd.DataFrame(items, columns=columns)
df['symbol'] = symbol
required_columns = ['timestamp', 'volume', 'open', 'high', 'low', 'close',
'chg', 'percent', 'turnoverrate', 'amount', 'symbol', 'pb', 'pe', 'ps']
existing_columns = [col for col in required_columns if col in df.columns]
df = df[existing_columns]
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai')
return df
    def save_batch_to_database(self, batch):
        """Append a list of per-symbol DataFrames to us_day_data in one insert.

        :param batch: list of DataFrames; an empty/falsy batch is a no-op
        """
        if batch:
            df_all = pd.concat(batch, ignore_index=True)
            df_all.to_sql('us_day_data', self.engine, if_exists='append', index=False)
    def fetch_data_for_date(self, date=None):
        """Collect the latest daily bar for every code and append to us_day_data.

        :param date: 'YYYY-MM-DD'; defaults to today when None
        """
        if date is None:
            start_date = datetime.now()
            date_str = start_date.strftime('%Y-%m-%d')
        else:
            start_date = datetime.strptime(date, '%Y-%m-%d')
            date_str = date
        # Previous same-day cleanup, currently disabled:
        # delete_query = text("DELETE FROM us_day_data WHERE `timestamp` LIKE :date_str")
        # with self.engine.begin() as conn:
        #     conn.execute(delete_query, {"date_str": f"{date_str}%"})
        stock_codes = self.fetch_all_stock_codes()
        # Midnight of the target date, in ms — the epoch anchor Xueqiu expects.
        begin = int(start_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
        batch_data = []
        for idx, symbol in enumerate(tqdm(stock_codes, desc=f"Fetching and saving daily stock data for {date_str}")):
            data = self.fetch_daily_stock_data(symbol, begin)
            if data.get('error_code') == 0:
                df = self.transform_data(data, symbol)
                if df is not None:
                    batch_data.append(df)
            else:
                print(f"Error fetching data for {symbol} on {date_str}: {data.get('error_description')}")
            # Flush every 100 frames to bound memory usage.
            if len(batch_data) >= 100:
                self.save_batch_to_database(batch_data)
                batch_data.clear()
                gc.collect()
        # Save remaining data
        if batch_data:
            self.save_batch_to_database(batch_data)
        gc.collect()
        self.engine.dispose()
        print(f"Daily data fetching and saving completed for {date_str}.")
    def delete_stock_history(self, symbol):
        """Delete every us_day_data row for one symbol.

        :return: True on success, False when the DELETE failed
        """
        delete_query = text("DELETE FROM us_day_data WHERE symbol = :symbol")
        try:
            with self.engine.begin() as conn:
                conn.execute(delete_query, {"symbol": symbol})
            print(f"Deleted history for {symbol}")
            return True
        except Exception as e:
            print(f"Error deleting history for {symbol}: {e}")
            return False
    def refetch_and_save_history(self, symbol, days=1800):
        """Re-download and store `days` of daily history for one symbol.

        Used after an ex-rights wipe; errors are printed, not raised.
        """
        print(f"Refetching last {days} days for {symbol}...")
        begin = int(datetime.now().timestamp() * 1000)
        data = self.fetch_daily_stock_data(symbol, begin, count=-days)
        if data.get('error_code') == 0:
            df = self.transform_data(data, symbol)
            if df is not None and not df.empty:
                self.save_batch_to_database([df])
                print(f"Successfully refetched and saved history for {symbol}.")
            else:
                print(f"No data transformed for {symbol} after refetch.")
        else:
            print(f"Error refetching history for {symbol}: {data.get('error_description')}")
    def check_and_fix_ex_rights_data(self):
        """Detect ex-rights (re-adjusted) stocks and rebuild their history.

        For every code, the previous trading day's close reported by the API
        is compared with the close stored in the DB for the same date; a
        mismatch (>= 0.001) means the series was re-adjusted, so the stock's
        history is deleted and re-fetched (1800 days). Detected events are
        logged to the DB before any repair runs.
        """
        all_codes = self.fetch_all_stock_codes()
        ex_rights_log_data = []
        print("--- Step 1: Checking for ex-rights stocks ---")
        for symbol in tqdm(all_codes, desc="Comparing prices"):
            # 1. Last two daily bars from the API.
            begin = int(datetime.now().timestamp() * 1000)
            data = self.fetch_daily_stock_data(symbol, begin, count=-2)
            api_timestamp_str = None
            api_close = None
            if data.get('error_code') == 0 and data.get('data', {}).get('item') and len(data['data']['item']) >= 2:
                try:
                    # API rows are ascending by time; [-2] is the previous session.
                    prev_day_data = data['data']['item'][-2]
                    columns = data['data']['column']
                    timestamp_index = columns.index('timestamp')
                    close_index = columns.index('close')
                    api_timestamp_ms = prev_day_data[timestamp_index]
                    api_close = prev_day_data[close_index]
                    # ms epoch -> 'YYYY-MM-DD' (Asia/Shanghai) for the DB lookup.
                    api_timestamp_str = pd.to_datetime(api_timestamp_ms, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y-%m-%d')
                except (ValueError, IndexError, TypeError) as e:
                    print(f"\nError parsing API data for {symbol}: {e}")
                    continue  # next stock
            else:
                # API failed or returned fewer than two bars — skip.
                continue
            if api_timestamp_str is None or api_close is None:
                continue
            # 2. Close stored in the DB for that same date.
            db_close = None
            query = text("SELECT `close` FROM us_day_data WHERE symbol = :symbol AND `timestamp` LIKE :date_str")
            try:
                with self.engine.connect() as conn:
                    result = conn.execute(query, {"symbol": symbol, "date_str": f"{api_timestamp_str}%"}).fetchone()
                    db_close = result[0] if result else None
            except Exception as e:
                print(f"\nError getting DB close for {symbol} on {api_timestamp_str}: {e}")
                continue
            # 3. Compare (DB value may be Decimal — cast to float first).
            if db_close is not None:
                if not abs(float(db_close) - api_close) < 0.001:
                    print(f"\nEx-rights detected for {symbol} on {api_timestamp_str}: DB_close={db_close}, API_close={api_close}")
                    ex_rights_log_data.append({
                        'symbol': symbol,
                        'date': datetime.now().strftime('%Y-%m-%d'),
                        'db_price': float(db_close),
                        'api_price': api_close,
                        'log_time': datetime.now()
                    })
            # No DB row for that date: nothing to compare (new listing or a
            # previously failed collection) — not an ex-rights case.
        # 4. Log and repair the affected stocks.
        if not ex_rights_log_data:
            print("\n--- No ex-rights stocks found. Data is consistent. ---")
            self.engine.dispose()
            return
        # Persist the log before mutating any data.
        self.save_ex_rights_log(ex_rights_log_data)
        ex_rights_stocks = [item['symbol'] for item in ex_rights_log_data]
        print(f"\n--- Step 2: Found {len(ex_rights_stocks)} stocks to fix: {ex_rights_stocks} ---")
        for symbol in tqdm(ex_rights_stocks, desc="Fixing data"):
            if self.delete_stock_history(symbol):
                self.refetch_and_save_history(symbol, days=1800)
        self.engine.dispose()
        print("\n--- Ex-rights data fixing process completed. ---")
    def save_ex_rights_log(self, log_data: list):
        """Persist detected ex-rights events to the us_gp_ex_rights_log table.

        :param log_data: list of dicts with keys symbol/date/db_price/
                         api_price/log_time; empty list is a no-op
        """
        if not log_data:
            return
        print(f"--- Saving {len(log_data)} ex-rights events to log table... ---")
        try:
            df = pd.DataFrame(log_data)
            # Rename to the table's column names.
            df = df.rename(columns={
                'symbol': 'stock_code',
                'date': 'change_date',
                'db_price': 'before_price',
                'api_price': 'after_price',
                'log_time': 'update_time'
            })
            df.to_sql('us_gp_ex_rights_log', self.engine, if_exists='append', index=False)
            print("--- Ex-rights log saved successfully. ---")
        except Exception as e:
            print(f"!!! Error saving ex-rights log: {e}")
    def fetch_single_stock_history(self, symbol, days=1800):
        """Download `days` of history for one symbol and append to us_day_data.

        :param symbol: ticker
        :param days: lookback window, default 1800 days
        :return: True when rows were saved, False otherwise
        """
        print(f"开始获取 {symbol} 最近 {days} 天的历史数据...")
        begin = int(datetime.now().timestamp() * 1000)
        data = self.fetch_daily_stock_data(symbol, begin, count=-days)
        if data.get('error_code') == 0:
            df = self.transform_data(data, symbol)
            if df is not None and not df.empty:
                df.to_sql('us_day_data', self.engine, if_exists='append', index=False)
                print(f"成功保存 {symbol} 的历史数据,共 {len(df)} 条记录")
                return True
            else:
                print(f"未能转换 {symbol} 的数据")
                return False
        else:
            print(f"获取 {symbol} 数据失败: {data.get('error_description')}")
            return False
    def fetch_and_check_ex_rights_optimized(self, date=None):
        """One-pass daily collection plus ex-rights detection.

        For each code the last two bars are fetched; the previous day's API
        close is compared with the DB close for the same date. On mismatch
        the stock's history is wiped and re-fetched (1800 days); otherwise
        only today's bar is appended. Suspended stocks (latest bar not from
        `date`) are skipped.

        :param date: 'YYYY-MM-DD'; defaults to today when None
        """
        if date is None:
            start_date = datetime.now()
            date_str = start_date.strftime('%Y-%m-%d')
        else:
            start_date = datetime.strptime(date, '%Y-%m-%d')
            date_str = date
        print(f"开始优化版数据采集和除权检查 - {date_str}")
        # Drop today's rows first so re-runs do not duplicate data.
        delete_query = text("DELETE FROM us_day_data WHERE `timestamp` LIKE :date_str")
        with self.engine.begin() as conn:
            conn.execute(delete_query, {"date_str": f"{date_str}%"})
        print(f"已删除今日 {date_str} 的旧数据")
        stock_codes = self.fetch_all_stock_codes()
        begin = int(start_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
        # Counters for the final summary.
        normal_update_count = 0
        ex_rights_count = 0
        error_count = 0
        skipped_count = 0  # suspended stocks and other skips
        ex_rights_log_data = []
        normal_batch_data = []
        for idx, symbol in enumerate(tqdm(stock_codes, desc=f"采集和检查除权 {date_str}")):
            try:
                # Last two daily bars for this symbol.
                data = self.fetch_daily_stock_data(symbol, begin, count=-2)
                if data.get('error_code') != 0:
                    print(f"获取 {symbol} 数据失败: {data.get('error_description')}")
                    error_count += 1
                    continue
                df = self.transform_data(data, symbol)
                if df is None or df.empty:
                    print(f"转换 {symbol} 数据失败")
                    error_count += 1
                    continue
                # Fewer than two bars: cannot run the ex-rights comparison.
                if len(df) < 2:
                    if len(df) == 1:
                        latest_date = df.iloc[0]['timestamp'].strftime('%Y-%m-%d')
                        if latest_date == date_str:
                            # Single bar and it is today's — keep it.
                            normal_batch_data.append(df)
                            normal_update_count += 1
                        else:
                            # Stale bar (likely suspended) — skip.
                            print(f"股票 {symbol} 最新数据日期 {latest_date} 不是今天 {date_str},跳过")
                            skipped_count += 1
                    continue
                # Sort ascending so [-1] is today and [-2] is yesterday.
                df_sorted = df.sort_values('timestamp')
                latest_row = df_sorted.iloc[-1]
                previous_row = df_sorted.iloc[-2]
                # The latest bar must be for the target date.
                latest_date = latest_row['timestamp'].strftime('%Y-%m-%d')
                if latest_date != date_str:
                    print(f"股票 {symbol} 最新数据日期 {latest_date} 不是今天 {date_str},跳过")
                    skipped_count += 1
                    continue
                current_close = latest_row['close']
                previous_close = previous_row['close']
                # DB close recorded for yesterday's date.
                yesterday_date = previous_row['timestamp'].strftime('%Y-%m-%d')
                query = text("""
                    SELECT `close` FROM us_day_data
                    WHERE symbol = :symbol AND DATE(`timestamp`) = :date
                    LIMIT 1
                """)
                with self.engine.connect() as conn:
                    result = conn.execute(query, {"symbol": symbol, "date": yesterday_date}).fetchone()
                # Ex-rights when API and DB disagree on yesterday's close.
                is_ex_rights = False
                if result:
                    db_previous_close = float(result[0])
                    if abs(db_previous_close - previous_close) > 0.001:
                        is_ex_rights = True
                        print(f"发现除权股票: {symbol}, 数据库昨收: {db_previous_close}, API昨收: {previous_close}")
                        ex_rights_log_data.append({
                            'symbol': symbol,
                            'date': date_str,
                            'db_price': db_previous_close,
                            'api_price': previous_close,
                            'log_time': datetime.now()
                        })
                if is_ex_rights:
                    # Adjusted series: wipe history and refetch 1800 days.
                    delete_all_query = text("DELETE FROM us_day_data WHERE symbol = :symbol")
                    with self.engine.begin() as conn:
                        conn.execute(delete_all_query, {"symbol": symbol})
                    success = self.fetch_single_stock_history(symbol, 1800)
                    if success:
                        ex_rights_count += 1
                        print(f"除权股票 {symbol} 历史数据重新获取成功")
                    else:
                        error_count += 1
                        print(f"除权股票 {symbol} 历史数据重新获取失败")
                else:
                    # Normal path: append only today's bar.
                    today_data = df_sorted.tail(1)
                    normal_batch_data.append(today_data)
                    normal_update_count += 1
                # Flush in batches of 100 to bound memory usage.
                if len(normal_batch_data) >= 100:
                    for batch_df in normal_batch_data:
                        batch_df.to_sql('us_day_data', self.engine, if_exists='append', index=False)
                    normal_batch_data.clear()
                    gc.collect()
            except Exception as e:
                print(f"处理股票 {symbol} 时发生错误: {e}")
                error_count += 1
                continue
        # Flush any remaining normal updates.
        if normal_batch_data:
            for batch_df in normal_batch_data:
                batch_df.to_sql('us_day_data', self.engine, if_exists='append', index=False)
            gc.collect()
        # Persist the ex-rights log.
        if ex_rights_log_data:
            self.save_ex_rights_log(ex_rights_log_data)
        # Summary.
        total_processed = normal_update_count + ex_rights_count + error_count + skipped_count
        print(f"\n=== 采集完成统计 ===")
        print(f"总处理股票数: {total_processed}")
        print(f"正常更新: {normal_update_count}")
        print(f"除权处理: {ex_rights_count}")
        print(f"跳过股票: {skipped_count} (停牌等原因)")
        print(f"错误处理: {error_count}")
        print(f"除权日志: {len(ex_rights_log_data)}")
        self.engine.dispose()
        print(f"优化版数据采集和除权检查完成 - {date_str}")
def collect_stock_daily_data(db_url, date=None):
    """Original entry point: collect daily bars for `date` (defaults to today).

    The follow-up ex-rights check is currently disabled.
    """
    StockDailyDataCollector(db_url).fetch_data_for_date(date)
    # collector.check_and_fix_ex_rights_data()
def collect_stock_daily_data_optimized(db_url, date=None):
    """Optimized entry point: collection plus ex-rights check in one pass."""
    StockDailyDataCollector(db_url).fetch_and_check_ex_rights_optimized(date)
if __name__ == "__main__":
db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
# --- 使用方式 ---
# 1. 【推荐】优化版:一次遍历完成数据采集和除权检查
# collect_stock_daily_data_optimized(db_url)
# 2. 原始版本:分两步执行(先采集,后检查除权)
# collect_stock_daily_data(db_url)
# 3. 手动执行除权检查和数据修复
# collector = StockDailyDataCollector(db_url)
# collector.check_and_fix_ex_rights_data()
# 4. 单独获取某只股票的历史数据
collector = StockDailyDataCollector(db_url)
codes = collector.fetch_all_stock_codes()
for code in codes:
collector.fetch_single_stock_history(code, 1800)

View File

@ -0,0 +1,346 @@
# coding:utf-8
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
from tqdm import tqdm
import sys
import os
import gc
# 添加项目根目录到路径
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(project_root)
from src.scripts.config import XUEQIU_HEADERS
from src.scripts.ProxyIP import EnhancedProxyManager
from src.quantitative_analysis.us_batch_stock_price_collector import fetch_and_store_us_stock_data
from src.quantitative_analysis.us_stock_price_collector import USStockPriceCollector
from src.quantitative_analysis.us_stock_daily_data_collector import StockDailyDataCollector
class USStockDailyDataCollectorV2:
    """US daily-data collector V2 — merges Xueqiu and EastMoney snapshots."""

    def __init__(self, db_url):
        """Build the DB engine and the underlying source collectors.

        :param db_url: SQLAlchemy URL of the target MySQL database
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        self.headers = XUEQIU_HEADERS
        self.proxy_manager = EnhancedProxyManager()
        # EastMoney realtime collector (OHLC / pre_close source).
        self.eastmoney_collector = USStockPriceCollector(db_url)
        # Original per-stock collector, reused for history backfills.
        self.original_collector = StockDailyDataCollector(db_url)
def convert_symbol_format(self, symbol):
"""
美股代码格式转换美股通常不需要格式转换
雪球格式AAPL -> 保持AAPL
"""
return symbol.upper()
def convert_eastmoney_to_xueqiu_format(self, stock_code):
"""
将东方财富格式的美股代码转换为雪球格式
东方财富格式AAPL -> 雪球格式AAPL
"""
return stock_code.upper()
    def fetch_eastmoney_data(self):
        """Pull the EastMoney realtime US snapshot and normalize its symbols.

        :return: the collector's DataFrame with an added 'symbol' column in
                 Xueqiu format; may be empty when the source returned nothing
        """
        print("正在获取东方财富美股数据...")
        df = self.eastmoney_collector.fetch_all_data()
        if not df.empty:
            # US codes need no conversion beyond upper-casing.
            df['symbol'] = df['stock_code'].apply(self.convert_eastmoney_to_xueqiu_format)
            print(f"成功获取东方财富美股数据,共 {len(df)} 条记录")
        return df
def merge_data(self, xueqiu_df, eastmoney_df):
"""合并雪球和东方财富美股数据"""
print("正在合并雪球和东方财富美股数据...")
# 基于symbol进行合并
merged_df = pd.merge(
xueqiu_df,
eastmoney_df[['symbol', 'high_price', 'low_price', 'open_price', 'pre_close', 'list_date']],
on='symbol',
how='left'
)
print(f"美股数据合并完成,共 {len(merged_df)} 条记录")
return merged_df
def transform_to_us_day_data(self, merged_df):
"""将合并后的美股数据转换为us_day_data表结构"""
print("正在转换美股数据格式...")
# 创建符合us_day_data表结构的DataFrame
us_gp_day_df = pd.DataFrame()
# 映射字段
us_gp_day_df['symbol'] = merged_df['symbol']
# 将timestamp设置为当天的00:00:00格式
today = datetime.now().date()
us_gp_day_df['timestamp'] = pd.to_datetime(today)
us_gp_day_df['volume'] = merged_df['volume']
us_gp_day_df['open'] = merged_df['open_price']
us_gp_day_df['high'] = merged_df['high_price']
us_gp_day_df['low'] = merged_df['low_price']
us_gp_day_df['close'] = merged_df['current']
us_gp_day_df['chg'] = merged_df['chg']
us_gp_day_df['percent'] = merged_df['percent']
us_gp_day_df['turnoverrate'] = merged_df['turnover_rate']
us_gp_day_df['amount'] = merged_df['amount']
us_gp_day_df['pb'] = merged_df['pb']
us_gp_day_df['pe'] = merged_df['pe_ttm']
us_gp_day_df['ps'] = merged_df['ps']
# 添加pre_close字段用于除权检查
us_gp_day_df['pre_close'] = merged_df['pre_close']
print(f"美股数据转换完成,共 {len(us_gp_day_df)} 条记录")
return us_gp_day_df
def save_to_database(self, df):
"""保存美股数据到数据库"""
if df.empty:
print("没有美股数据需要保存")
return
print(f"正在保存美股数据到数据库,共 {len(df)} 条记录...")
# 删除今日数据
today_str = datetime.now().strftime('%Y-%m-%d')
delete_query = text("DELETE FROM us_day_data WHERE `timestamp` LIKE :date_str")
try:
with self.engine.begin() as conn:
conn.execute(delete_query, {"date_str": f"{today_str}%"})
print(f"已删除今日 {today_str} 的美股旧数据")
except Exception as e:
print(f"删除今日美股数据失败: {e}")
# 分批保存数据
batch_size = 1000
for i in range(0, len(df), batch_size):
batch = df.iloc[i:i+batch_size]
try:
batch.to_sql('us_day_data', self.engine, if_exists='append', index=False)
print(f"已保存第 {i//batch_size + 1} 批美股数据")
except Exception as e:
print(f"保存第 {i//batch_size + 1} 批美股数据失败: {e}")
print("美股数据保存完成")
def check_ex_rights_before_save(self, df):
"""在保存数据库之前检查美股除权情况,返回除权股票列表和除权日志数据"""
print("步骤5.1: 检查美股除权情况(保存前)...")
ex_rights_stocks = []
ex_rights_log_data = []
today_str = datetime.now().strftime('%Y-%m-%d')
for _, row in tqdm(df.iterrows(), total=len(df), desc="检查美股除权"):
symbol = row['symbol']
current_pre_close = row['pre_close']
# 如果pre_close为空跳过
if pd.isna(current_pre_close):
continue
# 查询数据库中该美股的最近两条收盘价记录
query = text("""
SELECT `close`, `timestamp` FROM us_day_data
WHERE symbol = :symbol
ORDER BY `timestamp` DESC
LIMIT 2
""")
try:
with self.engine.connect() as conn:
results = conn.execute(query, {"symbol": symbol}).fetchall()
if results:
# 检查最新记录是否为今天的数据
latest_record = results[0]
latest_timestamp = latest_record[1]
latest_date_str = latest_timestamp.strftime('%Y-%m-%d')
if latest_date_str == today_str and len(results) > 1:
# 如果最新记录是今天的,且有第二条记录,则用第二条记录比较
db_last_close = float(results[1][0])
else:
# 如果最新记录不是今天的,或者只有一条记录,则用最新记录比较
db_last_close = float(results[0][0])
# 比较pre_close和数据库中的收盘价
if abs(db_last_close - current_pre_close) > 0.001:
print(f"发现美股除权股票: {symbol}, 数据库收盘价: {db_last_close}, 当前昨收价: {current_pre_close}")
ex_rights_stocks.append(symbol)
# 收集除权日志数据
ex_rights_log_data.append({
'symbol': symbol,
'date': today_str,
'db_price': db_last_close,
'api_price': current_pre_close,
'log_time': datetime.now()
})
except Exception as e:
print(f"查询美股 {symbol} 历史数据失败: {e}")
continue
if ex_rights_stocks:
print(f"检测到 {len(ex_rights_stocks)} 只美股除权股票: {ex_rights_stocks}")
else:
print("未发现美股除权股票")
return ex_rights_stocks, ex_rights_log_data
def save_ex_rights_log(self, log_data: list):
"""将美股除权日志保存到数据库"""
if not log_data:
return
print(f"正在保存 {len(log_data)} 条美股除权日志到us_gp_ex_rights_log表...")
try:
df = pd.DataFrame(log_data)
# 确保列名与数据库字段匹配
df = df.rename(columns={
'symbol': 'stock_code',
'date': 'change_date',
'db_price': 'before_price',
'api_price': 'after_price',
'log_time': 'update_time'
})
df.to_sql('us_gp_ex_rights_log', self.engine, if_exists='append', index=False)
print("美股除权日志保存成功")
except Exception as e:
print(f"保存美股除权日志失败: {e}")
def fetch_single_stock_history(self, symbol, days=1800):
"""
获取单只美股的历史数据并保存到数据库
:param symbol: 美股代码
:param days: 获取的天数默认1800天
:return: 是否成功
"""
print(f"开始获取美股 {symbol} 最近 {days} 天的历史数据...")
begin = int(datetime.now().timestamp() * 1000)
# 使用原版采集器获取历史数据
data = self.original_collector.fetch_daily_stock_data(symbol, begin, count=-days)
if data.get('error_code') == 0:
df = self.original_collector.transform_data(data, symbol)
if df is not None and not df.empty:
df.to_sql('us_day_data', self.engine, if_exists='append', index=False)
print(f"成功保存美股 {symbol} 的历史数据,共 {len(df)} 条记录")
return True
else:
print(f"未能转换美股 {symbol} 的数据")
return False
else:
print(f"获取美股 {symbol} 数据失败: {data.get('error_description')}")
return False
def handle_ex_rights_stocks(self, ex_rights_stocks, ex_rights_log_data):
"""处理美股除权股票:保存日志、删除历史数据并重新获取历史数据"""
if not ex_rights_stocks:
return
print("步骤6: 处理美股除权股票...")
# 6.1 保存除权日志
if ex_rights_log_data:
self.save_ex_rights_log(ex_rights_log_data)
# 6.2 重新获取历史数据(注意:这里可能需要根据实际情况调整)
print(f"开始处理 {len(ex_rights_stocks)} 只美股除权股票,重新获取历史数据...")
for symbol in tqdm(ex_rights_stocks, desc="处理美股除权股票"):
try:
# 删除该美股的所有历史数据
delete_query = text("DELETE FROM us_day_data WHERE symbol = :symbol")
with self.engine.begin() as conn:
conn.execute(delete_query, {"symbol": symbol})
print(f"已删除美股 {symbol} 的历史数据")
# 重新获取1800天的历史数据
success = self.fetch_single_stock_history(symbol, 1800)
if success:
print(f"美股 {symbol} 历史数据重新获取成功")
else:
print(f"美股 {symbol} 历史数据重新获取失败")
except Exception as e:
print(f"处理美股除权股票 {symbol} 失败: {e}")
def run_daily_collection(self):
"""执行美股每日数据采集"""
print("=" * 60)
print("美股日线数据采集器V2 - 开始运行")
print("=" * 60)
try:
# 1. 获取雪球美股数据
print("步骤1: 获取雪球美股数据...")
xueqiu_df = fetch_and_store_us_stock_data()
if xueqiu_df.empty:
print("雪球美股数据获取失败,终止运行")
return
# 2. 获取东方财富美股数据
print("步骤2: 获取东方财富美股数据...")
eastmoney_df = self.fetch_eastmoney_data()
if eastmoney_df.empty:
print("东方财富美股数据获取失败,终止运行")
return
# 3. 合并数据
print("步骤3: 合并美股数据...")
merged_df = self.merge_data(xueqiu_df, eastmoney_df)
# 4. 转换数据格式
print("步骤4: 转换美股数据格式...")
us_gp_day_df = self.transform_to_us_day_data(merged_df)
# 5. 检查除权(保存前)
ex_rights_stocks, ex_rights_log_data = self.check_ex_rights_before_save(us_gp_day_df)
# 5.2. 保存到数据库
print("步骤5.2: 保存美股数据到数据库...")
self.save_to_database(us_gp_day_df)
# 6. 处理除权股票(保存后)
self.handle_ex_rights_stocks(ex_rights_stocks, ex_rights_log_data)
print("=" * 60)
print("美股日线数据采集完成")
print("=" * 60)
except Exception as e:
print(f"美股采集过程中发生错误: {e}")
finally:
# 清理资源
self.engine.dispose()
gc.collect()
def collect_us_stock_daily_data_v2(db_url):
    """Entry point for the V2 US daily-bar collection pipeline.

    Args:
        db_url: SQLAlchemy database URL passed through to the collector.
    """
    USStockDailyDataCollectorV2(db_url).run_daily_collection()
if __name__ == "__main__":
    # Standalone run against the production database.
    # NOTE(review): credentials are hard-coded — consider loading from config.
    db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
    collect_us_stock_daily_data_v2(db_url)

View File

@ -0,0 +1,366 @@
"""
东方财富美股实时股价数据采集模块
提供从东方财富网站采集美股实时股价数据并存储到数据库的功能
功能包括
1. 采集美股实时股价数据
2. 存储数据到数据库
3. 定时自动更新数据
"""
import requests
import pandas as pd
import datetime
import logging
import time
import os
import sys
from pathlib import Path
from sqlalchemy import create_engine, text
from typing import Dict
# 添加项目根目录到Python路径
current_file = Path(__file__)
project_root = current_file.parent.parent.parent
sys.path.append(str(project_root))
from src.valuation_analysis.config import DB_URL, LOG_FILE
# Project root directory (three levels up from this file).
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Make sure the log directory exists before the file handler opens it.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Log to both the shared log file and stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("us_stock_price_collector")
def get_create_us_stock_table_sql() -> str:
    """Return the DDL for the US realtime price table.

    Returns:
        CREATE TABLE statement for ``us_stock_price_data``; idempotent via
        ``IF NOT EXISTS``. Column comments are in Chinese to match the
        project's DB conventions.
    """
    return """
    CREATE TABLE IF NOT EXISTS us_stock_price_data (
        stock_code VARCHAR(20) PRIMARY KEY COMMENT '美股代码',
        stock_name VARCHAR(100) COMMENT '股票名称',
        latest_price DECIMAL(10,2) COMMENT '最新价',
        change_percent DECIMAL(10,2) COMMENT '涨跌幅',
        change_amount DECIMAL(10,2) COMMENT '涨跌额',
        volume BIGINT COMMENT '成交量(手)',
        amount DECIMAL(20,2) COMMENT '成交额',
        amplitude DECIMAL(10,2) COMMENT '振幅',
        turnover_rate DECIMAL(10,2) COMMENT '换手率',
        pe_ratio DECIMAL(10,2) COMMENT '市盈率',
        high_price DECIMAL(10,2) COMMENT '最高价',
        low_price DECIMAL(10,2) COMMENT '最低价',
        open_price DECIMAL(10,2) COMMENT '开盘价',
        pre_close DECIMAL(10,2) COMMENT '昨收价',
        total_market_value DECIMAL(20,2) COMMENT '总市值',
        float_market_value DECIMAL(20,2) COMMENT '流通市值',
        pb_ratio DECIMAL(10,2) COMMENT '市净率',
        list_date DATE COMMENT '上市日期',
        update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='美股实时股价数据表';
    """
class USStockPriceCollector:
    """Eastmoney realtime US price collector.

    Pages through the Eastmoney quote-list API for US markets and stores
    the result in the ``us_stock_price_data`` table.
    """

    def __init__(self, db_url: str = DB_URL):
        """Initialize the collector.

        Args:
            db_url: SQLAlchemy database URL.
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        self.base_url = "https://push2.eastmoney.com/api/qt/clist/get"
        # Browser-like headers for the Eastmoney endpoint.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Origin": "https://quote.eastmoney.com",
            "Referer": "https://quote.eastmoney.com/",
        }
        logger.info("东方财富美股实时股价数据采集器初始化完成")

    def _ensure_table_exists(self) -> bool:
        """Create the target table if missing.

        Returns:
            True when the table exists (or was created), False on error.
        """
        try:
            create_table_query = text(get_create_us_stock_table_sql())
            with self.engine.connect() as conn:
                conn.execute(create_table_query)
                conn.commit()
            logger.info("美股实时股价数据表创建成功")
            return True
        except Exception as e:
            logger.error(f"确保美股数据表存在失败: {e}")
            return False

    def _convert_us_stock_code(self, code: str) -> str:
        """Normalize a US ticker (upper-case only; no market suffix needed).

        Args:
            code: raw stock code from the API.

        Returns:
            The normalized code.
        """
        return code.upper()

    def _parse_list_date(self, date_str: str) -> datetime.date:
        """Parse a YYYYMMDD listing date.

        Args:
            date_str: date string (or int) from the API; '-' means unknown.

        Returns:
            A ``date`` object, or None when missing/unparseable.
        """
        if not date_str or date_str == '-':
            return None
        try:
            # The API may return the date as an integer like 19801212.
            if isinstance(date_str, int):
                date_str = str(date_str)
            return datetime.datetime.strptime(date_str, "%Y%m%d").date()
        except ValueError:
            logger.warning(f"无法解析日期: {date_str}")
            return None

    def fetch_data(self, page: int = 1) -> pd.DataFrame:
        """Fetch one page (100 rows) of US realtime quotes.

        Args:
            page: 1-based page number.

        Returns:
            DataFrame with renamed, scaled columns; empty on any failure.
        """
        try:
            # Query parameters for US markets (fs m:105/106/107 selects the
            # NYSE/NASDAQ/AMEX universes on this endpoint).
            params = {
                "np": 1,
                "fltt": 1,  # fixed-point field format for US quotes
                "invt": 2,
                "fs": "m:105,m:106,m:107",  # US market selector
                "fid": "f12",
                "pn": page,
                "pz": 100,
                "po": 1,
                "dect": 1
            }
            logger.info(f"开始获取美股第 {page} 页数据")
            response = requests.get(self.base_url, params=params, headers=self.headers)
            if response.status_code != 200:
                logger.error(f"获取美股第 {page} 页数据失败: HTTP {response.status_code}")
                return pd.DataFrame()
            data = response.json()
            if not data.get("rc") == 0:
                logger.error(f"获取美股数据失败: {data.get('message', '未知错误')}")
                return pd.DataFrame()
            # Rows live under data.diff in the response payload.
            items = data.get("data", {}).get("diff", [])
            if not items:
                logger.warning(f"美股第 {page} 页未找到有效数据")
                return pd.DataFrame()
            df = pd.DataFrame(items)
            # Map the API's opaque f-codes onto readable column names.
            column_mapping = {
                "f12": "stock_code",
                "f14": "stock_name",
                "f2": "latest_price",
                "f3": "change_percent",
                "f4": "change_amount",
                "f5": "volume",
                "f6": "amount",
                "f7": "amplitude",
                "f8": "turnover_rate",
                "f9": "pe_ratio",
                "f15": "high_price",
                "f16": "low_price",
                "f17": "open_price",
                "f18": "pre_close",
                "f20": "total_market_value",
                "f21": "float_market_value",
                "f23": "pb_ratio",
                "f26": "list_date"
            }
            df = df.rename(columns=column_mapping)
            # Normalize ticker casing.
            df['stock_code'] = df['stock_code'].apply(self._convert_us_stock_code)
            # Parse listing dates.
            df['list_date'] = df['list_date'].apply(self._parse_list_date)
            # With fltt=1 prices arrive as scaled integers; divide by 1000
            # to get dollars — assumed scale, TODO confirm against the API.
            price_columns = ['latest_price', 'change_amount', 'high_price', 'low_price', 'open_price', 'pre_close']
            for col in price_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce') / 1000.0
            # Market-cap fields use the same 1000x scale here.
            # NOTE(review): the original comment said "divide by 100" but the
            # code divides by 1000 — verify which scale is correct.
            market_value_columns = ['total_market_value', 'float_market_value']
            for col in market_value_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce') / 1000.0
            logger.info(f"美股第 {page} 页数据获取成功,包含 {len(df)} 条记录")
            return df
        except Exception as e:
            logger.error(f"获取美股第 {page} 页数据失败: {e}")
            return pd.DataFrame()

    def fetch_all_data(self) -> pd.DataFrame:
        """Fetch every page of US realtime quotes.

        Returns:
            Concatenated DataFrame of all pages; empty when nothing fetched.
        """
        all_data = []
        page = 1
        while True:
            page_data = self.fetch_data(page)
            if page_data.empty:
                logger.info(f"美股第 {page} 页数据为空,停止采集")
                break
            all_data.append(page_data)
            # A short page (fewer than 100 rows) is the last page.
            if len(page_data) < 100:
                break
            page += 1
            # Throttle to avoid hammering the endpoint.
            time.sleep(1)
        if all_data:
            combined_df = pd.concat(all_data, ignore_index=True)
            logger.info(f"美股数据采集完成,共采集 {len(combined_df)} 条记录")
            return combined_df
        else:
            logger.warning("未获取到任何有效美股数据")
            return pd.DataFrame()

    def save_to_database(self, data: pd.DataFrame) -> bool:
        """Save the collected quotes to ``us_stock_price_data``.

        Args:
            data: DataFrame to persist.

        Returns:
            True on success, False otherwise.
        """
        if data.empty:
            logger.warning("没有美股数据需要保存")
            return False
        try:
            if not self._ensure_table_exists():
                return False
            # Normalize placeholder/NaN values to SQL NULL.
            data = data.replace('-', None)
            data = data.replace({pd.NA: None, pd.NaT: None})
            data = data.where(pd.notnull(data), None)
            # NOTE(review): if_exists='replace' DROPS the table and recreates
            # it from the DataFrame, discarding the DDL (PK, comments, charset)
            # that _ensure_table_exists just applied — consider delete+append.
            data.to_sql('us_stock_price_data', self.engine, if_exists='replace', index=False)
            logger.info(f"成功保存 {len(data)} 条美股数据到数据库")
            return True
        except Exception as e:
            logger.error(f"保存美股数据到数据库失败: {e}")
            return False

    def run_collection(self) -> bool:
        """Run the full collection flow (fetch all pages, then save).

        Returns:
            True when data was fetched and saved successfully.
        """
        try:
            logger.info("开始美股数据采集流程")
            all_data = self.fetch_all_data()
            if all_data.empty:
                logger.warning("未获取到任何美股数据")
                return False
            if self.save_to_database(all_data):
                logger.info("美股数据采集流程完成")
                return True
            else:
                logger.error("美股数据保存失败")
                return False
        except Exception as e:
            logger.error(f"美股数据采集流程失败: {e}")
            return False
def main():
    """Smoke-test entry point: run one full collection and report the result."""
    ok = USStockPriceCollector().run_collection()
    print("美股数据采集成功完成" if ok else "美股数据采集失败")
if __name__ == "__main__":
    # Allow running this collector directly as a script.
    main()

View File

@ -0,0 +1,373 @@
"""
美股PE估值分析模块
提供美股历史PE分位数分析功能包括
1. 美股历史PE数据获取
2. 分位数计算
3. 可视化展示
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sqlalchemy import create_engine, text
import datetime
import logging
from typing import Tuple, Dict, List, Optional, Union
import os
import matplotlib.dates as mdates
from matplotlib.ticker import FuncFormatter
from pathlib import Path
# Import configuration; fall back to hard-coded defaults when the package
# layout is not importable (e.g. when this file is run standalone).
try:
    from src.valuation_analysis.config import DB_URL, OUTPUT_DIR, LOG_FILE
except ImportError:
    # Defaults used when the project config cannot be imported.
    DB_URL = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
    OUTPUT_DIR = 'results/valuation_analysis'
    LOG_FILE = 'logs/us_valuation_analysis.log'

# Make sure output and log directories exist before any handler opens them.
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Log to both the shared log file and stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("us_valuation_analysis")

# Configure matplotlib so CJK labels and minus signs render correctly.
plt.rcParams['font.sans-serif'] = ['SimHei']  # font with CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # keep '-' rendering with SimHei
class USValuationAnalyzer:
    """US-stock valuation analyzer.

    Loads historical PE data from ``us_day_data``, computes percentile
    statistics, and renders a two-panel price/PE chart. Only the PE metric
    is supported for US stocks.
    """

    def __init__(self, db_url: str = DB_URL):
        """Initialize the analyzer.

        Args:
            db_url: SQLAlchemy database URL.
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        logger.info("美股估值分析器初始化完成")

    def get_us_stock_name(self, stock_code: str) -> str:
        """Look up a display name for a US ticker.

        Args:
            stock_code: US ticker (e.g. AAPL, GOOGL).

        Returns:
            The stock name from the realtime table, or the ticker itself
            when no name is found (also the fallback on any error).
        """
        try:
            # Prefer the realtime price table, which carries names.
            query = text("""
                SELECT DISTINCT stock_name
                FROM us_stock_price_data
                WHERE stock_code = :stock_code
                LIMIT 1
            """)
            with self.engine.connect() as conn:
                result = conn.execute(query, {"stock_code": stock_code}).fetchone()
            if result:
                return result[0]
            else:
                # Fall back to checking the daily table for mere existence;
                # it stores no names, so the ticker doubles as the name.
                query2 = text("""
                    SELECT DISTINCT symbol
                    FROM us_day_data
                    WHERE symbol = :stock_code
                    LIMIT 1
                """)
                with self.engine.connect() as conn:
                    result2 = conn.execute(query2, {"stock_code": stock_code}).fetchone()
                if result2:
                    return stock_code
                else:
                    logger.warning(f"未找到美股 {stock_code} 的名称信息")
                    return stock_code
        except Exception as e:
            logger.error(f"获取美股名称失败: {e}")
            return stock_code

    def get_us_historical_data(self, stock_code: str, start_date: str = '2018-01-01') -> pd.DataFrame:
        """Load historical close/PE rows for one US ticker.

        Args:
            stock_code: US ticker (e.g. AAPL, GOOGL).
            start_date: inclusive start date, default 2018-01-01.

        Returns:
            DataFrame with ``timestamp``, ``close``, ``pe`` (plus a
            ``pe_filtered`` column); empty on error or no data.
        """
        try:
            # US rows carry PE only (no PB); non-positive PE is excluded
            # at the SQL level.
            query = text("""
                SELECT
                    `timestamp`, `close`, `pe`
                FROM
                    us_day_data
                WHERE
                    symbol = :symbol AND
                    `timestamp` >= :start_date AND
                    pe > 0 AND
                    pe IS NOT NULL
                ORDER BY
                    `timestamp` ASC
            """)
            with self.engine.connect() as conn:
                df = pd.read_sql(query, conn, params={"symbol": stock_code, "start_date": start_date})
            if df.empty:
                logger.warning(f"未找到美股 {stock_code} 的历史数据")
                return df
            # Coerce column types coming back from the DB driver.
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['close'] = pd.to_numeric(df['close'], errors='coerce')
            df['pe'] = pd.to_numeric(df['pe'], errors='coerce')
            # Drop extreme PE outliers and add the *_filtered column.
            df = self._filter_extreme_values(df, 'pe')
            logger.info(f"获取美股 {stock_code} 历史数据成功,共 {len(df)} 条记录")
            return df
        except Exception as e:
            logger.error(f"获取美股历史数据失败: {e}")
            return pd.DataFrame()

    def _filter_extreme_values(self, data: pd.DataFrame, metric: str) -> pd.DataFrame:
        """Drop extreme outliers for a metric and add ``<metric>_filtered``.

        Args:
            data: input DataFrame.
            metric: metric column name ('pe' or 'pb').

        Returns:
            Filtered DataFrame (unchanged when empty or metric missing).
        """
        if data.empty or metric not in data.columns:
            return data
        # Keep only positive values under a hard upper bound per metric.
        if metric == 'pe':
            # PE above 1000 is treated as noise.
            data = data[(data[metric] > 0) & (data[metric] <= 1000)]
        elif metric == 'pb':
            # PB above 100 is treated as noise.
            data = data[(data[metric] > 0) & (data[metric] <= 100)]
        # Mirror the metric into a *_filtered column for downstream use.
        # NOTE(review): assigning to a sliced frame may trigger pandas'
        # SettingWithCopyWarning — consider .copy() before this line.
        data[f'{metric}_filtered'] = data[metric]
        return data

    def calculate_us_percentiles(self, data: pd.DataFrame, metric: str = 'pe') -> Dict:
        """Compute percentile statistics for a US valuation metric.

        Args:
            data: historical data from :meth:`get_us_historical_data`.
            metric: metric name; only 'pe' is supported for US stocks.

        Returns:
            Dict with min/max/mean/median/quartiles/p5..p95, the latest
            value ('current'), and its historical percentile rank
            ('current_percentile'); empty dict when unsupported or no data.
        """
        if data.empty:
            logger.warning(f"数据为空,无法计算{metric}分位数")
            return {}
        # US analysis is PE-only.
        if metric != 'pe':
            logger.warning(f"美股估值分析只支持PE指标不支持{metric}")
            return {}
        # Use the outlier-filtered column when present.
        metric_filtered = f'{metric}_filtered'
        if metric_filtered not in data.columns:
            metric_filtered = metric
        # Summary statistics and fixed percentiles.
        percentiles = {
            'min': data[metric_filtered].min(),
            'max': data[metric_filtered].max(),
            'mean': data[metric_filtered].mean(),
            'median': data[metric_filtered].median(),
            'q1': data[metric_filtered].quantile(0.25),
            'q3': data[metric_filtered].quantile(0.75),
            'p10': data[metric_filtered].quantile(0.1),
            'p90': data[metric_filtered].quantile(0.9),
            'p5': data[metric_filtered].quantile(0.05),
            'p95': data[metric_filtered].quantile(0.95)
        }
        # Latest observation (rows are ordered ascending by timestamp).
        if not data.empty:
            percentiles['current'] = data[metric_filtered].iloc[-1]
        else:
            percentiles['current'] = None
        # Rank the latest value within the full history (0-100 scale).
        if percentiles['current'] is not None:
            current_value = percentiles['current']
            sorted_values = sorted(data[metric_filtered].dropna())
            if sorted_values:
                percentile_rank = (np.searchsorted(sorted_values, current_value) / len(sorted_values)) * 100
                percentiles['current_percentile'] = percentile_rank
            else:
                percentiles['current_percentile'] = None
        else:
            percentiles['current_percentile'] = None
        logger.info(f"美股{metric}分位数计算完成")
        return percentiles

    def get_us_industry_avg_data(self, industry_name: str, start_date: str, metric: str = 'pe') -> Optional[pd.DataFrame]:
        """Industry-average data is not supported for US stocks.

        Args:
            industry_name: industry name (ignored).
            start_date: start date (ignored).
            metric: valuation metric (ignored).

        Returns:
            Always None.
        """
        logger.info("美股暂不支持行业平均数据分析")
        return None

    def get_us_concept_avg_data(self, concept_name: str, start_date: str, metric: str = 'pe') -> Optional[pd.DataFrame]:
        """Concept-board average data is not supported for US stocks.

        Args:
            concept_name: concept board name (ignored).
            start_date: start date (ignored).
            metric: valuation metric (ignored).

        Returns:
            Always None.
        """
        logger.info("美股暂不支持概念板块平均数据分析")
        return None

    def create_us_valuation_chart(self, stock_code: str, start_date: str = '2018-01-01',
                                  metric: str = 'pe', save_path: Optional[str] = None) -> str:
        """Render a two-panel price + PE-percentile chart and save it as PNG.

        Args:
            stock_code: US ticker.
            start_date: inclusive start date for the history window.
            metric: valuation metric; only 'pe' is supported.
            save_path: output path; defaults to OUTPUT_DIR with a dated name.

        Returns:
            Path of the saved chart file.

        Raises:
            ValueError: when no history or no percentiles are available.
        """
        try:
            data = self.get_us_historical_data(stock_code, start_date)
            if data.empty:
                raise ValueError(f"未找到美股 {stock_code} 的历史数据")
            percentiles = self.calculate_us_percentiles(data, metric)
            if not percentiles:
                raise ValueError(f"无法计算美股 {stock_code} 的{metric}分位数")
            stock_name = self.get_us_stock_name(stock_code)
            # Two stacked panels: price on top, valuation below.
            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12))
            # Top panel: closing-price series.
            ax1.plot(data['timestamp'], data['close'], linewidth=2, color='blue', label='股价')
            ax1.set_title(f'{stock_name} ({stock_code}) 股价走势', fontsize=16, fontweight='bold')
            ax1.set_ylabel('股价 (USD)', fontsize=12)
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            # Bottom panel: PE series with percentile reference lines.
            metric_filtered = f'{metric}_filtered' if f'{metric}_filtered' in data.columns else metric
            ax2.plot(data['timestamp'], data[metric_filtered], linewidth=2, color='red', label=f'{metric.upper()}估值')
            # Horizontal percentile guides.
            ax2.axhline(y=percentiles['p95'], color='purple', linestyle='--', alpha=0.7, label='95%分位线')
            ax2.axhline(y=percentiles['q3'], color='orange', linestyle='--', alpha=0.7, label='75%分位线')
            ax2.axhline(y=percentiles['median'], color='green', linestyle='-', alpha=0.7, label='中位数')
            ax2.axhline(y=percentiles['q1'], color='orange', linestyle='--', alpha=0.7, label='25%分位线')
            ax2.axhline(y=percentiles['p5'], color='purple', linestyle='--', alpha=0.7, label='5%分位线')
            # Highlight and annotate the latest observation.
            if percentiles['current'] is not None:
                current_date = data['timestamp'].iloc[-1]
                ax2.scatter([current_date], [percentiles['current']], color='red', s=100, zorder=5)
                ax2.annotate(f'当前{metric.upper()}: {percentiles["current"]:.2f}',
                             xy=(current_date, percentiles['current']),
                             xytext=(10, 10), textcoords='offset points',
                             bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7),
                             arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))
            ax2.set_title(f'{stock_name} ({stock_code}) {metric.upper()}估值分析', fontsize=16, fontweight='bold')
            ax2.set_xlabel('日期', fontsize=12)
            ax2.set_ylabel(f'{metric.upper()}', fontsize=12)
            ax2.legend()
            ax2.grid(True, alpha=0.3)
            # Shared x-axis date formatting for both panels.
            for ax in [ax1, ax2]:
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
                ax.xaxis.set_major_locator(mdates.MonthLocator(interval=6))
                plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
            plt.tight_layout()
            # Default output path includes the run date.
            if save_path is None:
                save_path = os.path.join(OUTPUT_DIR, f'{stock_code}_{metric}_analysis_{datetime.datetime.now().strftime("%Y%m%d")}.png')
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            plt.close()
            logger.info(f"美股估值分析图表保存成功: {save_path}")
            return save_path
        except Exception as e:
            logger.error(f"创建美股估值分析图表失败: {e}")
            raise
# Module-level singleton shared by importers of this module.
# NOTE(review): instantiating at import time opens a DB engine as a side
# effect — confirm this is intended for all import paths.
us_valuation_analyzer = USValuationAnalyzer()

View File

@ -1,100 +0,0 @@
# coding:utf-8
# 更新美股列表的代码
import requests
import pandas as pd
from sqlalchemy import create_engine, text
import sys
import os
# 将项目根目录添加到Python路径以便导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.scripts.config import XUEQIU_HEADERS
def collect_us_stock_codes(db_url):
    """Collect the US stock list from Xueqiu and store it in `gp_code_us`.

    The target table is truncated first, then every page of the Xueqiu
    screener (ordered by market cap, market='US') is fetched and the full
    list is appended in one batch.

    Fixes vs. previous version: user-facing messages said "Hong Kong"
    although the code collects US stocks; `engine.dispose()` was skipped
    on the early-return path (connection-pool leak) — now in `finally`.

    Args:
        db_url: SQLAlchemy database URL.
    """
    engine = create_engine(db_url)
    headers = XUEQIU_HEADERS
    base_url = "https://stock.xueqiu.com/v5/stock/screener/quote/list.json"
    page = 1
    page_size = 90
    all_data = []

    print("--- Starting to collect US stock codes ---")

    try:
        # Empty the table before collecting so the run is a full refresh.
        try:
            with engine.begin() as conn:
                conn.execute(text("TRUNCATE TABLE gp_code_us"))
            print("Table `gp_code_us` has been truncated.")
        except Exception as e:
            print(f"Error truncating table `gp_code_us`: {e}")
            return

        while True:
            params = {
                'page': page,
                'size': page_size,
                'order': 'desc',
                'order_by': 'market_capital',
                'market': 'US',
                'type': 'us',
                'is_delay': 'true'
            }
            print(f"Fetching page {page}...")
            try:
                response = requests.get(base_url, headers=headers, params=params, timeout=20)
                if response.status_code != 200:
                    print(f"Request failed with status code {response.status_code}")
                    break
                data = response.json()
                if data.get('error_code') != 0:
                    print(f"API error: {data.get('error_description')}")
                    break
                stock_list = data.get('data', {}).get('list', [])
                if not stock_list:
                    print("No more data found. Collection finished.")
                    break
                all_data.extend(stock_list)
                # Fewer rows than a full page means this was the last page.
                if len(stock_list) < page_size:
                    print("Reached the last page. Collection finished.")
                    break
                page += 1
            except requests.exceptions.RequestException as e:
                print(f"Request exception on page {page}: {e}")
                break

        if all_data:
            print(f"--- Collected a total of {len(all_data)} stocks. Preparing to save to database. ---")
            df = pd.DataFrame(all_data)
            # Map API fields onto the gp_code_us schema.
            df_to_save = pd.DataFrame()
            df_to_save['gp_name'] = df['name']
            df_to_save['gp_code'] = df['symbol']
            df_to_save['gp_code_two'] = 'US.' + df['symbol'].astype(str)
            df_to_save['market_cap'] = df['market_capital']
            try:
                df_to_save.to_sql('gp_code_us', engine, if_exists='append', index=False)
                print("--- Successfully saved all data to `gp_code_us`. ---")
            except Exception as e:
                print(f"Error saving data to database: {e}")
        else:
            print("--- No data collected. ---")
    finally:
        # Always release pooled connections, even on early return or error.
        engine.dispose()
if __name__ == "__main__":
    # Standalone run against the production database.
    # NOTE(review): credentials are hard-coded — consider loading from config.
    db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
    collect_us_stock_codes(db_url)