This commit is contained in:
满脸小星星 2025-09-19 11:54:39 +08:00
parent f2400305e9
commit 326835967d
14 changed files with 2116 additions and 283 deletions

View File

@ -19,3 +19,4 @@ pandas==2.2.3
apscheduler==3.11.0 apscheduler==3.11.0
pymongo==4.13.0 pymongo==4.13.0
scikit-learn==1.6.1 scikit-learn==1.6.1
dbutils==3.1.2

52
src/QMT/config.yaml Normal file
View File

@ -0,0 +1,52 @@
# QMT交易配置
qmt_config:
# 客户端路径
client_path: "D:\\SoftwareCenter\\中金财富QMT个人版模拟交易端38421\\userdata_mini"
# 账户配置
account:
account_id: "10839603" # 资金账号
account_type: "STOCK" # 账号类型STOCK(股票) CREDIT(信用) FUTURE(期货)
# 数据库配置
database:
host: "192.168.18.199"
port: 3306
user: "root"
password: "Chlry#$.8"
database: "db_gp_cj"
# 日志配置
logging:
level: "INFO"
log_dir: "../logs"
log_format: "%(asctime)s - %(levelname)s - %(message)s"
# Redis 配置
redis:
host: "192.168.18.208"
port: 6379
db: 13
password: "wlkj2018"
socket_timeout: 5
# 计划同步配置从MySQL同步到Redis
plan_sync:
interval_seconds: 60 # 定时同步周期,秒
# 交易策略配置
strategy:
# 买入配置
buy:
amount: 50000 # 每只股票买入金额(元)
enabled: true # 是否启用买入策略
# 卖出配置
sell:
enabled: true # 是否启用卖出策略
# 卖出策略:当价格高于目标价时卖出全部持仓
# 订单配置
order:
timeout: 300 # 订单超时时间默认5分钟
cooldown: 60 # 同股票下单冷却时间(秒),避免频繁下单

52
src/QMT/config_loader.py Normal file
View File

@ -0,0 +1,52 @@
# coding:utf-8
import os
import yaml
import sys
def load_config():
"""加载配置文件"""
config_path = os.path.join(os.path.dirname(__file__), 'config.yaml')
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
return config
except Exception as e:
print(f"加载配置文件失败: {str(e)}")
sys.exit(1)
def get_qmt_config():
"""获取QMT配置"""
config = load_config()
return config['qmt_config']
def get_database_config():
"""获取数据库配置"""
config = load_config()
return config['database']
def get_logging_config():
"""获取日志配置"""
config = load_config()
return config['logging']
def get_strategy_config():
"""获取策略配置"""
config = load_config()
return config.get('strategy', {})
def get_database_config():
"""获取数据库配置"""
config = load_config()
print(config)
print("----------------------------------------------------------")
return config.get('database', {})
def get_redis_config():
"""获取Redis配置"""
config = load_config()
return config.get('redis', {})
def get_plan_sync_config():
"""获取计划同步配置"""
config = load_config()
return config.get('plan_sync', {'interval_seconds': 60})

575
src/QMT/database_manager.py Normal file
View File

