stock_fundamentals/src/scripts/stock_minute_data_collector.py

138 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
from tqdm import tqdm
from config import XUEQIU_HEADERS
class StockMinuteDataCollector:
"""股票分时数据采集器类"""
def __init__(self, db_url):
"""
初始化采集器
Parameters:
-----------
db_url : str
数据库连接URL
"""
self.engine = create_engine(db_url)
self.headers = XUEQIU_HEADERS
def fetch_all_stock_codes(self):
"""从数据库获取所有股票代码"""
query = "SELECT gp_code FROM gp_code_all"
df = pd.read_sql(query, self.engine)
return df['gp_code'].tolist()
def update_market_cap(self, symbol, market_cap):
"""更新数据库中的市值信息"""
query = text("UPDATE gp_code_all SET market_cap = :market_cap WHERE gp_code = :symbol")
with self.engine.connect() as conn:
conn.execute(query, {'market_cap': market_cap, 'symbol': symbol})
def fetch_market_cap(self, symbol):
"""获取股票市值信息"""
url = f"https://stock.xueqiu.com/v5/stock/realtime/quotec.json?symbol={symbol}"
response = requests.get(url, headers=self.headers)
data = response.json()
if data['error_code'] == 0:
return data['data'][0]['market_capital']
else:
print(f"Error fetching market cap for {symbol}: {data['error_description']}")
return None
def fetch_stock_data(self, symbol, begin, end):
"""获取股票分时数据"""
url = f"https://stock.xueqiu.com/v5/stock/chart/kline.json?symbol={symbol}&begin={begin}&end={end}&period=1m&type=before&count=-284&indicator=kline,pe,pb,ps,pcf,market_capital,agt,ggt,balance"
response = requests.get(url, headers=self.headers)
return response.json()
def save_to_database(self, data, symbol):
"""保存数据到数据库"""
try:
items = data['data']['item']
columns = data['data']['column']
except KeyError as e:
print(f"KeyError for {symbol}: {e}")
return
df = pd.DataFrame(items, columns=columns)
df['symbol'] = symbol
# 数据库中有的字段
required_columns = ['timestamp', 'volume', 'open', 'high', 'low', 'close', 'chg', 'percent', 'turnoverrate', 'amount', 'symbol']
# 检查并保留实际存在的字段
existing_columns = [col for col in required_columns if col in df.columns]
df = df[existing_columns]
# 数据类型转换
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True).dt.tz_convert('Asia/Shanghai')
df.to_sql('gp_min_data', self.engine, if_exists='append', index=False)
def fetch_data_for_date(self, date=None):
"""
获取指定日期或当天的数据
Parameters:
-----------
date : str, optional
日期字符串,格式为'YYYY-MM-DD'如果为None则获取当天数据
"""
if date is None:
# 如果没有指定日期,使用当天日期
date = datetime.now().strftime('%Y-%m-%d')
start_date = datetime.strptime(date, '%Y-%m-%d')
end_date = start_date + timedelta(days=1)
# 获取所有股票代码
stock_codes = self.fetch_all_stock_codes()
# 循环请求每只股票的数据并保存,使用进度条显示进度
for symbol in tqdm(stock_codes, desc=f"Fetching and saving stock data for {date}"):
begin = int(start_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
end = int(end_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp() * 1000)
data = self.fetch_stock_data(symbol, begin, end)
if data['error_code'] == 0:
self.save_to_database(data, symbol)
else:
print(f"Error fetching data for {symbol} on {date}: {data['error_description']}")
print(f"Data fetching and saving completed for {date}.")
def collect_stock_minute_data(db_url, date=None):
"""
快捷方法:收集股票分时数据
Parameters:
-----------
db_url : str
数据库连接URL
date : str, optional
日期字符串,格式为'YYYY-MM-DD'如果为None则获取当天数据
"""
collector = StockMinuteDataCollector(db_url)
collector.fetch_data_for_date(date)
if __name__ == "__main__":
# 示例调用
db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
# 方法1使用快捷函数获取当天数据
collect_stock_minute_data(db_url)
# 方法2使用快捷函数获取指定日期数据
# collect_stock_minute_data(db_url, '2024-07-29')
# 方法3使用完整的类
# collector = StockMinuteDataCollector(db_url)
# collector.fetch_data_for_date() # 获取当天数据
# collector.fetch_data_for_date('2024-07-29') # 获取指定日期数据