This commit is contained in:
liao 2025-08-07 14:24:19 +08:00
parent 1070d41a00
commit cce06d8710
5 changed files with 1294 additions and 1 deletions

View File

@ -2816,6 +2816,12 @@ def bigscreen_page():
"""渲染大屏展示页面""" """渲染大屏展示页面"""
return render_template('bigscreen.html') return render_template('bigscreen.html')
@app.route('/bigscreenv2')
def bigscreen_page_v2():
"""渲染大屏展示页面"""
return render_template('bigscreen_v2.html')
@app.route('/api/bigscreen_data', methods=['GET']) @app.route('/api/bigscreen_data', methods=['GET'])
def bigscreen_data(): def bigscreen_data():
"""聚合大屏所需的12张图数据便于前端一次性加载""" """聚合大屏所需的12张图数据便于前端一次性加载"""
@ -3001,6 +3007,186 @@ def run_batch_hk_stock_price_collection():
logger.error(f"批量采集A股行情失败: {str(e)}") logger.error(f"批量采集A股行情失败: {str(e)}")
return jsonify({"status": "error", "message": str(e)}) return jsonify({"status": "error", "message": str(e)})
@app.route('/api/portfolio/industry_allocation', methods=['GET'])
def get_portfolio_industry_allocation():
"""获取行业持仓占比数据"""
try:
# 导入持仓分析器
from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer
# 创建分析器实例
analyzer = PortfolioAnalyzer()
# 获取行业持仓分配数据
result = analyzer.analyze_portfolio_allocation()
if result.get("success"):
return jsonify({
"status": "success",
"data": result["data"]
})
else:
return jsonify({
"status": "error",
"message": result.get("message", "获取持仓数据失败")
})
except Exception as e:
logger.error(f"获取行业持仓占比失败: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/notice/list', methods=['GET'])
def get_notice_list():
"""获取重要提醒列表"""
try:
# 模拟数据 - 实际项目中应该从数据库或外部API获取
mock_notices = [
"上证指数突破3200点市场情绪回暖",
"北向资金今日净流入85.6亿元",
"科技板块PE估值处于历史低位",
"新能源概念股集体上涨涨幅超3%",
"医药板块回调,建议关注低吸机会",
"融资融券余额连续三日增长",
"消费板块资金流入明显",
"市场恐贪指数回升至65",
"机器人概念板块技术面突破",
"先进封装概念获政策支持"
]
return jsonify({
"status": "success",
"data": mock_notices
})
except Exception as e:
logger.error(f"获取提醒列表失败: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/portfolio/summary', methods=['GET'])
def get_portfolio_summary():
"""获取持仓摘要信息"""
try:
# 导入持仓分析器
from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer
# 创建分析器实例
analyzer = PortfolioAnalyzer()
# 获取持仓摘要数据
result = analyzer.get_portfolio_summary()
if result.get("success"):
return jsonify({
"status": "success",
"data": result["data"]
})
else:
return jsonify({
"status": "error",
"message": result.get("message", "获取持仓摘要失败")
})
except Exception as e:
logger.error(f"获取持仓摘要失败: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/portfolio/industry_holdings', methods=['GET'])
def get_industry_holdings_detail():
"""获取指定行业的详细持仓信息"""
try:
industry_name = request.args.get('industry_name')
if not industry_name:
return jsonify({'status': 'error', 'message': '缺少必要参数: industry_name'}), 400
# 导入持仓分析器
from src.valuation_analysis.portfolio_analyzer import PortfolioAnalyzer
# 创建分析器实例
analyzer = PortfolioAnalyzer()
# 获取行业详细持仓数据
result = analyzer.get_industry_holdings_detail(industry_name)
if result.get("success"):
return jsonify({
"status": "success",
"data": result["data"]
})
else:
return jsonify({
"status": "error",
"message": result.get("message", "获取行业持仓详情失败")
})
except Exception as e:
logger.error(f"获取行业持仓详情失败: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/valuation/indicator', methods=['POST'])
def analyze_valuation_indicator():
"""分析股票应该使用PE还是PB估值
POST参数:
- stock_code: 股票代码 (例如: 000001)
- stock_name: 股票名称 (例如: 平安银行)
返回格式:
{
"status": "success",
"data": {
"recommended_indicator": "PB",
"reasoning": "平安银行属于金融服务业作为商业银行其商业模式基于资产负债管理。金融机构的盈利受拨备、利率、市场波动影响而不够稳定。基于金融业的特殊性PB是更合适的估值指标..."
}
}
"""
try:
# 从POST表单参数获取
stock_code = request.form.get('stock_code')
stock_name = request.form.get('stock_name')
if not stock_code or not stock_name:
return jsonify({
"status": "error",
"message": "缺少必要参数: stock_code 或 stock_name"
}), 400
# 导入估值指标分析器
try:
from src.valuation_analysis.valuation_indicator_analyzer import ValuationIndicatorAnalyzer
logger.info("成功导入估值指标分析器")
except ImportError as e:
logger.error(f"无法导入估值指标分析器: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 估值指标分析器不可用,错误详情: {str(e)}"
}), 500
# 创建分析器实例
analyzer = ValuationIndicatorAnalyzer()
# 执行分析
result = analyzer.analyze_valuation_indicator(stock_code, stock_name)
if result.get("success"):
return jsonify({
"status": "success",
"data": {
"recommended_indicator": result.get("recommended_indicator"),
"reasoning": result.get("reasoning", "")
}
})
else:
return jsonify({
"status": "error",
"message": result.get("error", "分析失败,无详细信息")
}), 500
except Exception as e:
logger.error(f"估值指标分析失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"估值指标分析失败: {str(e)}"
}), 500
if __name__ == '__main__': if __name__ == '__main__':
# 启动Web服务器 # 启动Web服务器

View File

@ -11,7 +11,7 @@ XUEQIU_HEADERS = {
'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Client-Version': 'v2.44.75', 'Client-Version': 'v2.44.75',
'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751936369; xq_a_token=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xqat=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU2MDAyNjgyLCJjdG0iOjE3NTM0MTA2ODI0MTQsImNpZCI6ImQ5ZDBuNEFadXAifQ.AlnzQSY7oGKGABfaQcFLg0lAJsDdvBMiwUbgpCMCBlbx6VZPKhzERxWiylQb4dFIyyECvRRJ73SbO9cD46fAqgzOgTxArNHtTKD4lQapTnyb11diDADnpb_nzzaRr4k_BYQRKXWtcJxdUMzde2WLy-eAkSf76QkXmKrwS3kvRm5gfqhdye44whw5XMEGoZ_lXHzGLWGz_PludHZp6W3v-wwZc_0wLU6cTb_KdrwWUWT_8jw5JHXnJEmuZmQI8QWf60DtiHIYCYXarxv8XtyHK7lLKhIAa3C2QmGWw5wv2HGz4I5DPqm2uMPKumgkQxycfAk56-RWviLZ8LAPF-XcbA; xq_r_token=92527e51353f90ba14d5fd16581e5a7a2780baa2; acw_tc=ac11000117534287625894768e00740076244cbad53d8039638dfe8ed0f4b1; is_overseas=1; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=VRmy+KD3dGV+blntY70z3I+WQFcq8+JM1SqrC6E5295b/kwc4W5RkB+oXprCzEylFSeHXru7sQJDLmMr0mBJ+g%3D%3D; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1753429267; ssxmod_itna=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0KHhqDylYw0i2YwjCo0ZD5D/KCeGzDiLPGhDBWAFdYGdTt4NFtiowCWKGwktpe9flQbeeYGlcD0aK7G=x3EujH5Zn7iIiRoeDU4GnD0=O7YmKqxGGI4GwDGoD34DiDDpED03Db4D+4=bD7rTiocW=EjeDQ4GyDitDKLe=xi3DA4Djnl=qYiTdwDDBDGtON9aDG4GfSmDD0qtBeqT4DYP=5Pr8d29mpOWSneDMixGXz71+NI1yoYcrdvU6r+bOpPGuDG6CP3POd7nEaafY66i0DeGmKK+j0DrlDpYwMihyYwQGGiBqqQGo3qqCexhGC0G4ixqQARmPHK4vP2OIPeDEZgXDfExF0iY+K+mdFrKGmQGvwBP40PpFDC7KzBqG7W4QGt/D3ixt+R5BYobxxD; ssxmod_itna2=eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0KHhqDylYw0i2YwjCo0ZYeDA4rYnRItORCDU1Z/PnDhxYG9pGD', 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1751936369; xq_a_token=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xqat=ada154d4707b8d3f8aa521ff0c960aa7f81cbf9e; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU2MDAyNjgyLCJjdG0iOjE3NTM0MTA2ODI0MTQsImNpZCI6ImQ5ZDBuNEFadXAifQ.AlnzQSY7oGKGABfaQcFLg0lAJsDdvBMiwUbgpCMCBlbx6VZPKhzERxWiylQb4dFIyyECvRRJ73SbO9cD46fAqgzOgTxArNHtTKD4lQapTnyb11diDADnpb_nzzaRr4k_BYQRKXWtcJxdUMzde2WLy-eAkSf76QkXmKrwS3kvRm5gfqhdye44whw5XMEGoZ_lXHzGLWGz_PludHZp6W3v-wwZc_0wLU6cTb_KdrwWUWT_8jw5JHXnJEmuZmQI8QWf60DtiHIYCYXarxv8XtyHK7lLKhIAa3C2QmGWw5wv2HGz4I5DPqm2uMPKumgkQxycfAk56-RWviLZ8LAPF-XcbA; xq_r_token=92527e51353f90ba14d5fd16581e5a7a2780baa2; acw_tc=0a27aa0f17542694317833912e006564153fcd1bb89f49a865e382d9953601; is_overseas=0; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1754269439; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=HS+RscPvXRUz1ypZekks1pgGkAHHlHsHVuftTbDQCbUUaFqtm9BV4h7ghR2d5Nh+YD29otSyz2svRiKWvOJqgQ%3D%3D; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuq4wQWiYMrK4N4hGRtDl=YoDZDGFdDqx0Ei6Fi7HKzYhtBoqzWKjw_wv5YlCZMPO8//1P9PQCNzkOQ4hviDB3DbqDy/dePxYYjDBYD74G_DDeDixdDj4GmDGYtOeDFfCuNq6R5dxDwDB=DmMIbfeDEDG3D0fbeCLRYwDDBDGUFxtaDG4Gf0mDD0wDAo0jooDGWfnu4s6mkeFKN57G3x0tWDBL5QvG3x/lnoGWNVtlfkS2FkPGuDG6Ogl0kDqQO3i2AfP4KGGIm0iBPKY_5leOQDqQe4YwQGDpl0xliO7Gm0DOGDz0G4ixqYw1n0aSpwhixgPXieD1NZcX3ZXDK4rm0IlvYRGImxqnmmlG4eK40w4Am1BqGYeeGn5ixXWa3m2b/DDgi3YD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuq4wQWiYMrK4N4hGbYDiPbY44h7ie03dz7=3xDlouSdLRKl=Q_2YStYQ7OzOy_RBQ1oeziI2pkPsD8RSfPnSw5L7G4xcSPKKMxxoCD6zTiVCud28rNOm2tL7qASSMTjB2GcYPxzSRi94n0Kgjd6C6jKOMh5rMtOfkR2l8TGOPL277=81u9MRkBgIwRxDwx6iYEE4omE9FE1lonhzib3BUC6PD',
'Referer': 'https://weibo.com/u/7735765253', 'Referer': 'https://weibo.com/u/7735765253',
'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Mobile': '?0',

View File

@ -0,0 +1,376 @@
"""
持仓分析模块
提供持仓数据获取和行业分类统计功能包括
1. 从外部API获取持仓数据
2. 根据股票代码获取行业分类
3. 计算各行业持仓金额和占比
"""
import requests
import logging
from typing import Dict, List, Optional
from sqlalchemy import create_engine, text
from .config import DB_URL, LOG_FILE
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger("portfolio_analyzer")
class PortfolioAnalyzer:
"""持仓分析器类"""
def __init__(self, db_url: str = DB_URL):
"""
初始化持仓分析器
Args:
db_url: 数据库连接URL
"""
self.engine = create_engine(
db_url,
pool_size=5,
max_overflow=10,
pool_recycle=3600
)
self.api_url = ("https://to.bmbs.tech/aim/app/v1/derivativeTrading/getTradingRecordList?"
"projectId=182AE38B8C254BC88C715D86C643A6DD,C9B533175D294648A2372CB2966BCC96")
logger.info("持仓分析器初始化完成")
def get_trading_records(self) -> Optional[Dict]:
"""
从外部API获取交易记录数据
Returns:
交易记录数据字典如果请求失败返回None
"""
try:
headers = {
'authUserIdYh': '4028816c6759b6cb01675aacc98a00f6'
}
response = requests.get(self.api_url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get("success") and data.get("code") == 200:
logger.info("成功获取交易记录数据")
return data.get("data", {})
else:
logger.error(f"API返回错误: {data.get('message', '未知错误')}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"请求交易记录API失败: {e}")
return None
except Exception as e:
logger.error(f"处理交易记录数据失败: {e}")
return None
def get_stock_industry(self, stock_code: str) -> List[str]:
"""
获取指定股票所属的行业列表
Args:
stock_code: 股票代码格式如603290.SH
Returns:
行业名称列表
"""
try:
# 转换股票代码格式
formatted_code = self._convert_stock_code_format(stock_code)
query = text("""
SELECT DISTINCT bk_name
FROM gp_hybk
WHERE gp_code = :stock_code
""")
with self.engine.connect() as conn:
result = conn.execute(query, {"stock_code": formatted_code}).fetchall()
if result:
return [row[0] for row in result]
else:
logger.warning(f"未找到股票 {stock_code} 的行业数据")
return []
except Exception as e:
logger.error(f"获取股票行业失败: {e}")
return []
def _convert_stock_code_format(self, stock_code: str) -> str:
"""
转换股票代码格式
Args:
stock_code: 原始股票代码格式如 "603290.SH"
Returns:
转换后的股票代码格式如 "SH603290"
"""
try:
code, market = stock_code.split('.')
return f"{market}{code}"
except Exception as e:
logger.error(f"转换股票代码格式失败: {str(e)}")
return stock_code
def calculate_margin_amount(self, notional_principal: float, margin_rate: float) -> float:
"""
计算保证金金额
Args:
notional_principal: 名义本金
margin_rate: 保证金率百分比
Returns:
保证金金额
"""
return notional_principal * (margin_rate / 100)
def analyze_portfolio_allocation(self) -> Dict:
"""
分析持仓行业分配
Returns:
包含行业持仓占比的字典
"""
try:
# 1. 获取交易记录数据
trading_data = self.get_trading_records()
if not trading_data:
return {"success": False, "message": "无法获取交易记录数据"}
data_list = trading_data.get("dataList", [])
if not data_list:
return {"success": False, "message": "交易记录数据为空"}
# 2. 处理持仓数据
industry_amounts = {} # 行业金额统计
total_amount = 0.0
for project in data_list:
project_name = project.get("projectName", "")
stock_code = project.get("stockCode", "")
trading_records = project.get("tradingRecoderList", [])
# 只处理未清仓的记录
for record in trading_records:
if record.get("projectStatus") == "published":
notional_principal = float(record.get("notionalPrincipal", 0))
margin_rate = float(record.get("marginRate", 0))
# 计算保证金金额
margin_amount = self.calculate_margin_amount(notional_principal, margin_rate)
total_amount += margin_amount
# 获取股票所属行业
industries = self.get_stock_industry(stock_code)
if industries:
# 如果股票属于多个行业,按行业数量平均分配
amount_per_industry = margin_amount / len(industries)
for industry in industries:
if industry in industry_amounts:
industry_amounts[industry] += amount_per_industry
else:
industry_amounts[industry] = amount_per_industry
else:
# 如果无法获取行业信息,归类为"其他"
other_industry = "其他"
if other_industry in industry_amounts:
industry_amounts[other_industry] += margin_amount
else:
industry_amounts[other_industry] = margin_amount
# 3. 生成返回数据
if not industry_amounts:
return {"success": False, "message": "未找到有效的持仓数据"}
# 按金额排序
sorted_industries = sorted(industry_amounts.items(), key=lambda x: x[1], reverse=True)
# 定义颜色映射
colors = [
"#5470c6", "#91cc75", "#fac858", "#ee6666", "#73c0de",
"#3ba272", "#fc8452", "#9a60b4", "#ea7ccc", "#ff9f7f"
]
industries_data = []
for i, (industry, amount) in enumerate(sorted_industries):
color = colors[i % len(colors)]
industries_data.append({
"industry": industry,
"amount": round(amount, 2),
"color": color
})
result = {
"success": True,
"data": {
"total_amount": round(total_amount, 2),
"industries": industries_data
}
}
logger.info(f"成功分析持仓行业分配,总金额: {total_amount:.2f}万元,共{len(industries_data)}个行业")
return result
except Exception as e:
logger.error(f"分析持仓行业分配失败: {e}")
return {"success": False, "message": f"分析持仓行业分配失败: {str(e)}"}
def get_portfolio_summary(self) -> Dict:
"""
获取持仓摘要信息
Returns:
持仓摘要信息字典
"""
try:
# 1. 获取交易记录数据
trading_data = self.get_trading_records()
if not trading_data:
return {"success": False, "message": "无法获取交易记录数据"}
data_list = trading_data.get("dataList", [])
if not data_list:
return {"success": False, "message": "交易记录数据为空"}
# 2. 统计信息
total_projects = len(data_list)
active_projects = 0
total_margin_amount = 0.0
project_details = []
for project in data_list:
project_name = project.get("projectName", "")
stock_code = project.get("stockCode", "")
trading_records = project.get("tradingRecoderList", [])
project_margin = 0.0
has_active_position = False
for record in trading_records:
if record.get("projectStatus") == "published":
has_active_position = True
notional_principal = float(record.get("notionalPrincipal", 0))
margin_rate = float(record.get("marginRate", 0))
margin_amount = self.calculate_margin_amount(notional_principal, margin_rate)
project_margin += margin_amount
if has_active_position:
active_projects += 1
total_margin_amount += project_margin
# 获取行业信息
industries = self.get_stock_industry(stock_code)
industry_names = ", ".join(industries) if industries else "未知"
project_details.append({
"project_name": project_name,
"stock_code": stock_code,
"industry": industry_names,
"margin_amount": round(project_margin, 2)
})
# 3. 生成摘要
summary = {
"success": True,
"data": {
"total_projects": total_projects,
"active_projects": active_projects,
"total_margin_amount": round(total_margin_amount, 2),
"project_details": project_details
}
}
logger.info(f"成功获取持仓摘要,总项目数: {total_projects},活跃项目数: {active_projects}")
return summary
except Exception as e:
logger.error(f"获取持仓摘要失败: {e}")
return {"success": False, "message": f"获取持仓摘要失败: {str(e)}"}
def get_industry_holdings_detail(self, industry_name: str) -> Dict:
"""
获取指定行业的详细持仓信息
Args:
industry_name: 行业名称
Returns:
包含行业详细持仓信息的字典
"""
try:
# 1. 获取交易记录数据
trading_data = self.get_trading_records()
if not trading_data:
return {"success": False, "message": "无法获取交易记录数据"}
data_list = trading_data.get("dataList", [])
if not data_list:
return {"success": False, "message": "交易记录数据为空"}
# 2. 筛选该行业的持仓记录
industry_holdings = []
for project in data_list:
project_name = project.get("projectName", "")
stock_code = project.get("stockCode", "")
trading_records = project.get("tradingRecoderList", [])
# 获取股票所属行业
industries = self.get_stock_industry(stock_code)
# 检查是否属于指定行业
if industry_name in industries:
for record in trading_records:
if record.get("projectStatus") == "published":
notional_principal = float(record.get("notionalPrincipal", 0))
margin_rate = float(record.get("marginRate", 0))
create_time = record.get("createTime", "")
# 计算保证金金额
margin_amount = self.calculate_margin_amount(notional_principal, margin_rate)
industry_holdings.append({
"project_name": project_name,
"stock_code": stock_code,
"notional_principal": notional_principal,
"margin_rate": margin_rate,
"margin_amount": margin_amount,
"create_time": create_time
})
# 3. 按保证金金额排序
industry_holdings.sort(key=lambda x: x["margin_amount"], reverse=True)
# 4. 生成返回数据
result = {
"success": True,
"data": {
"industry_name": industry_name,
"total_count": len(industry_holdings),
"total_margin_amount": sum(item["margin_amount"] for item in industry_holdings),
"holdings": industry_holdings
}
}
logger.info(f"成功获取行业 {industry_name} 的详细持仓信息,共 {len(industry_holdings)} 条记录")
return result
except Exception as e:
logger.error(f"获取行业详细持仓信息失败: {e}")
return {"success": False, "message": f"获取行业详细持仓信息失败: {str(e)}"}

View File

@ -0,0 +1,334 @@
# -*- coding: utf-8 -*-
"""
估值指标分析专用聊天机器人
专门用于分析股票应该使用PE还是PB估值
"""
import sys
import os
import logging
from typing import Dict, Any, Optional
from datetime import datetime
# 添加项目根目录到 Python 路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from openai import OpenAI
from src.scripts.config import get_random_api_key, get_model
# 设置日志
logger = logging.getLogger(__name__)
class ValuationChatBot:
"""估值指标分析专用聊天机器人"""
def __init__(self, model_type: str = "online_bot"):
"""初始化估值分析聊天机器人
Args:
model_type: 要使用的模型类型默认为联网智能体
"""
try:
# 从配置获取API密钥
self.api_key = get_random_api_key()
# 从配置获取模型ID
self.model = get_model(model_type)
logger.info(f"初始化ValuationChatBot使用模型: {self.model}")
# 初始化OpenAI客户端
self.client = OpenAI(
base_url="https://ark.cn-beijing.volces.com/api/v3/bots",
api_key=self.api_key
)
# 估值指标分析专用系统提示词
self.system_message = {
"role": "system",
"content": """你是一名顶级的、注重第一性原理的基本面分析师。你的核心任务是深入剖析一家公司的内在价值驱动因素并基于此判断“市盈盈率PE”和“市净率PB”哪个指标能更真实、更核心地反映其价值。
**你的分析必须超越简单的行业标签聚焦于公司的个性化特征** 即使是同一行业的公司由于商业模式和财务状况的差异也可能适用不同的估值指标
**你的决策逻辑框架如下**
1. **盈利质量与可预测性分析 - 这是判断PE有效性的基石**
* **分析要点** 公司的盈利是常态还是偶发是内生增长还是外部输血过去5年的盈利记录是否稳定且持续是否存在大量非经常性损益扭曲了利润公司的自由现金流状况如何是否与净利润匹配
* **决策倾向** 如果盈利质量高可预测性强则PE的权重增加如果盈利波动巨大不可持续或为负则PE的权重降低甚至失效
2. **资产价值与商业模式分析 - 这是判断PB有效性的基石**
* **分析要点** 公司的核心价值是沉淀在资产负债表上如厂房金融资产土地还是体现在资产负债表外如品牌技术网络效应客户关系公司的商业模式是资产驱动型还是智力/品牌驱动型
* **决策倾向** 如果公司价值与净资产高度相关如金融重资产制造资源型企业则PB的权重增加如果公司是典型的轻资产模式则PB的权重降低
3. **周期性与成长性交叉验证**
* **分析要点** 公司所处的行业周期性强弱如何公司自身是否展现出超越行业的成长性或防御性
* **决策倾向** 强周期性会削弱PE在特定时点的有效性使PB成为更稳健的参照而强成长性尤其是有利可图的成长会显著提升PE的适用性
**最终决策原则**
* **优先选择 PE 的核心理由** 公司具备持续稳定的盈利能力并且其核心价值能通过利润得到体现这是对股东回报最直接的衡量
* **优先选择 PB 的核心理由** 公司的盈利能力不可靠周期性/亏损或者其商业模式的根本是基于净资产的规模和质量如金融业PB此时是衡量价值的底线
**输出要求**
1. **明确结论** 首先明确推荐PE或PB作为主要估值指标
2. **深入的个股特质分析**
* **商业模式剖析** 详细说明公司如何赚钱其护城河是什么
* **财务特征分析** 重点分析盈利的稳定性与质量资产的轻重结构现金流状况
* **行业背景补充** 分析公司在行业中所处的生态位有何不同于同行的特质
3. **提供决策依据** 清晰地说明你是如何基于上述三层决策逻辑框架最终做出选择的
4. **给出合理的估值区间建议** 基于你选择的指标并结合公司的历史估值水平和未来成长性给出一个合理的估值区间"""
}
# 对话历史
self.conversation_history = [self.system_message]
except Exception as e:
logger.error(f"初始化ValuationChatBot时出错: {str(e)}")
raise
def chat(self, user_input: str, temperature: float = 0.3, top_p: float = 0.7, max_tokens: int = 2048, frequency_penalty: float = 0.0) -> Dict[str, Any]:
"""与AI进行估值指标分析对话
Args:
user_input: 用户输入的问题
temperature: 控制输出的随机性范围0-2默认0.3更确定性
top_p: 控制输出的多样性范围0-1默认0.7
max_tokens: 控制输出的最大长度默认2048
frequency_penalty: 频率惩罚范围-2.0到2.0默认0.0
Returns:
Dict包含对话结果
"""
try:
# 添加用户消息到对话历史
self.conversation_history.append({
"role": "user",
"content": user_input
})
# 调用OpenAI API
response = self.client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
frequency_penalty=frequency_penalty
)
# 获取AI回复
ai_response = response.choices[0].message.content
# 添加AI回复到对话历史
self.conversation_history.append({
"role": "assistant",
"content": ai_response
})
# 保持对话历史在合理长度内避免token过多
if len(self.conversation_history) > 10:
# 保留系统消息和最近的对话
self.conversation_history = [self.system_message] + self.conversation_history[-8:]
logger.info(f"ValuationChatBot对话成功回复长度: {len(ai_response)}")
return {
"success": True,
"response": ai_response,
"model": self.model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"ValuationChatBot对话失败: {str(e)}")
return {
"success": False,
"error": str(e),
"model": self.model,
"timestamp": datetime.now().isoformat()
}
def clear_history(self):
"""清空对话历史"""
self.conversation_history = [self.system_message]
logger.info("ValuationChatBot对话历史已清空")
def get_conversation_history(self) -> list:
"""获取对话历史"""
return self.conversation_history.copy()
class ValuationOfflineChatBot:
"""估值指标分析专用离线聊天机器人"""
def __init__(self, model_type: str = "offline_bot"):
"""初始化离线估值分析聊天机器人
Args:
model_type: 要使用的模型类型默认为离线模型
"""
try:
# 尝试导入配置参考chat_bot_with_offline.py的方式
try:
from src.scripts.config import get_model_config
config = get_model_config("tl_qw_private", "GLM")
logger.info("成功从src.scripts.config导入配置")
except ImportError:
try:
from scripts.config import get_model_config
config = get_model_config("volc", "offline_model")
logger.info("成功从scripts.config导入配置")
except ImportError:
logger.warning("无法导入配置模块,使用默认配置")
# 使用默认配置
config = {
"base_url": "https://ark.cn-beijing.volces.com/api/v3/",
"api_key": "28cfe71a-c6fa-4c5d-9b4e-d8474f0d3b93",
"model": "ep-20250326090920-v7wns"
}
# 保存配置信息
self.api_key = config["api_key"]
self.model = config["model"]
self.base_url = config["base_url"]
logger.info(f"初始化ValuationOfflineChatBot使用模型: {self.model}")
# 初始化OpenAI客户端
self.client = OpenAI(
base_url=self.base_url,
api_key=self.api_key,
timeout=600
)
# 估值指标分析专用系统提示词(针对从分析报告中进行语义理解并提取最终结论)
self.system_message = {
"role": "system",
"content": """你是一个专注于**语义理解和结论提取**的AI。你的唯一任务是阅读一段分析报告理解其核心论点并判断作者最终推荐的估值指标是 "PE" 还是 "PB"
**你的核心工作流程**
1. **通读全文**完整地阅读用户提供的分析报告理解其对公司业务模式盈利能力和资产结构的整体评价
2. **定位结论性语段**重点关注报告的结尾部分或总结段落寻找那些**承上启下做出最终评判**的句子这些句子不一定包含固定的关键词但它们在语义上起到了总结和给出最终意见的作用
3. **进行意图判断**
* **判断为 "PE" 的信号**如果结论性语段的中心思想是强调盈利的稳定性高质量的增长强大的品牌价值轻资产模式的优势并最终将这些优势导向了某个估值方法那么结论就是 "PE"
* *例子* "考虑到该公司强大的品牌护城河和持续稳定的现金流创造能力,通过其盈利水平来评估价值显然是更为恰当的路径。" -> **应判断为 PE**
* **判断为 "PB" 的信号**如果结论性语段的中心思想是强调资产负债表的重要性行业的周期性风险盈利的不可靠性或者直接点明其金融属性并基于这些论据做出最终选择那么结论就是 "PB"
* *例子* "尽管公司短期盈利尚可,但其重资产和强周期的本质意味着盈利波动是常态,因此,基于其净资产的估值方法提供了一个更稳固的价值锚点。" -> **应判断为 PB**
**你必须遵守的铁律**
* **你的任务是理解和提取不是再次分析**你必须相信报告原文的逻辑是自洽的你的工作只是找出它的最终论点
* **只输出最终结果**你的输出**必须且只能是** "PE" "PB"不要添加任何解释理由或多余的字符
* **处理歧义**如果在极少数情况下报告的结论确实模棱两可无法从语义上明确判断**请默认输出 "PE"**以确保程序健壮性
"""
}
# 对话历史
self.conversation_history = [self.system_message]
except Exception as e:
logger.error(f"初始化ValuationOfflineChatBot时出错: {str(e)}")
raise
def chat(self, user_input: str, temperature: float = 0.1, top_p: float = 0.7, max_tokens: int = 1024, frequency_penalty: float = 0.0) -> Dict[str, Any]:
"""与离线AI进行估值指标分析对话
Args:
user_input: 用户输入的问题
temperature: 控制输出的随机性范围0-2默认0.1更确定性
top_p: 控制输出的多样性范围0-1默认0.7
max_tokens: 控制输出的最大长度默认1024
frequency_penalty: 频率惩罚范围-2.0到2.0默认0.0
Returns:
Dict包含对话结果
"""
try:
# 添加用户消息到对话历史
self.conversation_history.append({
"role": "user",
"content": user_input
})
# 调用本地GLM模型
ai_response = self._call_local_model(user_input, temperature, top_p, max_tokens, frequency_penalty)
# 添加AI回复到对话历史
self.conversation_history.append({
"role": "assistant",
"content": ai_response
})
# 保持对话历史在合理长度内
if len(self.conversation_history) > 6:
self.conversation_history = [self.system_message] + self.conversation_history[-4:]
logger.info(f"ValuationOfflineChatBot对话成功回复长度: {len(ai_response)}")
return {
"success": True,
"response": ai_response,
"model": self.model,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"ValuationOfflineChatBot对话失败: {str(e)}")
return {
"success": False,
"error": str(e),
"model": self.model,
"timestamp": datetime.now().isoformat()
}
def _call_local_model(self, user_input: str, temperature: float = 0.1, top_p: float = 0.7, max_tokens: int = 1024, frequency_penalty: float = 0.0) -> str:
"""调用本地GLM模型"""
try:
# 调用本地模型API使用初始化时创建的客户端
response = self.client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
frequency_penalty=frequency_penalty,
timeout=300
)
# 获取AI回复
ai_response = response.choices[0].message.content
# 清理回复内容确保只返回PE或PB
ai_response_clean = ai_response.strip().upper()
if "PE" in ai_response_clean and "PB" not in ai_response_clean:
return "PE"
elif "PB" in ai_response_clean and "PE" not in ai_response_clean:
return "PB"
elif ai_response_clean == "PE" or ai_response_clean == "PB":
return ai_response_clean
else:
# 如果回复不清晰,记录详细信息
logger.warning(f"本地模型回复不清晰: {ai_response_clean}")
return "PE" # 默认返回PE
except Exception as e:
logger.error(f"调用本地模型失败: {str(e)}")
return "PE" # 出错时默认返回PE
def clear_history(self):
"""清空对话历史"""
self.conversation_history = [self.system_message]
logger.info("ValuationOfflineChatBot对话历史已清空")
def get_conversation_history(self) -> list:
"""获取对话历史"""
return self.conversation_history.copy()
if __name__ == "__main__":
test_valuation_chat_bot()

View File

@ -0,0 +1,397 @@
# -*- coding: utf-8 -*-
"""
估值指标分析器
用于判断股票应该使用PE估值还是PB估值更合理
"""
import sys
import os
import logging
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
# 添加项目根目录到 Python 路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from src.valuation_analysis.valuation_chat_bot import ValuationChatBot
from src.scripts.config import get_random_api_key, get_model
# 设置日志
logger = logging.getLogger(__name__)
class ValuationIndicatorAnalyzer:
"""估值指标分析器"""
def __init__(self):
"""初始化分析器"""
try:
# 初始化联网大模型(使用专用估值分析聊天机器人)
self.online_chatbot = ValuationChatBot(model_type="online_bot")
# 初始化本地GLM模型使用专用估值分析离线聊天机器人
try:
from src.valuation_analysis.valuation_chat_bot import ValuationOfflineChatBot
self.offline_chatbot = ValuationOfflineChatBot(model_type="offline_bot")
self.has_offline_model = True
except ImportError:
logger.warning("无法导入离线模型,将只使用联网模型")
self.offline_chatbot = None
self.has_offline_model = False
logger.info("估值指标分析器初始化成功")
except Exception as e:
logger.error(f"初始化估值指标分析器失败: {str(e)}")
raise
def analyze_valuation_indicator(self, stock_code: str, stock_name: str) -> Dict[str, Any]:
"""
分析股票应该使用PE还是PB估值
Args:
stock_code: 股票代码
stock_name: 股票名称
Returns:
Dict包含分析结果
"""
try:
logger.info(f"开始分析股票 {stock_name}({stock_code}) 的估值指标")
# 第一步:使用联网大模型进行初步分析
online_result = self._analyze_with_online_model(stock_code, stock_name)
# 第二步使用本地GLM模型进行格式化和验证
if self.has_offline_model:
offline_result = self._analyze_with_offline_model(stock_code, stock_name, online_result)
else:
offline_result = online_result
# 第三步:整合结果
final_result = self._integrate_results(online_result, offline_result)
logger.info(f"完成股票 {stock_name}({stock_code}) 的估值指标分析")
return final_result
except Exception as e:
logger.error(f"分析股票 {stock_name}({stock_code}) 估值指标时出错: {str(e)}")
return {
"success": False,
"error": str(e),
"recommended_indicator": None,
"reasoning": None,
"valuation_range": None
}
def _analyze_with_online_model(self, stock_code: str, stock_name: str) -> Dict[str, Any]:
"""
使用联网大模型进行估值指标分析
Args:
stock_code: 股票代码
stock_name: 股票名称
Returns:
联网模型的分析结果
"""
try:
# 构建用户提示词(让联网大模型专注于分析推理,不输出估值区间)
user_prompt = f"""请深入分析股票代码为{stock_code}、名称为{stock_name}的公司并判断其更适合使用PE还是PB进行估值。
你的分析应超越简单的行业标签聚焦于该公司的个性化特征请遵循以下分析框架提供详细的基于第一性原理的分析
1. **盈利质量与可预测性分析**:
* 这家公司的盈利是稳定持续的还是波动巨大难以预测的
* 其净利润是否真实反映了业务的现金创造能力
* 是否存在大量非经常性损益影响了其盈利的真实性
2. **资产价值与商业模式分析**:
* 公司的核心价值更多体现在资产负债表内的有形资产如设备土地金融资产还是表外的无形资产如品牌技术网络效应
* 它的商业模式是资产驱动型还是智力/品牌驱动型
3. **周期性与成长性分析**:
* 公司所处行业的周期性强弱如何它在周期中的位置是怎样的
* 公司自身的成长性如何是高于还是低于行业平均水平
4. **最终决策与依据**:
* 综合以上分析明确阐述你为什么认为PE或PB是更根本的估值指标请详细说明你的决策逻辑将公司的个性化特征与你的结论紧密联系起来
请确保你的分析是客观专业有深度的在本次分析中请不要提供具体的估值区间专注于提供选择估值指标的充分理由"""
# 调用联网模型
response = self.online_chatbot.chat(user_prompt, temperature=0.3, max_tokens=2048)
if response.get("success"):
return {
"success": True,
"raw_response": response.get("response", ""),
"model": "online"
}
else:
logger.error(f"联网模型分析失败: {response.get('error', '未知错误')}")
return {
"success": False,
"error": response.get('error', '联网模型分析失败'),
"model": "online"
}
except Exception as e:
logger.error(f"使用联网模型分析时出错: {str(e)}")
return {
"success": False,
"error": str(e),
"model": "online"
}
def _analyze_with_offline_model(self, stock_code: str, stock_name: str, online_result: Dict[str, Any]) -> Dict[str, Any]:
"""
使用本地GLM模型提取最终的PE/PB推荐
Args:
stock_code: 股票代码
stock_name: 股票名称
online_result: 联网模型的分析结果
Returns:
本地模型的分析结果
"""
try:
# 构建用户提示词让本地GLM专注于提取最终结果
user_prompt = f"""你是一个专注于语义理解和结论提取的AI。你的唯一任务是阅读一段分析报告理解其核心论点并判断作者最终推荐的估值指标是 "PE" 还是 "PB"
你的输出必须严格遵守以下规则
* 你的输出**必须且只能是** "PE" "PB"
* 不要添加任何解释理由或多余的字符
* 如果文本结论确实模棱两可无法明确判断**请默认输出 "PE"**
分析内容如下
---
{online_result.get('raw_response', '分析失败')}
---
"""
# 调用本地模型
response = self.offline_chatbot.chat(user_prompt, temperature=0.1, max_tokens=10)
if response.get("success"):
return {
"success": True,
"raw_response": response.get("response", "").strip(),
"model": "offline"
}
else:
logger.error(f"本地模型分析失败: {response.get('error', '未知错误')}")
return {
"success": False,
"error": response.get('error', '本地模型分析失败'),
"model": "offline"
}
except Exception as e:
logger.error(f"使用本地模型分析时出错: {str(e)}")
return {
"success": False,
"error": str(e),
"model": "offline"
}
def _integrate_results(self, online_result: Dict[str, Any], offline_result: Dict[str, Any]) -> Dict[str, Any]:
"""
整合联网模型和本地模型的结果
Args:
online_result: 联网模型结果
offline_result: 本地模型结果
Returns:
整合后的最终结果
"""
try:
# 如果联网模型成功
if online_result.get("success"):
# 获取联网模型的分析内容
online_analysis = online_result.get("raw_response", "")
# 如果本地模型也成功使用本地模型提取的PE/PB推荐
if offline_result.get("success"):
recommended_indicator = offline_result.get("raw_response", "").strip()
# 验证推荐指标是否有效
if recommended_indicator in ["PE", "PB"]:
return {
"success": True,
"recommended_indicator": recommended_indicator,
"reasoning": online_analysis
}
else:
# 本地模型输出无效,使用联网模型的结果
logger.warning(f"本地模型输出无效: {recommended_indicator}")
return {
"success": True,
"recommended_indicator": self._extract_indicator_from_text(online_analysis),
"reasoning": online_analysis
}
else:
# 只有联网模型成功
return {
"success": True,
"recommended_indicator": self._extract_indicator_from_text(online_analysis),
"reasoning": online_analysis
}
else:
# 联网模型失败
return {
"success": False,
"error": online_result.get("error", "分析失败")
}
except Exception as e:
logger.error(f"整合结果时出错: {str(e)}")
return {
"success": False,
"error": str(e)
}
def _extract_indicator_from_text(self, text: str) -> Optional[str]:
"""从文本中提取推荐的估值指标"""
try:
import re
# 查找推荐估值指标
patterns = [
r'推荐估值指标[:]\s*(PE|PB)',
r'应该使用\s*(PE|PB)\s*估值',
r'选择\s*(PE|PB)\s*作为',
r'优先选择\s*(PE|PB)'
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(1).upper()
return None
except Exception as e:
logger.error(f"提取估值指标时出错: {str(e)}")
return None
def _extract_reasoning_from_text(self, text: str) -> Dict[str, str]:
"""从文本中提取推理过程"""
try:
reasoning = {
"industry_analysis": "",
"business_model_analysis": "",
"financial_analysis": "",
"decision_basis": ""
}
# 简单的文本提取逻辑
lines = text.split('\n')
current_section = None
for line in lines:
line = line.strip()
if not line:
continue
if '行业特征分析' in line or '行业分析' in line:
current_section = 'industry_analysis'
elif '商业模式分析' in line or '业务模式分析' in line:
current_section = 'business_model_analysis'
elif '财务特征分析' in line or '财务分析' in line:
current_section = 'financial_analysis'
elif '决策依据' in line or '选择依据' in line:
current_section = 'decision_basis'
elif current_section and line:
reasoning[current_section] += line + " "
# 清理空白内容
for key in reasoning:
reasoning[key] = reasoning[key].strip()
return reasoning
except Exception as e:
logger.error(f"提取推理过程时出错: {str(e)}")
return {
"industry_analysis": "",
"business_model_analysis": "",
"financial_analysis": "",
"decision_basis": ""
}
def _extract_valuation_range_from_text(self, text: str) -> Dict[str, Any]:
"""从文本中提取估值区间"""
try:
import re
# 查找PE或PB的估值区间
pe_pattern = r'PE.*?(\d+(?:\.\d+)?)[-~](\d+(?:\.\d+)?)'
pb_pattern = r'PB.*?(\d+(?:\.\d+)?)[-~](\d+(?:\.\d+)?)'
pe_match = re.search(pe_pattern, text, re.IGNORECASE)
pb_match = re.search(pb_pattern, text, re.IGNORECASE)
if pe_match:
return {
"type": "PE",
"min_value": float(pe_match.group(1)),
"max_value": float(pe_match.group(2)),
"unit": ""
}
elif pb_match:
return {
"type": "PB",
"min_value": float(pb_match.group(1)),
"max_value": float(pb_match.group(2)),
"unit": ""
}
else:
return {
"type": None,
"min_value": None,
"max_value": None,
"unit": ""
}
except Exception as e:
logger.error(f"提取估值区间时出错: {str(e)}")
return {
"type": None,
"min_value": None,
"max_value": None,
"unit": ""
}
def test_valuation_analyzer():
"""测试估值指标分析器"""
try:
analyzer = ValuationIndicatorAnalyzer()
# 测试用例
test_cases = [
("000001", "平安银行"), # 金融业应该推荐PB
("000002", "万科A"), # 房地产应该推荐PB
("000858", "五粮液"), # 消费品应该推荐PE
("002415", "海康威视"), # 科技股应该推荐PE
]
for stock_code, stock_name in test_cases:
print(f"\n测试股票: {stock_name}({stock_code})")
result = analyzer.analyze_valuation_indicator(stock_code, stock_name)
if result.get("success"):
print(f"推荐指标: {result.get('recommended_indicator')}")
print(f"推理过程: {result.get('reasoning')}")
print(f"估值区间: {result.get('valuation_range')}")
else:
print(f"分析失败: {result.get('error')}")
except Exception as e:
print(f"测试失败: {str(e)}")
if __name__ == "__main__":
test_valuation_analyzer()