From 112c55fbda55017216b2fd34085e2ba8c7c466ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BB=A1=E8=84=B8=E5=B0=8F=E6=98=9F=E6=98=9F?= Date: Mon, 22 Dec 2025 15:32:27 +0800 Subject: [PATCH] =?UTF-8?q?QMT=E6=B8=85=E4=BB=93=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/QMT/database_manager.py | 56 ++++++++++++++++ src/QMT/main.py | 50 +++++++++++++-- src/QMT/redis_state_manager.py | 19 ++++++ src/QMT/strategy.py | 114 +++++++++++++++++++++++++++++++++ src/QMT/trader_callback.py | 22 ++++++- 5 files changed, 255 insertions(+), 6 deletions(-) diff --git a/src/QMT/database_manager.py b/src/QMT/database_manager.py index 25d206e..bb5455a 100644 --- a/src/QMT/database_manager.py +++ b/src/QMT/database_manager.py @@ -452,6 +452,62 @@ class DatabaseManager: self.logger.error(f"获取今日订单失败: {str(e)}") return [] + # ===== 清仓请求管理 ===== + def get_pending_clearance_requests(self) -> Dict[str, Dict]: + """获取待处理的清仓请求(status='pending')""" + sql = """ + SELECT id, stock_code, stock_name, status, requested_by, request_time, reason + FROM trading_clearance_request + WHERE status = 'pending' + ORDER BY request_time ASC + """ + try: + results = self.execute_query(sql) + clearance_requests = {} + for row in results: + req_id, stock_code, stock_name, status, requested_by, request_time, reason = row + clearance_requests[stock_code] = { + 'id': req_id, + 'stock_name': stock_name, + 'status': status, + 'requested_by': requested_by, + 'request_time': request_time.strftime('%Y-%m-%d %H:%M:%S') if request_time else None, + 'reason': reason + } + return clearance_requests + except Exception as e: + self.logger.error(f"获取待清仓请求失败: {str(e)}") + return {} + + def update_clearance_status(self, stock_code: str, status: str, completed_time: str = None) -> bool: + """更新清仓请求状态""" + try: + if completed_time: + sql = """ + UPDATE trading_clearance_request + SET status = %s, completed_time = %s + WHERE stock_code = %s AND status = 'pending' + """ + params = (status, completed_time, stock_code) + else: + sql = """ + UPDATE trading_clearance_request + SET status = %s + WHERE stock_code = %s AND status = 'pending' + """ + params = (status, stock_code) + + affected_rows = self.execute_update(sql, params) + if affected_rows > 0: + self.logger.info(f"清仓请求状态已更新: {stock_code} -> {status}") + return True + else: + self.logger.warning(f"清仓请求状态更新失败,未找到待处理请求: {stock_code}") + return False + except Exception as e: + self.logger.error(f"更新清仓请求状态失败: {str(e)}") + return False + # ===== 持仓快照同步(覆盖式) ===== def upsert_position_snapshot(self, stock_code: str, total_quantity: int, available_quantity: int, frozen_quantity: int, cost_price: float, market_price: float, diff --git a/src/QMT/main.py b/src/QMT/main.py index 2cebc52..217faf3 100644 --- a/src/QMT/main.py +++ b/src/QMT/main.py @@ -11,8 +11,8 @@ from xtquant.xttype import StockAccount # 导入自定义模块 from config_loader import get_qmt_config, get_logging_config, get_strategy_config from strategy import load_initial_positions, create_buy_strategy_callback, create_sell_strategy_callback, \ - start_account_funds_monitor, start_reconciliation_monitor, start_position_snapshot_monitor, \ - refresh_positions_snapshot + create_clearance_strategy_callback, start_account_funds_monitor, start_reconciliation_monitor, \ + start_position_snapshot_monitor, refresh_positions_snapshot from redis_state_manager import RedisStateManager from database_manager import DatabaseManager, close_pool from trader_callback import MyXtQuantTraderCallback @@ -97,8 +97,25 @@ if __name__ == '__main__': buy_plans = rsm.get_all_buy_plans() sell_plans = rsm.get_all_sell_plans() + + # 加载清仓计划(启动时一次性加载) + try: + dbm = DatabaseManager(logger) + clearance_requests = dbm.get_pending_clearance_requests() + # 清空Redis清仓计划并全量覆盖 + rsm.clear_clearance_plans() + for code, request in clearance_requests.items(): + rsm.set_clearance_plan(code, request) + logger.info(f"已从MySQL加载清仓计划到Redis,共 {len(clearance_requests)} 条") + except Exception as e: + logger.warning(f"加载清仓计划失败: {str(e)}") + clearance_requests = {} + + clearance_plans = rsm.get_all_clearance_plans() + logger.info(f"买入计划数量(Redis): {len(buy_plans)}") logger.info(f"卖出计划数量(Redis): {len(sell_plans)}") + logger.info(f"清仓计划数量(Redis): {len(clearance_plans)}") # logger.info(f"每只股票买入金额: {buy_amount}元") logger.info("="*50) @@ -115,8 +132,15 @@ if __name__ == '__main__': sell_qty = plan_info['sell_quantity'] if plan_info['sell_quantity'] else '全部' logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 卖出数量 {sell_qty}") - if not buy_plans and not sell_plans: - logger.warning("警告:买入和卖出计划都为空!请检查数据库配置。") + # 显示清仓计划 + if clearance_plans: + logger.info("清仓计划:") + for stock_code, plan_info in clearance_plans.items(): + reason = plan_info.get('reason', '无') + logger.info(f" {stock_code} ({plan_info.get('stock_name', '')}): 清仓原因 {reason}") + + if not buy_plans and not sell_plans and not clearance_plans: + logger.warning("警告:买入、卖出和清仓计划都为空!请检查数据库配置。") logger.info("="*50) @@ -193,8 +217,11 @@ if __name__ == '__main__': # 创建卖出策略回调函数 sell_strategy_callback = create_sell_strategy_callback(xt_trader, acc, logger) + + # 创建清仓策略回调函数 + clearance_strategy_callback = create_clearance_strategy_callback(xt_trader, acc, logger) - # 订阅买卖计划中的股票行情(基于Redis) + # 订阅买卖计划和清仓计划中的股票行情(基于Redis) subscription_ids = {} # 统一管理所有订阅ID if buy_plans: @@ -211,6 +238,17 @@ if __name__ == '__main__': sub_id = xtdata.subscribe_quote(code, '1d', callback=sell_strategy_callback) subscription_ids[code] = sub_id logger.info(f"已订阅卖出 {code} ({sell_plans[code].get('stock_name','')}) 行情,订阅ID: {sub_id}") + if clearance_plans: + clearance_code_list = list(clearance_plans.keys()) + logger.info(f"开始订阅 {len(clearance_code_list)} 只清仓股票的行情...") + for code in clearance_code_list: + # 如果已经订阅过(买入或卖出),跳过 + if code not in subscription_ids: + sub_id = xtdata.subscribe_quote(code, '1d', callback=clearance_strategy_callback) + subscription_ids[code] = sub_id + logger.info(f"已订阅清仓 {code} ({clearance_plans[code].get('stock_name','')}) 行情,订阅ID: {sub_id}") + else: + logger.info(f"清仓股票 {code} 已在其他策略中订阅,跳过重复订阅") # 计划同步线程:每60秒从MySQL同步到Redis,并直接更新订阅(不再分线程管理订阅) import threading, time @@ -301,6 +339,8 @@ if __name__ == '__main__': logger.info("买入策略:当股票价格低于目标价格时,将自动买入指定金额") if sell_plans: logger.info("卖出策略:当股票价格高于目标价格时,将自动卖出指定数量") + if clearance_plans: + logger.info("清仓策略:开盘时(连续交易时段)自动全仓卖出指定股票") logger.info("按 Ctrl+C 停止策略") # 阻塞主线程退出 diff --git a/src/QMT/redis_state_manager.py b/src/QMT/redis_state_manager.py index 4047138..b8ae463 100644 --- a/src/QMT/redis_state_manager.py +++ b/src/QMT/redis_state_manager.py @@ -10,6 +10,7 @@ PENDING_KEY = 'qmt:pending_orders' # Hash: stock_code -> json(order_info) META_KEY = 'qmt:meta' # Hash: various metadata PLAN_BUY_KEY = 'qmt:plan:buy' # Hash: stock_code -> json(plan) PLAN_SELL_KEY = 'qmt:plan:sell' # Hash: stock_code -> json(plan) +PLAN_CLEARANCE_KEY = 'qmt:plan:clearance' # Hash: stock_code -> json(clearance_request) PLAN_VERSION_KEY = 'qmt:plan:version' # String/integer: version bump on change PLAN_SYNCED_VERSION_KEY = 'qmt:plan:synced_version' # 上次从DB同步的版本 @@ -188,3 +189,21 @@ class RedisStateManager: def set_synced_version(self, version: int) -> None: self.r.set(PLAN_SYNCED_VERSION_KEY, version) + + # Clearance Plans + def get_clearance_plan(self, stock_code: str) -> Optional[Dict]: + v = self.r.hget(PLAN_CLEARANCE_KEY, stock_code) + return json.loads(v) if v else None + + def set_clearance_plan(self, stock_code: str, plan: Dict) -> None: + self.r.hset(PLAN_CLEARANCE_KEY, stock_code, json.dumps(plan, ensure_ascii=False)) + + def del_clearance_plan(self, stock_code: str) -> None: + self.r.hdel(PLAN_CLEARANCE_KEY, stock_code) + + def get_all_clearance_plans(self) -> Dict[str, Dict]: + data = self.r.hgetall(PLAN_CLEARANCE_KEY) + return {k: json.loads(v) for k, v in data.items()} if data else {} + + def clear_clearance_plans(self) -> None: + self.r.delete(PLAN_CLEARANCE_KEY) diff --git a/src/QMT/strategy.py b/src/QMT/strategy.py index 47c499b..cb2917b 100644 --- a/src/QMT/strategy.py +++ b/src/QMT/strategy.py @@ -672,6 +672,120 @@ def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: in except Exception as e: logger.error(f"启动持仓快照线程失败: {str(e)}") +def create_clearance_strategy_callback(xt_trader, acc, logger): + """创建清仓策略回调函数""" + # 使用集合记录已执行清仓的股票(当日),避免重复下单 + executed_clearance = set() + last_trading_date = None + + def f(data): + """ + 清仓策略回调函数 + 开盘时(连续交易时段)自动全仓卖出指定股票 + """ + nonlocal executed_clearance, last_trading_date + + # 获取当前时间 + current_time = datetime.datetime.now().time() + current_date = datetime.datetime.now().date() + + # 如果日期变化,清空已执行记录 + if last_trading_date != current_date: + executed_clearance.clear() + last_trading_date = current_date + + # 只在连续交易时段执行清仓(开盘后) + if not is_continuous_trading_time(current_time): + return + + for stock_code, current_data in data.items(): + if stock_code not in A.hsa: + continue + + # 从Redis读取清仓计划 + r = initialize_redis_manager(logger) + clearance_info = r.get_clearance_plan(stock_code) + if not clearance_info: + continue + + # 如果已经执行过清仓,跳过 + if stock_code in executed_clearance: + continue + + try: + # 检查持仓(Redis) + current_position = r.get_position(stock_code) + if current_position <= 0: + logger.info(f"[清仓] {stock_code} 无持仓,标记为失败") + # 无持仓也标记为失败 + db_manager = initialize_database_manager(logger) + db_manager.update_clearance_status( + stock_code, + 'failed', + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ) + r.del_clearance_plan(stock_code) + executed_clearance.add(stock_code) # 标记为已处理 + continue + + # 检查是否有在途订单 + if is_stock_pending_order(stock_code): + logger.info(f"[清仓] {stock_code} 有在途订单,等待订单完成后再清仓") + continue + + # 获取当前价格 + current_price = current_data[0]['close'] + stock_name = clearance_info.get('stock_name', stock_code) + + logger.info(f"[清仓执行] {stock_code} ({stock_name}) 当前持仓: {current_position}股, 当前价格: {current_price:.2f}") + + # 全仓卖出 + sell_volume = current_position + + if sell_volume > 0: + # 清仓策略:使用限价单,价格设置为当前价的95%(向下取整到分),确保能成交 + # 对于卖出,价格越低越容易成交 + aggressive_price = round(current_price * 0.95, 2) + # 确保价格至少为0.01元 + aggressive_price = max(aggressive_price, 0.01) + + logger.info(f"准备清仓卖出 {stock_code} {sell_volume}股,当前价: {current_price:.2f}元,清仓价: {aggressive_price:.2f}元(当前价的95%),金额约 {sell_volume * aggressive_price:.2f}元") + + # 生成订单ID + order_id = f"CLEARANCE_{stock_code}_{int(time.time())}" + + # 执行卖出订单(使用限价单,价格设置为当前价的95%,确保能成交) + async_seq = xt_trader.order_stock_async( + acc, + stock_code, + xtconstant.STOCK_SELL, + sell_volume, + xtconstant.FIX_PRICE, + aggressive_price, + 'auto_clearance_strategy', + order_id + ) + + # 添加到在途订单(同时记录到数据库) + add_pending_order(stock_code, order_id, sell_volume, aggressive_price, aggressive_price, 'sell', logger) + + # 标记为已执行清仓 + executed_clearance.add(stock_code) + + # 单次更新账户资金(不阻塞回调) + update_account_funds_once(xt_trader, acc, logger) + + logger.info(f"已提交清仓卖出订单: {stock_code} {sell_volume}股,订单ID: {order_id}") + else: + logger.warning(f"清仓卖出数量为0,跳过 {stock_code}") + executed_clearance.add(stock_code) # 标记为已处理 + + except Exception as e: + logger.error(f"处理 {stock_code} 清仓行情数据时出错: {str(e)}") + continue + + return f + # 为了向后兼容,保留原来的函数名 def create_strategy_callback(xt_trader, acc, buy_amount, logger): """创建策略回调函数(兼容性函数,实际调用买入回调)""" diff --git a/src/QMT/trader_callback.py b/src/QMT/trader_callback.py index 39427e3..334ee6e 100644 --- a/src/QMT/trader_callback.py +++ b/src/QMT/trader_callback.py @@ -78,13 +78,19 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): try: # 根据订单备注判断是买入还是卖出 is_buy = True # 默认买入 - if trade.order_remark.startswith('SELL_'): + is_clearance = False # 是否为清仓订单 + if trade.order_remark.startswith('CLEARANCE_'): + is_buy = False + is_clearance = True + elif trade.order_remark.startswith('SELL_'): is_buy = False elif trade.order_remark.startswith('BUY_'): is_buy = True # 记录交易方向 trade_direction = "买入" if is_buy else "卖出" + if is_clearance: + trade_direction = "清仓" self.logger.info(f"识别交易方向: {trade_direction}") # 更新内存和数据库持仓状态(确保价格类型为float) @@ -117,6 +123,20 @@ class MyXtQuantTraderCallback(XtQuantTraderCallback): # 从在途订单中移除 remove_pending_order(trade.stock_code, self.logger) + + # 如果是清仓订单且完全成交,更新清仓请求状态 + if is_clearance: + completed_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + self.db_manager.update_clearance_status( + trade.stock_code, + 'completed', + completed_time + ) + # 从Redis中移除清仓计划 + from strategy import initialize_redis_manager + r = initialize_redis_manager(self.logger) + r.del_clearance_plan(trade.stock_code) + self.logger.info(f"清仓订单完全成交,已更新清仓请求状态: {trade.stock_code}") else: # 部分成交 self.db_manager.update_order_status(