@app.route('/api/quantitative/momentum_by_plate', methods=['GET'])
def get_momentum_by_plate():
    """
    Fetch momentum data for every constituent stock of an industry or concept plate.
    ---
    parameters:
      - name: name
        in: query
        type: string
        required: true
        description: Name of the industry or concept plate.
      - name: type
        in: query
        type: string
        required: false
        description: Plate type, 'industry' (default) or 'concept'.
    responses:
      200:
        description: A list of momentum indicators for stocks in the plate.
        schema:
          type: object
          properties:
            success:
              type: boolean
            data:
              type: array
              items:
                type: object
    """
    try:
        # Trim before validating so a whitespace-only ``name`` is rejected with
        # 400 instead of being sent to the analyzer as a plate lookup.
        plate_name = (request.args.get('name') or '').strip()
        plate_type = (request.args.get('type') or 'industry').strip().lower()

        if not plate_name:
            return jsonify({'success': False, 'message': '必须提供板块名称(name参数)'}), 400

        # Any value other than 'concept' is treated as an industry plate.
        is_concept = plate_type == 'concept'

        analyzer = MomentumAnalyzer()
        result = analyzer.analyze_momentum_by_name(plate_name, is_concept=is_concept)

        # The analyzer reports success=False when the plate has no stocks;
        # that case is surfaced to the client as 404.
        status_code = 200 if result.get('success') else 404
        return jsonify(result), status_code

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"批量获取板块动量数据接口异常: {str(e)}")
        return jsonify({
            'success': False,
            'message': str(e)
        }), 500
# Module-level logger shared by the momentum analysis helpers.
logger = logging.getLogger("momentum_analysis")


