stock_fundamentals/src/quantitative_analysis/financial_indicator_analyze...

1328 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
财务指标分析器
用于分析MongoDB中eastmoney_financial_data_v2集合的财务数据
只返回具体的数值信息,不进行评估
"""
import sys
import pymongo
import datetime
import logging
from typing import Dict, List, Optional, Union
from pathlib import Path
from sqlalchemy import create_engine, text
import numpy as np
# 添加项目根路径到Python路径
project_root = Path(__file__).parent.parent.parent
sys.path.append(str(project_root))
# 导入配置
from src.valuation_analysis.config import MONGO_CONFIG2, DB_URL
# 导入股票代码格式转换工具
from tools.stock_code_formatter import StockCodeFormatter
# 设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class FinancialIndicatorAnalyzer:
"""财务指标分析器"""
def __init__(self):
"""初始化"""
# MongoDB连接
self.mongo_client = None
self.db = None
self.collection_name = 'eastmoney_financial_data_v2'
self.collection = None
# MySQL连接
self.mysql_engine = None
# 股票代码格式转换器
self.code_formatter = StockCodeFormatter()
self.connect_mongodb()
self.connect_mysql()
def connect_mongodb(self):
"""连接MongoDB数据库"""
try:
self.mongo_client = pymongo.MongoClient(
host=MONGO_CONFIG2['host'],
port=MONGO_CONFIG2['port'],
username=MONGO_CONFIG2['username'],
password=MONGO_CONFIG2['password']
)
self.db = self.mongo_client[MONGO_CONFIG2['db']]
self.collection = self.db[self.collection_name]
# 测试连接
self.mongo_client.admin.command('ping')
logger.info(f"MongoDB连接成功使用集合: {self.collection_name}")
except Exception as e:
logger.error(f"MongoDB连接失败: {str(e)}")
raise
def connect_mysql(self):
"""连接MySQL数据库"""
try:
self.mysql_engine = create_engine(
DB_URL,
pool_size=5,
max_overflow=10,
pool_recycle=3600
)
# 测试连接
with self.mysql_engine.connect() as conn:
conn.execute(text("SELECT 1"))
logger.info("MySQL数据库连接成功")
except Exception as e:
logger.error(f"MySQL数据库连接失败: {str(e)}")
raise
def normalize_stock_code(self, stock_code: str) -> str:
"""
标准化股票代码格式,转换为数据库中使用的格式 (如 SZ300661)
Args:
stock_code: 输入的股票代码,支持多种格式:
- 300661.SZ -> SZ300661
- 300661 -> SZ300661
- SZ300661 -> SZ300661 (已是标准格式)
Returns:
str: 标准化后的股票代码
"""
stock_code = stock_code.strip().upper()
if '.' in stock_code: # 处理 300661.SZ 格式
parts = stock_code.split('.')
if len(parts) == 2:
stock_code = f"{parts[1]}{parts[0]}"
elif stock_code.isdigit(): # 处理 300661 格式
if stock_code.startswith(('60', '68')):
stock_code = f"SH{stock_code}"
elif stock_code.startswith(('00', '30', '20')):
stock_code = f"SZ{stock_code}"
elif stock_code.startswith(('8', '43', '87')):
stock_code = f"BJ{stock_code}"
# 如果已经是 SZ300661 格式,则不需要处理
return stock_code
def get_latest_pe_pb_data(self, stock_code: str) -> Optional[Dict]:
"""
从gp_day_data表获取股票最新的PE和PB数据
Args:
stock_code: 股票代码,支持多种格式 (300661.SZ, 300661, SZ300661)
Returns:
Dict: 包含pe和pb的字典如果没有找到则返回None
"""
try:
# 标准化股票代码格式
normalized_code = self.code_formatter.to_prefix_format(stock_code)
query = text("""
SELECT pe, pb, `timestamp`
FROM gp_day_data
WHERE symbol = :stock_code
ORDER BY `timestamp` DESC
LIMIT 1
""")
with self.mysql_engine.connect() as conn:
result = conn.execute(query, {"stock_code": normalized_code}).fetchone()
if result:
return {
'pe': float(result[0]) if result[0] is not None else None,
'pb': float(result[1]) if result[1] is not None else None,
'timestamp': result[2]
}
else:
logger.warning(f"未找到股票 {stock_code} (标准化后: {normalized_code}) 的PE/PB数据")
return None
except Exception as e:
logger.error(f"获取股票 {stock_code} PE/PB数据失败: {str(e)}")
return None
def get_financial_data(self, stock_code: str, report_date: Optional[str] = None) -> Optional[Dict]:
"""
获取指定股票的财务数据
Args:
stock_code: 股票代码
report_date: 报告日期,格式 'YYYY-MM-DD'如果为None则获取最新数据
Returns:
Dict: 财务数据如果没有找到则返回None
"""
try:
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
if report_date:
# 查询指定日期的数据
query = {
'stock_code': formatted_stock_code,
'report_date': report_date
}
data = self.collection.find_one(query)
else:
# 查询最新数据
query = {'stock_code': formatted_stock_code}
data = self.collection.find_one(
query,
sort=[('report_date', -1)]
)
return data
except Exception as e:
logger.error(f"获取财务数据失败 {stock_code} - {report_date}: {str(e)}")
return None
def safe_divide(self, numerator: Union[float, int, None], denominator: Union[float, int, None]) -> Optional[float]:
"""
安全除法处理None值和零除错误
Args:
numerator: 分子
denominator: 分母
Returns:
float: 计算结果如果无法计算则返回None
"""
try:
if numerator is None or denominator is None or denominator == 0:
return None
return float(numerator) / float(denominator)
except (TypeError, ValueError, ZeroDivisionError):
return None
# ==================== 盈利能力指标 ====================
def analyze_roe(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析净资产收益率(ROE)
公式: ROE = 归母净利润 / 归母股东权益
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: ROE值(%)如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取归母净利润
profit = data.get('profit_statement', {}).get('PARENT_NETPROFIT')
# 获取归母股东权益
equity = data.get('balance_sheet', {}).get('TOTAL_PARENT_EQUITY')
roe = self.safe_divide(profit, equity)
if roe:
roe = roe * 100 # 转换为百分比
return roe
except Exception as e:
logger.error(f"分析ROE失败 {stock_code}: {str(e)}")
return None
def analyze_gross_profit_margin(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析毛利率
公式: 毛利率 = (营业收入 - 营业成本) / 营业收入
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 毛利率(%)如果无法计算则返回None
"""
try:
fm_stock_code = self.code_formatter.to_dot_format(stock_code)
data = self.get_financial_data(fm_stock_code, report_date)
if not data:
return None
# 获取营业收入和营业成本
revenue = data.get('profit_statement', {}).get('OPERATE_INCOME')
cost = data.get('profit_statement', {}).get('OPERATE_COST')
if revenue and cost:
gross_profit = revenue - cost
margin = self.safe_divide(gross_profit, revenue)
if margin:
margin = margin * 100 # 转换为百分比
else:
margin = None
return margin
except Exception as e:
logger.error(f"分析毛利率失败 {stock_code}: {str(e)}")
return None
def analyze_net_profit_margin(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析净利率
公式: 净利率 = 归母净利润 / 营业收入
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 净利率(%)如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取归母净利润和营业收入
profit = data.get('profit_statement', {}).get('PARENT_NETPROFIT')
revenue = data.get('profit_statement', {}).get('OPERATE_INCOME')
margin = self.safe_divide(profit, revenue)
if margin:
margin = margin * 100 # 转换为百分比
return margin
except Exception as e:
logger.error(f"分析净利率失败 {stock_code}: {str(e)}")
return None
# ==================== 成长能力指标 ====================
def analyze_revenue_growth(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析营业收入增长率使用YOY数据
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 增长率(%)如果无法获取则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取营业收入同比增长率
growth_rate = data.get('profit_statement', {}).get('OPERATE_INCOME_YOY')
return growth_rate
except Exception as e:
logger.error(f"分析营业收入增长率失败 {stock_code}: {str(e)}")
return None
def analyze_profit_growth(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析净利润增长率使用YOY数据
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 增长率(%)如果无法获取则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取归母净利润同比增长率
growth_rate = data.get('profit_statement', {}).get('PARENT_NETPROFIT_YOY')
return growth_rate
except Exception as e:
logger.error(f"分析净利润增长率失败 {stock_code}: {str(e)}")
return None
def analyze_total_assets_growth(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析总资产增长率使用YOY数据
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 增长率(%)如果无法获取则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取总资产同比增长率
growth_rate = data.get('balance_sheet', {}).get('TOTAL_ASSETS_YOY')
return growth_rate
except Exception as e:
logger.error(f"分析总资产增长率失败 {stock_code}: {str(e)}")
return None
def analyze_growth_capability(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析成长能力指标
公式: 成长能力 = 最新净利润增长率 / 过去8个季度净利润同比变化的标准差
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 成长能力指标如果无法计算则返回None
"""
try:
# 将股票代码转换为点分格式 (如 430139.BJ)
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
if not formatted_stock_code:
logger.error(f"无法格式化股票代码: {stock_code}")
return None
# 生成需要查询的季度列表往前推13个季度
# 原因需要8个季度计算增长率每个增长率需要去年同期数据(+4)
# 加上1个季度缓冲8+4+1=13个季度
quarters = self._generate_quarters(report_date or "2025-03-31", 13)
# 获取多个季度的财务数据
quarterly_data = []
for quarter in quarters:
data = self.get_financial_data(formatted_stock_code, quarter)
if data:
profit = data.get('profit_statement', {}).get('PARENT_NETPROFIT')
quarterly_data.append({
'quarter': quarter,
'profit': profit
})
if len(quarterly_data) < 9:
logger.warning(f"股票 {stock_code} (格式化后: {formatted_stock_code}) 原始季度数据不足({len(quarterly_data)}),无法计算成长能力")
return None
# 处理季度数据的累积性,计算单季度净利润
processed_data = self._process_quarterly_profits(quarterly_data)
if len(processed_data) < 9:
logger.warning(f"股票 {stock_code} (格式化后: {formatted_stock_code}) 处理后季度数据不足({len(processed_data)})")
return None
# 计算每个季度的同比增长率
growth_rates = self._calculate_yoy_growth_rates(processed_data)
if len(growth_rates) < 8:
logger.warning(f"股票 {stock_code} (格式化后: {formatted_stock_code}) 同比增长率数据不足")
return None
# 取最新8个季度的增长率计算标准差
recent_8_quarters = growth_rates[-8:]
# 最新的净利润增长率
latest_growth_rate = recent_8_quarters[-1]
# 计算标准差
std_dev = np.std(recent_8_quarters)
if std_dev == 0:
return None # 避免除零错误
# 成长能力 = 净利润增长率 / 标准差
growth_capability = latest_growth_rate / std_dev
return growth_capability
except Exception as e:
logger.error(f"分析成长能力失败 {stock_code}: {str(e)}")
return None
def _generate_quarters(self, start_date: str, num_quarters: int) -> List[str]:
"""
生成季度列表
Args:
start_date: 起始日期 (YYYY-MM-DD)
num_quarters: 需要的季度数量
Returns:
List[str]: 季度列表,格式为 YYYY-MM-DD
"""
from datetime import datetime, timedelta
import calendar
quarters = []
year, month, day = map(int, start_date.split('-'))
# 确定起始季度
if month <= 3:
quarter = 1
elif month <= 6:
quarter = 2
elif month <= 9:
quarter = 3
else:
quarter = 4
current_year = year
current_quarter = quarter
for _ in range(num_quarters):
# 根据季度生成日期
if current_quarter == 1:
quarter_date = f"{current_year}-03-31"
elif current_quarter == 2:
quarter_date = f"{current_year}-06-30"
elif current_quarter == 3:
quarter_date = f"{current_year}-09-30"
else: # quarter == 4
quarter_date = f"{current_year}-12-31"
quarters.append(quarter_date)
# 移动到上一个季度
current_quarter -= 1
if current_quarter == 0:
current_quarter = 4
current_year -= 1
return quarters
def _process_quarterly_profits(self, quarterly_data: List[Dict]) -> List[Dict]:
"""
处理季度数据的累积性,计算单季度净利润
Args:
quarterly_data: 包含季度和累积利润的数据列表
Returns:
List[Dict]: 处理后的单季度利润数据
"""
# 按季度排序(从旧到新)
quarterly_data.sort(key=lambda x: x['quarter'])
processed_data = []
# 创建一个字典来快速查找季度数据
quarter_dict = {data['quarter']: data['profit'] for data in quarterly_data if data['profit'] is not None}
for data in quarterly_data:
quarter = data['quarter']
cumulative_profit = data['profit']
if cumulative_profit is None:
continue
# 判断季度类型
month = quarter.split('-')[1]
if month == '03': # 一季报,直接使用
single_quarter_profit = cumulative_profit
else:
# 需要减去前面季度的累积值
if month == '06': # 半年报 = 半年报 - 一季报
prev_quarter = quarter.replace('-06-30', '-03-31')
elif month == '09': # 三季报 = 三季报 - 半年报
prev_quarter = quarter.replace('-09-30', '-06-30')
else: # month == '12', 年报 = 年报 - 三季报
prev_quarter = quarter.replace('-12-31', '-09-30')
# 查找前一个季度的累积利润
prev_cumulative_profit = quarter_dict.get(prev_quarter)
if prev_cumulative_profit is not None:
single_quarter_profit = cumulative_profit - prev_cumulative_profit
else:
# 如果缺少前一季度数据,我们仍然可以尝试使用这个数据
# 但需要标记它可能不准确
logger.warning(f"缺少前一季度数据 {prev_quarter},直接使用累积值 {quarter}")
single_quarter_profit = cumulative_profit
processed_data.append({
'quarter': quarter,
'single_quarter_profit': single_quarter_profit
})
return processed_data
def _calculate_yoy_growth_rates(self, processed_data: List[Dict]) -> List[float]:
"""
计算同比增长率
Args:
processed_data: 处理后的单季度利润数据
Returns:
List[float]: 同比增长率列表
"""
growth_rates = []
# 按季度排序
processed_data.sort(key=lambda x: x['quarter'])
for i in range(4, len(processed_data)): # 从第5个季度开始计算同比
current_profit = processed_data[i]['single_quarter_profit']
prev_year_profit = processed_data[i-4]['single_quarter_profit'] # 去年同期
if prev_year_profit is not None and prev_year_profit != 0 and current_profit is not None:
growth_rate = (current_profit - prev_year_profit) / abs(prev_year_profit) * 100
growth_rates.append(growth_rate)
return growth_rates
# ==================== 营运效率指标 ====================
def analyze_admin_expense_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析管理费用/营业收入占比
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 管理费用率(%)如果无法计算则返回None
"""
try:
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
data = self.get_financial_data(formatted_stock_code, report_date)
if not data:
return None
# 获取管理费用和营业收入
admin_expense = data.get('profit_statement', {}).get('MANAGE_EXPENSE')
revenue = data.get('profit_statement', {}).get('OPERATE_INCOME')
ratio = self.safe_divide(admin_expense, revenue)
if ratio:
ratio = ratio * 100 # 转换为百分比
return ratio
except Exception as e:
logger.error(f"分析管理费用率失败 {stock_code}: {str(e)}")
return None
def analyze_rd_expense_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析研发费用/营业收入占比
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 研发费用率(%)如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取研发费用和营业收入
rd_expense = data.get('profit_statement', {}).get('RESEARCH_EXPENSE')
revenue = data.get('profit_statement', {}).get('OPERATE_INCOME')
ratio = self.safe_divide(rd_expense, revenue)
if ratio:
ratio = ratio * 100 # 转换为百分比
return ratio
except Exception as e:
logger.error(f"分析研发费用率失败 {stock_code}: {str(e)}")
return None
def analyze_sales_expense_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析销售费用/营业收入占比
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 销售费用率(%)如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取销售费用和营业收入
sales_expense = data.get('profit_statement', {}).get('SALE_EXPENSE')
revenue = data.get('profit_statement', {}).get('OPERATE_INCOME')
ratio = self.safe_divide(sales_expense, revenue)
if ratio:
ratio = ratio * 100 # 转换为百分比
return ratio
except Exception as e:
logger.error(f"分析销售费用率失败 {stock_code}: {str(e)}")
return None
# ==================== 规模扩张指标 ====================
def analyze_asset_liability_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析资产负债率
公式: 资产负债率 = 总负债 / 总资产
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 资产负债率(%)如果无法计算则返回None
"""
try:
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
data = self.get_financial_data(formatted_stock_code, report_date)
if not data:
return None
# 获取总负债和总资产
total_liabilities = data.get('balance_sheet', {}).get('TOTAL_LIABILITIES')
total_assets = data.get('balance_sheet', {}).get('TOTAL_ASSETS')
ratio = self.safe_divide(total_liabilities, total_assets)
if ratio:
ratio = ratio * 100 # 转换为百分比
return ratio
except Exception as e:
logger.error(f"分析资产负债率失败 {stock_code}: {str(e)}")
return None
def analyze_current_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析流动比率
公式: 流动比率 = 流动资产 / 流动负债
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 流动比率如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取流动资产和流动负债
current_assets = data.get('balance_sheet', {}).get('TOTAL_CURRENT_ASSETS')
current_liabilities = data.get('balance_sheet', {}).get('TOTAL_CURRENT_LIAB')
ratio = self.safe_divide(current_assets, current_liabilities)
return ratio
except Exception as e:
logger.error(f"分析流动比率失败 {stock_code}: {str(e)}")
return None
def analyze_quick_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析速动比率
公式: 速动比率 = (流动资产 - 存货) / 流动负债
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 速动比率如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取流动资产、存货和流动负债
current_assets = data.get('balance_sheet', {}).get('TOTAL_CURRENT_ASSETS')
inventory = data.get('balance_sheet', {}).get('INVENTORY', 0)
current_liabilities = data.get('balance_sheet', {}).get('TOTAL_CURRENT_LIAB')
if current_assets and current_liabilities:
quick_assets = current_assets - (inventory or 0)
ratio = self.safe_divide(quick_assets, current_liabilities)
else:
ratio = None
return ratio
except Exception as e:
logger.error(f"分析速动比率失败 {stock_code}: {str(e)}")
return None
# ==================== 研发投入指标 ====================
def analyze_rd_investment_growth(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析研发费用增长率使用YOY数据
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 增长率(%)如果无法获取则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取研发费用同比增长率
growth_rate = data.get('profit_statement', {}).get('RESEARCH_EXPENSE_YOY')
return growth_rate
except Exception as e:
logger.error(f"分析研发费用增长率失败 {stock_code}: {str(e)}")
return None
def analyze_rd_intensity(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析研发强度(研发费用/总资产)
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 研发强度(%)如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取研发费用和总资产
rd_expense = data.get('profit_statement', {}).get('RESEARCH_EXPENSE')
total_assets = data.get('balance_sheet', {}).get('TOTAL_ASSETS')
ratio = self.safe_divide(rd_expense, total_assets)
if ratio:
ratio = ratio * 100 # 转换为百分比
return ratio
except Exception as e:
logger.error(f"分析研发强度失败 {stock_code}: {str(e)}")
return None
# ==================== 供应商客户占比指标 ====================
def analyze_supplier_concentration(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析前五供应商占比
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 供应商集中度(%)如果无法获取则返回None
"""
try:
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
data = self.get_financial_data(formatted_stock_code, report_date)
if not data:
return None
# 获取前五供应商占比
supplier_ratio = data.get('top_five_suppliers_ratio')
if supplier_ratio:
supplier_ratio = supplier_ratio * 100 # 转换为百分比
return supplier_ratio
except Exception as e:
logger.error(f"分析供应商集中度失败 {stock_code}: {str(e)}")
return None
def analyze_customer_concentration(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析前五客户占比
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 客户集中度(%)如果无法获取则返回None
"""
try:
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
data = self.get_financial_data(formatted_stock_code, report_date)
if not data:
return None
# 获取前五客户占比
customer_ratio = data.get('top_five_customers_ratio')
if customer_ratio:
customer_ratio = customer_ratio * 100 # 转换为百分比
return customer_ratio
except Exception as e:
logger.error(f"分析客户集中度失败 {stock_code}: {str(e)}")
return None
# ==================== 现金流指标 ====================
def analyze_operating_cash_flow_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析经营现金流量比率
公式: 经营现金流量比率 = 经营活动现金流量净额 / 流动负债
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 经营现金流量比率如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取经营活动现金流量净额和流动负债
operating_cash_flow = data.get('cash_flow_statement', {}).get('NETCASH_OPERATE')
current_liabilities = data.get('balance_sheet', {}).get('TOTAL_CURRENT_LIAB')
ratio = self.safe_divide(operating_cash_flow, current_liabilities)
return ratio
except Exception as e:
logger.error(f"分析经营现金流量比率失败 {stock_code}: {str(e)}")
return None
def analyze_cash_flow_coverage_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析现金流量覆盖比率
公式: 现金流量覆盖比率 = 经营活动现金流量净额 / 归母净利润
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: 现金流量覆盖比率如果无法计算则返回None
"""
try:
data = self.get_financial_data(stock_code, report_date)
if not data:
return None
# 获取经营活动现金流量净额和归母净利润
operating_cash_flow = data.get('cash_flow_statement', {}).get('NETCASH_OPERATE')
net_profit = data.get('profit_statement', {}).get('PARENT_NETPROFIT')
ratio = self.safe_divide(operating_cash_flow, net_profit)
return ratio
except Exception as e:
logger.error(f"分析现金流量覆盖比率失败 {stock_code}: {str(e)}")
return None
# ==================== 估值指标 ====================
def analyze_pe_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析PE比率从gp_day_data表获取最新PE数据
Args:
stock_code: 股票代码
report_date: 报告日期默认最新注意当前只支持获取最新PE数据
Returns:
float: PE比率如果无法获取则返回None
"""
try:
pe_pb_data = self.get_latest_pe_pb_data(stock_code)
if pe_pb_data and pe_pb_data.get('pe') is not None:
return pe_pb_data['pe']
else:
logger.warning(f"未找到股票 {stock_code} 的PE数据")
return None
except Exception as e:
logger.error(f"分析PE比率失败 {stock_code}: {str(e)}")
return None
def analyze_pb_ratio(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析PB比率从gp_day_data表获取最新PB数据
Args:
stock_code: 股票代码
report_date: 报告日期默认最新注意当前只支持获取最新PB数据
Returns:
float: PB比率如果无法获取则返回None
"""
try:
pe_pb_data = self.get_latest_pe_pb_data(stock_code)
if pe_pb_data and pe_pb_data.get('pb') is not None:
return pe_pb_data['pb']
else:
logger.warning(f"未找到股票 {stock_code} 的PB数据")
return None
except Exception as e:
logger.error(f"分析PB比率失败 {stock_code}: {str(e)}")
return None
def get_all_stocks_pb_data(self) -> Dict[str, float]:
"""
获取全A股最新的PB数据
Returns:
Dict[str, float]: {股票代码: PB值} 的字典
"""
try:
query = text("""
SELECT symbol, pb
FROM gp_day_data
WHERE `timestamp` = (
SELECT MAX(`timestamp`) FROM gp_day_data
)
AND pb IS NOT NULL
AND pb > 0
""")
pb_data = {}
with self.mysql_engine.connect() as conn:
result = conn.execute(query)
for row in result:
symbol = row[0]
pb = float(row[1])
# 转换为点分格式
formatted_code = self.code_formatter.to_dot_format(symbol)
if formatted_code:
pb_data[formatted_code] = pb
logger.info(f"获取到 {len(pb_data)} 只股票的PB数据")
return pb_data
except Exception as e:
logger.error(f"获取全A股PB数据失败: {str(e)}")
return {}
def get_all_stocks_roe_data(self, report_date: Optional[str] = None) -> Dict[str, float]:
"""
获取全A股的ROE数据使用MongoDB聚合查询优化性能
Args:
report_date: 报告日期,默认最新
Returns:
Dict[str, float]: {股票代码: ROE值} 的字典
"""
try:
if report_date:
# 查询指定日期的数据
match_stage = {"report_date": report_date}
else:
# 查询每只股票的最新数据
match_stage = {}
pipeline = [
{"$match": match_stage},
{
"$sort": {
"stock_code": 1,
"report_date": -1
}
}
]
# 如果没有指定日期,则获取每只股票的最新数据
if not report_date:
pipeline.extend([
{
"$group": {
"_id": "$stock_code",
"latest_data": {"$first": "$$ROOT"}
}
},
{
"$replaceRoot": {"newRoot": "$latest_data"}
}
])
# 添加ROE计算
pipeline.extend([
{
"$addFields": {
"parent_netprofit": {
"$toDouble": "$profit_statement.PARENT_NETPROFIT"
},
"total_parent_equity": {
"$toDouble": "$balance_sheet.TOTAL_PARENT_EQUITY"
}
}
},
{
"$match": {
"parent_netprofit": {"$ne": None, "$ne": 0},
"total_parent_equity": {"$ne": None, "$ne": 0}
}
},
{
"$addFields": {
"roe": {
"$multiply": [
{"$divide": ["$parent_netprofit", "$total_parent_equity"]},
100
]
}
}
},
{
"$project": {
"stock_code": 1,
"roe": 1
}
}
])
roe_data = {}
cursor = self.collection.aggregate(pipeline)
for doc in cursor:
stock_code = doc.get('stock_code')
roe = doc.get('roe')
if stock_code and roe is not None:
roe_data[stock_code] = float(roe)
logger.info(f"获取到 {len(roe_data)} 只股票的ROE数据")
return roe_data
except Exception as e:
logger.error(f"获取全A股ROE数据失败: {str(e)}")
return {}
def calculate_pb_roe_rank_factor(self, stock_code: str, all_pb_data: Dict[str, float], all_roe_data: Dict[str, float]) -> Optional[float]:
"""
计算单只股票的PB-ROE排名因子
公式: Rank(PB_MRQ) - Rank(ROE_TTM)
Args:
stock_code: 股票代码
all_pb_data: 全A股PB数据字典
all_roe_data: 全A股ROE数据字典
Returns:
float: PB-ROE排名因子如果无法计算则返回None
"""
try:
# 标准化股票代码
formatted_stock_code = self.code_formatter.to_dot_format(stock_code)
if not formatted_stock_code:
return None
# 计算PB排名升序越小越好
pb_values = list(all_pb_data.values())
pb_values.sort()
target_pb = all_pb_data[formatted_stock_code]
pb_rank = pb_values.index(target_pb) + 1
# 计算ROE排名降序越大越好
roe_values = list(all_roe_data.values())
roe_values.sort(reverse=True)
target_roe = all_roe_data[formatted_stock_code]
roe_rank = roe_values.index(target_roe) + 1
# 计算因子Rank(PB) - Rank(ROE)
pb_roe_factor = pb_rank - roe_rank
return float(pb_roe_factor)
except Exception as e:
logger.error(f"计算股票 {stock_code} PB-ROE排名因子失败: {str(e)}")
return None
def analyze_pb_roe_rank_factor(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析PB-ROE排名因子保持向后兼容
公式: Rank(PB_MRQ) - Rank(ROE_TTM)
排名越小越好因子值越低说明PB估值较低、ROE较高
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: PB-ROE排名因子如果无法计算则返回None
"""
try:
# 获取全A股数据
all_pb_data = self.get_all_stocks_pb_data()
all_roe_data = self.get_all_stocks_roe_data(report_date)
if not all_pb_data or not all_roe_data:
return None
return self.calculate_pb_roe_rank_factor(stock_code, all_pb_data, all_roe_data)
except Exception as e:
logger.error(f"分析PB-ROE排名因子失败 {stock_code}: {str(e)}")
return None
def analyze_pb_roe(self, stock_code: str, report_date: Optional[str] = None) -> Optional[float]:
"""
分析PB-ROE指标原有方法保持向后兼容
公式: PB/ROE用于评估股票的价值投资潜力
Args:
stock_code: 股票代码
report_date: 报告日期,默认最新
Returns:
float: PB-ROE指标如果无法计算则返回None
"""
try:
# 获取PB值
pb = self.analyze_pb_ratio(stock_code, report_date)
# 获取ROE值
roe = self.analyze_roe(stock_code, report_date)
if pb is not None and roe is not None and roe != 0:
# ROE以百分比形式返回需要转换为小数
roe_decimal = roe / 100
pb_roe = pb / roe_decimal
return pb_roe
else:
logger.warning(f"无法计算股票 {stock_code} 的PB-ROE指标: PB={pb}, ROE={roe}")
return None
except Exception as e:
logger.error(f"分析PB-ROE指标失败 {stock_code}: {str(e)}")
return None
def close_connection(self):
"""关闭数据库连接"""
try:
if self.mongo_client:
self.mongo_client.close()
logger.info("MongoDB连接已关闭")
except Exception as e:
logger.error(f"关闭MongoDB连接失败: {str(e)}")
try:
if self.mysql_engine:
self.mysql_engine.dispose()
logger.info("MySQL连接已关闭")
except Exception as e:
logger.error(f"关闭MySQL连接失败: {str(e)}")
def main():
"""主函数 - 示例用法"""
analyzer = None
try:
# 创建分析器实例
analyzer = FinancialIndicatorAnalyzer()
# 示例:分析某只股票的财务指标
stock_code = "300661.SZ" # 圣邦股份
print(f"=== 财务指标分析:{stock_code} ===")
# 盈利能力指标
roe = analyzer.analyze_roe(stock_code)
print(f"ROE: {roe:.2f}%" if roe else "ROE: 无数据")
gross_margin = analyzer.analyze_gross_profit_margin(stock_code)
print(f"毛利率: {gross_margin:.2f}%" if gross_margin else "毛利率: 无数据")
net_margin = analyzer.analyze_net_profit_margin(stock_code)
print(f"净利率: {net_margin:.2f}%" if net_margin else "净利率: 无数据")
# 成长能力指标
growth_capability = analyzer.analyze_growth_capability(stock_code)
print(f"成长能力: {growth_capability:.2f}" if growth_capability else "成长能力: 无数据")
# 营运效率指标
admin_ratio = analyzer.analyze_admin_expense_ratio(stock_code)
print(f"管理费用率: {admin_ratio:.2f}%" if admin_ratio else "管理费用率: 无数据")
rd_ratio = analyzer.analyze_rd_expense_ratio(stock_code)
print(f"研发费用率: {rd_ratio:.2f}%" if rd_ratio else "研发费用率: 无数据")
# 规模扩张指标
asset_liability = analyzer.analyze_asset_liability_ratio(stock_code)
print(f"资产负债率: {asset_liability:.2f}%" if asset_liability else "资产负债率: 无数据")
current_ratio = analyzer.analyze_current_ratio(stock_code)
print(f"流动比率: {current_ratio:.2f}" if current_ratio else "流动比率: 无数据")
# 供应商客户占比指标
supplier_conc = analyzer.analyze_supplier_concentration(stock_code,'2024-12-31')
print(f"供应商集中度: {supplier_conc:.2f}%" if supplier_conc else "供应商集中度: 无数据")
customer_conc = analyzer.analyze_customer_concentration(stock_code, '2024-12-31')
print(f"客户集中度: {customer_conc:.2f}%" if customer_conc else "客户集中度: 无数据")
# 估值指标
pe_ratio = analyzer.analyze_pe_ratio(stock_code)
print(f"PE比率: {pe_ratio:.2f}" if pe_ratio else "PE比率: 无数据")
pb_ratio = analyzer.analyze_pb_ratio(stock_code)
print(f"PB比率: {pb_ratio:.2f}" if pb_ratio else "PB比率: 无数据")
pb_roe = analyzer.analyze_pb_roe(stock_code)
print(f"PB-ROE指标: {pb_roe:.2f}" if pb_roe else "PB-ROE指标: 无数据")
except Exception as e:
logger.error(f"程序执行失败: {str(e)}")
finally:
if analyzer:
analyzer.close_connection()
if __name__ == "__main__":
main()