@ -0,0 +1,575 @@
# coding:utf-8
import pymysql
import logging
import datetime
import time
from typing import Dict, List, Optional, Tuple
from dbutils.pooled_db import PooledDB # 1. 引入 PooledDB
from config_loader import get_database_config
# --- 连接池的配置 ---
# 2. 创建全局唯一的数据库连接池
# 我们可以把连接池看作一个"连接的仓库",所有线程都从这里借用和归还连接
try:
db_config = get_database_config()
pool = PooledDB(
creator=pymysql, # 指定使用 pymysql 作为数据库连接库
mincached=2, # 初始化时,池中至少创建的空闲连接数
maxcached=5, # 池中最多能存放的空闲连接数
maxconnections=10, # 连接池允许的最大连接数
blocking=True, # 连接池中无可用连接时,是否阻塞等待
host=db_config['host'],
port=db_config['port'],
user=db_config['user'],
password=db_config['password'],
database=db_config['database'],
charset='utf8mb4',
autocommit=True, # 注意:这里设置 autocommit=True每次执行完 SQL 会自动提交
# 连接保活配置
connect_timeout=10,
read_timeout=60,
write_timeout=60
)
# 获取一个初始化日志记录器
initial_logger = logging.getLogger(__name__)
initial_logger.info("数据库连接池创建成功")
except Exception as e:
initial_logger = logging.getLogger(__name__)
initial_logger.error(f"数据库连接池创建失败: {str(e)}")
pool = None # 创建失败,则 pool 为 None
def close_pool():
"""关闭连接池(在应用退出时调用)"""
if pool:
pool.close()
logging.getLogger(__name__).info("数据库连接池已关闭")
class DatabaseManager:
"""
数据库管理器类已改造为使用连接池
现在这个类的每个实例都是轻量级的并且线程安全
"""
def __init__(self, logger=None):
self.logger = logger or logging.getLogger(__name__)
if pool is None:
raise Exception("数据库连接池未初始化,请检查配置")
# 3. 每个实例共享同一个连接池
self.pool = pool
def _get_connection(self):
"""从连接池获取一个连接"""
return self.pool.connection()
def execute_query(self, sql: str, params: tuple = None) -> List[Tuple]:
"""执行查询SQL从连接池获取连接"""
# 4. 每次执行都从池中获取一个新连接,用完后自动归还
conn = self._get_connection()
try:
with conn.cursor() as cursor:
cursor.execute(sql, params)
result = cursor.fetchall()
return result
except Exception as e:
self.logger.error(f"执行查询失败: {sql}, 错误: {str(e)}")
# 连接池会自动处理坏掉的连接,所以通常不需要复杂的重连逻辑
raise
finally:
if conn:
# 5. conn.close() 在这里不是关闭连接,而是将连接【归还】给连接池
conn.close()
def execute_update(self, sql: str, params: tuple = None) -> int:
"""执行更新SQL从连接池获取连接"""
conn = self._get_connection()
try:
with conn.cursor() as cursor:
affected_rows = cursor.execute(sql, params)
# 因为设置了 autocommit=True, 所以不需要手动 conn.commit()
return affected_rows
except Exception as e:
self.logger.error(f"执行更新失败: {sql}, 错误: {str(e)}")
raise
finally:
if conn:
# 同样,将连接归还给池
conn.close()
# ======================================================================
# 注意:以下所有业务逻辑方法,都不需要做任何修改!
# 因为它们都依赖于 execute_query 和 execute_update而这两个方法已经被改造好了。
# ======================================================================
def get_active_buy_plans(self, strategy_id: int = 1) -> Dict[str, Dict]:
"""获取激活的买入计划(仅当日最新记录)"""
sql = """
SELECT t.stock_code, t.stock_name, t.target_price, t.buy_amount, t.trading_version
FROM trading_buy_plan t
JOIN (
SELECT stock_code, MAX(update_time) AS max_ut
FROM trading_buy_plan
WHERE strategy_id=%s AND is_active=1 AND DATE(trading_time)=CURDATE()
GROUP BY stock_code
) m ON t.stock_code=m.stock_code AND t.update_time=m.max_ut
WHERE t.strategy_id=%s AND t.is_active=1
"""
try:
results = self.execute_query(sql, (strategy_id, strategy_id))
buy_plans = {}
for row in results:
stock_code, stock_name, target_price, buy_amount, tver = row
buy_plans[stock_code] = {
'stock_name': stock_name,
'target_price': float(target_price),
'buy_amount': float(buy_amount),
'trading_version': int(tver or 0)
}
self.logger.info(f"加载买入计划: {len(buy_plans)}")
return buy_plans
except Exception as e:
self.logger.error(f"获取买入计划失败: {str(e)}")
return {}
def get_active_sell_plans(self, strategy_id: int = 1) -> Dict[str, Dict]:
"""获取激活的卖出计划(仅当日最新记录)"""
sql = """
SELECT t.stock_code, t.stock_name, t.target_price, t.sell_quantity, t.trading_version
FROM trading_sell_plan t
JOIN (
SELECT stock_code, MAX(update_time) AS max_ut
FROM trading_sell_plan
WHERE strategy_id=%s AND is_active=1 AND DATE(trading_time)=CURDATE()
GROUP BY stock_code
) m ON t.stock_code=m.stock_code AND t.update_time=m.max_ut
WHERE t.strategy_id=%s AND t.is_active=1
"""
try:
results = self.execute_query(sql, (strategy_id, strategy_id))
sell_plans = {}
for row in results:
stock_code, stock_name, target_price, sell_quantity, tver = row
sell_plans[stock_code] = {
'stock_name': stock_name,
'target_price': float(target_price),
'sell_quantity': sell_quantity,
'trading_version': int(tver or 0)
}
self.logger.info(f"加载卖出计划: {len(sell_plans)}")
return sell_plans
except Exception as e:
self.logger.error(f"获取卖出计划失败: {str(e)}")
return {}
# ===== Plan versioned read (optional trading_version column) =====
def get_max_plan_version(self, strategy_id: int = 1) -> int:
"""获取买/卖计划的最大版本号(需要表中存在 trading_version 列。若列不存在返回0。"""
try:
buy_sql = "SELECT COALESCE(MAX(trading_version), 0) FROM trading_buy_plan WHERE strategy_id=%s AND is_active=1"
sell_sql = "SELECT COALESCE(MAX(trading_version), 0) FROM trading_sell_plan WHERE strategy_id=%s AND is_active=1"
bmax = self.execute_query(buy_sql, (strategy_id,))[0][0]
smax = self.execute_query(sell_sql, (strategy_id,))[0][0]
return max(int(bmax or 0), int(smax or 0))
except Exception as e:
self.logger.warning(f"读取最大计划版本失败,可能缺少 trading_version 列: {str(e)}")
return 0
def get_buy_plans_since(self, strategy_id: int, version: int) -> Dict[str, Dict]:
"""获取版本号大于给定值的买入计划(需要 trading_version 列)。列缺失时返回全部激活计划。"""
try:
sql = """
SELECT stock_code, stock_name, target_price, buy_amount, trading_version
FROM trading_buy_plan WHERE strategy_id=%s AND is_active=1 AND trading_version>%s
"""
results = self.execute_query(sql, (strategy_id, version))
plans = {}
for row in results:
stock_code, stock_name, target_price, buy_amount, tver = row
plans[stock_code] = {
'stock_name': stock_name,
'target_price': float(target_price),
'buy_amount': float(buy_amount),
'trading_version': int(tver or 0)
}
return plans
except Exception:
# 回退到全部激活计划
return self.get_active_buy_plans(strategy_id)
def get_sell_plans_since(self, strategy_id: int, version: int) -> Dict[str, Dict]:
"""获取版本号大于给定值的卖出计划(需要 trading_version 列)。列缺失时返回全部激活计划。"""
try:
sql = """
SELECT stock_code, stock_name, target_price, sell_quantity, trading_version
FROM trading_sell_plan WHERE strategy_id=%s AND is_active=1 AND trading_version>%s
"""
results = self.execute_query(sql, (strategy_id, version))
plans = {}
for row in results:
stock_code, stock_name, target_price, sell_quantity, tver = row
plans[stock_code] = {
'stock_name': stock_name,
'target_price': float(target_price),
'sell_quantity': sell_quantity,
'trading_version': int(tver or 0)
}
return plans
except Exception:
return self.get_active_sell_plans(strategy_id)
def insert_order(self, order_data: Dict) -> bool:
"""插入订单记录"""
sql = """
INSERT INTO trading_order
(order_id, strategy_id, stock_code, order_side, order_quantity, order_price, target_price,
order_status, order_time, order_remark)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
try:
params = (
order_data['order_id'],
order_data['strategy_id'],
order_data['stock_code'],
order_data['order_side'],
order_data['order_quantity'],
order_data['order_price'],
order_data['target_price'],
order_data['order_status'],
order_data['order_time'],
order_data['order_remark']
)
self.execute_update(sql, params)
self.logger.info(f"订单记录已插入: {order_data['order_id']}")
return True
except Exception as e:
self.logger.error(f"插入订单记录失败: {str(e)}")
return False
def update_order_status(self, order_id: str, status: str, filled_quantity: int = None, filled_time: str = None) -> bool:
"""更新订单状态"""
try:
# 添加调试信息
self.logger.info(f"准备更新订单状态: order_id={order_id} (类型: {type(order_id)}), status={status}")
# 确保 order_id 是字符串类型
order_id_str = str(order_id)
if filled_quantity is not None and filled_time:
sql = """
UPDATE trading_order
SET order_status = %s, filled_quantity = %s, filled_time = %s
WHERE order_id = %s OR qmt_order_id = %s
"""
params = (status, filled_quantity, filled_time, order_id_str, order_id_str)
else:
sql = """
UPDATE trading_order
SET order_status = %s
WHERE order_id = %s OR qmt_order_id = %s
"""
params = (status, order_id_str, order_id_str)
# 添加 SQL 调试信息
self.logger.info(f"执行 SQL: {sql}")
self.logger.info(f"参数: {params}")
affected_rows = self.execute_update(sql, params)
# if affected_rows > 0:
self.logger.info(f"订单状态已更新: {order_id_str} -> {status}")
return True
# else:
# self.logger.warning(f"订单状态更新失败,未找到订单: {order_id_str}")
# return False
except Exception as e:
self.logger.error(f"更新订单状态失败: {str(e)}")
return False
def update_qmt_order_id(self, our_order_id: str, qmt_order_id: str) -> bool:
"""更新QMT内部订单ID"""
try:
sql = """
UPDATE trading_order
SET qmt_order_id = %s
WHERE order_id = %s
"""
params = (str(qmt_order_id), our_order_id)
affected_rows = self.execute_update(sql, params)
# if affected_rows > 0:
self.logger.info(f"QMT订单ID已更新: {our_order_id} -> {qmt_order_id}")
return True
# else:
# self.logger.warning(f"更新QMT订单ID失败未找到订单: {our_order_id}")
# return False
except Exception as e:
self.logger.error(f"更新QMT订单ID失败: {str(e)}")
return False
def update_position(self, stock_code: str, quantity_change: int, price: float, is_buy: bool) -> bool:
"""更新持仓"""
try:
# 先查询当前持仓
sql_select = "SELECT total_quantity, cost_price FROM trading_position WHERE stock_code = %s"
results = self.execute_query(sql_select, (stock_code,))
if results:
# 更新现有持仓
current_quantity, current_cost_price = results[0]
if is_buy:
# 买入:增加持仓
new_quantity = current_quantity + quantity_change
if current_quantity > 0:
new_cost_price = (current_cost_price * current_quantity + price * quantity_change) / new_quantity
else:
new_cost_price = price
else:
# 卖出:减少持仓
new_quantity = max(0, current_quantity - quantity_change)
new_cost_price = current_cost_price if new_quantity > 0 else 0
if new_quantity > 0:
sql_update = """
UPDATE trading_position
SET total_quantity = %s, cost_price = %s, update_time = NOW()
WHERE stock_code = %s
"""
self.execute_update(sql_update, (new_quantity, new_cost_price, stock_code))
else:
# 持仓为0删除记录
sql_delete = "DELETE FROM trading_position WHERE stock_code = %s"
self.execute_update(sql_delete, (stock_code,))
else:
# 新增持仓(仅买入时)
if is_buy and quantity_change > 0:
sql_insert = """
INSERT INTO trading_position
(stock_code, total_quantity, cost_price, create_time, update_time)
VALUES (%s, %s, %s, NOW(), NOW())
"""
self.execute_update(sql_insert, (stock_code, quantity_change, price))
self.logger.info(f"持仓已更新: {stock_code} {'买入' if is_buy else '卖出'} {quantity_change}")
return True
except Exception as e:
self.logger.error(f"更新持仓失败: {str(e)}")
return False
def insert_trading_log(self, log_data: Dict) -> bool:
"""插入交易日志"""
sql = """
INSERT INTO trading_log
(order_id, stock_code, log_type, log_level, message, create_time)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
params = (
log_data.get('order_id'),
log_data.get('stock_code'),
log_data['log_type'],
log_data['log_level'],
log_data['message'],
log_data['create_time']
)
self.execute_update(sql, params)
return True
except Exception as e:
self.logger.error(f"插入交易日志失败: {str(e)}")
return False
def get_current_positions(self) -> Dict[str, int]:
"""获取当前持仓"""
sql = "SELECT stock_code, total_quantity FROM trading_position WHERE total_quantity > 0"
try:
results = self.execute_query(sql)
positions = {row[0]: row[1] for row in results}
self.logger.info(f"从数据库加载持仓: {positions}")
return positions
except Exception as e:
self.logger.error(f"获取当前持仓失败: {str(e)}")
return {}
def get_open_orders(self) -> Dict[str, Dict]:
"""获取未完成订单pending/submitted"""
sql = """
SELECT order_id, stock_code, order_side, order_quantity, order_price, order_status, order_time
FROM trading_order
WHERE order_status IN ('pending','submitted')
"""
try:
results = self.execute_query(sql)
orders = {}
for row in results:
order_id, stock_code, side, qty, price, status, order_time = row
orders[stock_code] = {
'order_id': order_id,
'order_quantity': qty,
'order_price': float(price) if price is not None else None,
'status': status,
'side': side,
'order_time': order_time.strftime('%Y-%m-%d %H:%M:%S') if order_time else None
}
return orders
except Exception as e:
self.logger.error(f"获取未完成订单失败: {str(e)}")
return {}
def get_todays_orders_by_statuses(self, statuses: List[str]) -> List[Dict]:
"""获取今日指定状态集合的订单列表"""
if not statuses:
return []
placeholders = ','.join(['%s'] * len(statuses))
sql = f"""
SELECT order_id, stock_code, order_side, order_quantity, order_price, order_status, order_time
FROM trading_order
WHERE DATE(order_time)=CURDATE() AND order_status IN ({placeholders})
ORDER BY order_time ASC
"""
try:
results = self.execute_query(sql, tuple(statuses))
orders: List[Dict] = []
for row in results:
order_id, stock_code, side, qty, price, status, order_time = row
orders.append({
'order_id': str(order_id),
'stock_code': stock_code,
'side': side,
'order_quantity': int(qty) if qty is not None else 0,
'order_price': float(price) if price is not None else None,
'order_status': status,
'order_time': order_time,
})
return orders
except Exception as e:
self.logger.error(f"获取今日订单失败: {str(e)}")
return []
# ===== 持仓快照同步(覆盖式) =====
def upsert_position_snapshot(self, stock_code: str, total_quantity: int, available_quantity: int,
frozen_quantity: int, cost_price: float, market_price: float,
market_value: float, profit_loss: float, profit_loss_ratio: float) -> None:
"""按快照覆盖更新单条持仓(存在则更新,不存在则插入)"""
sql = """
INSERT INTO trading_position
(stock_code, total_quantity, available_quantity, frozen_quantity, cost_price,
market_price, market_value, profit_loss, profit_loss_ratio, create_time, update_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
ON DUPLICATE KEY UPDATE
total_quantity=VALUES(total_quantity),
available_quantity=VALUES(available_quantity),
frozen_quantity=VALUES(frozen_quantity),
cost_price=VALUES(cost_price),
market_price=VALUES(market_price),
market_value=VALUES(market_value),
profit_loss=VALUES(profit_loss),
profit_loss_ratio=VALUES(profit_loss_ratio),
update_time=NOW()
"""
self.execute_update(sql, (stock_code, total_quantity, available_quantity, frozen_quantity,
cost_price, market_price, market_value, profit_loss, profit_loss_ratio))
def delete_positions_except(self, stock_codes: List[str]) -> None:
"""删除不在给定集合中的所有持仓记录(用于快照清理)"""
if not stock_codes:
# 如果为空,清空整表
sql = "DELETE FROM trading_position"
self.execute_update(sql)
return
placeholders = ','.join(['%s'] * len(stock_codes))
sql = f"DELETE FROM trading_position WHERE stock_code NOT IN ({placeholders})"
self.execute_update(sql, tuple(stock_codes))
def update_account_funds(self, account_id: str, account_type: str, total_asset: float,
available_cash: float, frozen_cash: float, market_value: float,
profit_loss: float, profit_loss_ratio: float) -> bool:
"""更新账户资金信息"""
try:
sql = """
INSERT INTO trading_account_funds
(account_id, account_type, total_asset, available_cash, frozen_cash,
market_value, profit_loss, profit_loss_ratio, update_time)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON DUPLICATE KEY UPDATE
account_type = VALUES(account_type),
total_asset = VALUES(total_asset),
available_cash = VALUES(available_cash),
frozen_cash = VALUES(frozen_cash),
market_value = VALUES(market_value),
profit_loss = VALUES(profit_loss),
profit_loss_ratio = VALUES(profit_loss_ratio),
update_time = NOW()
"""
params = (account_id, account_type, total_asset, available_cash,
frozen_cash, market_value, profit_loss, profit_loss_ratio)
affected_rows = self.execute_update(sql, params)
if affected_rows > 0:
self.logger.info(f"账户资金信息更新成功: {account_id}")
return True
else:
self.logger.warning(f"账户资金信息更新失败: {account_id}")
return False
except Exception as e:
self.logger.error(f"更新账户资金信息失败: {str(e)}")
return False
def get_account_funds(self, account_id: str) -> Optional[Dict]:
"""获取账户资金信息"""
try:
sql = "SELECT * FROM trading_account_funds WHERE account_id = %s ORDER BY update_time DESC LIMIT 1"
result = self.execute_query(sql, (account_id,))
if result:
row = result[0]
return {
'id': row[0],
'account_id': row[1],
'account_type': row[2],
'total_asset': float(row[3]),
'available_cash': float(row[4]),
'frozen_cash': float(row[5]),
'market_value': float(row[6]),
'profit_loss': float(row[7]),
'profit_loss_ratio': float(row[8]),
'update_time': row[9],
'create_time': row[10]
}
return None
except Exception as e:
self.logger.error(f"获取账户资金信息失败: {str(e)}")
return None
def get_account_funds_history(self, account_id: str, days: int = 7) -> List[Dict]:
"""获取账户资金历史记录"""
try:
sql = """
SELECT * FROM trading_account_funds
WHERE account_id = %s
AND update_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
ORDER BY update_time DESC
"""
result = self.execute_query(sql, (account_id, days))
funds_history = []
for row in result:
funds_history.append({
'id': row[0],
'account_id': row[1],
'account_type': row[2],
'total_asset': float(row[3]),
'available_cash': float(row[4]),
'frozen_cash': float(row[5]),
'market_value': float(row[6]),
'profit_loss': float(row[7]),
'profit_loss_ratio': float(row[8]),
'update_time': row[9],
'create_time': row[10]
})
return funds_history
except Exception as e:
self.logger.error(f"获取账户资金历史记录失败: {str(e)}")
return []
def close(self):
"""关闭数据库连接池(注意:这里不需要关闭,因为池是全局的)"""
# DBUtils.PooledDB 的池是全局的,不需要在这里关闭
# 应该在应用退出时调用 close_pool() 函数
pass

310
src/QMT/main.py Normal file
View File