class MomentumAnalyzer:
    """Collect momentum indicators for the constituent stocks of a plate.

    A "plate" is either an industry or a concept. Constituents are looked up
    through an industry analyzer and each stock is then sent to the momentum
    HTTP API one at a time.

    Args:
        industry_analyzer: Object exposing ``get_industry_stocks(name)`` and
            ``get_concept_stocks(name)``. When omitted, the project's
            ``IndustryAnalyzer`` is imported lazily and instantiated.
        api_url: Momentum API endpoint; defaults to ``DEFAULT_API_URL``.
        timeout: Per-request timeout in seconds; defaults to
            ``DEFAULT_TIMEOUT``.
    """

    DEFAULT_API_URL = "http://192.168.18.42:5000/api/dify/getStockMomentumIndex"
    # NOTE(review): 500 s per request is kept for backward compatibility, but
    # requests are sequential, so one slow plate can block a caller for a very
    # long time. Pass a smaller ``timeout`` when calling from a web handler.
    DEFAULT_TIMEOUT = 500
    # Exchange prefixes recognised in codes such as 'SH600036'.
    _EXCHANGE_PREFIXES = ("SH", "SZ", "BJ")

    def __init__(self, industry_analyzer=None, api_url=None, timeout=None):
        """Initialise the analyzer; every argument is optional."""
        if industry_analyzer is None:
            industry_analyzer = self._create_industry_analyzer()
        self.industry_analyzer = industry_analyzer
        self.momentum_api_url = api_url or self.DEFAULT_API_URL
        self.timeout = self.DEFAULT_TIMEOUT if timeout is None else timeout
        logger.info("动量分析器初始化完成")

    @staticmethod
    def _create_industry_analyzer():
        """Import and build the project's ``IndustryAnalyzer``.

        The import is local so the class can be used with an injected
        analyzer even where the project package is not importable. Both
        package layouts are tried, ``src.``-prefixed first.
        """
        try:
            from src.valuation_analysis.industry_analysis import IndustryAnalyzer
        except ImportError:
            # Fallback for environments where ``src`` is not a package root.
            from valuation_analysis.industry_analysis import IndustryAnalyzer
        return IndustryAnalyzer()

    def get_stocks_by_name(self, name: str, is_concept: bool = False) -> List[str]:
        """Return the plate's constituent codes in ``'600036.SH'`` form.

        Args:
            name: Industry or concept name.
            is_concept: Look the name up as a concept instead of an industry.

        Returns:
            Codes converted from ``'SH600036'`` to ``'600036.SH'``, in the
            order received and without duplicates. Entries that are not
            strings or lack an SH/SZ/BJ prefix are skipped with a warning.
        """
        if is_concept:
            raw_codes = self.industry_analyzer.get_concept_stocks(name)
        else:
            raw_codes = self.industry_analyzer.get_industry_stocks(name)

        if not raw_codes:
            return []

        converted = []
        skipped = 0
        for code in raw_codes:
            if isinstance(code, str) and code.startswith(self._EXCHANGE_PREFIXES):
                # 'SH600036' -> '600036.SH'
                converted.append(f"{code[2:]}.{code[:2]}")
            else:
                skipped += 1

        if skipped:
            logger.warning(f"板块 \"{name}\" 中有 {skipped} 个代码格式无法识别, 已跳过")

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # same stock is never requested twice.
        return list(dict.fromkeys(converted))

    def get_momentum_indicators(self, stock_code: str, industry_codes: List[str]) -> Optional[Dict]:
        """Fetch the momentum indicators of one stock.

        Args:
            stock_code: Target stock code, e.g. ``'600036.SH'``.
            industry_codes: Related stock codes sent as ``code_list``,
                e.g. ``['600036.SH', '600000.SH']``.

        Returns:
            The API's JSON object with a ``'stock_code'`` key added, or
            ``None`` on a non-200 status, a timeout, any request/parse error,
            or a reply that is not a JSON object. This method never raises.
        """
        # NOTE(review): the original comment said the API expects suffix-less
        # codes, yet suffixed codes are what is sent — confirm with the API.
        payload = {
            "code_list": industry_codes,
            "target_code": stock_code
        }

        try:
            response = requests.post(self.momentum_api_url, json=payload, timeout=self.timeout)

            if response.status_code != 200:
                logger.error(f"获取动量指标失败({stock_code}): HTTP {response.status_code}, {response.text}")
                return None

            data = response.json()
        except requests.exceptions.Timeout:
            logger.error(f"获取动量指标超时({stock_code})")
            return None
        except Exception as e:
            logger.error(f"获取动量指标异常({stock_code}): {str(e)}")
            return None

        # A list/None/scalar reply cannot be tagged with the stock code.
        if not isinstance(data, dict):
            logger.error(f"获取动量指标失败({stock_code}): 返回数据不是JSON对象")
            return None

        # Tag the result with the stock it belongs to.
        data['stock_code'] = stock_code
        return data

    def analyze_momentum_by_name(self, name: str, is_concept: bool = False) -> Dict:
        """Fetch momentum indicators for every stock of a plate.

        Args:
            name: Industry or concept name.
            is_concept: Whether ``name`` is a concept plate.

        Returns:
            ``{'success': False, 'message': ...}`` when the plate has no
            stocks; otherwise a dict with ``success``, ``plate_name``,
            ``is_concept``, ``stock_count`` (stocks requested),
            ``results_count`` (stocks answered) and ``data``.
        """
        # 1. Resolve the plate's constituents.
        stock_list = self.get_stocks_by_name(name, is_concept)
        if not stock_list:
            return {'success': False, 'message': f'未找到板块 "{name}" 中的股票'}

        # 2. Query each stock in turn; failures are logged by
        #    get_momentum_indicators and skipped.
        # NOTE(review): only the target itself is sent as ``code_list`` —
        # confirm whether the API should receive the whole plate instead.
        all_results = []
        for stock_code in stock_list:
            result = self.get_momentum_indicators(stock_code, [stock_code])
            if result:
                all_results.append(result)

        return {
            'success': True,
            'plate_name': name,
            'is_concept': is_concept,
            'stock_count': len(stock_list),
            'results_count': len(all_results),
            'data': all_results
        }


if __name__ == '__main__':
    # Example usage.
    analyzer = MomentumAnalyzer()

    # 1. Industry plate.
    industry_name = "证券"
    industry_results = analyzer.analyze_momentum_by_name(industry_name, is_concept=False)
    print(f"\n行业 '{industry_name}' 动量分析结果 (前5条):")
    if industry_results['success']:
        # Print a sample of the results.
        for item in industry_results['data'][:5]:
            print(item)
    else:
        print(industry_results['message'])
