import sys import os # 添加项目根目录到 Python 路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from flask import Flask, jsonify, request from flask_cors import CORS import logging # 导入企业筛选器 from src.fundamentals_llm.enterprise_screener import EnterpriseScreener # 设置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 创建 Flask 应用 app = Flask(__name__) CORS(app) # 启用跨域请求支持 # 创建企业筛选器实例 screener = EnterpriseScreener() @app.route('/api/health', methods=['GET']) def health_check(): """健康检查接口""" return jsonify({"status": "ok", "message": "Service is running"}) @app.route('/api/stock_profiles', methods=['GET']) def get_stock_profiles(): """获取企业画像筛选结果列表""" return jsonify({ "profiles": [ {"id": 1, "name": "高成长潜力企业", "method": "high_growth"}, {"id": 2, "name": "稳定型龙头企业", "method": "stable_leaders"}, {"id": 3, "name": "短期投资机会", "method": "short_term"}, {"id": 4, "name": "价值型投资标的", "method": "value_investment"}, {"id": 5, "name": "困境反转机会", "method": "turnaround"}, {"id": 6, "name": "风险规避标的", "method": "risk_averse"}, {"id": 7, "name": "创新驱动型企业", "method": "innovation_driven"}, {"id": 8, "name": "行业整合机会", "method": "industry_integration"}, {"id": 9, "name": "推荐投资企业", "method": "recommended"} ] }) @app.route('/api/screen/', methods=['GET']) def screen_stocks(profile_type): """根据企业画像类型筛选股票 Args: profile_type: 企业画像类型 (method 字段值) """ try: # 解析limit参数 limit = request.args.get('limit', None, type=int) # 处理别名 if profile_type == 'innovation_driven': profile_type = 'innovation' elif profile_type == 'industry_integration': profile_type = 'integration' # 映射画像类型到筛选方法 method_map = { 'high_growth': screener.screen_high_growth_stocks, 'stable_leaders': screener.screen_stable_leaders, 'short_term': screener.screen_short_term_opportunities, 'value_investment': screener.screen_value_investment_stocks, 'turnaround': screener.screen_turnaround_opportunities, 'risk_averse': screener.screen_risk_averse_stocks, 'innovation': screener.screen_innovation_driven_stocks, 'integration': screener.screen_industry_integration_stocks, 'recommended': screener.screen_multi_term_investment_stocks } # 检查请求的画像类型是否有效 if profile_type not in method_map: return jsonify({ "status": "error", "message": f"Invalid profile type: {profile_type}" }), 400 # 调用相应的筛选方法 stocks = method_map[profile_type](limit=limit) # 转换结果格式 result = [] for code, name in stocks: result.append({ "code": code, "name": name }) return jsonify({ "status": "success", "profile_type": profile_type, "count": len(result), "stocks": result }) except Exception as e: logger.error(f"筛选股票失败: {str(e)}") return jsonify({ "status": "error", "message": f"Failed to screen stocks: {str(e)}" }), 500 # 条件筛选接口,接受自定义的筛选条件 @app.route('/api/screen/custom', methods=['POST']) def screen_custom(): """使用自定义条件筛选股票""" try: # 从请求体获取筛选条件 data = request.get_json() if not data or 'conditions' not in data: return jsonify({ "status": "error", "message": "Missing conditions in request body" }), 400 # 调用通用筛选方法 stocks = screener._screen_stocks_by_conditions(data['conditions']) # 转换结果格式 result = [] for code, name in stocks: result.append({ "code": code, "name": name }) return jsonify({ "status": "success", "count": len(result), "stocks": result }) except Exception as e: logger.error(f"自定义筛选股票失败: {str(e)}") return jsonify({ "status": "error", "message": f"Failed to screen stocks with custom conditions: {str(e)}" }), 500 @app.route('/api/generate_reports', methods=['POST']) def generate_reports(): """为指定的股票列表生成PDF投资报告 请求体格式: { "stocks": [ ["603690", "至纯科技"], ["688053", "思科瑞"], ["300750", "宁德时代"] ] } """ try: # 从请求体获取股票列表 data = request.get_json() if not data or 'stocks' not in data or not isinstance(data['stocks'], list): return jsonify({ "status": "error", "message": "请求格式错误: 需要提供股票列表" }), 400 # 检查股票列表格式 stocks = data['stocks'] if not all(isinstance(item, list) and len(item) == 2 for item in stocks): return jsonify({ "status": "error", "message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式" }), 400 # 导入 PDF 生成器模块 try: from src.fundamentals_llm.pdf_generator import generate_investment_report except ImportError: try: from fundamentals_llm.pdf_generator import generate_investment_report except ImportError as e: logger.error(f"无法导入 PDF 生成器模块: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: PDF 生成器模块不可用, 错误详情: {str(e)}" }), 500 # 生成报告 generated_reports = [] for stock_code, stock_name in stocks: try: # 调用 PDF 生成器 report_path = generate_investment_report(stock_code, stock_name) generated_reports.append({ "code": stock_code, "name": stock_name, "report_path": report_path, "status": "success" }) logger.info(f"成功生成 {stock_name}({stock_code}) 的投资报告: {report_path}") except Exception as e: logger.error(f"生成 {stock_name}({stock_code}) 的投资报告失败: {str(e)}") generated_reports.append({ "code": stock_code, "name": stock_name, "status": "error", "error": str(e) }) # 返回结果 return jsonify({ "status": "success", "message": f"处理了 {len(stocks)} 个股票的报告生成请求", "reports": generated_reports }) except Exception as e: logger.error(f"处理报告生成请求失败: {str(e)}") return jsonify({ "status": "error", "message": f"Failed to process report generation request: {str(e)}" }), 500 @app.route('/api/recommended_stocks', methods=['GET']) def recommended_stocks(): """获取系统推荐的投资股票 该接口会返回系统基于基本面分析推荐的股票列表,具有较高投资价值 可以通过以下参数进行过滤: - profile_type: 企业画像类型,例如 'high_growth', 'stable_leaders' 等 - limit: 限制返回的股票数量 """ try: # 解析请求参数 profile_type = request.args.get('profile_type', 'high_growth') limit = request.args.get('limit', 10, type=int) # 导入筛选器模块 try: from src.fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError: try: from fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError as e: logger.error(f"无法导入企业筛选器模块: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 企业筛选器模块不可用, 错误详情: {str(e)}" }), 500 # 创建筛选器实例 screener = EnterpriseScreener() # 根据类型获取推荐股票 try: if profile_type == 'recommended': stocks = screener.screen_multi_term_investment_stocks(limit=limit) elif profile_type == 'high_growth': stocks = screener.screen_high_growth_stocks(limit=limit) elif profile_type == 'stable_leaders': stocks = screener.screen_stable_leaders(limit=limit) elif profile_type == 'short_term': stocks = screener.screen_short_term_opportunities(limit=limit) elif profile_type == 'value_investment': stocks = screener.screen_value_investment_stocks(limit=limit) elif profile_type == 'turnaround': stocks = screener.screen_turnaround_opportunities(limit=limit) elif profile_type == 'risk_averse': stocks = screener.screen_risk_averse_stocks(limit=limit) elif profile_type == 'innovation' or profile_type == 'innovation_driven': stocks = screener.screen_innovation_driven_stocks(limit=limit) elif profile_type == 'integration' or profile_type == 'industry_integration': stocks = screener.screen_industry_integration_stocks(limit=limit) elif profile_type == 'custom' and 'conditions' in request.args: # 解析自定义条件 try: import json conditions = json.loads(request.args.get('conditions')) stocks = screener.screen_by_custom_conditions(conditions, limit=limit) except Exception as e: return jsonify({ "status": "error", "message": f"解析自定义条件失败: {str(e)}" }), 400 else: return jsonify({ "status": "error", "message": f"不支持的企业画像类型: {profile_type}" }), 400 except Exception as e: logger.error(f"筛选股票时出错: {str(e)}") return jsonify({ "status": "error", "message": f"筛选股票时出错: {str(e)}" }), 500 # 格式化返回结果 formatted_stocks = [] for stock_code, stock_name in stocks: formatted_stocks.append({ "code": stock_code, "name": stock_name }) # 返回结果 return jsonify({ "status": "success", "profile_type": profile_type, "count": len(formatted_stocks), "stocks": formatted_stocks }) except Exception as e: logger.error(f"获取推荐股票失败: {str(e)}") return jsonify({ "status": "error", "message": f"获取推荐股票失败: {str(e)}" }), 500 @app.route('/api/analyze_and_recommend', methods=['POST']) def analyze_and_recommend(): """分析指定企业列表,生成投资建议,并筛选推荐投资的企业 请求体格式: { "stocks": [ ["603690", "至纯科技"], ["688053", "思科瑞"], ["300750", "宁德时代"] ], "limit": 10 // 可选,限制返回推荐股票的数量 } """ try: # 从请求体获取股票列表 data = request.get_json() if not data or 'stocks' not in data or not isinstance(data['stocks'], list): return jsonify({ "status": "error", "message": "请求格式错误: 需要提供股票列表" }), 400 # 检查股票列表格式 stocks = data['stocks'] if not all(isinstance(item, list) and len(item) == 2 for item in stocks): return jsonify({ "status": "error", "message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式" }), 400 # 解析limit参数 limit = data.get('limit', 10) if not isinstance(limit, int) or limit <= 0: limit = 10 # 导入必要的聊天机器人模块 try: # 首先尝试导入聊天机器人模块 try: from src.fundamentals_llm.chat_bot import ChatBot as OnlineChatBot logger.info("成功从 src.fundamentals_llm.chat_bot 导入 ChatBot") except ImportError as e1: try: from fundamentals_llm.chat_bot import ChatBot as OnlineChatBot logger.info("成功从 fundamentals_llm.chat_bot 导入 ChatBot") except ImportError as e2: logger.error(f"无法导入在线聊天机器人模块: {str(e1)}, {str(e2)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 聊天机器人模块不可用,错误详情: {str(e2)}" }), 500 # 然后尝试导入离线聊天机器人模块 try: from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot logger.info("成功从 src.fundamentals_llm.chat_bot_with_offline 导入 ChatBot") except ImportError as e1: try: from fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot logger.info("成功从 fundamentals_llm.chat_bot_with_offline 导入 ChatBot") except ImportError as e2: logger.warning(f"无法导入离线聊天机器人模块: {str(e1)}, {str(e2)}") # 这里可以继续执行,因为某些功能可能不需要离线模型 # 最后导入基本面分析器 try: from src.fundamentals_llm.fundamental_analysis import FundamentalAnalyzer logger.info("成功从 src.fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer") except ImportError as e1: try: from fundamentals_llm.fundamental_analysis import FundamentalAnalyzer logger.info("成功从 fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer") except ImportError as e2: logger.error(f"无法导入基本面分析模块: {str(e1)}, {str(e2)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 基本面分析模块不可用,错误详情: {str(e2)}" }), 500 except Exception as e: logger.error(f"导入必要模块时出错: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 导入必要模块时出错,错误详情: {str(e)}" }), 500 # 创建基本面分析器实例 analyzer = FundamentalAnalyzer() # 为每个股票生成投资建议 investment_advices = [] for stock_code, stock_name in stocks: try: # 生成投资建议 success, advice, reasoning, references = analyzer.query_analysis( stock_code, stock_name, "investment_advice" ) if success: investment_advices.append({ "code": stock_code, "name": stock_name, "advice": advice, "reasoning": reasoning, "references": references, "status": "success" }) logger.info(f"成功生成 {stock_name}({stock_code}) 的投资建议") else: investment_advices.append({ "code": stock_code, "name": stock_name, "status": "error", "error": advice }) logger.error(f"生成 {stock_name}({stock_code}) 的投资建议失败: {advice}") except Exception as e: logger.error(f"处理 {stock_name}({stock_code}) 时出错: {str(e)}") investment_advices.append({ "code": stock_code, "name": stock_name, "status": "error", "error": str(e) }) # 导入企业筛选器 try: from src.fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError: try: from fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError as e: logger.error(f"无法导入企业筛选器模块: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 企业筛选器模块不可用,错误详情: {str(e)}" }), 500 # 创建筛选器实例 screener = EnterpriseScreener() # 筛选符合推荐投资条件的股票 # 获取传入的所有股票代码 input_stock_codes = set(code for code, _ in stocks) # 获取所有符合推荐投资条件的股票 recommended_stocks = screener.screen_multi_term_investment_stocks(limit=limit) # 筛选出传入列表中符合推荐投资条件的股票 recommended_input_stocks = [] for code, name in recommended_stocks: if code in input_stock_codes: recommended_input_stocks.append({ "code": code, "name": name }) # 返回结果 return jsonify({ "status": "success", "total_input_stocks": len(stocks), "investment_advices": investment_advices, "recommended_stocks": { "count": len(recommended_input_stocks), "stocks": recommended_input_stocks } }) except Exception as e: logger.error(f"分析和推荐股票失败: {str(e)}") return jsonify({ "status": "error", "message": f"分析和推荐股票失败: {str(e)}" }), 500 @app.route('/api/comprehensive_analysis', methods=['POST']) def comprehensive_analysis(): """综合分析接口 - 组合多种功能和参数 请求体格式: { "stocks": [ ["603690", "至纯科技"], ["688053", "思科瑞"], ["300750", "宁德时代"] ], "generate_pdf": true, "limit": 10, // 可选,限制返回推荐股票的数量 "profile_filter": { "type": "custom", // 可选值: 预定义的画像类型或 "custom" "profile_name": "high_growth", // 当 type = 预定义画像类型时使用 "conditions": [ // 当 type = "custom" 时使用 { "dimension": "financial_report", "field": "financial_report_level", "operator": ">=", "value": 1 }, // 其他条件... ] } } """ try: # 从请求体获取参数 data = request.get_json() if not data or 'stocks' not in data or not isinstance(data['stocks'], list): return jsonify({ "status": "error", "message": "请求格式错误: 需要提供股票列表" }), 400 # 检查股票列表格式 stocks = data['stocks'] if not all(isinstance(item, list) and len(item) == 2 for item in stocks): return jsonify({ "status": "error", "message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式" }), 400 # 解析其他参数 generate_pdf = data.get('generate_pdf', False) profile_filter = data.get('profile_filter', None) limit = data.get('limit', 10) if not isinstance(limit, int) or limit <= 0: limit = 10 # 导入必要的聊天机器人模块 try: # 首先尝试导入聊天机器人模块 try: from src.fundamentals_llm.chat_bot import ChatBot as OnlineChatBot logger.info("成功从 src.fundamentals_llm.chat_bot 导入 ChatBot") except ImportError as e1: try: from fundamentals_llm.chat_bot import ChatBot as OnlineChatBot logger.info("成功从 fundamentals_llm.chat_bot 导入 ChatBot") except ImportError as e2: logger.error(f"无法导入在线聊天机器人模块: {str(e1)}, {str(e2)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 聊天机器人模块不可用,错误详情: {str(e2)}" }), 500 # 然后尝试导入离线聊天机器人模块 try: from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot logger.info("成功从 src.fundamentals_llm.chat_bot_with_offline 导入 ChatBot") except ImportError as e1: try: from fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot logger.info("成功从 fundamentals_llm.chat_bot_with_offline 导入 ChatBot") except ImportError as e2: logger.warning(f"无法导入离线聊天机器人模块: {str(e1)}, {str(e2)}") # 这里可以继续执行,因为某些功能可能不需要离线模型 # 最后导入基本面分析器 try: from src.fundamentals_llm.fundamental_analysis import FundamentalAnalyzer logger.info("成功从 src.fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer") except ImportError as e1: try: from fundamentals_llm.fundamental_analysis import FundamentalAnalyzer logger.info("成功从 fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer") except ImportError as e2: logger.error(f"无法导入基本面分析模块: {str(e1)}, {str(e2)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 基本面分析模块不可用,错误详情: {str(e2)}" }), 500 except Exception as e: logger.error(f"导入必要模块时出错: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 导入必要模块时出错,错误详情: {str(e)}" }), 500 # 创建基本面分析器实例 analyzer = FundamentalAnalyzer() # 为每个股票生成投资建议 investment_advices = [] for stock_code, stock_name in stocks: try: # 生成投资建议 success, advice, reasoning, references = analyzer.query_analysis( stock_code, stock_name, "investment_advice" ) if success: investment_advices.append({ "code": stock_code, "name": stock_name, "advice": advice, "reasoning": reasoning, "references": references, "status": "success" }) logger.info(f"成功生成 {stock_name}({stock_code}) 的投资建议") else: investment_advices.append({ "code": stock_code, "name": stock_name, "status": "error", "error": advice }) logger.error(f"生成 {stock_name}({stock_code}) 的投资建议失败: {advice}") except Exception as e: logger.error(f"处理 {stock_name}({stock_code}) 时出错: {str(e)}") investment_advices.append({ "code": stock_code, "name": stock_name, "status": "error", "error": str(e) }) # 生成PDF报告(如果需要) pdf_results = [] if generate_pdf: try: # 导入PDF生成器模块 try: from src.fundamentals_llm.pdf_generator import generate_investment_report except ImportError: try: from fundamentals_llm.pdf_generator import generate_investment_report except ImportError as e: logger.error(f"无法导入 PDF 生成器模块: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: PDF 生成器模块不可用, 错误详情: {str(e)}" }), 500 # 生成报告 for stock_code, stock_name in stocks: try: # 调用 PDF 生成器 report_path = generate_investment_report(stock_code, stock_name) pdf_results.append({ "code": stock_code, "name": stock_name, "report_path": report_path, "status": "success" }) logger.info(f"成功生成 {stock_name}({stock_code}) 的投资报告: {report_path}") except Exception as e: logger.error(f"生成 {stock_name}({stock_code}) 的投资报告失败: {str(e)}") pdf_results.append({ "code": stock_code, "name": stock_name, "status": "error", "error": str(e) }) except Exception as e: logger.error(f"处理PDF生成请求失败: {str(e)}") pdf_results = [] # 应用企业画像筛选 filtered_stocks = [] if profile_filter: try: # 导入企业筛选器 try: from src.fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError: try: from fundamentals_llm.enterprise_screener import EnterpriseScreener except ImportError as e: logger.error(f"无法导入企业筛选器模块: {str(e)}") return jsonify({ "status": "error", "message": f"服务器配置错误: 企业筛选器模块不可用,错误详情: {str(e)}" }), 500 # 创建筛选器实例 screener = EnterpriseScreener() # 根据筛选类型获取推荐股票 filter_type = profile_filter.get('type', 'custom') # 如果是自定义条件筛选 if filter_type == 'custom': conditions = profile_filter.get('conditions', []) if conditions: print(f"使用自定义条件筛选: {len(conditions)} 个条件") all_stocks = screener.screen_by_custom_conditions(conditions, limit=limit) else: # 如果没有提供条件,使用默认的high_growth类型 print("未提供自定义条件,使用默认的high_growth类型") all_stocks = screener.screen_high_growth_stocks(limit=limit) # 如果是预定义类型 else: # 直接使用type作为画像类型,如果用户同时提供了profile_name也兼容处理 profile_name = profile_filter.get('profile_name', filter_type) print(f"使用预定义画像类型: {profile_name}") if profile_name == 'high_growth': all_stocks = screener.screen_high_growth_stocks(limit=limit) elif profile_name == 'stable_leaders': all_stocks = screener.screen_stable_leaders(limit=limit) elif profile_name == 'short_term': all_stocks = screener.screen_short_term_opportunities(limit=limit) elif profile_name == 'value_investment': all_stocks = screener.screen_value_investment_stocks(limit=limit) elif profile_name == 'turnaround': all_stocks = screener.screen_turnaround_opportunities(limit=limit) elif profile_name == 'risk_averse': all_stocks = screener.screen_risk_averse_stocks(limit=limit) elif profile_name == 'innovation' or profile_name == 'innovation_driven': all_stocks = screener.screen_innovation_driven_stocks(limit=limit) elif profile_name == 'integration' or profile_name == 'industry_integration': all_stocks = screener.screen_industry_integration_stocks(limit=limit) elif profile_name == 'recommended': all_stocks = screener.screen_multi_term_investment_stocks(limit=limit) else: # 如果类型无效,使用默认的high_growth类型 logger.warning(f"无效的画像类型: {profile_name},使用默认的high_growth类型") all_stocks = screener.screen_high_growth_stocks(limit=limit) # 获取传入的所有股票代码 input_stock_codes = set(code for code, _ in stocks) # 筛选出传入列表中符合条件的股票 for code, name in all_stocks: if code in input_stock_codes: filtered_stocks.append({ "code": code, "name": name }) logger.info(f"筛选出 {len(filtered_stocks)} 个符合条件的股票") except Exception as e: logger.error(f"应用企业画像筛选失败: {str(e)}") filtered_stocks = [] # 返回结果 response = { "status": "success", "total_input_stocks": len(stocks), "investment_advices": investment_advices } if profile_filter: response["filtered_stocks"] = { "count": len(filtered_stocks), "stocks": filtered_stocks } if generate_pdf: response["pdf_results"] = { "count": len(pdf_results), "reports": pdf_results } return jsonify(response) except Exception as e: logger.error(f"综合分析失败: {str(e)}") return jsonify({ "status": "error", "message": f"综合分析失败: {str(e)}" }), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=True)