stock_fundamentals/src/fundamentals_llm/fundamental_analysis.py

1947 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import sys
from datetime import datetime, timedelta
import time
import redis
from typing import Dict, List, Optional, Tuple, Callable, Any
# 修改导入路径,使用相对导入
try:
# 尝试相对导入
from .chat_bot import ChatBot
from .chat_bot_with_offline import ChatBot as OfflineChatBot
from .fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from .pdf_generator import PDFGenerator
from .text_processor import TextProcessor
except ImportError:
# 如果相对导入失败,尝试尝试绝对导入
try:
from src.fundamentals_llm.chat_bot import ChatBot
from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
from src.fundamentals_llm.fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from src.fundamentals_llm.pdf_generator import PDFGenerator
from src.fundamentals_llm.text_processor import TextProcessor
except ImportError:
# 最后尝试直接导入适用于当前目录已在PYTHONPATH中的情况
from chat_bot import ChatBot
from chat_bot_with_offline import ChatBot as OfflineChatBot
from fundamental_analysis_database import get_db, save_analysis_result, update_analysis_result, get_analysis_result
from pdf_generator import PDFGenerator
from text_processor import TextProcessor
import json
import re
# 获取项目根目录的绝对路径
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# 创建logs目录如果不存在
LOGS_DIR = os.path.join(ROOT_DIR, "logs")
os.makedirs(LOGS_DIR, exist_ok=True)
# 配置日志格式
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
date_format = '%Y-%m-%d %H:%M:%S'
# 创建文件处理器
log_file = os.path.join(LOGS_DIR, f"fundamental_analysis_{datetime.now().strftime('%Y%m%d')}.log")
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format, date_format))
# 创建控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter(log_format, date_format))
# 配置根日志记录器
logging.basicConfig(
level=logging.INFO,
handlers=[file_handler, console_handler],
format=log_format,
datefmt=date_format
)
logger = logging.getLogger(__name__)
logger.info("测试日志输出 - 程序启动")
from typing import Dict, List, Optional, Any, Union
from pydantic import BaseModel, Field
# 定义基础数据结构
class TextAnalysisResult(BaseModel):
"""文本分析结果包含分析正文、推理过程和引用URL"""
analysis_text: str = Field(description="详细的分析文本")
reasoning_process: Optional[str] = Field(description="模型的推理过程", default=None)
references: Optional[List[str]] = Field(description="参考资料和引用URL列表", default=None)
class NumericalAnalysisResult(BaseModel):
"""数值分析结果,包含数值和分析描述"""
value: str = Field(description="评估值")
description: str = Field(description="评估描述")
# 添加Redis客户端
redis_client = redis.Redis(
host='192.168.18.208', # Redis服务器地址根据实际情况调整
port=6379,
password='wlkj2018',
db=14,
socket_timeout=5,
decode_responses=True
)
class FundamentalAnalyzer:
"""基本面分析器"""
def __init__(self):
"""初始化分析器"""
# 使用联网模型进行基本面分析
self.chat_bot = ChatBot(model_type="online_bot")
# 使用离线模型进行其他分析
self.offline_bot = OfflineChatBot(platform="volc", model_type="offline_model")
# 千问打杂
# self.offline_bot_tl_qw = OfflineChatBot(platform="tl_qw_private", model_type="qwq")
self.offline_bot_tl_qw = OfflineChatBot(platform="tl_qw_private", model_type="GLM")
self.db = next(get_db())
# 定义维度映射
self.dimension_methods = {
"company_profile": self.analyze_company_profile,
"management_ownership": self.analyze_management_ownership,
"financial_report": self.analyze_financial_report,
"industry_competition": self.analyze_industry_competition,
"recent_projects": self.analyze_recent_projects,
"stock_discussion": self.analyze_stock_discussion,
"industry_cooperation": self.analyze_industry_cooperation,
"target_price": self.analyze_target_price,
"valuation_level": self.analyze_valuation_level,
"investment_advice": self.generate_investment_advice
}
def query_analysis(self, stock_code: str, stock_name: str, dimension: str) -> Tuple[bool, str, Optional[str], Optional[list]]:
"""查询分析结果,如果不存在则生成新的分析
Args:
stock_code: 股票代码
stock_name: 股票名称
dimension: 分析维度
Returns:
Tuple[bool, str, Optional[str], Optional[list]]:
- 是否成功
- 分析结果
- 推理过程(如果有)
- 参考资料(如果有)
"""
try:
# 检查维度是否有效
if dimension not in self.dimension_methods:
return False, f"无效的分析维度: {dimension}", None, None
# 查询数据库
result = get_analysis_result(self.db, stock_code, dimension)
if result:
# 如果存在结果,直接返回
logger.info(f"从数据库获取到 {stock_name}({stock_code}) 的 {dimension} 分析结果")
return True, result.ai_response, result.reasoning_process, result.references
# 如果不存在,生成新的分析
logger.info(f"数据库中未找到 {stock_name}({stock_code}) 的 {dimension} 分析结果,开始生成")
success = self.dimension_methods[dimension](stock_code, stock_name)
if success:
# 重新查询数据库获取结果
result = get_analysis_result(self.db, stock_code, dimension)
if result:
return True, result.ai_response, result.reasoning_process, result.references
return False, f"生成 {dimension} 分析失败", None, None
except Exception as e:
logger.error(f"查询分析结果失败: {str(e)}")
return False, f"查询失败: {str(e)}", None, None
def _remove_references_from_response(self, response: str) -> str:
"""从响应中移除参考资料部分
Args:
response: 原始响应文本
Returns:
str: 移除参考资料后的响应文本
"""
# # 查找"参考资料:"的位置
# ref_start = response.find("参考资料:")
# if ref_start != -1:
# # 如果找到参考资料,只保留前面的部分,并移除末尾的换行符
# return response[:ref_start].rstrip()
return response.strip()
def analyze_company_profile(self, stock_code: str, stock_name: str) -> bool:
"""分析公司简介"""
try:
# 构建提示词
prompt = f"""请对{stock_name}({stock_code})进行公司简介分析严格要求输出控制在500字以内主营业务介绍300字成立背景与历程200字请严格按照以下格式输出
1. 主营业务介绍:
- 核心业务领域和主要产品
- 技术特点和产业链布局
- 企业的绝对亮点(行业地位,竞争优势,客户资源等)
2. 成立背景与发展历程:
- 成立背景和重要发展节点
- 关键战略决策和转型
请提供专业、客观的分析,突出关键信息,避免冗长描述。"""
#开头清上下文缓存
self.chat_bot.clear_history()
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
return save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="company_profile",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
except Exception as e:
logger.error(f"分析公司简介失败: {str(e)}")
return False
def analyze_management_ownership(self, stock_code: str, stock_name: str) -> bool:
"""分析实控人和管理层持股情况"""
try:
prompt = f"""请对{stock_name}({stock_code})的实控人和管理层持股情况进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 实控人情况:
- 实控人姓名
- 行业地位(如有)
- 持股比例(冻结/解禁)
2. 管理层持股:
- 主要管理层持股比例
- 近3年增减持情况
请提供专业、客观的分析,突出关键信息,避免冗长描述。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="management_ownership",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取实控人和管理层信息并更新结果
if success:
self.extract_management_info(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析实控人和管理层持股情况失败: {str(e)}")
return False
def extract_management_info(self, ownership_text: str, stock_code: str, stock_name: str) -> Dict[str, int]:
"""从实控人和管理层持股分析中提取持股情况和能力评价并更新数据库
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict[str, int]: 包含shareholding和ability的字典
"""
try:
# 提取持股情况
shareholding = self._extract_shareholding_status(ownership_text, stock_code, stock_name)
# 提取领军人物和处罚情况
ability = self._extract_management_ability(ownership_text, stock_code, stock_name)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "management_ownership")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="management_ownership",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"shareholding": shareholding,
"ability": ability
}
)
logger.info(f"已更新实控人和管理层信息到数据库: shareholding={shareholding}, ability={ability}")
return {"shareholding": shareholding, "ability": ability}
except Exception as e:
logger.error(f"提取实控人和管理层信息失败: {str(e)}")
return {"shareholding": 0, "ability": 0}
def _extract_shareholding_status(self, ownership_text: str, stock_code: str, stock_name: str) -> int:
"""从实控人和管理层持股分析中提取持股减持情况
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 减持情况评价 (1:减持低, 0:减持适中, -1:减持高)
"""
try:
# 使用在线模型分析减持情况
prompt = f"""请对{stock_name}({stock_code})的大股东和管理层近三年减持情况进行专业分析,并返回对应的数值评级:
- 如果大股东或管理层近三年减持比例很低或次数很低减持比例低于2%减持次数少于5减持后持股比例仍然高于5%),返回数值"1"
- 如果大股东减持比例适中减持比例在2%-5%之间减持次数不超过10次减持后持股比例仍然高于5%),返回数值"0"
- 如果近三年减持次数较多或比例较高且大股东持股比例已经减持到5%以下,返回数值"-1"
持股分析内容:
{ownership_text}
请仅返回一个数值1、0或-1不要包含任何解释或说明。"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
shareholding_value_str = response["response"].strip()
# 提取数值
shareholding_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", shareholding_value_str)
if value_match:
try:
shareholding_value = int(value_match.group(1))
# 确保值在有效范围内
if shareholding_value < -1 or shareholding_value > 1:
logger.warning(f"提取的持股情况值超出范围: {shareholding_value}设置为默认值0")
shareholding_value = 0
except ValueError:
logger.warning(f"无法将提取的持股情况值转换为整数设置为默认值0")
return shareholding_value
except Exception as e:
logger.error(f"提取持股情况失败: {str(e)}")
return 0
def _extract_management_ability(self, ownership_text: str, stock_code: str, stock_name: str) -> int:
"""从实控人和管理层持股分析中提取管理层能力评价
Args:
ownership_text: 完整的实控人和管理层持股分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 管理层能力评价 (1:有领军人物, 0:一般, -1:有处罚/风险)
"""
try:
# 使用在线模型分析管理层能力
prompt = f"""请对{stock_name}({stock_code})的实控人和高管团队进行深入研究和专业分析,并返回对应的数值评级:
- 如果实控人或高管中存在行业领军人物,返回数值"1"
- 如果没有明显的行业领军人物,返回数值"0"
- 如果实控人或高管存在被证监会处罚情况,或者存在司法犯罪可能性,返回数值"-1"
实控人和管理层信息:
{ownership_text}
请仅返回一个数值1、0或-1不要包含任何解释或说明。"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
ability_value_str = response["response"].strip()
# 提取数值
ability_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", ability_value_str)
if value_match:
try:
ability_value = int(value_match.group(1))
# 确保值在有效范围内
if ability_value < -1 or ability_value > 1:
logger.warning(f"提取的管理层能力值超出范围: {ability_value}设置为默认值0")
ability_value = 0
except ValueError:
logger.warning(f"无法将提取的管理层能力值转换为整数设置为默认值0")
return ability_value
except Exception as e:
logger.error(f"提取管理层能力失败: {str(e)}")
return 0
def analyze_financial_report(self, stock_code: str, stock_name: str) -> bool:
"""分析企业财报情况"""
try:
prompt = f"""请对{stock_name}({stock_code})的财报情况进行简要分析严格要求最新财报情况200字以内最新业绩预告情况100字以内近三年变化趋势150字以内请严格按照以下格式输出
1. 最新财报情况
- 营业收入及同比变化
- 主要成本构成及变化
- 净利润及同比变化
- 毛利率和净利率变化
- 其他重要财务指标如ROE、资产负债率等
2. 最新业绩预告情况(没有就不要提)
- 预告类型(预增/预减/扭亏/续亏)
- 预计业绩区间
- 变动原因
3. 近三年变化趋势
- 收入增长趋势
- 利润变化趋势
- 盈利能力变化
- 经营质量变化
请提供专业、客观的分析,突出关键信息,避免冗长描述。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="financial_report",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取财报水平并更新结果
if success:
self.extract_financial_report_level(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析财报情况失败: {str(e)}")
return False
def extract_financial_report_level(self, report_text: str, stock_code: str, stock_name: str) -> int:
"""从财报分析中提取财报水平评级并更新数据库
Args:
report_text: 完整的财报分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 财报水平评级 (2:边际向好无风险, 1:稳定风险小, 0:稳定有隐患, -1:波动大有隐患, -2:波动大隐患大)
"""
try:
# 使用在线模型分析财报水平
prompt = f"""请对{stock_name}({stock_code})的财报水平进行专业分析,并返回对应的数值评级:
- 如果财报水平边际向好,最新财报没有任何风险,返回数值"2"
- 如果边际变化波动不高较为稳定,并且风险很小,返回数值"1"
- 如果波动不高较为稳定,但其中存在一定财报隐患,返回数值"0"
- 如果财报波动较大(亏损或者盈利),并且存在一定财报隐患,返回数值"-1"
- 如果财报波动较大(亏损或者盈利),并且存在较大财报隐患,返回数值"-2"
财报分析内容:
{report_text}
请仅返回一个数值2、1、0、-1或-2不要包含任何解释或说明。"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
full_response = response["response"].strip()
# 提取数值
report_level = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", full_response)
if value_match:
try:
report_level = int(value_match.group(1))
# 确保值在有效范围内
if report_level < -2 or report_level > 2:
logger.warning(f"提取的财报水平评级值超出范围: {report_level}设置为默认值0")
report_level = 0
except ValueError:
logger.warning(f"无法将提取的财报水平评级值转换为整数设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "financial_report")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="financial_report",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"financial_report_level": report_level
}
)
logger.info(f"已更新财报水平评级到数据库: financial_report_level={report_level}")
return report_level
except Exception as e:
logger.error(f"提取财报水平评级失败: {str(e)}")
return 0
def analyze_industry_competition(self, stock_code: str, stock_name: str) -> bool:
"""分析行业发展趋势和竞争格局"""
try:
prompt = f"""请对{stock_name}({stock_code})所在行业的发展趋势和竞争格局进行简要分析要求输出控制在400字以内请严格按照以下格式输出
1. 市场需求:
- 主要下游应用领域
- 需求增长驱动因素
- 市场规模和增速
2. 竞争格局:
- 主要竞争对手及特点
- 行业集中度
- 竞争壁垒
3. 行业环境:
- 行业平均利润率
- 政策环境影响
- 技术发展趋势
- 市场阶段结论:新兴市场、成熟市场、衰退市场
4. 小结:
- 简要说明当前市场是否有利于企业经营
请提供专业、客观的分析,突出关键信息,避免冗长描述。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="industry_competition",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取行业发展空间并更新结果
if success:
self.extract_industry_space(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析行业发展趋势和竞争格局失败: {str(e)}")
return False
def extract_industry_space(self, industry_text: str, stock_code: str, stock_name: str) -> int:
"""从行业发展趋势和竞争格局中提取行业发展空间并更新数据库
Args:
industry_text: 完整的行业发展趋势和竞争格局文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 行业发展空间值 (2:高速增长, 1:稳定经营, 0:不确定性大, -1:不利经营)
"""
try:
# 使用离线模型分析行业发展空间
prompt = f"""请分析以下{stock_name}({stock_code})的行业发展趋势和竞争格局文本,评估当前市场环境、阶段和竞争格局对企业未来的影响,并返回对应的数值:
- 如果当前市场环境、阶段和竞争格局符合未来企业高速增长,返回数值"2"
- 如果当前市场环境、阶段和竞争格局符合未来企业稳定经营,返回数值"1"
- 如果当前市场环境、阶段和竞争格局存在较大不确定性,返回数值"0"
- 如果当前市场环境、阶段和竞争格局不利于企业正常经营,返回数值"-1"
行业发展趋势和竞争格局文本:
{industry_text}
请仅返回一个数值2、1、0或-1不要包含任何解释或说明。"""
self.offline_bot_tl_qw.clear_history()
# 使用离线模型进行分析
space_value_str = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
space_value_str = TextProcessor.clean_thought_process(space_value_str)
# 提取数值
space_value = 0 # 默认值
# 尝试将响应转换为整数
try:
space_value = int(space_value_str)
# 确保值在有效范围内
if space_value < -1 or space_value > 2:
logger.warning(f"提取的行业发展空间值超出范围: {space_value}设置为默认值0")
space_value = 0
except ValueError:
logger.warning(f"无法将提取的行业发展空间值转换为整数: {space_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "industry_competition")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="industry_competition",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"industry_space": space_value
}
)
logger.info(f"已更新行业发展空间到数据库: industry_space={space_value}")
return space_value
except Exception as e:
logger.error(f"提取行业发展空间失败: {str(e)}")
return 0
def analyze_recent_projects(self, stock_code: str, stock_name: str) -> bool:
"""分析近期重大订单和项目进展"""
try:
prompt = f"""请对{stock_name}({stock_code})的近期重大订单和项目进展进行简要分析要求输出控制在500字以内请严格按照以下格式输出
1. 主要业务领域进展300字左右
- 各业务领域的重要订单情况
- 项目投产和建设进展
- 产能扩张计划
- 技术突破和产品创新
2. 海外布局如有200字左右
- 海外生产基地建设
- 国际合作项目
- 市场拓展计划
请提供专业、客观的分析,突出关键信息,避免冗长描述。如果企业近期没有重大订单或项目进展,请直接说明。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="recent_projects",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取重大事件评价并更新结果
if success:
self.extract_major_events(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析近期重大订单和项目进展失败: {str(e)}")
return False
def extract_major_events(self, projects_text: str, stock_code: str, stock_name: str) -> int:
"""从重大订单和项目进展中提取进展情况并更新数据库
Args:
projects_text: 完整的重大订单和项目进展文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 项目进展评价 (1:超预期, 0:顺利但未超预期, -1:不顺利或在验证中)
"""
try:
# 使用离线模型分析项目进展情况
prompt = f"""请分析以下{stock_name}({stock_code})的重大订单和项目进展情况,并返回对应的数值:
- 如果项目进展顺利,且订单交付/建厂等超预期,返回数值"1"
- 如果进展顺利,但没有超预期,返回数值"0"
- 如果进展不顺利,或者按照进度进行但仍然在验证中,返回数值"-1"
重大订单和项目进展内容:
{projects_text}
请仅返回一个数值1、0或-1不要包含任何解释或说明。"""
self.offline_bot_tl_qw.clear_history()
# 使用离线模型进行分析
events_value_str = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
# 数据清洗
events_value_str = TextProcessor.clean_thought_process(events_value_str)
# 提取数值
events_value = 0 # 默认值
# 尝试将响应转换为整数
try:
events_value = int(events_value_str)
# 确保值在有效范围内
if events_value < -1 or events_value > 1:
logger.warning(f"提取的项目进展评价值超出范围: {events_value}设置为默认值0")
events_value = 0
except ValueError:
logger.warning(f"无法将提取的项目进展评价值转换为整数: {events_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "recent_projects")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="recent_projects",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"major_events": events_value
}
)
logger.info(f"已更新重大项目进展评价到数据库: major_events={events_value}")
return events_value
except Exception as e:
logger.error(f"提取重大项目进展评价失败: {str(e)}")
return 0
def analyze_stock_discussion(self, stock_code: str, stock_name: str) -> bool:
"""分析股吧讨论内容"""
try:
prompt = f"""请对{stock_name}({stock_code})的股吧讨论内容进行简要分析要求输出控制在400字以内(主要讨论话题200字重要信息汇总200字),请严格按照以下格式输出:
1. 主要讨论话题:
- 近期热点事件
- 投资者关注焦点
- 市场情绪倾向
2. 重要信息汇总:
- 公司相关动态
- 行业政策变化
- 市场预期变化
请提供专业、客观的分析,突出关键信息,避免冗长描述。重点关注投资者普遍关注的话题和重要市场信息。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="stock_discussion",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取股吧情绪并更新结果
if success:
self.extract_stock_discussion_emotion(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析股吧讨论内容失败: {str(e)}")
return False
def extract_stock_discussion_emotion(self, discussion_text: str, stock_code: str, stock_name: str) -> int:
"""从股吧讨论内容中提取市场情绪并更新数据库
Args:
discussion_text: 完整的股吧讨论内容分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 市场情绪值 (1:乐观, 0:中性, -1:悲观)
"""
try:
# 使用离线模型分析市场情绪
prompt = f"""请分析以下{stock_name}({stock_code})的股吧讨论内容分析,判断整体市场情绪倾向,并返回对应的数值:
- 如果股吧讨论情绪偏乐观,返回数值"1"
- 如果股吧讨论情绪偏中性,返回数值"0"
- 如果股吧讨论情绪偏悲观,返回数值"-1"
股吧讨论内容分析:
{discussion_text}
请仅返回一个数值1、0或-1不要包含任何解释或说明。"""
self.offline_bot_tl_qw.clear_history()
# 使用离线模型进行分析
emotion_value_str = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
# 数据清洗
emotion_value_str = TextProcessor.clean_thought_process(emotion_value_str)
# 提取数值
emotion_value = 0 # 默认值
# 尝试将响应转换为整数
try:
emotion_value = int(emotion_value_str)
# 确保值在有效范围内
if emotion_value < -1 or emotion_value > 1:
logger.warning(f"提取的股吧情绪值超出范围: {emotion_value}设置为默认值0")
emotion_value = 0
except ValueError:
logger.warning(f"无法将提取的股吧情绪值转换为整数: {emotion_value_str}设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "stock_discussion")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="stock_discussion",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"emotion": emotion_value
}
)
logger.info(f"已更新股吧情绪值到数据库: emotion={emotion_value}")
return emotion_value
except Exception as e:
logger.error(f"提取股吧情绪值失败: {str(e)}")
return 0
def analyze_industry_cooperation(self, stock_code: str, stock_name: str) -> bool:
"""分析产业链上下游合作动态"""
try:
prompt = f"""请对{stock_name}({stock_code})最近半年内的产业链上下游合作动态进行简要分析要求输出控制在400字以内请严格按照以下格式输出
1. 重要客户合作200字左右
- 主要客户合作进展
- 产品供应情况
- 合作深度和规模
2. 产业链布局200字左右
- 上下游合作动态
- 新业务领域拓展
- 战略合作项目
请提供专业、客观的分析,突出关键信息,避免冗长描述。重点关注最近半年内的合作动态,如果没有相关动态,请直接说明。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="industry_cooperation",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取产业链合作动态质量并更新结果
if success:
self.extract_collaboration_dynamics(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析产业链上下游合作动态失败: {str(e)}")
return False
def extract_collaboration_dynamics(self, cooperation_text: str, stock_code: str, stock_name: str) -> int:
"""从产业链上下游合作动态中提取合作动态质量评级并更新数据库
Args:
cooperation_text: 完整的产业链上下游合作动态文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
int: 合作动态质量值 (2:质量高, 1:一般, 0:无/质量低, -1:负面)
"""
try:
# 使用在线模型分析合作动态质量
prompt = f"""请评估{stock_name}({stock_code})的产业链上下游合作动态质量,并返回相应数值:
- 如果企业近期有较多且质量高的新合作动态(具备新业务拓展能力,以及可以体现在近一年财报中),返回数值"2"
- 如果企业半年内合作动态频率低或质量一般(在原有业务上合作关系的衍生,对财报影响一般),返回数值"1"
- 如果企业没有合作动态或质量低,返回数值"0"
- 如果企业有负面合作关系(解除合作,或业务被其他厂商瓜分),返回数值"-1"
以下是合作动态相关信息:
{cooperation_text}
请仅返回一个数值2、1、0或-1不要包含任何解释或说明。"""
# 使用在线模型进行分析
response = self.chat_bot.chat(prompt)
full_response = response["response"].strip()
# 提取数值
dynamics_value = 0 # 默认值
# 尝试从响应中提取数值
import re
value_match = re.search(r"([-]?[0-9])", full_response)
if value_match:
try:
dynamics_value = int(value_match.group(1))
# 确保值在有效范围内
if dynamics_value < -1 or dynamics_value > 2:
logger.warning(f"提取的合作动态质量值超出范围: {dynamics_value}设置为默认值0")
dynamics_value = 0
except ValueError:
logger.warning(f"无法将提取的合作动态质量值转换为整数设置为默认值0")
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "industry_cooperation")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="industry_cooperation",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"collaboration_dynamics": dynamics_value
}
)
logger.info(f"已更新产业链合作动态质量到数据库: collaboration_dynamics={dynamics_value}")
return dynamics_value
except Exception as e:
logger.error(f"提取产业链合作动态质量失败: {str(e)}")
return 0
def analyze_target_price(self, stock_code: str, stock_name: str) -> bool:
"""分析券商和研究机构目标股价"""
try:
prompt = f"""请对{stock_name}({stock_code})的券商和研究机构目标股价进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 目标股价情况150字左右
- 半年内所有券商/研究机构的目标价
- 半年内评级情况(买入/增持/中性/减持/卖出)
2. 当前最新股价对比150字左右
- 当前最新股价与目标价对比
- 上涨/下跌空间
- 评级建议
请提供专业、客观的分析,突出关键信息,避免冗长描述。如果没有券商或研究机构的目标价,请直接说明。"""
# 获取联网AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="target_price",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取券商评级和上涨/下跌空间并更新结果
if success:
self.extract_target_price_info(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析目标股价失败: {str(e)}")
return False
def extract_target_price_info(self, price_text: str, stock_code: str, stock_name: str) -> Dict[str, int]:
"""从目标股价分析中提取券商评级和上涨/下跌空间并更新数据库
Args:
price_text: 完整的目标股价分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict[str, int]: 包含securities_rating和odds的字典
"""
try:
# 提取券商评级和上涨/下跌空间
securities_rating = self._extract_securities_rating(price_text)
odds = self._extract_odds(price_text)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "target_price")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="target_price",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"securities_rating": securities_rating,
"odds": odds
}
)
logger.info(f"已更新目标股价信息到数据库: securities_rating={securities_rating}, odds={odds}")
return {"securities_rating": securities_rating, "odds": odds}
except Exception as e:
logger.error(f"提取目标股价信息失败: {str(e)}")
return {"securities_rating": 0, "odds": 0}
def _extract_securities_rating(self, price_text: str) -> int:
"""从目标股价分析中提取券商评级
Args:
price_text: 完整的目标股价分析文本
Returns:
int: 券商评级值 (买入:2, 增持:1, 中性:0, 减持:-1, 卖出:-2, 无评级:0)
"""
try:
# 使用离线模型提取评级
prompt = f"""请仔细分析以下目标股价文本,判断半年内券商评级的主要倾向,并返回对应的数值:
- 如果半年内券商评级以"买入"居多,返回数值"2"
- 如果半年内券商评级以"增持"居多,返回数值"1"
- 如果半年内券商评级以"中性"居多,返回数值"0"
- 如果半年内券商评级以"减持"居多,返回数值"-1"
- 如果半年内券商评级以"卖出"居多,返回数值"-2"
- 如果半年内没有券商评级信息,返回数值"0"
目标股价文本:
{price_text}
只需要输出一个数值不要输出任何说明或解释。只输出2,1,0,-1或-2。"""
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
# 提取数值
rating_str = TextProcessor.extract_numeric_value_from_response(response)
# 尝试将响应转换为整数
try:
rating = int(rating_str)
# 确保评级在有效范围内
if rating < -2 or rating > 2:
logger.warning(f"提取的券商评级值超出范围: {rating}设置为默认值0")
return 0
return rating
except ValueError:
logger.warning(f"无法将提取的券商评级值转换为整数: {rating_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取券商评级失败: {str(e)}")
return 0
def _extract_odds(self, price_text: str) -> int:
"""从目标最新股价分析中提取上涨/下跌空间
Args:
price_text: 完整的目标股价分析文本
Returns:
int: 上涨/下跌空间值 (上涨空间大:1, 差不多:0, 下跌空间大:-1)
"""
try:
prompt = f"""请仔细分析以下目标股价文本,判断上涨空间和下跌空间的关系,并返回对应的数值:
- 如果上涨空间大于下跌空间,返回数值"1"
- 如果上涨空间和下跌空间差不多,返回数值"0"
- 如果下跌空间大于上涨空间,返回数值"-1"
- 如果文本中没有相关信息,返回数值"0"
目标股价文本:
{price_text}
只需要输出一个数值不要输出任何说明或解释。只输出1、0、-1不要包含任何解释或说明。"""
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
# 提取数值
odds_str = TextProcessor.extract_numeric_value_from_response(response)
# 尝试将响应转换为整数
try:
odds = int(odds_str)
# 确保值在有效范围内
if odds < -1 or odds > 1:
logger.warning(f"提取的上涨/下跌空间值超出范围: {odds}设置为默认值0")
return 0
return odds
except ValueError:
logger.warning(f"无法将提取的上涨/下跌空间值转换为整数: {odds_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取上涨/下跌空间失败: {str(e)}")
return 0
def analyze_valuation_level(self, stock_code: str, stock_name: str) -> bool:
"""分析企业PE和PB在历史分位水平和行业平均水平的对比情况"""
try:
prompt = f"""请对{stock_name}({stock_code})的估值水平进行简要分析要求输出控制在300字以内请严格按照以下格式输出
1. 历史估值水平150字左右
- 当前PE值及其在历史分位水平的位置高于/接近/低于历史平均分位)
- 当前PB值及其在历史分位水平的位置高于/接近/低于历史平均分位)
- 历史估值变化趋势简要分析
2. 行业估值对比150字左右
- 当前PE值与行业平均水平的比较高于/接近/低于行业平均)
- 当前PB值与行业平均水平的比较高于/接近/低于行业平均)
- 与可比公司估值的简要对比
请提供专业、客观的分析,突出关键信息,避免冗长描述。如果无法获取某项数据,请直接说明。"""
# 获取AI分析结果
result = self.chat_bot.chat(prompt)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="valuation_level",
ai_response=self._remove_references_from_response(result["response"]),
reasoning_process=result["reasoning_process"],
references=result["references"]
)
# 提取估值水平分类并更新结果
if success:
self.extract_valuation_classification(result["response"], stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"分析估值水平失败: {str(e)}")
return False
def extract_valuation_classification(self, valuation_text: str, stock_code: str, stock_name: str) -> Dict[str, int]:
"""从估值水平分析中提取历史和行业估值分类并更新数据库
Args:
valuation_text: 完整的估值水平分析文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict[str, int]: 包含四个分类值的字典:
- pe_historical: PE历史分位分类 (-1:高于历史, 0:接近历史, 1:低于历史)
- pb_historical: PB历史分位分类 (-1:高于历史, 0:接近历史, 1:低于历史)
- pe_industry: PE行业对比分类 (-1:高于行业, 0:接近行业, 1:低于行业)
- pb_industry: PB行业对比分类 (-1:高于行业, 0:接近行业, 1:低于行业)
"""
try:
# 直接提取四个分类值
pe_historical = self._extract_pe_historical(valuation_text)
pb_historical = self._extract_pb_historical(valuation_text)
pe_industry = self._extract_pe_industry(valuation_text)
pb_industry = self._extract_pb_industry(valuation_text)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "valuation_level")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="valuation_level",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={
"pe_historical": pe_historical,
"pb_historical": pb_historical,
"pe_industry": pe_industry,
"pb_industry": pb_industry
}
)
logger.info(f"已更新估值分类到数据库: pe_historical={pe_historical}, pb_historical={pb_historical}, pe_industry={pe_industry}, pb_industry={pb_industry}")
return {
"pe_historical": pe_historical,
"pb_historical": pb_historical,
"pe_industry": pe_industry,
"pb_industry": pb_industry
}
except Exception as e:
logger.error(f"提取估值分类失败: {str(e)}")
return {
"pe_historical": 0,
"pb_historical": 0,
"pe_industry": 0,
"pb_industry": 0
}
def _extract_pe_historical(self, valuation_text: str) -> int:
"""从估值水平分析中提取PE历史分位分类
Args:
valuation_text: 完整的估值水平分析文本
Returns:
int: PE历史分位分类值 (-1:高于历史, 0:接近历史, 1:低于历史)
"""
try:
prompt = f"""请仔细分析以下估值水平文本判断当前PE在历史分位的位置并返回对应的数值
- 如果当前PE为负数返回数值"-1"
- 如果当前PE明显高于历史平均水平返回数值"-1"
- 如果当前PE接近历史平均水平返回数值"0"
- 如果当前PE明显低于历史平均水平返回数值"1"
- 如果文本中没有相关信息,返回数值"0"
估值水平文本:
{valuation_text}
只需要输出一个数值,不要输出任何说明或解释。只输出:-1、0或1。"""
self.offline_bot_tl_qw.clear_history()
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
pe_hist_str = TextProcessor.clean_thought_process(response)
try:
pe_hist = int(pe_hist_str)
if pe_hist < -1 or pe_hist > 1:
logger.warning(f"提取的PE历史分位分类值超出范围: {pe_hist}设置为默认值0")
return 0
return pe_hist
except ValueError:
logger.warning(f"无法将提取的PE历史分位分类值转换为整数: {pe_hist_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取PE历史分位分类失败: {str(e)}")
return 0
def _extract_pb_historical(self, valuation_text: str) -> int:
"""从估值水平分析中提取PB历史分位分类
Args:
valuation_text: 完整的估值水平分析文本
Returns:
int: PB历史分位分类值 (-1:高于历史, 0:接近历史, 1:低于历史)
"""
try:
prompt = f"""请仔细分析以下估值水平文本判断当前PB在历史分位的位置并返回对应的数值
- 如果当前PB为负数返回数值"-1"
- 如果当前PB明显高于历史平均水平返回数值"-1"
- 如果当前PB接近历史平均水平返回数值"0"
- 如果当前PB明显低于历史平均水平返回数值"1"
- 如果文本中没有相关信息,返回数值"0"
估值水平文本:
{valuation_text}
只需要输出一个数值,不要输出任何说明或解释。只输出:-1、0或1。"""
self.offline_bot_tl_qw.clear_history()
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
pb_hist_str = TextProcessor.clean_thought_process(response)
try:
pb_hist = int(pb_hist_str)
if pb_hist < -1 or pb_hist > 1:
logger.warning(f"提取的PB历史分位分类值超出范围: {pb_hist}设置为默认值0")
return 0
return pb_hist
except ValueError:
logger.warning(f"无法将提取的PB历史分位分类值转换为整数: {pb_hist_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取PB历史分位分类失败: {str(e)}")
return 0
def _extract_pe_industry(self, valuation_text: str) -> int:
"""从估值水平分析中提取PE行业对比分类
Args:
valuation_text: 完整的估值水平分析文本
Returns:
int: PE行业对比分类值 (-1:高于行业, 0:接近行业, 1:低于行业)
"""
try:
prompt = f"""请仔细分析以下估值水平文本判断当前PE与行业平均水平的对比情况并返回对应的数值
- 如果当前PE为负数返回数值"-1"
- 如果当前PE明显高于行业平均水平返回数值"-1"
- 如果当前PE接近行业平均水平返回数值"0"
- 如果当前PE明显低于行业平均水平返回数值"1"
- 如果文本中没有相关信息,返回数值"0"
估值水平文本:
{valuation_text}
只需要输出一个数值,不要输出任何说明或解释。只输出:-1、0或1。"""
self.offline_bot_tl_qw.clear_history()
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
pe_ind_str = TextProcessor.clean_thought_process(response)
try:
pe_ind = int(pe_ind_str)
if pe_ind < -1 or pe_ind > 1:
logger.warning(f"提取的PE行业对比分类值超出范围: {pe_ind}设置为默认值0")
return 0
return pe_ind
except ValueError:
logger.warning(f"无法将提取的PE行业对比分类值转换为整数: {pe_ind_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取PE行业对比分类失败: {str(e)}")
return 0
def _extract_pb_industry(self, valuation_text: str) -> int:
"""从估值水平分析中提取PB行业对比分类
Args:
valuation_text: 完整的估值水平分析文本
Returns:
int: PB行业对比分类值 (-1:高于行业, 0:接近行业, 1:低于行业)
"""
try:
prompt = f"""请仔细分析以下估值水平文本判断当前PB与行业平均水平的对比情况并返回对应的数值
- 如果当前PB为负数返回数值"-1"
- 如果当前PB明显高于行业平均水平返回数值"-1"
- 如果当前PB接近行业平均水平返回数值"0"
- 如果当前PB明显低于行业平均水平返回数值"1"
- 如果文本中没有相关信息,返回数值"0"
估值水平文本:
{valuation_text}
只需要输出一个数值,不要输出任何说明或解释。只输出:-1、0或1。"""
self.offline_bot_tl_qw.clear_history()
response = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
pb_ind_str = TextProcessor.clean_thought_process(response)
try:
pb_ind = int(pb_ind_str)
if pb_ind < -1 or pb_ind > 1:
logger.warning(f"提取的PB行业对比分类值超出范围: {pb_ind}设置为默认值0")
return 0
return pb_ind
except ValueError:
logger.warning(f"无法将提取的PB行业对比分类值转换为整数: {pb_ind_str}设置为默认值0")
return 0
except Exception as e:
logger.error(f"提取PB行业对比分类失败: {str(e)}")
return 0
def generate_investment_advice(self, stock_code: str, stock_name: str) -> bool:
"""生成最终投资建议"""
try:
# 收集所有维度的分析结果排除investment_advice
all_results = {}
analysis_dimensions = [dim for dim in self.dimension_methods.keys() if dim != "investment_advice"]
for dimension in analysis_dimensions:
# 查询数据库
result = get_analysis_result(self.db, stock_code, dimension)
if not result:
# 如果数据库中没有结果,生成新的分析
self.dimension_methods[dimension](stock_code, stock_name)
result = get_analysis_result(self.db, stock_code, dimension)
if result:
all_results[dimension] = result.ai_response
# 构建提示词
prompt = f"""请根据以下{stock_name}({stock_code})的各个维度分析结果生成最终的投资建议要求输出控制在300字以内请严格按照以下格式输出
投资建议:请从以下几个方面进行总结:
1. 业绩表现和增长预期
2. 当前估值水平和市场预期
3. 行业竞争环境和风险
4. 投资建议和理由(请根据以下标准明确给出投资建议):
- 短期持有近期1-3个月内有明确利好因素、催化事件或阶段性业绩改善
- 中期持有短期无明确利好但中期3-12个月业绩面临向上拐点或行业处于上升周期
- 长期持有:公司具备长期稳定的盈利能力、行业地位稳固、长期成长性好
- 不建议投资:存在明显风险因素、基本面恶化、估值过高、行业前景不佳或者存在退市风险
请提供专业、客观的分析,突出关键信息,避免冗长描述。重点关注投资价值和风险。在输出投资建议时,请明确指出是短期持有、中期持有、长期持有还是不建议投资。
各维度分析结果:
{json.dumps(all_results, ensure_ascii=False, indent=2)}"""
self.offline_bot.clear_history()
# 使用离线模型生成建议
result = self.offline_bot.chat(prompt,max_tokens=20000)
# 清理模型输出
result = TextProcessor.clean_thought_process(result)
# 保存到数据库
success = save_analysis_result(
self.db,
stock_code=stock_code,
stock_name=stock_name,
dimension="investment_advice",
ai_response=result,
reasoning_process=None,
references=None
)
# 提取投资建议类型并更新结果
if success:
self.offline_bot.clear_history()
investment_type = self.extract_investment_advice_type(result, stock_code, stock_name)
return True
return success
except Exception as e:
logger.error(f"生成投资建议失败: {str(e)}")
return False
def extract_investment_advice_type(self, advice_text: str, stock_code: str, stock_name: str) -> str:
"""从投资建议中提取建议类型并更新数据库
Args:
advice_text: 完整的投资建议文本
stock_code: 股票代码
stock_name: 股票名称
Returns:
str: 提取的投资建议类型(短期、中期、长期、不建议)或 None
"""
try:
valid_types = ["短期", "中期", "长期", "不建议"]
max_attempts = 3
# 调用辅助函数尝试多次提取
found_type = self._try_extract_advice_type(advice_text, max_attempts)
# 更新数据库中的记录
result = get_analysis_result(self.db, stock_code, "investment_advice")
if result:
update_analysis_result(
self.db,
stock_code=stock_code,
dimension="investment_advice",
ai_response=result.ai_response,
reasoning_process=result.reasoning_process,
references=result.references,
extra_info={"investment_advice_type": found_type}
)
if found_type:
logger.info(f"已更新投资建议类型到数据库: {found_type}")
else:
logger.info("已将投资建议类型更新为 null")
return found_type
except Exception as e:
logger.error(f"提取投资建议类型失败: {str(e)}")
return None
def _try_extract_advice_type(self, advice_text: str, max_attempts: int = 3) -> Optional[str]:
"""尝试多次从投资建议中提取建议类型
Args:
advice_text: 完整的投资建议文本
max_attempts: 最大尝试次数
Returns:
Optional[str]: 提取的投资建议类型如果所有尝试都失败则返回None
"""
valid_types = ["短期", "中期", "长期", "不建议"]
for attempt in range(1, max_attempts + 1):
try:
logger.info(f"尝试提取投资建议类型 (尝试 {attempt}/{max_attempts})")
# 根据尝试次数获取不同的提示词
prompt = self._get_extract_prompt_by_attempt(advice_text, attempt)
# 使用千问离线模型提取建议类型
result = self.offline_bot_tl_qw.chat(prompt,temperature=0.0)
# 检查是否是错误响应
if isinstance(result, str) and "抱歉,发生错误" in result:
logger.warning(f"获取投资建议类型失败: {result}")
continue
# 清理模型输出
cleaned_result = TextProcessor.clean_thought_process(result)
# 检查结果是否为有效类型
if cleaned_result in valid_types:
logger.info(f"成功提取投资建议类型: {cleaned_result}(尝试 {attempt}/{max_attempts}")
return cleaned_result
else:
logger.warning(f"未能提取有效的投资建议类型(尝试 {attempt}/{max_attempts}),获取到: '{cleaned_result}'")
except Exception as e:
logger.error(f"提取投资建议类型失败(尝试 {attempt}/{max_attempts}: {str(e)}")
# 所有尝试都失败
logger.warning(f"经过 {max_attempts} 次尝试后未能提取有效的投资建议类型,设置为 null")
return None
def _get_extract_prompt_by_attempt(self, advice_text: str, attempt: int) -> str:
"""根据尝试次数获取不同的提取提示词
Args:
advice_text: 完整的投资建议文本
attempt: 当前尝试次数
Returns:
str: 提取提示词
"""
if attempt == 1:
return f"""请从以下投资建议中提取明确的投资建议类型。仅输出以下四种类型之一:
- 短期:如果建议短期持有
- 中期:如果建议中期持有
- 长期:如果建议长期持有
- 不建议:如果不建议投资
投资建议文本:
{advice_text}
只需要输出一个词,不要输出其他任何内容。"""
elif attempt == 2:
return f"""请仔细分析以下投资建议文本,并严格按照要求输出结果。
请仅输出以下四个词之一:短期、中期、长期、不建议。
不要输出其他任何内容,不要加任何解释。
投资建议文本:
{advice_text}
如果建议短期持有,输出"短期"
如果建议中期持有,输出"中期"
如果建议长期持有,输出"长期"
如果不建议投资,输出"不建议"
请再次确认你只输出一个词,没有任何额外内容。"""
else:
return f"""请判断以下投资建议文本最符合哪种情况,只输出对应的一个词:
短期近期1-3个月内有明确利好因素、催化事件或阶段性业绩改善
中期无明确短期利好但中期3-12个月业绩面临向上拐点或行业处于上升周期
长期:公司具备长期稳定的盈利能力、行业地位稳固、长期成长性好
不建议:存在风险因素、基本面恶化、估值过高或行业前景不佳
投资建议文本:
{advice_text}
只输出:短期、中期、长期、不建议中的一个,不要有任何其他内容。"""
def analyze_all_dimensions(self, stock_code: str, stock_name: str) -> Dict[str, bool]:
"""分析所有维度"""
results = {}
# 逐个分析每个维度
for dimension, method in self.dimension_methods.items():
logger.info(f"开始分析 {stock_name}({stock_code}) 的 {dimension}")
results[dimension] = method(stock_code, stock_name)
logger.info(f"{dimension} 分析完成: {'成功' if results[dimension] else '失败'}")
return results
def generate_pdf_report(self, stock_code: str, stock_name: str) -> Optional[str]:
"""生成PDF分析报告
Args:
stock_code: 股票代码
stock_name: 股票名称
Returns:
Optional[str]: 生成的PDF文件路径如果失败则返回None
"""
try:
# 检查是否已存在PDF文件
reports_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'reports')
os.makedirs(reports_dir, exist_ok=True)
# 构建可能的文件名格式
possible_filenames = [
f"{stock_name}_{stock_code}_analysis.pdf",
f"{stock_name}_{stock_code}.SZ_analysis.pdf",
f"{stock_name}_{stock_code}.SH_analysis.pdf"
]
# 检查是否存在已生成的PDF文件
for filename in possible_filenames:
filepath = os.path.join(reports_dir, filename)
if os.path.exists(filepath):
logger.info(f"找到已存在的PDF报告: {filepath}")
return filepath
# 维度名称映射
dimension_names = {
"company_profile": "公司简介",
"management_ownership": "管理层持股",
"financial_report": "财务报告",
"industry_competition": "行业竞争",
"recent_projects": "近期项目",
"stock_discussion": "市场讨论",
"industry_cooperation": "产业合作",
"target_price": "目标股价",
"valuation_level": "估值水平",
"investment_advice": "投资建议"
}
# 收集所有可用的分析结果
content_dict = {}
for dimension in self.dimension_methods.keys():
result = get_analysis_result(self.db, stock_code, dimension)
if result and result.ai_response:
content_dict[dimension_names[dimension]] = result.ai_response
if not content_dict:
logger.warning(f"未找到 {stock_name}({stock_code}) 的任何分析结果")
return None
# 确保字体目录存在
fonts_dir = os.path.join(os.path.dirname(__file__), "fonts")
os.makedirs(fonts_dir, exist_ok=True)
# 检查是否存在字体文件,如果不存在则创建简单的默认字体标记文件
font_path = os.path.join(fonts_dir, "simhei.ttf")
if not os.path.exists(font_path):
# 尝试从系统字体目录复制
try:
import shutil
# 尝试常见的系统字体位置
system_fonts = [
"C:/Windows/Fonts/simhei.ttf", # Windows
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", # Linux
"/usr/share/fonts/wqy-microhei/wqy-microhei.ttc", # 其他Linux
"/System/Library/Fonts/PingFang.ttc" # macOS
]
for system_font in system_fonts:
if os.path.exists(system_font):
shutil.copy2(system_font, font_path)
logger.info(f"已复制字体文件: {system_font} -> {font_path}")
break
except Exception as font_error:
logger.warning(f"复制字体文件失败: {str(font_error)}")
# 创建PDF生成器实例
generator = PDFGenerator()
# 生成PDF报告
try:
# 第一次尝试生成
filepath = generator.generate_pdf(
title=f"{stock_name}({stock_code}) 基本面分析报告",
content_dict=content_dict,
output_dir=reports_dir,
filename=f"{stock_name}_{stock_code}_analysis.pdf"
)
if filepath:
logger.info(f"PDF报告已生成: {filepath}")
return filepath
except Exception as pdf_error:
logger.error(f"生成PDF报告第一次尝试失败: {str(pdf_error)}")
# 如果是字体问题,可能需要使用备选方案
if "找不到中文字体文件" in str(pdf_error):
# 导出为文本文件作为备选
try:
txt_filename = f"{stock_name}_{stock_code}_analysis.txt"
txt_filepath = os.path.join(reports_dir, txt_filename)
with open(txt_filepath, 'w', encoding='utf-8') as f:
f.write(f"{stock_name}({stock_code}) 基本面分析报告\n")
f.write(f"生成时间:{datetime.now().strftime('%Y年%m月%d%H:%M:%S')}\n\n")
for section_title, content in content_dict.items():
if content:
f.write(f"## {section_title}\n\n")
f.write(f"{content}\n\n")
logger.info(f"由于PDF生成失败已生成文本报告: {txt_filepath}")
return txt_filepath
except Exception as txt_error:
logger.error(f"生成文本报告失败: {str(txt_error)}")
logger.error("PDF报告生成失败")
return None
except Exception as e:
logger.error(f"生成PDF报告失败: {str(e)}")
return None
def is_stock_locked(self, stock_code: str, dimension: str) -> bool:
"""检查股票是否已被锁定
Args:
stock_code: 股票代码
dimension: 分析维度
Returns:
bool: 是否被锁定
"""
try:
lock_key = f"stock_analysis_lock:{stock_code}:{dimension}"
# 检查是否存在锁
existing_lock = redis_client.get(lock_key)
if existing_lock:
lock_time = int(existing_lock)
current_time = int(time.time())
# 锁超过30分钟(1800秒)视为过期
if current_time - lock_time > 1800:
# 锁已过期,可以释放
redis_client.delete(lock_key)
logger.info(f"股票 {stock_code} 维度 {dimension} 的过期锁已释放")
return False
else:
# 锁未过期,被锁定
return True
# 不存在锁
return False
except Exception as e:
logger.error(f"检查股票 {stock_code} 锁状态时出错: {str(e)}")
# 出错时保守处理,返回未锁定
return False
def lock_stock(self, stock_code: str, dimension: str) -> bool:
"""锁定股票
Args:
stock_code: 股票代码
dimension: 分析维度
Returns:
bool: 是否成功锁定
"""
try:
lock_key = f"stock_analysis_lock:{stock_code}:{dimension}"
current_time = int(time.time())
# 设置锁过期时间1小时
redis_client.set(lock_key, current_time, ex=3600)
logger.info(f"股票 {stock_code} 维度 {dimension} 已锁定")
return True
except Exception as e:
logger.error(f"锁定股票 {stock_code} 时出错: {str(e)}")
return False
def unlock_stock(self, stock_code: str, dimension: str) -> bool:
"""解锁股票
Args:
stock_code: 股票代码
dimension: 分析维度
Returns:
bool: 是否成功解锁
"""
try:
lock_key = f"stock_analysis_lock:{stock_code}:{dimension}"
redis_client.delete(lock_key)
logger.info(f"股票 {stock_code} 维度 {dimension} 已解锁")
return True
except Exception as e:
logger.error(f"解锁股票 {stock_code} 时出错: {str(e)}")
return False
def test_single_method(method: Callable, stock_code: str, stock_name: str) -> bool:
"""测试单个分析方法"""
try:
analyzer = FundamentalAnalyzer()
logger.info(f"开始测试 {method.__name__} 方法")
logger.info(f"测试股票: {stock_name}({stock_code})")
result = method(stock_code, stock_name)
logger.info(f"测试结果: {'成功' if result else '失败'}")
return result
except Exception as e:
logger.error(f"测试失败: {str(e)}")
return False
def test_single_stock(analyzer: FundamentalAnalyzer, stock_code: str, stock_name: str) -> Dict[str, bool]:
"""测试单个股票的所有维度"""
logger.info(f"开始测试股票: {stock_name}({stock_code})")
results = analyzer.analyze_all_dimensions(stock_code, stock_name)
success_count = sum(1 for r in results.values() if r)
logger.info(f"测试完成: 成功维度数 {success_count}/{len(results)}")
return results
def main():
"""主函数"""
# 测试股票列表
test_stocks = [
("603690", "至纯科技"),
("300767", "震安科技"),
("300750", "宁德时代")
]
# 创建分析器实例
analyzer = FundamentalAnalyzer()
# 测试选项
print("\n请选择测试模式:")
print("1. 查询单个维度分析")
print("2. 测试单个方法")
print("3. 测试单个股票")
print("4. 测试所有股票")
print("5. 生成PDF报告")
print("6. 生成投资建议并生成PDF")
print("7. 退出")
choice = input("\n请输入选项1-7: ").strip()
if choice == "1":
# 查询单个维度分析
print("\n可用的分析维度:")
for i, dimension in enumerate(analyzer.dimension_methods.keys(), 1):
print(f"{i}. {dimension}")
dimension_choice = input("\n请选择要查询的维度1-8: ").strip()
if dimension_choice.isdigit() and 1 <= int(dimension_choice) <= len(analyzer.dimension_methods):
dimension = list(analyzer.dimension_methods.keys())[int(dimension_choice) - 1]
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要查询的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
# 查询分析结果
success, response, reasoning, references = analyzer.query_analysis(stock_code, stock_name, dimension)
if success:
print(f"\n分析结果:\n{response}")
if reasoning:
print(f"\n推理过程:\n{reasoning}")
if references:
print("\n参考资料:")
for ref in references:
print(f"\n{ref}")
else:
print(f"\n查询失败:{response}")
elif choice == "2":
# 测试单个方法
print("\n可用的分析方法:")
for i, (dimension, method) in enumerate(analyzer.dimension_methods.items(), 1):
print(f"{i}. {dimension}")
method_choice = input("\n请选择要测试的方法1-8: ").strip()
if method_choice.isdigit() and 1 <= int(method_choice) <= len(analyzer.dimension_methods):
method = list(analyzer.dimension_methods.values())[int(method_choice) - 1]
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要测试的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
test_single_method(method, stock_code, stock_name)
elif choice == "3":
# 测试单个股票
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要测试的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
test_single_stock(analyzer, stock_code, stock_name)
elif choice == "4":
# 测试所有股票
for stock_code, stock_name in test_stocks:
test_single_stock(analyzer, stock_code, stock_name)
elif choice == "5":
# 生成PDF报告
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要生成报告的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
filepath = analyzer.generate_pdf_report(stock_code, stock_name)
if filepath:
print(f"\n报告已生成: {filepath}")
else:
print("\n报告生成失败")
elif choice == "6":
# 生成投资建议并生成PDF
print("\n可用的股票:")
for i, (code, name) in enumerate(test_stocks, 1):
print(f"{i}. {name}({code})")
stock_choice = input("\n请选择要生成投资建议的股票1-3: ").strip()
if stock_choice.isdigit() and 1 <= int(stock_choice) <= len(test_stocks):
stock_code, stock_name = test_stocks[int(stock_choice) - 1]
# 先生成投资建议
print("\n正在生成投资建议...")
success = analyzer.generate_investment_advice(stock_code, stock_name)
if success:
print("投资建议生成成功")
# 然后生成PDF报告
filepath = analyzer.generate_pdf_report(stock_code, stock_name)
if filepath:
print(f"\n报告已生成: {filepath}")
else:
print("\n报告生成失败")
else:
print("\n投资建议生成失败")
elif choice == "7":
print("程序退出")
else:
print("无效的选项")
if __name__ == "__main__":
main()