测试概念 + concept_name = "芯片" + concept_results = analyzer.analyze_momentum_by_name(concept_name, is_concept=True) + print(f"\n概念 '{concept_name}' 动量分析结果 (前5条):") + if concept_results['success']: + # 打印部分结果 + for item in concept_results['data'][:5]: + print(item) + else: + print(concept_results['message']) \ No newline at end of file diff --git a/src/scripts/config.py b/src/scripts/config.py index 0388266..72bf64b 100644 --- a/src/scripts/config.py +++ b/src/scripts/config.py @@ -11,7 +11,7 @@ XUEQIU_HEADERS = { 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Client-Version': 'v2.44.75', - 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; snbim_minify=true; xq_a_token=ef79e6da376751a4bf6c1538103e9894d44473e1; xqat=ef79e6da376751a4bf6c1538103e9894d44473e1; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzUxNTE1MDgxLCJjdG0iOjE3NDg5MjMwODE2NDQsImNpZCI6ImQ5ZDBuNEFadXAifQ.gQrIt4VI73JLUFGVSTKpXidhFIMwlusBKyrzYwClwCBszXCooQY3WnFqlbXqSX3SwnMapuveOFUM5sGIOoZ8oDF8cZYs3HDz5vezR-2nes9gfZr2nZcUfZzNRJ299wlX3Zis5NbnzNlfnisUhv9GUfEZjQ_Rs37B4qRbQZVC2kdN1Z0xB8j1MplSTOsYj4IliQntuaTo-8SBh-4zz5244dnF85xREBVxtFzzCtHUhn9B-mzxE81_42nwrDscvow-4_jtlJXlqbehiAFxld-dCWDXwmCju9lRWu_WzdoQe19n-c6jhCZZ1pU1JGsYyhIAsd1gV064jQ6FxfN38so1Eg; xq_r_token=30a80318ebcabffbe194e7deecb108b665e8c894; Hm_lvt_1db88642e346389874251b5a1eded6e3=1749028611; is_overseas=0; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1750034926; 
ssxmod_itna=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q07hqDyliiYwirG4tshiQBKD/KlYeDZDGFdDqx0Ei6FiBFKCezjCGbKBACQ5xC3o0aOyndbV3Ab3t8NXiK3y4xB3DExGkR0iYeK4DxrPD5xDTDWeDGDD3WxGaDmeDeho+D0bmHUOvrU7oD7eDXxGCDQFor4GWDiPD7Po45iim=KxD0xD1ESkEDDPDaroxDG5NlQ9weDi3rfgsLFbLDdwIqQimD753DlcqwXLXmktxGfoyzd=bdI8fDCKDjxdIx93QW33vimn4xqiDb37Yn4qe2qW+QKGxAiUmrKiirFBDFAQQDT7GN0xq3DPQGQBwOb7Y3FrjAPdmrRiYIvZRHm9HV4hY3Ybz777DbRGxi4T/48Bxt3GhUhtBhNfqtRDWq2qADPfYYsihWYPeD; ssxmod_itna2=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q07hqDyliiYwirG4tshiQYeDA4pg2w+RYD7POqKepgqDlcb9wAqieP5+6V+KK5w6Q4dMdqCLomgLyxQn93CdLjc3pXEYq0/0h+jCdiOWudEmrIjmRf1+lLX0OGj02GXiiblBod5++dyGbWSnufTL+nxBWIQimWCI3ueZSne50WYT6afRSyCo79FGa6WEk2j30a5d9LFRZFb==8bO73cfarqe=kkkK09RmTUISi6qQwqZfChNd3Ktj6E3tj9GjXLWwV59vpUqOnFXIp9/rujWHt7v3KhIHMUrH70=mn1em1A7ujba3Y4jwqKyWRDR4q7/rCDFoyF7AiK4rNz018Ix0rYfYx+OYm2=nxNlxPGTKYStcOPuEDD', + 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; Hm_lvt_1db88642e346389874251b5a1eded6e3=1749028611; acw_tc=0a27aa3317504105803118918e00823643107980bbedc1c9307d37d1cf7fb7; xq_a_token=5b11f7f5a3986a802def7dea48868a3b2849e719; xqat=5b11f7f5a3986a802def7dea48868a3b2849e719; 
xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzUzMDAzNjExLCJjdG0iOjE3NTA0MTE2MTEyODIsImNpZCI6ImQ5ZDBuNEFadXAifQ.FB12KEYSdWo5g3UqQbnfqR-Gopar8JkuDf54eSf86FzmuGG9XugW7osl3idav9oTgLzgWBut4X6a5-gbqn61wPPV7OV3dMO8oNyBZUxMjisaMBW_-IcUuQ1z-gtXBcHleNamANA-2H3Xf5mZNdVXAW_E0rQZE_y0TEqzeiLxfU5B_RJOTR1Zq_-BQaaOn_Tk0or_hu-nOZR-26lBtcBl1VoTR2Ov1tm_CRN375ohMcZniA265X8umpL_tysQ4m7oazNyezopJE6W7jt-djNGJXZAbLoVXF1U2ULKV325dPWHvPcSZOevxGprItb665QNZvXEzhBB-4fuzhAnYBsqGw; xq_r_token=2ba0614b400ec779704c3adaa7f17c2c2c88143b; is_overseas=0; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1750411602; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=Jg9N/8vN3mjfEOHOPlAxHQ+1x+X4nN7jc9vkKRkIGulMwceWqptDd3OUgWPM6XqKNq/15EvM032gWoeeYMHgRg%3D%3D; ssxmod_itna=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0OHhqDyliGQQmhGtKq0aCDD/KlYeDZDGFdDqx0Ei6FiDHICezjQgDKgACjktpeBflQR5RYGlcNpp=0IDpnOAGdeGLDY=DCTKK420iDYYfDBYD74G+DDeDih3Dj4GmDGY=aeDFIQutVCRKdxDwDB=DmqG23ObDm4DfDDLorBD4Il2YDDtDAkaGNPDADA3doDDlYD84Kdb4DYpogQ0FdgahphusIeDMixGXzAlzx9CnoiWtV/vfrf2aHPGuDG=OcC0Hh2bmRT3f8hGxYDo5Qe8hx+Bx3rKq0DW7HRYqYYeYAh+2DR0DQhxRDxgGYgEw/rdPrd5kh6WdYYrcqsMkbZMshie5QhNiNQDoOBtQgdeAde6D/r5l05Dr=grAWG4HmmNBiQm44D; ssxmod_itna2=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0OHhqDyliGQQmhGtKq0aeDWhYebouIdHFW5NsDoenRT6eeD', 'Referer': 'https://weibo.com/u/7735765253', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua-Mobile': '?0', diff --git a/src/scripts/hk_stock_code_collector.py b/src/scripts/hk_stock_code_collector.py new file mode 100644 index 0000000..4ccf6ee --- /dev/null +++ b/src/scripts/hk_stock_code_collector.py @@ -0,0 +1,100 @@ +# coding:utf-8 +# 更新港股列表的代码 + +import requests +import pandas as pd +from sqlalchemy import create_engine, text +import sys +import os + +# 将项目根目录添加到Python路径,以便导入config +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from src.scripts.config import XUEQIU_HEADERS 
def _fetch_hk_stock_list(headers, base_url, page_size=90, timeout=20):
    """Download every page of the Xueqiu HK screener list.

    Returns the accumulated list of row dicts (possibly empty), or ``None``
    when any page fails, so the caller can tell "nothing listed" apart from
    "download broke part-way".
    """
    rows = []
    page = 1

    while True:
        params = {
            'page': page,
            'size': page_size,
            'order': 'desc',
            'order_by': 'market_capital',
            'market': 'HK',
            'type': 'hk',
            'is_delay': 'true'
        }

        print(f"Fetching page {page}...")
        try:
            response = requests.get(base_url, headers=headers, params=params, timeout=timeout)
            if response.status_code != 200:
                print(f"Request failed with status code {response.status_code}")
                return None
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body (e.g. an HTML login page).
            print(f"Request exception on page {page}: {e}")
            return None

        if not isinstance(data, dict) or data.get('error_code') != 0:
            description = data.get('error_description') if isinstance(data, dict) else data
            print(f"API error: {description}")
            return None

        # ``data`` may be present but null, hence the ``or`` fallbacks.
        stock_list = (data.get('data') or {}).get('list') or []
        if not stock_list:
            print("No more data found. Collection finished.")
            break

        rows.extend(stock_list)

        # A short page means this was the last one.
        if len(stock_list) < page_size:
            print("Reached the last page. Collection finished.")
            break

        page += 1

    return rows