@ -0,0 +1,310 @@
# coding:utf-8
import time
import datetime
import logging
import os
import sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
# 导入自定义模块
from config_loader import get_qmt_config, get_logging_config, get_strategy_config
from strategy import load_initial_positions, create_buy_strategy_callback, create_sell_strategy_callback, start_account_funds_monitor, start_reconciliation_monitor, start_position_snapshot_monitor
from redis_state_manager import RedisStateManager
from database_manager import DatabaseManager, close_pool
from trader_callback import MyXtQuantTraderCallback
def setup_logging():
"""设置日志配置"""
log_config = get_logging_config()
# 创建logs目录如果不存在
log_dir = os.path.join(os.path.dirname(__file__), '..', 'logs')
os.makedirs(log_dir, exist_ok=True)
# 生成日志文件名(包含日期时间)
log_filename = f"qmt_auto_buy_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
log_filepath = os.path.join(log_dir, log_filename)
# 配置日志格式
logging.basicConfig(
level=getattr(logging, log_config['level']),
format=log_config['log_format'],
handlers=[
logging.FileHandler(log_filepath, encoding='utf-8'),
logging.StreamHandler(sys.stdout) # 同时输出到控制台
]
)
logger = logging.getLogger(__name__)
logger.info(f"日志文件已创建: {log_filepath}")
return logger
def interact():
"""执行后进入repl模式"""
import code
code.InteractiveConsole(locals=globals()).interact()
if __name__ == '__main__':
# 初始化日志
logger = setup_logging()
# 获取配置
qmt_config = get_qmt_config()
strategy_config = get_strategy_config()
buy_amount = strategy_config.get('buy', {}).get('amount', 50000)
logger.info("启动基于数据库的自动买入卖出策略")
logger.info("="*50)
# 初始化计划先从MySQL拉取“已发布计划”覆盖写入到Redis一次性然后读取Redis供订阅
rsm = RedisStateManager(logger)
try:
dbm = DatabaseManager(logger)
db_buy = dbm.get_active_buy_plans(strategy_id=1)
db_sell = dbm.get_active_sell_plans(strategy_id=1)
# 清空Redis计划并全量覆盖
rsm.clear_buy_plans()
rsm.clear_sell_plans()
for code, plan in db_buy.items():
# 兼容数据库返回结构
rsm.set_buy_plan(code, {
'stock_name': plan.get('stock_name'),
'target_price': plan['target_price'],
'buy_amount': plan['buy_amount'],
'is_active': 1
})
for code, plan in db_sell.items():
rsm.set_sell_plan(code, {
'stock_name': plan.get('stock_name'),
'target_price': plan['target_price'],
'sell_quantity': plan.get('sell_quantity'),
'is_active': 1
})
logger.info(f"已用MySQL已发布计划初始化Redis买入{len(db_buy)},卖出{len(db_sell)}")
# 设置已同步版本
try:
max_ver = dbm.get_max_plan_version(strategy_id=1)
rsm.set_synced_version(max_ver)
logger.info(f"初始化同步版本设置为 {max_ver}")
except Exception as _e:
logger.warning(f"初始化同步版本设置失败: {_e}")
except Exception as e:
logger.warning(f"初始化Redis计划失败将使用已有Redis计划{str(e)}")
buy_plans = rsm.get_all_buy_plans()
sell_plans = rsm.get_all_sell_plans()
logger.info(f"买入计划数量(Redis): {len(buy_plans)}")
logger.info(f"卖出计划数量(Redis): {len(sell_plans)}")
logger.info(f"每只股票买入金额: {buy_amount}")
logger.info("="*50)
# 显示买入计划
if buy_plans:
logger.info("买入计划:")
for stock_code, plan_info in buy_plans.items():
logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 买入金额 {plan_info['buy_amount']:.2f}")
# 显示卖出计划
if sell_plans:
logger.info("卖出计划:")
for stock_code, plan_info in sell_plans.items():
sell_qty = plan_info['sell_quantity'] if plan_info['sell_quantity'] else '全部'
logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 卖出数量 {sell_qty}")
if not buy_plans and not sell_plans:
logger.warning("警告:买入和卖出计划都为空!请检查数据库配置。")
logger.info("="*50)
# 从配置文件获取客户端路径
client_path = qmt_config['client_path']
logger.info(f"客户端路径: {client_path}")
# 生成session id 整数类型 同时运行的策略不能重复
session_id = int(time.time())
xt_trader = XtQuantTrader(client_path, session_id)
# 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程但是查询和推送的数据在时序上会变得不确定
# 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
# xt_trader.set_relaxed_response_order_enabled(True)
# 从配置文件获取账户信息
account_id = qmt_config['account']['account_id']
account_type = qmt_config['account']['account_type']
# 创建资金账号对象
acc = StockAccount(account_id, account_type)
logger.info(f"账户信息: {account_id} ({account_type})")
# 创建交易回调类对象,并声明接收回调
callback = MyXtQuantTraderCallback(logger)
xt_trader.register_callback(callback)
# 启动交易线程
xt_trader.start()
# 建立交易连接返回0表示连接成功
connect_result = xt_trader.connect()
logger.info(f'建立交易连接返回0表示连接成功 {connect_result}')
# 对交易回调进行订阅订阅后可以收到交易主推返回0表示订阅成功
subscribe_result = xt_trader.subscribe(acc)
logger.info(f'对交易回调进行订阅订阅后可以收到交易主推返回0表示订阅成功 {subscribe_result}')
# 查询账户资金
account_info = xt_trader.query_stock_asset(acc)
print(account_info)
print("=========================================")
available_cash = account_info.m_dCash
logger.info(f"账户可用资金: {available_cash:.2f}")
# 启动时一次性加载并同步持仓券商→Redis
initial_positions = load_initial_positions(xt_trader, acc, logger)
logger.info(f"初始持仓: {initial_positions}")
# 覆盖式同步DB未完成订单到Redis
try:
rsm = RedisStateManager(logger)
dbm = DatabaseManager(logger)
open_orders = dbm.get_open_orders()
rsm.clear_pending()
for stock_code, info in open_orders.items():
rsm.set_pending(stock_code, info)
logger.info(f"已用DB未完成订单覆盖Redis在途订单{len(open_orders)}")
except Exception as e:
logger.warning(f"Redis状态读取失败: {str(e)}")
# 启动账户资金监控(每小时更新一次)
start_account_funds_monitor(xt_trader, acc, logger)
# 停用订单超时监控(策略为价格触发挂单,挂单后不自动撤单)
logger.info("已停用订单超时监控与自动撤单逻辑:挂单成功后将持续等待成交")
# 启动每10分钟对账线程兜底无回调或程序中断时可在配置里调整频率
start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds=600)
# 启动每10分钟持仓快照覆盖更新线程只更新DB不叠加
start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds=600)
# 下载股票数据
xtdata.download_sector_data()
# 创建买入策略回调函数
buy_strategy_callback = create_buy_strategy_callback(xt_trader, acc, buy_amount, logger)
# 创建卖出策略回调函数
sell_strategy_callback = create_sell_strategy_callback(xt_trader, acc, logger)
# 订阅买卖计划中的股票行情基于Redis
if buy_plans:
buy_code_list = list(buy_plans.keys())
logger.info(f"开始订阅 {len(buy_code_list)} 只买入股票的行情...")
for code in buy_code_list:
xtdata.subscribe_quote(code, '1m', callback=buy_strategy_callback)
logger.info(f"已订阅买入 {code} ({buy_plans[code].get('stock_name','')}) 行情")
if sell_plans:
sell_code_list = list(sell_plans.keys())
logger.info(f"开始订阅 {len(sell_code_list)} 只卖出股票的行情...")
for code in sell_code_list:
xtdata.subscribe_quote(code, '1m', callback=sell_strategy_callback)
logger.info(f"已订阅卖出 {code} ({sell_plans[code].get('stock_name','')}) 行情")
# 计划同步线程每60秒从MySQL同步到Redis并直接更新订阅不再分线程管理订阅
import threading, time
def plan_sync_worker():
interval = 60 # 固定60秒同步一次
# 当前已订阅集合
subscribed_buy = set(buy_plans.keys())
subscribed_sell = set(sell_plans.keys())
while True:
try:
time.sleep(interval)
dbm_local = DatabaseManager(logger)
# 拉取当日最新激活计划
latest_buy = dbm_local.get_active_buy_plans(strategy_id=1)
latest_sell = dbm_local.get_active_sell_plans(strategy_id=1)
# 对比Redis找出新增/删除/变更(版本或数值变化)
curr_buy = rsm.get_all_buy_plans()
curr_sell = rsm.get_all_sell_plans()
latest_buy_codes = set(latest_buy.keys())
latest_sell_codes = set(latest_sell.keys())
# 新增/删除
add_buy = latest_buy_codes - set(curr_buy.keys())
del_buy = set(curr_buy.keys()) - latest_buy_codes
add_sell = latest_sell_codes - set(curr_sell.keys())
del_sell = set(curr_sell.keys()) - latest_sell_codes
# 变更(比较 trading_version 或关键字段)
changed_buy = set()
for code in (latest_buy_codes & set(curr_buy.keys())):
lb = latest_buy[code]
cb = curr_buy[code]
if (
lb.get('trading_version', 0) != cb.get('trading_version', 0)
or lb['target_price'] != cb.get('target_price')
or lb['buy_amount'] != cb.get('buy_amount')
):
changed_buy.add(code)
changed_sell = set()
for code in (latest_sell_codes & set(curr_sell.keys())):
ls = latest_sell[code]
cs = curr_sell[code]
if (
ls.get('trading_version', 0) != cs.get('trading_version', 0)
or ls['target_price'] != cs.get('target_price')
or (ls.get('sell_quantity') != cs.get('sell_quantity'))
):
changed_sell.add(code)
# 更新Redis
for code in add_buy | changed_buy:
rsm.set_buy_plan(code, latest_buy[code])
for code in add_sell | changed_sell:
rsm.set_sell_plan(code, latest_sell[code])
for code in del_buy:
rsm.del_buy_plan(code)
for code in del_sell:
rsm.del_sell_plan(code)
# 直接同步订阅(差分)
new_buy_sub = latest_buy_codes
new_sell_sub = latest_sell_codes
# 买入订阅差分
for code in new_buy_sub - subscribed_buy:
xtdata.subscribe_quote(code, '1m', callback=buy_strategy_callback)
logger.info(f"[订阅新增-买入] {code}")
for code in subscribed_buy - new_buy_sub:
xtdata.unsubscribe_quote(code)
logger.info(f"[取消订阅-买入] {code}")
# 卖出订阅差分
for code in new_sell_sub - subscribed_sell:
xtdata.subscribe_quote(code, '1m', callback=sell_strategy_callback)
logger.info(f"[订阅新增-卖出] {code}")
for code in subscribed_sell - new_sell_sub:
xtdata.unsubscribe_quote(code)
logger.info(f"[取消订阅-卖出] {code}")
subscribed_buy = new_buy_sub
subscribed_sell = new_sell_sub
logger.info("计划同步与订阅更新完成")
except Exception as e:
logger.warning(f"计划同步线程异常: {str(e)}")
threading.Thread(target=plan_sync_worker, daemon=True).start()
logger.info("策略启动完成,等待行情推送...")
if buy_plans:
logger.info("买入策略:当股票价格低于目标价格时,将自动买入指定金额")
if sell_plans:
logger.info("卖出策略:当股票价格高于目标价格时,将自动卖出指定数量")
logger.info("按 Ctrl+C 停止策略")
# 阻塞主线程退出
try:
xt_trader.run_forever()
except KeyboardInterrupt:
logger.info("策略已停止")
finally:
# 程序退出时清理连接池
try:
close_pool()
logger.info("数据库连接池已清理")
except Exception as e:
logger.error(f"清理数据库连接池失败: {str(e)}")
# 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 把上一行的run_forever注释掉 否则不会执行到这里)
# interact()

View File

