diff --git a/src/QMT/config_loader.py b/src/QMT/config_loader.py index 062cf79..99dd77a 100644 --- a/src/QMT/config_loader.py +++ b/src/QMT/config_loader.py @@ -38,7 +38,6 @@ def get_database_config(): """获取数据库配置""" config = load_config() print(config) - print("----------------------------------------------------------") return config.get('database', {}) def get_redis_config(): diff --git a/src/QMT/database_manager.py b/src/QMT/database_manager.py index 32f4347..b987163 100644 --- a/src/QMT/database_manager.py +++ b/src/QMT/database_manager.py @@ -314,6 +314,11 @@ class DatabaseManager: if results: # 更新现有持仓 current_quantity, current_cost_price = results[0] + + # 确保类型转换,避免 Decimal 和 float 混合运算 + current_cost_price = float(current_cost_price) if current_cost_price is not None else 0.0 + price = float(price) + if is_buy: # 买入:增加持仓 new_quantity = current_quantity + quantity_change @@ -324,7 +329,7 @@ class DatabaseManager: else: # 卖出:减少持仓 new_quantity = max(0, current_quantity - quantity_change) - new_cost_price = current_cost_price if new_quantity > 0 else 0 + new_cost_price = current_cost_price if new_quantity > 0 else 0.0 if new_quantity > 0: sql_update = """ @@ -345,7 +350,7 @@ class DatabaseManager: (stock_code, total_quantity, cost_price, create_time, update_time) VALUES (%s, %s, %s, NOW(), NOW()) """ - self.execute_update(sql_insert, (stock_code, quantity_change, price)) + self.execute_update(sql_insert, (stock_code, quantity_change, float(price))) self.logger.info(f"持仓已更新: {stock_code} {'买入' if is_buy else '卖出'} {quantity_change}股") return True @@ -355,18 +360,23 @@ class DatabaseManager: def insert_trading_log(self, log_data: Dict) -> bool: """插入交易日志""" + import json sql = """ INSERT INTO trading_log - (order_id, stock_code, log_type, log_level, message, create_time) - VALUES (%s, %s, %s, %s, %s, %s) + (order_id, stock_code, log_type, log_level, message, extra_data, create_time) + VALUES (%s, %s, %s, %s, %s, %s, %s) """ try: + # 将extra_data转换为JSON字符串 + 
extra_data_json = json.dumps(log_data.get('extra_data'), ensure_ascii=False) if log_data.get('extra_data') else None + params = ( log_data.get('order_id'), log_data.get('stock_code'), log_data['log_type'], log_data['log_level'], log_data['message'], + extra_data_json, log_data['create_time'] ) self.execute_update(sql, params) diff --git a/src/QMT/trader_callback.py b/src/QMT/trader_callback.py index 50245b2..39427e3 100644 --- a/src/QMT/trader_callback.py +++ b/src/QMT/trader_callback.py @@ -87,36 +87,92 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): trade_direction = "买入" if is_buy else "卖出" self.logger.info(f"识别交易方向: {trade_direction}") - # 更新内存和数据库持仓状态 - update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, trade.traded_price, self.logger) + # 更新内存和数据库持仓状态(确保价格类型为float) + traded_price = float(trade.traded_price) + update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, traded_price, self.logger) # 确保 order_id 是字符串类型 order_id_str = str(trade.order_id) - # 更新数据库订单状态 - self.db_manager.update_order_status( - order_id_str, - 'filled', - trade.traded_volume, - datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - ) + # 1. 先查询当前订单的已成交数量和委托数量 + current_filled = self._get_current_filled_quantity(order_id_str) + order_quantity = self._get_order_quantity(order_id_str) + + # 2. 计算新的累计成交数量 + new_total_filled = current_filled + trade.traded_volume + + # 3. 判断订单是否完全成交 + is_order_completed = new_total_filled >= order_quantity + + # 4. 
更新数据库订单状态和成交数量 + if is_order_completed: + # 完全成交 + self.db_manager.update_order_status( + order_id_str, + 'completed', + new_total_filled, + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ) + self.logger.info(f"订单完全成交: {order_id_str} 累计成交 {new_total_filled}/{order_quantity}") + + # 从在途订单中移除 + remove_pending_order(trade.stock_code, self.logger) + else: + # 部分成交 + self.db_manager.update_order_status( + order_id_str, + 'filled', + new_total_filled, + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ) + self.logger.info(f"订单部分成交: {order_id_str} 累计成交 {new_total_filled}/{order_quantity}") + + # 5. 记录详细交易日志(包含成交信息) + trade_detail = { + 'trade_id': getattr(trade, 'trade_id', ''), + 'traded_price': traded_price, + 'traded_volume': int(trade.traded_volume), + 'traded_amount': float(traded_price * trade.traded_volume), + 'trade_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'total_filled': new_total_filled, + 'order_quantity': order_quantity, + 'is_completed': is_order_completed, + 'trade_direction': trade_direction + } - # 记录交易日志 log_data = { 'order_id': order_id_str, 'stock_code': trade.stock_code, 'log_type': 'trade_filled', 'log_level': 'INFO', - 'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}元', + 'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}元 (累计: {new_total_filled}/{order_quantity})', + 'extra_data': trade_detail, 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') } self.db_manager.insert_trading_log(log_data) - # 从在途订单中移除 - remove_pending_order(trade.stock_code, self.logger) - except Exception as e: self.logger.error(f"更新持仓状态失败: {str(e)}") + + def _get_current_filled_quantity(self, order_id): + """获取订单当前已成交数量""" + try: + sql = "SELECT COALESCE(filled_quantity, 0) FROM trading_order WHERE order_id = %s OR qmt_order_id = %s" + result = self.db_manager.execute_query(sql, (order_id, order_id)) + return int(result[0][0]) 
if result else 0 + except Exception as e: + self.logger.warning(f"获取已成交数量失败: {str(e)}") + return 0 + + def _get_order_quantity(self, order_id): + """获取订单委托数量""" + try: + sql = "SELECT order_quantity FROM trading_order WHERE order_id = %s OR qmt_order_id = %s" + result = self.db_manager.execute_query(sql, (order_id, order_id)) + return int(result[0][0]) if result else 0 + except Exception as e: + self.logger.warning(f"获取委托数量失败: {str(e)}") + return 0 def on_order_error(self, order_error): """ diff --git a/src/app.py b/src/app.py index 759ec8c..ae1288b 100644 --- a/src/app.py +++ b/src/app.py @@ -25,6 +25,9 @@ from src.stock_analysis_v2 import run_backtest # 导入PE/PB估值分析器 from src.valuation_analysis.pe_pb_analysis import ValuationAnalyzer +# 导入美股PE估值分析器 +from src.quantitative_analysis.us_valuation_analyzer import us_valuation_analyzer + # 导入行业估值分析器 from src.valuation_analysis.industry_analysis import IndustryAnalyzer @@ -237,6 +240,27 @@ def run_stock_daily_collection2(): }), 200 +@app.route('/scheduler/usStockDaily/collection', methods=['GET']) +def run_us_stock_daily_collection(): + """执行美股日线数据采集任务""" + try: + logger.info("开始执行美股日线数据采集") + # 获取当天日期 + today = datetime.now().strftime('%Y-%m-%d') + + # 定义数据库连接地址 + db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' + + # 导入美股采集函数 + from src.quantitative_analysis.us_stock_daily_data_collector_v2 import collect_us_stock_daily_data_v2 + collect_us_stock_daily_data_v2(db_url) + except Exception as e: + logger.error(f"启动美股日线数据采集任务失败: {str(e)}") + return jsonify({ + "status": "success" + }), 200 + + @app.route('/scheduler/rzrq/collection', methods=['GET']) def run_rzrq_initial_collection1(): """执行融资融券数据更新采集 下午7点开始""" @@ -2436,6 +2460,141 @@ def get_stock_price_range(): "message": f"服务器错误: {str(e)}" }), 500 + +@app.route('/api/us_stock/price_range', methods=['GET']) +def get_us_stock_price_range(): + """根据美股PE估值分位计算理论价格区间 + + 根据当前PE的四分位数据,反向计算出对应的理论股价区间 + + 参数: + - stock_code: 必须,美股代码 (如 AAPL, GOOGL) + - 
start_date: 可选,开始日期,默认为一年前 + + 返回内容: + { + "status": "success", + "data": { + "stock_code": "AAPL", + "stock_name": "Apple Inc.", + "current_price": 150.25, + "current_date": "2023-12-01", + "pe": { + "current": 25.5, + "q1": 22.3, + "q3": 28.7, + "q1_price": 131.4, // 对应PE为Q1时的理论股价 + "q3_price": 169.1 // 对应PE为Q3时的理论股价 + } + } + } + """ + try: + # 获取股票代码参数 + stock_code = request.args.get('stock_code') + + # 验证参数 + if not stock_code: + return jsonify({ + "status": "error", + "message": "缺少必要参数: stock_code" + }), 400 + + # 美股代码格式处理 (统一转换为大写) + stock_code = stock_code.strip().upper() + + # 计算一年前的日期作为默认起始日期 + default_start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d') + start_date = request.args.get('start_date', default_start_date) + + # 通过美股估值分析器获取PE数据 + pe_data = us_valuation_analyzer.get_us_historical_data(stock_code, start_date) + if pe_data.empty: + return jsonify({ + "status": "error", + "message": f"未找到美股 {stock_code} 的历史数据" + }), 404 + + # 计算PE分位数 + pe_percentiles = us_valuation_analyzer.calculate_us_percentiles(pe_data, 'pe') + if not pe_percentiles: + return jsonify({ + "status": "error", + "message": f"无法计算美股 {stock_code} 的PE分位数" + }), 500 + + # 获取当前股价 + current_price = None + current_date = None + if not pe_data.empty: + current_price_raw = pe_data.iloc[-1].get('close') + # 确保current_price是数值类型 + try: + current_price = float(current_price_raw) if current_price_raw is not None else None + except (ValueError, TypeError): + current_price = None + + current_date = pe_data.iloc[-1].get('timestamp').strftime('%Y-%m-%d') if 'timestamp' in pe_data.columns else None + + if current_price is None: + return jsonify({ + "status": "error", + "message": f"无法获取美股 {stock_code} 的当前股价" + }), 500 + + # 获取当前PE + current_pe = pe_percentiles.get('current') + + # 获取PE的Q1和Q3 + pe_q1 = pe_percentiles.get('q1') + pe_q3 = pe_percentiles.get('q3') + + # 反向计算估值分位对应的股价 + # 如果当前PE为X,股价为Y,则PE为Z时的理论股价 = Y * (X / Z) + + # 计算PE对应的理论股价 + pe_q1_price = None + pe_q3_price = 
None + if current_pe and current_pe > 0 and pe_q1 and pe_q3 and current_price: + try: + pe_q1_price = current_price * (pe_q1 / current_pe) + pe_q3_price = current_price * (pe_q3 / current_pe) + except (TypeError, ValueError) as e: + logger.error(f"计算理论股价时发生错误: {e}") + pe_q1_price = None + pe_q3_price = None + + # 获取股票名称 + stock_name = us_valuation_analyzer.get_us_stock_name(stock_code) + + # 构建响应 + response = { + "status": "success", + "data": { + "stock_code": stock_code, + "stock_name": stock_name, + "current_price": current_price, + "current_date": current_date, + "pe": { + "current": current_pe, + "q1": pe_q1, + "q3": pe_q3, + "q1_price": round(pe_q1_price, 2) if pe_q1_price is not None else None, + "q3_price": round(pe_q3_price, 2) if pe_q3_price is not None else None + } + } + } + + return jsonify(response) + + except Exception as e: + logger.error(f"计算美股价格区间异常: {str(e)}") + return jsonify({ + "status": "error", + "message": f"服务器错误: {str(e)}" + }), 500 + + @app.route('/api/fear_greed/data', methods=['GET']) def get_fear_greed_data(): """获取恐贪指数数据 diff --git a/src/quantitative_analysis/financial_data_collector.py b/src/quantitative_analysis/financial_data_collector.py index 3b1d890..6999dd5 100644 --- a/src/quantitative_analysis/financial_data_collector.py +++ b/src/quantitative_analysis/financial_data_collector.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -东方财富财务数据采集器 V2.0 +东方财富财务数据采集器 V2.0--每次季度财报更新之后执行这个脚本 适配2025年新版接口 从东方财富网自动采集A股上市公司的财务报表数据,包括: diff --git a/src/quantitative_analysis/tech_fundamental_factor_strategy.py b/src/quantitative_analysis/tech_fundamental_factor_strategy.py index 1d53e67..7d3a0cc 100644 --- a/src/quantitative_analysis/tech_fundamental_factor_strategy.py +++ b/src/quantitative_analysis/tech_fundamental_factor_strategy.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- """ -科技主题基本面因子选股策略 +科技主题基本面因子选股策略--这里就是入口--请执行这个文件! 
整合企业生命周期、财务指标和平均距离因子分析 """ @@ -725,4 +725,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() \ No newline at end of file diff --git a/src/quantitative_analysis/us_batch_stock_price_collector.py b/src/quantitative_analysis/us_batch_stock_price_collector.py new file mode 100644 index 0000000..576f549 --- /dev/null +++ b/src/quantitative_analysis/us_batch_stock_price_collector.py @@ -0,0 +1,293 @@ +import requests +import pandas as pd +from datetime import datetime +import sys +import os +import redis +import json +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +import time + +# 添加项目根目录到路径,便于导入scripts.config +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_root) + +# 读取雪球headers和Redis配置 +try: + from src.scripts.config import XUEQIU_HEADERS + from src.valuation_analysis.config import REDIS_CONFIG +except ImportError: + XUEQIU_HEADERS = { + 'User-Agent': 'Mozilla/5.0', + 'Cookie': '', # 需要填写雪球cookie + } + REDIS_CONFIG = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + 'password': None + } + +REDIS_KEY = 'xq_us_stock_changes_latest' # 存放美股行情的主键 + +# 条件导入代理管理器 +proxy_manager = None +try: + from src.scripts.ProxyIP import EnhancedProxyManager + proxy_manager = EnhancedProxyManager() +except ImportError: + print("代理管理器导入失败,将使用直接请求模式") + + +def get_redis_conn(): + """获取Redis连接""" + pool = redis.ConnectionPool( + host=REDIS_CONFIG['host'], + port=REDIS_CONFIG['port'], + db=REDIS_CONFIG.get('db', 0), + password=REDIS_CONFIG.get('password', None), + decode_responses=True + ) + return redis.Redis(connection_pool=pool) + + +def fetch_and_store_us_stock_data(page_size=90, max_workers=10, use_proxy=False): + """ + 批量采集雪球美股股票的最新行情数据,并保存到Redis。 + 使用线程池并行请求,提高采集效率。 + + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数 + :param use_proxy: 是否使用代理(默认False) + """ + base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json' + headers = 
XUEQIU_HEADERS + + all_data = [] + data_lock = threading.Lock() # 线程安全锁 + + def fetch_page_data(page): + """获取单页数据的函数""" + params = { + 'page': page, + 'size': page_size, + 'order': 'desc', + 'order_by': 'market_capital', + 'market': 'US', + 'type': 'us', + 'is_delay': 'true' + } + + try: + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + page_data = data['data']['list'] + + # 线程安全地添加数据 + with data_lock: + all_data.extend(page_data) + + print(f"成功采集美股第 {page} 页数据,获取 {len(page_data)} 条记录") + return len(page_data) + else: + print(f"请求美股数据第 {page} 页失败,状态码:{response.status_code}") + return 0 + except Exception as e: + print(f"请求美股数据第 {page} 页异常:{e}") + return 0 + + # 使用线程池并行采集数据 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + # 先获取总页数 + params = { + 'page': 1, + 'size': page_size, + 'order': 'desc', + 'order_by': 'market_capital', + 'market': 'US', + 'type': 'us', + 'is_delay': 'true' + } + + try: + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + + if response.status_code != 200: + print(f"请求美股数据失败,状态码:{response.status_code}") + return pd.DataFrame() + + data = response.json() + total_count = data['data']['count'] + total_pages = (total_count // page_size) + 1 + + print(f"开始采集美股数据,共 {total_pages} 页,总计 {total_count} 条记录") + + # 提交所有页面的采集任务 + for page in range(1, total_pages + 1): + future = executor.submit(fetch_page_data, page) + futures.append(future) + + except Exception as e: + print(f"获取美股总页数失败:{e}") + return pd.DataFrame() + + # 等待所有任务完成 + print(f"正在并行采集美股数据,使用 {max_workers} 个线程...") + start_time = 
time.time() + + completed_count = 0 + for future in as_completed(futures): + completed_count += 1 + try: + result = future.result() + if result > 0: + print(f"进度: {completed_count}/{len(futures)} 页完成") + except Exception as e: + print(f"采集任务异常:{e}") + + end_time = time.time() + print(f"美股数据采集完成,耗时: {end_time - start_time:.2f} 秒") + + # 转换为 DataFrame + df = pd.DataFrame(all_data) + + if not df.empty: + df['fetch_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 存入Redis,使用hash结构,key为symbol,value为json字符串 + r = get_redis_conn() + pipe = r.pipeline() + + # 先清空旧数据 + r.delete(REDIS_KEY) + + print(f"正在将 {len(df)} 条美股记录写入Redis...") + + for _, row in df.iterrows(): + symbol = row.get('symbol') + if not symbol: + continue + # 只保留必要字段,也可直接存row.to_dict() + value = row.to_dict() + pipe.hset(REDIS_KEY, symbol, json.dumps(value, ensure_ascii=False)) + + pipe.execute() + print(f"成功将美股数据写入Redis哈希 {REDIS_KEY},共{len(df)}条记录。") + + # 返回DataFrame供其他脚本使用 + return df + else: + print("未获取到任何美股数据。") + return pd.DataFrame() + + +def format_us_stock_code(stock_code): + """ + 统一美股代码格式,支持AAPL、GOOGL等 + 返回雪球格式(如AAPL、GOOGL)和Redis存储格式(如AAPL) + """ + stock_code = stock_code.upper() + # 美股代码通常直接返回 + return stock_code, stock_code + + +def get_us_stock_realtime_info_from_redis(stock_code): + """ + 根据美股代码从Redis查询实时行情,并封装为指定结构。 + :param stock_code: 美股代码如AAPL、GOOGL等 + :return: dict or None + """ + _, redis_code = format_us_stock_code(stock_code) + r = get_redis_conn() + value = r.hget(REDIS_KEY, redis_code) + if not value: + return None + try: + data = json.loads(value) + except Exception: + return None + # 封装为指定结构 + result = { + "code": None, + "crawlDate": None, + "marketValue": None, + "maxPrice": None, + "minPrice": None, + "nowPrice": None, + "pbRate": None, + "rangeRiseAndFall": None, + "shortName": None, + "todayStartPrice": None, + "ttm": None, + "turnoverRate": None, + "yesterdayEndPrice": None + } + # 赋值映射 + result["code"] = data.get("symbol") + result["crawlDate"] = 
data.get("fetch_time") + result["marketValue"] = data.get("market_capital") + result["maxPrice"] = data.get("high") if "high" in data else data.get("high52w") + result["minPrice"] = data.get("low") if "low" in data else data.get("low52w") + result["nowPrice"] = data.get("current") + result["pbRate"] = data.get("pb") + result["rangeRiseAndFall"] = data.get("percent") + result["shortName"] = data.get("name") + result["todayStartPrice"] = data.get("open") + result["ttm"] = data.get("pe_ttm") + result["turnoverRate"] = data.get("turnover_rate") + result["yesterdayEndPrice"] = data.get("last_close") if "last_close" in data else data.get("pre_close") + # 兼容部分字段缺失 + if result["maxPrice"] is None and "high" in data: + result["maxPrice"] = data["high"] + if result["minPrice"] is None and "low" in data: + result["minPrice"] = data["low"] + return result + + +def fetch_and_store_us_stock_data_optimized(page_size=90, max_workers=15, use_proxy=False): + """ + 优化版本的美股批量采集函数,支持更灵活的配置 + + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数(建议10-20之间) + :param use_proxy: 是否使用代理(默认False) + """ + print(f"开始批量采集美股数据...") + print(f"配置: 每页 {page_size} 条记录,最大线程数 {max_workers}") + print(f"代理模式: {'启用' if use_proxy else '禁用'}") + print(f"预计采集: 美股所有股票数据") + print("-" * 50) + + try: + result = fetch_and_store_us_stock_data(page_size, max_workers, use_proxy) + if not result.empty: + print(f"美股采集完成!共获取 {len(result)} 只股票的数据") + print(f"数据已保存到Redis键: {REDIS_KEY}") + else: + print("美股采集完成,但未获取到数据") + except Exception as e: + print(f"美股采集过程中发生错误: {e}") + return None + + return result + + +if __name__ == '__main__': + # 可以根据需要调整参数 + # fetch_and_store_us_stock_data_optimized(page_size=100, max_workers=15, use_proxy=True) + fetch_and_store_us_stock_data_optimized(use_proxy=False) # 默认不使用代理 + + diff --git a/src/quantitative_analysis/us_stock_daily_data_collector.py b/src/quantitative_analysis/us_stock_daily_data_collector.py new file mode 100644 index 0000000..d5f72e3 --- /dev/null +++ 
b/src/quantitative_analysis/us_stock_daily_data_collector.py @@ -0,0 +1,472 @@ +# coding:utf-8 + +import requests +import pandas as pd +from sqlalchemy import create_engine, text +from datetime import datetime, timedelta +from tqdm import tqdm +from src.scripts.config import XUEQIU_HEADERS +from src.scripts.ProxyIP import EnhancedProxyManager +import gc + +class StockDailyDataCollector: + """股票日线数据采集器类""" + + def __init__(self, db_url): + self.engine = create_engine( + db_url, + pool_size=5, + max_overflow=10, + pool_recycle=3600 + ) + self.headers = XUEQIU_HEADERS + # 初始化代理管理器 + self.proxy_manager = EnhancedProxyManager() + + def fetch_all_stock_codes(self): + # 从us_code_all获取股票代码 + query_all = "SELECT gp_code FROM gp_code_us" + df_all = pd.read_sql(query_all, self.engine) + codes_all = df_all['gp_code'].tolist() + + # 合并去重 + # all_codes = list(set(query_all)) + print(f"获取到股票代码: {len(codes_all)} 个来自gp_code_us,去重后共{len(codes_all)}个") + return codes_all + + def fetch_daily_stock_data(self, symbol, begin, count=-1): + """获取日线数据,count=-1表示最新一天,-2表示最近两天,-1800表示最近1800天""" + url = f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&period=day&type=before&count={count}&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance" + try: + # 使用代理管理器发送请求 + # response = requests.get(url, headers=self.headers, timeout=20) + response = self.proxy_manager.request_with_proxy('get', url, headers=self.headers) + return response.json() + except Exception as e: + print(f"Request error for {symbol}: {e}") + return {'error_code': -1, 'error_description': str(e)} + + def transform_data(self, data, symbol): + try: + items = data['data']['item'] + columns = data['data']['column'] + except KeyError as e: + print(f"KeyError for {symbol}: {e}") + return None + + df = pd.DataFrame(items, columns=columns) + df['symbol'] = symbol + + required_columns = ['timestamp', 'volume', 'open', 'high', 'low', 'close', + 'chg', 'percent', 'turnoverrate', 'amount', 'symbol', 
'pb', 'pe', 'ps'] + existing_columns = [col for col in required_columns if col in df.columns] + df = df[existing_columns] + + if 'timestamp' in df.columns: + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai') + + return df + + def save_batch_to_database(self, batch): + if batch: + df_all = pd.concat(batch, ignore_index=True) + df_all.to_sql('us_day_data', self.engine, if_exists='append', index=False) + + def fetch_data_for_date(self, date=None): + if date is None: + start_date = datetime.now() + date_str = start_date.strftime('%Y-%m-%d') + else: + start_date = datetime.strptime(date, '%Y-%m-%d') + date_str = date + + # delete_query = text("DELETE FROM us_day_data WHERE `timestamp` LIKE :date_str") + # with self.engine.begin() as conn: + # conn.execute(delete_query, {"date_str": f"{date_str}%"}) + + stock_codes = self.fetch_all_stock_codes() + begin = int(start_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000) + + batch_data = [] + for idx, symbol in enumerate(tqdm(stock_codes, desc=f"Fetching and saving daily stock data for {date_str}")): + data = self.fetch_daily_stock_data(symbol, begin) + + if data.get('error_code') == 0: + df = self.transform_data(data, symbol) + if df is not None: + batch_data.append(df) + else: + print(f"Error fetching data for {symbol} on {date_str}: {data.get('error_description')}") + + if len(batch_data) >= 100: + self.save_batch_to_database(batch_data) + batch_data.clear() + gc.collect() + + # Save remaining data + if batch_data: + self.save_batch_to_database(batch_data) + gc.collect() + + self.engine.dispose() + print(f"Daily data fetching and saving completed for {date_str}.") + + def delete_stock_history(self, symbol): + """删除指定股票的全部历史数据""" + delete_query = text("DELETE FROM us_day_data WHERE symbol = :symbol") + try: + with self.engine.begin() as conn: + conn.execute(delete_query, {"symbol": symbol}) + print(f"Deleted history for {symbol}") + return True + 
except Exception as e: + print(f"Error deleting history for {symbol}: {e}") + return False + + def refetch_and_save_history(self, symbol, days=1800): + """重新获取并保存指定股票的长期历史数据""" + print(f"Refetching last {days} days for {symbol}...") + begin = int(datetime.now().timestamp() * 1000) + data = self.fetch_daily_stock_data(symbol, begin, count=-days) + if data.get('error_code') == 0: + df = self.transform_data(data, symbol) + if df is not None and not df.empty: + self.save_batch_to_database([df]) + print(f"Successfully refetched and saved history for {symbol}.") + else: + print(f"No data transformed for {symbol} after refetch.") + else: + print(f"Error refetching history for {symbol}: {data.get('error_description')}") + + def check_and_fix_ex_rights_data(self): + """ + 检查所有股票是否发生除权,如果发生,则删除历史数据并重新获取。 + 新逻辑:直接用API返回的上个交易日的时间戳去数据库查询,更稳妥。 + 记录:除权日期、股票代码()、 + """ + all_codes = self.fetch_all_stock_codes() + ex_rights_log_data = [] + + print("--- Step 1: Checking for ex-rights stocks ---") + for symbol in tqdm(all_codes, desc="Comparing prices"): + # 1. 
从API获取最近两天的日线数据 + begin = int(datetime.now().timestamp() * 1000) + data = self.fetch_daily_stock_data(symbol, begin, count=-2) + + api_timestamp_str = None + api_close = None + + if data.get('error_code') == 0 and data.get('data', {}).get('item') and len(data['data']['item']) >= 2: + try: + # API返回的数据是按时间升序的,[-2]是上个交易日 + prev_day_data = data['data']['item'][-2] + columns = data['data']['column'] + + timestamp_index = columns.index('timestamp') + close_index = columns.index('close') + + api_timestamp_ms = prev_day_data[timestamp_index] + api_close = prev_day_data[close_index] + + # 将毫秒时间戳转换为'YYYY-MM-DD'格式,用于数据库查询 + api_timestamp_str = pd.to_datetime(api_timestamp_ms, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y-%m-%d') + except (ValueError, IndexError, TypeError) as e: + print(f"\nError parsing API data for {symbol}: {e}") + continue # 处理下一只股票 + else: + # 获取API数据失败或数据不足,跳过此股票 + continue + + # 如果未能从API解析出上个交易日的数据,则跳过 + if api_timestamp_str is None or api_close is None: + continue + + # 2. 根据API返回的时间戳,从数据库查询当天的收盘价 + db_close = None + query = text("SELECT `close` FROM us_day_data WHERE symbol = :symbol AND `timestamp` LIKE :date_str") + try: + with self.engine.connect() as conn: + result = conn.execute(query, {"symbol": symbol, "date_str": f"{api_timestamp_str}%"}).fetchone() + db_close = result[0] if result else None + except Exception as e: + print(f"\nError getting DB close for {symbol} on {api_timestamp_str}: {e}") + continue + + # 3. 比较价格 + if db_close is not None: + # 注意:数据库中取出的db_close可能是Decimal类型,需要转换 + if not abs(float(db_close) - api_close) < 0.001: + print(f"\nEx-rights detected for {symbol} on {api_timestamp_str}: DB_close={db_close}, API_close={api_close}") + ex_rights_log_data.append({ + 'symbol': symbol, + 'date': datetime.now().strftime('%Y-%m-%d'), + 'db_price': float(db_close), + 'api_price': api_close, + 'log_time': datetime.now() + }) + # 如果数据库当天没有数据,我们无法比较,所以不处理。 + # 这可能是新股或之前采集失败,不属于除权范畴。 + + # 4. 
对发生除权的股票进行记录和修复 + if not ex_rights_log_data: + print("\n--- No ex-rights stocks found. Data is consistent. ---") + self.engine.dispose() + return + + # 在修复前,先将日志保存到数据库 + self.save_ex_rights_log(ex_rights_log_data) + + # 从日志数据中提取出需要修复的股票代码列表 + ex_rights_stocks = [item['symbol'] for item in ex_rights_log_data] + + print(f"\n--- Step 2: Found {len(ex_rights_stocks)} stocks to fix: {ex_rights_stocks} ---") + for symbol in tqdm(ex_rights_stocks, desc="Fixing data"): + if self.delete_stock_history(symbol): + self.refetch_and_save_history(symbol, days=1800) + + self.engine.dispose() + print("\n--- Ex-rights data fixing process completed. ---") + + def save_ex_rights_log(self, log_data: list): + """将除权日志保存到数据库""" + if not log_data: + return + + print(f"--- Saving {len(log_data)} ex-rights events to log table... ---") + try: + df = pd.DataFrame(log_data) + # 确保列名与数据库字段匹配 + df = df.rename(columns={ + 'symbol': 'stock_code', + 'date': 'change_date', + 'db_price': 'before_price', + 'api_price': 'after_price', + 'log_time': 'update_time' + }) + df.to_sql('us_gp_ex_rights_log', self.engine, if_exists='append', index=False) + print("--- Ex-rights log saved successfully. ---") + except Exception as e: + print(f"!!! 
Error saving ex-rights log: {e}") + + def fetch_single_stock_history(self, symbol, days=1800): + """ + 获取单只股票的历史数据并保存到数据库 + :param symbol: 股票代码 + :param days: 获取的天数,默认1800天 + :return: 是否成功 + """ + print(f"开始获取 {symbol} 最近 {days} 天的历史数据...") + begin = int(datetime.now().timestamp() * 1000) + data = self.fetch_daily_stock_data(symbol, begin, count=-days) + + if data.get('error_code') == 0: + df = self.transform_data(data, symbol) + if df is not None and not df.empty: + df.to_sql('us_day_data', self.engine, if_exists='append', index=False) + print(f"成功保存 {symbol} 的历史数据,共 {len(df)} 条记录") + return True + else: + print(f"未能转换 {symbol} 的数据") + return False + else: + print(f"获取 {symbol} 数据失败: {data.get('error_description')}") + return False + + def fetch_and_check_ex_rights_optimized(self, date=None): + """ + 优化版:一次遍历完成数据采集和除权检查 + 获取最近2天数据,检查除权,决定是更新当天数据还是重新获取历史数据 + """ + if date is None: + start_date = datetime.now() + date_str = start_date.strftime('%Y-%m-%d') + else: + start_date = datetime.strptime(date, '%Y-%m-%d') + date_str = date + + print(f"开始优化版数据采集和除权检查 - {date_str}") + + # 删除今天的旧数据 + delete_query = text("DELETE FROM us_day_data WHERE `timestamp` LIKE :date_str") + with self.engine.begin() as conn: + conn.execute(delete_query, {"date_str": f"{date_str}%"}) + print(f"已删除今日 {date_str} 的旧数据") + + stock_codes = self.fetch_all_stock_codes() + begin = int(start_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000) + + # 统计信息 + normal_update_count = 0 + ex_rights_count = 0 + error_count = 0 + skipped_count = 0 # 跳过的股票数量(停牌等原因) + ex_rights_log_data = [] + normal_batch_data = [] + + for idx, symbol in enumerate(tqdm(stock_codes, desc=f"采集和检查除权 {date_str}")): + try: + # 获取最近2天的数据 + data = self.fetch_daily_stock_data(symbol, begin, count=-2) + + if data.get('error_code') != 0: + print(f"获取 {symbol} 数据失败: {data.get('error_description')}") + error_count += 1 + continue + + df = self.transform_data(data, symbol) + if df is None or df.empty: + print(f"转换 
{symbol} 数据失败") + error_count += 1 + continue + + # 检查是否有足够的数据进行除权判断 + if len(df) < 2: + # 只有一天数据,检查是否为今天的数据 + if len(df) == 1: + latest_date = df.iloc[0]['timestamp'].strftime('%Y-%m-%d') + if latest_date == date_str: + # 是今天的数据,保存 + normal_batch_data.append(df) + normal_update_count += 1 + else: + # 不是今天的数据,可能停牌,跳过 + print(f"股票 {symbol} 最新数据日期 {latest_date} 不是今天 {date_str},跳过") + skipped_count += 1 + continue + + # 按时间排序,确保最新的数据在最后 + df_sorted = df.sort_values('timestamp') + + # 获取昨天和今天的数据 + latest_row = df_sorted.iloc[-1] # 最新一天(今天) + previous_row = df_sorted.iloc[-2] # 前一天(昨天) + + # 检查最新一天是否为今天的数据 + latest_date = latest_row['timestamp'].strftime('%Y-%m-%d') + if latest_date != date_str: + # 最新数据不是今天的,可能停牌,跳过 + print(f"股票 {symbol} 最新数据日期 {latest_date} 不是今天 {date_str},跳过") + skipped_count += 1 + continue + + current_close = latest_row['close'] + previous_close = previous_row['close'] + + # 查询数据库中该股票昨天的收盘价 + yesterday_date = previous_row['timestamp'].strftime('%Y-%m-%d') + query = text(""" + SELECT `close` FROM us_day_data + WHERE symbol = :symbol AND DATE(`timestamp`) = :date + LIMIT 1 + """) + + with self.engine.connect() as conn: + result = conn.execute(query, {"symbol": symbol, "date": yesterday_date}).fetchone() + + # 判断是否除权 + is_ex_rights = False + if result: + db_previous_close = float(result[0]) + # 比较API返回的昨日收盘价与数据库中的收盘价 + if abs(db_previous_close - previous_close) > 0.001: + is_ex_rights = True + print(f"发现除权股票: {symbol}, 数据库昨收: {db_previous_close}, API昨收: {previous_close}") + + # 记录除权日志 + ex_rights_log_data.append({ + 'symbol': symbol, + 'date': date_str, + 'db_price': db_previous_close, + 'api_price': previous_close, + 'log_time': datetime.now() + }) + + if is_ex_rights: + # 除权处理:删除历史数据,重新获取1800天数据 + delete_all_query = text("DELETE FROM us_day_data WHERE symbol = :symbol") + with self.engine.begin() as conn: + conn.execute(delete_all_query, {"symbol": symbol}) + + # 重新获取1800天历史数据 + success = self.fetch_single_stock_history(symbol, 1800) + if success: + 
ex_rights_count += 1 + print(f"除权股票 {symbol} 历史数据重新获取成功") + else: + error_count += 1 + print(f"除权股票 {symbol} 历史数据重新获取失败") + else: + # 正常更新:只保存今天的数据 + today_data = df_sorted.tail(1) # 只取最新一天的数据 + normal_batch_data.append(today_data) + normal_update_count += 1 + + # 批量保存正常更新的数据 + if len(normal_batch_data) >= 100: + for batch_df in normal_batch_data: + batch_df.to_sql('us_day_data', self.engine, if_exists='append', index=False) + normal_batch_data.clear() + gc.collect() + + except Exception as e: + print(f"处理股票 {symbol} 时发生错误: {e}") + error_count += 1 + continue + + # 保存剩余的正常更新数据 + if normal_batch_data: + for batch_df in normal_batch_data: + batch_df.to_sql('us_day_data', self.engine, if_exists='append', index=False) + gc.collect() + + # 保存除权日志 + if ex_rights_log_data: + self.save_ex_rights_log(ex_rights_log_data) + + # 输出统计信息 + total_processed = normal_update_count + ex_rights_count + error_count + skipped_count + print(f"\n=== 采集完成统计 ===") + print(f"总处理股票数: {total_processed}") + print(f"正常更新: {normal_update_count}") + print(f"除权处理: {ex_rights_count}") + print(f"跳过股票: {skipped_count} (停牌等原因)") + print(f"错误处理: {error_count}") + print(f"除权日志: {len(ex_rights_log_data)} 条") + + self.engine.dispose() + print(f"优化版数据采集和除权检查完成 - {date_str}") + +def collect_stock_daily_data(db_url, date=None): + """ + 原始版本:分两步执行(先采集,后检查除权) + """ + collector = StockDailyDataCollector(db_url) + collector.fetch_data_for_date(date) + # collector.check_and_fix_ex_rights_data() + +def collect_stock_daily_data_optimized(db_url, date=None): + """ + 优化版本:一次遍历完成数据采集和除权检查 + """ + collector = StockDailyDataCollector(db_url) + collector.fetch_and_check_ex_rights_optimized(date) + +if __name__ == "__main__": + db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' + + # --- 使用方式 --- + # 1. 【推荐】优化版:一次遍历完成数据采集和除权检查 + # collect_stock_daily_data_optimized(db_url) + + # 2. 原始版本:分两步执行(先采集,后检查除权) + # collect_stock_daily_data(db_url) + + # 3. 
class USStockDailyDataCollectorV2:
    """Daily US-stock bar collector (V2) combining Xueqiu and Eastmoney quotes.

    ``run_daily_collection`` drives the pipeline: fetch the Xueqiu snapshot,
    fetch the Eastmoney snapshot, left-join both on ``symbol``, reshape the
    result to the ``us_day_data`` layout, detect ex-rights symbols, store
    today's rows and finally rebuild the history of each ex-rights symbol.
    """

    # Absolute difference between the stored close and the reported previous
    # close above which a symbol is treated as having gone ex-rights.
    EX_RIGHTS_TOLERANCE = 0.001
    # Rows written per ``to_sql`` call when storing today's snapshot.
    SAVE_BATCH_SIZE = 1000
    # Number of daily bars re-downloaded for an ex-rights symbol.
    HISTORY_DAYS = 1800

    def __init__(self, db_url):
        """Create the engine and the helper collectors for *db_url*."""
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        self.headers = XUEQIU_HEADERS
        self.proxy_manager = EnhancedProxyManager()
        # Eastmoney real-time quote collector.
        self.eastmoney_collector = USStockPriceCollector(db_url)
        # Original collector, reused to download per-symbol history.
        # NOTE(review): the original comment says "if it supports US stocks"
        # - confirm that it does.
        self.original_collector = StockDailyDataCollector(db_url)

    def convert_symbol_format(self, symbol):
        """Return *symbol* upper-cased (US tickers need no other conversion)."""
        return symbol.upper()

    def convert_eastmoney_to_xueqiu_format(self, stock_code):
        """Return the Eastmoney *stock_code* upper-cased as a Xueqiu symbol."""
        return stock_code.upper()

    def fetch_eastmoney_data(self):
        """Fetch all Eastmoney quotes and add a ``symbol`` column.

        Returns the DataFrame from ``fetch_all_data``; when it is empty it is
        returned untouched (without a ``symbol`` column).
        """
        print("正在获取东方财富美股数据...")
        df = self.eastmoney_collector.fetch_all_data()
        if not df.empty:
            df['symbol'] = df['stock_code'].apply(self.convert_eastmoney_to_xueqiu_format)
            print(f"成功获取东方财富美股数据,共 {len(df)} 条记录")
        return df

    def merge_data(self, xueqiu_df, eastmoney_df):
        """Left-join the Eastmoney OHLC columns onto the Xueqiu rows.

        Every row of *xueqiu_df* appears exactly once in the result: the
        Eastmoney side is de-duplicated on ``symbol`` first (first occurrence
        wins), because a repeated key on the right side of a left merge would
        multiply the matching left rows. *eastmoney_df* is not modified.
        """
        print("正在合并雪球和东方财富美股数据...")

        quote_columns = ['symbol', 'high_price', 'low_price', 'open_price', 'pre_close', 'list_date']
        eastmoney_quotes = eastmoney_df[quote_columns].drop_duplicates(subset='symbol', keep='first')

        merged_df = pd.merge(
            xueqiu_df,
            eastmoney_quotes,
            on='symbol',
            how='left'
        )

        print(f"美股数据合并完成,共 {len(merged_df)} 条记录")
        return merged_df

    def transform_to_us_day_data(self, merged_df):
        """Reshape the merged frame to the ``us_day_data`` column layout.

        ``timestamp`` is set to today's local date at 00:00:00 for every row.
        ``pre_close`` is carried along because ``check_ex_rights_before_save``
        reads it.
        """
        print("正在转换美股数据格式...")

        us_gp_day_df = pd.DataFrame(index=merged_df.index)
        us_gp_day_df['symbol'] = merged_df['symbol']
        us_gp_day_df['timestamp'] = pd.to_datetime(datetime.now().date())
        us_gp_day_df['volume'] = merged_df['volume']
        us_gp_day_df['open'] = merged_df['open_price']
        us_gp_day_df['high'] = merged_df['high_price']
        us_gp_day_df['low'] = merged_df['low_price']
        us_gp_day_df['close'] = merged_df['current']
        us_gp_day_df['chg'] = merged_df['chg']
        us_gp_day_df['percent'] = merged_df['percent']
        us_gp_day_df['turnoverrate'] = merged_df['turnover_rate']
        us_gp_day_df['amount'] = merged_df['amount']
        us_gp_day_df['pb'] = merged_df['pb']
        us_gp_day_df['pe'] = merged_df['pe_ttm']
        us_gp_day_df['ps'] = merged_df['ps']

        # Needed by the ex-rights check.
        # NOTE(review): this column is also written to us_day_data by
        # save_to_database - confirm the table has a pre_close column.
        us_gp_day_df['pre_close'] = merged_df['pre_close']

        print(f"美股数据转换完成,共 {len(us_gp_day_df)} 条记录")
        return us_gp_day_df

    def save_to_database(self, df):
        """Replace today's rows in ``us_day_data`` with *df*.

        The delete of today's old rows and all batch inserts run in one
        transaction, so a failure leaves the previous rows in place instead
        of producing duplicated or partially written data (this relies on a
        transactional storage engine). Errors are printed, never raised.
        Returns ``None``.
        """
        if df.empty:
            print("没有美股数据需要保存")
            return

        print(f"正在保存美股数据到数据库,共 {len(df)} 条记录...")

        # Half-open [day_start, day_end) range instead of LIKE on a datetime
        # column: no implicit string cast, and an index can be used.
        day_start = datetime.combine(datetime.now().date(), datetime.min.time())
        day_end = day_start + timedelta(days=1)
        today_str = day_start.strftime('%Y-%m-%d')
        delete_query = text(
            "DELETE FROM us_day_data "
            "WHERE `timestamp` >= :day_start AND `timestamp` < :day_end"
        )

        batch_size = self.SAVE_BATCH_SIZE
        try:
            with self.engine.begin() as conn:
                conn.execute(delete_query, {"day_start": day_start, "day_end": day_end})
                print(f"已删除今日 {today_str} 的美股旧数据")

                for i in range(0, len(df), batch_size):
                    batch = df.iloc[i:i + batch_size]
                    batch.to_sql('us_day_data', conn, if_exists='append', index=False)
                    print(f"已保存第 {i//batch_size + 1} 批美股数据")
        except Exception as e:
            print(f"保存美股数据失败,今日数据已回滚: {e}")
            return

        print("美股数据保存完成")

    def check_ex_rights_before_save(self, df):
        """Detect ex-rights symbols by comparing ``pre_close`` with the DB.

        For each row of *df* the most recent stored close that is *not*
        today's is compared with the row's ``pre_close``; a difference above
        ``EX_RIGHTS_TOLERANCE`` marks the symbol as ex-rights.

        Returns:
            ``(ex_rights_stocks, ex_rights_log_data)`` - the list of symbols
            and one log dict per symbol with the keys ``symbol``, ``date``,
            ``db_price``, ``api_price`` and ``log_time``.
        """
        print("步骤5.1: 检查美股除权情况(保存前)...")

        ex_rights_stocks = []
        ex_rights_log_data = []
        today_str = datetime.now().strftime('%Y-%m-%d')

        # The two most recent bars per symbol; built once, outside the loop.
        query = text("""
            SELECT `close`, `timestamp` FROM us_day_data
            WHERE symbol = :symbol
            ORDER BY `timestamp` DESC
            LIMIT 2
        """)

        rows = zip(df['symbol'], df['pre_close'])
        for symbol, current_pre_close in tqdm(rows, total=len(df), desc="检查美股除权"):
            # Without a previous close there is nothing to compare.
            if pd.isna(current_pre_close):
                continue

            try:
                with self.engine.connect() as conn:
                    results = conn.execute(query, {"symbol": symbol}).fetchall()

                if not results:
                    continue

                latest_date_str = results[0][1].strftime('%Y-%m-%d')
                if latest_date_str != today_str:
                    reference_row = results[0]
                elif len(results) > 1:
                    # Today's bar is already stored (re-run): use the bar before it.
                    reference_row = results[1]
                else:
                    # Only today's bar exists. Its close is not a previous
                    # close, so comparing it would flag a false ex-rights event.
                    continue

                db_last_close = float(reference_row[0])
                if abs(db_last_close - current_pre_close) > self.EX_RIGHTS_TOLERANCE:
                    print(f"发现美股除权股票: {symbol}, 数据库收盘价: {db_last_close}, 当前昨收价: {current_pre_close}")
                    ex_rights_stocks.append(symbol)
                    ex_rights_log_data.append({
                        'symbol': symbol,
                        'date': today_str,
                        'db_price': db_last_close,
                        'api_price': current_pre_close,
                        'log_time': datetime.now()
                    })
            except Exception as e:
                print(f"查询美股 {symbol} 历史数据失败: {e}")
                continue

        if ex_rights_stocks:
            print(f"检测到 {len(ex_rights_stocks)} 只美股除权股票: {ex_rights_stocks}")
        else:
            print("未发现美股除权股票")

        return ex_rights_stocks, ex_rights_log_data

    def save_ex_rights_log(self, log_data: list):
        """Append *log_data* to ``us_gp_ex_rights_log``.

        The dict keys are renamed to the table's column names first. Errors
        are printed, never raised. Does nothing for an empty list.
        """
        if not log_data:
            return

        print(f"正在保存 {len(log_data)} 条美股除权日志到us_gp_ex_rights_log表...")
        try:
            df = pd.DataFrame(log_data).rename(columns={
                'symbol': 'stock_code',
                'date': 'change_date',
                'db_price': 'before_price',
                'api_price': 'after_price',
                'log_time': 'update_time'
            })
            df.to_sql('us_gp_ex_rights_log', self.engine, if_exists='append', index=False)
            print("美股除权日志保存成功")
        except Exception as e:
            print(f"保存美股除权日志失败: {e}")

    def _fetch_history_frame(self, symbol, days):
        """Download and transform *days* daily bars for *symbol*.

        Returns the transformed DataFrame, or ``None`` (after printing the
        reason) when the request failed or produced no rows. Nothing is
        written to the database here.
        """
        print(f"开始获取美股 {symbol} 最近 {days} 天的历史数据...")
        begin = int(datetime.now().timestamp() * 1000)

        data = self.original_collector.fetch_daily_stock_data(symbol, begin, count=-days)
        if not data:
            print(f"获取美股 {symbol} 数据失败: {None}")
            return None
        if data.get('error_code') != 0:
            print(f"获取美股 {symbol} 数据失败: {data.get('error_description')}")
            return None

        df = self.original_collector.transform_data(data, symbol)
        if df is None or df.empty:
            print(f"未能转换美股 {symbol} 的数据")
            return None
        return df

    def fetch_single_stock_history(self, symbol, days=1800):
        """Download the history of one symbol and append it to ``us_day_data``.

        :param symbol: US ticker
        :param days: number of daily bars to request, default 1800
        :return: ``True`` when rows were stored, ``False`` otherwise
        """
        df = self._fetch_history_frame(symbol, days)
        if df is None:
            return False

        df.to_sql('us_day_data', self.engine, if_exists='append', index=False)
        print(f"成功保存美股 {symbol} 的历史数据,共 {len(df)} 条记录")
        return True

    def handle_ex_rights_stocks(self, ex_rights_stocks, ex_rights_log_data):
        """Store the ex-rights log and rebuild history for each symbol.

        The new history is downloaded *before* anything is deleted, and the
        delete plus re-insert run in one transaction. A failed download or a
        failed insert therefore keeps the existing rows instead of leaving
        the symbol without any data. Per-symbol errors are printed and do not
        stop the loop.
        """
        if not ex_rights_stocks:
            return

        print("步骤6: 处理美股除权股票...")

        # 6.1 Persist the ex-rights log.
        if ex_rights_log_data:
            self.save_ex_rights_log(ex_rights_log_data)

        # 6.2 Rebuild the history of each affected symbol.
        print(f"开始处理 {len(ex_rights_stocks)} 只美股除权股票,重新获取历史数据...")

        delete_query = text("DELETE FROM us_day_data WHERE symbol = :symbol")
        for symbol in tqdm(ex_rights_stocks, desc="处理美股除权股票"):
            try:
                history_df = self._fetch_history_frame(symbol, self.HISTORY_DAYS)
                if history_df is None:
                    # Keep the old rows; the failure reason was printed above.
                    print(f"美股 {symbol} 历史数据重新获取失败")
                    continue

                with self.engine.begin() as conn:
                    conn.execute(delete_query, {"symbol": symbol})
                    print(f"已删除美股 {symbol} 的历史数据")
                    history_df.to_sql('us_day_data', conn, if_exists='append', index=False)

                print(f"成功保存美股 {symbol} 的历史数据,共 {len(history_df)} 条记录")
                print(f"美股 {symbol} 历史数据重新获取成功")
            except Exception as e:
                print(f"处理美股除权股票 {symbol} 失败: {e}")

    def run_daily_collection(self):
        """Run the full daily pipeline; errors are printed, never raised.

        The engine is disposed in every case, so the instance is single-use.
        """
        print("=" * 60)
        print("美股日线数据采集器V2 - 开始运行")
        print("=" * 60)

        try:
            # 1. Xueqiu snapshot.
            print("步骤1: 获取雪球美股数据...")
            xueqiu_df = fetch_and_store_us_stock_data()
            if xueqiu_df is None or xueqiu_df.empty:
                print("雪球美股数据获取失败,终止运行")
                return

            # 2. Eastmoney snapshot.
            print("步骤2: 获取东方财富美股数据...")
            eastmoney_df = self.fetch_eastmoney_data()
            if eastmoney_df.empty:
                print("东方财富美股数据获取失败,终止运行")
                return

            # 3. Merge.
            print("步骤3: 合并美股数据...")
            merged_df = self.merge_data(xueqiu_df, eastmoney_df)

            # 4. Reshape.
            print("步骤4: 转换美股数据格式...")
            us_gp_day_df = self.transform_to_us_day_data(merged_df)

            # 5.1 Ex-rights check; must run before today's rows are stored.
            ex_rights_stocks, ex_rights_log_data = self.check_ex_rights_before_save(us_gp_day_df)

            # 5.2 Store today's rows.
            print("步骤5.2: 保存美股数据到数据库...")
            self.save_to_database(us_gp_day_df)

            # 6. Rebuild the history of ex-rights symbols.
            self.handle_ex_rights_stocks(ex_rights_stocks, ex_rights_log_data)

            print("=" * 60)
            print("美股日线数据采集完成")
            print("=" * 60)

        except Exception as e:
            print(f"美股采集过程中发生错误: {e}")
        finally:
            # Release pooled connections.
            self.engine.dispose()
            gc.collect()
class USStockPriceCollector:
    """Collector for Eastmoney real-time US stock quotes.

    Pages through the Eastmoney list API, normalises the returned fields into
    a DataFrame and can store the snapshot in ``us_stock_price_data``.
    """

    # Rows requested per page; fetch_all_data treats a shorter page as the last one.
    PAGE_SIZE = 100
    # Seconds before an HTTP request is abandoned, so a stalled connection
    # cannot block the collection forever.
    REQUEST_TIMEOUT = 15

    def __init__(self, db_url: str = DB_URL):
        """
        Initialise the collector.

        Args:
            db_url: database connection URL
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        self.base_url = "https://push2.eastmoney.com/api/qt/clist/get"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Origin": "https://quote.eastmoney.com",
            "Referer": "https://quote.eastmoney.com/",
        }
        logger.info("东方财富美股实时股价数据采集器初始化完成")

    def _ensure_table_exists(self) -> bool:
        """
        Create ``us_stock_price_data`` when it does not exist yet.

        Returns:
            True on success, False when the statement failed (error is logged)
        """
        try:
            create_table_query = text(get_create_us_stock_table_sql())

            # engine.begin() commits on success and rolls back on error,
            # matching the transaction style used by the other collectors.
            with self.engine.begin() as conn:
                conn.execute(create_table_query)

            logger.info("美股实时股价数据表创建成功")
            return True

        except Exception as e:
            logger.error(f"确保美股数据表存在失败: {e}")
            return False

    def _convert_us_stock_code(self, code: str) -> str:
        """
        Normalise a US ticker: upper-cased, no suffix added.

        Args:
            code: raw stock code

        Returns:
            the upper-cased code
        """
        return str(code).upper()

    def _parse_list_date(self, date_str: str) -> datetime.date:
        """
        Parse a listing date given in ``YYYYMMDD`` form.

        Accepts strings, ints and floats (a column that contains missing
        values is float-typed in pandas).

        Args:
            date_str: the raw value

        Returns:
            the date, or None for missing values ('-', '', 0, None, NaN) and
            for values that cannot be parsed (a warning is logged)
        """
        if date_str is None or isinstance(date_str, str) and date_str.strip() in ('', '-'):
            return None
        if isinstance(date_str, float):
            if date_str != date_str:  # NaN never equals itself
                return None
            date_str = int(date_str)
        if not date_str:
            return None
        try:
            return datetime.datetime.strptime(str(date_str).strip(), "%Y%m%d").date()
        except (ValueError, TypeError):
            logger.warning(f"无法解析日期: {date_str}")
            return None

    def fetch_data(self, page: int = 1) -> pd.DataFrame:
        """
        Fetch one page of real-time US stock quotes.

        Args:
            page: 1-based page number

        Returns:
            DataFrame with the renamed quote columns; an empty DataFrame when
            the request failed or the page holds no rows (details are logged)
        """
        try:
            params = {
                "np": 1,
                "fltt": 1,
                "invt": 2,
                "fs": "m:105,m:106,m:107",  # US market identifiers
                "fid": "f12",
                "pn": page,
                "pz": self.PAGE_SIZE,
                "po": 1,
                "dect": 1
            }

            logger.info(f"开始获取美股第 {page} 页数据")

            response = requests.get(
                self.base_url,
                params=params,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                logger.error(f"获取美股第 {page} 页数据失败: HTTP {response.status_code}")
                return pd.DataFrame()

            data = response.json()
            if data.get("rc") != 0:
                logger.error(f"获取美股数据失败: {data.get('message', '未知错误')}")
                return pd.DataFrame()

            # "data" may be JSON null, so a default on .get() is not enough.
            items = (data.get("data") or {}).get("diff") or []
            if not items:
                logger.warning(f"美股第 {page} 页未找到有效数据")
                return pd.DataFrame()

            df = pd.DataFrame(items)

            column_mapping = {
                "f12": "stock_code",
                "f14": "stock_name",
                "f2": "latest_price",
                "f3": "change_percent",
                "f4": "change_amount",
                "f5": "volume",
                "f6": "amount",
                "f7": "amplitude",
                "f8": "turnover_rate",
                "f9": "pe_ratio",
                "f15": "high_price",
                "f16": "low_price",
                "f17": "open_price",
                "f18": "pre_close",
                "f20": "total_market_value",
                "f21": "float_market_value",
                "f23": "pb_ratio",
                "f26": "list_date"
            }
            df = df.rename(columns=column_mapping)

            df['stock_code'] = df['stock_code'].apply(self._convert_us_stock_code)

            # A missing f26 field must not discard the whole page.
            if 'list_date' in df.columns:
                df['list_date'] = df['list_date'].apply(self._parse_list_date)
            else:
                df['list_date'] = None

            # Per the original author, the API's price values must be divided
            # by 1000 to obtain USD. Non-numeric placeholders become NaN.
            price_columns = ['latest_price', 'change_amount', 'high_price', 'low_price', 'open_price', 'pre_close']
            for col in price_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce') / 1000.0

            # NOTE(review): the original comment said the market values "may
            # need dividing by 100", but the code divides by 1000 - confirm
            # which scale is correct; the code is left unchanged.
            market_value_columns = ['total_market_value', 'float_market_value']
            for col in market_value_columns:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce') / 1000.0

            logger.info(f"美股第 {page} 页数据获取成功,包含 {len(df)} 条记录")
            return df

        except Exception as e:
            logger.error(f"获取美股第 {page} 页数据失败: {e}")
            return pd.DataFrame()

    def fetch_all_data(self) -> pd.DataFrame:
        """
        Fetch every page of real-time US stock quotes.

        Paging stops at the first empty page or the first page shorter than
        ``PAGE_SIZE``. A failed request also yields an empty page, so an
        error ends the run with the pages collected so far.

        Returns:
            all pages concatenated; an empty DataFrame when nothing was fetched
        """
        all_data = []
        page = 1

        while True:
            page_data = self.fetch_data(page)
            if page_data.empty:
                logger.info(f"美股第 {page} 页数据为空,停止采集")
                break

            all_data.append(page_data)

            # A short page is the last page.
            if len(page_data) < self.PAGE_SIZE:
                break

            page += 1
            # Throttle to avoid hammering the API.
            time.sleep(1)

        if not all_data:
            logger.warning("未获取到任何有效美股数据")
            return pd.DataFrame()

        combined_df = pd.concat(all_data, ignore_index=True)
        logger.info(f"美股数据采集完成,共采集 {len(combined_df)} 条记录")
        return combined_df

    def save_to_database(self, data: pd.DataFrame) -> bool:
        """
        Store the snapshot in ``us_stock_price_data``.

        Args:
            data: the quotes to store

        Returns:
            True when the data was written, False otherwise (reason is logged)
        """
        if data.empty:
            logger.warning("没有美股数据需要保存")
            return False

        try:
            if not self._ensure_table_exists():
                return False

            # Dict form: replace(scalar, None) means "pad-fill" instead of
            # "set to None" on older pandas versions.
            data = data.replace({'-': None})
            # Missing values become None so they are stored as NULL.
            data = data.replace({pd.NA: None, pd.NaT: None})
            data = data.where(pd.notnull(data), None)

            # NOTE(review): if_exists='replace' drops and recreates the table
            # with pandas-inferred column types, discarding the schema that
            # _ensure_table_exists just created (primary key, update_time).
            # Confirm whether a delete + append was intended.
            data.to_sql('us_stock_price_data', self.engine, if_exists='replace', index=False)

            logger.info(f"成功保存 {len(data)} 条美股数据到数据库")
            return True

        except Exception as e:
            logger.error(f"保存美股数据到数据库失败: {e}")
            return False

    def run_collection(self) -> bool:
        """
        Run the whole flow: fetch every page, then store the snapshot.

        Returns:
            True when data was fetched and stored, False otherwise
        """
        try:
            logger.info("开始美股数据采集流程")

            all_data = self.fetch_all_data()
            if all_data.empty:
                logger.warning("未获取到任何美股数据")
                return False

            if not self.save_to_database(all_data):
                logger.error("美股数据保存失败")
                return False

            logger.info("美股数据采集流程完成")
            return True

        except Exception as e:
            logger.error(f"美股数据采集流程失败: {e}")
            return False
class USValuationAnalyzer:
    """PE valuation analyser for US stocks.

    Reads historical ``close``/``pe`` values from ``us_day_data``, computes
    percentile statistics and renders a two-panel price/PE chart.
    """

    def __init__(self, db_url: str = DB_URL):
        """
        Initialise the analyser.

        Args:
            db_url: database connection URL
        """
        self.engine = create_engine(
            db_url,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600
        )
        logger.info("美股估值分析器初始化完成")

    def get_us_stock_name(self, stock_code: str) -> str:
        """
        Look up the display name of a US stock.

        Args:
            stock_code: US ticker (e.g. AAPL, GOOGL)

        Returns:
            the name stored in ``us_stock_price_data``; the ticker itself when
            no non-empty name is stored or the lookup failed
        """
        try:
            name_query = text("""
                SELECT DISTINCT stock_name
                FROM us_stock_price_data
                WHERE stock_code = :stock_code
                LIMIT 1
            """)
            # Only used to decide whether the symbol is known at all.
            symbol_query = text("""
                SELECT DISTINCT symbol
                FROM us_day_data
                WHERE symbol = :stock_code
                LIMIT 1
            """)

            with self.engine.connect() as conn:
                name_row = conn.execute(name_query, {"stock_code": stock_code}).fetchone()
                # A NULL or empty stored name falls back to the ticker.
                if name_row and name_row[0]:
                    return name_row[0]

                symbol_row = conn.execute(symbol_query, {"stock_code": stock_code}).fetchone()

            if not symbol_row:
                logger.warning(f"未找到美股 {stock_code} 的名称信息")
            return stock_code
        except Exception as e:
            logger.error(f"获取美股名称失败: {e}")
            return stock_code

    def get_us_historical_data(self, stock_code: str, start_date: str = '2018-01-01') -> pd.DataFrame:
        """
        Load the PE and price history of a US stock.

        Args:
            stock_code: US ticker (e.g. AAPL, GOOGL)
            start_date: first date to include, default 2018-01-01

        Returns:
            DataFrame with ``timestamp``, ``close``, ``pe`` and ``pe_filtered``
            ordered by time; an empty DataFrame when nothing was found or the
            query failed (the error is logged)
        """
        try:
            # Only PE is selected; PB is not part of the US analysis.
            query = text("""
                SELECT
                    `timestamp`, `close`, `pe`
                FROM
                    us_day_data
                WHERE
                    symbol = :symbol AND
                    `timestamp` >= :start_date AND
                    pe > 0 AND
                    pe IS NOT NULL
                ORDER BY
                    `timestamp` ASC
            """)

            with self.engine.connect() as conn:
                df = pd.read_sql(query, conn, params={"symbol": stock_code, "start_date": start_date})

            if df.empty:
                logger.warning(f"未找到美股 {stock_code} 的历史数据")
                return df

            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['close'] = pd.to_numeric(df['close'], errors='coerce')
            df['pe'] = pd.to_numeric(df['pe'], errors='coerce')

            df = self._filter_extreme_values(df, 'pe')

            logger.info(f"获取美股 {stock_code} 历史数据成功,共 {len(df)} 条记录")
            return df

        except Exception as e:
            logger.error(f"获取美股历史数据失败: {e}")
            return pd.DataFrame()

    def _filter_extreme_values(self, data: pd.DataFrame, metric: str) -> pd.DataFrame:
        """
        Drop rows with non-positive or implausibly large metric values.

        Args:
            data: input DataFrame (never modified)
            metric: column name; 'pe' keeps (0, 1000], 'pb' keeps (0, 100],
                any other column is not filtered

        Returns:
            a new DataFrame with an extra ``<metric>_filtered`` column; *data*
            itself when it is empty or lacks the column
        """
        if data.empty or metric not in data.columns:
            return data

        upper_bounds = {'pe': 1000, 'pb': 100}
        if metric in upper_bounds:
            keep = (data[metric] > 0) & (data[metric] <= upper_bounds[metric])
            filtered = data[keep]
        else:
            filtered = data

        # Copy before adding the column: writing into a slice triggers
        # SettingWithCopyWarning and may or may not touch the caller's frame.
        filtered = filtered.copy()
        filtered[f'{metric}_filtered'] = filtered[metric]
        return filtered

    def calculate_us_percentiles(self, data: pd.DataFrame, metric: str = 'pe') -> Dict:
        """
        Compute percentile statistics of a valuation metric.

        Args:
            data: historical DataFrame, ordered by time
            metric: valuation metric, only 'pe' is supported

        Returns:
            dict with min, max, mean, median, q1, q3, p5, p10, p90, p95,
            ``current`` (latest value) and ``current_percentile`` (percentage
            of historical values strictly below ``current``); an empty dict
            when there is no usable data or the metric is unsupported
        """
        if data.empty:
            logger.warning(f"数据为空,无法计算{metric}分位数")
            return {}

        if metric != 'pe':
            logger.warning(f"美股估值分析只支持PE指标,不支持{metric}")
            return {}

        # Prefer the filtered column when it exists.
        column = f'{metric}_filtered'
        if column not in data.columns:
            column = metric

        # Missing values carry no information and would turn the "current"
        # value and its rank into NaN.
        values = data[column].dropna()
        if values.empty:
            logger.warning(f"数据为空,无法计算{metric}分位数")
            return {}

        percentiles = {
            'min': values.min(),
            'max': values.max(),
            'mean': values.mean(),
            'median': values.median(),
            'q1': values.quantile(0.25),
            'q3': values.quantile(0.75),
            'p10': values.quantile(0.1),
            'p90': values.quantile(0.9),
            'p5': values.quantile(0.05),
            'p95': values.quantile(0.95)
        }

        current_value = values.iloc[-1]
        percentiles['current'] = current_value

        # Left insertion point in the sorted history = count of strictly smaller values.
        ordered = np.sort(values.to_numpy())
        percentiles['current_percentile'] = (np.searchsorted(ordered, current_value) / len(ordered)) * 100

        logger.info(f"美股{metric}分位数计算完成")
        return percentiles

    def get_us_industry_avg_data(self, industry_name: str, start_date: str, metric: str = 'pe') -> Optional[pd.DataFrame]:
        """
        Industry averages are not supported for US stocks.

        Args:
            industry_name: industry name (unused)
            start_date: start date (unused)
            metric: valuation metric (unused)

        Returns:
            always None
        """
        logger.info("美股暂不支持行业平均数据分析")
        return None

    def get_us_concept_avg_data(self, concept_name: str, start_date: str, metric: str = 'pe') -> Optional[pd.DataFrame]:
        """
        Concept-board averages are not supported for US stocks.

        Args:
            concept_name: concept board name (unused)
            start_date: start date (unused)
            metric: valuation metric (unused)

        Returns:
            always None
        """
        logger.info("美股暂不支持概念板块平均数据分析")
        return None

    def create_us_valuation_chart(self, stock_code: str, start_date: str = '2018-01-01',
                                  metric: str = 'pe', save_path: Optional[str] = None) -> str:
        """
        Render the price / PE percentile chart and save it as a PNG.

        Args:
            stock_code: US ticker
            start_date: first date to include
            metric: valuation metric (only 'pe' is supported)
            save_path: target file; defaults to a dated file in OUTPUT_DIR

        Returns:
            the path of the saved chart

        Raises:
            ValueError: no history was found or the percentiles could not be
                computed; any other error is logged and re-raised
        """
        fig = None
        try:
            data = self.get_us_historical_data(stock_code, start_date)
            if data.empty:
                raise ValueError(f"未找到美股 {stock_code} 的历史数据")

            percentiles = self.calculate_us_percentiles(data, metric)
            if not percentiles:
                raise ValueError(f"无法计算美股 {stock_code} 的{metric}分位数")

            stock_name = self.get_us_stock_name(stock_code)

            fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 12))

            # Upper panel: price.
            ax1.plot(data['timestamp'], data['close'], linewidth=2, color='blue', label='股价')
            ax1.set_title(f'{stock_name} ({stock_code}) 股价走势', fontsize=16, fontweight='bold')
            ax1.set_ylabel('股价 (USD)', fontsize=12)
            ax1.legend()
            ax1.grid(True, alpha=0.3)

            # Lower panel: valuation metric with percentile guide lines.
            metric_filtered = f'{metric}_filtered' if f'{metric}_filtered' in data.columns else metric
            ax2.plot(data['timestamp'], data[metric_filtered], linewidth=2, color='red', label=f'{metric.upper()}估值')

            guide_lines = [
                ('p95', 'purple', '--', '95%分位线'),
                ('q3', 'orange', '--', '75%分位线'),
                ('median', 'green', '-', '中位数'),
                ('q1', 'orange', '--', '25%分位线'),
                ('p5', 'purple', '--', '5%分位线'),
            ]
            for key, color, linestyle, label in guide_lines:
                ax2.axhline(y=percentiles[key], color=color, linestyle=linestyle, alpha=0.7, label=label)

            # Mark the latest value.
            if percentiles['current'] is not None:
                current_date = data['timestamp'].iloc[-1]
                ax2.scatter([current_date], [percentiles['current']], color='red', s=100, zorder=5)
                ax2.annotate(f'当前{metric.upper()}: {percentiles["current"]:.2f}',
                             xy=(current_date, percentiles['current']),
                             xytext=(10, 10), textcoords='offset points',
                             bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7),
                             arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))

            ax2.set_title(f'{stock_name} ({stock_code}) {metric.upper()}估值分析', fontsize=16, fontweight='bold')
            ax2.set_xlabel('日期', fontsize=12)
            ax2.set_ylabel(f'{metric.upper()}', fontsize=12)
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # Half-yearly date ticks on both panels.
            for ax in [ax1, ax2]:
                ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
                ax.xaxis.set_major_locator(mdates.MonthLocator(interval=6))
                plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)

            fig.tight_layout()

            if save_path is None:
                save_path = os.path.join(OUTPUT_DIR, f'{stock_code}_{metric}_analysis_{datetime.datetime.now().strftime("%Y%m%d")}.png')

            # A caller-supplied path may point into a directory that does not exist yet.
            target_dir = os.path.dirname(save_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            fig.savefig(save_path, dpi=300, bbox_inches='tight')

            logger.info(f"美股估值分析图表保存成功: {save_path}")
            return save_path

        except Exception as e:
            logger.error(f"创建美股估值分析图表失败: {e}")
            raise
        finally:
            # Always release the figure; pyplot keeps open figures alive, so
            # every failed call would otherwise leak one.
            if fig is not None:
                plt.close(fig)
"https://stock.xueqiu.com/v5/stock/screener/quote/list.json" - page = 1 - page_size = 90 - all_data = [] - - print("--- Starting to collect Hong Kong stock codes ---") - - # 采集前先清空表 - try: - with engine.begin() as conn: - conn.execute(text("TRUNCATE TABLE gp_code_us")) - print("Table `gp_code_us` has been truncated.") - except Exception as e: - print(f"Error truncating table `gp_code_us`: {e}") - return - - while True: - params = { - 'page': page, - 'size': page_size, - 'order': 'desc', - 'order_by': 'market_capital', - 'market': 'US', - 'type': 'us', - 'is_delay': 'true' - } - - print(f"Fetching page {page}...") - try: - response = requests.get(base_url, headers=headers, params=params, timeout=20) - if response.status_code != 200: - print(f"Request failed with status code {response.status_code}") - break - - data = response.json() - if data.get('error_code') != 0: - print(f"API error: {data.get('error_description')}") - break - - stock_list = data.get('data', {}).get('list', []) - if not stock_list: - print("No more data found. Collection finished.") - break - - all_data.extend(stock_list) - - # 如果获取到的数据少于每页数量,说明是最后一页 - if len(stock_list) < page_size: - print("Reached the last page. Collection finished.") - break - - page += 1 - - except requests.exceptions.RequestException as e: - print(f"Request exception on page {page}: {e}") - break - - if all_data: - print(f"--- Collected a total of {len(all_data)} stocks. Preparing to save to database. ---") - df = pd.DataFrame(all_data) - - # 数据映射和转换 - df_to_save = pd.DataFrame() - df_to_save['gp_name'] = df['name'] - df_to_save['gp_code'] = df['symbol'] - df_to_save['gp_code_two'] = 'US.' + df['symbol'].astype(str) - df_to_save['market_cap'] = df['market_capital'] - - try: - df_to_save.to_sql('gp_code_us', engine, if_exists='append', index=False) - print("--- Successfully saved all data to `gp_code_us`. ---") - except Exception as e: - print(f"Error saving data to database: {e}") - else: - print("--- No data collected. 
---") - - engine.dispose() - -if __name__ == "__main__": - db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' - collect_us_stock_codes(db_url) \ No newline at end of file