stock_fundamentals/src/scripts/stock_price_change_analyzer.py

231 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime
import time
from config import XUEQIU_HEADERS
def ensure_stock_price_changes_table_exists(engine):
    """Create the tables used by this script when they do not exist yet.

    Runs ``CREATE TABLE IF NOT EXISTS`` for ``stock_price_changes`` (one row
    per stock per time mark) and ``rapid_price_changes`` on a single
    connection, then commits.

    Args:
        engine: SQLAlchemy engine for the target database.

    Returns:
        None. Any exception is caught and printed instead of being raised.
    """
    try:
        # The SQL text is kept flush-left so the statements sent to the
        # server are exactly the ones the script has always issued.
        ddl_statements = (
            text("""
CREATE TABLE IF NOT EXISTS stock_price_changes (
id INT AUTO_INCREMENT PRIMARY KEY,
symbol VARCHAR(10),
name VARCHAR(100),
current FLOAT,
percent FLOAT,
time_mark VARCHAR(20), -- '9:24:40''9:25:00'
add_time DATETIME,
UNIQUE KEY unique_stock_time (symbol, time_mark, add_time)
)
"""),
            text("""
CREATE TABLE IF NOT EXISTS rapid_price_changes (
id INT AUTO_INCREMENT PRIMARY KEY,
symbol VARCHAR(10),
name VARCHAR(100),
price_change_ratio FLOAT, -- 两个时间点之间的涨幅
sector_count INT, -- 所属板块数量
sectors TEXT, -- 所属板块名称(逗号分隔)
add_time DATETIME
)
"""),
        )
        with engine.connect() as connection:
            for statement in ddl_statements:
                connection.execute(statement)
            connection.commit()
        print("股价变化表检查/创建完成")
    except Exception as exc:
        print("创建股价变化表时发生错误: {}".format(exc))
