stock_fundamentals/src/app.py

2881 lines
113 KiB
Python
Raw Normal View History

2025-04-02 13:52:34 +08:00
import sys
import os
2025-06-05 10:42:14 +08:00
from datetime import datetime, timedelta
2025-05-06 15:13:15 +08:00
import pandas as pd
import uuid
import json
from threading import Thread
2025-06-05 10:42:14 +08:00
from sqlalchemy import text
from src.fundamentals_llm.fundamental_analysis_database import get_db
2025-04-02 17:38:26 +08:00
2025-04-02 13:52:34 +08:00
# 添加项目根目录到 Python 路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
2025-05-14 08:54:56 +08:00
from flask import Flask, jsonify, request, send_from_directory, render_template
2025-04-02 13:52:34 +08:00
from flask_cors import CORS
import logging
# 导入企业筛选器
from src.fundamentals_llm.enterprise_screener import EnterpriseScreener
2025-05-06 15:13:15 +08:00
# 导入股票回测器
2025-06-05 10:42:14 +08:00
from src.stock_analysis_v2 import run_backtest
2025-05-06 15:13:15 +08:00
2025-05-14 08:54:56 +08:00
# 导入PE/PB估值分析器
from src.valuation_analysis.pe_pb_analysis import ValuationAnalyzer
# 导入行业估值分析器
from src.valuation_analysis.industry_analysis import IndustryAnalyzer
2025-05-14 16:52:24 +08:00
# 导入沪深港通监控器
from src.valuation_analysis.hsgt_monitor import HSGTMonitor
2025-05-19 17:02:52 +08:00
# 导入融资融券数据采集器
from src.valuation_analysis.eastmoney_rzrq_collector import EastmoneyRzrqCollector
# 导入恐贪指数管理器
from src.valuation_analysis.fear_greed_index import FearGreedIndexManager
# 导入指数分析器
from src.valuation_analysis.index_analyzer import IndexAnalyzer
# 导入股票日线数据采集器
from src.scripts.stock_daily_data_collector import collect_stock_daily_data
2025-05-21 14:01:54 +08:00
from valuation_analysis.financial_analysis import FinancialAnalyzer
from src.valuation_analysis.stock_price_collector import StockPriceCollector
2025-04-02 13:52:34 +08:00
# 设置日志
logging.basicConfig(
level=logging.INFO,
2025-05-06 15:13:15 +08:00
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(), # 输出到控制台
logging.FileHandler(f'logs/app_{datetime.now().strftime("%Y%m%d")}.log', encoding='utf-8') # 输出到文件
]
2025-04-02 13:52:34 +08:00
)
2025-05-06 15:13:15 +08:00
# 确保logs和results目录存在
os.makedirs('logs', exist_ok=True)
os.makedirs('results', exist_ok=True)
os.makedirs('results/tasks', exist_ok=True)
os.makedirs('static/results', exist_ok=True)
2025-04-02 13:52:34 +08:00
logger = logging.getLogger(__name__)
2025-05-06 15:13:15 +08:00
logger.info("Flask应用启动")
2025-04-02 13:52:34 +08:00
# 创建 Flask 应用
2025-05-06 15:13:15 +08:00
app = Flask(__name__, static_folder='static')
2025-04-02 13:52:34 +08:00
CORS(app) # 启用跨域请求支持
# 创建企业筛选器实例
screener = EnterpriseScreener()
2025-05-14 08:54:56 +08:00
# 创建估值分析器实例
valuation_analyzer = ValuationAnalyzer()
# 创建行业分析器实例
industry_analyzer = IndustryAnalyzer()
2025-04-02 17:38:26 +08:00
2025-05-14 16:52:24 +08:00
# 创建监控器实例
hsgt_monitor = HSGTMonitor()
2025-05-19 17:02:52 +08:00
# 创建融资融券数据采集器实例
em_rzrq_collector = EastmoneyRzrqCollector()
# 创建恐贪指数管理器实例
fear_greed_manager = FearGreedIndexManager()
# 创建指数分析器实例
index_analyzer = IndexAnalyzer()
2025-05-06 15:13:15 +08:00
# 获取项目根目录
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
REPORTS_DIR = os.path.join(ROOT_DIR, 'src', 'reports')
# 确保reports目录存在
os.makedirs(REPORTS_DIR, exist_ok=True)
logger.info(f"报告目录路径: {REPORTS_DIR}")
# 存储回测任务状态的字典
backtest_tasks = {}
2025-05-19 17:02:52 +08:00
# 融资融券数据采集任务列表
rzrq_tasks = {}
2025-05-06 15:13:15 +08:00
def run_backtest_task(task_id, stocks_buy_dates, end_date):
"""
在后台运行回测任务
"""
try:
logger.info(f"开始执行回测任务 {task_id}")
# 更新任务状态为进行中
backtest_tasks[task_id]['status'] = 'running'
# 运行回测
results, stats_list = run_backtest(stocks_buy_dates, end_date)
# 如果回测成功
if results and stats_list:
# 计算总体统计
stats_df = pd.DataFrame(stats_list)
total_profit = stats_df['final_profit'].sum()
avg_win_rate = stats_df['win_rate'].mean()
avg_holding_days = stats_df['avg_holding_days'].mean()
# 找出最佳止盈比例(假设为0.15,实际应从回测结果中分析得出)
best_take_profit_pct = 0.15
# 获取图表URL
chart_urls = {
"all_stocks": f"/static/results/{task_id}/all_stocks_analysis.png",
"profit_matrix": f"/static/results/{task_id}/profit_matrix_analysis.png"
}
# 保存股票详细统计
stock_stats = []
for stat in stats_list:
stock_stats.append({
"symbol": stat['symbol'],
"total_trades": int(stat['total_trades']),
"profitable_trades": int(stat['profitable_trades']),
"loss_trades": int(stat['loss_trades']),
"win_rate": float(stat['win_rate']),
"avg_holding_days": float(stat['avg_holding_days']),
"final_profit": float(stat['final_profit']),
"entry_count": int(stat['entry_count'])
})
# 构建结果数据
result_data = {
"task_id": task_id,
"status": "completed",
"results": {
"total_profit": float(total_profit),
"win_rate": float(avg_win_rate),
"avg_holding_days": float(avg_holding_days),
"best_take_profit_pct": best_take_profit_pct,
"stock_stats": stock_stats,
"chart_urls": chart_urls
}
}
# 保存结果到文件
task_result_path = os.path.join('results', 'tasks', f"{task_id}.json")
with open(task_result_path, 'w', encoding='utf-8') as f:
json.dump(result_data, f, ensure_ascii=False, indent=2)
# 更新任务状态为已完成
backtest_tasks[task_id].update({
'status': 'completed',
'results': result_data['results']
})
logger.info(f"回测任务 {task_id} 已完成")
else:
# 更新任务状态为失败
backtest_tasks[task_id]['status'] = 'failed'
backtest_tasks[task_id]['error'] = "回测未产生有效结果"
logger.error(f"回测任务 {task_id} 失败:回测未产生有效结果")
except Exception as e:
# 更新任务状态为失败
backtest_tasks[task_id]['status'] = 'failed'
backtest_tasks[task_id]['error'] = str(e)
logger.error(f"回测任务 {task_id} 失败:{str(e)}")
2025-06-05 10:42:14 +08:00
@app.route('/scheduler/stockRealtimePrice/collection', methods=['GET'])
def update_stock_realtime_price():
"""更新实时股价数据 周内的9点半、10点半、11点半、2点、3点各更新一次"""
2025-05-21 14:01:54 +08:00
try:
2025-06-05 10:42:14 +08:00
collector = StockPriceCollector()
collector.update_latest_data()
2025-05-19 17:02:52 +08:00
except Exception as e:
2025-06-05 10:42:14 +08:00
logger.error(f"更新实时股价数据失败: {e}")
return jsonify({
"status": "success"
}), 200
2025-05-19 17:02:52 +08:00
2025-06-05 10:42:14 +08:00
@app.route('/scheduler/stockDaily/collection', methods=['GET'])
def run_stock_daily_collection1():
"""执行股票日线数据采集任务 下午3点四十开始"""
2025-05-19 17:02:52 +08:00
try:
logger.info("开始执行股票日线数据采集")
# 获取当天日期
today = datetime.now().strftime('%Y-%m-%d')
2025-06-05 10:42:14 +08:00
2025-05-19 17:02:52 +08:00
# 定义数据库连接地址
db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
2025-06-05 10:42:14 +08:00
collect_stock_daily_data(db_url, today)
2025-05-19 17:02:52 +08:00
except Exception as e:
logger.error(f"启动股票日线数据采集任务失败: {str(e)}")
2025-06-05 10:42:14 +08:00
return jsonify({
"status": "success"
}), 200
2025-05-19 17:02:52 +08:00
2025-06-05 10:42:14 +08:00
@app.route('/scheduler/rzrq/collection', methods=['GET'])
def run_rzrq_initial_collection1():
"""执行融资融券数据更新采集 下午7点开始"""
2025-05-19 17:02:52 +08:00
try:
2025-06-05 10:42:14 +08:00
# 执行采集
em_rzrq_collector.initial_data_collection()
except Exception as e:
logger.error(f"启动融资融券数据更新任务失败: {str(e)}")
return jsonify({
"status": "success"
}), 200
@app.route('/scheduler/industry/crowding', methods=['GET'])
def precalculate_industry_crowding1():
"""预计算部分行业和概念板块的拥挤度指标 晚上10点开始"""
try:
from src.valuation_analysis.industry_analysis import IndustryAnalyzer
analyzer = IndustryAnalyzer()
# 固定行业和概念板块
industries = ["IT设备", "消费电子", "半导体", "军工电子", "专用设备", "乘用车", "产业互联网", "元器件", "光学光电", "医疗器械", "医疗服务", "汽车零部件", "航天装备", "自动化设备"]
concepts = ["先进封装", "芯片", "消费电子概念", "机器人概念"]
# 计算行业拥挤度
for industry in industries:
2025-05-19 17:02:52 +08:00
try:
2025-06-05 10:42:14 +08:00
analyzer.get_industry_crowding_index(industry, use_cache=False)
2025-05-19 17:02:52 +08:00
except Exception as e:
2025-06-05 10:42:14 +08:00
logger.error(f"预计算行业 {industry} 的拥挤度指标时出错: {str(e)}")
continue
# 计算概念板块拥挤度
for concept in concepts:
try:
analyzer.get_industry_crowding_index(concept, use_cache=False, is_concept=True)
except Exception as e:
logger.error(f"预计算概念板块 {concept} 的拥挤度指标时出错: {str(e)}")
continue
logger.info("指定行业和概念板块的拥挤度指标预计算完成")
2025-05-19 17:02:52 +08:00
except Exception as e:
2025-06-05 10:42:14 +08:00
logger.error(f"预计算行业拥挤度指标失败: {str(e)}")
return jsonify({
"status": "success"
}), 200
@app.route('/scheduler/financial/analysis', methods=['GET'])
def scheduler_financial_analysis():
"""预计算所有股票的财务分析数据 早晚各一次"""
try:
from src.valuation_analysis.financial_analysis import FinancialAnalyzer
analyzer = FinancialAnalyzer()
analyzer.analyze_financial_data('601021.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('601021.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('601021.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('600483.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('600483.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('600483.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('688596.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('688596.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('688596.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('002747.SZ', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('002747.SZ', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('002747.SZ', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('688012.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('688012.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('688012.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('603658.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('603658.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('603658.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('002409.SZ', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('002409.SZ', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('002409.SZ', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('600584.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('600584.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('600584.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('603055.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('603055.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('603055.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('601138.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('601138.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('601138.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('603659.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('603659.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('603659.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('688072.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('688072.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('688072.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('688008.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('688008.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('688008.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('300661.SZ', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('300661.SZ', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('300661.SZ', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('603986.SH', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('603986.SH', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('603986.SH', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
analyzer.analyze_financial_data('000733.SZ', current_year = '2024-12-31', previous_year = '2023-12-31', force_update=True)
analyzer.analyze_financial_data('000733.SZ', current_year = '2023-12-31', previous_year = '2022-12-31', force_update=True)
analyzer.analyze_financial_data('000733.SZ', current_year = '2022-12-31', previous_year = '2021-12-31', force_update=True)
except Exception as e:
logger.error(f"预计算所有股票的财务分析数据失败: {str(e)}")
return jsonify({
"status": "success"
}), 200
2025-05-19 17:02:52 +08:00
2025-05-14 08:54:56 +08:00
@app.route('/')
def index():
"""渲染主页"""
return render_template('index.html')
2025-05-06 15:13:15 +08:00
@app.route('/api/backtest/run', methods=['POST'])
def start_backtest():
"""启动回测任务
请求体格式:
{
"stocks_buy_dates": {
"SH600522": ["2022-05-10", "2022-06-10"], // 股票代码: [买入日期列表]
"SZ002340": ["2022-06-15"],
"SH601615": ["2022-07-20", "2022-08-01"]
},
"end_date": "2022-10-20" // 所有股票共同的结束日期
}
返回内容:
{
"task_id": "backtask-khkerhy4u237y489237489truiy8432"
}
"""
try:
# 从请求体获取参数
data = request.get_json()
if not data:
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供JSON数据"
}), 400
stocks_buy_dates = data.get('stocks_buy_dates')
end_date = data.get('end_date')
if not stocks_buy_dates or not isinstance(stocks_buy_dates, dict):
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供stocks_buy_dates字典"
}), 400
if not end_date or not isinstance(end_date, str):
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供有效的end_date"
}), 400
# 验证日期格式
try:
datetime.strptime(end_date, '%Y-%m-%d')
for stock_code, buy_dates in stocks_buy_dates.items():
if not isinstance(buy_dates, list):
return jsonify({
"status": "error",
"message": f"请求格式错误: 股票 {stock_code} 的买入日期必须是列表"
}), 400
for buy_date in buy_dates:
datetime.strptime(buy_date, '%Y-%m-%d')
except ValueError as e:
return jsonify({
"status": "error",
"message": f"日期格式错误: {str(e)}"
}), 400
# 生成任务ID
task_id = f"backtask-{uuid.uuid4().hex[:16]}"
# 创建任务目录
task_dir = os.path.join('static', 'results', task_id)
os.makedirs(task_dir, exist_ok=True)
# 记录任务信息
backtest_tasks[task_id] = {
'status': 'pending',
'created_at': datetime.now().isoformat(),
'stocks_buy_dates': stocks_buy_dates,
'end_date': end_date
}
# 创建线程运行回测
thread = Thread(target=run_backtest_task, args=(task_id, stocks_buy_dates, end_date))
thread.daemon = True
thread.start()
logger.info(f"已创建回测任务: {task_id}")
return jsonify({
"task_id": task_id
})
except Exception as e:
logger.error(f"创建回测任务失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"创建回测任务失败: {str(e)}"
}), 500
@app.route('/api/backtest/status', methods=['GET'])
def check_backtest_status():
"""查询回测任务状态
参数:
- task_id: 回测任务ID
返回内容:
{
"task_id": "backtask-khkerhy4u237y489237489truiy8432",
"status": "running" | "completed" | "failed",
"created_at": "2023-12-01T10:30:45",
"error": "错误信息(如有)"
}
"""
try:
task_id = request.args.get('task_id')
if not task_id:
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供task_id参数"
}), 400
# 检查任务是否存在
if task_id not in backtest_tasks:
return jsonify({
"status": "error",
"message": f"任务不存在: {task_id}"
}), 404
# 获取任务信息
task_info = backtest_tasks[task_id]
# 构建响应
response = {
"task_id": task_id,
"status": task_info['status'],
"created_at": task_info['created_at']
}
# 如果任务失败,添加错误信息
if task_info['status'] == 'failed' and 'error' in task_info:
response['error'] = task_info['error']
return jsonify(response)
except Exception as e:
logger.error(f"查询任务状态失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"查询任务状态失败: {str(e)}"
}), 500
@app.route('/api/backtest/result', methods=['GET'])
def get_backtest_result():
"""获取回测任务结果
参数:
- task_id: 回测任务ID
返回内容:
{
"task_id": "backtask-2023121-001",
"status": "completed",
"results": {
"total_profit": 123456.78,
"win_rate": 75.5,
"avg_holding_days": 12.3,
"best_take_profit_pct": 0.15,
"stock_stats": [...],
"chart_urls": {
"all_stocks": "/static/results/backtask-2023121-001/all_stocks_analysis.png",
"profit_matrix": "/static/results/backtask-2023121-001/profit_matrix_analysis.png"
}
}
}
"""
try:
task_id = request.args.get('task_id')
if not task_id:
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供task_id参数"
}), 400
# 检查任务是否存在
if task_id not in backtest_tasks:
# 尝试从文件加载
task_result_path = os.path.join('results', 'tasks', f"{task_id}.json")
if os.path.exists(task_result_path):
with open(task_result_path, 'r', encoding='utf-8') as f:
result_data = json.load(f)
return jsonify(result_data)
else:
return jsonify({
"status": "error",
"message": f"任务不存在: {task_id}"
}), 404
# 获取任务信息
task_info = backtest_tasks[task_id]
# 检查任务是否完成
if task_info['status'] != 'completed':
return jsonify({
"status": "error",
"message": f"任务尚未完成或已失败: {task_info['status']}"
}), 400
# 构建响应
response = {
"task_id": task_id,
"status": "completed",
"results": task_info['results']
}
return jsonify(response)
except Exception as e:
logger.error(f"获取任务结果失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"获取任务结果失败: {str(e)}"
}), 500
@app.route('/api/reports/<path:filename>')
def serve_report(filename):
"""提供PDF报告的访问"""
try:
logger.info(f"请求文件: {filename}")
logger.info(f"从目录: {REPORTS_DIR} 提供文件")
return send_from_directory(REPORTS_DIR, filename, as_attachment=True)
except Exception as e:
logger.error(f"提供报告文件失败: {str(e)}")
return jsonify({"error": "文件不存在"}), 404
2025-04-02 13:52:34 +08:00
@app.route('/api/health', methods=['GET'])
def health_check():
"""健康检查接口"""
return jsonify({"status": "ok", "message": "Service is running"})
@app.route('/api/stock_profiles', methods=['GET'])
def get_stock_profiles():
"""获取企业画像筛选结果列表"""
return jsonify({
"profiles": [
{"id": 1, "name": "高成长潜力企业", "method": "high_growth"},
{"id": 2, "name": "稳定型龙头企业", "method": "stable_leaders"},
{"id": 3, "name": "短期投资机会", "method": "short_term"},
{"id": 4, "name": "价值型投资标的", "method": "value_investment"},
{"id": 5, "name": "困境反转机会", "method": "turnaround"},
{"id": 6, "name": "风险规避标的", "method": "risk_averse"},
{"id": 7, "name": "创新驱动型企业", "method": "innovation_driven"},
{"id": 8, "name": "行业整合机会", "method": "industry_integration"},
{"id": 9, "name": "推荐投资企业", "method": "recommended"}
]
})
@app.route('/api/screen/<profile_type>', methods=['GET'])
def screen_stocks(profile_type):
"""根据企业画像类型筛选股票
Args:
profile_type: 企业画像类型 (method 字段值)
"""
try:
# 解析limit参数
limit = request.args.get('limit', None, type=int)
# 处理别名
if profile_type == 'innovation_driven':
profile_type = 'innovation'
elif profile_type == 'industry_integration':
profile_type = 'integration'
# 映射画像类型到筛选方法
method_map = {
'high_growth': screener.screen_high_growth_stocks,
'stable_leaders': screener.screen_stable_leaders,
'short_term': screener.screen_short_term_opportunities,
'value_investment': screener.screen_value_investment_stocks,
'turnaround': screener.screen_turnaround_opportunities,
'risk_averse': screener.screen_risk_averse_stocks,
'innovation': screener.screen_innovation_driven_stocks,
'integration': screener.screen_industry_integration_stocks,
'recommended': screener.screen_multi_term_investment_stocks
}
# 检查请求的画像类型是否有效
if profile_type not in method_map:
return jsonify({
"status": "error",
"message": f"Invalid profile type: {profile_type}"
}), 400
# 调用相应的筛选方法
stocks = method_map[profile_type](limit=limit)
# 转换结果格式
result = []
for code, name in stocks:
result.append({
"code": code,
"name": name
})
return jsonify({
"status": "success",
"profile_type": profile_type,
"count": len(result),
"stocks": result
})
except Exception as e:
logger.error(f"筛选股票失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"Failed to screen stocks: {str(e)}"
}), 500
# 条件筛选接口,接受自定义的筛选条件
@app.route('/api/screen/custom', methods=['POST'])
def screen_custom():
"""使用自定义条件筛选股票"""
try:
# 从请求体获取筛选条件
data = request.get_json()
if not data or 'conditions' not in data:
return jsonify({
"status": "error",
"message": "Missing conditions in request body"
}), 400
# 调用通用筛选方法
stocks = screener._screen_stocks_by_conditions(data['conditions'])
# 转换结果格式
result = []
for code, name in stocks:
result.append({
"code": code,
"name": name
})
return jsonify({
"status": "success",
"count": len(result),
"stocks": result
})
except Exception as e:
logger.error(f"自定义筛选股票失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"Failed to screen stocks with custom conditions: {str(e)}"
}), 500
@app.route('/api/generate_reports', methods=['POST'])
def generate_reports():
"""为指定的股票列表生成PDF投资报告
请求体格式:
{
"stocks": [
["603690", "至纯科技"],
["688053", "思科瑞"],
["300750", "宁德时代"]
]
}
"""
try:
# 从请求体获取股票列表
data = request.get_json()
if not data or 'stocks' not in data or not isinstance(data['stocks'], list):
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供股票列表"
}), 400
# 检查股票列表格式
stocks = data['stocks']
if not all(isinstance(item, list) and len(item) == 2 for item in stocks):
return jsonify({
"status": "error",
"message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式"
}), 400
# 导入 PDF 生成器模块
try:
2025-05-06 15:13:15 +08:00
from src.fundamentals_llm.pdf_generator import PDFGenerator
2025-04-02 13:52:34 +08:00
except ImportError:
try:
2025-05-06 15:13:15 +08:00
from fundamentals_llm.pdf_generator import PDFGenerator
2025-04-02 13:52:34 +08:00
except ImportError as e:
logger.error(f"无法导入 PDF 生成器模块: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: PDF 生成器模块不可用, 错误详情: {str(e)}"
}), 500
# 生成报告
generated_reports = []
for stock_code, stock_name in stocks:
try:
2025-05-06 15:13:15 +08:00
# 创建 PDF 生成器实例
generator = PDFGenerator()
2025-04-02 13:52:34 +08:00
# 调用 PDF 生成器
2025-05-06 15:13:15 +08:00
report_path = generator.generate_pdf(
title=f"{stock_name}({stock_code}) 基本面分析报告",
content_dict={}, # 这里需要传入实际的内容字典
filename=f"{stock_name}_{stock_code}_analysis.pdf"
)
2025-04-02 13:52:34 +08:00
generated_reports.append({
"code": stock_code,
"name": stock_name,
"report_path": report_path,
"status": "success"
})
logger.info(f"成功生成 {stock_name}({stock_code}) 的投资报告: {report_path}")
except Exception as e:
logger.error(f"生成 {stock_name}({stock_code}) 的投资报告失败: {str(e)}")
generated_reports.append({
"code": stock_code,
"name": stock_name,
"status": "error",
"error": str(e)
})
# 返回结果
return jsonify({
"status": "success",
"message": f"处理了 {len(stocks)} 个股票的报告生成请求",
"reports": generated_reports
})
except Exception as e:
logger.error(f"处理报告生成请求失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"Failed to process report generation request: {str(e)}"
}), 500
@app.route('/api/recommended_stocks', methods=['GET'])
def recommended_stocks():
"""获取系统推荐的投资股票
该接口会返回系统基于基本面分析推荐的股票列表具有较高投资价值
可以通过以下参数进行过滤:
- profile_type: 企业画像类型例如 'high_growth', 'stable_leaders'
- limit: 限制返回的股票数量
"""
try:
# 解析请求参数
profile_type = request.args.get('profile_type', 'high_growth')
limit = request.args.get('limit', 10, type=int)
# 导入筛选器模块
try:
from src.fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError:
try:
from fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError as e:
logger.error(f"无法导入企业筛选器模块: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 企业筛选器模块不可用, 错误详情: {str(e)}"
}), 500
# 创建筛选器实例
screener = EnterpriseScreener()
# 根据类型获取推荐股票
try:
if profile_type == 'recommended':
stocks = screener.screen_multi_term_investment_stocks(limit=limit)
elif profile_type == 'high_growth':
stocks = screener.screen_high_growth_stocks(limit=limit)
elif profile_type == 'stable_leaders':
stocks = screener.screen_stable_leaders(limit=limit)
elif profile_type == 'short_term':
stocks = screener.screen_short_term_opportunities(limit=limit)
elif profile_type == 'value_investment':
stocks = screener.screen_value_investment_stocks(limit=limit)
elif profile_type == 'turnaround':
stocks = screener.screen_turnaround_opportunities(limit=limit)
elif profile_type == 'risk_averse':
stocks = screener.screen_risk_averse_stocks(limit=limit)
elif profile_type == 'innovation' or profile_type == 'innovation_driven':
stocks = screener.screen_innovation_driven_stocks(limit=limit)
elif profile_type == 'integration' or profile_type == 'industry_integration':
stocks = screener.screen_industry_integration_stocks(limit=limit)
elif profile_type == 'custom' and 'conditions' in request.args:
# 解析自定义条件
try:
import json
conditions = json.loads(request.args.get('conditions'))
stocks = screener.screen_by_custom_conditions(conditions, limit=limit)
except Exception as e:
return jsonify({
"status": "error",
"message": f"解析自定义条件失败: {str(e)}"
}), 400
else:
return jsonify({
"status": "error",
"message": f"不支持的企业画像类型: {profile_type}"
}), 400
except Exception as e:
logger.error(f"筛选股票时出错: {str(e)}")
return jsonify({
"status": "error",
"message": f"筛选股票时出错: {str(e)}"
}), 500
# 格式化返回结果
formatted_stocks = []
for stock_code, stock_name in stocks:
formatted_stocks.append({
"code": stock_code,
"name": stock_name
})
# 返回结果
return jsonify({
"status": "success",
"profile_type": profile_type,
"count": len(formatted_stocks),
"stocks": formatted_stocks
})
except Exception as e:
logger.error(f"获取推荐股票失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"获取推荐股票失败: {str(e)}"
}), 500
@app.route('/api/analyze_and_recommend', methods=['POST'])
def analyze_and_recommend():
"""分析指定企业列表,生成投资建议,并筛选推荐投资的企业
请求体格式:
{
"stocks": [
["603690", "至纯科技"],
["688053", "思科瑞"],
["300750", "宁德时代"]
],
"limit": 10 // 可选限制返回推荐股票的数量
}
"""
try:
# 从请求体获取股票列表
data = request.get_json()
if not data or 'stocks' not in data or not isinstance(data['stocks'], list):
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供股票列表"
}), 400
# 检查股票列表格式
stocks = data['stocks']
if not all(isinstance(item, list) and len(item) == 2 for item in stocks):
return jsonify({
"status": "error",
"message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式"
}), 400
# 解析limit参数
limit = data.get('limit', 10)
if not isinstance(limit, int) or limit <= 0:
limit = 10
# 导入必要的聊天机器人模块
try:
# 首先尝试导入聊天机器人模块
try:
from src.fundamentals_llm.chat_bot import ChatBot as OnlineChatBot
logger.info("成功从 src.fundamentals_llm.chat_bot 导入 ChatBot")
except ImportError as e1:
try:
from fundamentals_llm.chat_bot import ChatBot as OnlineChatBot
logger.info("成功从 fundamentals_llm.chat_bot 导入 ChatBot")
except ImportError as e2:
logger.error(f"无法导入在线聊天机器人模块: {str(e1)}, {str(e2)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 聊天机器人模块不可用,错误详情: {str(e2)}"
}), 500
# 然后尝试导入离线聊天机器人模块
try:
from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
logger.info("成功从 src.fundamentals_llm.chat_bot_with_offline 导入 ChatBot")
except ImportError as e1:
try:
from fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
logger.info("成功从 fundamentals_llm.chat_bot_with_offline 导入 ChatBot")
except ImportError as e2:
logger.warning(f"无法导入离线聊天机器人模块: {str(e1)}, {str(e2)}")
# 这里可以继续执行,因为某些功能可能不需要离线模型
# 最后导入基本面分析器
try:
from src.fundamentals_llm.fundamental_analysis import FundamentalAnalyzer
logger.info("成功从 src.fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer")
except ImportError as e1:
try:
from fundamentals_llm.fundamental_analysis import FundamentalAnalyzer
logger.info("成功从 fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer")
except ImportError as e2:
logger.error(f"无法导入基本面分析模块: {str(e1)}, {str(e2)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 基本面分析模块不可用,错误详情: {str(e2)}"
}), 500
except Exception as e:
logger.error(f"导入必要模块时出错: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 导入必要模块时出错,错误详情: {str(e)}"
}), 500
# 创建基本面分析器实例
analyzer = FundamentalAnalyzer()
# 为每个股票生成投资建议
investment_advices = []
for stock_code, stock_name in stocks:
try:
# 生成投资建议
success, advice, reasoning, references = analyzer.query_analysis(
stock_code, stock_name, "investment_advice"
)
if success:
investment_advices.append({
"code": stock_code,
"name": stock_name,
"advice": advice,
"reasoning": reasoning,
"references": references,
"status": "success"
})
logger.info(f"成功生成 {stock_name}({stock_code}) 的投资建议")
else:
investment_advices.append({
"code": stock_code,
"name": stock_name,
"status": "error",
"error": advice
})
logger.error(f"生成 {stock_name}({stock_code}) 的投资建议失败: {advice}")
except Exception as e:
logger.error(f"处理 {stock_name}({stock_code}) 时出错: {str(e)}")
investment_advices.append({
"code": stock_code,
"name": stock_name,
"status": "error",
"error": str(e)
})
# 导入企业筛选器
try:
from src.fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError:
try:
from fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError as e:
logger.error(f"无法导入企业筛选器模块: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 企业筛选器模块不可用,错误详情: {str(e)}"
}), 500
# 创建筛选器实例
screener = EnterpriseScreener()
# 筛选符合推荐投资条件的股票
# 获取传入的所有股票代码
input_stock_codes = set(code for code, _ in stocks)
# 获取所有符合推荐投资条件的股票
recommended_stocks = screener.screen_multi_term_investment_stocks(limit=limit)
# 筛选出传入列表中符合推荐投资条件的股票
recommended_input_stocks = []
for code, name in recommended_stocks:
if code in input_stock_codes:
recommended_input_stocks.append({
"code": code,
"name": name
})
# 返回结果
return jsonify({
"status": "success",
"total_input_stocks": len(stocks),
"investment_advices": investment_advices,
"recommended_stocks": {
"count": len(recommended_input_stocks),
"stocks": recommended_input_stocks
}
})
except Exception as e:
logger.error(f"分析和推荐股票失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"分析和推荐股票失败: {str(e)}"
}), 500
@app.route('/api/comprehensive_analysis', methods=['POST'])
def comprehensive_analysis():
2025-05-06 15:13:15 +08:00
"""综合分析接口 - 使用队列方式处理被锁定的股票
2025-04-02 13:52:34 +08:00
请求体格式:
{
"stocks": [
["603690", "至纯科技"],
["688053", "思科瑞"],
["300750", "宁德时代"]
],
"generate_pdf": true,
"limit": 10, // 可选限制返回推荐股票的数量
"profile_filter": {
"type": "custom", // 可选值: 预定义的画像类型或 "custom"
"profile_name": "high_growth", // type = 预定义画像类型时使用
"conditions": [ // type = "custom" 时使用
{
"dimension": "financial_report",
"field": "financial_report_level",
"operator": ">=",
"value": 1
},
// 其他条件...
]
}
}
"""
try:
# 从请求体获取参数
data = request.get_json()
if not data or 'stocks' not in data or not isinstance(data['stocks'], list):
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供股票列表"
}), 400
# 检查股票列表格式
stocks = data['stocks']
if not all(isinstance(item, list) and len(item) == 2 for item in stocks):
return jsonify({
"status": "error",
"message": "股票列表格式错误: 每个股票应该是 [代码, 名称] 格式"
}), 400
# 解析其他参数
generate_pdf = data.get('generate_pdf', False)
profile_filter = data.get('profile_filter', None)
limit = data.get('limit', 10)
if not isinstance(limit, int) or limit <= 0:
limit = 10
2025-05-06 15:13:15 +08:00
# 导入必要的模块
2025-04-02 13:52:34 +08:00
try:
2025-05-06 15:13:15 +08:00
# 先导入基本面分析器
2025-04-02 13:52:34 +08:00
try:
from src.fundamentals_llm.fundamental_analysis import FundamentalAnalyzer
logger.info("成功从 src.fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer")
except ImportError as e1:
try:
from fundamentals_llm.fundamental_analysis import FundamentalAnalyzer
logger.info("成功从 fundamentals_llm.fundamental_analysis 导入 FundamentalAnalyzer")
except ImportError as e2:
logger.error(f"无法导入基本面分析模块: {str(e1)}, {str(e2)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 基本面分析模块不可用,错误详情: {str(e2)}"
}), 500
2025-05-06 15:13:15 +08:00
# 再导入其他可能需要的模块
try:
from src.fundamentals_llm.chat_bot import ChatBot as OnlineChatBot
from src.fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
except ImportError:
try:
from fundamentals_llm.chat_bot import ChatBot as OnlineChatBot
from fundamentals_llm.chat_bot_with_offline import ChatBot as OfflineChatBot
except ImportError:
# 这些模块不是必须的,所以继续执行
logger.warning("无法导入聊天机器人模块,但这不会影响基本功能")
2025-04-02 13:52:34 +08:00
except Exception as e:
logger.error(f"导入必要模块时出错: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 导入必要模块时出错,错误详情: {str(e)}"
}), 500
# 创建基本面分析器实例
analyzer = FundamentalAnalyzer()
2025-05-06 15:13:15 +08:00
# 准备结果容器
investment_advices = {} # 使用字典,股票代码作为键
processing_queue = list(stocks) # 初始处理队列
max_attempts = 5 # 最大重试次数
total_attempts = 0
# 导入数据库模块
from src.fundamentals_llm.fundamental_analysis_database import get_analysis_result, get_db
# 开始处理队列
while processing_queue and total_attempts < max_attempts:
total_attempts += 1
logger.info(f"开始第 {total_attempts} 轮处理,队列中有 {len(processing_queue)} 只股票")
# 暂存下一轮需要处理的股票
next_round_queue = []
# 处理当前队列中的所有股票
for stock_code, stock_name in processing_queue:
try:
# 检查是否被锁定
if analyzer.is_stock_locked(stock_code, "investment_advice"):
# 已被锁定,放到下一轮队列
next_round_queue.append([stock_code, stock_name])
# 记录状态
if stock_code not in investment_advices:
investment_advices[stock_code] = {
"code": stock_code,
"name": stock_name,
"status": "pending",
"message": f"股票 {stock_code} 正在被其他请求分析中,已加入等待队列"
}
logger.info(f"股票 {stock_name}({stock_code}) 已被锁定,放入下一轮队列")
continue
# 尝试锁定并分析
analyzer.lock_stock(stock_code, "investment_advice")
try:
# 执行分析
success, advice, reasoning, references = analyzer.query_analysis(
stock_code, stock_name, "investment_advice"
)
# 记录结果
if success:
investment_advices[stock_code] = {
"code": stock_code,
"name": stock_name,
"advice": advice,
"reasoning": reasoning,
"references": references,
"status": "success"
}
logger.info(f"成功分析 {stock_name}({stock_code})")
else:
investment_advices[stock_code] = {
"code": stock_code,
"name": stock_name,
"status": "error",
"error": advice or "分析失败,无详细信息"
}
logger.error(f"分析 {stock_name}({stock_code}) 失败: {advice}")
finally:
# 确保释放锁
analyzer.unlock_stock(stock_code, "investment_advice")
except Exception as e:
# 处理异常
investment_advices[stock_code] = {
2025-04-02 13:52:34 +08:00
"code": stock_code,
"name": stock_name,
"status": "error",
2025-05-06 15:13:15 +08:00
"error": str(e)
}
logger.error(f"处理 {stock_name}({stock_code}) 时出错: {str(e)}")
# 确保释放锁
try:
analyzer.unlock_stock(stock_code, "investment_advice")
except:
pass
# 如果还有下一轮要处理的股票,等待一段时间后继续
if next_round_queue:
logger.info(f"本轮结束,还有 {len(next_round_queue)} 只股票等待下一轮处理")
# 等待30秒再处理下一轮
import time
time.sleep(30)
processing_queue = next_round_queue
else:
# 所有股票都已处理完,退出循环
logger.info("所有股票处理完毕")
processing_queue = []
# 处理仍在队列中的股票(达到最大重试次数但仍未处理的)
for stock_code, stock_name in processing_queue:
if stock_code in investment_advices and investment_advices[stock_code]["status"] == "pending":
investment_advices[stock_code].update({
"status": "timeout",
"message": f"等待超时,股票 {stock_code} 可能正在被长时间分析"
2025-04-02 13:52:34 +08:00
})
2025-05-06 15:13:15 +08:00
logger.warning(f"股票 {stock_name}({stock_code}) 分析超时")
# 将字典转换为列表
investment_advice_list = list(investment_advices.values())
2025-04-02 13:52:34 +08:00
# 生成PDF报告如果需要
pdf_results = []
if generate_pdf:
try:
2025-05-06 15:13:15 +08:00
# 针对已成功分析的股票生成PDF
for stock_info in investment_advice_list:
if stock_info["status"] == "success":
stock_code = stock_info["code"]
stock_name = stock_info["name"]
try:
report_path = analyzer.generate_pdf_report(stock_code, stock_name)
if report_path:
pdf_results.append({
"code": stock_code,
"name": stock_name,
"report_path": report_path,
"status": "success"
})
logger.info(f"成功生成 {stock_name}({stock_code}) 的投资报告: {report_path}")
else:
pdf_results.append({
"code": stock_code,
"name": stock_name,
"status": "error",
"error": "生成报告失败"
})
logger.error(f"生成 {stock_name}({stock_code}) 的投资报告失败")
except Exception as e:
logger.error(f"生成 {stock_name}({stock_code}) 的投资报告失败: {str(e)}")
pdf_results.append({
"code": stock_code,
"name": stock_name,
"status": "error",
"error": str(e)
})
2025-04-02 13:52:34 +08:00
except Exception as e:
logger.error(f"处理PDF生成请求失败: {str(e)}")
pdf_results = []
# 应用企业画像筛选
filtered_stocks = []
if profile_filter:
try:
# 导入企业筛选器
try:
from src.fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError:
try:
from fundamentals_llm.enterprise_screener import EnterpriseScreener
except ImportError as e:
logger.error(f"无法导入企业筛选器模块: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器配置错误: 企业筛选器模块不可用,错误详情: {str(e)}"
}), 500
# 创建筛选器实例
screener = EnterpriseScreener()
# 根据筛选类型获取推荐股票
filter_type = profile_filter.get('type', 'custom')
# 如果是自定义条件筛选
if filter_type == 'custom':
conditions = profile_filter.get('conditions', [])
if conditions:
print(f"使用自定义条件筛选: {len(conditions)} 个条件")
all_stocks = screener.screen_by_custom_conditions(conditions, limit=limit)
else:
# 如果没有提供条件使用默认的high_growth类型
print("未提供自定义条件使用默认的high_growth类型")
all_stocks = screener.screen_high_growth_stocks(limit=limit)
# 如果是预定义类型
else:
# 直接使用type作为画像类型如果用户同时提供了profile_name也兼容处理
profile_name = profile_filter.get('profile_name', filter_type)
print(f"使用预定义画像类型: {profile_name}")
if profile_name == 'high_growth':
all_stocks = screener.screen_high_growth_stocks(limit=limit)
elif profile_name == 'stable_leaders':
all_stocks = screener.screen_stable_leaders(limit=limit)
elif profile_name == 'short_term':
all_stocks = screener.screen_short_term_opportunities(limit=limit)
elif profile_name == 'value_investment':
all_stocks = screener.screen_value_investment_stocks(limit=limit)
elif profile_name == 'turnaround':
all_stocks = screener.screen_turnaround_opportunities(limit=limit)
elif profile_name == 'risk_averse':
all_stocks = screener.screen_risk_averse_stocks(limit=limit)
elif profile_name == 'innovation' or profile_name == 'innovation_driven':
all_stocks = screener.screen_innovation_driven_stocks(limit=limit)
elif profile_name == 'integration' or profile_name == 'industry_integration':
all_stocks = screener.screen_industry_integration_stocks(limit=limit)
elif profile_name == 'recommended':
all_stocks = screener.screen_multi_term_investment_stocks(limit=limit)
else:
# 如果类型无效使用默认的high_growth类型
logger.warning(f"无效的画像类型: {profile_name}使用默认的high_growth类型")
all_stocks = screener.screen_high_growth_stocks(limit=limit)
# 获取传入的所有股票代码
input_stock_codes = set(code for code, _ in stocks)
2025-05-14 08:54:56 +08:00
# 查询赛道关联信息
track_query = text("""
SELECT DISTINCT pc.stock_code, ci.belong_industry
FROM gp_product_category pc
JOIN gp_category_industry ci ON pc.category_name = ci.category_name
WHERE pc.stock_code IN :stock_codes
""")
# 获取赛道信息
track_results = {}
try:
# 获取数据库连接
db_session2 = next(get_db())
# 使用 Session 的 execute 方法执行查询
result = db_session2.execute(track_query, {"stock_codes": tuple(input_stock_codes)})
for row in result:
if row.stock_code not in track_results:
track_results[row.stock_code] = []
if row.belong_industry: # 确保 belong_industry 不为空
track_results[row.stock_code].append(row.belong_industry)
except Exception as e:
logger.error(f"查询赛道信息失败: {str(e)}")
track_results = {}
finally:
if 'db_session2' in locals() and db_session2 is not None: # 确保 db_session 已定义
db_session2.close() # <--- 关闭会话
2025-05-23 10:19:48 +08:00
# 获取企业市值信息
market_value_results = {}
try:
from src.valuation_analysis.stock_price_collector import StockPriceCollector
price_collector = StockPriceCollector()
for stock_code in input_stock_codes:
price_data = price_collector.get_stock_price_data(stock_code)
if price_data and 'total_market_value' in price_data:
market_value_results[stock_code] = price_data['total_market_value']
else:
market_value_results[stock_code] = None
except Exception as e:
logger.error(f"获取企业市值信息失败: {str(e)}")
market_value_results = {}
2025-05-14 08:54:56 +08:00
db_session = next(get_db())
2025-04-02 13:52:34 +08:00
# 筛选出传入列表中符合条件的股票
for code, name in all_stocks:
if code in input_stock_codes:
2025-04-02 17:38:26 +08:00
# 获取各个维度的分析结果
2025-05-14 08:54:56 +08:00
investment_advice_result = get_analysis_result(db_session, code, "investment_advice")
industry_competition_result = get_analysis_result(db_session, code, "industry_competition")
financial_report_result = get_analysis_result(db_session, code, "financial_report")
valuation_level_result = get_analysis_result(db_session, code, "valuation_level")
2025-04-02 17:38:26 +08:00
# 从ai_response和extra_info中提取所需的值
investment_advice = investment_advice_result.ai_response if investment_advice_result else None
industry_space = industry_competition_result.extra_info.get("industry_space") if industry_competition_result else 0
financial_report_level = financial_report_result.extra_info.get("financial_report_level") if financial_report_result else 0
pe_industry = valuation_level_result.extra_info.get("pe_industry") if valuation_level_result else 0
2025-04-02 13:52:34 +08:00
filtered_stocks.append({
"code": code,
2025-04-02 17:38:26 +08:00
"name": name,
"investment_advice": investment_advice, # 投资建议从ai_response获取
"industry_space": industry_space, # 行业发展空间2:高速增长, 1:稳定经营, 0:不确定性大, -1:不利经营)
"financial_report_level": financial_report_level, # 经营质量2:优秀, 1:较好, 0:一般, -1:存在隐患,-2较大隐患
2025-05-14 08:54:56 +08:00
"pe_industry": pe_industry, # 个股在行业的PE水平-1:高于行业, 0:接近行业, 1:低于行业)
2025-05-23 10:19:48 +08:00
"tracks": track_results.get(code, []), # 添加赛道信息
"market_value": market_value_results.get(code) # 添加企业市值信息
2025-04-02 13:52:34 +08:00
})
logger.info(f"筛选出 {len(filtered_stocks)} 个符合条件的股票")
except Exception as e:
logger.error(f"应用企业画像筛选失败: {str(e)}")
filtered_stocks = []
2025-05-14 08:54:56 +08:00
finally:
if 'db_session' in locals() and db_session is not None: # 确保 db_session 已定义
db_session.close() # <--- 关闭会话
2025-05-06 15:13:15 +08:00
# 统计各种状态的股票数量
success_count = sum(1 for item in investment_advice_list if item["status"] == "success")
pending_count = sum(1 for item in investment_advice_list if item["status"] == "pending")
timeout_count = sum(1 for item in investment_advice_list if item["status"] == "timeout")
error_count = sum(1 for item in investment_advice_list if item["status"] == "error")
2025-04-02 13:52:34 +08:00
# 返回结果
response = {
2025-05-06 15:13:15 +08:00
"status": "success" if success_count > 0 else "partial_success" if success_count + pending_count > 0 else "failed",
2025-04-02 13:52:34 +08:00
"total_input_stocks": len(stocks),
2025-05-06 15:13:15 +08:00
"stats": {
"success": success_count,
"pending": pending_count,
"timeout": timeout_count,
"error": error_count
},
"rounds_attempted": total_attempts,
"investment_advices": investment_advice_list
2025-04-02 13:52:34 +08:00
}
if profile_filter:
response["filtered_stocks"] = {
"count": len(filtered_stocks),
"stocks": filtered_stocks
}
if generate_pdf:
response["pdf_results"] = {
"count": len(pdf_results),
"reports": pdf_results
}
return jsonify(response)
except Exception as e:
logger.error(f"综合分析失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"综合分析失败: {str(e)}"
}), 500
2025-05-14 08:54:56 +08:00
@app.route('/api/valuation_analysis', methods=['GET'])
def valuation_analysis():
"""
估值分析接口 - 获取股票的PE/PB估值分析数据
参数:
2025-06-05 10:42:14 +08:00
- stock_code: 股票代码支持两种格式SH601021或601021.SH
2025-05-14 08:54:56 +08:00
- stock_name: 股票名称和stock_code二选一
- start_date: 开始日期可选默认为2018-01-01
- industry_name: 行业名称可选
- concept_name: 概念板块名称可选
- metric: 估值指标可选值为'pe''pb'默认为'pe'
返回:
用于构建ECharts图表的数据对象
"""
try:
# 解析参数
stock_code = request.args.get('stock_code')
stock_name = request.args.get('stock_name')
start_date = request.args.get('start_date', '2018-01-01')
industry_name = request.args.get('industry_name')
concept_name = request.args.get('concept_name')
metric = request.args.get('metric', 'pe')
# 检查参数
if not stock_code and not stock_name:
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供stock_code或stock_name参数"
}), 400
if metric not in ['pe', 'pb']:
return jsonify({
"status": "error",
"message": "请求格式错误: metric参数必须为'pe''pb'"
}), 400
2025-06-05 10:42:14 +08:00
# 处理股票代码格式
if stock_code:
if '.' in stock_code: # 处理601021.SH格式
code_parts = stock_code.split('.')
if len(code_parts) == 2:
stock_code = f"{code_parts[1]}{code_parts[0]}"
else:
return jsonify({
"status": "error",
"message": "股票代码格式错误: 应为601021.SH格式"
}), 400
2025-05-14 08:54:56 +08:00
# 验证日期格式
try:
datetime.strptime(start_date, '%Y-%m-%d')
except ValueError:
return jsonify({
"status": "error",
"message": f"日期格式错误: {start_date}应为YYYY-MM-DD格式"
}), 400
# 获取股票历史数据
stock_data = valuation_analyzer.get_historical_data(stock_code, start_date)
if stock_data.empty:
return jsonify({
"status": "error",
"message": f"未找到股票 {stock_code} 的历史数据"
}), 404
# 使用过滤后的数据
metric_filtered = f'{metric}_filtered' if f'{metric}_filtered' in stock_data.columns else metric
# 计算分位数
percentiles = valuation_analyzer.calculate_percentiles(stock_data, metric)
if not percentiles:
return jsonify({
"status": "error",
"message": f"无法计算股票 {stock_code}{metric} 分位数"
}), 500
# 获取行业平均数据
industry_data = None
if industry_name:
industry_data = valuation_analyzer.get_industry_avg_data(industry_name, start_date, metric)
# 获取概念板块平均数据
concept_data = None
if concept_name:
concept_data = valuation_analyzer.get_concept_avg_data(concept_name, start_date, metric)
# 获取股票名称
stock_name = valuation_analyzer.get_stock_name(stock_code) if not stock_name else stock_name
# 构建ECharts数据结构
dates = stock_data['timestamp'].dt.strftime('%Y-%m-%d').tolist()
# 准备行业和概念数据,使其与股票数据的日期对齐
aligned_industry_data = []
aligned_concept_data = []
# 日期映射,用于快速查找
if industry_data is not None and not industry_data.empty:
industry_date_map = dict(zip(industry_data['timestamp'].dt.strftime('%Y-%m-%d').tolist(),
industry_data[f'avg_{metric}'].tolist()))
# 根据股票日期创建对齐的行业数据
for date in dates:
aligned_industry_data.append(industry_date_map.get(date, None))
if concept_data is not None and not concept_data.empty:
concept_date_map = dict(zip(concept_data['timestamp'].dt.strftime('%Y-%m-%d').tolist(),
concept_data[f'avg_{metric}'].tolist()))
# 根据股票日期创建对齐的概念数据
for date in dates:
aligned_concept_data.append(concept_date_map.get(date, None))
# 构建结果
result = {
"status": "success",
"data": {
"title": {
"text": f"{stock_code} {stock_name} 历史{metric.upper()}分位数分析",
"subtext": f"当前{metric.upper()}百分位: {percentiles['percentile']:.2f}%"
},
"tooltip": {
"trigger": "axis",
"axisPointer": {
"type": "cross"
}
},
"legend": {
"data": [f"{stock_name} {metric.upper()}"]
},
"grid": {
"left": "3%",
"right": "4%",
"bottom": "3%",
"containLabel": True
},
"xAxis": {
"type": "category",
"boundaryGap": False,
"data": dates
},
"yAxis": {
"type": "value",
"name": f"{metric.upper()}"
},
"series": [
{
"name": f"{stock_name} {metric.upper()}",
"type": "line",
"data": stock_data[metric_filtered].tolist(),
"markLine": {
"data": [
{"name": "最小值", "yAxis": percentiles['min']},
{"name": "最大值", "yAxis": percentiles['max']},
{"name": "均值", "yAxis": percentiles['mean']},
{"name": "第一四分位数", "yAxis": percentiles['q1']},
{"name": "第三四分位数", "yAxis": percentiles['q3']},
{"name": "当前值", "yAxis": percentiles['current']}
]
}
}
],
"percentiles": {
"min": percentiles['min'],
"max": percentiles['max'],
"current": percentiles['current'],
"mean": percentiles['mean'],
"median": percentiles['median'],
"q1": percentiles['q1'],
"q3": percentiles['q3'],
"percentile": percentiles['percentile']
}
}
}
# 添加行业平均数据
if industry_data is not None and not industry_data.empty:
# 添加到legend
result["data"]["legend"]["data"].append(f"{industry_name}行业平均{metric.upper()}")
# 添加到series
result["data"]["series"].append({
"name": f"{industry_name}行业平均{metric.upper()}",
"type": "line",
"data": aligned_industry_data,
"lineStyle": {
"color": "#1e90ff", # 深蓝色
"width": 2
},
"itemStyle": {
"color": "#1e90ff",
"opacity": 0.9
},
"connectNulls": True # 连接空值点
})
# 添加行业统计信息
result["data"]["industry_stats"] = {
"name": industry_name,
"min_count": int(industry_data['stock_count'].min()),
"max_count": int(industry_data['stock_count'].max()),
"avg_count": float(industry_data['stock_count'].mean()),
"avg_value": float(industry_data[f'avg_{metric}'].mean())
}
# 添加概念板块平均数据
if concept_data is not None and not concept_data.empty:
# 添加到legend
result["data"]["legend"]["data"].append(f"{concept_name}概念平均{metric.upper()}")
# 添加到series
result["data"]["series"].append({
"name": f"{concept_name}概念平均{metric.upper()}",
"type": "line",
"data": aligned_concept_data,
"lineStyle": {
"color": "#ff7f50", # 珊瑚色
"width": 2
},
"itemStyle": {
"color": "#ff7f50",
"opacity": 0.9
},
"connectNulls": True # 连接空值点
})
# 添加概念统计信息
result["data"]["concept_stats"] = {
"name": concept_name,
"min_count": int(concept_data['stock_count'].min()),
"max_count": int(concept_data['stock_count'].max()),
"avg_count": float(concept_data['stock_count'].mean()),
"avg_value": float(concept_data[f'avg_{metric}'].mean())
}
return jsonify(result)
except Exception as e:
logger.error(f"估值分析失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"估值分析失败: {str(e)}"
}), 500
@app.route('/api/industry/list', methods=['GET'])
def get_industry_list():
"""
获取行业列表
返回:
{
"status": "success",
"data": [
{"code": "100001", "name": "银行"},
{"code": "100002", "name": "保险"},
...
]
}
"""
try:
industries = industry_analyzer.get_industry_list()
if not industries:
return jsonify({
"status": "error",
"message": "未找到行业数据"
}), 404
return jsonify({
"status": "success",
"count": len(industries),
"data": industries
})
except Exception as e:
logger.error(f"获取行业列表失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"获取行业列表失败: {str(e)}"
}), 500
2025-05-19 17:02:52 +08:00
@app.route('/api/concept/list', methods=['GET'])
def get_concept_list():
"""
获取概念板块列表
返回:
{
"status": "success",
"data": [
{"code": "200001", "name": "人工智能"},
{"code": "200002", "name": "大数据"},
...
]
}
"""
try:
# 使用IndustryAnalyzer获取概念板块列表
concepts = industry_analyzer.get_concept_list()
if not concepts:
logger.warning("未找到概念板块数据")
return jsonify({
"status": "error",
"message": "未找到概念板块数据"
}), 404
return jsonify({
"status": "success",
"count": len(concepts),
"data": concepts
})
except Exception as e:
logger.error(f"获取概念板块列表失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"获取概念板块列表失败: {str(e)}"
}), 500
2025-05-14 08:54:56 +08:00
@app.route('/api/industry/analysis', methods=['GET'])
def industry_analysis():
"""
2025-06-05 10:42:14 +08:00
行业/概念板块分析接口 - 获取行业或概念板块的PE/PB/PS估值分析数据和拥挤度指标
2025-05-14 08:54:56 +08:00
参数:
2025-06-05 10:42:14 +08:00
- industry_name: 行业名称与concept_name二选一
- concept_name: 概念板块名称与industry_name二选一
2025-05-14 08:54:56 +08:00
- metric: 估值指标可选值为'pe''pb''ps'默认为'pe'
- start_date: 开始日期可选默认为3年前
返回:
2025-06-05 10:42:14 +08:00
用于构建ECharts图表的行业/概念板块估值数据对象包含估值指标和拥挤度
2025-05-14 08:54:56 +08:00
注意
2025-06-05 10:42:14 +08:00
- 行业/概念板块PE/PB/PS计算中已剔除负值和极端值(如PE>1000)
- 所有百分位数据都是基于行业/概念板块平均值计算的
2025-05-14 08:54:56 +08:00
- 拥挤度数据固定使用最近3年的数据不受start_date参数影响
"""
try:
# 解析参数
industry_name = request.args.get('industry_name')
2025-06-05 10:42:14 +08:00
concept_name = request.args.get('concept_name')
2025-05-14 08:54:56 +08:00
metric = request.args.get('metric', 'pe')
start_date = request.args.get('start_date')
# 检查参数
2025-06-05 10:42:14 +08:00
if not industry_name and not concept_name:
return jsonify({
"status": "error",
"message": "请求格式错误: 需要提供industry_name或concept_name参数"
}), 400
if industry_name and concept_name:
2025-05-14 08:54:56 +08:00
return jsonify({
"status": "error",
2025-06-05 10:42:14 +08:00
"message": "请求格式错误: industry_name和concept_name不能同时提供"
2025-05-14 08:54:56 +08:00
}), 400
if metric not in ['pe', 'pb', 'ps']:
return jsonify({
"status": "error",
"message": "请求格式错误: metric参数必须为'pe''pb''ps'"
}), 400
2025-06-05 10:42:14 +08:00
# 获取分析数据
if industry_name:
result = industry_analyzer.get_industry_analysis(industry_name, metric, start_date)
title_name = industry_name
else:
result = industry_analyzer.get_concept_analysis(concept_name, metric, start_date)
title_name = concept_name
2025-05-14 08:54:56 +08:00
if not result.get('success', False):
return jsonify({
"status": "error",
"message": result.get('message', '未知错误')
}), 404
# 构建ECharts数据结构
metric_name = metric.upper()
# 估值指标数据
valuation_data = result['valuation']
percentiles = valuation_data['percentiles']
# 准备图例数据
legend_data = [
2025-06-05 10:42:14 +08:00
f"{title_name}平均{metric_name}",
f"平均{metric_name}历史最小值",
f"平均{metric_name}历史最大值",
f"平均{metric_name}历史Q1",
f"平均{metric_name}历史Q3"
2025-05-14 08:54:56 +08:00
]
# 构建结果
response = {
"status": "success",
"data": {
"title": {
2025-06-05 10:42:14 +08:00
"text": f"{title_name}历史{metric_name}分析",
2025-05-14 08:54:56 +08:00
"subtext": f"当前{metric_name}百分位: {percentiles['percentile']:.2f}%(剔除负值及极端值)"
},
"tooltip": {
"trigger": "axis",
"axisPointer": {
"type": "cross"
}
},
"legend": {
"data": legend_data
},
"grid": [
{
"left": "3%",
"right": "4%",
"top": "15%",
"height": "50%",
"containLabel": True
}
],
"xAxis": [
{
"type": "category",
"boundaryGap": False,
"data": valuation_data['dates'],
"axisLabel": {
"rotate": 45
},
"gridIndex": 0
}
],
"yAxis": [
{
"type": "value",
"name": f"{metric_name}",
"gridIndex": 0
}
],
"dataZoom": [
{
"type": "inside",
"start": 0,
"end": 100,
"xAxisIndex": [0]
},
{
"start": 0,
"end": 100,
"xAxisIndex": [0]
}
],
"series": [
{
2025-06-05 10:42:14 +08:00
"name": f"{title_name}平均{metric_name}",
2025-05-14 08:54:56 +08:00
"type": "line",
"data": valuation_data['avg_values'],
"markLine": {
"data": [
{"name": "历史最小值", "yAxis": percentiles['min'], "lineStyle": {"color": "#28a745", "type": "dashed"}},
{"name": "历史最大值", "yAxis": percentiles['max'], "lineStyle": {"color": "#dc3545", "type": "dashed"}},
{"name": "历史均值", "yAxis": percentiles['mean'], "lineStyle": {"color": "#9400d3", "type": "dashed"}},
{"name": "历史Q1", "yAxis": percentiles['q1'], "lineStyle": {"color": "#28a745", "type": "dashed"}},
{"name": "历史Q3", "yAxis": percentiles['q3'], "lineStyle": {"color": "#dc3545", "type": "dashed"}}
]
}
},
{
2025-06-05 10:42:14 +08:00
"name": f"平均{metric_name}历史最小值",
2025-05-14 08:54:56 +08:00
"type": "line",
"data": valuation_data['min_values'],
"lineStyle": {"width": 1, "opacity": 0.4, "color": "#28a745"},
"areaStyle": {"opacity": 0.1, "color": "#28a745"}
},
{
2025-06-05 10:42:14 +08:00
"name": f"平均{metric_name}历史最大值",
2025-05-14 08:54:56 +08:00
"type": "line",
"data": valuation_data['max_values'],
"lineStyle": {"width": 1, "opacity": 0.4, "color": "#dc3545"},
"areaStyle": {"opacity": 0.1, "color": "#dc3545"}
},
{
2025-06-05 10:42:14 +08:00
"name": f"平均{metric_name}历史Q1",
2025-05-14 08:54:56 +08:00
"type": "line",
"data": valuation_data['q1_values'],
"lineStyle": {"width": 1, "opacity": 0.6, "color": "#28a745"}
},
{
2025-06-05 10:42:14 +08:00
"name": f"平均{metric_name}历史Q3",
2025-05-14 08:54:56 +08:00
"type": "line",
"data": valuation_data['q3_values'],
"lineStyle": {"width": 1, "opacity": 0.6, "color": "#dc3545"}
}
],
"percentiles": {
"min": percentiles['min'],
"max": percentiles['max'],
"current": percentiles['current'],
"mean": percentiles['mean'],
"median": percentiles['median'],
"q1": percentiles['q1'],
"q3": percentiles['q3'],
"percentile": percentiles['percentile'],
"stock_count": percentiles['stock_count'],
"date": percentiles.get('date', valuation_data['dates'][-1])
},
"toolbox": {
"feature": {
"saveAsImage": {}
}
},
"valuation": {
"dates": valuation_data['dates'],
"avg_values": valuation_data['avg_values']
}
}
}
2025-06-05 10:42:14 +08:00
# 添加拥挤度指标(如果有)
2025-05-14 08:54:56 +08:00
if "crowding" in result:
crowding_data = result["crowding"]
current_crowding = crowding_data["current"]
# 添加拥挤度数据作为单独部分
response["data"]["crowding"] = {
"current_ratio": current_crowding["ratio"],
"current_percentile": current_crowding["percentile"],
"level": current_crowding["level"],
"dates": crowding_data["dates"],
"percentiles": crowding_data["percentiles"],
"ratios": crowding_data["ratios"]
}
# 如果有行业成交比例的历史统计数据,也添加到响应中
if "ratio_stats" in current_crowding:
response["data"]["crowding"]["ratio_stats"] = current_crowding["ratio_stats"]
return jsonify(response)
except Exception as e:
2025-06-05 10:42:14 +08:00
logger.error(f"行业/概念板块分析请求失败: {str(e)}")
2025-05-14 08:54:56 +08:00
return jsonify({
"status": "error",
"message": f"分析失败: {str(e)}"
}), 500
@app.route('/industry')
def industry_page():
"""渲染行业分析页面"""
return render_template('industry.html')
2025-05-14 16:52:24 +08:00
@app.route('/hsgt')
def hsgt_page():
"""渲染沪深港通监控页面"""
return render_template('hsgt_monitor.html')
@app.route('/api/hsgt/northbound', methods=['GET'])
def get_northbound_data():
"""获取北向资金流向数据接口
参数:
2025-06-05 10:42:14 +08:00
- start_time: 可选 开始时间戳
2025-05-14 16:52:24 +08:00
- end_time: 可选结束时间戳
返回北向资金流向数据
"""
try:
# 获取请求参数
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
# 转换为整数
if start_time:
start_time = int(start_time)
if end_time:
end_time = int(end_time)
# 调用数据获取方法
result = hsgt_monitor.fetch_northbound_data(start_time, end_time)
if result.get('success'):
return jsonify({
"status": "success",
"data": result
})
else:
return jsonify({
"status": "error",
"message": result.get('message', '获取北向资金数据失败')
}), 500
except ValueError as e:
return jsonify({
"status": "error",
"message": f"参数格式错误: {str(e)}"
}), 400
except Exception as e:
logger.error(f"获取北向资金数据异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
@app.route('/api/hsgt/southbound', methods=['GET'])
def get_southbound_data():
"""获取南向资金流向数据接口
参数:
- start_time: 可选开始时间戳
- end_time: 可选结束时间戳
返回南向资金流向数据
"""
try:
# 获取请求参数
start_time = request.args.get('start_time')
end_time = request.args.get('end_time')
# 转换为整数
if start_time:
start_time = int(start_time)
if end_time:
end_time = int(end_time)
# 调用数据获取方法
result = hsgt_monitor.fetch_southbound_data(start_time, end_time)
if result.get('success'):
return jsonify({
"status": "success",
"data": result
})
else:
return jsonify({
"status": "error",
"message": result.get('message', '获取南向资金数据失败')
}), 500
except ValueError as e:
return jsonify({
"status": "error",
"message": f"参数格式错误: {str(e)}"
}), 400
except Exception as e:
logger.error(f"获取南向资金数据异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
2025-05-19 17:02:52 +08:00
@app.route('/api/rzrq/chart_data', methods=['GET'])
def get_rzrq_chart_data():
"""获取融资融券数据用于图表展示
参数:
- days: 可选获取最近多少天的数据默认30天
返回内容
{
"status": "success",
"data": {
"success": true,
"dates": ["2023-01-01", "2023-01-02", ...],
"series": [
{
"name": "融资融券余额合计",
"data": [1234.56, 1235.67, ...],
"unit": "亿元"
},
// 其他系列数据...
],
"last_update": "2023-01-15 12:34:56"
}
}
"""
try:
# 获取天数参数
days = request.args.get('days', type=int, default=30)
# 限制天数范围
if days <= 0:
days = 30
elif days > 365:
days = 365
# 调用数据获取方法
result = em_rzrq_collector.get_chart_data(limit_days=days)
if result.get('success'):
return jsonify({
"status": "success",
"data": result
})
else:
return jsonify({
"status": "error",
"message": result.get('message', '获取融资融券数据失败')
}), 500
except ValueError as e:
return jsonify({
"status": "error",
"message": f"参数格式错误: {str(e)}"
}), 400
except Exception as e:
logger.error(f"获取融资融券图表数据异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
2025-05-14 16:52:24 +08:00
@app.route('/api/stock/tracks', methods=['GET'])
def get_stock_tracks():
"""根据股票代码获取相关赛道信息
参数:
- stock_code: 必须股票代码
返回赛道列表
"""
try:
# 获取股票代码参数
stock_code = request.args.get('stock_code')
# 验证参数
if not stock_code:
return jsonify({
"status": "error",
"message": "缺少必要参数: stock_code"
}), 400
# 查询赛道关联信息
track_query = text("""
SELECT DISTINCT pc.stock_code, ci.belong_industry
FROM gp_product_category pc
JOIN gp_category_industry ci ON pc.category_name = ci.category_name
WHERE pc.stock_code = :stock_code
""")
# 获取赛道信息
tracks = []
try:
# 获取数据库连接
db_session = next(get_db())
# 执行查询
result = db_session.execute(track_query, {"stock_code": stock_code})
for row in result:
if row.belong_industry: # 确保不为空
tracks.append(row.belong_industry)
except Exception as e:
logger.error(f"查询赛道信息失败: {str(e)}")
return jsonify({
"status": "error",
"message": f"查询赛道信息失败: {str(e)}"
}), 500
finally:
if 'db_session' in locals() and db_session is not None:
db_session.close() # 关闭会话
# 返回结果
return jsonify({
"status": "success",
"data": {
"stock_code": stock_code,
"tracks": tracks
}
})
except Exception as e:
logger.error(f"获取股票赛道信息异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
2025-05-19 17:02:52 +08:00
@app.route('/api/stock/price_range', methods=['GET'])
def get_stock_price_range():
"""根据股票估值分位计算理论价格区间
根据当前PE和PB的四分位数据反向计算出对应的理论股价区间
参数:
- stock_code: 必须股票代码
- start_date: 可选开始日期默认为一年前
返回内容:
{
"status": "success",
"data": {
"stock_code": "600000",
"stock_name": "浦发银行",
"current_price": 10.5,
"current_date": "2023-12-01",
"pe": {
"current": 5.2,
"q1": 4.8,
"q3": 6.5,
"q1_price": 9.7, // 对应PE为Q1时的理论股价
"q3_price": 13.1 // 对应PE为Q3时的理论股价
},
"pb": {
"current": 0.65,
"q1": 0.6,
"q3": 0.8,
"q1_price": 9.7, // 对应PB为Q1时的理论股价
"q3_price": 12.9 // 对应PB为Q3时的理论股价
}
}
}
"""
try:
# 获取股票代码参数
stock_code = request.args.get('stock_code')
# 验证参数
if not stock_code:
return jsonify({
"status": "error",
"message": "缺少必要参数: stock_code"
}), 400
# 计算一年前的日期作为默认起始日期
default_start_date = (datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d')
start_date = request.args.get('start_date', default_start_date)
# 通过复用现有API的逻辑获取PE和PB数据
# 首先获取PE数据
pe_data = valuation_analyzer.get_historical_data(stock_code, start_date)
if pe_data.empty:
return jsonify({
"status": "error",
"message": f"未找到股票 {stock_code} 的历史数据"
}), 404
# 计算PE分位数
pe_percentiles = valuation_analyzer.calculate_percentiles(pe_data, 'pe')
if not pe_percentiles:
return jsonify({
"status": "error",
"message": f"无法计算股票 {stock_code} 的PE分位数"
}), 500
# 计算PB分位数
pb_percentiles = valuation_analyzer.calculate_percentiles(pe_data, 'pb')
if not pb_percentiles:
return jsonify({
"status": "error",
"message": f"无法计算股票 {stock_code} 的PB分位数"
}), 500
# 获取当前股价
current_price = None
current_date = None
if not pe_data.empty:
current_price = pe_data.iloc[-1].get('close')
current_date = pe_data.iloc[-1].get('timestamp').strftime('%Y-%m-%d') if 'timestamp' in pe_data.columns else None
if current_price is None:
return jsonify({
"status": "error",
"message": f"无法获取股票 {stock_code} 的当前股价"
}), 500
# 获取当前PE和PB
current_pe = pe_percentiles.get('current')
current_pb = pb_percentiles.get('current')
# 获取PE的Q1和Q3
pe_q1 = pe_percentiles.get('q1')
pe_q3 = pe_percentiles.get('q3')
# 获取PB的Q1和Q3
pb_q1 = pb_percentiles.get('q1')
pb_q3 = pb_percentiles.get('q3')
# 反向计算估值分位对应的股价
# 如果当前PE为X股价为Y则PE为Z时的理论股价 = Y * (X / Z)
# 计算PE对应的理论股价
pe_q1_price = None
pe_q3_price = None
if current_pe and current_pe > 0 and pe_q1 and pe_q3:
pe_q1_price = current_price * (pe_q1 / current_pe)
pe_q3_price = current_price * (pe_q3 / current_pe)
# 计算PB对应的理论股价
pb_q1_price = None
pb_q3_price = None
if current_pb and current_pb > 0 and pb_q1 and pb_q3:
pb_q1_price = current_price * (pb_q1 / current_pb)
pb_q3_price = current_price * (pb_q3 / current_pb)
# 获取股票名称
stock_name = valuation_analyzer.get_stock_name(stock_code)
# 构建响应
response = {
"status": "success",
"data": {
"stock_code": stock_code,
"stock_name": stock_name,
"current_price": current_price,
"current_date": current_date,
"pe": {
"current": current_pe,
"q1": pe_q1,
"q3": pe_q3,
"q1_price": round(pe_q1_price, 2) if pe_q1_price is not None else None,
"q3_price": round(pe_q3_price, 2) if pe_q3_price is not None else None
},
"pb": {
"current": current_pb,
"q1": pb_q1,
"q3": pb_q3,
"q1_price": round(pb_q1_price, 2) if pb_q1_price is not None else None,
"q3_price": round(pb_q3_price, 2) if pb_q3_price is not None else None
}
}
}
return jsonify(response)
except Exception as e:
logger.error(f"计算股票价格区间异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
@app.route('/api/fear_greed/data', methods=['GET'])
def get_fear_greed_data():
"""获取恐贪指数数据
参数:
- start_date: 可选开始日期YYYY-MM-DD格式
- end_date: 可选结束日期YYYY-MM-DD格式
- limit: 可选限制返回的记录数量默认为730约两年的交易日数量
返回内容
{
"status": "success",
"data": {
"dates": ["2023-01-01", "2023-01-02", ...],
"values": [45.67, 50.12, ...],
"latest": {
"id": 123,
"index_value": 50.12,
"trading_date": "2023-01-02",
"update_time": "2023-01-02 15:30:00"
},
"latest_status": "中性",
"update_time": "2023-01-02 16:00:00"
}
}
"""
try:
# 获取参数
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
limit = request.args.get('limit', type=int, default=730)
# 调用数据获取方法
result = fear_greed_manager.get_index_data(start_date, end_date, limit)
if result.get('success'):
return jsonify({
"status": "success",
"data": result
})
else:
return jsonify({
"status": "error",
"message": result.get('message', '获取恐贪指数数据失败')
}), 500
except ValueError as e:
return jsonify({
"status": "error",
"message": f"参数格式错误: {str(e)}"
}), 400
except Exception as e:
logger.error(f"获取恐贪指数数据异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
@app.route('/api/fear_greed/add', methods=['POST'])
def add_fear_greed_data():
"""添加恐贪指数数据
请求体格式:
{
"index_value": 45.67, // 恐贪指数值0-100之间的数值
"trading_date": "2023-01-01" // 交易日期YYYY-MM-DD格式
}
返回内容:
{
"status": "success",
"message": "数据添加成功"
}
"""
try:
# 从请求体获取参数
data = request.get_json()
if not data:
return jsonify({
"status": "error",
"message": "请求体为空"
}), 400
index_value = data.get('index_value')
trading_date = data.get('trading_date')
# 验证参数
if index_value is None:
return jsonify({
"status": "error",
"message": "缺少必要参数: index_value"
}), 400
if trading_date is None:
return jsonify({
"status": "error",
"message": "缺少必要参数: trading_date"
}), 400
# 尝试转换为浮点数
try:
index_value = float(index_value)
except ValueError:
return jsonify({
"status": "error",
"message": "index_value必须是数值"
}), 400
# 调用添加方法
result = fear_greed_manager.add_index_data(index_value, trading_date)
if result:
return jsonify({
"status": "success",
"message": "恐贪指数数据添加成功"
})
else:
return jsonify({
"status": "error",
"message": "恐贪指数数据添加失败"
}), 500
except Exception as e:
logger.error(f"添加恐贪指数数据异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
# 获取可用指数列表
@app.route('/api/indices/list', methods=['GET'])
def get_indices_list():
"""
获取可用指数列表
返回所有可用于叠加显示的指数列表
"""
try:
indices = index_analyzer.get_indices_list()
return jsonify({
"status": "success",
"data": indices
})
except Exception as e:
logger.error(f"获取指数列表失败: {str(e)}")
return jsonify({"status": "error", "message": str(e)})
# 获取指数历史数据
@app.route('/api/indices/data', methods=['GET'])
def get_index_data():
"""
获取指数历史数据
参数:
- code: 指数代码
- start_date: 开始日期 (可选默认为1年前)
- end_date: 结束日期 (可选默认为今天)
返回指数历史收盘价数据
"""
try:
index_code = request.args.get('code')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
if not index_code:
return jsonify({"status": "error", "message": "缺少指数代码参数"})
index_data = index_analyzer.get_index_data(index_code, start_date, end_date)
return jsonify({
"status": "success",
"data": index_data
})
except Exception as e:
logger.error(f"获取指数数据失败: {str(e)}")
return jsonify({"status": "error", "message": str(e)})
2025-05-21 14:01:54 +08:00
@app.route('/api/financial/analysis', methods=['GET'])
def financial_analysis():
"""
财务分析接口
请求参数:
stock_code: 股票代码
2025-06-05 10:42:14 +08:00
force_update: 是否强制更新缓存可选默认为false
current_year: 当前年份格式为'YYYY-12-31'可选默认为'2024-12-31'
previous_year: 上一年份格式为'YYYY-12-31'可选默认为'2023-12-31'
2025-05-19 17:02:52 +08:00
2025-05-21 14:01:54 +08:00
返回:
分析结果JSON
"""
try:
stock_code = request.args.get('stock_code')
if not stock_code:
return jsonify({
'success': False,
'message': '缺少必要参数stock_code'
}), 400
2025-06-05 10:42:14 +08:00
2025-05-21 14:01:54 +08:00
analyzer = FinancialAnalyzer()
2025-06-05 10:42:14 +08:00
result2024 = analyzer.analyze_financial_data(stock_code, current_year = '2024-12-31', previous_year = '2023-12-31')
result2023 = analyzer.analyze_financial_data(stock_code, current_year = '2023-12-31', previous_year = '2022-12-31')
result2022 = analyzer.analyze_financial_data(stock_code, current_year = '2022-12-31', previous_year = '2021-12-31')
# 合并2023和2022的is_better、change_rate、change和avg_score到2024的result
def merge_year_data(target, source, year):
for block in ["financial_strength", "profitability", "growth", "value_rating", "liquidity"]:
if block in target and block in source:
# avg_score
target[block][f"avg_score_{year}"] = source[block].get("avg_score")
# indicators
if "indicators" in target[block] and "indicators" in source[block]:
for t_item, s_item in zip(target[block]["indicators"], source[block]["indicators"]):
for k in ["is_better", "change_rate", "change"]:
if k in s_item:
t_item[f"{k}_{year}"] = s_item.get(k)
merge_year_data(result2024, result2023, "2023")
merge_year_data(result2024, result2022, "2022")
return jsonify(result2024)
2025-05-19 17:02:52 +08:00
except Exception as e:
2025-05-21 14:01:54 +08:00
logger.error(f"财务分析失败: {str(e)}")
return jsonify({
'success': False,
'message': f'财务分析失败: {str(e)}'
}), 500
@app.route('/api/financial/indicators', methods=['GET'])
def get_financial_indicators():
"""
获取财务指标接口
请求参数:
stock_code: 股票代码
返回:
JSON格式的财务指标数据
"""
try:
# 获取股票代码
stock_code = request.args.get('stock_code')
if not stock_code:
return jsonify({
'success': False,
'message': '缺少必要参数: stock_code'
}), 400
# 创建分析器实例
analyzer = FinancialAnalyzer()
# 获取财务指标
result = analyzer.extract_financial_indicators(stock_code)
return jsonify(result)
except Exception as e:
logger.error(f"获取财务指标失败: {str(e)}")
return jsonify({
'success': False,
'message': f'获取财务指标失败: {str(e)}'
}), 500
@app.route('/api/financial/test_structure', methods=['GET'])
def test_mongo_structure():
"""
测试MongoDB集合结构接口
请求参数:
stock_code: 股票代码可选
返回:
JSON格式的集合结构信息
"""
try:
# 获取股票代码(可选)
stock_code = request.args.get('stock_code')
# 创建分析器实例
analyzer = FinancialAnalyzer()
# 获取集合结构
result = analyzer.test_mongo_structure(stock_code)
return jsonify(result)
except Exception as e:
logger.error(f"测试MongoDB结构失败: {str(e)}")
return jsonify({
'success': False,
'message': f'测试MongoDB结构失败: {str(e)}'
}), 500
2025-05-19 17:02:52 +08:00
2025-06-05 10:42:14 +08:00
@app.route('/api/stock/real_time_price', methods=['GET'])
def get_real_time_price():
"""获取股票实时价格接口
2025-05-19 17:02:52 +08:00
2025-06-05 10:42:14 +08:00
参数:
- stock_code: 股票代码必填
2025-05-23 10:19:48 +08:00
2025-06-05 10:42:14 +08:00
返回:
{
"status": "success",
"data": {
"stock_code": "600000",
"stock_name": "浦发银行",
"current_price": 10.5,
"change_percent": 2.5,
"change_amount": 0.25,
"volume": 1234567,
"amount": 12345678.9,
"high": 10.8,
"low": 10.2,
"open": 10.3,
"pre_close": 10.25,
"update_time": "2024-01-20 14:30:00"
}
}
"""
try:
# 获取股票代码参数
stock_code = request.args.get('stock_code')
# 验证参数
if not stock_code:
return jsonify({
"status": "error",
"message": "缺少必要参数: stock_code"
}), 400
# 导入股票价格采集器
from src.valuation_analysis.stock_price_collector import StockPriceCollector
# 创建采集器实例
collector = StockPriceCollector()
# 获取实时价格数据
price_data = collector.get_stock_price_data(stock_code)
if not price_data:
return jsonify({
"status": "error",
"message": f"获取股票 {stock_code} 的实时价格失败"
}), 404
# 构建响应数据
response_data = {
"stock_code": stock_code,
"stock_name": price_data.get('stock_name'),
"current_price": price_data.get('current_price'),
"change_percent": price_data.get('change_percent'),
"change_amount": price_data.get('change_amount'),
"volume": price_data.get('volume'),
"amount": price_data.get('amount'),
"high": price_data.get('high'),
"low": price_data.get('low'),
"open": price_data.get('open'),
"pre_close": price_data.get('pre_close'),
"update_time": price_data.get('update_time')
}
return jsonify({
"status": "success",
"data": response_data
})
except Exception as e:
logger.error(f"获取股票实时价格异常: {str(e)}")
return jsonify({
"status": "error",
"message": f"服务器错误: {str(e)}"
}), 500
@app.route('/bigscreen')
def bigscreen_page():
"""渲染大屏展示页面"""
return render_template('bigscreen.html')
@app.route('/api/bigscreen_data', methods=['GET'])
def bigscreen_data():
"""聚合大屏所需的12张图数据便于前端一次性加载"""
try:
# 资金流向
north = hsgt_monitor.fetch_northbound_data()
south = hsgt_monitor.fetch_southbound_data()
# 融资融券
rzrq = em_rzrq_collector.get_chart_data(limit_days=90)
# 恐贪指数
fear_greed = fear_greed_manager.get_index_data(limit=180)
# 概念板块
concepts = [
("先进封装", "xjfz"),
("芯片", "xp"),
("消费电子概念", "xfdz"),
("机器人概念", "jqr")
]
concept_data = {}
for cname, key in concepts:
res = industry_analyzer.get_concept_analysis(cname, 'pe', None)
if res.get('success'):
# PE主线
pe = {
'dates': res['valuation']['dates'],
'values': res['valuation']['avg_values']
}
# 拥挤度
crowding = res.get('crowding', {})
crowding_obj = {
'dates': crowding.get('dates', []),
'values': crowding.get('percentiles', [])
} if crowding else {'dates': [], 'values': []}
concept_data[key] = {'pe': pe, 'crowding': crowding_obj}
else:
concept_data[key] = {'pe': {'dates': [], 'values': []}, 'crowding': {'dates': [], 'values': []}}
return jsonify({
'status': 'success',
'northbound': {
'dates': north.get('times', []),
'values': north.get('data', {}).get('total', [])
} if north.get('success') else {'dates': [], 'values': []},
'southbound': {
'dates': south.get('times', []),
'values': south.get('data', {}).get('total', [])
} if south.get('success') else {'dates': [], 'values': []},
'rzrq': {
'dates': rzrq.get('dates', []),
'values': rzrq.get('series', [{}])[0].get('data', [])
} if rzrq.get('success') and rzrq.get('series') else {'dates': [], 'values': []},
'fear_greed': {
'dates': fear_greed.get('dates', []),
'values': fear_greed.get('values', [])
} if fear_greed.get('success') else {'dates': [], 'values': []},
'concepts': concept_data
})
except Exception as e:
logger.error(f"大屏数据聚合失败: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)})
@app.route('/api/pep_stock_info_by_shortname', methods=['GET'])
def get_pep_stock_info_by_shortname():
"""根据股票简称查询pep_stock_info集合中的全部字段"""
short_name = request.args.get('short_name')
if not short_name:
return jsonify({'success': False, 'message': '缺少必要参数: short_name'}), 400
try:
analyzer = FinancialAnalyzer()
result = analyzer.get_pep_stock_info_by_shortname(short_name)
return jsonify(result)
except Exception as e:
return jsonify({'success': False, 'message': f'服务器错误: {str(e)}'}), 500
@app.route('/api/pep_stock_info_by_code', methods=['GET'])
def get_pep_stock_info_by_code():
"""根据股票简称查询pep_stock_info集合中的全部字段"""
short_code = request.args.get('code')
if not short_code:
return jsonify({'success': False, 'message': '缺少必要参数: short_code'}), 400
try:
analyzer = FinancialAnalyzer()
result = analyzer.get_pep_stock_info_by_code(short_code)
return jsonify(result)
except Exception as e:
return jsonify({'success': False, 'message': f'服务器错误: {str(e)}'}), 500
if __name__ == '__main__':
2025-05-19 17:02:52 +08:00
# 启动Web服务器
app.run(host='0.0.0.0', port=5000, debug=True)