@ -0,0 +1,166 @@
# coding:utf-8
import redis
import json
import datetime
from typing import Dict, Optional, List
from config_loader import get_redis_config
POSITIONS_KEY = 'qmt:positions' # Hash: stock_code -> quantity
PENDING_KEY = 'qmt:pending_orders' # Hash: stock_code -> json(order_info)
META_KEY = 'qmt:meta' # Hash: various metadata
PLAN_BUY_KEY = 'qmt:plan:buy' # Hash: stock_code -> json(plan)
PLAN_SELL_KEY = 'qmt:plan:sell' # Hash: stock_code -> json(plan)
PLAN_VERSION_KEY = 'qmt:plan:version' # String/integer: version bump on change
PLAN_SYNCED_VERSION_KEY = 'qmt:plan:synced_version' # 上次从DB同步的版本
class RedisStateManager:
def __init__(self, logger=None):
self.logger = logger
cfg = get_redis_config()
pool = redis.ConnectionPool(
host=cfg.get('host', 'localhost'),
port=cfg.get('port', 6379),
db=cfg.get('db', 0),
password=cfg.get('password'),
socket_timeout=cfg.get('socket_timeout', 5),
decode_responses=True,
)
self.r = redis.Redis(connection_pool=pool)
# Positions
def get_all_positions(self) -> Dict[str, int]:
data = self.r.hgetall(POSITIONS_KEY)
return {k: int(v) for k, v in data.items()} if data else {}
def get_position(self, stock_code: str) -> int:
v = self.r.hget(POSITIONS_KEY, stock_code)
return int(v) if v is not None else 0
def set_position(self, stock_code: str, quantity: int) -> None:
if quantity > 0:
self.r.hset(POSITIONS_KEY, stock_code, quantity)
else:
self.r.hdel(POSITIONS_KEY, stock_code)
self._touch_meta('positions_updated_at')
def incr_position(self, stock_code: str, delta: int) -> int:
new_qty = self.r.hincrby(POSITIONS_KEY, stock_code, delta)
if new_qty <= 0:
self.r.hdel(POSITIONS_KEY, stock_code)
new_qty = 0
self._touch_meta('positions_updated_at')
return new_qty
# Pending Orders
def get_all_pending(self) -> Dict[str, Dict]:
data = self.r.hgetall(PENDING_KEY)
return {k: json.loads(v) for k, v in data.items()} if data else {}
def get_pending(self, stock_code: str) -> Optional[Dict]:
v = self.r.hget(PENDING_KEY, stock_code)
return json.loads(v) if v else None
def set_pending(self, stock_code: str, order_info: Dict) -> None:
self.r.hset(PENDING_KEY, stock_code, json.dumps(order_info, ensure_ascii=False))
self._touch_meta('pending_updated_at')
def del_pending(self, stock_code: str) -> None:
self.r.hdel(PENDING_KEY, stock_code)
self._touch_meta('pending_updated_at')
def clear_pending(self) -> None:
self.r.delete(PENDING_KEY)
self._touch_meta('pending_cleared_at')
# Meta
def _touch_meta(self, field: str) -> None:
self.r.hset(META_KEY, field, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def clear_all(self) -> None:
pipe = self.r.pipeline()
pipe.delete(POSITIONS_KEY)
pipe.delete(PENDING_KEY)
pipe.delete('qmt_order_mapping') # 清理订单映射
pipe.execute()
self._touch_meta('cleared_at')
# 订单ID映射管理
def set_qmt_order_mapping(self, qmt_order_id: str, our_order_id: str) -> None:
"""建立QMT内部订单ID到我们订单ID的映射"""
self.r.hset('qmt_order_mapping', qmt_order_id, our_order_id)
def get_our_order_id(self, qmt_order_id: str) -> Optional[str]:
"""根据QMT内部订单ID获取我们的订单ID"""
return self.r.hget('qmt_order_mapping', qmt_order_id)
def del_qmt_order_mapping(self, qmt_order_id: str) -> None:
"""删除订单ID映射"""
self.r.hdel('qmt_order_mapping', qmt_order_id)
# Plans (buy/sell)
def get_buy_plan(self, stock_code: str) -> Optional[Dict]:
v = self.r.hget(PLAN_BUY_KEY, stock_code)
return json.loads(v) if v else None
def get_sell_plan(self, stock_code: str) -> Optional[Dict]:
v = self.r.hget(PLAN_SELL_KEY, stock_code)
return json.loads(v) if v else None
def set_buy_plan(self, stock_code: str, plan: Dict) -> None:
self.r.hset(PLAN_BUY_KEY, stock_code, json.dumps(plan, ensure_ascii=False))
self.bump_plan_version()
def set_sell_plan(self, stock_code: str, plan: Dict) -> None:
self.r.hset(PLAN_SELL_KEY, stock_code, json.dumps(plan, ensure_ascii=False))
self.bump_plan_version()
def del_buy_plan(self, stock_code: str) -> None:
self.r.hdel(PLAN_BUY_KEY, stock_code)
self.bump_plan_version()
def del_sell_plan(self, stock_code: str) -> None:
self.r.hdel(PLAN_SELL_KEY, stock_code)
self.bump_plan_version()
def get_all_buy_plans(self) -> Dict[str, Dict]:
data = self.r.hgetall(PLAN_BUY_KEY)
return {k: json.loads(v) for k, v in data.items()} if data else {}
def get_all_sell_plans(self) -> Dict[str, Dict]:
data = self.r.hgetall(PLAN_SELL_KEY)
return {k: json.loads(v) for k, v in data.items()} if data else {}
def get_buy_codes(self) -> List[str]:
return list(self.r.hkeys(PLAN_BUY_KEY))
def get_sell_codes(self) -> List[str]:
return list(self.r.hkeys(PLAN_SELL_KEY))
def get_plan_version(self) -> int:
v = self.r.get(PLAN_VERSION_KEY)
try:
return int(v) if v is not None else 0
except Exception:
return 0
def bump_plan_version(self) -> int:
return int(self.r.incr(PLAN_VERSION_KEY))
def clear_buy_plans(self) -> None:
self.r.delete(PLAN_BUY_KEY)
self.bump_plan_version()
def clear_sell_plans(self) -> None:
self.r.delete(PLAN_SELL_KEY)
self.bump_plan_version()
# synced version helpers
def get_synced_version(self) -> int:
v = self.r.get(PLAN_SYNCED_VERSION_KEY)
try:
return int(v) if v is not None else 0
except Exception:
return 0
def set_synced_version(self, version: int) -> None:
self.r.set(PLAN_SYNCED_VERSION_KEY, version)

601
src/QMT/strategy.py Normal file
View File

@ -0,0 +1,601 @@
# coding:utf-8
import time
import datetime
import threading
import logging
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from database_manager import DatabaseManager
from redis_state_manager import RedisStateManager
def is_call_auction_time(now_time=None):
"""
判断当前时间是否处于开盘集合竞价时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 开盘集合竞价时间段
call_auction_start = datetime.time(9, 15)
call_auction_end = datetime.time(9, 25)
return call_auction_start <= now_time <= call_auction_end
def is_continuous_trading_time(now_time=None):
"""
判断当前时间是否处于连续交易时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 上午连续交易时段
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
# 下午连续交易时段
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(14, 57)
is_morning = morning_start <= now_time <= morning_end
is_afternoon = afternoon_start <= now_time <= afternoon_end
return is_morning or is_afternoon
# 定义一个类 创建类的实例 作为状态的容器
class _a():
pass
A = _a()
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')
A.order_timeout = 300 # 订单超时时间默认5分钟
# 数据库管理器实例
db_manager = None
redis_manager = None
def initialize_database_manager(logger=None):
"""初始化数据库管理器"""
global db_manager
if db_manager is None:
db_manager = DatabaseManager(logger)
return db_manager
def initialize_redis_manager(logger=None):
"""初始化Redis状态管理器"""
global redis_manager
if redis_manager is None:
redis_manager = RedisStateManager(logger)
return redis_manager
def load_trading_plans_from_database(strategy_id=1, logger=None):
"""已弃用计划改为直接从Redis读取保留此函数用于日志兼容"""
if logger:
logger.info("交易计划现改为从Redis读取数据库仅做审计。")
return {}, {}
def load_initial_positions(xt_trader, acc, logger):
"""启动时一次性加载初始持仓状态券商→Redis并返回Redis视图"""
try:
# 先从QMT获取持仓
positions = xt_trader.query_stock_positions(acc)
qmt_positions = {pos.stock_code: pos.m_nVolume for pos in positions if pos.m_nVolume > 0}
# 从数据库获取持仓
db_manager = initialize_database_manager(logger)
db_positions = db_manager.get_current_positions()
# 将券商持仓写入Redis为权威值DB中但券商为0的暂不写入避免脏数据覆盖
r = initialize_redis_manager(logger)
# 清理Redis中不存在于券商的持仓设为0即删除
for code in list(r.get_all_positions().keys()):
if code not in qmt_positions:
r.set_position(code, 0)
for code, qty in qmt_positions.items():
r.set_position(code, qty)
logger.info(f"初始持仓加载完成已同步至Redis")
logger.info(f" QMT持仓: {qmt_positions}")
logger.info(f" 数据库持仓: {db_positions}")
return r.get_all_positions()
except Exception as e:
logger.error(f"加载初始持仓失败: {str(e)}")
A.positions = {}
return {}
def update_position_in_memory(stock_code, quantity_change, is_buy=True, price=None, logger=None):
"""以Redis为主更新持仓同时更新数据库不再维护内存镜像"""
try:
# Redis持仓更新
r = initialize_redis_manager(logger)
delta = quantity_change if is_buy else -quantity_change
new_qty = r.incr_position(stock_code, delta)
# 更新数据库持仓
if price is not None:
db_manager = initialize_database_manager(logger)
db_manager.update_position(stock_code, quantity_change, price, is_buy)
if logger:
logger.info(f"持仓状态已更新: {stock_code} -> {new_qty}Redis")
except Exception as e:
if logger:
logger.error(f"更新持仓状态失败: {str(e)}")
def add_pending_order(stock_code, order_id, order_quantity, order_price, target_price, order_side='buy', logger=None):
"""添加在途订单写Redis为主同时记录数据库"""
try:
order_info = {
'order_id': order_id,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'pending',
'side': order_side
}
# Redis
r = initialize_redis_manager(logger)
r.set_pending(stock_code, order_info)
# 记录到数据库
db_manager = initialize_database_manager(logger)
order_data = {
'order_id': order_id,
'strategy_id': 1,
'stock_code': stock_code,
'order_side': order_side,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_status': 'pending',
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'order_remark': f"{order_side.upper()}_{stock_code}_{int(time.time())}"
}
db_manager.insert_order(order_data)
if logger:
logger.info(f"添加在途订单: {stock_code} -> {order_id}")
except Exception as e:
if logger:
logger.error(f"添加在途订单失败: {str(e)}")
def remove_pending_order(stock_code, logger=None):
"""移除在途订单,同时更新数据库状态"""
try:
r = initialize_redis_manager(logger)
order_info = r.get_pending(stock_code)
if order_info is None:
if logger:
logger.warning(f"未找到在途订单: {stock_code}")
return
r.del_pending(stock_code)
# 更新数据库订单状态
print("====================================")
db_manager = initialize_database_manager(logger)
db_manager.update_order_status(order_info['order_id'], 'completed')
if logger:
logger.info(f"移除在途订单: {stock_code} -> {order_info['order_id']}")
except Exception as e:
if logger:
logger.error(f"移除在途订单失败: {str(e)}")
def is_stock_pending_order(stock_code):
"""检查股票是否有在途订单以Redis为准"""
r = initialize_redis_manager()
return r.get_pending(stock_code) is not None
def check_order_timeout(xt_trader, acc, logger):
"""已停用:本策略不再执行超时自动撤单,仅保留占位"""
logger.info("check_order_timeout 已停用:挂单后不再按超时撤单")
return
def start_order_timeout_monitor(xt_trader, acc, logger):
"""已停用:不再启动订单超时监控线程"""
logger.info("订单超时监控已停用:不启动后台监控线程")
return
def create_buy_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建买入策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否低于目标价格如果满足条件则买入
"""
# logger.info(f"收到买入行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取买入计划
r = initialize_redis_manager(logger)
plan_info = r.get_buy_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
buy_amount = plan_info['buy_amount']
# logger.info(f"[买入监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足买入条件:当前价格低于目标价格
if current_price < target_price:
# 检查持仓Redis
current_qty = r.get_position(stock_code)
if current_qty > 0:
# logger.info(f"{stock_code} 已有持仓 {current_qty}股,跳过买入")
continue
# 检查是否有在途订单Redis
if is_stock_pending_order(stock_code):
logger.info(f"{stock_code} 有在途订单,跳过买入")
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足买入条件!{stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f}")
# 计算买入数量(从数据库获取金额)
buy_volume = int(buy_amount / current_price / 100) * 100 # 取整为100的整数倍
if buy_volume > 0:
logger.info(f"准备买入 {stock_code} {buy_volume}股,金额约 {buy_volume * current_price:.2f}")
# 生成订单ID
order_id = f"BUY_{stock_code}_{int(time.time())}"
# 执行买入订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_BUY,
buy_volume,
xtconstant.FIX_PRICE,
target_price,
'auto_buy_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, buy_volume, current_price, target_price, 'buy', logger)
logger.info(f"已提交买入订单: {stock_code} {buy_volume}股,价格{target_price}订单ID: {order_id}")
else:
logger.warning(f"买入数量计算为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 买入行情数据时出错: {str(e)}")
continue
return f
def create_sell_strategy_callback(xt_trader, acc, logger):
"""创建卖出策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否高于目标价格如果满足条件且持有该股票则卖出
"""
# logger.info(f"收到卖出行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取卖出计划
r = initialize_redis_manager(logger)
plan_info = r.get_sell_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
# logger.info(f"[卖出监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足卖出条件:当前价格高于目标价格
if current_price > target_price:
# 检查持仓Redis
r = initialize_redis_manager(logger)
current_position = r.get_position(stock_code)
if current_position <= 0:
logger.info(f"{stock_code} 无持仓,跳过卖出")
continue
# 检查是否有在途订单
if is_stock_pending_order(stock_code):
logger.info(f"{stock_code} 有在途订单,跳过卖出")
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足卖出条件!{stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f}")
# 卖出数量:全部持仓
sell_volume = current_position
if sell_volume > 0:
logger.info(f"准备卖出 {stock_code} {sell_volume}股,金额约 {sell_volume * current_price:.2f}")
# 生成订单ID
order_id = f"SELL_{stock_code}_{int(time.time())}"
# 执行卖出订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_SELL,
sell_volume,
xtconstant.LATEST_PRICE,
target_price,
'auto_sell_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, sell_volume, current_price, target_price, 'sell', logger)
logger.info(f"已提交卖出订单: {stock_code} {sell_volume}股,价格{target_price}订单ID: {order_id}")
else:
logger.warning(f"卖出数量为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 卖出行情数据时出错: {str(e)}")
continue
return f
def update_account_funds_periodically(xt_trader, acc, logger):
"""每小时更新账户资金信息到数据库"""
while True:
try:
# 获取账户资金信息
account_info = xt_trader.query_stock_asset(acc)
if account_info:
# 计算相关数据
available_cash = getattr(account_info, 'm_dCash', 0.0)
frozen_cash = getattr(account_info, 'm_dFrozenCash', 0.0)
market_value = getattr(account_info, 'm_dMarketValue', 0.0)
total_asset = available_cash + frozen_cash + market_value
# 尝试获取盈亏信息如果属性不存在则设为0
try:
profit_loss = getattr(account_info, 'm_dProfitLoss', 0.0)
except AttributeError:
# 如果m_dProfitLoss不存在尝试其他可能的属性名
profit_loss = getattr(account_info, 'm_dProfit', 0.0)
# 计算盈亏比例
if total_asset > 0:
profit_loss_ratio = (profit_loss / total_asset) * 100
else:
profit_loss_ratio = 0.0
# 更新到数据库
db_manager = initialize_database_manager(logger)
success = db_manager.update_account_funds(
account_id=acc.account_id,
account_type="acc.account_type",
total_asset=total_asset,
available_cash=available_cash,
frozen_cash=frozen_cash,
market_value=market_value,
profit_loss=profit_loss,
profit_loss_ratio=profit_loss_ratio
)
if success:
logger.info(f"账户资金更新成功 - 总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}, 市值: {market_value:.2f}")
else:
logger.warning("账户资金更新失败")
else:
logger.warning("无法获取账户资金信息")
except Exception as e:
logger.error(f"更新账户资金信息时出错: {str(e)}")
# 等待1小时3600秒
time.sleep(3600)
def start_account_funds_monitor(xt_trader, acc, logger):
"""启动账户资金监控线程(每小时更新一次)"""
try:
funds_thread = threading.Thread(
target=update_account_funds_periodically,
args=(xt_trader, acc, logger),
daemon=True
)
funds_thread.start()
logger.info("账户资金监控线程已启动(每小时更新一次)")
except Exception as e:
logger.error(f"启动账户资金监控失败: {str(e)}")
# ========== 每5分钟对账查询券商持仓并反向更新订单/持仓 ==========
def reconcile_orders_and_positions(xt_trader, acc, logger):
"""一次性对账(基于当日成交对比委托):
- 仅查询MySQL今日订单submittedfilled[部分]pendingordered
- 通过QMT接口获取当日成交明细/汇总按股票累计成交量对比委托量
* 当日成交量 >= 委托量 => 状态置为 completed
* 0 < 当日成交量 < 委托量 => 状态置为 filled部分成交记录 filled_quantity
* 当日成交量 == 0 => 不处理
- 本函数不更新持仓持仓改由快照线程覆盖更新
"""
try:
dbm = initialize_database_manager(logger)
# 1) 读取今日未完成订单四种状态submitted、filled(部分成交)、pending、委托订单
# 你提到“委托订单”第四种状态名,这里按 'ordered' 兜底;如你表里具体值不同,可替换。
candidate_statuses = ['submitted', 'filled', 'pending', 'ordered']
orders = dbm.get_todays_orders_by_statuses(candidate_statuses)
if not orders:
return
# 2) 通过QMT查询当日成交结果并按股票聚合成交数量
filled_volume_by_code = {}
trades = xt_trader.query_stock_orders(acc, cancelable_only = False)
for tr in trades or []:
code = getattr(tr, 'stock_code', None) or getattr(tr, 'm_strInstrumentID', None)
vol = int(getattr(tr, 'traded_volume', 0) or getattr(tr, 'm_nVolume', 0) or 0)
if not code or vol <= 0:
continue
filled_volume_by_code[code] = filled_volume_by_code.get(code, 0) + vol
# 3) 遍历订单并根据“当日成交累计 vs 委托量”判断状态
for od in orders:
try:
stock_code = od['stock_code']
side = od['side']
order_id = od['order_id']
qty = int(od.get('order_quantity') or 0)
day_filled = int(filled_volume_by_code.get(stock_code, 0))
if day_filled <= 0:
continue
if day_filled >= qty and qty > 0:
# 全部成交 -> completed
dbm.update_order_status(order_id, 'completed', qty, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 全部成交(>=委托): {stock_code} {side} {day_filled}/{qty}")
elif 0 < day_filled < qty:
# 部分成交 -> filled部分成交
dbm.update_order_status(order_id, 'filled', day_filled, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 部分成交(<委托): {stock_code} {side} {day_filled}/{qty}")
except Exception as ie:
logger.warning(f"[对账-简] 处理订单失败 {od}: {ie}")
except Exception as e:
if logger:
logger.error(f"对账过程出错: {str(e)}")
def start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
"""启动对账后台线程默认每5分钟一次"""
try:
def worker():
while True:
try:
reconcile_orders_and_positions(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"对账线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"对账线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动对账线程失败: {str(e)}")
# ========== 每5分钟持仓快照覆盖更新仅DB ==========
def refresh_positions_snapshot(xt_trader, acc, logger):
"""从QMT拉取当前持仓覆盖更新到DB的 trading_position
- 对存在的标的 upsert 所有持仓字段数量可用冻结成本价市价市值盈亏等
- 删除DB中多余的标的
本函数不操作Redis
"""
try:
dbm = initialize_database_manager(logger)
positions = xt_trader.query_stock_positions(acc) or []
valid_codes = []
for pos in positions:
try:
code = pos.stock_code
# 获取持仓基本信息
total_qty = int(getattr(pos, 'm_nVolume', 0) or 0)
available_qty = int(getattr(pos, 'm_nCanUseVolume', 0) or 0)
frozen_qty = max(0, total_qty - available_qty) # 冻结数量 = 总数量 - 可用数量
# 获取价格和市值信息
cost_price = float(getattr(pos, 'm_dOpenPrice', 0.0) or 0.0)
market_price = float(getattr(pos, 'm_dLastPrice', 0.0) or 0.0)
market_value = float(getattr(pos, 'm_dInstrumentValue', 0.0) or 0.0)
# 获取盈亏信息
profit_loss = float(getattr(pos, 'm_dPositionProfit', 0.0) or 0.0)
position_cost = float(getattr(pos, 'm_dPositionCost', 0.0) or 0.0)
# 计算盈亏比例
if position_cost > 0:
profit_loss_ratio = (profit_loss / position_cost) * 100
else:
profit_loss_ratio = 0.0
if total_qty > 0:
dbm.upsert_position_snapshot(
code, total_qty, available_qty, frozen_qty,
cost_price, market_price, market_value,
profit_loss, profit_loss_ratio
)
valid_codes.append(code)
except Exception as e:
logger.warning(f"处理持仓记录失败 {getattr(pos, 'stock_code', 'unknown')}: {e}")
continue
# 清理非持有标的
dbm.delete_positions_except(valid_codes)
logger.info(f"[持仓快照] 已覆盖更新 {len(valid_codes)} 条持仓记录")
except Exception as e:
if logger:
logger.error(f"持仓快照刷新失败: {str(e)}")
def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
try:
def worker():
while True:
try:
refresh_positions_snapshot(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"持仓快照线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"持仓快照线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动持仓快照线程失败: {str(e)}")
# 为了向后兼容,保留原来的函数名
def create_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建策略回调函数(兼容性函数,实际调用买入回调)"""
return create_buy_strategy_callback(xt_trader, acc, buy_amount, logger)

184
src/QMT/trader_callback.py Normal file
View File

@ -0,0 +1,184 @@
# coding:utf-8
import datetime
import sys
from xtquant.xttrader import XtQuantTraderCallback
from strategy import update_position_in_memory, remove_pending_order
from database_manager import DatabaseManager
class MyXtQuantTraderCallback(XtQuantTraderCallback):
def __init__(self, logger):
self.logger = logger
self.db_manager = DatabaseManager(logger)
def on_disconnected(self):
"""
连接断开
:return:
"""
self.logger.warning(f"{datetime.datetime.now()} 连接断开回调")
def on_stock_order(self, order):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
self.logger.info(f"{datetime.datetime.now()} 委托回调 {order.order_remark}")
# 更新在途订单状态和数据库
try:
# 从订单备注中提取股票代码
order_remark = order.order_remark
if '_' in order_remark:
stock_code = order_remark.split('_')[1] # 修正:取第二段作为股票代码
# 检查Redis中是否存在该在途订单
from strategy import initialize_redis_manager
r = initialize_redis_manager(self.logger)
order_info = r.get_pending(stock_code)
if order_info:
# 确保 order_id 是字符串类型
qmt_order_id_str = str(order.order_id)
our_order_id = order_info.get('order_id')
# 先更新QMT订单ID映射
if our_order_id:
self.db_manager.update_qmt_order_id(our_order_id, qmt_order_id_str)
# 更新数据库订单状态
self.db_manager.update_order_status(qmt_order_id_str, 'submitted')
# 记录交易日志
log_data = {
'order_id': our_order_id or qmt_order_id_str,
'stock_code': stock_code,
'log_type': 'order_submitted',
'log_level': 'INFO',
'message': f'订单已提交: {order.order_remark}',
'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.db_manager.insert_trading_log(log_data)
self.logger.info(f"订单状态更新为已提交: {stock_code}")
else:
self.logger.warning(f"未找到在途订单: {stock_code}")
except Exception as e:
self.logger.error(f"更新订单状态失败: {str(e)}")
def on_stock_trade(self, trade):
"""
成交变动推送
:param trade: XtTrade对象
:return:
"""
self.logger.info(f"{datetime.datetime.now()} 成交回调 {trade.order_remark} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}")
# 更新内存中的持仓状态和数据库
try:
# 根据订单备注判断是买入还是卖出
is_buy = True # 默认买入
if trade.order_remark.startswith('SELL_'):
is_buy = False
elif trade.order_remark.startswith('BUY_'):
is_buy = True
# 记录交易方向
trade_direction = "买入" if is_buy else "卖出"
self.logger.info(f"识别交易方向: {trade_direction}")
# 更新内存和数据库持仓状态
update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, trade.traded_price, self.logger)
# 确保 order_id 是字符串类型
order_id_str = str(trade.order_id)
# 更新数据库订单状态
self.db_manager.update_order_status(
order_id_str,
'filled',
trade.traded_volume,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
# 记录交易日志
log_data = {
'order_id': order_id_str,
'stock_code': trade.stock_code,
'log_type': 'trade_filled',
'log_level': 'INFO',
'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}',
'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.db_manager.insert_trading_log(log_data)
# 从在途订单中移除
remove_pending_order(trade.stock_code, self.logger)
except Exception as e:
self.logger.error(f"更新持仓状态失败: {str(e)}")
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
self.logger.error(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
# 从在途订单中移除失败的订单,并更新数据库
try:
order_remark = order_error.order_remark
if '_' in order_remark:
stock_code = order_remark.split('_')[1] # 修正:取第二段作为股票代码
# 确保 order_id 是字符串类型
order_id_str = str(order_error.order_id)
# 更新数据库订单状态
self.db_manager.update_order_status(order_id_str, 'failed')
# 记录错误日志
log_data = {
'order_id': order_id_str,
'stock_code': stock_code,
'log_type': 'order_failed',
'log_level': 'ERROR',
'message': f'订单失败: {order_error.error_msg}',
'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.db_manager.insert_trading_log(log_data)
remove_pending_order(stock_code, self.logger)
except Exception as e:
self.logger.error(f"处理失败订单失败: {str(e)}")
def on_cancel_error(self, cancel_error):
"""
撤单失败推送
:param cancel_error: XtCancelError 对象
:return:
"""
self.logger.error(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}")
def on_order_stock_async_response(self, response):
"""
异步下单回报推送
:param response: XtOrderResponse 对象
:return:
"""
self.logger.info(f"异步委托回调 {response.order_remark}")
def on_cancel_order_stock_async_response(self, response):
"""
:param response: XtCancelOrderResponse 对象
:return:
"""
self.logger.info(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}")
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
self.logger.info(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}")

View File

@ -127,7 +127,7 @@ class FinancialDataCollectorV2:
List[str]: 股票代码列表 List[str]: 股票代码列表
""" """
try: try:
query = "SELECT DISTINCT gp_code_two FROM gp_code_all_copy1 WHERE gp_code_two IS NOT NULL AND gp_code_two != ''" query = "SELECT DISTINCT gp_code_two FROM gp_code_all WHERE gp_code_two IS NOT NULL AND gp_code_two != ''"
with self.mysql_engine.connect() as conn: with self.mysql_engine.connect() as conn:
df = pd.read_sql(text(query), conn) df = pd.read_sql(text(query), conn)
@ -224,11 +224,11 @@ class FinancialDataCollectorV2:
# 简化逻辑直接从2025年开始往前推21个季度 # 简化逻辑直接从2025年开始往前推21个季度
base_year = 2025 base_year = 2025
base_quarters = [ base_quarters = [
(2025, 3), (2024, 12), (2024, 9), (2024, 6), (2024, 3), (2025, 6), (2025, 3), (2024, 12), (2024, 9), (2024, 6), (2024, 3),
(2023, 12), (2023, 9), (2023, 6), (2023, 3), (2023, 12), (2023, 9), (2023, 6), (2023, 3),
(2022, 12), (2022, 9), (2022, 6), (2022, 3), (2022, 12), (2022, 9), (2022, 6), (2022, 3),
(2021, 12), (2021, 9), (2021, 6), (2021, 3), (2021, 12), (2021, 9), (2021, 6), (2021, 3),
(2020, 12), (2020, 9), (2020, 6), (2020, 3) (2020, 12), (2020, 9), (2020, 6)
] ]
if i < len(base_quarters): if i < len(base_quarters):
@ -539,124 +539,9 @@ class FinancialDataCollectorV2:
logger.error(f"保存数据到MongoDB失败: {str(e)}") logger.error(f"保存数据到MongoDB失败: {str(e)}")
return False return False
def check_missing_data(self, stock_code: str) -> List[str]:
"""
检查MongoDB中哪些报告期的资产负债表或现金流量表数据为空
Args:
stock_code: 股票代码
Returns:
List[str]: 需要更新的报告期列表
"""
try:
# 查询该股票的所有记录
records = list(self.collection.find({'stock_code': stock_code}))
missing_periods = []
for record in records:
balance_empty = not record.get('balance_sheet') or record.get('balance_sheet') == {}
cash_empty = not record.get('cash_flow_statement') or record.get('cash_flow_statement') == {}
# 如果资产负债表或现金流量表为空,则需要更新
if balance_empty or cash_empty:
missing_periods.append(record.get('report_date'))
logger.debug(f"发现需要更新的数据: {stock_code} - {record.get('report_date')} (资产负债表空: {balance_empty}, 现金流量表空: {cash_empty})")
if missing_periods:
logger.info(f"股票 {stock_code}{len(missing_periods)} 个报告期需要更新数据")
else:
logger.info(f"股票 {stock_code} 的数据完整,无需更新")
return missing_periods
except Exception as e:
logger.error(f"检查缺失数据失败: {str(e)}")
return []
def update_missing_financial_data(self, stock_code: str, missing_periods: List[str]) -> bool:
"""
更新缺失的财务数据只更新资产负债表和现金流量表
Args:
stock_code: 股票代码
missing_periods: 需要更新的报告期列表
Returns:
bool: 是否更新成功
"""
try:
if not missing_periods:
return True
logger.info(f"开始更新股票 {stock_code} 缺失的财务数据")
# 获取资产负债表和现金流量表数据
balance_data = self.fetch_balance_sheet(stock_code, periods=21)
time.sleep(1)
cash_data = self.fetch_cash_flow_statement(stock_code, periods=21)
time.sleep(1)
# 创建按报告日期索引的字典
balance_dict = {item['REPORT_DATE'][:10]: item for item in balance_data if item.get('REPORT_DATE')}
cash_dict = {item['REPORT_DATE'][:10]: item for item in cash_data if item.get('REPORT_DATE')}
updated_count = 0
for report_date in missing_periods:
try:
# 查找当前记录
current_record = self.collection.find_one({
'stock_code': stock_code,
'report_date': report_date
})
if not current_record:
logger.warning(f"未找到记录: {stock_code} - {report_date}")
continue
# 准备更新的字段
update_fields = {}
# 检查是否需要更新资产负债表
balance_empty = not current_record.get('balance_sheet') or current_record.get('balance_sheet') == {}
if balance_empty and report_date in balance_dict:
update_fields['balance_sheet'] = balance_dict[report_date]
logger.debug(f"更新资产负债表: {stock_code} - {report_date}")
# 检查是否需要更新现金流量表
cash_empty = not current_record.get('cash_flow_statement') or current_record.get('cash_flow_statement') == {}
if cash_empty and report_date in cash_dict:
update_fields['cash_flow_statement'] = cash_dict[report_date]
logger.debug(f"更新现金流量表: {stock_code} - {report_date}")
# 如果有字段需要更新
if update_fields:
update_fields['collect_time'] = datetime.datetime.now() # 更新采集时间
self.collection.update_one(
{'stock_code': stock_code, 'report_date': report_date},
{'$set': update_fields}
)
updated_count += 1
logger.info(f"成功更新: {stock_code} - {report_date}")
except Exception as e:
logger.error(f"更新记录失败: {stock_code} - {report_date} - {str(e)}")
continue
logger.info(f"股票 {stock_code} 更新完成,共更新 {updated_count} 个报告期")
return True
except Exception as e:
logger.error(f"更新缺失财务数据失败: {str(e)}")
return False
def collect_financial_data(self, stock_code: str, periods: int = 21) -> bool: def collect_financial_data(self, stock_code: str, periods: int = 21) -> bool:
""" """
采集单只股票的财务数据 - 增量更新模式 采集单只股票的财务数据
Args: Args:
stock_code: 股票代码'300750.SZ' stock_code: 股票代码'300750.SZ'
@ -666,20 +551,37 @@ class FinancialDataCollectorV2:
bool: 是否采集成功 bool: 是否采集成功
""" """
try: try:
logger.info(f"开始检查股票 {stock_code} 的财务数据") logger.info(f"开始采集股票 {stock_code} 的财务数据{periods}个报告期)")
# 检查哪些报告期的数据缺失 # 获取三张财务报表数据
missing_periods = self.check_missing_data(stock_code) profit_data = self.fetch_profit_statement(stock_code, periods)
time.sleep(1) # 避免请求过于频繁
if not missing_periods: balance_data = self.fetch_balance_sheet(stock_code, periods)
logger.info(f"股票 {stock_code} 数据完整,跳过") time.sleep(1)
return True
# 更新缺失的数据 cash_data = self.fetch_cash_flow_statement(stock_code, periods)
success = self.update_missing_financial_data(stock_code, missing_periods) time.sleep(1)
# 检查至少有一张表有数据
if not any([profit_data, balance_data, cash_data]):
logger.error(f"股票 {stock_code} 没有获取到任何财务数据")
return False
# 处理财务数据
financial_data_list = self.process_financial_data(
stock_code, profit_data, balance_data, cash_data
)
if not financial_data_list:
logger.error(f"股票 {stock_code} 的财务数据处理失败")
return False
# 保存到MongoDB
success = self.save_to_mongodb(financial_data_list)
if success: if success:
logger.info(f"股票 {stock_code} 的财务数据更新完成") logger.info(f"股票 {stock_code} 的财务数据采集完成")
return success return success
@ -689,19 +591,19 @@ class FinancialDataCollectorV2:
def batch_collect_financial_data(self, stock_codes: List[str], periods: int = 21) -> Dict: def batch_collect_financial_data(self, stock_codes: List[str], periods: int = 21) -> Dict:
""" """
批量更新多只股票的缺失财务数据 批量采集多只股票的财务数据
Args: Args:
stock_codes: 股票代码列表 stock_codes: 股票代码列表
periods: 获取多少个报告期默认21个季度 periods: 获取多少个报告期默认21个季度
Returns: Returns:
Dict: 更新结果统计 Dict: 采集结果统计
""" """
results = {'success': 0, 'failed': 0, 'failed_stocks': [], 'skipped': 0} results = {'success': 0, 'failed': 0, 'failed_stocks': []}
total_stocks = len(stock_codes) total_stocks = len(stock_codes)
logger.info(f"开始批量检查和更新 {total_stocks} 只股票的财务数据") logger.info(f"开始批量采集 {total_stocks} 只股票的财务数据")
for index, stock_code in enumerate(stock_codes, 1): for index, stock_code in enumerate(stock_codes, 1):
try: try:
@ -712,11 +614,11 @@ class FinancialDataCollectorV2:
success = self.collect_financial_data(stock_code, periods) success = self.collect_financial_data(stock_code, periods)
if success: if success:
results['success'] += 1 results['success'] += 1
logger.info(f"SUCCESS [{index}/{total_stocks}] {stock_code} 处理成功") logger.info(f"SUCCESS [{index}/{total_stocks}] {stock_code} 采集成功")
else: else:
results['failed'] += 1 results['failed'] += 1
results['failed_stocks'].append(stock_code) results['failed_stocks'].append(stock_code)
logger.warning(f"FAILED [{index}/{total_stocks}] {stock_code} 处理失败") logger.warning(f"FAILED [{index}/{total_stocks}] {stock_code} 采集失败")
# 每只股票之间暂停一下,避免请求过于频繁 # 每只股票之间暂停一下,避免请求过于频繁
time.sleep(2) time.sleep(2)
@ -735,7 +637,7 @@ class FinancialDataCollectorV2:
continue continue
success_rate = (results['success'] / total_stocks) * 100 success_rate = (results['success'] / total_stocks) * 100
logger.info(f"批量更新完成: 成功{results['success']}只,失败{results['failed']}只,成功率: {success_rate:.2f}%") logger.info(f"批量采集完成: 成功{results['success']}只,失败{results['failed']}只,成功率: {success_rate:.2f}%")
if results['failed_stocks']: if results['failed_stocks']:
logger.info(f"失败的股票数量: {len(results['failed_stocks'])}") logger.info(f"失败的股票数量: {len(results['failed_stocks'])}")
@ -757,7 +659,7 @@ class FinancialDataCollectorV2:
def main(): def main():
"""主函数 - 批量更新所有股票的缺失财务数据""" """主函数 - 批量采集所有股票的财务数据"""
collector = FinancialDataCollectorV2() collector = FinancialDataCollectorV2()
try: try:
@ -771,32 +673,30 @@ def main():
logger.info(f"从数据库获取到 {len(stock_codes)} 只股票") logger.info(f"从数据库获取到 {len(stock_codes)} 只股票")
# 可以选择处理所有股票或者部分股票进行测试 # 可以选择采集所有股票或者部分股票进行测试
# 如果要测试,可以取前几只股票 # 如果要测试,可以取前几只股票
# 测试模式:只处理前10只股票 # 测试模式:只采集前10只股票
TEST_MODE = False # 设置为False将处理所有股票 TEST_MODE = False # 设置为False将采集所有股票
if TEST_MODE: if TEST_MODE:
test_count = min(10, len(stock_codes)) # 最多取10只股票测试 test_count = min(10, len(stock_codes)) # 最多取10只股票测试
stock_codes = stock_codes[:test_count] stock_codes = stock_codes[:test_count]
logger.info(f"TEST MODE: 仅处理{test_count} 只股票") logger.info(f"TEST MODE: 仅采集{test_count} 只股票")
else: else:
logger.info(f"PRODUCTION MODE: 将处理全部 {len(stock_codes)} 只股票") logger.info(f"PRODUCTION MODE: 将采集全部 {len(stock_codes)} 只股票")
logger.info(f"开始批量检查和更新 {len(stock_codes)} 只股票的财务数据") logger.info(f"开始批量采集 {len(stock_codes)} 只股票的财务数据")
logger.info("注意: 本次运行为增量更新模式,只会更新缺失的资产负债表和现金流量表数据")
# 批量更新 # 批量采集
results = collector.batch_collect_financial_data(stock_codes, periods=21) results = collector.batch_collect_financial_data(stock_codes, periods=21)
# 输出最终结果 # 输出最终结果
print(f"\n{'='*50}") print(f"\n{'='*50}")
print(f"批量更新完成统计") print(f"批量采集完成统计")
print(f"{'='*50}") print(f"{'='*50}")
print(f"SUCCESS 成功处理: {results['success']} 只股票") print(f"SUCCESS 成功采集: {results['success']} 只股票")
print(f"FAILED 处理失败: {results['failed']} 只股票") print(f"FAILED 采集失败: {results['failed']} 只股票")
print(f"SUCCESS RATE 成功率: {(results['success'] / len(stock_codes) * 100):.2f}%") print(f"SUCCESS RATE 成功率: {(results['success'] / len(stock_codes) * 100):.2f}%")
print(f"\n说明: 成功处理包括数据完整(无需更新)和成功更新缺失数据的股票")
if results['failed_stocks']: if results['failed_stocks']:
print(f"\n失败的股票列表:") print(f"\n失败的股票列表:")
@ -811,7 +711,7 @@ def main():
logger.info("用户中断程序执行") logger.info("用户中断程序执行")
print("\n警告: 程序被用户中断") print("\n警告: 程序被用户中断")
except Exception as e: except Exception as e:
logger.error(f"更新过程中出现错误: {str(e)}") logger.error(f"采集过程中出现错误: {str(e)}")
print(f"\n错误: 程序执行出错: {str(e)}") print(f"\n错误: 程序执行出错: {str(e)}")
finally: finally:
collector.close_connection() collector.close_connection()

View File

@ -194,24 +194,35 @@ def fetch_and_store_hk_stock_data(page_size=90, max_workers=10, use_proxy=False)
def format_hk_stock_code(stock_code): def format_hk_stock_code(stock_code):
""" """
统一港股代码格式支持0700.HKHK0700 统一港股代码格式支持0700.HKHK070000700
返回雪球格式如0700.HK和Redis存储格式 返回雪球格式如0700.HK和Redis存储格式如00700
""" """
stock_code = stock_code.upper() stock_code = stock_code.upper()
if '.HK' in stock_code: if '.HK' in stock_code:
return stock_code, stock_code code = stock_code.replace('.HK', '')
# 确保是5位数字格式
if code.isdigit():
return f'{code.zfill(5)}.HK', code.zfill(5)
else:
return stock_code, code.zfill(5)
elif stock_code.startswith('HK'): elif stock_code.startswith('HK'):
code = stock_code[2:] code = stock_code[2:]
return f'{code}.HK', f'{code}.HK' if code.isdigit():
return f'{code.zfill(5)}.HK', code.zfill(5)
else: else:
# 假设是纯数字,添加.HK后缀 return stock_code, code.zfill(5)
return f'{stock_code}.HK', f'{stock_code}.HK' else:
# 假设是纯数字,添加.HK后缀并确保5位格式
if stock_code.isdigit():
return f'{stock_code.zfill(5)}.HK', stock_code.zfill(5)
else:
return stock_code, stock_code
def get_hk_stock_realtime_info_from_redis(stock_code): def get_hk_stock_realtime_info_from_redis(stock_code):
""" """
根据港股代码从Redis查询实时行情并封装为指定结构 根据港股代码从Redis查询实时行情并封装为指定结构
:param stock_code: 支持0700.HKHK0700等格式 :param stock_code: 支持0700.HKHK070000700等格式
:return: dict or None :return: dict or None
""" """
_, redis_code = format_hk_stock_code(stock_code) _, redis_code = format_hk_stock_code(stock_code)
@ -246,7 +257,7 @@ def get_hk_stock_realtime_info_from_redis(stock_code):
result["crawlDate"] = data.get("fetch_time") result["crawlDate"] = data.get("fetch_time")
result["marketValue"] = data.get("market_capital") result["marketValue"] = data.get("market_capital")
result["maxPrice"] = data.get("high") if "high" in data else data.get("high52w") result["maxPrice"] = data.get("high") if "high" in data else data.get("high52w")
result["minPrice"] = data.get("low") if "low" in data else data.get("high52w") result["minPrice"] = data.get("low") if "low" in data else data.get("low52w")
result["nowPrice"] = data.get("current") result["nowPrice"] = data.get("current")
result["pbRate"] = data.get("pb") result["pbRate"] = data.get("pb")
result["rangeRiseAndFall"] = data.get("percent") result["rangeRiseAndFall"] = data.get("percent")

View File

@ -7,11 +7,10 @@
""" """
import sys import sys
import pymongo
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import logging import logging
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Tuple
from pathlib import Path from pathlib import Path
from sqlalchemy import create_engine, text from sqlalchemy import create_engine, text
from datetime import datetime from datetime import datetime
@ -190,7 +189,7 @@ class TechFundamentalFactorStrategy:
logger.info(f"计算 {len(stock_codes)} 只股票的通用因子") logger.info(f"计算 {len(stock_codes)} 只股票的通用因子")
results = [] results = []
latest_date = "2025-03-31" # 最新季度数据 latest_date = "2025-06-30" # 最新季度数据
annual_date = "2024-12-31" # 年报数据 annual_date = "2024-12-31" # 年报数据
for stock_code in stock_codes: for stock_code in stock_codes:
@ -247,7 +246,7 @@ class TechFundamentalFactorStrategy:
logger.info(f"计算 {len(stock_codes)} 只成长期股票的特色因子") logger.info(f"计算 {len(stock_codes)} 只成长期股票的特色因子")
results = [] results = []
latest_date = "2025-03-31" # 使用最新数据 latest_date = "2025-06-30" # 使用最新数据
annual_date = "2024-12-31" # 使用年度数据 annual_date = "2024-12-31" # 使用年度数据
for stock_code in stock_codes: for stock_code in stock_codes:
@ -306,7 +305,7 @@ class TechFundamentalFactorStrategy:
try: try:
logger.info(f"计算 {len(stock_codes)} 只成熟期股票的特色因子") logger.info(f"计算 {len(stock_codes)} 只成熟期股票的特色因子")
latest_date = "2025-03-31" # 使用最新数据 latest_date = "2025-06-30" # 使用最新数据
# 在循环外获取全A股PB和ROE数据避免重复查询 # 在循环外获取全A股PB和ROE数据避免重复查询
logger.info("获取全A股PB数据...") logger.info("获取全A股PB数据...")
@ -670,7 +669,7 @@ def main():
try: try:
print("=== 科技主题基本面因子选股策略 ===") print("=== 科技主题基本面因子选股策略 ===")
print("数据说明:") print("数据说明:")
print("- 毛利率、净利润增长率等:使用最新数据 (2025-03-31)") print("- 毛利率、净利润增长率等:使用最新数据 (2025-06-30)")
print("- 供应商客户集中度、折旧摊销、研发费用:使用年报数据 (2024-12-31)") print("- 供应商客户集中度、折旧摊销、研发费用:使用年报数据 (2024-12-31)")
print() print()

View File

@ -11,7 +11,7 @@ XUEQIU_HEADERS = {
'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Client-Version': 'v2.44.75', 'Client-Version': 'v2.44.75',
'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1754636834; xq_a_token=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xqat=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU3NjM3OTA4LCJjdG0iOjE3NTUwNDU5MDg0ODksImNpZCI6ImQ5ZDBuNEFadXAifQ.jAeKlW2r1xRuyoZ3cuy2rTgfSoW_79wGJeuup7I7sZMSH5QeRDJrGx5JWXO4373YRpNW0qnAXR51Ygd8Plmko1u99iN8LifGzyMtblXDPgs17aS0zyHr6cMAsURU984wCXkmZxdvRMCHdevc8XWNHnuqeGfQNSgBSdO6Zv7Xc5-t965TJba96UOsNBpv2GghV9B2mcrUQyW3edi9kRAN_Fxmx5M1Iri4Yfppcaj-VSZYkdZtUpizrN5BbVYujcnQjj4kceUYYAl3Ccs273KVNSMFKpHMIOJcMJATY6PRgLvbEu8_ttIfBnbG4mmZ71bU7RXigleXIj1qhcDL2rDzQQ; xq_r_token=2b5db3e3897cb3e46b8fa2fa384471b334ec59cb; acw_tc=ac11000117550489614555169ef3ec63ec008e1bfba0fe5321bc8a30c2deb8; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1755050072; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=kE/XuROkIJ4APDhfxUYOb9lRiDFNJT8KxiXYwJAuoCeNlkaxlcytBSuiCXGjqxhydALLguC/FB4qIXfLut408Q%3D%3D; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAjqDsqze4GzDiLPGhDBWAFdYjw7ivq8RG5pxMBrGHuhLMEzoiqds=iCpxusTh1K/Zjw=KmoeK4xGLDY=DCTKq1QeD4S3Dt4DIDAYDDxDWU4DLDYoDY3nYxGP=xpWTcmRbD0YDzqDgUEe=xi3DA4DjnehqYiTdwDDBDGtO=9aDG4GfSmDD0wDLoGQQoDGWnCneE6mkiFIr6TTDjqPD/Shc59791vGW56CM9zo3paFDtqD90aAFn=GrvFaE_n93e4F4qibH7GYziTmrzt4xmrKi44mBDmAQQ0TKe4Bxq3DPQDhtTH7OY8FYS_Qqx4Gn/lHDcnDd7YerPCYC70bYbC42q3i8WTx3e8/rijIosDPRGrE0WdoqzGh_YPeD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAe4DWhYeRonANsR7vNG4g2BxpQTxTiD', 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; aliyungf_tc=00c6b999835b16cea9e4a6aab36cca373a0976bf55ee74770d11f421f7119ad8; Hm_lvt_1db88642e346389874251b5a1eded6e3=1757464900; xq_a_token=975ab9f15a4965b9e557b9bc6f33bc1da20a0f49; xqat=975ab9f15a4965b9e557b9bc6f33bc1da20a0f49; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzYwMjU2NzM4LCJjdG0iOjE3NTc2NjQ3Mzg4MjUsImNpZCI6ImQ5ZDBuNEFadXAifQ.TMx4-TjKx96j5h6-EGiRIM2WKtJm1xctZhYidc40Em0pRcr0UBHAKBGl3No5r1BElYa9qnEDgNYI0Zv137Inx-EMPqm5cd1Z_ZjLdWOSLzT9qqBj8zdfuqJwP2nCYvC6KLjd8BvykS0vSFKqwb-r0WhEA3OzbO8teVNsaemdKAhBoIyP3-RQCfRxJ9RLNha1ZMdg66iZvfz_SOsG41y8IA9yyl-FFFJOq4TnAiywY1yO1QIJJhkh8YQqfnDfQQdSIFgJGToU980Lw1dm4aCDY-kvn-t18KjrL_hZJ_UNN65bgZsSsuWf-VQ7wsjjczNrfBYAHdZ6kES0CGo9g8IZZw; xq_r_token=c209224335327f29fc555d9910b43c0df6d52d5a; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1757774901; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=rJD0qKtipTMjRBkLBVEyXbl0CiVeY7y4AxEZC0Vf6Zkou9cxp0NPsxwSrnOyFyBMr+Ws5/nJDO1NUalRDyAPsA%3D%3D; acw_tc=3ccdc14717578973700474899e2dc1c35b6358f1af81617250f8f00b4cf31c; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0qRHhqDylAkiETFN1t42Y5D/KlYeDZDGFdDqx0Ei6FiYHK1ezjCGbKSAQY5P53Niio89NQ7DEQm6fjL1S4K7s5h8KRDo9n4hiDB3DbqDymgY5qxGGA4GwDGoD34DiDDPDb8rDALeD7qDFnenropTDm4GWneGfDDoDYbT3xiUYDDUvbeG2iET4DDN4bIGYZ2G76=r1doBip29xKiTDjqPD/ShUoiuzZKC4icFL2/amAeGyC5GuY6mWHQ77SWcbscAV70i8hx_Bx3rKqB5YGDRYqK8o2xY9iKR0YRDxeEDW0DWnQ8EwhDDiP46iRiGDYZtgrNMhXiY4MQA7bAilP4nPkFGCmqzBqGYesQGQiT3ihKbm5CexbxxD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0qRHhqDylAkiETFN1t42YeDA4rYnRItORCitz/D3nyGQigbiD',
'Referer': 'https://weibo.com/u/7735765253', 'Referer': 'https://weibo.com/u/7735765253',
'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"',
'Sec-Ch-Ua-Mobile': '?0', 'Sec-Ch-Ua-Mobile': '?0',

View File

@ -0,0 +1,100 @@
# coding:utf-8
# 更新港股列表的代码
import requests
import pandas as pd
from sqlalchemy import create_engine, text
import sys
import os
# 将项目根目录添加到Python路径以便导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.scripts.config import XUEQIU_HEADERS
def collect_us_stock_codes(db_url):
"""
采集雪球港股列表数据并存储到数据库
"""
engine = create_engine(db_url)
headers = XUEQIU_HEADERS
base_url = "https://stock.xueqiu.com/v5/stock/screener/quote/list.json"
page = 1
page_size = 90
all_data = []
print("--- Starting to collect Hong Kong stock codes ---")
# 采集前先清空表
try:
with engine.begin() as conn:
conn.execute(text("TRUNCATE TABLE gp_code_us"))
print("Table `gp_code_us` has been truncated.")
except Exception as e:
print(f"Error truncating table `gp_code_us`: {e}")
return
while True:
params = {
'page': page,
'size': page_size,
'order': 'desc',
'order_by': 'market_capital',
'market': 'US',
'type': 'us',
'is_delay': 'true'
}
print(f"Fetching page {page}...")
try:
response = requests.get(base_url, headers=headers, params=params, timeout=20)
if response.status_code != 200:
print(f"Request failed with status code {response.status_code}")
break
data = response.json()
if data.get('error_code') != 0:
print(f"API error: {data.get('error_description')}")
break
stock_list = data.get('data', {}).get('list', [])
if not stock_list:
print("No more data found. Collection finished.")
break
all_data.extend(stock_list)
# 如果获取到的数据少于每页数量,说明是最后一页
if len(stock_list) < page_size:
print("Reached the last page. Collection finished.")
break
page += 1
except requests.exceptions.RequestException as e:
print(f"Request exception on page {page}: {e}")
break
if all_data:
print(f"--- Collected a total of {len(all_data)} stocks. Preparing to save to database. ---")
df = pd.DataFrame(all_data)
# 数据映射和转换
df_to_save = pd.DataFrame()
df_to_save['gp_name'] = df['name']
df_to_save['gp_code'] = df['symbol']
df_to_save['gp_code_two'] = 'US.' + df['symbol'].astype(str)
df_to_save['market_cap'] = df['market_capital']
try:
df_to_save.to_sql('gp_code_us', engine, if_exists='append', index=False)
print("--- Successfully saved all data to `gp_code_us`. ---")
except Exception as e:
print(f"Error saving data to database: {e}")
else:
print("--- No data collected. ---")
engine.dispose()
if __name__ == "__main__":
db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj'
collect_us_stock_codes(db_url)

View File

@ -1,118 +0,0 @@
20230508_20240508下跌后快速反弹走势,,,,,,,,
take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count,
30,-4712,0,100.8,49.8,53.1,1815,405,
29,-5206,0,100,49.9,53.3,1818,405,
28,-6335,0,99.2,50,53.6,1822,405,
27,-6590,0,98,50.2,53.6,1827,405,
26,-6865,0,96.7,50.5,54.1,1833,405,
25,-6648,0,95.4,51,54.3,1836,405,
24,-6931,0,93.9,51.4,54.1,1835,403,
23,-5709,0,91.9,52.2,54.5,1837,404,
22,-6556,0,90.7,52.4,54.6,1852,405,
21,-5242,0,88.3,53.1,55.3,1858,405,
20,-5913,0,86.6,53.7,55.3,1866,405,
19,-5848,0,84.1,54.5,55.6,1876,405,
18,-4698,0,81.5,55.6,56.3,1882,405,
17,-5508,0,79.3,56.4,56.3,1888,405,
16,-4942,0,76.6,57.5,57,1897,405,
15,-5598,0,74.1,58.6,58.5,1903,405,
14,-6530,0,71,59.6,58.5,1910,405,
13,-6346,0,68.3,61,58.8,1919,405,
12,-6840,0,65.3,62.6,58.8,1926,405,
11,-7236,0,62.5,64.4,58.5,1931,405,
10,-7611,0,59.3,66.1,58.8,1939,405,
9,-9419,0,55.9,67.5,59,1949,405,
8,-9855,0,52.7,69.4,59,1958,405,
7,-10832,0,49.2,71.4,58.8,1969,405,
6,-11762,0,45.3,73.7,59.8,1973,405,
5,-13031,0,41.6,75.9,60.5,1978,405,
,,,,,,,,
,,,,,,,,
20230508_20240205下跌走势,,,,,,,,
take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count,
30,-120990,0,75.6,10.9,8.8,1465,396,
29,-120762,0,75.4,10.9,8.6,1466,396,
28,-120539,0,75.1,11.1,8.3,1468,396,
27,-119956,0,74.8,11.4,9.1,1469,396,
26,-118942,0,74.3,11.6,9.6,1471,396,
25,-117868,0,73.8,11.9,9.3,1471,396,
24,-117173,0,73.3,12.3,9.3,1471,396,
23,-114536,0,72.2,13,9.6,1474,396,
22,-113484,0,71.5,13.3,9.8,1479,396,
21,-109829,0,69.9,14.4,10.9,1482,396,
20,-108867,0,69,15,10.9,1486,396,
19,-106925,0,67.9,15.8,11.4,1492,396,
18,-104300,0,66.7,16.8,11.4,1496,396,
17,-102194,0,65.5,17.7,12.6,1499,396,
16,-99208,0,64,18.9,12.9,1506,396,
15,-97766,0,62.6,19.8,13.1,1511,396,
14,-96754,0,61.2,20.6,13.1,1515,396,
13,-93449,0,59.5,22.3,13.9,1522,396,
12,-91252,0,57.7,23.9,14.6,1526,396,
11,-88804,0,56.1,25.3,15.2,1527,396,
10,-84415,0,53.6,27.9,17.4,1535,396,
9,-80819,0,50.9,30.7,19.7,1543,396,
8,-77402,0,48.2,33.2,21.5,1547,396,
7,-72609,0,45.1,36.2,23.5,1556,396,
6,-67130,0,41.5,40.1,27,1559,396,
5,-63089,0,38.3,43.3,27,1563,396,
,,,,,,,,
,,,,,,,,
20221028_20230508上涨走势,,,,,,,,
take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count,
30,-26094,0,84.9,35.4,39.4,1022,343,
29,-24312,0,83.6,36.2,40.2,1027,343,
28,-24443,0,82.7,36.7,40.5,1031,343,
27,-23021,0,81.2,37.2,41.1,1034,343,
26,-22763,0,80,37.7,41.4,1036,343,
25,-21882,0,78,38.5,42,1040,343,
24,-22570,0,77.1,38.9,41.4,1042,343,
23,-22085,0,75.5,39.6,42,1048,343,
22,-21214,0,74.1,40.4,43.4,1053,343,
21,-20730,0,72.7,41.3,44,1059,343,
20,-18191,0,70.6,42.5,44.6,1064,343,
19,-17678,0,68.8,43.4,43.7,1068,343,
18,-18481,0,67.2,44,43.1,1072,343,
17,-16782,0,64.9,45.4,44.9,1076,343,
16,-16490,0,62.9,46.4,45.8,1081,343,
15,-15946,0,60.4,47.7,46.4,1089,343,
14,-16037,0,58.4,48.9,47.2,1095,343,
13,-16701,0,56.4,49.8,48.1,1100,343,
12,-16062,0,54,51.3,49,1106,343,
11,-16000,0,51.4,52.9,49.6,1110,343,
10,-15453,0,49.1,54.4,49.6,1114,343,
9,-14841,0,46.2,56.2,49.9,1121,343,
8,-14211,0,43,58.3,51.3,1132,343,
7,-14199,0,39.9,60.5,53.9,1138,343,
6,-15366,0,37.3,62.4,52.8,1144,343,
5,-15711,0,34.1,64.9,50.7,1152,343,
,,,,,,,,
,,,,,,,,
20210106_20220224震荡走势,,,,,,,,
take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count,
30,12047,0,115.6,44.5,48.8,2508,424,
29,12857,0,113.5,45.1,48.5,2525,425,
28,13349,0,112,45.7,48.9,2538,425,
27,12644,0,110.5,46.3,49.4,2554,425,
26,12878,0,108.4,47.1,48.7,2571,425,
25,12319,0,106.9,47.7,48.5,2589,425,
24,12744,0,104.5,48.6,50.4,2608,425,
23,11949,0,102.3,49.4,51.3,2626,425,
22,11246,0,100.6,50.3,51.3,2637,425,
21,10399,0,98.7,51.1,51.1,2644,425,
20,9908,0,96.3,52,51.1,2666,425,
19,9327,0,93.8,53,52.5,2681,425,
18,7509,0,92.1,53.6,52.6,2686,424,
17,7333,0,89.4,55.1,53.9,2713,425,
16,6404,0,86.6,56.2,54.8,2745,425,
15,5296,0,83.7,57.3,55.3,2763,425,
14,5382,0,80.5,59,54.6,2788,425,
13,4527,0,77.7,60.4,56.6,2800,424,
12,3650,0,74.3,62,57.2,2826,425,
11,1583,0,71.1,63.3,57.2,2849,425,
10,-741,0,68.1,64.7,56.9,2859,425,
9,-2581,0,64.5,66.3,56.9,2879,425,
8,-3833,0,60.7,68.3,56.7,2902,425,
7,-4992,0,56.7,70.5,57.9,2924,425,
6,-4815,0,52.1,73.6,60.5,2948,425,
5,-7635,0,47.9,75.6,58.8,2965,425,
1 20230508_20240508下跌后快速反弹走势
2 take_profit_pct avg_profit_per_trade median_profit_per_trade avg_holding_days trade_win_rate stock_win_rate total_trades stock_count
3 30 -4712 0 100.8 49.8 53.1 1815 405
4 29 -5206 0 100 49.9 53.3 1818 405
5 28 -6335 0 99.2 50 53.6 1822 405
6 27 -6590 0 98 50.2 53.6 1827 405
7 26 -6865 0 96.7 50.5 54.1 1833 405
8 25 -6648 0 95.4 51 54.3 1836 405
9 24 -6931 0 93.9 51.4 54.1 1835 403
10 23 -5709 0 91.9 52.2 54.5 1837 404
11 22 -6556 0 90.7 52.4 54.6 1852 405
12 21 -5242 0 88.3 53.1 55.3 1858 405
13 20 -5913 0 86.6 53.7 55.3 1866 405
14 19 -5848 0 84.1 54.5 55.6 1876 405
15 18 -4698 0 81.5 55.6 56.3 1882 405
16 17 -5508 0 79.3 56.4 56.3 1888 405
17 16 -4942 0 76.6 57.5 57 1897 405
18 15 -5598 0 74.1 58.6 58.5 1903 405
19 14 -6530 0 71 59.6 58.5 1910 405
20 13 -6346 0 68.3 61 58.8 1919 405
21 12 -6840 0 65.3 62.6 58.8 1926 405
22 11 -7236 0 62.5 64.4 58.5 1931 405
23 10 -7611 0 59.3 66.1 58.8 1939 405
24 9 -9419 0 55.9 67.5 59 1949 405
25 8 -9855 0 52.7 69.4 59 1958 405
26 7 -10832 0 49.2 71.4 58.8 1969 405
27 6 -11762 0 45.3 73.7 59.8 1973 405
28 5 -13031 0 41.6 75.9 60.5 1978 405
29
30
31 20230508_20240205下跌走势
32 take_profit_pct avg_profit_per_trade median_profit_per_trade avg_holding_days trade_win_rate stock_win_rate total_trades stock_count
33 30 -120990 0 75.6 10.9 8.8 1465 396
34 29 -120762 0 75.4 10.9 8.6 1466 396
35 28 -120539 0 75.1 11.1 8.3 1468 396
36 27 -119956 0 74.8 11.4 9.1 1469 396
37 26 -118942 0 74.3 11.6 9.6 1471 396
38 25 -117868 0 73.8 11.9 9.3 1471 396
39 24 -117173 0 73.3 12.3 9.3 1471 396
40 23 -114536 0 72.2 13 9.6 1474 396
41 22 -113484 0 71.5 13.3 9.8 1479 396
42 21 -109829 0 69.9 14.4 10.9 1482 396
43 20 -108867 0 69 15 10.9 1486 396
44 19 -106925 0 67.9 15.8 11.4 1492 396
45 18 -104300 0 66.7 16.8 11.4 1496 396
46 17 -102194 0 65.5 17.7 12.6 1499 396
47 16 -99208 0 64 18.9 12.9 1506 396
48 15 -97766 0 62.6 19.8 13.1 1511 396
49 14 -96754 0 61.2 20.6 13.1 1515 396
50 13 -93449 0 59.5 22.3 13.9 1522 396
51 12 -91252 0 57.7 23.9 14.6 1526 396
52 11 -88804 0 56.1 25.3 15.2 1527 396
53 10 -84415 0 53.6 27.9 17.4 1535 396
54 9 -80819 0 50.9 30.7 19.7 1543 396
55 8 -77402 0 48.2 33.2 21.5 1547 396
56 7 -72609 0 45.1 36.2 23.5 1556 396
57 6 -67130 0 41.5 40.1 27 1559 396
58 5 -63089 0 38.3 43.3 27 1563 396
59
60
61 20221028_20230508上涨走势
62 take_profit_pct avg_profit_per_trade median_profit_per_trade avg_holding_days trade_win_rate stock_win_rate total_trades stock_count
63 30 -26094 0 84.9 35.4 39.4 1022 343
64 29 -24312 0 83.6 36.2 40.2 1027 343
65 28 -24443 0 82.7 36.7 40.5 1031 343
66 27 -23021 0 81.2 37.2 41.1 1034 343
67 26 -22763 0 80 37.7 41.4 1036 343
68 25 -21882 0 78 38.5 42 1040 343
69 24 -22570 0 77.1 38.9 41.4 1042 343
70 23 -22085 0 75.5 39.6 42 1048 343
71 22 -21214 0 74.1 40.4 43.4 1053 343
72 21 -20730 0 72.7 41.3 44 1059 343
73 20 -18191 0 70.6 42.5 44.6 1064 343
74 19 -17678 0 68.8 43.4 43.7 1068 343
75 18 -18481 0 67.2 44 43.1 1072 343
76 17 -16782 0 64.9 45.4 44.9 1076 343
77 16 -16490 0 62.9 46.4 45.8 1081 343
78 15 -15946 0 60.4 47.7 46.4 1089 343
79 14 -16037 0 58.4 48.9 47.2 1095 343
80 13 -16701 0 56.4 49.8 48.1 1100 343
81 12 -16062 0 54 51.3 49 1106 343
82 11 -16000 0 51.4 52.9 49.6 1110 343
83 10 -15453 0 49.1 54.4 49.6 1114 343
84 9 -14841 0 46.2 56.2 49.9 1121 343
85 8 -14211 0 43 58.3 51.3 1132 343
86 7 -14199 0 39.9 60.5 53.9 1138 343
87 6 -15366 0 37.3 62.4 52.8 1144 343
88 5 -15711 0 34.1 64.9 50.7 1152 343
89
90
91 20210106_20220224震荡走势
92 take_profit_pct avg_profit_per_trade median_profit_per_trade avg_holding_days trade_win_rate stock_win_rate total_trades stock_count
93 30 12047 0 115.6 44.5 48.8 2508 424
94 29 12857 0 113.5 45.1 48.5 2525 425
95 28 13349 0 112 45.7 48.9 2538 425
96 27 12644 0 110.5 46.3 49.4 2554 425
97 26 12878 0 108.4 47.1 48.7 2571 425
98 25 12319 0 106.9 47.7 48.5 2589 425
99 24 12744 0 104.5 48.6 50.4 2608 425
100 23 11949 0 102.3 49.4 51.3 2626 425
101 22 11246 0 100.6 50.3 51.3 2637 425
102 21 10399 0 98.7 51.1 51.1 2644 425
103 20 9908 0 96.3 52 51.1 2666 425
104 19 9327 0 93.8 53 52.5 2681 425
105 18 7509 0 92.1 53.6 52.6 2686 424
106 17 7333 0 89.4 55.1 53.9 2713 425
107 16 6404 0 86.6 56.2 54.8 2745 425
108 15 5296 0 83.7 57.3 55.3 2763 425
109 14 5382 0 80.5 59 54.6 2788 425
110 13 4527 0 77.7 60.4 56.6 2800 424
111 12 3650 0 74.3 62 57.2 2826 425
112 11 1583 0 71.1 63.3 57.2 2849 425
113 10 -741 0 68.1 64.7 56.9 2859 425
114 9 -2581 0 64.5 66.3 56.9 2879 425
115 8 -3833 0 60.7 68.3 56.7 2902 425
116 7 -4992 0 56.7 70.5 57.9 2924 425
117 6 -4815 0 52.1 73.6 60.5 2948 425
118 5 -7635 0 47.9 75.6 58.8 2965 425