def _build_hk_code_frame(rows):
    """Map raw screener rows onto the columns written to ``gp_code_hk``."""
    df = pd.DataFrame(rows)
    return pd.DataFrame({
        'gp_name': df['name'],
        'gp_code': df['symbol'],
        'gp_code_two': 'HK' + df['symbol'].astype(str),
        'market_cap': df['market_capital'],
    })


def collect_hk_stock_codes(db_url):
    """
    Collect the Xueqiu Hong Kong stock list and store it in ``gp_code_hk``.

    The list is downloaded completely before the table is touched. The old
    rows are then deleted and the new ones inserted in one transaction, so a
    failed download or failed insert leaves the previous contents in place.
    (Previously the table was truncated first, so any API, cookie or network
    failure left it empty or partially filled.)

    Args:
        db_url: SQLAlchemy database URL.
    """
    engine = create_engine(db_url)
    base_url = "https://stock.xueqiu.com/v5/stock/screener/quote/list.json"

    try:
        print("--- Starting to collect Hong Kong stock codes ---")

        all_data = _fetch_hk_stock_list(XUEQIU_HEADERS, base_url, page_size=90, timeout=20)
        if all_data is None:
            print("--- Collection failed. Existing `gp_code_hk` data left untouched. ---")
            return
        if not all_data:
            print("--- No data collected. ---")
            return

        print(f"--- Collected a total of {len(all_data)} stocks. Preparing to save to database. ---")

        try:
            df_to_save = _build_hk_code_frame(all_data)
        except KeyError as e:
            print(f"Unexpected API row format, missing field {e}. Existing data left untouched.")
            return

        try:
            # DELETE instead of TRUNCATE: TRUNCATE commits implicitly in MySQL
            # and could not be rolled back if the insert fails. Unlike
            # TRUNCATE, DELETE does not reset an AUTO_INCREMENT counter.
            with engine.begin() as conn:
                conn.execute(text("DELETE FROM gp_code_hk"))
                df_to_save.to_sql('gp_code_hk', conn, if_exists='append', index=False)
            print("--- Successfully saved all data to `gp_code_hk`. ---")
        except Exception as e:
            print(f"Error saving data to database: {e}")
    finally:
        # Release pooled connections on every exit path.
        engine.dispose()


