stock_fundamentals/src/fundamentals_llm/fundamental_analysis.py

1503 lines
63 KiB
Python
Raw Normal View History

2025-04-02 13:52:34 +08:00
import logging
from typing import Dict, List, Optional, Callable, Tuple
# 修改导入路径,使用相对导入
try:
# 尝试相对导入
from .chat_bot import ChatBot
from .chat_bot_with_offline import ChatBot as OfflineChatBot
from .fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from .pdf_generator import PDFGenerator
except ImportError:
# 如果相对导入失败,尝试尝试绝对导入
try:
from src.fundamentals_llm.chat_bot import ChatBot
from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
from src.fundamentals_llm.fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from src.fundamentals_llm.pdf_generator import PDFGenerator
except ImportError:
# 最后尝试直接导入适用于当前目录已在PYTHONPATH中的情况
from chat_bot import ChatBot
from chat_bot_with_offline import ChatBot as OfflineChatBot
from fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from pdf_generator import PDFGenerator
import json
import re
# 设置日志记录
logger = logging.getLogger(__name__)
class FundamentalAnalyzer:
"""基本面分析器"""
def __init__(self):
"""初始化分析器"""
# 使用联网模型进行基本面分析
self.chat_bot = ChatBot(model_type="online_bot")
# 使用离线模型进行其他分析
self.offline_bot = OfflineChatBot(platform="volc", model_type="offline_model")
self.db = next(get_db())
# 定义维度映射
self.dimension_methods = {
"company_profile": self.analyze_company_profile,
"management_ownership": self.analyze_management_ownership,
"financial_report": self.analyze_financial_report,
"industry_competition": self.analyze_industry_competition,
"recent_projects": self.analyze_recent_projects,
"stock_discussion": self.analyze_stock_discussion,
"industry_cooperation": self.analyze_industry_cooperation,
"target_price": self.analyze_target_price,
"investment_advice": self.generate_investment_advice
}
def query_analysis(self, stock_code: str, stock_name: str, dimension: str) -> Tuple[bool, str, Optional[str], Optional[list]]:
"""查询分析结果,如果不存在则生成新的分析
Args:
stock_code: 股票代码
stock_name: 股票名称
dimension: 分析维度
Returns:
Tuple[bool, str, Optional[str], Optional[list]]:
- 是否成功
- 分析结果
- 推理过程如果有
- 参考资料如果有
"""
try:
# 检查维度是否有效
if dimension not in self.dimension_methods:
return False, f"无效的分析维度: {dimension}", None, None
# 查询数据库
result = get_analysis_result(self.db, stock_code, dimension)
if result:
# 如果存在结果,直接返回
logger.info(f"从数据库获取到 {stock_name}({stock_code}) 的 {dimension} 分析结果")
return True, result.ai_response, result.reasoning_process, result.references
# 如果不存在,生成新的分析
logger.info(f"数据库中未找到 {stock_name}({stock_code}) 的 {dimension} 分析结果,开始生成")
success = self.dimension_methods[dimension](stock_code, stock_name)
if success:
# 重新查询数据库获取结果
result = get_analysis_result(self.db, stock_code, dimension)
if result:
return True, result.ai_response, result.reasoning_process, result.references
return False, f"生成 {dimension} 分析失败", None, None
except Exception as e:
logger.error(f"查询分析结果失败: {str(e)}")
return False, f"查询失败: {str(e)}", None, None
def _remove_references_from_response(self, response: str) -> str:
"""从响应中移除参考资料部分
Args:
response: 原始响应文本
Returns:
str: 移除参考资料后的响应文本
"""
# # 查找"参考资料:"的位置
# ref_start = response.find("参考资料:")
# if ref_start != -1:
# # 如果找到参考资料,只保留前面的部分,并移除末尾的换行符
# return response[:ref_start].rstrip()
return response.strip()
def analyze_company_profile(self, stock_code: str, stock_name: str) -> bool:
"""分析公司简介"""
try:
# 构建提示词
prompt = f"""请对{stock_name}({stock_code})进行公司简介分析严格要求输出控制在500字以内主营业务介绍300字成立背景与历程200字请严格按照以下格式输出
1. 主营业务介绍
- 核心业务领域和主要产品
- 技术特点和产业链布局
- 企业的绝对亮点行业地位竞争优势客户资源等
2. 成立背景与发展历程
- 成立背景和重要发展节点
- 关键战略决策和转型
请提供专业客观的分析突出关键信息避免冗长描述"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
return save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="company_profile",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
except Exception as e:
logger.error(f"分析公司简介失败: {str(e)}")
return False
def analyze_management_ownership(self, stock_code: str, stock_name: str) -> bool:
"""分析实控人和管理层持股情况"""
try:
prompt = f"""请对{stock_name}({stock_code})的实控人和管理层持股情况进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 实控人情况
- 实控人姓名
- 行业地位如有
- 持股比例冻结/解禁
2. 管理层持股
- 主要管理层持股比例
- 近3年增减持情况
请提供专业客观的分析突出关键信息避免冗长描述"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="management_ownership",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取实控人和管理层信息并更新结果
if success:
self.extract_management_info(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析实控人和管理层持股情况失败: {str(e)}")
return False
def extract_management_info(self, ownership_text: str, stock_code: str, stock_name: str) -> Dict[str, int]:
"""从实控人和管理层持股分析中提取持股情况和能力评价并更新数据库
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict[str, int]: 包含shareholding和ability的字典
"""
try:
# 提取持股情况
shareholding = self._extract_shareholding_status(ownership_text, stock_code, stock_name)
# 提取领军人物和处罚情况
ability = self._extract_management_ability(ownership_text, stock_code, stock_name)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "management_ownership")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="management_ownership",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"shareholding": shareholding,
"ability": ability
}
)
logger.info(f"已更新实控人和管理层信息到数据库: shareholding={shareholding}, ability={ability}")
return {"shareholding": shareholding, "ability": ability}
except Exception as e:
logger.error(f"提取实控人和管理层信息失败: {str(e)}")
return {"shareholding": 0, "ability": 0}
def _extract_shareholding_status(self, ownership_text: str, stock_code: str, stock_name: str) -> int:
"""从实控人和管理层持股分析中提取持股减持情况
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 减持情况评价 (1:减持低, 0:减持适中, -1:减持高)
"""
try:
# 使用在线模型分析减持情况
prompt = f"""请对{stock_name}({stock_code})的大股东和管理层近三年减持情况进行专业分析,并返回对应的数值评级:
- 如果大股东或管理层近三年减持比例很低或次数很低减持比例低于2%减持次数少于5减持后持股比例仍然高于5%返回数值"1"
- 如果大股东减持比例适中减持比例在2%-5%之间减持次数不超过10次减持后持股比例仍然高于5%返回数值"0"
- 如果近三年减持次数较多或比例较高且大股东持股比例已经减持到5%以下返回数值"-1"
持股分析内容
{ownership_text}
请仅返回一个数值10-1不要包含任何解释或说明"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
shareholding_value_str = response["response"].strip()
# 提取数值
shareholding_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", shareholding_value_str)
if value_match:
try:
shareholding_value = int(value_match.group(1))
# 确保值在有效范围内
if shareholding_value < -1 or shareholding_value > 1:
logger.warning(f"提取的持股情况值超出范围: {shareholding_value}设置为默认值0")
shareholding_value = 0
except ValueError:
logger.warning(f"无法将提取的持股情况值转换为整数设置为默认值0")
return shareholding_value
except Exception as e:
logger.error(f"提取持股情况失败: {str(e)}")
return 0
def _extract_management_ability(self, ownership_text: str, stock_code: str, stock_name: str) -> int:
"""从实控人和管理层持股分析中提取管理层能力评价
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 管理层能力评价 (1:有领军人物, 0:一般, -1:有处罚/风险)
"""
try:
# 使用在线模型分析管理层能力
prompt = f"""请对{stock_name}({stock_code})的实控人和高管团队进行深入研究和专业分析,并返回对应的数值评级:
- 如果实控人或高管中存在行业领军人物返回数值"1"
- 如果没有明显的行业领军人物返回数值"0"
- 如果实控人或高管存在被证监会处罚情况或者存在司法犯罪可能性返回数值"-1"
实控人和管理层信息
{ownership_text}
请仅返回一个数值10-1不要包含任何解释或说明"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
ability_value_str = response["response"].strip()
# 提取数值
ability_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", ability_value_str)
if value_match:
try:
ability_value = int(value_match.group(1))
# 确保值在有效范围内
if ability_value < -1 or ability_value > 1:
logger.warning(f"提取的管理层能力值超出范围: {ability_value}设置为默认值0")
ability_value = 0
except ValueError:
logger.warning(f"无法将提取的管理层能力值转换为整数设置为默认值0")
return ability_value
except Exception as e:
logger.error(f"提取管理层能力失败: {str(e)}")
return 0
def analyze_financial_report(self, stock_code: str, stock_name: str) -> bool:
"""分析企业财报情况"""
try:
prompt = f"""请对{stock_name}({stock_code})的财报情况进行简要分析严格要求最新财报情况200字以内最新业绩预告情况100字以内近三年变化趋势150字以内请严格按照以下格式输出
1. 最新财报情况
- 营业收入及同比变化
- 主要成本构成及变化
- 净利润及同比变化
- 毛利率和净利率变化
- 其他重要财务指标如ROE资产负债率等
2. 最新业绩预告情况没有就不要提
- 预告类型预增/预减/扭亏/续亏
- 预计业绩区间
- 变动原因
3. 近三年变化趋势
- 收入增长趋势
- 利润变化趋势
- 盈利能力变化
- 经营质量变化
请提供专业客观的分析突出关键信息避免冗长描述"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="financial_report",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取财报水平并更新结果
if success:
self.extract_financial_report_level(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析财报情况失败: {str(e)}")
return False
def extract_financial_report_level(self, report_text: str, stock_code: str, stock_name: str) -> int:
"""从财报分析中提取财报水平评级并更新数据库
Args:
report_text: 完整的财报分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 财报水平评级 (2:边际向好无风险, 1:稳定风险小, 0:稳定有隐患, -1:波动大有隐患, -2:波动大隐患大)
"""
try:
# 使用在线模型分析财报水平
prompt = f"""请对{stock_name}({stock_code})的财报水平进行专业分析,并返回对应的数值评级:
- 如果财报水平边际向好最新财报没有任何风险返回数值"2"
- 如果边际变化波动不高较为稳定并且风险很小返回数值"1"
- 如果波动不高较为稳定但其中存在一定财报隐患返回数值"0"
- 如果财报波动较大亏损或者盈利并且存在一定财报隐患返回数值"-1"
- 如果财报波动较大亏损或者盈利并且存在较大财报隐患返回数值"-2"
财报分析内容
{report_text}
请仅返回一个数值210-1-2不要包含任何解释或说明"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
full_response = response["response"].strip()
# 提取数值
report_level = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", full_response)
if value_match:
try:
report_level = int(value_match.group(1))
# 确保值在有效范围内
if report_level < -2 or report_level > 2:
logger.warning(f"提取的财报水平评级值超出范围: {report_level}设置为默认值0")
report_level = 0
except ValueError:
logger.warning(f"无法将提取的财报水平评级值转换为整数设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "financial_report")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="financial_report",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"financial_report_level": report_level
}
)
logger.info(f"已更新财报水平评级到数据库: financial_report_level={report_level}")
return report_level
except Exception as e:
logger.error(f"提取财报水平评级失败: {str(e)}")
return 0
def analyze_industry_competition(self, stock_code: str, stock_name: str) -> bool:
"""分析行业发展趋势和竞争格局"""
try:
prompt = f"""请对{stock_name}({stock_code})所在行业的发展趋势和竞争格局进行简要分析要求输出控制在400字以内请严格按照以下格式输出
1. 市场需求
- 主要下游应用领域
- 需求增长驱动因素
- 市场规模和增速
2. 竞争格局
- 主要竞争对手及特点
- 行业集中度
- 竞争壁垒
3. 行业环境
- 行业平均利润率
- 政策环境影响
- 技术发展趋势
- 市场阶段结论新兴市场成熟市场衰退市场
4. 小结
- 简要说明当前市场是否有利于企业经营
请提供专业客观的分析突出关键信息避免冗长描述"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="industry_competition",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取行业发展空间并更新结果
if success:
self.extract_industry_space(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析行业发展趋势和竞争格局失败: {str(e)}")
return False
def extract_industry_space(self, industry_text: str, stock_code: str, stock_name: str) -> int:
"""从行业发展趋势和竞争格局中提取行业发展空间并更新数据库
Args:
industry_text: 完整的行业发展趋势和竞争格局文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 行业发展空间值 (2:高速增长, 1:稳定经营, 0:不确定性大, -1:不利经营)
"""
try:
# 使用离线模型分析行业发展空间
prompt = f"""请分析以下{stock_name}({stock_code})的行业发展趋势和竞争格局文本,评估当前市场环境、阶段和竞争格局对企业未来的影响,并返回对应的数值:
- 如果当前市场环境阶段和竞争格局符合未来企业高速增长返回数值"2"
- 如果当前市场环境阶段和竞争格局符合未来企业稳定经营返回数值"1"
- 如果当前市场环境阶段和竞争格局存在较大不确定性返回数值"0"
- 如果当前市场环境阶段和竞争格局不利于企业正常经营返回数值"-1"
行业发展趋势和竞争格局文本
{industry_text}
请仅返回一个数值210-1不要包含任何解释或说明"""
# 使用离线模型进行分析
space_value_str = self.offline_bot.chat(prompt).strip()
# 提取数值
space_value = 0 # 默认值
# 尝试将响应转换为整数
try:
space_value = int(space_value_str)
# 确保值在有效范围内
if space_value < -1 or space_value > 2:
logger.warning(f"提取的行业发展空间值超出范围: {space_value}设置为默认值0")
space_value = 0
except ValueError:
logger.warning(f"无法将提取的行业发展空间值转换为整数: {space_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "industry_competition")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="industry_competition",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"industry_space": space_value
}
)
logger.info(f"已更新行业发展空间到数据库: industry_space={space_value}")
return space_value
except Exception as e:
logger.error(f"提取行业发展空间失败: {str(e)}")
return 0
def analyze_recent_projects(self, stock_code: str, stock_name: str) -> bool:
"""分析近期重大订单和项目进展"""
try:
prompt = f"""请对{stock_name}({stock_code})的近期重大订单和项目进展进行简要分析要求输出控制在500字以内请严格按照以下格式输出
1. 主要业务领域进展300字左右
- 各业务领域的重要订单情况
- 项目投产和建设进展
- 产能扩张计划
- 技术突破和产品创新
2. 海外布局如有200字左右
- 海外生产基地建设
- 国际合作项目
- 市场拓展计划
请提供专业客观的分析突出关键信息避免冗长描述如果企业近期没有重大订单或项目进展请直接说明"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="recent_projects",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取重大事件评价并更新结果
if success:
self.extract_major_events(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析近期重大订单和项目进展失败: {str(e)}")
return False
def extract_major_events(self, projects_text: str, stock_code: str, stock_name: str) -> int:
"""从重大订单和项目进展中提取进展情况并更新数据库
Args:
projects_text: 完整的重大订单和项目进展文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 项目进展评价 (1:超预期, 0:顺利但未超预期, -1:不顺利或在验证中)
"""
try:
# 使用离线模型分析项目进展情况
prompt = f"""请分析以下{stock_name}({stock_code})的重大订单和项目进展情况,并返回对应的数值:
- 如果项目进展顺利且订单交付/建厂等超预期返回数值"1"
- 如果进展顺利但没有超预期返回数值"0"
- 如果进展不顺利或者按照进度进行但仍然在验证中返回数值"-1"
重大订单和项目进展内容
{projects_text}
请仅返回一个数值10-1不要包含任何解释或说明"""
# 使用离线模型进行分析
events_value_str = self.offline_bot.chat(prompt).strip()
# 提取数值
events_value = 0 # 默认值
# 尝试将响应转换为整数
try:
events_value = int(events_value_str)
# 确保值在有效范围内
if events_value < -1 or events_value > 1:
logger.warning(f"提取的项目进展评价值超出范围: {events_value}设置为默认值0")
events_value = 0
except ValueError:
logger.warning(f"无法将提取的项目进展评价值转换为整数: {events_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "recent_projects")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="recent_projects",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"major_events": events_value
}
)
logger.info(f"已更新重大项目进展评价到数据库: major_events={events_value}")
return events_value
except Exception as e:
logger.error(f"提取重大项目进展评价失败: {str(e)}")
return 0
def analyze_stock_discussion(self, stock_code: str, stock_name: str) -> bool:
"""分析股吧讨论内容"""
try:
prompt = f"""请对{stock_name}({stock_code})的股吧讨论内容进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 主要讨论话题150字左右
- 近期热点事件
- 投资者关注焦点
- 市场情绪倾向
2. 重要信息汇总150字左右
- 公司相关动态
- 行业政策变化
- 市场预期变化
请提供专业客观的分析突出关键信息避免冗长描述重点关注投资者普遍关注的话题和重要市场信息"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="stock_discussion",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取股吧情绪并更新结果
if success:
self.extract_stock_discussion_emotion(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析股吧讨论内容失败: {str(e)}")
return False
def extract_stock_discussion_emotion(self, discussion_text: str, stock_code: str, stock_name: str) -> int:
"""从股吧讨论内容中提取市场情绪并更新数据库
Args:
discussion_text: 完整的股吧讨论内容分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 市场情绪值 (1:乐观, 0:中性, -1:悲观)
"""
try:
# 使用离线模型分析市场情绪
prompt = f"""请分析以下{stock_name}({stock_code})的股吧讨论内容分析,判断整体市场情绪倾向,并返回对应的数值:
- 如果股吧讨论情绪偏乐观返回数值"1"
- 如果股吧讨论情绪偏中性返回数值"0"
- 如果股吧讨论情绪偏悲观返回数值"-1"
股吧讨论内容分析
{discussion_text}
请仅返回一个数值10-1不要包含任何解释或说明"""
# 使用离线模型进行分析
emotion_value_str = self.offline_bot.chat(prompt).strip()
# 提取数值
emotion_value = 0 # 默认值
# 尝试将响应转换为整数
try:
emotion_value = int(emotion_value_str)
# 确保值在有效范围内
if emotion_value < -1 or emotion_value > 1:
logger.warning(f"提取的股吧情绪值超出范围: {emotion_value}设置为默认值0")
emotion_value = 0
except ValueError:
logger.warning(f"无法将提取的股吧情绪值转换为整数: {emotion_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "stock_discussion")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="stock_discussion",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"emotion": emotion_value
}
)
logger.info(f"已更新股吧情绪值到数据库: emotion={emotion_value}")
return emotion_value
except Exception as e:
logger.error(f"提取股吧情绪值失败: {str(e)}")
return 0
def analyze_industry_cooperation(self, stock_code: str, stock_name: str) -> bool:
"""分析产业链上下游合作动态"""
try:
prompt = f"""请对{stock_name}({stock_code})最近半年内的产业链上下游合作动态进行简要分析要求输出控制在400字以内请严格按照以下格式输出
1. 重要客户合作200字左右
- 主要客户合作进展
- 产品供应情况
- 合作深度和规模
2. 产业链布局200字左右
- 上下游合作动态
- 新业务领域拓展
- 战略合作项目
请提供专业客观的分析突出关键信息避免冗长描述重点关注最近半年内的合作动态如果没有相关动态请直接说明"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="industry_cooperation",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取产业链合作动态质量并更新结果
if success:
self.extract_collaboration_dynamics(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析产业链上下游合作动态失败: {str(e)}")
return False
def extract_collaboration_dynamics(self, cooperation_text: str, stock_code: str, stock_name: str) -> int:
"""从产业链上下游合作动态中提取合作动态质量评级并更新数据库
Args:
cooperation_text: 完整的产业链上下游合作动态文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 合作动态质量值 (2:质量高, 1:一般, 0:/质量低, -1:负面)
"""
try:
# 使用在线模型分析合作动态质量
prompt = f"""请评估{stock_name}({stock_code})的产业链上下游合作动态质量,并返回相应数值:
- 如果企业近期有较多且质量高的新合作动态具备新业务拓展能力以及可以体现在近一年财报中返回数值"2"
- 如果企业半年内合作动态频率低或质量一般在原有业务上合作关系的衍生对财报影响一般返回数值"1"
- 如果企业没有合作动态或质量低返回数值"0"
- 如果企业有负面合作关系解除合作或业务被其他厂商瓜分返回数值"-1"
以下是合作动态相关信息
{cooperation_text}
请仅返回一个数值210-1不要包含任何解释或说明"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
full_response = response["response"].strip()
# 提取数值
dynamics_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", full_response)
if value_match:
try:
dynamics_value = int(value_match.group(1))
# 确保值在有效范围内
if dynamics_value < -1 or dynamics_value > 2:
logger.warning(f"提取的合作动态质量值超出范围: {dynamics_value}设置为默认值0")
dynamics_value = 0
except ValueError:
logger.warning(f"无法将提取的合作动态质量值转换为整数设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "industry_cooperation")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="industry_cooperation",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"collaboration_dynamics": dynamics_value
}
)
logger.info(f"已更新产业链合作动态质量到数据库: collaboration_dynamics={dynamics_value}")
return dynamics_value
except Exception as e:
logger.error(f"提取产业链合作动态质量失败: {str(e)}")
return 0
def analyze_target_price(self, stock_code: str, stock_name: str) -> bool:
"""分析券商和研究机构目标股价"""
try:
prompt = f"""请对{stock_name}({stock_code})的券商和研究机构目标股价进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 目标股价情况150字左右
- 半年内所有券商/研究机构的目标价
- 半年内评级情况买入/增持/中性/减持/卖出
2. 当前最新股价对比150字左右
- 当前最新股价与目标价对比
- 上涨/下跌空间
- 评级建议
请提供专业客观的分析突出关键信息避免冗长描述如果没有券商或研究机构的目标价请直接说明"""
# 获取联网AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="target_price",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取券商评级和上涨/下跌空间并更新结果
if success:
self.extract_target_price_info(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析目标股价失败: {str(e)}")
return False
def extract_target_price_info(self, price_text: str, stock_code: str, stock_name: str) -> Dict[str, int]:
"""从目标股价分析中提取券商评级和上涨/下跌空间并更新数据库
Args:
price_text: 完整的目标股价分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict[str, int]: 包含securities_rating和odds的字典
"""
try:
# 提取券商评级和上涨/下跌空间
securities_rating = self._extract_securities_rating(price_text)
odds = self._extract_odds(price_text)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "target_price")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="target_price",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"securities_rating": securities_rating,
"odds": odds
}
)
logger.info(f"已更新目标股价信息到数据库: securities_rating={securities_rating}, odds={odds}")
return {"securities_rating": securities_rating, "odds": odds}
except Exception as e:
logger.error(f"提取目标股价信息失败: {str(e)}")
return {"securities_rating": 0, "odds": 0}
def _extract_securities_rating(self, price_text: str) -> int:
"""从目标股价分析中提取券商评级
Args:
price_text: 完整的目标股价分析文本
Returns:
int: 券商评级值 (买入:2, 增持:1, 中性:0, 减持:-1, 卖出:-2, 无评级:0)
"""
try:
# 使用离线模型提取评级
prompt = f"""请仔细分析以下目标股价文本,判断半年内券商评级的主要倾向,并返回对应的数值:
- 如果半年内券商评级以"买入"居多返回数值"2"
- 如果半年内券商评级以"增持"居多返回数值"1"
- 如果半年内券商评级以"中性"居多返回数值"0"
- 如果半年内券商评级以"减持"居多返回数值"-1"
- 如果半年内券商评级以"卖出"居多返回数值"-2"
- 如果半年内没有券商评级信息返回数值"0"
目标股价文本
{price_text}
只需要输出一个数值不要输出任何说明或解释只输出2,1,0,-1-2"""
response = self.chat_bot.chat(prompt)
# 尝试将响应转换为整数
try:
rating = int(response)
# 确保评级在有效范围内
if rating < -2 or rating > 2:
logger.warning(f"提取的券商评级值超出范围: {rating}设置为默认值0")
return 0
return rating
except ValueError:
logger.warning(f"无法将提取的券商评级值转换为整数: {response}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取券商评级失败: {str(e)}")
return 0
def _extract_odds(self, price_text: str) -> int:
"""从目标最新股价分析中提取上涨/下跌空间
Args:
price_text: 完整的目标股价分析文本
Returns:
int: 上涨/下跌空间值 (上涨空间大:1, 差不多:0, 下跌空间大:-1)
"""
try:
prompt = f"""请仔细分析以下目标股价文本,判断上涨空间和下跌空间的关系,并返回对应的数值:
- 如果上涨空间大于下跌空间返回数值"1"
- 如果上涨空间和下跌空间差不多返回数值"0"
- 如果下跌空间大于上涨空间返回数值"-1"
- 如果文本中没有相关信息返回数值"0"
目标股价文本
{price_text}
只需要输出一个数值不要输出任何说明或解释只输出10-1不要包含任何解释或说明"""
response = self.chat_bot.chat(prompt)
# 检查是否是错误响应
if isinstance(response, str) and "抱歉,发生错误" in response:
logger.warning(f"获取上涨/下跌空间失败: {response}")
return 0
# 尝试将响应转换为整数
try:
odds = int(response)
# 确保值在有效范围内
if odds < -1 or odds > 1:
logger.warning(f"提取的上涨/下跌空间值超出范围: {odds}设置为默认值0")
return 0
return odds
except ValueError:
logger.warning(f"无法将提取的上涨/下跌空间值转换为整数: {response}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取上涨/下跌空间失败: {str(e)}")
return 0
def generate_investment_advice(self, stock_code: str, stock_name: str) -> bool:
"""生成最终投资建议"""
try:
# 收集所有维度的分析结果排除investment_advice
all_results = {}
analysis_dimensions = [dim for dim in self.dimension_methods.keys() if dim != "investment_advice"]
for dimension in analysis_dimensions:
# 查询数据库
result = get_analysis_result(self.db, stock_code, dimension)
if not result:
# 如果数据库中没有结果,生成新的分析
self.dimension_methods[dimension](stock_code, stock_name)
result = get_analysis_result(self.db, stock_code, dimension)
if result:
all_results[dimension] = result.ai_response
# 构建提示词
prompt = f"""请根据以下{stock_name}({stock_code})的各个维度分析结果生成最终的投资建议要求输出控制在300字以内请严格按照以下格式输出
投资建议请从以下几个方面进行总结
1. 业绩表现和增长预期
2. 当前估值水平和市场预期
3. 行业竞争环境和风险
4. 投资建议和理由请根据以下标准明确给出投资建议
- 短期持有近期1-3个月内有明确利好因素催化事件或阶段性业绩改善
- 中期持有短期无明确利好但中期3-12个月业绩面临向上拐点或行业处于上升周期
- 长期持有公司具备长期稳定的盈利能力行业地位稳固长期成长性好
- 不建议投资存在明显风险因素基本面恶化估值过高行业前景不佳或者存在退市风险
请提供专业客观的分析突出关键信息避免冗长描述重点关注投资价值和风险在输出投资建议时请明确指出是短期持有中期持有长期持有还是不建议投资
各维度分析结果
{json.dumps(all_results, ensure_ascii=False, indent=2)}"""
self.offline_bot.clear_history()
# 使用离线模型生成建议
result = self.offline_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="investment_advice",
ai_response=result,
reasoning_process=None,
references=None
)
# 提取投资建议类型并更新结果
if success:
self.offline_bot.clear_history()
investment_type = self.extract_investment_advice_type(result, stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"生成投资建议失败: {str(e)}")
return False
def extract_investment_advice_type(self, advice_text: str, stock_code: str, stock_name: str) -> str:
"""从投资建议中提取建议类型并更新数据库
Args:
advice_text: 完整的投资建议文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
str: 提取的投资建议类型短期中期长期不建议 None
"""
try:
valid_types = ["短期", "中期", "长期", "不建议"]
max_attempts = 3
# 调用辅助函数尝试多次提取
found_type = self._try_extract_advice_type(advice_text, max_attempts)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "investment_advice")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="investment_advice",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={"investment_advice_type": found_type}
)
if found_type:
logger.info(f"已更新投资建议类型到数据库: {found_type}")
else:
logger.info("已将投资建议类型更新为 null")
return found_type
except Exception as e:
logger.error(f"提取投资建议类型失败: {str(e)}")
return None
def _clean_model_output(self, output: str) -> str:
"""清理模型输出,移除推理过程,只保留最终结果
Args:
output: 模型原始输出文本
Returns:
str: 清理后的输出文本
"""
try:
# 找到</think>标签的位置
think_end = output.find('</think>')
if think_end != -1:
# 移除</think>标签及其之前的所有内容
output = output[think_end + len('</think>'):]
# 处理可能存在的空行
lines = output.split('\n')
cleaned_lines = []
for line in lines:
line = line.strip()
if line: # 只保留非空行
cleaned_lines.append(line)
# 重新组合文本
output = '\n'.join(cleaned_lines)
return output.strip()
except Exception as e:
logger.error(f"清理模型输出失败: {str(e)}")
return output.strip()
def _try_extract_advice_type(self, advice_text: str, max_attempts: int = 3) -> Optional[str]:
"""尝试多次从投资建议中提取建议类型
Args:
advice_text: 完整的投资建议文本
max_attempts: 最大尝试次数
Returns:
Optional[str]: 提取的投资建议类型如果所有尝试都失败则返回None
"""
valid_types = ["短期", "中期", "长期", "不建议"]
for attempt in range(1, max_attempts + 1):
try:
logger.info(f"尝试提取投资建议类型 (尝试 {attempt}/{max_attempts})")
# 根据尝试次数获取不同的提示词
prompt = self._get_extract_prompt_by_attempt(advice_text, attempt)
# 使用千问离线模型提取建议类型
offline_bot_tl_qw = OfflineChatBot(platform="tl_qw_private", model_type="qwq")
result = offline_bot_tl_qw.chat(prompt)
# 检查是否是错误响应
if isinstance(result, str) and "抱歉,发生错误" in result:
logger.warning(f"获取投资建议类型失败: {result}")
continue
# 清理模型输出
cleaned_result = self._clean_model_output(result)
# 检查结果是否为有效类型
if cleaned_result in valid_types:
logger.info(f"成功提取投资建议类型: {cleaned_result}(尝试 {attempt}/{max_attempts}")
return cleaned_result
else:
logger.warning(f"未能提取有效的投资建议类型(尝试 {attempt}/{max_attempts}),获取到: '{cleaned_result}'")
except Exception as e:
logger.error(f"提取投资建议类型失败(尝试 {attempt}/{max_attempts}: {str(e)}")
# 所有尝试都失败
logger.warning(f"经过 {max_attempts} 次尝试后未能提取有效的投资建议类型,设置为 null")
return None
def _get_extract_prompt_by_attempt(self, advice_text: str, attempt: int) -> str:
"""根据尝试次数获取不同的提取提示词
Args:
advice_text: 完整的投资建议文本
attempt: 当前尝试次数
Returns:
str: 提取提示词
"""
if attempt == 1:
return f"""请从以下投资建议中提取明确的投资建议类型。仅输出以下四种类型之一:
- 短期如果建议短期持有
- 中期如果建议中期持有
- 长期如果建议长期持有
- 不建议如果不建议投资
投资建议文本
{advice_text}
只需要输出一个词不要输出其他任何内容"""
elif attempt == 2:
return f"""请仔细分析以下投资建议文本,并严格按照要求输出结果。
请仅输出以下四个词之一短期中期长期不建议
不要输出其他任何内容不要加任何解释
投资建议文本
{advice_text}
如果建议短期持有输出"短期"
如果建议中期持有输出"中期"
如果建议长期持有输出"长期"
如果不建议投资输出"不建议"
请再次确认你只输出一个词没有任何额外内容"""
else:
return f"""请判断以下投资建议文本最符合哪种情况,只输出对应的一个词:
短期近期1-3个月内有明确利好因素催化事件或阶段性业绩改善
中期无明确短期利好但中期3-12个月业绩面临向上拐点或行业处于上升周期
长期公司具备长期稳定的盈利能力行业地位稳固长期成长性好
不建议存在风险因素基本面恶化估值过高或行业前景不佳
投资建议文本
{advice_text}
只输出短期中期长期不建议中的一个不要有任何其他内容"""
def analyze_all_dimensions(self, stock_code: str, stock_name: str) -> Dict[str, bool]:
"""分析所有维度"""
results = {}
# 逐个分析每个维度
for dimension, method in self.dimension_methods.items():
logger.info(f"开始分析 {stock_name}({stock_code}) 的 {dimension}")
results[dimension] = method(stock_code, stock_name)
logger.info(f"{dimension} 分析完成: {'成功' if results[dimension] else '失败'}")
return results
def generate_pdf_report(self, stock_code: str, stock_name: str) -> Optional[str]:
"""生成PDF分析报告
Args:
stock_code: 股票代码
stock_name: 股票名称
Returns:
Optional[str]: 生成的PDF文件路径如果失败则返回None
"""
try:
# 维度名称映射
dimension_names = {
"company_profile": "公司简介",
"management_ownership": "管理层持股",
"financial_report": "财务报告",
"industry_competition": "行业竞争",
"recent_projects": "近期项目",
"stock_discussion": "市场讨论",
"industry_cooperation": "产业合作",
"target_price": "目标股价",
"investment_advice": "投资建议"
}
# 收集所有可用的分析结果
content_dict = {}
for dimension in self.dimension_methods.keys():
result = get_analysis_result(self.db, stock_code, dimension)
if result and result.ai_response:
content_dict[dimension_names[dimension]] = result.ai_response
if not content_dict:
logger.warning(f"未找到 {stock_name}({stock_code}) 的任何分析结果")
return None
# 创建PDF生成器实例
generator = PDFGenerator()
# 生成PDF报告
filepath = generator.generate_pdf(
title=f"{stock_name}({stock_code}) 基本面分析报告",
content_dict=content_dict,
filename=f"{stock_name}_{stock_code}_analysis.pdf"
)
if filepath:
logger.info(f"PDF报告已生成: {filepath}")
else:
logger.error("PDF报告生成失败")
return filepath
except Exception as e:
logger.error(f"生成PDF报告失败: {str(e)}")
return None
def test_single_method(method: Callable, stock_code: str, stock_name: str) -> bool:
"""测试单个分析方法"""
try:
analyzer = FundamentalAnalyzer()
logger.info(f"开始测试 {method.__name__} 方法")
logger.info(f"测试股票: {stock_name}({stock_code})")
result = method(stock_code, stock_name)
logger.info(f"测试结果: {'成功' if result else '失败'}")
return result
except Exception as e:
logger.error(f"测试失败: {str(e)}")
return False
def test_single_stock(analyzer: FundamentalAnalyzer, stock_code: str, stock_name: str) -> Dict[str, bool]:
"""测试单个股票的所有维度"""
logger.info(f"开始测试股票: {stock_name}({stock_code})")
results = analyzer.analyze_all_dimensions(stock_code, stock_name)
success_count = sum(1 for r in results.values() if r)
logger.info(f"测试完成: 成功维度数 {success_count}/{len(results)}")
return results
def main():
"""主函数"""
# 设置日志级别
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 测试股票列表
test_stocks = [
("603690", "至纯科技"),
("300767", "震安科技"),
("300750", "宁德时代")
]
# 创建分析器实例
analyzer = FundamentalAnalyzer()
# 测试选项
print("\n请选择测试模式:")
print("1. 查询单个维度分析")
print("2. 测试单个方法")
print("3. 测试单个股票")
print("4. 测试所有股票")
print("5. 生成PDF报告")
print("6. 生成投资建议并生成PDF")
print("7. 退出")
choice = input("\n请输入选项1-7: ").strip()
if choice == "1":
# 查询单个维度分析
print("\n可用的分析维度:")
for i, dimension in enumerate(analyzer.dimension_methods.keys(), 1):
print(f"{i}. {dimension}")
dimension_choice = input("\n请选择要查询的维度1-8: ").strip()
if dimension_choice.isdigit() and 1 <= int(dimension_choice) <= len(analyzer.dimension_methods):
dimension = list(analyzer.dimension_methods.keys())[int(dimension_choice) - 1]
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要查询的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
# 查询分析结果
success, response, reasoning, references = analyzer.query_analysis(stock_code, stock_name, dimension)
if success:
print(f"\n分析结果:\n{response}")
if reasoning:
print(f"\n推理过程:\n{reasoning}")
if references:
print("\n参考资料:")
for ref in references:
print(f"\n{ref}")
else:
print(f"\n查询失败:{response}")
elif choice == "2":
# 测试单个方法
print("\n可用的分析方法:")
for i, (dimension, method) in enumerate(analyzer.dimension_methods.items(), 1):
print(f"{i}. {dimension}")
method_choice = input("\n请选择要测试的方法1-8: ").strip()
if method_choice.isdigit() and 1 <= int(method_choice) <= len(analyzer.dimension_methods):
method = list(analyzer.dimension_methods.values())[int(method_choice) - 1]
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要测试的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
test_single_method(method, stock_code, stock_name)
elif choice == "3":
# 测试单个股票
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要测试的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
test_single_stock(analyzer, stock_code, stock_name)
elif choice == "4":
# 测试所有股票
for stock_code, stock_name in test_stocks:
test_single_stock(analyzer, stock_code, stock_name)
elif choice == "5":
# 生成PDF报告
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要生成报告的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
filepath = analyzer.generate_pdf_report(stock_code, stock_name)
if filepath:
print(f"\n报告已生成: {filepath}")
else:
print("\n报告生成失败")
elif choice == "6":
# 生成投资建议并生成PDF
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要生成投资建议的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
# 先生成投资建议
print("\n正在生成投资建议...")
success = analyzer.generate_investment_advice(stock_code, stock_name)
if success:
print("投资建议生成成功")
# 然后生成PDF报告
filepath = analyzer.generate_pdf_report(stock_code, stock_name)
if filepath:
print(f"\n报告已生成: {filepath}")
else:
print("\n报告生成失败")
else:
print("\n投资建议生成失败")
elif choice == "7":
print("程序退出")
else:
print("无效的选项")
if __name__ == "__main__":
main()