QMT清仓逻辑新增。

This commit is contained in:
满脸小星星 2025-12-22 15:32:27 +08:00
parent a4ef9df315
commit 112c55fbda
5 changed files with 255 additions and 6 deletions

View File

@ -452,6 +452,62 @@ class DatabaseManager:
self.logger.error(f"获取今日订单失败: {str(e)}") self.logger.error(f"获取今日订单失败: {str(e)}")
return [] return []
# ===== 清仓请求管理 =====
def get_pending_clearance_requests(self) -> Dict[str, Dict]:
"""获取待处理的清仓请求status='pending'"""
sql = """
SELECT id, stock_code, stock_name, status, requested_by, request_time, reason
FROM trading_clearance_request
WHERE status = 'pending'
ORDER BY request_time ASC
"""
try:
results = self.execute_query(sql)
clearance_requests = {}
for row in results:
req_id, stock_code, stock_name, status, requested_by, request_time, reason = row
clearance_requests[stock_code] = {
'id': req_id,
'stock_name': stock_name,
'status': status,
'requested_by': requested_by,
'request_time': request_time.strftime('%Y-%m-%d %H:%M:%S') if request_time else None,
'reason': reason
}
return clearance_requests
except Exception as e:
self.logger.error(f"获取待清仓请求失败: {str(e)}")
return {}
def update_clearance_status(self, stock_code: str, status: str, completed_time: str = None) -> bool:
"""更新清仓请求状态"""
try:
if completed_time:
sql = """
UPDATE trading_clearance_request
SET status = %s, completed_time = %s
WHERE stock_code = %s AND status = 'pending'
"""
params = (status, completed_time, stock_code)
else:
sql = """
UPDATE trading_clearance_request
SET status = %s
WHERE stock_code = %s AND status = 'pending'
"""
params = (status, stock_code)
affected_rows = self.execute_update(sql, params)
if affected_rows > 0:
self.logger.info(f"清仓请求状态已更新: {stock_code} -> {status}")
return True
else:
self.logger.warning(f"清仓请求状态更新失败,未找到待处理请求: {stock_code}")
return False
except Exception as e:
self.logger.error(f"更新清仓请求状态失败: {str(e)}")
return False
# ===== 持仓快照同步(覆盖式) ===== # ===== 持仓快照同步(覆盖式) =====
def upsert_position_snapshot(self, stock_code: str, total_quantity: int, available_quantity: int, def upsert_position_snapshot(self, stock_code: str, total_quantity: int, available_quantity: int,
frozen_quantity: int, cost_price: float, market_price: float, frozen_quantity: int, cost_price: float, market_price: float,

View File

@ -11,8 +11,8 @@ from xtquant.xttype import StockAccount
# 导入自定义模块 # 导入自定义模块
from config_loader import get_qmt_config, get_logging_config, get_strategy_config from config_loader import get_qmt_config, get_logging_config, get_strategy_config
from strategy import load_initial_positions, create_buy_strategy_callback, create_sell_strategy_callback, \ from strategy import load_initial_positions, create_buy_strategy_callback, create_sell_strategy_callback, \
start_account_funds_monitor, start_reconciliation_monitor, start_position_snapshot_monitor, \ create_clearance_strategy_callback, start_account_funds_monitor, start_reconciliation_monitor, \
refresh_positions_snapshot start_position_snapshot_monitor, refresh_positions_snapshot
from redis_state_manager import RedisStateManager from redis_state_manager import RedisStateManager
from database_manager import DatabaseManager, close_pool from database_manager import DatabaseManager, close_pool
from trader_callback import MyXtQuantTraderCallback from trader_callback import MyXtQuantTraderCallback
@ -97,8 +97,25 @@ if __name__ == '__main__':
buy_plans = rsm.get_all_buy_plans() buy_plans = rsm.get_all_buy_plans()
sell_plans = rsm.get_all_sell_plans() sell_plans = rsm.get_all_sell_plans()
# 加载清仓计划(启动时一次性加载)
try:
dbm = DatabaseManager(logger)
clearance_requests = dbm.get_pending_clearance_requests()
# 清空Redis清仓计划并全量覆盖
rsm.clear_clearance_plans()
for code, request in clearance_requests.items():
rsm.set_clearance_plan(code, request)
logger.info(f"已从MySQL加载清仓计划到Redis{len(clearance_requests)}")
except Exception as e:
logger.warning(f"加载清仓计划失败: {str(e)}")
clearance_requests = {}
clearance_plans = rsm.get_all_clearance_plans()
logger.info(f"买入计划数量(Redis): {len(buy_plans)}") logger.info(f"买入计划数量(Redis): {len(buy_plans)}")
logger.info(f"卖出计划数量(Redis): {len(sell_plans)}") logger.info(f"卖出计划数量(Redis): {len(sell_plans)}")
logger.info(f"清仓计划数量(Redis): {len(clearance_plans)}")
# logger.info(f"每只股票买入金额: {buy_amount}元") # logger.info(f"每只股票买入金额: {buy_amount}元")
logger.info("="*50) logger.info("="*50)
@ -115,8 +132,15 @@ if __name__ == '__main__':
sell_qty = plan_info['sell_quantity'] if plan_info['sell_quantity'] else '全部' sell_qty = plan_info['sell_quantity'] if plan_info['sell_quantity'] else '全部'
logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 卖出数量 {sell_qty}") logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 卖出数量 {sell_qty}")
if not buy_plans and not sell_plans: # 显示清仓计划
logger.warning("警告:买入和卖出计划都为空!请检查数据库配置。") if clearance_plans:
logger.info("清仓计划:")
for stock_code, plan_info in clearance_plans.items():
reason = plan_info.get('reason', '')
logger.info(f" {stock_code} ({plan_info.get('stock_name', '')}): 清仓原因 {reason}")
if not buy_plans and not sell_plans and not clearance_plans:
logger.warning("警告:买入、卖出和清仓计划都为空!请检查数据库配置。")
logger.info("="*50) logger.info("="*50)
@ -193,8 +217,11 @@ if __name__ == '__main__':
# 创建卖出策略回调函数 # 创建卖出策略回调函数
sell_strategy_callback = create_sell_strategy_callback(xt_trader, acc, logger) sell_strategy_callback = create_sell_strategy_callback(xt_trader, acc, logger)
# 创建清仓策略回调函数
clearance_strategy_callback = create_clearance_strategy_callback(xt_trader, acc, logger)
# 订阅买卖计划中的股票行情基于Redis # 订阅买卖计划和清仓计划中的股票行情基于Redis
subscription_ids = {} # 统一管理所有订阅ID subscription_ids = {} # 统一管理所有订阅ID
if buy_plans: if buy_plans:
@ -211,6 +238,17 @@ if __name__ == '__main__':
sub_id = xtdata.subscribe_quote(code, '1d', callback=sell_strategy_callback) sub_id = xtdata.subscribe_quote(code, '1d', callback=sell_strategy_callback)
subscription_ids[code] = sub_id subscription_ids[code] = sub_id
logger.info(f"已订阅卖出 {code} ({sell_plans[code].get('stock_name','')}) 行情订阅ID: {sub_id}") logger.info(f"已订阅卖出 {code} ({sell_plans[code].get('stock_name','')}) 行情订阅ID: {sub_id}")
if clearance_plans:
clearance_code_list = list(clearance_plans.keys())
logger.info(f"开始订阅 {len(clearance_code_list)} 只清仓股票的行情...")
for code in clearance_code_list:
# 如果已经订阅过(买入或卖出),跳过
if code not in subscription_ids:
sub_id = xtdata.subscribe_quote(code, '1d', callback=clearance_strategy_callback)
subscription_ids[code] = sub_id
logger.info(f"已订阅清仓 {code} ({clearance_plans[code].get('stock_name','')}) 行情订阅ID: {sub_id}")
else:
logger.info(f"清仓股票 {code} 已在其他策略中订阅,跳过重复订阅")
# 计划同步线程每60秒从MySQL同步到Redis并直接更新订阅不再分线程管理订阅 # 计划同步线程每60秒从MySQL同步到Redis并直接更新订阅不再分线程管理订阅
import threading, time import threading, time
@ -301,6 +339,8 @@ if __name__ == '__main__':
logger.info("买入策略:当股票价格低于目标价格时,将自动买入指定金额") logger.info("买入策略:当股票价格低于目标价格时,将自动买入指定金额")
if sell_plans: if sell_plans:
logger.info("卖出策略:当股票价格高于目标价格时,将自动卖出指定数量") logger.info("卖出策略:当股票价格高于目标价格时,将自动卖出指定数量")
if clearance_plans:
logger.info("清仓策略:开盘时(连续交易时段)自动全仓卖出指定股票")
logger.info("按 Ctrl+C 停止策略") logger.info("按 Ctrl+C 停止策略")
# 阻塞主线程退出 # 阻塞主线程退出

View File

@ -10,6 +10,7 @@ PENDING_KEY = 'qmt:pending_orders' # Hash: stock_code -> json(order_info)
META_KEY = 'qmt:meta' # Hash: various metadata META_KEY = 'qmt:meta' # Hash: various metadata
PLAN_BUY_KEY = 'qmt:plan:buy' # Hash: stock_code -> json(plan) PLAN_BUY_KEY = 'qmt:plan:buy' # Hash: stock_code -> json(plan)
PLAN_SELL_KEY = 'qmt:plan:sell' # Hash: stock_code -> json(plan) PLAN_SELL_KEY = 'qmt:plan:sell' # Hash: stock_code -> json(plan)
PLAN_CLEARANCE_KEY = 'qmt:plan:clearance' # Hash: stock_code -> json(clearance_request)
PLAN_VERSION_KEY = 'qmt:plan:version' # String/integer: version bump on change PLAN_VERSION_KEY = 'qmt:plan:version' # String/integer: version bump on change
PLAN_SYNCED_VERSION_KEY = 'qmt:plan:synced_version' # 上次从DB同步的版本 PLAN_SYNCED_VERSION_KEY = 'qmt:plan:synced_version' # 上次从DB同步的版本
@ -188,3 +189,21 @@ class RedisStateManager:
def set_synced_version(self, version: int) -> None: def set_synced_version(self, version: int) -> None:
self.r.set(PLAN_SYNCED_VERSION_KEY, version) self.r.set(PLAN_SYNCED_VERSION_KEY, version)
# Clearance Plans
def get_clearance_plan(self, stock_code: str) -> Optional[Dict]:
v = self.r.hget(PLAN_CLEARANCE_KEY, stock_code)
return json.loads(v) if v else None
def set_clearance_plan(self, stock_code: str, plan: Dict) -> None:
self.r.hset(PLAN_CLEARANCE_KEY, stock_code, json.dumps(plan, ensure_ascii=False))
def del_clearance_plan(self, stock_code: str) -> None:
self.r.hdel(PLAN_CLEARANCE_KEY, stock_code)
def get_all_clearance_plans(self) -> Dict[str, Dict]:
data = self.r.hgetall(PLAN_CLEARANCE_KEY)
return {k: json.loads(v) for k, v in data.items()} if data else {}
def clear_clearance_plans(self) -> None:
self.r.delete(PLAN_CLEARANCE_KEY)

View File

@ -672,6 +672,120 @@ def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: in
except Exception as e: except Exception as e:
logger.error(f"启动持仓快照线程失败: {str(e)}") logger.error(f"启动持仓快照线程失败: {str(e)}")
def create_clearance_strategy_callback(xt_trader, acc, logger):
"""创建清仓策略回调函数"""
# 使用集合记录已执行清仓的股票(当日),避免重复下单
executed_clearance = set()
last_trading_date = None
def f(data):
"""
清仓策略回调函数
开盘时连续交易时段自动全仓卖出指定股票
"""
nonlocal executed_clearance, last_trading_date
# 获取当前时间
current_time = datetime.datetime.now().time()
current_date = datetime.datetime.now().date()
# 如果日期变化,清空已执行记录
if last_trading_date != current_date:
executed_clearance.clear()
last_trading_date = current_date
# 只在连续交易时段执行清仓(开盘后)
if not is_continuous_trading_time(current_time):
return
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取清仓计划
r = initialize_redis_manager(logger)
clearance_info = r.get_clearance_plan(stock_code)
if not clearance_info:
continue
# 如果已经执行过清仓,跳过
if stock_code in executed_clearance:
continue
try:
# 检查持仓Redis
current_position = r.get_position(stock_code)
if current_position <= 0:
logger.info(f"[清仓] {stock_code} 无持仓,标记为失败")
# 无持仓也标记为失败
db_manager = initialize_database_manager(logger)
db_manager.update_clearance_status(
stock_code,
'failed',
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
)
r.del_clearance_plan(stock_code)
executed_clearance.add(stock_code) # 标记为已处理
continue
# 检查是否有在途订单
if is_stock_pending_order(stock_code):
logger.info(f"[清仓] {stock_code} 有在途订单,等待订单完成后再清仓")
continue
# 获取当前价格
current_price = current_data[0]['close']
stock_name = clearance_info.get('stock_name', stock_code)
logger.info(f"[清仓执行] {stock_code} ({stock_name}) 当前持仓: {current_position}股, 当前价格: {current_price:.2f}")
# 全仓卖出
sell_volume = current_position
if sell_volume > 0:
# 清仓策略使用限价单价格设置为当前价的95%(向下取整到分),确保能成交
# 对于卖出,价格越低越容易成交
aggressive_price = round(current_price * 0.95, 2)
# 确保价格至少为0.01元
aggressive_price = max(aggressive_price, 0.01)
logger.info(f"准备清仓卖出 {stock_code} {sell_volume}股,当前价: {current_price:.2f}元,清仓价: {aggressive_price:.2f}当前价的95%),金额约 {sell_volume * aggressive_price:.2f}")
# 生成订单ID
order_id = f"CLEARANCE_{stock_code}_{int(time.time())}"
# 执行卖出订单使用限价单价格设置为当前价的95%,确保能成交)
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_SELL,
sell_volume,
xtconstant.FIX_PRICE,
aggressive_price,
'auto_clearance_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, sell_volume, aggressive_price, aggressive_price, 'sell', logger)
# 标记为已执行清仓
executed_clearance.add(stock_code)
# 单次更新账户资金(不阻塞回调)
update_account_funds_once(xt_trader, acc, logger)
logger.info(f"已提交清仓卖出订单: {stock_code} {sell_volume}订单ID: {order_id}")
else:
logger.warning(f"清仓卖出数量为0跳过 {stock_code}")
executed_clearance.add(stock_code) # 标记为已处理
except Exception as e:
logger.error(f"处理 {stock_code} 清仓行情数据时出错: {str(e)}")
continue
return f
# 为了向后兼容,保留原来的函数名 # 为了向后兼容,保留原来的函数名
def create_strategy_callback(xt_trader, acc, buy_amount, logger): def create_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建策略回调函数(兼容性函数,实际调用买入回调)""" """创建策略回调函数(兼容性函数,实际调用买入回调)"""

View File

@ -78,13 +78,19 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
try: try:
# 根据订单备注判断是买入还是卖出 # 根据订单备注判断是买入还是卖出
is_buy = True # 默认买入 is_buy = True # 默认买入
if trade.order_remark.startswith('SELL_'): is_clearance = False # 是否为清仓订单
if trade.order_remark.startswith('CLEARANCE_'):
is_buy = False
is_clearance = True
elif trade.order_remark.startswith('SELL_'):
is_buy = False is_buy = False
elif trade.order_remark.startswith('BUY_'): elif trade.order_remark.startswith('BUY_'):
is_buy = True is_buy = True
# 记录交易方向 # 记录交易方向
trade_direction = "买入" if is_buy else "卖出" trade_direction = "买入" if is_buy else "卖出"
if is_clearance:
trade_direction = "清仓"
self.logger.info(f"识别交易方向: {trade_direction}") self.logger.info(f"识别交易方向: {trade_direction}")
# 更新内存和数据库持仓状态确保价格类型为float # 更新内存和数据库持仓状态确保价格类型为float
@ -117,6 +123,20 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback):
# 从在途订单中移除 # 从在途订单中移除
remove_pending_order(trade.stock_code, self.logger) remove_pending_order(trade.stock_code, self.logger)
# 如果是清仓订单且完全成交,更新清仓请求状态
if is_clearance:
completed_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.db_manager.update_clearance_status(
trade.stock_code,
'completed',
completed_time
)
# 从Redis中移除清仓计划
from strategy import initialize_redis_manager
r = initialize_redis_manager(self.logger)
r.del_clearance_plan(trade.stock_code)
self.logger.info(f"清仓订单完全成交,已更新清仓请求状态: {trade.stock_code}")
else: else:
# 部分成交 # 部分成交
self.db_manager.update_order_status( self.db_manager.update_order_status(