if __name__ == "__main__":
    # SECURITY: database credentials are hard-coded here; they should be
    # rotated and loaded from configuration or the environment instead.
    db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
    collect_hk_stock_codes(db_url)
f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&period=day&type=before&count=-1&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance" + def fetch_daily_stock_data(self, symbol, begin, count=-1): + """获取日线数据,count=-1表示最新一天,-2表示最近两天,-1800表示最近1800天""" + url = f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&period=day&type=before&count={count}&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance" try: - response = requests.get(url, headers=self.headers, timeout=10) + response = requests.get(url, headers=self.headers, timeout=20) return response.json() except Exception as e: print(f"Request error for {symbol}: {e}") @@ -110,10 +116,155 @@ class StockDailyDataCollector: self.engine.dispose() print(f"Daily data fetching and saving completed for {date_str}.") + def delete_stock_history(self, symbol): + """删除指定股票的全部历史数据""" + delete_query = text("DELETE FROM gp_day_data WHERE symbol = :symbol") + try: + with self.engine.begin() as conn: + conn.execute(delete_query, {"symbol": symbol}) + print(f"Deleted history for {symbol}") + return True + except Exception as e: + print(f"Error deleting history for {symbol}: {e}") + return False + + def refetch_and_save_history(self, symbol, days=1800): + """重新获取并保存指定股票的长期历史数据""" + print(f"Refetching last {days} days for {symbol}...") + begin = int(datetime.now().timestamp() * 1000) + data = self.fetch_daily_stock_data(symbol, begin, count=-days) + if data.get('error_code') == 0: + df = self.transform_data(data, symbol) + if df is not None and not df.empty: + self.save_batch_to_database([df]) + print(f"Successfully refetched and saved history for {symbol}.") + else: + print(f"No data transformed for {symbol} after refetch.") + else: + print(f"Error refetching history for {symbol}: {data.get('error_description')}") + + def check_and_fix_ex_rights_data(self): + """ + 检查所有股票是否发生除权,如果发生,则删除历史数据并重新获取。 + 新逻辑:直接用API返回的上个交易日的时间戳去数据库查询,更稳妥。 + 记录:除权日期、股票代码()、 + """ + 
all_codes = self.fetch_all_stock_codes() + ex_rights_log_data = [] + + print("--- Step 1: Checking for ex-rights stocks ---") + for symbol in tqdm(all_codes, desc="Comparing prices"): + # 1. 从API获取最近两天的日线数据 + begin = int(datetime.now().timestamp() * 1000) + data = self.fetch_daily_stock_data(symbol, begin, count=-2) + + api_timestamp_str = None + api_close = None + + if data.get('error_code') == 0 and data.get('data', {}).get('item') and len(data['data']['item']) >= 2: + try: + # API返回的数据是按时间升序的,[-2]是上个交易日 + prev_day_data = data['data']['item'][-2] + columns = data['data']['column'] + + timestamp_index = columns.index('timestamp') + close_index = columns.index('close') + + api_timestamp_ms = prev_day_data[timestamp_index] + api_close = prev_day_data[close_index] + + # 将毫秒时间戳转换为'YYYY-MM-DD'格式,用于数据库查询 + api_timestamp_str = pd.to_datetime(api_timestamp_ms, unit='ms', utc=True).tz_convert('Asia/Shanghai').strftime('%Y-%m-%d') + except (ValueError, IndexError, TypeError) as e: + print(f"\nError parsing API data for {symbol}: {e}") + continue # 处理下一只股票 + else: + # 获取API数据失败或数据不足,跳过此股票 + continue + + # 如果未能从API解析出上个交易日的数据,则跳过 + if api_timestamp_str is None or api_close is None: + continue + + # 2. 根据API返回的时间戳,从数据库查询当天的收盘价 + db_close = None + query = text("SELECT `close` FROM gp_day_data WHERE symbol = :symbol AND `timestamp` LIKE :date_str") + try: + with self.engine.connect() as conn: + result = conn.execute(query, {"symbol": symbol, "date_str": f"{api_timestamp_str}%"}).fetchone() + db_close = result[0] if result else None + except Exception as e: + print(f"\nError getting DB close for {symbol} on {api_timestamp_str}: {e}") + continue + + # 3. 
比较价格 + if db_close is not None: + # 注意:数据库中取出的db_close可能是Decimal类型,需要转换 + if not abs(float(db_close) - api_close) < 0.001: + print(f"\nEx-rights detected for {symbol} on {api_timestamp_str}: DB_close={db_close}, API_close={api_close}") + ex_rights_log_data.append({ + 'symbol': symbol, + 'date': datetime.now().strftime('%Y-%m-%d'), + 'db_price': float(db_close), + 'api_price': api_close, + 'log_time': datetime.now() + }) + # 如果数据库当天没有数据,我们无法比较,所以不处理。 + # 这可能是新股或之前采集失败,不属于除权范畴。 + + # 4. 对发生除权的股票进行记录和修复 + if not ex_rights_log_data: + print("\n--- No ex-rights stocks found. Data is consistent. ---") + self.engine.dispose() + return + + # 在修复前,先将日志保存到数据库 + self.save_ex_rights_log(ex_rights_log_data) + + # 从日志数据中提取出需要修复的股票代码列表 + ex_rights_stocks = [item['symbol'] for item in ex_rights_log_data] + + print(f"\n--- Step 2: Found {len(ex_rights_stocks)} stocks to fix: {ex_rights_stocks} ---") + for symbol in tqdm(ex_rights_stocks, desc="Fixing data"): + if self.delete_stock_history(symbol): + self.refetch_and_save_history(symbol, days=1800) + + self.engine.dispose() + print("\n--- Ex-rights data fixing process completed. ---") + + def save_ex_rights_log(self, log_data: list): + """将除权日志保存到数据库""" + if not log_data: + return + + print(f"--- Saving {len(log_data)} ex-rights events to log table... ---") + try: + df = pd.DataFrame(log_data) + # 确保列名与数据库字段匹配 + df = df.rename(columns={ + 'symbol': 'stock_code', + 'date': 'change_date', + 'db_price': 'before_price', + 'api_price': 'after_price', + 'log_time': 'update_time' + }) + df.to_sql('gp_ex_rights_log', self.engine, if_exists='append', index=False) + print("--- Ex-rights log saved successfully. ---") + except Exception as e: + print(f"!!! 
Error saving ex-rights log: {e}") + def collect_stock_daily_data(db_url, date=None): collector = StockDailyDataCollector(db_url) collector.fetch_data_for_date(date) + collector.check_and_fix_ex_rights_data() if __name__ == "__main__": db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' - collect_stock_daily_data(db_url) + + # --- 使用方式 --- + # 1. 日常采集当天数据 + # collect_stock_daily_data(db_url) + + # 2. 手动执行除权检查和数据修复 + # collector = StockDailyDataCollector(db_url) + # collector.check_and_fix_ex_rights_data() diff --git a/src/test_imports.py b/src/test_imports.py deleted file mode 100644 index 8cb2954..0000000 --- a/src/test_imports.py +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/env python -""" -测试脚本: 验证导入路径是否正常工作 -""" - -import sys -import os -import logging - -# 设置日志 -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -# 显示当前工作目录和Python路径 -logger.info(f"当前工作目录: {os.getcwd()}") -logger.info(f"Python路径: {sys.path}") - -def test_imports(): - """测试导入各个模块""" - success = True - - # 测试导入 chat_bot - try: - logger.info("尝试导入 chat_bot 模块...") - from fundamentals_llm.chat_bot import ChatBot - logger.info("成功导入 chat_bot 模块") - except ImportError as e: - logger.error(f"导入 chat_bot 模块失败: {str(e)}") - success = False - - # 尝试替代路径 - try: - from src.fundamentals_llm.chat_bot import ChatBot - logger.info("成功从替代路径导入 chat_bot 模块") - success = True - except ImportError as e2: - logger.error(f"从替代路径导入 chat_bot 模块失败: {str(e2)}") - - # 测试导入 chat_bot_with_offline - try: - logger.info("尝试导入 chat_bot_with_offline 模块...") - from fundamentals_llm.chat_bot_with_offline import ChatBot - logger.info("成功导入 chat_bot_with_offline 模块") - except ImportError as e: - logger.error(f"导入 chat_bot_with_offline 模块失败: {str(e)}") - success = False - - # 尝试替代路径 - try: - from src.fundamentals_llm.chat_bot_with_offline import ChatBot - logger.info("成功从替代路径导入 chat_bot_with_offline 模块") - success = 
True - except ImportError as e2: - logger.error(f"从替代路径导入 chat_bot_with_offline 模块失败: {str(e2)}") - - # 测试导入 fundamental_analysis - try: - logger.info("尝试导入 fundamental_analysis 模块...") - from fundamentals_llm.fundamental_analysis import FundamentalAnalyzer - logger.info("成功导入 fundamental_analysis 模块") - except ImportError as e: - logger.error(f"导入 fundamental_analysis 模块失败: {str(e)}") - success = False - - # 尝试替代路径 - try: - from src.fundamentals_llm.fundamental_analysis import FundamentalAnalyzer - logger.info("成功从替代路径导入 fundamental_analysis 模块") - success = True - except ImportError as e2: - logger.error(f"从替代路径导入 fundamental_analysis 模块失败: {str(e2)}") - - # 测试导入 pdf_generator - try: - logger.info("尝试导入 pdf_generator 模块...") - from fundamentals_llm.pdf_generator import generate_investment_report - logger.info("成功导入 pdf_generator 模块") - except ImportError as e: - logger.error(f"导入 pdf_generator 模块失败: {str(e)}") - success = False - - # 尝试替代路径 - try: - from src.fundamentals_llm.pdf_generator import generate_investment_report - logger.info("成功从替代路径导入 pdf_generator 模块") - success = True - except ImportError as e2: - logger.error(f"从替代路径导入 pdf_generator 模块失败: {str(e2)}") - - return success - -if __name__ == "__main__": - logger.info("开始测试导入...") - if test_imports(): - logger.info("所有导入测试通过") - sys.exit(0) - else: - logger.error("导入测试失败") - sys.exit(1) \ No newline at end of file