diff --git a/src/app.py b/src/app.py index 5078ade..d10e65a 100644 --- a/src/app.py +++ b/src/app.py @@ -2816,6 +2816,12 @@ def bigscreen_page(): """渲染大屏展示页面""" return render_template('bigscreen.html') + +@app.route('/bigscreenv2') +def bigscreen_page_v2(): + """渲染大屏展示页面""" + return render_template('bigscreen_v2.html') + @app.route('/api/bigscreen_data', methods=['GET']) def bigscreen_data(): """聚合大屏所需的12张图数据,便于前端一次性加载""" @@ -3001,6 +3007,186 @@ def run_batch_hk_stock_price_collection(): logger.error(f"批量采集A股行情失败: {str(e)}") return jsonify({"status": "error", "message": str(e)}) +@app.route('/api/portfolio/industry_allocation', methods=['GET']) +def get_portfolio_industry_allocation(): + """获取行业持仓占比数据""" + try: + # 导入持仓分析器 + from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer + + # 创建分析器实例 + analyzer = PortfolioAnalyzer() + + # 获取行业持仓分配数据 + result = analyzer.analyze_portfolio_allocation() + + if result.get("success"): + return jsonify({ + "status": "success", + "data": result["data"] + }) + else: + return jsonify({ + "status": "error", + "message": result.get("message", "获取持仓数据失败") + }) + + except Exception as e: + logger.error(f"获取行业持仓占比失败: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}) + +@app.route('/api/notice/list', methods=['GET']) +def get_notice_list(): + """获取重要提醒列表""" + try: + # 模拟数据 - 实际项目中应该从数据库或外部API获取 + mock_notices = [ + "上证指数突破3200点,市场情绪回暖", + "北向资金今日净流入85.6亿元", + "科技板块PE估值处于历史低位", + "新能源概念股集体上涨,涨幅超3%", + "医药板块回调,建议关注低吸机会", + "融资融券余额连续三日增长", + "消费板块资金流入明显", + "市场恐贪指数回升至65", + "机器人概念板块技术面突破", + "先进封装概念获政策支持" + ] + + return jsonify({ + "status": "success", + "data": mock_notices + }) + except Exception as e: + logger.error(f"获取提醒列表失败: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}) + +@app.route('/api/portfolio/summary', methods=['GET']) +def get_portfolio_summary(): + """获取持仓摘要信息""" + try: + # 导入持仓分析器 + from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer + + # 创建分析器实例 + analyzer = PortfolioAnalyzer() + + # 获取持仓摘要数据 + result = analyzer.get_portfolio_summary() + + if result.get("success"): + return jsonify({ + "status": "success", + "data": result["data"] + }) + else: + return jsonify({ + "status": "error", + "message": result.get("message", "获取持仓摘要失败") + }) + + except Exception as e: + logger.error(f"获取持仓摘要失败: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}) + +@app.route('/api/portfolio/industry_holdings', methods=['GET']) +def get_industry_holdings_detail(): + """获取指定行业的详细持仓信息""" + try: + industry_name = request.args.get('industry_name') + if not industry_name: + return jsonify({'status': 'error', 'message': '缺少必要参数: industry_name'}), 400 + + # 导入持仓分析器 + from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer + + # 创建分析器实例 + analyzer = PortfolioAnalyzer() + + # 获取行业详细持仓数据 + result = analyzer.get_industry_holdings_detail(industry_name) + + if result.get("success"): + return jsonify({ + "status": "success", + "data": result["data"] + }) + else: + return jsonify({ + "status": "error", + "message": result.get("message", "获取行业持仓详情失败") + }) + + except Exception as e: + logger.error(f"获取行业持仓详情失败: {str(e)}") + return jsonify({'status': 'error', 'message': str(e)}) + + +@app.route('/api/valuation/indicator', methods=['POST']) +def analyze_valuation_indicator(): + """分析股票应该使用PE还是PB估值 + POST参数: + - stock_code: 股票代码 (例如: 000001) + - stock_name: 股票名称 (例如: 平安银行) + + 返回格式: + { + "status": "success", + "data": { + "recommended_indicator": "PB", + "reasoning": "平安银行属于金融服务业,作为商业银行,其商业模式基于资产负债管理。金融机构的盈利受拨备、利率、市场波动影响而不够稳定。基于金融业的特殊性,PB是更合适的估值指标..." + } + } + """ + try: + # 从POST表单参数获取 + stock_code = request.form.get('stock_code') + stock_name = request.form.get('stock_name') + + if not stock_code or not stock_name: + return jsonify({ + "status": "error", + "message": "缺少必要参数: stock_code 或 stock_name" + }), 400 + + # 导入估值指标分析器 + try: + from src.valuation_analysis.valuation_indicator_analyzer import ValuationIndicatorAnalyzer + logger.info("成功导入估值指标分析器") + except ImportError as e: + logger.error(f"无法导入估值指标分析器: {str(e)}") + return jsonify({ + "status": "error", + "message": f"服务器配置错误: 估值指标分析器不可用,错误详情: {str(e)}" + }), 500 + + # 创建分析器实例 + analyzer = ValuationIndicatorAnalyzer() + + # 执行分析 + result = analyzer.analyze_valuation_indicator(stock_code, stock_name) + + if result.get("success"): + return jsonify({ + "status": "success", + "data": { + "recommended_indicator": result.get("recommended_indicator"), + "reasoning": result.get("reasoning", "") + } + }) + else: + return jsonify({ + "status": "error", + "message": result.get("error", "分析失败,无详细信息") + }), 500 + + except Exception as e: + logger.error(f"估值指标分析失败: {str(e)}") + return jsonify({ + "status": "error", + "message": f"估值指标分析失败: {str(e)}" + }), 500 + if __name__ == '__main__': # 启动Web服务器 diff --git a/src/scripts/config.py b/src/scripts/config.py index 9e4414d..aaff9b9 100644 --- a/src/scripts/config.py +++ b/src/scripts/config.py @@ -11,7 +11,7 @@ XUEQIU_HEADERS = { 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Client-Version': 'v2.44.75', - 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751936369; xq_a_token=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xqat=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU2MDAyNjgyLCJjdG0iOjE3NTM0MTA2ODI0MTQsImNpZCI6ImQ5ZDBuNEFadXAifQ.AlnzQSY7oGKGABfaQcFLg0lAJsDdvBMiwUbgpCMCBlbx6VZPKhzERxWiylQb4dFIyyECvRRJ73SbO9cD46fAqgzOgTxArNHtTKD4lQapTnyb11diDADnpb_nzzaRr4k_BYQRKXWtcJxdUMzde2WLy-eAkSf76QkXmKrwS3kvRm5gfqhdye44whw5XMEGoZ_lXHzGLWGz_PludHZp6W3v-wwZc_0wLU6cTb_KdrwWUWT_8jw5JHXnJEmuZmQI8QWf60DtiHIYCYXarxv8XtyHK7lLKhIAa3C2QmGWw5wv2HGz4I5DPqm2uMPKumgkQxycfAk56-RWviLZ8LAPF-XcbA; xq_r_token=92527e51353f90ba14d5fd16581e5a7a2780baa2; acw_tc=ac11000117534287625894768e00740076244cbad53d8039638dfe8ed0f4b1; is_overseas=1; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=VRmy+KD3dGV+blntY70z3I+WQFcq8+JM1SqrC6E5295b/kwc4W5RkB+oXprCzEylFSeHXru7sQJDLmMr0mBJ+g%3D%3D; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1753429267; ssxmod_itna=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0KHhqDylYw0i2YwjCo0ZD5D/KCeGzDiLPGhDBWAFdYGdTt4NFtiowCWKGwktpe9flQbeeYGlcD0aK7G=x3EujH5Zn7iIiRoeDU4GnD0=O7YmKqxGGI4GwDGoD34DiDDpED03Db4D+4=bD7rTiocW=EjeDQ4GyDitDKLe=xi3DA4Djnl=qYiTdwDDBDGtON9aDG4GfSmDD0qtBeqT4DYP=5Pr8d29mpOWSneDMixGXz71+NI1yoYcrdvU6r+bOpPGuDG6CP3POd7nEaafY66i0DeGmKK+j0DrlDpYwMihyYwQGGiBqqQGo3qqCexhGC0G4ixqQARmPHK4vP2OIPeDEZgXDfExF0iY+K+mdFrKGmQGvwBP40PpFDC7KzBqG7W4QGt/D3ixt+R5BYobxxD; ssxmod_itna2=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0KHhqDylYw0i2YwjCo0ZYeDA4rYnRItORCDU1Z/PnDhxYG9pGD', + 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751936369; xq_a_token=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xqat=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU2MDAyNjgyLCJjdG0iOjE3NTM0MTA2ODI0MTQsImNpZCI6ImQ5ZDBuNEFadXAifQ.AlnzQSY7oGKGABfaQcFLg0lAJsDdvBMiwUbgpCMCBlbx6VZPKhzERxWiylQb4dFIyyECvRRJ73SbO9cD46fAqgzOgTxArNHtTKD4lQapTnyb11diDADnpb_nzzaRr4k_BYQRKXWtcJxdUMzde2WLy-eAkSf76QkXmKrwS3kvRm5gfqhdye44whw5XMEGoZ_lXHzGLWGz_PludHZp6W3v-wwZc_0wLU6cTb_KdrwWUWT_8jw5JHXnJEmuZmQI8QWf60DtiHIYCYXarxv8XtyHK7lLKhIAa3C2QmGWw5wv2HGz4I5DPqm2uMPKumgkQxycfAk56-RWviLZ8LAPF-XcbA; xq_r_token=92527e51353f90ba14d5fd16581e5a7a2780baa2; acw_tc=0a27aa0f17542694317833912e006564153fcd1bb89f49a865e382d9953601; is_overseas=0; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1754269439; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=HS+RscPvXRUz1ypZekks1pgGkAHHlHsHVuftTbDQCbUUaFqtm9BV4h7ghR2d5Nh+YD29otSyz2svRiKWvOJqgQ%3D%3D; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuq4wQWiYMrK4N4hGRtDl=YoDZDGFdDqx0Ei6Fi7HKzYhtBoqzWKjw_wv5YlCZMPO8//1P9PQCNzkOQ4hviDB3DbqDy/dePxYYjDBYD74G_DDeDixdDj4GmDGYtOeDFfCuNq6R5dxDwDB=DmMIbfeDEDG3D0fbeCLRYwDDBDGUFxtaDG4Gf0mDD0wDAo0jooDGWfnu4s6mkeFKN57G3x0tWDBL5QvG3x/lnoGWNVtlfkS2FkPGuDG6Ogl0kDqQO3i2AfP4KGGIm0iBPKY_5leOQDqQe4YwQGDpl0xliO7Gm0DOGDz0G4ixqYw1n0aSpwhixgPXieD1NZcX3ZXDK4rm0IlvYRGImxqnmmlG4eK40w4Am1BqGYeeGn5ixXWa3m2b/DDgi3YD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuq4wQWiYMrK4N4hGbYDiPbY44h7ie03dz7=3xDlouSdLRKl=Q_2YStYQ7OzOy_RBQ1oeziI2pkPsD8RSfPnSw5L7G4xcSPKKMxxoCD6zTiVCud28rNOm2tL7qASSMTjB2GcYPxzSRi94n0Kgjd6C6jKOMh5rMtOfkR2l8TGOPL277=81u9MRkBgIwRxDwx6iYEE4omE9FE1lonhzib3BUC6PD', 'Referer': 'https://weibo.com/u/7735765253', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua-Mobile': '?0', diff --git a/src/valuation_analysis/portfolio_analyzer.py b/src/valuation_analysis/portfolio_analyzer.py new file mode 100644 index 0000000..d77a135 --- /dev/null +++ b/src/valuation_analysis/portfolio_analyzer.py @@ -0,0 +1,376 @@ +""" +持仓分析模块 + +提供持仓数据获取和行业分类统计功能,包括: +1. 从外部API获取持仓数据 +2. 根据股票代码获取行业分类 +3. 计算各行业持仓金额和占比 +""" + +import requests +import logging +from typing import Dict, List, Optional +from sqlalchemy import create_engine, text + +from .config import DB_URL, LOG_FILE + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(LOG_FILE), + logging.StreamHandler() + ] +) +logger = logging.getLogger("portfolio_analyzer") + + +class PortfolioAnalyzer: + """持仓分析器类""" + + def __init__(self, db_url: str = DB_URL): + """ + 初始化持仓分析器 + + Args: + db_url: 数据库连接URL + """ + self.engine = create_engine( + db_url, + pool_size=5, + max_overflow=10, + pool_recycle=3600 + ) + self.api_url = ("https://to.bmbs.tech/aim/app/v1/derivativeTrading/getTradingRecordList?" + "projectId=182AE38B8C254BC88C715D86C643A6DD,C9B533175D294648A2372CB2966BCC96") + logger.info("持仓分析器初始化完成") + + def get_trading_records(self) -> Optional[Dict]: + """ + 从外部API获取交易记录数据 + + Returns: + 交易记录数据字典,如果请求失败返回None + """ + try: + headers = { + 'authUserIdYh': '4028816c6759b6cb01675aacc98a00f6' + } + + response = requests.get(self.api_url, headers=headers, timeout=10) + response.raise_for_status() + + data = response.json() + if data.get("success") and data.get("code") == 200: + logger.info("成功获取交易记录数据") + return data.get("data", {}) + else: + logger.error(f"API返回错误: {data.get('message', '未知错误')}") + return None + + except requests.exceptions.RequestException as e: + logger.error(f"请求交易记录API失败: {e}") + return None + except Exception as e: + logger.error(f"处理交易记录数据失败: {e}") + return None + + def get_stock_industry(self, stock_code: str) -> List[str]: + """ + 获取指定股票所属的行业列表 + + Args: + stock_code: 股票代码(格式如:603290.SH) + + Returns: + 行业名称列表 + """ + try: + # 转换股票代码格式 + formatted_code = self._convert_stock_code_format(stock_code) + + query = text(""" + SELECT DISTINCT bk_name + FROM gp_hybk + WHERE gp_code = :stock_code + """) + + with self.engine.connect() as conn: + result = conn.execute(query, {"stock_code": formatted_code}).fetchall() + + if result: + return [row[0] for row in result] + else: + logger.warning(f"未找到股票 {stock_code} 的行业数据") + return [] + + except Exception as e: + logger.error(f"获取股票行业失败: {e}") + return [] + + def _convert_stock_code_format(self, stock_code: str) -> str: + """ + 转换股票代码格式 + + Args: + stock_code: 原始股票代码,格式如 "603290.SH" + + Returns: + 转换后的股票代码,格式如 "SH603290" + """ + try: + code, market = stock_code.split('.') + return f"{market}{code}" + except Exception as e: + logger.error(f"转换股票代码格式失败: {str(e)}") + return stock_code + + def calculate_margin_amount(self, notional_principal: float, margin_rate: float) -> float: + """ + 计算保证金金额 + + Args: + notional_principal: 名义本金 + margin_rate: 保证金率(百分比) + + Returns: + 保证金金额 + """ + return notional_principal * (margin_rate / 100) + + def analyze_portfolio_allocation(self) -> Dict: + """ + 分析持仓行业分配 + + Returns: + 包含行业持仓占比的字典 + """ + try: + # 1. 获取交易记录数据 + trading_data = self.get_trading_records() + if not trading_data: + return {"success": False, "message": "无法获取交易记录数据"} + + data_list = trading_data.get("dataList", []) + if not data_list: + return {"success": False, "message": "交易记录数据为空"} + + # 2. 处理持仓数据 + industry_amounts = {} # 行业金额统计 + total_amount = 0.0 + + for project in data_list: + project_name = project.get("projectName", "") + stock_code = project.get("stockCode", "") + trading_records = project.get("tradingRecoderList", []) + + # 只处理未清仓的记录 + for record in trading_records: + if record.get("projectStatus") == "published": + notional_principal = float(record.get("notionalPrincipal", 0)) + margin_rate = float(record.get("marginRate", 0)) + + # 计算保证金金额 + margin_amount = self.calculate_margin_amount(notional_principal, margin_rate) + total_amount += margin_amount + + # 获取股票所属行业 + industries = self.get_stock_industry(stock_code) + + if industries: + # 如果股票属于多个行业,按行业数量平均分配 + amount_per_industry = margin_amount / len(industries) + for industry in industries: + if industry in industry_amounts: + industry_amounts[industry] += amount_per_industry + else: + industry_amounts[industry] = amount_per_industry + else: + # 如果无法获取行业信息,归类为"其他" + other_industry = "其他" + if other_industry in industry_amounts: + industry_amounts[other_industry] += margin_amount + else: + industry_amounts[other_industry] = margin_amount + + # 3. 生成返回数据 + if not industry_amounts: + return {"success": False, "message": "未找到有效的持仓数据"} + + # 按金额排序 + sorted_industries = sorted(industry_amounts.items(), key=lambda x: x[1], reverse=True) + + # 定义颜色映射 + colors = [ + "#5470c6", "#91cc75", "#fac858", "#ee6666", "#73c0de", + "#3ba272", "#fc8452", "#9a60b4", "#ea7ccc", "#ff9f7f" + ] + + industries_data = [] + for i, (industry, amount) in enumerate(sorted_industries): + color = colors[i % len(colors)] + industries_data.append({ + "industry": industry, + "amount": round(amount, 2), + "color": color + }) + + result = { + "success": True, + "data": { + "total_amount": round(total_amount, 2), + "industries": industries_data + } + } + + logger.info(f"成功分析持仓行业分配,总金额: {total_amount:.2f}万元,共{len(industries_data)}个行业") + return result + + except Exception as e: + logger.error(f"分析持仓行业分配失败: {e}") + return {"success": False, "message": f"分析持仓行业分配失败: {str(e)}"} + + def get_portfolio_summary(self) -> Dict: + """ + 获取持仓摘要信息 + + Returns: + 持仓摘要信息字典 + """ + try: + # 1. 获取交易记录数据 + trading_data = self.get_trading_records() + if not trading_data: + return {"success": False, "message": "无法获取交易记录数据"} + + data_list = trading_data.get("dataList", []) + if not data_list: + return {"success": False, "message": "交易记录数据为空"} + + # 2. 统计信息 + total_projects = len(data_list) + active_projects = 0 + total_margin_amount = 0.0 + project_details = [] + + for project in data_list: + project_name = project.get("projectName", "") + stock_code = project.get("stockCode", "") + trading_records = project.get("tradingRecoderList", []) + + project_margin = 0.0 + has_active_position = False + + for record in trading_records: + if record.get("projectStatus") == "published": + has_active_position = True + notional_principal = float(record.get("notionalPrincipal", 0)) + margin_rate = float(record.get("marginRate", 0)) + margin_amount = self.calculate_margin_amount(notional_principal, margin_rate) + project_margin += margin_amount + + if has_active_position: + active_projects += 1 + total_margin_amount += project_margin + + # 获取行业信息 + industries = self.get_stock_industry(stock_code) + industry_names = ", ".join(industries) if industries else "未知" + + project_details.append({ + "project_name": project_name, + "stock_code": stock_code, + "industry": industry_names, + "margin_amount": round(project_margin, 2) + }) + + # 3. 生成摘要 + summary = { + "success": True, + "data": { + "total_projects": total_projects, + "active_projects": active_projects, + "total_margin_amount": round(total_margin_amount, 2), + "project_details": project_details + } + } + + logger.info(f"成功获取持仓摘要,总项目数: {total_projects},活跃项目数: {active_projects}") + return summary + + except Exception as e: + logger.error(f"获取持仓摘要失败: {e}") + return {"success": False, "message": f"获取持仓摘要失败: {str(e)}"} + + def get_industry_holdings_detail(self, industry_name: str) -> Dict: + """ + 获取指定行业的详细持仓信息 + + Args: + industry_name: 行业名称 + + Returns: + 包含行业详细持仓信息的字典 + """ + try: + # 1. 获取交易记录数据 + trading_data = self.get_trading_records() + if not trading_data: + return {"success": False, "message": "无法获取交易记录数据"} + + data_list = trading_data.get("dataList", []) + if not data_list: + return {"success": False, "message": "交易记录数据为空"} + + # 2. 筛选该行业的持仓记录 + industry_holdings = [] + + for project in data_list: + project_name = project.get("projectName", "") + stock_code = project.get("stockCode", "") + trading_records = project.get("tradingRecoderList", []) + + # 获取股票所属行业 + industries = self.get_stock_industry(stock_code) + + # 检查是否属于指定行业 + if industry_name in industries: + for record in trading_records: + if record.get("projectStatus") == "published": + notional_principal = float(record.get("notionalPrincipal", 0)) + margin_rate = float(record.get("marginRate", 0)) + create_time = record.get("createTime", "") + + # 计算保证金金额 + margin_amount = self.calculate_margin_amount(notional_principal, margin_rate) + + industry_holdings.append({ + "project_name": project_name, + "stock_code": stock_code, + "notional_principal": notional_principal, + "margin_rate": margin_rate, + "margin_amount": margin_amount, + "create_time": create_time + }) + + # 3. 按保证金金额排序 + industry_holdings.sort(key=lambda x: x["margin_amount"], reverse=True) + + # 4. 生成返回数据 + result = { + "success": True, + "data": { + "industry_name": industry_name, + "total_count": len(industry_holdings), + "total_margin_amount": sum(item["margin_amount"] for item in industry_holdings), + "holdings": industry_holdings + } + } + + logger.info(f"成功获取行业 {industry_name} 的详细持仓信息,共 {len(industry_holdings)} 条记录") + return result + + except Exception as e: + logger.error(f"获取行业详细持仓信息失败: {e}") + return {"success": False, "message": f"获取行业详细持仓信息失败: {str(e)}"} \ No newline at end of file diff --git a/src/valuation_analysis/valuation_chat_bot.py b/src/valuation_analysis/valuation_chat_bot.py new file mode 100644 index 0000000..bb0054c --- /dev/null +++ b/src/valuation_analysis/valuation_chat_bot.py @@ -0,0 +1,334 @@ +# -*- coding: utf-8 -*- +""" +估值指标分析专用聊天机器人 +专门用于分析股票应该使用PE还是PB估值 +""" + +import sys +import os +import logging +from typing import Dict, Any, Optional +from datetime import datetime + +# 添加项目根目录到 Python 路径 +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from openai import OpenAI +from src.scripts.config import get_random_api_key, get_model + +# 设置日志 +logger = logging.getLogger(__name__) + +class ValuationChatBot: + """估值指标分析专用聊天机器人""" + + def __init__(self, model_type: str = "online_bot"): + """初始化估值分析聊天机器人 + + Args: + model_type: 要使用的模型类型,默认为联网智能体 + """ + try: + # 从配置获取API密钥 + self.api_key = get_random_api_key() + # 从配置获取模型ID + self.model = get_model(model_type) + + logger.info(f"初始化ValuationChatBot,使用模型: {self.model}") + + # 初始化OpenAI客户端 + self.client = OpenAI( + base_url="https://ark.cn-beijing.volces.com/api/v3/bots", + api_key=self.api_key + ) + + # 估值指标分析专用系统提示词 + self.system_message = { + "role": "system", + "content": """你是一名顶级的、注重第一性原理的基本面分析师。你的核心任务是深入剖析一家公司的内在价值驱动因素,并基于此判断“市盈盈率(PE)”和“市净率(PB)”哪个指标能更真实、更核心地反映其价值。 + + **你的分析必须超越简单的行业标签,聚焦于公司的个性化特征。** 即使是同一行业的公司,由于商业模式和财务状况的差异,也可能适用不同的估值指标。 + + **你的决策逻辑框架如下:** + + 1. **【盈利质量与可预测性分析】 - 这是判断PE有效性的基石** + * **分析要点:** 公司的盈利是常态还是偶发?是内生增长还是外部输血?过去5年的盈利记录是否稳定且持续?是否存在大量非经常性损益扭曲了利润?公司的自由现金流状况如何,是否与净利润匹配? + * **决策倾向:** 如果盈利质量高、可预测性强,则PE的权重增加。如果盈利波动巨大、不可持续或为负,则PE的权重降低甚至失效。 + + 2. **【资产价值与商业模式分析】 - 这是判断PB有效性的基石** + * **分析要点:** 公司的核心价值是沉淀在资产负债表上(如厂房、金融资产、土地),还是体现在资产负债表外(如品牌、技术、网络效应、客户关系)?公司的商业模式是“资产驱动型”还是“智力/品牌驱动型”? + * **决策倾向:** 如果公司价值与净资产高度相关(如金融、重资产制造、资源型企业),则PB的权重增加。如果公司是典型的轻资产模式,则PB的权重降低。 + + 3. **【周期性与成长性交叉验证】** + * **分析要点:** 公司所处的行业周期性强弱如何?公司自身是否展现出超越行业的成长性或防御性? + * **决策倾向:** 强周期性会削弱PE在特定时点的有效性,使PB成为更稳健的参照。而强成长性(尤其是有利可图的成长)会显著提升PE的适用性。 + + **最终决策原则:** + + * **优先选择 PE 的核心理由:** 公司具备持续、稳定的盈利能力,并且其核心价值能通过利润得到体现。这是对股东回报最直接的衡量。 + * **优先选择 PB 的核心理由:** 公司的盈利能力不可靠(周期性/亏损),或者其商业模式的根本是基于净资产的规模和质量(如金融业)。PB此时是衡量价值的“锚”或“底线”。 + + **输出要求:** + + 1. **明确结论:** 首先明确推荐PE或PB作为主要估值指标。 + 2. **深入的个股特质分析:** + * **商业模式剖析:** 详细说明公司如何赚钱,其护城河是什么。 + * **财务特征分析:** 重点分析盈利的稳定性与质量、资产的轻重结构、现金流状况。 + * **行业背景补充:** 分析公司在行业中所处的生态位,有何不同于同行的特质。 + 3. **提供决策依据:** 清晰地说明你是如何基于上述三层决策逻辑框架,最终做出选择的。 + 4. **给出合理的估值区间建议:** 基于你选择的指标,并结合公司的历史估值水平和未来成长性,给出一个合理的估值区间。""" + } + + # 对话历史 + self.conversation_history = [self.system_message] + + except Exception as e: + logger.error(f"初始化ValuationChatBot时出错: {str(e)}") + raise + + def chat(self, user_input: str, temperature: float = 0.3, top_p: float = 0.7, max_tokens: int = 2048, frequency_penalty: float = 0.0) -> Dict[str, Any]: + """与AI进行估值指标分析对话 + + Args: + user_input: 用户输入的问题 + temperature: 控制输出的随机性,范围0-2,默认0.3(更确定性) + top_p: 控制输出的多样性,范围0-1,默认0.7 + max_tokens: 控制输出的最大长度,默认2048 + frequency_penalty: 频率惩罚,范围-2.0到2.0,默认0.0 + + Returns: + Dict包含对话结果 + """ + try: + # 添加用户消息到对话历史 + self.conversation_history.append({ + "role": "user", + "content": user_input + }) + + # 调用OpenAI API + response = self.client.chat.completions.create( + model=self.model, + messages=self.conversation_history, + temperature=temperature, + top_p=top_p, + max_tokens=max_tokens, + frequency_penalty=frequency_penalty + ) + + # 获取AI回复 + ai_response = response.choices[0].message.content + + # 添加AI回复到对话历史 + self.conversation_history.append({ + "role": "assistant", + "content": ai_response + }) + + # 保持对话历史在合理长度内(避免token过多) + if len(self.conversation_history) > 10: + # 保留系统消息和最近的对话 + self.conversation_history = [self.system_message] + self.conversation_history[-8:] + + logger.info(f"ValuationChatBot对话成功,回复长度: {len(ai_response)}") + + return { + "success": True, + "response": ai_response, + "model": self.model, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"ValuationChatBot对话失败: {str(e)}") + return { + "success": False, + "error": str(e), + "model": self.model, + "timestamp": datetime.now().isoformat() + } + + def clear_history(self): + """清空对话历史""" + self.conversation_history = [self.system_message] + logger.info("ValuationChatBot对话历史已清空") + + def get_conversation_history(self) -> list: + """获取对话历史""" + return self.conversation_history.copy() + + +class ValuationOfflineChatBot: + """估值指标分析专用离线聊天机器人""" + + def __init__(self, model_type: str = "offline_bot"): + """初始化离线估值分析聊天机器人 + + Args: + model_type: 要使用的模型类型,默认为离线模型 + """ + try: + # 尝试导入配置(参考chat_bot_with_offline.py的方式) + try: + from src.scripts.config import get_model_config + config = get_model_config("tl_qw_private", "GLM") + logger.info("成功从src.scripts.config导入配置") + except ImportError: + try: + from scripts.config import get_model_config + config = get_model_config("volc", "offline_model") + logger.info("成功从scripts.config导入配置") + except ImportError: + logger.warning("无法导入配置模块,使用默认配置") + # 使用默认配置 + config = { + "base_url": "https://ark.cn-beijing.volces.com/api/v3/", + "api_key": "28cfe71a-c6fa-4c5d-9b4e-d8474f0d3b93", + "model": "ep-20250326090920-v7wns" + } + + # 保存配置信息 + self.api_key = config["api_key"] + self.model = config["model"] + self.base_url = config["base_url"] + + logger.info(f"初始化ValuationOfflineChatBot,使用模型: {self.model}") + + # 初始化OpenAI客户端 + self.client = OpenAI( + base_url=self.base_url, + api_key=self.api_key, + timeout=600 + ) + + # 估值指标分析专用系统提示词(针对从分析报告中进行语义理解并提取最终结论) + self.system_message = { + "role": "system", + "content": """你是一个专注于**语义理解和结论提取**的AI。你的唯一任务是阅读一段分析报告,理解其核心论点,并判断作者最终推荐的估值指标是 "PE" 还是 "PB"。 + + **你的核心工作流程:** + + 1. **通读全文**:完整地阅读用户提供的分析报告,理解其对公司业务模式、盈利能力和资产结构的整体评价。 + + 2. **定位结论性语段**:重点关注报告的结尾部分或总结段落。寻找那些**承上启下、做出最终评判**的句子。这些句子不一定包含固定的关键词,但它们在语义上起到了总结和给出最终意见的作用。 + + 3. **进行意图判断**: + * **判断为 "PE" 的信号**:如果结论性语段的中心思想是强调“盈利的稳定性”、“高质量的增长”、“强大的品牌价值”、“轻资产模式的优势”,并最终将这些优势导向了某个估值方法,那么结论就是 "PE"。 + * *例子:* "考虑到该公司强大的品牌护城河和持续稳定的现金流创造能力,通过其盈利水平来评估价值显然是更为恰当的路径。" -> **应判断为 PE** + + * **判断为 "PB" 的信号**:如果结论性语段的中心思想是强调“资产负债表的重要性”、“行业的周期性风险”、“盈利的不可靠性”,或者直接点明其“金融属性”,并基于这些论据做出最终选择,那么结论就是 "PB"。 + * *例子:* "尽管公司短期盈利尚可,但其重资产和强周期的本质意味着盈利波动是常态,因此,基于其净资产的估值方法提供了一个更稳固的价值锚点。" -> **应判断为 PB** + + **你必须遵守的铁律:** + + * **你的任务是理解和提取,不是再次分析**。你必须相信报告原文的逻辑是自洽的,你的工作只是找出它的最终论点。 + * **只输出最终结果**:你的输出**必须且只能是** "PE" 或 "PB"。不要添加任何解释、理由或多余的字符。 + * **处理歧义**:如果在极少数情况下,报告的结论确实模棱两可,无法从语义上明确判断,**请默认输出 "PE"**,以确保程序健壮性。 + """ + } + + # 对话历史 + self.conversation_history = [self.system_message] + + except Exception as e: + logger.error(f"初始化ValuationOfflineChatBot时出错: {str(e)}") + raise + + def chat(self, user_input: str, temperature: float = 0.1, top_p: float = 0.7, max_tokens: int = 1024, frequency_penalty: float = 0.0) -> Dict[str, Any]: + """与离线AI进行估值指标分析对话 + + Args: + user_input: 用户输入的问题 + temperature: 控制输出的随机性,范围0-2,默认0.1(更确定性) + top_p: 控制输出的多样性,范围0-1,默认0.7 + max_tokens: 控制输出的最大长度,默认1024 + frequency_penalty: 频率惩罚,范围-2.0到2.0,默认0.0 + + Returns: + Dict包含对话结果 + """ + try: + # 添加用户消息到对话历史 + self.conversation_history.append({ + "role": "user", + "content": user_input + }) + + # 调用本地GLM模型 + ai_response = self._call_local_model(user_input, temperature, top_p, max_tokens, frequency_penalty) + + # 添加AI回复到对话历史 + self.conversation_history.append({ + "role": "assistant", + "content": ai_response + }) + + # 保持对话历史在合理长度内 + if len(self.conversation_history) > 6: + self.conversation_history = [self.system_message] + self.conversation_history[-4:] + + logger.info(f"ValuationOfflineChatBot对话成功,回复长度: {len(ai_response)}") + + return { + "success": True, + "response": ai_response, + "model": self.model, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"ValuationOfflineChatBot对话失败: {str(e)}") + return { + "success": False, + "error": str(e), + "model": self.model, + "timestamp": datetime.now().isoformat() + } + + def _call_local_model(self, user_input: str, temperature: float = 0.1, top_p: float = 0.7, max_tokens: int = 1024, frequency_penalty: float = 0.0) -> str: + """调用本地GLM模型""" + try: + # 调用本地模型API(使用初始化时创建的客户端) + response = self.client.chat.completions.create( + model=self.model, + messages=self.conversation_history, + temperature=temperature, + top_p=top_p, + max_tokens=max_tokens, + frequency_penalty=frequency_penalty, + timeout=300 + ) + + # 获取AI回复 + ai_response = response.choices[0].message.content + + # 清理回复内容,确保只返回PE或PB + ai_response_clean = ai_response.strip().upper() + if "PE" in ai_response_clean and "PB" not in ai_response_clean: + return "PE" + elif "PB" in ai_response_clean and "PE" not in ai_response_clean: + return "PB" + elif ai_response_clean == "PE" or ai_response_clean == "PB": + return ai_response_clean + else: + # 如果回复不清晰,记录详细信息 + logger.warning(f"本地模型回复不清晰: {ai_response_clean}") + return "PE" # 默认返回PE + + except Exception as e: + logger.error(f"调用本地模型失败: {str(e)}") + return "PE" # 出错时默认返回PE + + def clear_history(self): + """清空对话历史""" + self.conversation_history = [self.system_message] + logger.info("ValuationOfflineChatBot对话历史已清空") + + def get_conversation_history(self) -> list: + """获取对话历史""" + return self.conversation_history.copy() + + + +if __name__ == "__main__": + test_valuation_chat_bot() \ No newline at end of file diff --git a/src/valuation_analysis/valuation_indicator_analyzer.py b/src/valuation_analysis/valuation_indicator_analyzer.py new file mode 100644 index 0000000..acb56ee --- /dev/null +++ b/src/valuation_analysis/valuation_indicator_analyzer.py @@ -0,0 +1,397 @@ +# -*- coding: utf-8 -*- +""" +估值指标分析器 +用于判断股票应该使用PE估值还是PB估值更合理 +""" + +import sys +import os +import logging +from typing import Dict, Any, Optional, Tuple +from datetime import datetime + +# 添加项目根目录到 Python 路径 +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from src.valuation_analysis.valuation_chat_bot import ValuationChatBot +from src.scripts.config import get_random_api_key, get_model + +# 设置日志 +logger = logging.getLogger(__name__) + +class ValuationIndicatorAnalyzer: + """估值指标分析器""" + + def __init__(self): + """初始化分析器""" + try: + # 初始化联网大模型(使用专用估值分析聊天机器人) + self.online_chatbot = ValuationChatBot(model_type="online_bot") + + # 初始化本地GLM模型(使用专用估值分析离线聊天机器人) + try: + from src.valuation_analysis.valuation_chat_bot import ValuationOfflineChatBot + self.offline_chatbot = ValuationOfflineChatBot(model_type="offline_bot") + self.has_offline_model = True + except ImportError: + logger.warning("无法导入离线模型,将只使用联网模型") + self.offline_chatbot = None + self.has_offline_model = False + + logger.info("估值指标分析器初始化成功") + + except Exception as e: + logger.error(f"初始化估值指标分析器失败: {str(e)}") + raise + + def analyze_valuation_indicator(self, stock_code: str, stock_name: str) -> Dict[str, Any]: + """ + 分析股票应该使用PE还是PB估值 + + Args: + stock_code: 股票代码 + stock_name: 股票名称 + + Returns: + Dict包含分析结果 + """ + try: + logger.info(f"开始分析股票 {stock_name}({stock_code}) 的估值指标") + + # 第一步:使用联网大模型进行初步分析 + online_result = self._analyze_with_online_model(stock_code, stock_name) + + # 第二步:使用本地GLM模型进行格式化和验证 + if self.has_offline_model: + offline_result = self._analyze_with_offline_model(stock_code, stock_name, online_result) + else: + offline_result = online_result + + # 第三步:整合结果 + final_result = self._integrate_results(online_result, offline_result) + + logger.info(f"完成股票 {stock_name}({stock_code}) 的估值指标分析") + return final_result + + except Exception as e: + logger.error(f"分析股票 {stock_name}({stock_code}) 估值指标时出错: {str(e)}") + return { + "success": False, + "error": str(e), + "recommended_indicator": None, + "reasoning": None, + "valuation_range": None + } + + def _analyze_with_online_model(self, stock_code: str, stock_name: str) -> Dict[str, Any]: + """ + 使用联网大模型进行估值指标分析 + + Args: + stock_code: 股票代码 + stock_name: 股票名称 + + Returns: + 联网模型的分析结果 + """ + try: + # 构建用户提示词(让联网大模型专注于分析推理,不输出估值区间) + user_prompt = f"""请深入分析股票代码为{stock_code}、名称为{stock_name}的公司,并判断其更适合使用PE还是PB进行估值。 + + 你的分析应超越简单的行业标签,聚焦于该公司的个性化特征。请遵循以下分析框架,提供详细的、基于第一性原理的分析: + + 1. **盈利质量与可预测性分析**: + * 这家公司的盈利是稳定、持续的,还是波动巨大、难以预测的? + * 其净利润是否真实反映了业务的现金创造能力? + * 是否存在大量非经常性损益影响了其盈利的真实性? + + 2. **资产价值与商业模式分析**: + * 公司的核心价值更多体现在资产负债表内的有形资产(如设备、土地、金融资产),还是表外的无形资产(如品牌、技术、网络效应)? + * 它的商业模式是“资产驱动型”还是“智力/品牌驱动型”? + + 3. **周期性与成长性分析**: + * 公司所处行业的周期性强弱如何?它在周期中的位置是怎样的? + * 公司自身的成长性如何,是高于还是低于行业平均水平? + + 4. **最终决策与依据**: + * 综合以上分析,明确阐述你为什么认为PE或PB是更根本的估值指标。请详细说明你的决策逻辑,将公司的个性化特征与你的结论紧密联系起来。 + + 请确保你的分析是客观、专业、有深度的。在本次分析中,请不要提供具体的估值区间,专注于提供选择估值指标的充分理由。""" + + # 调用联网模型 + response = self.online_chatbot.chat(user_prompt, temperature=0.3, max_tokens=2048) + + if response.get("success"): + return { + "success": True, + "raw_response": response.get("response", ""), + "model": "online" + } + else: + logger.error(f"联网模型分析失败: {response.get('error', '未知错误')}") + return { + "success": False, + "error": response.get('error', '联网模型分析失败'), + "model": "online" + } + + except Exception as e: + logger.error(f"使用联网模型分析时出错: {str(e)}") + return { + "success": False, + "error": str(e), + "model": "online" + } + + def _analyze_with_offline_model(self, stock_code: str, stock_name: str, online_result: Dict[str, Any]) -> Dict[str, Any]: + """ + 使用本地GLM模型提取最终的PE/PB推荐 + + Args: + stock_code: 股票代码 + stock_name: 股票名称 + online_result: 联网模型的分析结果 + + Returns: + 本地模型的分析结果 + """ + try: + # 构建用户提示词(让本地GLM专注于提取最终结果) + user_prompt = f"""你是一个专注于语义理解和结论提取的AI。你的唯一任务是阅读一段分析报告,理解其核心论点,并判断作者最终推荐的估值指标是 "PE" 还是 "PB"。 + + 你的输出必须严格遵守以下规则: + * 你的输出**必须且只能是** "PE" 或 "PB"。 + * 不要添加任何解释、理由或多余的字符。 + * 如果文本结论确实模棱两可,无法明确判断,**请默认输出 "PE"**。 + + 分析内容如下: + --- + {online_result.get('raw_response', '分析失败')} + --- + """ + + # 调用本地模型 + response = self.offline_chatbot.chat(user_prompt, temperature=0.1, max_tokens=10) + + if response.get("success"): + return { + "success": True, + "raw_response": response.get("response", "").strip(), + "model": "offline" + } + else: + logger.error(f"本地模型分析失败: {response.get('error', '未知错误')}") + return { + "success": False, + "error": response.get('error', '本地模型分析失败'), + "model": "offline" + } + + except Exception as e: + logger.error(f"使用本地模型分析时出错: {str(e)}") + return { + "success": False, + "error": str(e), + "model": "offline" + } + + def _integrate_results(self, online_result: Dict[str, Any], offline_result: Dict[str, Any]) -> Dict[str, Any]: + """ + 整合联网模型和本地模型的结果 + + Args: + online_result: 联网模型结果 + offline_result: 本地模型结果 + + Returns: + 整合后的最终结果 + """ + try: + # 如果联网模型成功 + if online_result.get("success"): + # 获取联网模型的分析内容 + online_analysis = online_result.get("raw_response", "") + + # 如果本地模型也成功,使用本地模型提取的PE/PB推荐 + if offline_result.get("success"): + recommended_indicator = offline_result.get("raw_response", "").strip() + + # 验证推荐指标是否有效 + if recommended_indicator in ["PE", "PB"]: + return { + "success": True, + "recommended_indicator": recommended_indicator, + "reasoning": online_analysis + } + else: + # 本地模型输出无效,使用联网模型的结果 + logger.warning(f"本地模型输出无效: {recommended_indicator}") + return { + "success": True, + "recommended_indicator": self._extract_indicator_from_text(online_analysis), + "reasoning": online_analysis + } + else: + # 只有联网模型成功 + return { + "success": True, + "recommended_indicator": self._extract_indicator_from_text(online_analysis), + "reasoning": online_analysis + } + else: + # 联网模型失败 + return { + "success": False, + "error": online_result.get("error", "分析失败") + } + + except Exception as e: + logger.error(f"整合结果时出错: {str(e)}") + return { + "success": False, + "error": str(e) + } + + def _extract_indicator_from_text(self, text: str) -> Optional[str]: + """从文本中提取推荐的估值指标""" + try: + import re + + # 查找推荐估值指标 + patterns = [ + r'推荐估值指标[::]\s*(PE|PB)', + r'应该使用\s*(PE|PB)\s*估值', + r'选择\s*(PE|PB)\s*作为', + r'优先选择\s*(PE|PB)' + ] + + for pattern in patterns: + match = re.search(pattern, text, re.IGNORECASE) + if match: + return match.group(1).upper() + + return None + + except Exception as e: + logger.error(f"提取估值指标时出错: {str(e)}") + return None + + def _extract_reasoning_from_text(self, text: str) -> Dict[str, str]: + """从文本中提取推理过程""" + try: + reasoning = { + "industry_analysis": "", + "business_model_analysis": "", + "financial_analysis": "", + "decision_basis": "" + } + + # 简单的文本提取逻辑 + lines = text.split('\n') + current_section = None + + for line in lines: + line = line.strip() + if not line: + continue + + if '行业特征分析' in line or '行业分析' in line: + current_section = 'industry_analysis' + elif '商业模式分析' in line or '业务模式分析' in line: + current_section = 'business_model_analysis' + elif '财务特征分析' in line or '财务分析' in line: + current_section = 'financial_analysis' + elif '决策依据' in line or '选择依据' in line: + current_section = 'decision_basis' + elif current_section and line: + reasoning[current_section] += line + " " + + # 清理空白内容 + for key in reasoning: + reasoning[key] = reasoning[key].strip() + + return reasoning + + except Exception as e: + logger.error(f"提取推理过程时出错: {str(e)}") + return { + "industry_analysis": "", + "business_model_analysis": "", + "financial_analysis": "", + "decision_basis": "" + } + + def _extract_valuation_range_from_text(self, text: str) -> Dict[str, Any]: + """从文本中提取估值区间""" + try: + import re + + # 查找PE或PB的估值区间 + pe_pattern = r'PE.*?(\d+(?:\.\d+)?)[-~](\d+(?:\.\d+)?)' + pb_pattern = r'PB.*?(\d+(?:\.\d+)?)[-~](\d+(?:\.\d+)?)' + + pe_match = re.search(pe_pattern, text, re.IGNORECASE) + pb_match = re.search(pb_pattern, text, re.IGNORECASE) + + if pe_match: + return { + "type": "PE", + "min_value": float(pe_match.group(1)), + "max_value": float(pe_match.group(2)), + "unit": "倍" + } + elif pb_match: + return { + "type": "PB", + "min_value": float(pb_match.group(1)), + "max_value": float(pb_match.group(2)), + "unit": "倍" + } + else: + return { + "type": None, + "min_value": None, + "max_value": None, + "unit": "倍" + } + + except Exception as e: + logger.error(f"提取估值区间时出错: {str(e)}") + return { + "type": None, + "min_value": None, + "max_value": None, + "unit": "倍" + } + + +def test_valuation_analyzer(): + """测试估值指标分析器""" + try: + analyzer = ValuationIndicatorAnalyzer() + + # 测试用例 + test_cases = [ + ("000001", "平安银行"), # 金融业,应该推荐PB + ("000002", "万科A"), # 房地产,应该推荐PB + ("000858", "五粮液"), # 消费品,应该推荐PE + ("002415", "海康威视"), # 科技股,应该推荐PE + ] + + for stock_code, stock_name in test_cases: + print(f"\n测试股票: {stock_name}({stock_code})") + result = analyzer.analyze_valuation_indicator(stock_code, stock_name) + + if result.get("success"): + print(f"推荐指标: {result.get('recommended_indicator')}") + print(f"推理过程: {result.get('reasoning')}") + print(f"估值区间: {result.get('valuation_range')}") + else: + print(f"分析失败: {result.get('error')}") + + except Exception as e: + print(f"测试失败: {str(e)}") + + +if __name__ == "__main__": + test_valuation_analyzer() \ No newline at end of file