diff --git a/manage-instances.sh b/manage-instances.sh index 80e9208..59bb893 100644 --- a/manage-instances.sh +++ b/manage-instances.sh @@ -11,7 +11,7 @@ show_help() { echo " start [实例ID] 启动指定实例或所有实例" echo " stop [实例ID] 停止指定实例或所有实例" echo " restart [实例ID] 重启指定实例或所有实例" - echo " logs [实例ID] 查看指定实例的日志" + echo " logs [实例ID] 实时查看指定实例的日志 (Ctrl+C 退出)" echo " status 显示实例状态概览" echo " remove [实例ID] 删除指定实例或所有实例" echo " rebuild [数量] 重新构建镜像并部署指定数量的实例" @@ -68,8 +68,9 @@ restart_instance() { # 函数:查看实例日志 view_logs() { - echo "实例 $1 的日志:" - docker logs stock-app-$1 + echo "正在实时显示实例 $1 的日志 (按 Ctrl+C 退出):" + echo "----------------------------------------" + docker logs -f stock-app-$1 } # 函数:显示状态概览 diff --git a/src/app.py b/src/app.py index 7690daa..759ec8c 100644 --- a/src/app.py +++ b/src/app.py @@ -2897,21 +2897,64 @@ def get_pep_stock_info_by_shortname(): @app.route('/api/pep_stock_info_by_code', methods=['GET']) def get_pep_stock_info_by_code(): - """根据股票代码查询Redis中的实时行情并返回指定结构""" + """根据股票代码查询Redis中的实时行情并返回指定结构,支持A股和港股""" short_code = request.args.get('code') if not short_code: - return jsonify({'success': False, 'message': '缺少必要参数: short_code'}), 400 + return jsonify({'success': False, 'message': '缺少必要参数: code'}), 400 + try: - # 兼容600001.SH/SH600001等格式 - from src.quantitative_analysis.batch_stock_price_collector import get_stock_realtime_info_from_redis - result = get_stock_realtime_info_from_redis(short_code) - if result: - return jsonify(result) + # 判断股票类型并调用相应的查询函数 + if is_hk_stock_code(short_code): + # 港股代码 + from src.quantitative_analysis.hk_stock_price_collector import get_hk_stock_realtime_info_from_redis + result = get_hk_stock_realtime_info_from_redis(short_code) + if result: + return jsonify(result) + else: + return jsonify({'success': False, 'message': f'未找到港股 {short_code} 的实时行情'}), 404 else: - return jsonify({'success': False, 'message': f'未找到股票 {short_code} 的实时行情'}), 404 + # A股代码 + from src.quantitative_analysis.batch_stock_price_collector import get_stock_realtime_info_from_redis + result = get_stock_realtime_info_from_redis(short_code) + if result: + return jsonify(result) + else: + return jsonify({'success': False, 'message': f'未找到A股 {short_code} 的实时行情'}), 404 except Exception as e: return jsonify({'success': False, 'message': f'服务器错误: {str(e)}'}), 500 +def is_hk_stock_code(stock_code): + """ + 判断是否为港股代码 + 支持格式:00700, 00700.HK, HK00700, 0700.HK等 + """ + if not stock_code: + return False + + stock_code = stock_code.upper().strip() + + # 港股代码特征: + # 1. 包含.HK后缀 + if '.HK' in stock_code: + return True + + # 2. 以HK开头 + if stock_code.startswith('HK'): + return True + + # 3. 纯数字且长度为4-5位(港股代码通常是4-5位数字) + if stock_code.isdigit() and 4 <= len(stock_code) <= 5: + # 进一步判断:港股代码通常以0、1、2、3、6、8、9开头 + if stock_code[0] in ['0', '1', '2', '3', '6', '8', '9']: + return True + + # 4. 
特殊港股代码(如腾讯00700、阿里巴巴09988等) + common_hk_codes = ['00700', '09988', '03690', '09888', '06618', '02318', '02020', '01810'] + if stock_code in common_hk_codes: + return True + + return False + @app.route('/api/industry/crowding/filter', methods=['GET']) def filter_industry_crowding(): """根据拥挤度百分位区间筛选行业和概念板块""" @@ -2991,7 +3034,7 @@ def get_momentum_by_plate(): def run_batch_stock_price_collection(): """批量采集A股行情并保存到数据库""" try: - fetch_and_store_stock_data() + fetch_and_store_stock_data(use_proxy=True) return jsonify({"status": "success", "message": "批量采集A股行情并保存到数据库成功"}) except Exception as e: logger.error(f"批量采集A股行情失败: {str(e)}") @@ -3001,7 +3044,7 @@ def run_batch_stock_price_collection(): def run_batch_hk_stock_price_collection(): """批量采集港股行情并保存到数据库""" try: - fetch_and_store_hk_stock_data() + fetch_and_store_hk_stock_data(use_proxy=True) return jsonify({"status": "success", "message": "批量采集A股行情并保存到数据库成功"}) except Exception as e: logger.error(f"批量采集A股行情失败: {str(e)}") diff --git a/src/quantitative_analysis/batch_stock_price_collector.py b/src/quantitative_analysis/batch_stock_price_collector.py index 2089c34..feac000 100644 --- a/src/quantitative_analysis/batch_stock_price_collector.py +++ b/src/quantitative_analysis/batch_stock_price_collector.py @@ -5,14 +5,14 @@ import sys import os import redis import json +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +import time # 添加项目根目录到路径,便于导入scripts.config project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) -# 导入代理管理器 -from src.scripts.ProxyIP import EnhancedProxyManager - # 读取雪球headers和Redis配置 try: from src.scripts.config import XUEQIU_HEADERS @@ -31,8 +31,13 @@ except ImportError: REDIS_KEY = 'xq_stock_changes_latest' # 存放行情的主键 -# 创建全局代理管理器实例 -proxy_manager = EnhancedProxyManager() +# 条件导入代理管理器 +proxy_manager = None +try: + from src.scripts.ProxyIP import EnhancedProxyManager + proxy_manager = EnhancedProxyManager() +except ImportError: + print("代理管理器导入失败,将使用直接请求模式") def get_redis_conn(): @@ -47,58 +52,130 @@ def get_redis_conn(): return redis.Redis(connection_pool=pool) -def fetch_and_store_stock_data(page_size=90): +def fetch_and_store_stock_data(page_size=90, max_workers=10, use_proxy=False): """ 批量采集雪球A股(上证、深证、科创板)股票的最新行情数据,并保存到Redis。 + 使用线程池并行请求,提高采集效率。 + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数 + :param use_proxy: 是否使用代理(默认False) """ base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json' types = ['sha', 'sza', 'kcb'] # 上证、深证、科创板 headers = XUEQIU_HEADERS all_data = [] + data_lock = threading.Lock() # 线程安全锁 - for stock_type in types: + def fetch_page_data(stock_type, page): + """获取单页数据的函数""" params = { - 'page': 1, + 'page': page, 'size': page_size, 'order': 'desc', 'order_by': 'dividend_yield', 'market': 'CN', 'type': stock_type } - - # 初次请求以获取总页数,使用代理 - response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) - # response = requests.get(base_url, headers=headers, params=params) - if response.status_code != 200: - print(f"请求 {stock_type} 数据失败,状态码:{response.status_code}") - continue - - data = response.json() - total_count = data['data']['count'] - total_pages = (total_count // page_size) + 1 - - for page in range(1, total_pages + 1): - params['page'] = page - # response = requests.get(base_url, headers=headers, params=params) - response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + + try: + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + 
response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + if response.status_code == 200: data = response.json() - all_data.extend(data['data']['list']) - print(f"成功采集第 {page}/{total_pages} 页数据") + page_data = data['data']['list'] + + # 线程安全地添加数据 + with data_lock: + all_data.extend(page_data) + + print(f"成功采集 {stock_type} 第 {page} 页数据,获取 {len(page_data)} 条记录") + return len(page_data) else: print(f"请求 {stock_type} 数据第 {page} 页失败,状态码:{response.status_code}") + return 0 + except Exception as e: + print(f"请求 {stock_type} 数据第 {page} 页异常:{e}") + return 0 + + # 使用线程池并行采集数据 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + for stock_type in types: + # 先获取总页数 + params = { + 'page': 1, + 'size': page_size, + 'order': 'desc', + 'order_by': 'dividend_yield', + 'market': 'CN', + 'type': stock_type + } + + try: + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + + if response.status_code != 200: + print(f"请求 {stock_type} 数据失败,状态码:{response.status_code}") + continue + + data = response.json() + total_count = data['data']['count'] + total_pages = (total_count // page_size) + 1 + + print(f"开始采集 {stock_type} 数据,共 {total_pages} 页,总计 {total_count} 条记录") + + # 提交所有页面的采集任务 + for page in range(1, total_pages + 1): + future = executor.submit(fetch_page_data, stock_type, page) + futures.append(future) + + except Exception as e: + print(f"获取 {stock_type} 总页数失败:{e}") + continue + + # 等待所有任务完成 + print(f"正在并行采集数据,使用 {max_workers} 个线程...") + start_time = time.time() + + completed_count = 0 + for future in as_completed(futures): + completed_count += 1 + try: + result = future.result() + if result > 0: + print(f"进度: {completed_count}/{len(futures)} 页完成") + except Exception as e: + print(f"采集任务异常:{e}") + + end_time = time.time() + print(f"数据采集完成,耗时: {end_time - start_time:.2f} 秒") + # 转换为 DataFrame df = pd.DataFrame(all_data) if not df.empty: df['fetch_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + # 存入Redis,使用hash结构,key为symbol,value为json字符串 r = get_redis_conn() pipe = r.pipeline() + # 先清空旧数据 r.delete(REDIS_KEY) + + print(f"正在将 {len(df)} 条记录写入Redis...") + for _, row in df.iterrows(): symbol = row.get('symbol') if not symbol: @@ -106,6 +183,7 @@ def fetch_and_store_stock_data(page_size=90): # 只保留必要字段,也可直接存row.to_dict() value = row.to_dict() pipe.hset(REDIS_KEY, symbol, json.dumps(value, ensure_ascii=False)) + pipe.execute() print(f"成功将数据写入Redis哈希 {REDIS_KEY},共{len(df)}条记录。") @@ -192,5 +270,35 @@ def get_stock_realtime_info_from_redis(stock_code): return result +def fetch_and_store_stock_data_optimized(page_size=90, max_workers=15, use_proxy=False): + """ + 优化版本的批量采集函数,支持更灵活的配置 + + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数(建议10-20之间) + :param use_proxy: 是否使用代理(默认False) + """ + print(f"开始批量采集A股数据...") + print(f"配置: 每页 {page_size} 条记录,最大线程数 {max_workers}") + print(f"代理模式: {'启用' if use_proxy else '禁用'}") + print(f"预计采集: 上证、深证、科创板所有股票数据") + print("-" * 50) + + try: + result = fetch_and_store_stock_data(page_size, max_workers, use_proxy) + if not result.empty: + print(f"采集完成!共获取 {len(result)} 只股票的数据") + print(f"数据已保存到Redis键: {REDIS_KEY}") + else: + print("采集完成,但未获取到数据") + except Exception as e: + print(f"采集过程中发生错误: {e}") + return None + + return result + + +
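A minimal usage sketch of the two entry points added above. Assumptions: the Redis instance from src.valuation_analysis.config is reachable, and 600519.SH is an illustrative symbol; per the existing helper's comment, get_stock_realtime_info_from_redis accepts both 600519.SH and SH600519 style codes.

from src.quantitative_analysis.batch_stock_price_collector import (
    fetch_and_store_stock_data_optimized,
    get_stock_realtime_info_from_redis,
)

# Collect all A-share quotes into Redis without a proxy (direct requests).
df = fetch_and_store_stock_data_optimized(page_size=90, max_workers=10, use_proxy=False)

# Read one symbol back from the Redis hash written above.
if df is not None and not df.empty:
    print(get_stock_realtime_info_from_redis('600519.SH'))  # illustrative symbol
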
if __name__ == '__main__': - fetch_and_store_stock_data() \ No newline at end of file + # 可以根据需要调整参数 + # fetch_and_store_stock_data_optimized(page_size=100, max_workers=15, use_proxy=True) + fetch_and_store_stock_data_optimized(use_proxy=False) # 默认不使用代理 \ No newline at end of file diff --git a/src/quantitative_analysis/hk_stock_price_collector.py b/src/quantitative_analysis/hk_stock_price_collector.py index c1cf63f..cd9b796 100644 --- a/src/quantitative_analysis/hk_stock_price_collector.py +++ b/src/quantitative_analysis/hk_stock_price_collector.py @@ -5,14 +5,14 @@ import sys import os import redis import json +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +import time # 添加项目根目录到路径,便于导入scripts.config project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) -# 导入代理管理器 -from src.scripts.ProxyIP import EnhancedProxyManager - # 读取雪球headers和Redis配置 try: from src.scripts.config import XUEQIU_HEADERS @@ -31,8 +31,13 @@ REDIS_KEY = 'xq_hk_stock_changes_latest' # 存放港股行情的主键 -# 创建全局代理管理器实例 -proxy_manager = EnhancedProxyManager() +# 条件导入代理管理器 +proxy_manager = None +try: + from src.scripts.ProxyIP import EnhancedProxyManager + proxy_manager = EnhancedProxyManager() +except ImportError: + print("代理管理器导入失败,将使用直接请求模式") def get_redis_conn(): @@ -47,67 +52,128 @@ return redis.Redis(connection_pool=pool) -def fetch_and_store_hk_stock_data(page_size=90): +def fetch_and_store_hk_stock_data(page_size=90, max_workers=10, use_proxy=False): """ 批量采集雪球港股所有股票的最新行情数据,并保存到Redis。 + 使用线程池并行请求,提高采集效率。 + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数 + :param use_proxy: 是否使用代理(默认False) """ base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json' headers = XUEQIU_HEADERS all_data = [] + data_lock = threading.Lock() # 线程安全锁 - # 使用港股API参数 + def fetch_page_data(page): + """获取单页数据的函数""" + params = { + 'page': page, + 'size': page_size, + 'order': 'desc', + 'order_by': 'dividend_yield', + 'market': 'HK', # 港股市场 + 'type': 'hk' # 港股类型 + } + + try: + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + page_data = data['data']['list'] + + # 线程安全地添加数据 + with data_lock: + all_data.extend(page_data) + + print(f"成功采集港股第 {page} 页数据,获取 {len(page_data)} 条记录") + return len(page_data) + else: + print(f"请求港股数据第 {page} 页失败,状态码:{response.status_code}") + return 0 + except Exception as e: + print(f"请求港股数据第 {page} 页异常:{e}") + return 0 + + # 先获取总页数 params = { 'page': 1, 'size': page_size, 'order': 'desc', 'order_by': 'dividend_yield', - 'market': 'HK', # 港股市场 - 'type': 'hk' # 港股类型 + 'market': 'HK', + 'type': 'hk' } - - # 初次请求以获取总页数,使用代理 + try: - response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + # 根据配置选择是否使用代理 + if use_proxy and proxy_manager: + response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) + else: + response = requests.get(base_url, headers=headers, params=params, timeout=10) + if response.status_code != 200: print(f"请求港股数据失败,状态码:{response.status_code}") - return + return pd.DataFrame() + + data = response.json() + total_count = data['data']['count'] + total_pages = (total_count // page_size) + 1 + + print(f"开始采集港股数据,共 {total_pages} 页,总计 {total_count} 条记录") + + # 使用线程池并行采集数据 +
with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + + # 提交所有页面的采集任务 + for page in range(1, total_pages + 1): + future = executor.submit(fetch_page_data, page) + futures.append(future) + + # 等待所有任务完成 + print(f"正在并行采集港股数据,使用 {max_workers} 个线程...") + start_time = time.time() + + completed_count = 0 + for future in as_completed(futures): + completed_count += 1 + try: + result = future.result() + if result > 0: + print(f"进度: {completed_count}/{len(futures)} 页完成") + except Exception as e: + print(f"采集任务异常:{e}") + + end_time = time.time() + print(f"港股数据采集完成,耗时: {end_time - start_time:.2f} 秒") + except Exception as e: - print(f"请求港股数据时发生异常:{e}") - return + print(f"获取港股总页数失败:{e}") + return pd.DataFrame() - data = response.json() - total_count = data['data']['count'] - total_pages = (total_count // page_size) + 1 - - print(f"开始采集港股数据,共 {total_pages} 页,{total_count} 条记录") - - # 循环获取所有页面的数据 - for page in range(1, total_pages + 1): - params['page'] = page - try: - response = proxy_manager.request_with_proxy('get', base_url, headers=headers, params=params) - if response.status_code == 200: - data = response.json() - all_data.extend(data['data']['list']) - print(f"成功采集港股第 {page}/{total_pages} 页数据") - else: - print(f"请求港股数据第 {page} 页失败,状态码:{response.status_code}") - except Exception as e: - print(f"请求港股数据第 {page} 页时发生异常:{e}") - continue - # 转换为 DataFrame df = pd.DataFrame(all_data) if not df.empty: df['fetch_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + # 存入Redis,使用hash结构,key为symbol,value为json字符串 r = get_redis_conn() pipe = r.pipeline() + # 先清空旧数据 r.delete(REDIS_KEY) + + print(f"正在将 {len(df)} 条港股记录写入Redis...") + for _, row in df.iterrows(): symbol = row.get('symbol') if not symbol: @@ -115,10 +181,15 @@ def fetch_and_store_hk_stock_data(page_size=90): # 只保留必要字段,也可直接存row.to_dict() value = row.to_dict() pipe.hset(REDIS_KEY, symbol, json.dumps(value, ensure_ascii=False)) + pipe.execute() print(f"成功将港股数据写入Redis哈希 {REDIS_KEY},共{len(df)}条记录。") + + # 返回DataFrame供其他脚本使用 + return df else: print("未获取到任何港股数据。") + return pd.DataFrame() def format_hk_stock_code(stock_code): @@ -193,5 +264,35 @@ return result +def fetch_and_store_hk_stock_data_optimized(page_size=90, max_workers=15, use_proxy=False): + """ + 优化版本的港股批量采集函数,支持更灵活的配置 + + :param page_size: 每页采集数量 + :param max_workers: 线程池最大工作线程数(建议10-20之间) + :param use_proxy: 是否使用代理(默认False) + """ + print(f"开始批量采集港股数据...") + print(f"配置: 每页 {page_size} 条记录,最大线程数 {max_workers}") + print(f"代理模式: {'启用' if use_proxy else '禁用'}") + print(f"预计采集: 港股所有股票数据") + print("-" * 50) + + try: + result = fetch_and_store_hk_stock_data(page_size, max_workers, use_proxy) + if not result.empty: + print(f"港股采集完成!共获取 {len(result)} 只股票的数据") + print(f"数据已保存到Redis键: {REDIS_KEY}") + else: + print("港股采集完成,但未获取到数据") + except Exception as e: + print(f"港股采集过程中发生错误: {e}") + return None + + return result + + if __name__ == '__main__': - fetch_and_store_hk_stock_data() \ No newline at end
of file + # 可以根据需要调整参数 + # fetch_and_store_hk_stock_data_optimized(page_size=100, max_workers=15, use_proxy=True) + fetch_and_store_hk_stock_data_optimized(use_proxy=False) # 默认不使用代理 \ No newline at end of file diff --git a/src/scripts/ProxyIP.py b/src/scripts/ProxyIP.py index 4858e06..bc71f5c 100644 --- a/src/scripts/ProxyIP.py +++ b/src/scripts/ProxyIP.py @@ -339,4 +339,37 @@ class EnhancedProxyManager: 'manual_proxies': self.redis_conn.hlen(manual_key), 'auto_refresh': self.auto_refresh, 'last_update': datetime.now().strftime("%Y-%m-%d %H:%M:%S") - } \ No newline at end of file + } + + def get_multiple_proxies(self, count: int = 5) -> List[Dict]: + """ + 获取多个代理,用于并发请求 + + :param count: 需要的代理数量 + :return: 代理列表 + """ + proxies = [] + manual_key = self._get_redis_key('manual') + + # 获取手动代理池中的所有代理 + manual_proxies = self.redis_conn.hgetall(manual_key) + active_proxies = [] + + for proxy_json in manual_proxies.values(): + proxy = json.loads(proxy_json) + if proxy.get('status') == 'active': + active_proxies.append(proxy) + + if not active_proxies: + return [] + + # 随机选择指定数量的代理 + selected_count = min(count, len(active_proxies)) + selected_proxies = random.sample(active_proxies, selected_count) + + # 为每个代理添加Redis键信息 + for proxy in selected_proxies: + proxy['_redis_key'] = self._get_redis_key(proxy['source']) + proxies.append(proxy) + + return proxies \ No newline at end of file diff --git a/src/scripts/config.py b/src/scripts/config.py index 74562f7..c662c80 100644 --- a/src/scripts/config.py +++ b/src/scripts/config.py @@ -11,7 +11,7 @@ XUEQIU_HEADERS = { 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Client-Version': 'v2.44.75', - 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; xq_a_token=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xqat=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU2MDAyNjgyLCJjdG0iOjE3NTM0MTA2ODI0MTQsImNpZCI6ImQ5ZDBuNEFadXAifQ.AlnzQSY7oGKGABfaQcFLg0lAJsDdvBMiwUbgpCMCBlbx6VZPKhzERxWiylQb4dFIyyECvRRJ73SbO9cD46fAqgzOgTxArNHtTKD4lQapTnyb11diDADnpb_nzzaRr4k_BYQRKXWtcJxdUMzde2WLy-eAkSf76QkXmKrwS3kvRm5gfqhdye44whw5XMEGoZ_lXHzGLWGz_PludHZp6W3v-wwZc_0wLU6cTb_KdrwWUWT_8jw5JHXnJEmuZmQI8QWf60DtiHIYCYXarxv8XtyHK7lLKhIAa3C2QmGWw5wv2HGz4I5DPqm2uMPKumgkQxycfAk56-RWviLZ8LAPF-XcbA; xq_r_token=92527e51353f90ba14d5fd16581e5a7a2780baa2; acw_tc=1a0c655917546366986673411e68d25d3c69c1719d6d1d6283c7271cc1529f; is_overseas=0; Hm_lvt_1db88642e346389874251b5a1eded6e3=1754636834; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1754636837; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=Hvg6Ac+qmPnDgzOvFuCePWwm7reK8TPoE9ayL8cyLnFg+Jhg1RJO2WnkeH2T8Q18+iV9bDh+UAq222GxdelHBg%3D%3D; 
ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuqbKOOQYMxPsMKjqDsqze4GzDiLPGhDBWAFdYjdqN4NCtAoqzWWF2ruqe8bOZqKKFS96SM6sXUGQKhexGLDY=DCuXiieGGU4GwDGoD34DiDDpLD03Db4D_nWrD7ORQMluokjeDQ4GyDiUk3ObDm4DfDDLorA6osQ4DGqDSFcyTxD3DfRb4DDN4CIDu_mDDbObt5jcbUx7OBCGxIeDMixGXzGC4InyRNvDrgjMXvzEKH1aDtqD9_au4XxKdr3NEAEP4KGGpC0inpge_5neOQDqix1oeee4eQvxQ5O7Gv0DOGDz0G4ix_jwP_RUWjiihW9PeGAShXZ=E/ZND6q3mi40weUmXjmvYIzSQzWDW9wsemhYedCrwihQYbKYvWRD3YD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuqbKOOQYMxPsMKe4DWhzmxhTKRDjR_xWs_DDs6KmhfHjRKnZkBxNA3TIO4Arip5wU2kO0SwUfkEzryfSk6Rzud3ARD49fiKFd344obYvCv1lxYhY3qdzQe3vWD', + 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1754636834; xq_a_token=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xqat=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU3NjM3OTA4LCJjdG0iOjE3NTUwNDU5MDg0ODksImNpZCI6ImQ5ZDBuNEFadXAifQ.jAeKlW2r1xRuyoZ3cuy2rTgfSoW_79wGJeuup7I7sZMSH5QeRDJrGx5JWXO4373YRpNW0qnAXR51Ygd8Plmko1u99iN8LifGzyMtblXDPgs17aS0zyHr6cMAsURU984wCXkmZxdvRMCHdevc8XWNHnuqeGfQNSgBSdO6Zv7Xc5-t965TJba96UOsNBpv2GghV9B2mcrUQyW3edi9kRAN_Fxmx5M1Iri4Yfppcaj-VSZYkdZtUpizrN5BbVYujcnQjj4kceUYYAl3Ccs273KVNSMFKpHMIOJcMJATY6PRgLvbEu8_ttIfBnbG4mmZ71bU7RXigleXIj1qhcDL2rDzQQ; xq_r_token=2b5db3e3897cb3e46b8fa2fa384471b334ec59cb; acw_tc=ac11000117550489614555169ef3ec63ec008e1bfba0fe5321bc8a30c2deb8; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1755050072; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=kE/XuROkIJ4APDhfxUYOb9lRiDFNJT8KxiXYwJAuoCeNlkaxlcytBSuiCXGjqxhydALLguC/FB4qIXfLut408Q%3D%3D; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAjqDsqze4GzDiLPGhDBWAFdYjw7ivq8RG5pxMBrGHuhLMEzoiqds=iCpxusTh1K/Zjw=KmoeK4xGLDY=DCTKq1QeD4S3Dt4DIDAYDDxDWU4DLDYoDY3nYxGP=xpWTcmRbD0YDzqDgUEe=xi3DA4DjnehqYiTdwDDBDGtO=9aDG4GfSmDD0wDLoGQQoDGWnCneE6mkiFIr6TTDjqPD/Shc59791vGW56CM9zo3paFDtqD90aAFn=GrvFaE_n93e4F4qibH7GYziTmrzt4xmrKi44mBDmAQQ0TKe4Bxq3DPQDhtTH7OY8FYS_Qqx4Gn/lHDcnDd7YerPCYC70bYbC42q3i8WTx3e8/rijIosDPRGrE0WdoqzGh_YPeD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAe4DWhYeRonANsR7vNG4g2BxpQTxTiD', 'Referer': 'https://weibo.com/u/7735765253', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua-Mobile': '?0', diff --git a/src/scripts/stock_price_change_analyzer.py b/src/scripts/stock_price_change_analyzer.py index 117f39d..80bf702 100644 --- a/src/scripts/stock_price_change_analyzer.py +++ b/src/scripts/stock_price_change_analyzer.py @@ -184,55 +184,6 @@ def analyze_price_changes(db_url): except Exception as e: print("分析数据时发生错误: {}".format(str(e))) -def get_previous_sector_ranks(engine): - """获取上一次的板块排名""" - try: - query = text(""" - SELECT sector_name, rank_num - FROM sector_performance - WHERE DATE(add_time) = CURDATE() - ORDER BY add_time DESC - LIMIT 1 - """) - - result = pd.read_sql_query(query, engine) - if result.empty: - return {} - - return 
dict(zip(result['sector_name'], result['rank_num'])) - except Exception as e: - print("获取上一次排名数据时发生错误: {}".format(str(e))) - return {} - -def calculate_rank_change(row, previous_ranks): - """计算排名变化""" - previous_rank = previous_ranks.get(row['sector_name']) - if previous_rank is None: - return 0 - return previous_rank - row['rank_num'] - -def get_cache_mark(): - """获取当前时间对应的缓存标记""" - current_minute = datetime.now().minute - mark = (current_minute % 10) // 2 * 2 - return "{}m".format(mark) - -def save_sector_cache(engine, df_result, cache_mark): - """保存板块数据到缓存表""" - try: - df_cache = df_result.copy() - df_cache['cache_mark'] = cache_mark - - with engine.connect() as conn: - delete_query = text("DELETE FROM sector_performance_cache WHERE cache_mark = :cache_mark") - conn.execute(delete_query, {'cache_mark': cache_mark}) - conn.commit() - - df_cache.to_sql('sector_performance_cache', con=engine, if_exists='append', index=False) - print(f"缓存数据已保存,标记: {cache_mark}") - except Exception as e: - print("保存缓存数据时发生错误: {}".format(str(e))) - def main(db_url): """主函数""" engine = create_engine(db_url) diff --git a/src/scripts/stock_sector_analysis.py b/src/scripts/stock_sector_analysis.py index f8e9018..8d8282c 100644 --- a/src/scripts/stock_sector_analysis.py +++ b/src/scripts/stock_sector_analysis.py @@ -1,62 +1,74 @@ # coding:utf-8 -import requests import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime -from config import XUEQIU_HEADERS +import redis +import json +import sys +import os -def fetch_and_store_stock_data(db_url, table_name='stock_changes', page_size=90): - """获取雪球数据并保存到数据库""" - base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json' - types = ['sha', 'sza', 'kcb'] # 数据类型 - headers = XUEQIU_HEADERS +# 添加项目根目录到路径,便于导入config +project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_root) - all_data = [] +# 读取配置 +try: + from src.valuation_analysis.config import REDIS_CONFIG +except ImportError: + REDIS_CONFIG = { + 'host': 'localhost', + 'port': 6379, + 'db': 0, + 'password': None + } - for stock_type in types: - params = { - 'page': 1, - 'size': page_size, - 'order': 'desc', - 'order_by': 'percent', - 'market': 'CN', - 'type': stock_type - } +REDIS_KEY = 'xq_stock_changes_latest' # Redis中存放行情的主键 - # 初次请求以获取总页数 - response = requests.get(base_url, headers=headers, params=params) - if response.status_code != 200: - print(f"请求 {stock_type} 数据失败,状态码:{response.status_code}") - continue +def get_redis_conn(): + """获取Redis连接""" + pool = redis.ConnectionPool( + host=REDIS_CONFIG['host'], + port=REDIS_CONFIG['port'], + db=REDIS_CONFIG.get('db', 0), + password=REDIS_CONFIG.get('password', None), + decode_responses=True + ) + return redis.Redis(connection_pool=pool) - data = response.json() - total_count = data['data']['count'] - total_pages = (total_count // page_size) + 1 - - for page in range(1, total_pages + 1): - params['page'] = page - response = requests.get(base_url, headers=headers, params=params) - if response.status_code == 200: - data = response.json() - all_data.extend(data['data']['list']) - else: - print(f"请求 {stock_type} 数据第 {page} 页失败,状态码:{response.status_code}") - # 转换为 DataFrame - df = pd.DataFrame(all_data) - - if not df.empty: - # 添加 id 列 - df['id'] = range(1, len(df) + 1) - - # 创建数据库连接 - engine = create_engine(db_url) - - # 将数据写入数据库表 - df.to_sql(table_name, con=engine, if_exists='replace', index=False) - print(f"成功将数据写入数据库表 {table_name}。") +def fetch_stock_data_from_redis(): + 
"""从Redis获取股票数据""" + try: + r = get_redis_conn() + + # 从Redis获取所有股票数据 + all_stock_data = r.hgetall(REDIS_KEY) + + if not all_stock_data: + print("Redis中没有找到股票数据") + return pd.DataFrame() + + # 转换为DataFrame + stock_list = [] + for symbol, value in all_stock_data.items(): + try: + stock_data = json.loads(value) + stock_list.append(stock_data) + except json.JSONDecodeError: + print(f"解析股票数据失败: {symbol}") + continue + + if stock_list: + df = pd.DataFrame(stock_list) + print(f"从Redis成功获取 {len(df)} 条股票数据") + return df else: - print("未获取到任何数据。") + print("Redis数据解析失败") + return pd.DataFrame() + + except Exception as e: + print(f"从Redis获取数据时发生错误: {str(e)}") + return pd.DataFrame() def get_cache_mark(): """获取当前时间对应的缓存标记(0m, 2m, 4m, 6m, 8m)""" @@ -164,21 +176,43 @@ def get_high_performance_stocks(db_url): # 确保缓存表存在 ensure_cache_table_exists(engine) - # 先获取最新数据 - fetch_and_store_stock_data(db_url) + # 从Redis获取股票数据 + print("从Redis获取股票数据...") + stock_changes_df = fetch_stock_data_from_redis() + + if stock_changes_df.empty: + print("Redis数据获取失败,无法进行分析") + return [] - # 读取数据 - stock_changes_df = pd.read_sql_table('stock_changes', con=engine) + print(f"Redis数据获取成功,共 {len(stock_changes_df)} 条记录") + + # 读取板块信息 gp_gnbk_df = pd.read_sql_table('gp_gnbk', con=engine) - # 去掉 symbol 字段的前两个字符 - stock_changes_df['symbol'] = stock_changes_df['symbol'].str[2:] + # 处理symbol字段 - 根据Redis数据格式,symbol已经是完整格式(如SZ002916) + if 'symbol' in stock_changes_df.columns: + # 保持原始格式用于关联板块表,因为板块表使用的是完整格式(如SZ000973) + stock_changes_df['symbol_clean'] = stock_changes_df['symbol'] + else: + print("股票数据中缺少symbol字段") + return [] # 筛选涨幅超过 1.5% 的股票 high_performance_stocks = stock_changes_df[stock_changes_df['percent'] > 1.5] + print(f"筛选结果: {len(high_performance_stocks)} 只股票涨幅超过1.5%") + + if len(high_performance_stocks) == 0: + print("没有股票涨幅超过1.5%,尝试降低阈值到0.5%...") + high_performance_stocks = stock_changes_df[stock_changes_df['percent'] > 0.5] + print(f"筛选结果: {len(high_performance_stocks)} 只股票涨幅超过0.5%") + + if len(high_performance_stocks) == 0: + print("仍然没有股票涨幅超过0.5%,尝试获取所有上涨股票...") + high_performance_stocks = stock_changes_df[stock_changes_df['percent'] > 0] + print(f"筛选结果: {len(high_performance_stocks)} 只股票上涨") # 关联两个表,获取 bk_name - merged_df = high_performance_stocks.merge(gp_gnbk_df, left_on='symbol', right_on='gp_code') + merged_df = high_performance_stocks.merge(gp_gnbk_df, left_on='symbol_clean', right_on='gp_code') # 统计每个 bk_name 的数量 total_counts = gp_gnbk_df['bk_name'].value_counts() @@ -215,6 +249,7 @@ def get_high_performance_stocks(db_url): save_sector_cache(engine, df_result, cache_mark) # 输出结果 + print("\n板块分析结果:") for _, row in df_result.iterrows(): print("板块名称: {}, 上涨家数: {}, 总数: {}, 比重: {:.2%}, 排名: {}, 排名变化: {}".format( row['sector_name'], row['up_count'], row['total_count'], @@ -251,21 +286,31 @@ def get_top_industries_and_stocks(db_url, top_start=1, top_end=10): # 提取指定范围的行名称 top_industry_names = [industry[0] for industry in top_industries[top_start-1:top_end]] - # 读取数据 - stock_changes_df = pd.read_sql_table('stock_changes', con=engine) + # 从Redis获取股票数据 + stock_changes_df = fetch_stock_data_from_redis() + if stock_changes_df.empty: + print("无法获取股票数据") + return + + # 读取板块信息 gp_gnbk_df = pd.read_sql_table('gp_gnbk', con=engine) - # 去掉 symbol 字段的前两个字符 - stock_changes_df['symbol'] = stock_changes_df['symbol'].str[2:] + # 处理symbol字段 + if 'symbol' in stock_changes_df.columns: + # 保持原始格式用于关联板块表 + stock_changes_df['symbol_clean'] = stock_changes_df['symbol'] + else: + print("股票数据中缺少symbol字段") + return # 关联两个表,获取 bk_name 和 gp_name - 
merged_df = stock_changes_df.merge(gp_gnbk_df, left_on='symbol', right_on='gp_code') + merged_df = stock_changes_df.merge(gp_gnbk_df, left_on='symbol_clean', right_on='gp_code') # 筛选指定范围的行业的股票 filtered_df = merged_df[merged_df['bk_name'].isin(top_industry_names)] # 统计每只股票命中行业的数量 - stock_industry_counts = filtered_df.groupby(['symbol', 'gp_name'])['bk_name'].nunique().sort_values(ascending=False).head(10) + stock_industry_counts = filtered_df.groupby(['symbol', 'name'])['bk_name'].nunique().sort_values(ascending=False).head(10) # 获取每只股票的命中行业数量 stock_industry_list = stock_industry_counts.reset_index() @@ -332,5 +377,17 @@ if __name__ == "__main__": # 清理历史数据 clean_historical_data(engine) - # 执行主要分析 - get_top_industries_and_stocks(db_url, 1, 10) \ No newline at end of file + # 执行主要分析 - 完全使用Redis数据源 + print("=" * 60) + print("板块分析脚本启动 - Redis数据源版本") + print("=" * 60) + print("使用Redis作为数据源") + print("确保batch_stock_price_collector.py脚本正在运行") + print("-" * 60) + + # 执行分析 + get_top_industries_and_stocks(db_url, 1, 10) + + print("=" * 60) + print("板块分析完成") + print("=" * 60) \ No newline at end of file diff --git a/src/static/js/bigscreen_v2.js b/src/static/js/bigscreen_v2.js index 2f98dd8..2bc0ca2 100644 --- a/src/static/js/bigscreen_v2.js +++ b/src/static/js/bigscreen_v2.js @@ -172,6 +172,13 @@ $(function() { // 隐藏重置按钮 $('#resetViewBtn').hide(); + // 静默请求接口,不传递行业参数 + $.get('https://spb.bmbs.tech/api/dify/webSelectStockIndustry') + .fail(function(xhr, status, error) { + // 静默处理错误,不显示给用户 + console.log('返回默认视图通知接口调用失败:', error); + }); + // 恢复第二行的原始布局 const rowContainer = document.querySelector('.row.d-flex2'); rowContainer.innerHTML = ` @@ -223,6 +230,9 @@ $(function() { element.remove(); } }); + + // 重新绑定持仓点击事件 + setTimeout(bindIndustryHoldingsClick, 1000); // 延迟1秒绑定,确保数据加载完成 } // 加载行业详情函数 @@ -399,10 +409,10 @@ $(function() {
${factor}: ${value}
`; + if (factorInfo && factorInfo.factors_details) { + factorInfo.factors_details.forEach((factorDetail) => { + const factor = factorDetail.factor; + const detail = factorDetail.detail; + const signal = factorDetail.signal; + + // 根据signal显示箭头 + let arrowIcon = ''; + if (signal === '买') { + arrowIcon = '⬆'; + } else if (signal === '卖') { + arrowIcon = '⬇'; + } + + html += `${arrowIcon}${factor}: ${detail}
`; }); } else { html += '暂无因子数据
'; @@ -485,10 +507,6 @@ $(function() { container.innerHTML = html; } - // 这个函数已经不再使用,删除 - - // 这个函数已经不再使用,删除 - function convertStockCode(stockCode) { // 将 600584.SH 格式转换为 SH600584 格式 const parts = stockCode.split('.'); @@ -702,13 +720,16 @@ $(function() { function bindIndustryHoldingsClick() { // 为每个持仓容器添加点击事件 const holdingsContainers = ['holdings_xjfz', 'holdings_xp', 'holdings_xfdz', 'holdings_jqr']; - const industries = ['半导体', '自动化设备', '军工电子', '消费电子']; // 默认行业,实际会根据持仓动态更新 holdingsContainers.forEach((containerId, index) => { const container = document.getElementById(containerId); if (container) { - container.style.cursor = 'pointer'; - container.addEventListener('click', function() { + // 移除之前的事件监听器,避免重复绑定 + container.replaceWith(container.cloneNode(true)); + const newContainer = document.getElementById(containerId); + + newContainer.style.cursor = 'pointer'; + newContainer.addEventListener('click', function() { // 获取当前显示的行业名称 const industryTitle = this.parentElement.querySelector('.chart-title'); if (industryTitle) { @@ -752,7 +773,7 @@ $(function() { html = '