stock_fundamentals/src/QMT/strategy.py

602 lines
26 KiB
Python
Raw Normal View History

2025-09-19 11:54:39 +08:00
# coding:utf-8
import time
import datetime
import threading
import logging
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from database_manager import DatabaseManager
from redis_state_manager import RedisStateManager
def is_call_auction_time(now_time=None):
"""
判断当前时间是否处于开盘集合竞价时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 开盘集合竞价时间段
call_auction_start = datetime.time(9, 15)
call_auction_end = datetime.time(9, 25)
return call_auction_start <= now_time <= call_auction_end
def is_continuous_trading_time(now_time=None):
"""
判断当前时间是否处于连续交易时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 上午连续交易时段
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
# 下午连续交易时段
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(14, 57)
is_morning = morning_start <= now_time <= morning_end
is_afternoon = afternoon_start <= now_time <= afternoon_end
return is_morning or is_afternoon
# 定义一个类 创建类的实例 作为状态的容器
class _a():
pass
A = _a()
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')
A.order_timeout = 300 # 订单超时时间默认5分钟
# 数据库管理器实例
db_manager = None
redis_manager = None
def initialize_database_manager(logger=None):
"""初始化数据库管理器"""
global db_manager
if db_manager is None:
db_manager = DatabaseManager(logger)
return db_manager
def initialize_redis_manager(logger=None):
"""初始化Redis状态管理器"""
global redis_manager
if redis_manager is None:
redis_manager = RedisStateManager(logger)
return redis_manager
def load_trading_plans_from_database(strategy_id=1, logger=None):
"""已弃用计划改为直接从Redis读取保留此函数用于日志兼容"""
if logger:
logger.info("交易计划现改为从Redis读取数据库仅做审计。")
return {}, {}
def load_initial_positions(xt_trader, acc, logger):
"""启动时一次性加载初始持仓状态券商→Redis并返回Redis视图"""
try:
# 先从QMT获取持仓
positions = xt_trader.query_stock_positions(acc)
qmt_positions = {pos.stock_code: pos.m_nVolume for pos in positions if pos.m_nVolume > 0}
# 从数据库获取持仓
db_manager = initialize_database_manager(logger)
db_positions = db_manager.get_current_positions()
# 将券商持仓写入Redis为权威值DB中但券商为0的暂不写入避免脏数据覆盖
r = initialize_redis_manager(logger)
# 清理Redis中不存在于券商的持仓设为0即删除
for code in list(r.get_all_positions().keys()):
if code not in qmt_positions:
r.set_position(code, 0)
for code, qty in qmt_positions.items():
r.set_position(code, qty)
logger.info(f"初始持仓加载完成已同步至Redis")
logger.info(f" QMT持仓: {qmt_positions}")
logger.info(f" 数据库持仓: {db_positions}")
return r.get_all_positions()
except Exception as e:
logger.error(f"加载初始持仓失败: {str(e)}")
A.positions = {}
return {}
def update_position_in_memory(stock_code, quantity_change, is_buy=True, price=None, logger=None):
"""以Redis为主更新持仓同时更新数据库不再维护内存镜像"""
try:
# Redis持仓更新
r = initialize_redis_manager(logger)
delta = quantity_change if is_buy else -quantity_change
new_qty = r.incr_position(stock_code, delta)
# 更新数据库持仓
if price is not None:
db_manager = initialize_database_manager(logger)
db_manager.update_position(stock_code, quantity_change, price, is_buy)
if logger:
logger.info(f"持仓状态已更新: {stock_code} -> {new_qty}Redis")
except Exception as e:
if logger:
logger.error(f"更新持仓状态失败: {str(e)}")
def add_pending_order(stock_code, order_id, order_quantity, order_price, target_price, order_side='buy', logger=None):
"""添加在途订单写Redis为主同时记录数据库"""
try:
order_info = {
'order_id': order_id,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'pending',
'side': order_side
}
# Redis
r = initialize_redis_manager(logger)
r.set_pending(stock_code, order_info)
# 记录到数据库
db_manager = initialize_database_manager(logger)
order_data = {
'order_id': order_id,
'strategy_id': 1,
'stock_code': stock_code,
'order_side': order_side,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_status': 'pending',
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'order_remark': f"{order_side.upper()}_{stock_code}_{int(time.time())}"
}
db_manager.insert_order(order_data)
if logger:
logger.info(f"添加在途订单: {stock_code} -> {order_id}")
except Exception as e:
if logger:
logger.error(f"添加在途订单失败: {str(e)}")
def remove_pending_order(stock_code, logger=None):
"""移除在途订单,同时更新数据库状态"""
try:
r = initialize_redis_manager(logger)
order_info = r.get_pending(stock_code)
if order_info is None:
if logger:
logger.warning(f"未找到在途订单: {stock_code}")
return
r.del_pending(stock_code)
# 更新数据库订单状态
print("====================================")
db_manager = initialize_database_manager(logger)
db_manager.update_order_status(order_info['order_id'], 'completed')
if logger:
logger.info(f"移除在途订单: {stock_code} -> {order_info['order_id']}")
except Exception as e:
if logger:
logger.error(f"移除在途订单失败: {str(e)}")
def is_stock_pending_order(stock_code):
"""检查股票是否有在途订单以Redis为准"""
r = initialize_redis_manager()
return r.get_pending(stock_code) is not None
def check_order_timeout(xt_trader, acc, logger):
"""已停用:本策略不再执行超时自动撤单,仅保留占位"""
logger.info("check_order_timeout 已停用:挂单后不再按超时撤单")
return
def start_order_timeout_monitor(xt_trader, acc, logger):
"""已停用:不再启动订单超时监控线程"""
logger.info("订单超时监控已停用:不启动后台监控线程")
return
def create_buy_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建买入策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否低于目标价格如果满足条件则买入
"""
# logger.info(f"收到买入行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取买入计划
r = initialize_redis_manager(logger)
plan_info = r.get_buy_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
buy_amount = plan_info['buy_amount']
# logger.info(f"[买入监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足买入条件:当前价格低于目标价格
if current_price < target_price:
# 检查持仓Redis
current_qty = r.get_position(stock_code)
if current_qty > 0:
# logger.info(f"{stock_code} 已有持仓 {current_qty}股,跳过买入")
continue
# 检查是否有在途订单Redis
if is_stock_pending_order(stock_code):
2025-10-11 14:05:18 +08:00
# logger.info(f"{stock_code} 有在途订单,跳过买入")
2025-09-19 11:54:39 +08:00
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足买入条件!{stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f}")
# 计算买入数量(从数据库获取金额)
buy_volume = int(buy_amount / current_price / 100) * 100 # 取整为100的整数倍
if buy_volume > 0:
logger.info(f"准备买入 {stock_code} {buy_volume}股,金额约 {buy_volume * current_price:.2f}")
# 生成订单ID
order_id = f"BUY_{stock_code}_{int(time.time())}"
# 执行买入订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_BUY,
buy_volume,
xtconstant.FIX_PRICE,
target_price,
'auto_buy_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, buy_volume, current_price, target_price, 'buy', logger)
logger.info(f"已提交买入订单: {stock_code} {buy_volume}股,价格{target_price}订单ID: {order_id}")
else:
logger.warning(f"买入数量计算为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 买入行情数据时出错: {str(e)}")
continue
return f
def create_sell_strategy_callback(xt_trader, acc, logger):
"""创建卖出策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否高于目标价格如果满足条件且持有该股票则卖出
"""
# logger.info(f"收到卖出行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取卖出计划
r = initialize_redis_manager(logger)
plan_info = r.get_sell_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
# logger.info(f"[卖出监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足卖出条件:当前价格高于目标价格
if current_price > target_price:
# 检查持仓Redis
r = initialize_redis_manager(logger)
current_position = r.get_position(stock_code)
if current_position <= 0:
logger.info(f"{stock_code} 无持仓,跳过卖出")
continue
# 检查是否有在途订单
if is_stock_pending_order(stock_code):
2025-10-11 14:05:18 +08:00
# logger.info(f"{stock_code} 有在途订单,跳过卖出")
2025-09-19 11:54:39 +08:00
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足卖出条件!{stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f}")
# 卖出数量:全部持仓
sell_volume = current_position
if sell_volume > 0:
logger.info(f"准备卖出 {stock_code} {sell_volume}股,金额约 {sell_volume * current_price:.2f}")
# 生成订单ID
order_id = f"SELL_{stock_code}_{int(time.time())}"
# 执行卖出订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_SELL,
sell_volume,
xtconstant.LATEST_PRICE,
target_price,
'auto_sell_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, sell_volume, current_price, target_price, 'sell', logger)
logger.info(f"已提交卖出订单: {stock_code} {sell_volume}股,价格{target_price}订单ID: {order_id}")
else:
logger.warning(f"卖出数量为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 卖出行情数据时出错: {str(e)}")
continue
return f
def update_account_funds_periodically(xt_trader, acc, logger):
"""每小时更新账户资金信息到数据库"""
while True:
try:
# 获取账户资金信息
account_info = xt_trader.query_stock_asset(acc)
if account_info:
# 计算相关数据
available_cash = getattr(account_info, 'm_dCash', 0.0)
frozen_cash = getattr(account_info, 'm_dFrozenCash', 0.0)
market_value = getattr(account_info, 'm_dMarketValue', 0.0)
total_asset = available_cash + frozen_cash + market_value
# 尝试获取盈亏信息如果属性不存在则设为0
try:
profit_loss = getattr(account_info, 'm_dProfitLoss', 0.0)
except AttributeError:
# 如果m_dProfitLoss不存在尝试其他可能的属性名
profit_loss = getattr(account_info, 'm_dProfit', 0.0)
# 计算盈亏比例
if total_asset > 0:
profit_loss_ratio = (profit_loss / total_asset) * 100
else:
profit_loss_ratio = 0.0
# 更新到数据库
db_manager = initialize_database_manager(logger)
success = db_manager.update_account_funds(
account_id=acc.account_id,
account_type="acc.account_type",
total_asset=total_asset,
available_cash=available_cash,
frozen_cash=frozen_cash,
market_value=market_value,
profit_loss=profit_loss,
profit_loss_ratio=profit_loss_ratio
)
if success:
logger.info(f"账户资金更新成功 - 总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}, 市值: {market_value:.2f}")
else:
logger.warning("账户资金更新失败")
else:
logger.warning("无法获取账户资金信息")
except Exception as e:
logger.error(f"更新账户资金信息时出错: {str(e)}")
# 等待1小时3600秒
time.sleep(3600)
def start_account_funds_monitor(xt_trader, acc, logger):
"""启动账户资金监控线程(每小时更新一次)"""
try:
funds_thread = threading.Thread(
target=update_account_funds_periodically,
args=(xt_trader, acc, logger),
daemon=True
)
funds_thread.start()
logger.info("账户资金监控线程已启动(每小时更新一次)")
except Exception as e:
logger.error(f"启动账户资金监控失败: {str(e)}")
# ========== 每5分钟对账查询券商持仓并反向更新订单/持仓 ==========
def reconcile_orders_and_positions(xt_trader, acc, logger):
"""一次性对账(基于当日成交对比委托):
- 仅查询MySQL今日订单submittedfilled[部分]pendingordered
- 通过QMT接口获取当日成交明细/汇总按股票累计成交量对比委托量
* 当日成交量 >= 委托量 => 状态置为 completed
* 0 < 当日成交量 < 委托量 => 状态置为 filled部分成交记录 filled_quantity
* 当日成交量 == 0 => 不处理
- 本函数不更新持仓持仓改由快照线程覆盖更新
"""
try:
dbm = initialize_database_manager(logger)
# 1) 读取今日未完成订单四种状态submitted、filled(部分成交)、pending、委托订单
# 你提到“委托订单”第四种状态名,这里按 'ordered' 兜底;如你表里具体值不同,可替换。
candidate_statuses = ['submitted', 'filled', 'pending', 'ordered']
orders = dbm.get_todays_orders_by_statuses(candidate_statuses)
if not orders:
return
# 2) 通过QMT查询当日成交结果并按股票聚合成交数量
filled_volume_by_code = {}
trades = xt_trader.query_stock_orders(acc, cancelable_only = False)
for tr in trades or []:
code = getattr(tr, 'stock_code', None) or getattr(tr, 'm_strInstrumentID', None)
vol = int(getattr(tr, 'traded_volume', 0) or getattr(tr, 'm_nVolume', 0) or 0)
if not code or vol <= 0:
continue
filled_volume_by_code[code] = filled_volume_by_code.get(code, 0) + vol
# 3) 遍历订单并根据“当日成交累计 vs 委托量”判断状态
for od in orders:
try:
stock_code = od['stock_code']
side = od['side']
order_id = od['order_id']
qty = int(od.get('order_quantity') or 0)
day_filled = int(filled_volume_by_code.get(stock_code, 0))
if day_filled <= 0:
continue
if day_filled >= qty and qty > 0:
# 全部成交 -> completed
dbm.update_order_status(order_id, 'completed', qty, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 全部成交(>=委托): {stock_code} {side} {day_filled}/{qty}")
elif 0 < day_filled < qty:
# 部分成交 -> filled部分成交
dbm.update_order_status(order_id, 'filled', day_filled, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 部分成交(<委托): {stock_code} {side} {day_filled}/{qty}")
except Exception as ie:
logger.warning(f"[对账-简] 处理订单失败 {od}: {ie}")
except Exception as e:
if logger:
logger.error(f"对账过程出错: {str(e)}")
def start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
"""启动对账后台线程默认每5分钟一次"""
try:
def worker():
while True:
try:
reconcile_orders_and_positions(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"对账线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"对账线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动对账线程失败: {str(e)}")
# ========== 每5分钟持仓快照覆盖更新仅DB ==========
def refresh_positions_snapshot(xt_trader, acc, logger):
"""从QMT拉取当前持仓覆盖更新到DB的 trading_position
- 对存在的标的 upsert 所有持仓字段数量可用冻结成本价市价市值盈亏等
- 删除DB中多余的标的
本函数不操作Redis
"""
try:
dbm = initialize_database_manager(logger)
positions = xt_trader.query_stock_positions(acc) or []
valid_codes = []
for pos in positions:
try:
code = pos.stock_code
# 获取持仓基本信息
total_qty = int(getattr(pos, 'm_nVolume', 0) or 0)
available_qty = int(getattr(pos, 'm_nCanUseVolume', 0) or 0)
frozen_qty = max(0, total_qty - available_qty) # 冻结数量 = 总数量 - 可用数量
# 获取价格和市值信息
cost_price = float(getattr(pos, 'm_dOpenPrice', 0.0) or 0.0)
market_price = float(getattr(pos, 'm_dLastPrice', 0.0) or 0.0)
market_value = float(getattr(pos, 'm_dInstrumentValue', 0.0) or 0.0)
# 获取盈亏信息
profit_loss = float(getattr(pos, 'm_dPositionProfit', 0.0) or 0.0)
position_cost = float(getattr(pos, 'm_dPositionCost', 0.0) or 0.0)
# 计算盈亏比例
if position_cost > 0:
profit_loss_ratio = (profit_loss / position_cost) * 100
else:
profit_loss_ratio = 0.0
if total_qty > 0:
dbm.upsert_position_snapshot(
code, total_qty, available_qty, frozen_qty,
cost_price, market_price, market_value,
profit_loss, profit_loss_ratio
)
valid_codes.append(code)
except Exception as e:
logger.warning(f"处理持仓记录失败 {getattr(pos, 'stock_code', 'unknown')}: {e}")
continue
# 清理非持有标的
dbm.delete_positions_except(valid_codes)
logger.info(f"[持仓快照] 已覆盖更新 {len(valid_codes)} 条持仓记录")
except Exception as e:
if logger:
logger.error(f"持仓快照刷新失败: {str(e)}")
def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
try:
def worker():
while True:
try:
refresh_positions_snapshot(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"持仓快照线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"持仓快照线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动持仓快照线程失败: {str(e)}")
# 为了向后兼容,保留原来的函数名
def create_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建策略回调函数(兼容性函数,实际调用买入回调)"""
return create_buy_strategy_callback(xt_trader, acc, buy_amount, logger)