def fetch_stock_data(db_url, time_mark):
    """Fetch the Xueqiu quote list and store it in ``stock_price_changes``.

    Pages through the screener endpoint for the 'sha', 'sza' and 'kcb'
    listings, keeps the symbol/name/current/percent columns, strips the
    two-character prefix from each symbol, tags every row with ``time_mark``
    and the local fetch time, and replaces the rows already stored today
    under the same ``time_mark``.

    Args:
        db_url: SQLAlchemy database URL.
        time_mark: Label stored with every row, e.g. '9:24:00'.

    Returns:
        True when at least one row was fetched and saved, False when no row
        could be fetched.

    Raises:
        requests.RequestException: On network failure or timeout.
        Exception: Database errors from the delete/insert step propagate.
    """
    base_url = 'https://stock.xueqiu.com/v5/stock/screener/quote/list.json'
    page_size = 100
    # Without a timeout a stalled connection would block forever and the
    # script would miss its second sampling point.
    request_timeout = 10
    headers = XUEQIU_HEADERS
    all_data = []

    for stock_type in ('sha', 'sza', 'kcb'):
        params = {
            'page': 1,
            'size': page_size,
            'order': 'desc',
            'order_by': 'percent',
            'market': 'CN',
            'type': stock_type
        }
        response = requests.get(base_url, headers=headers, params=params,
                                timeout=request_timeout)
        if response.status_code != 200:
            print(f"请求 {stock_type} 数据失败,状态码:{response.status_code}")
            continue

        data = response.json()
        total_count = data['data']['count']
        # The first response already holds page 1, so keep it instead of
        # downloading that page a second time.
        all_data.extend(data['data']['list'])

        # Ceiling division: no extra empty page when the count is an exact
        # multiple of the page size.
        total_pages = (total_count + page_size - 1) // page_size
        for page in range(2, total_pages + 1):
            params['page'] = page
            response = requests.get(base_url, headers=headers, params=params,
                                    timeout=request_timeout)
            if response.status_code == 200:
                all_data.extend(response.json()['data']['list'])
            else:
                print(f"请求 {stock_type} 数据第 {page} 页失败,状态码:{response.status_code}")

    if not all_data:
        return False

    selected_columns = ['symbol', 'name', 'current', 'percent']
    # .copy() so the column assignments below act on an independent frame.
    df = pd.DataFrame(all_data)[selected_columns].copy()
    df['symbol'] = df['symbol'].str[2:]  # drop the two-character prefix
    # The list is ordered by live 'percent', so a stock can move between
    # pages while we paginate and be returned twice. Every row shares one
    # add_time, so a duplicate would violate the table's unique key.
    df = df.drop_duplicates(subset='symbol', keep='last')
    df['time_mark'] = time_mark
    df['add_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    engine = create_engine(db_url)
    try:
        ensure_stock_price_changes_table_exists(engine)
        delete_query = text("""
DELETE FROM stock_price_changes
WHERE time_mark = :time_mark
AND DATE(add_time) = CURDATE()
""")
        # One transaction for delete + insert: if the insert fails, the
        # previous rows for this time mark are restored by the rollback.
        with engine.begin() as conn:
            conn.execute(delete_query, {'time_mark': time_mark})
            df.to_sql('stock_price_changes', con=conn, if_exists='append', index=False)
    finally:
        engine.dispose()

    print(f"成功保存{len(df)}条{time_mark}的股价数据")
    return True
def analyze_price_changes(db_url, start_mark='9:24:00', end_mark='9:25:10',
                          threshold=0.7):
    """Print, per sector, how many stocks rose between two time marks.

    Joins today's ``stock_price_changes`` rows for ``start_mark`` and
    ``end_mark`` on symbol, computes the percentage price change, attaches
    sector names from the ``gp_gnbk`` table (``gp_code`` / ``bk_name``
    columns) and prints one line per sector, ordered by the share of stocks
    whose change exceeds ``threshold``.

    Args:
        db_url: SQLAlchemy database URL.
        start_mark: ``time_mark`` of the earlier snapshot.
        end_mark: ``time_mark`` of the later snapshot.
        threshold: Minimum change, in percent, for a stock to count as
            rising.

    Returns:
        None. Results are printed; any exception is caught and printed.
    """
    engine = create_engine(db_url)
    try:
        # Time marks are bound parameters rather than literals in the SQL.
        query = text("""
SELECT
t1.symbol,
t1.name,
((t2.current - t1.current) / t1.current * 100) as price_change_ratio
FROM
stock_price_changes t1
JOIN stock_price_changes t2 ON t1.symbol = t2.symbol
WHERE
t1.time_mark = :start_mark
AND t2.time_mark = :end_mark
AND DATE(t1.add_time) = CURDATE()
AND DATE(t2.add_time) = CURDATE()
""")
        price_changes_df = pd.read_sql_query(
            query, engine,
            params={'start_mark': start_mark, 'end_mark': end_mark},
        )
        if price_changes_df.empty:
            print("未获取到股价变化数据")
            return

        gp_gnbk_df = pd.read_sql_table('gp_gnbk', con=engine)
        merged_df = price_changes_df.merge(gp_gnbk_df, left_on='symbol', right_on='gp_code')
        if merged_df.empty:
            # Without this guard the statistics frame would have no 'ratio'
            # column and the sort below would fail with a bare KeyError.
            print("未匹配到任何板块信息,无法统计")
            return

        # 1 for a stock above the threshold, 0 otherwise (a missing ratio
        # compares as False), so sum() is the rising count and size() the
        # sector total. One grouped pass replaces a full scan per sector.
        is_up = (merged_df['price_change_ratio'] > threshold).astype(int)
        per_sector = is_up.groupby(merged_df['bk_name'])
        result_df = pd.DataFrame({
            'up_count': per_sector.sum(),
            'total_count': per_sector.size(),
        })
        result_df['ratio'] = result_df['up_count'] / result_df['total_count']
        result_df = (
            result_df.rename_axis('sector_name')
            .reset_index()
            .sort_values('ratio', ascending=False)
        )

        print("\n=== 板块涨幅分析结果 ===")
        print(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("\n{:<15} {:<8} {:<8} {:<10}".format(
            "板块名称", "上涨家数", "总家数", "占比"
        ))
        print("-" * 45)
        for row in result_df.itertuples(index=False):
            print("{:<15} {:<8d} {:<8d} {:<10.2f}%".format(
                row.sector_name,
                row.up_count,
                row.total_count,
                row.ratio * 100
            ))
        print("\n总计分析板块数量:", len(result_df))
    except Exception as e:
        print("分析数据时发生错误: {}".format(str(e)))
    finally:
        engine.dispose()
def main(db_url):
    """Take the 9:24:00 and 9:25:10 snapshots, then run the analysis.

    If the local time is already past 9:25:10, only the analysis runs on
    whatever is stored. Otherwise the function sleeps until 9:24:00 when
    needed, saves the first snapshot, sleeps until 9:25:10 when needed,
    saves the second snapshot and analyses the pair. A failed snapshot stops
    the run with a message.

    Args:
        db_url: SQLAlchemy database URL passed to the fetch and analysis
            steps.
    """
    # Sample the clock once so both targets derive from the same instant.
    now = datetime.now()
    target_time_1 = now.replace(hour=9, minute=24, second=0, microsecond=0)
    target_time_2 = now.replace(hour=9, minute=25, second=10, microsecond=0)

    # Too late to sample: analyse the rows already in the database.
    if now > target_time_2:
        print("当前时间已超过9:25:10,直接进行数据分析...")
        analyze_price_changes(db_url)
        return

    if now < target_time_1:
        wait_seconds = (target_time_1 - now).total_seconds()
        print(f"等待{wait_seconds:.1f}秒,直到9:24:00...")
        time.sleep(wait_seconds)

    if not fetch_stock_data(db_url, '9:24:00'):
        print("9:24:00数据失败")
        return

    # The first fetch takes time, so re-read the clock before waiting again.
    now = datetime.now()
    if now < target_time_2:
        wait_seconds = (target_time_2 - now).total_seconds()
        print(f"等待{wait_seconds:.1f}秒,直到9:25:10...")
        time.sleep(wait_seconds)

    if not fetch_stock_data(db_url, '9:25:10'):
        print("9:25:10数据失败")
        return

    analyze_price_changes(db_url)
if __name__ == "__main__":
    import os

    # SECURITY(review): the fallback URL embeds the database password in
    # source control. Set STOCK_DB_URL in the environment to override it,
    # and remove the literal once every deployment provides the variable.
    db_url = os.environ.get(
        'STOCK_DB_URL',
        'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj',
    )
    main(db_url)