stock_fundamentals/src/QMT/strategy.py

618 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding:utf-8
import time
import datetime
import threading
import logging
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from database_manager import DatabaseManager
from redis_state_manager import RedisStateManager
def is_call_auction_time(now_time=None):
"""
判断当前时间是否处于开盘集合竞价时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 开盘集合竞价时间段
call_auction_start = datetime.time(9, 15)
call_auction_end = datetime.time(9, 25)
return call_auction_start <= now_time <= call_auction_end
def is_continuous_trading_time(now_time=None):
"""
判断当前时间是否处于连续交易时段
"""
if not now_time:
now_time = datetime.datetime.now().time()
# 上午连续交易时段
morning_start = datetime.time(9, 30)
morning_end = datetime.time(11, 30)
# 下午连续交易时段
afternoon_start = datetime.time(13, 0)
afternoon_end = datetime.time(14, 57)
is_morning = morning_start <= now_time <= morning_end
is_afternoon = afternoon_start <= now_time <= afternoon_end
return is_morning or is_afternoon
# 定义一个类 创建类的实例 作为状态的容器
class _a():
pass
A = _a()
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')
A.order_timeout = 300 # 订单超时时间默认5分钟
# 数据库管理器实例
db_manager = None
redis_manager = None
def initialize_database_manager(logger=None):
"""初始化数据库管理器"""
global db_manager
if db_manager is None:
db_manager = DatabaseManager(logger)
return db_manager
def initialize_redis_manager(logger=None):
"""初始化Redis状态管理器"""
global redis_manager
if redis_manager is None:
redis_manager = RedisStateManager(logger)
return redis_manager
def load_trading_plans_from_database(strategy_id=1, logger=None):
"""已弃用计划改为直接从Redis读取保留此函数用于日志兼容"""
if logger:
logger.info("交易计划现改为从Redis读取数据库仅做审计。")
return {}, {}
def load_initial_positions(xt_trader, acc, logger):
"""启动时一次性加载初始持仓状态券商→Redis并返回Redis视图"""
try:
# 先从QMT获取持仓
positions = xt_trader.query_stock_positions(acc)
qmt_positions = {pos.stock_code: pos.m_nVolume for pos in positions if pos.m_nVolume > 0}
# 从数据库获取持仓
db_manager = initialize_database_manager(logger)
db_positions = db_manager.get_current_positions()
# 将券商持仓写入Redis为权威值DB中但券商为0的暂不写入避免脏数据覆盖
r = initialize_redis_manager(logger)
# 清理Redis中不存在于券商的持仓设为0即删除
for code in list(r.get_all_positions().keys()):
if code not in qmt_positions:
r.set_position(code, 0)
for code, qty in qmt_positions.items():
r.set_position(code, qty)
logger.info(f"初始持仓加载完成已同步至Redis")
logger.info(f" QMT持仓: {qmt_positions}")
logger.info(f" 数据库持仓: {db_positions}")
return r.get_all_positions()
except Exception as e:
logger.error(f"加载初始持仓失败: {str(e)}")
A.positions = {}
return {}
def update_position_in_memory(stock_code, quantity_change, is_buy=True, price=None, logger=None):
"""以Redis为主更新持仓同时更新数据库不再维护内存镜像"""
try:
# Redis持仓更新
r = initialize_redis_manager(logger)
delta = quantity_change if is_buy else -quantity_change
new_qty = r.incr_position(stock_code, delta)
# 更新数据库持仓
if price is not None:
db_manager = initialize_database_manager(logger)
db_manager.update_position(stock_code, quantity_change, price, is_buy)
if logger:
logger.info(f"持仓状态已更新: {stock_code} -> {new_qty}Redis")
except Exception as e:
if logger:
logger.error(f"更新持仓状态失败: {str(e)}")
def add_pending_order(stock_code, order_id, order_quantity, order_price, target_price, order_side='buy', logger=None):
"""添加在途订单写Redis为主同时记录数据库"""
try:
order_info = {
'order_id': order_id,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'pending',
'side': order_side
}
# Redis
r = initialize_redis_manager(logger)
r.set_pending(stock_code, order_info)
# 记录到数据库
db_manager = initialize_database_manager(logger)
order_data = {
'order_id': order_id,
'strategy_id': 1,
'stock_code': stock_code,
'order_side': order_side,
'order_quantity': order_quantity,
'order_price': order_price,
'target_price': target_price,
'order_status': 'pending',
'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'order_remark': f"{order_side.upper()}_{stock_code}_{int(time.time())}"
}
db_manager.insert_order(order_data)
if logger:
logger.info(f"添加在途订单: {stock_code} -> {order_id}")
except Exception as e:
if logger:
logger.error(f"添加在途订单失败: {str(e)}")
def remove_pending_order(stock_code, logger=None):
"""移除在途订单,同时更新数据库状态"""
try:
r = initialize_redis_manager(logger)
order_info = r.get_pending(stock_code)
if order_info is None:
if logger:
logger.warning(f"未找到在途订单: {stock_code}")
return
r.del_pending(stock_code)
# 更新数据库订单状态
print("====================================")
db_manager = initialize_database_manager(logger)
db_manager.update_order_status(order_info['order_id'], 'completed')
if logger:
logger.info(f"移除在途订单: {stock_code} -> {order_info['order_id']}")
except Exception as e:
if logger:
logger.error(f"移除在途订单失败: {str(e)}")
def is_stock_pending_order(stock_code):
"""检查股票是否有在途订单以Redis为准"""
r = initialize_redis_manager()
return r.get_pending(stock_code) is not None
def check_order_timeout(xt_trader, acc, logger):
"""已停用:本策略不再执行超时自动撤单,仅保留占位"""
logger.info("check_order_timeout 已停用:挂单后不再按超时撤单")
return
def start_order_timeout_monitor(xt_trader, acc, logger):
"""已停用:不再启动订单超时监控线程"""
logger.info("订单超时监控已停用:不启动后台监控线程")
return
def create_buy_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建买入策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否低于目标价格,如果满足条件则买入
"""
# logger.info(f"收到买入行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取买入计划
r = initialize_redis_manager(logger)
plan_info = r.get_buy_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
buy_amount = plan_info['buy_amount']
# logger.info(f"[买入监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足买入条件:当前价格低于目标价格
if current_price < target_price:
# 检查持仓Redis
current_qty = r.get_position(stock_code)
if current_qty > 0:
# logger.info(f"{stock_code} 已有持仓 {current_qty}股,跳过买入")
continue
# 检查是否有在途订单Redis
if is_stock_pending_order(stock_code):
# logger.info(f"{stock_code} 有在途订单,跳过买入")
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足买入条件!{stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f}")
# 计算买入数量(从数据库获取金额)
buy_volume = int(buy_amount / current_price / 100) * 100 # 取整为100的整数倍
if buy_volume > 0:
logger.info(f"准备买入 {stock_code} {buy_volume}股,金额约 {buy_volume * current_price:.2f}")
# 生成订单ID
order_id = f"BUY_{stock_code}_{int(time.time())}"
# 执行买入订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_BUY,
buy_volume,
xtconstant.FIX_PRICE,
current_price,
'auto_buy_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, buy_volume, current_price, target_price, 'buy', logger)
logger.info(f"已提交买入订单: {stock_code} {buy_volume}股,价格{target_price}订单ID: {order_id}")
update_account_funds_periodically(xt_trader, acc, logger)
else:
logger.warning(f"买入数量计算为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 买入行情数据时出错: {str(e)}")
continue
return f
def create_sell_strategy_callback(xt_trader, acc, logger):
"""创建卖出策略回调函数"""
def f(data):
"""
实时行情回调函数
检查当前价格是否高于目标价格,如果满足条件且持有该股票则卖出
"""
# logger.info(f"收到卖出行情数据: {datetime.datetime.now()}")
# 获取当前时间
current_time = datetime.datetime.now().time()
# 判断当前交易时段
is_call_auction = is_call_auction_time(current_time)
is_continuous = is_continuous_trading_time(current_time)
for stock_code, current_data in data.items():
if stock_code not in A.hsa:
continue
# 从Redis读取卖出计划
r = initialize_redis_manager(logger)
plan_info = r.get_sell_plan(stock_code)
if not plan_info or plan_info.get('is_active', 1) != 1:
continue
try:
# 获取当前价格和目标价格
current_price = current_data[0]['close']
target_price = plan_info['target_price']
# logger.info(f"[卖出监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}")
# 判断是否满足卖出条件:当前价格高于目标价格
if current_price > target_price:
# 检查持仓Redis
r = initialize_redis_manager(logger)
current_position = r.get_position(stock_code)
if current_position <= 0:
logger.info(f"{stock_code} 无持仓,跳过卖出")
continue
# 检查是否有在途订单
if is_stock_pending_order(stock_code):
# logger.info(f"{stock_code} 有在途订单,跳过卖出")
continue
# 集合竞价时段:只观察,不下单
if is_call_auction:
logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待连续交易时段下单")
continue
# 连续交易时段:执行下单
if is_continuous:
logger.info(f"[连续交易下单] 满足卖出条件!{stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f}")
# 卖出数量:全部持仓
sell_volume = current_position
if sell_volume > 0:
logger.info(f"准备卖出 {stock_code} {sell_volume}股,金额约 {sell_volume * current_price:.2f}")
# 生成订单ID
order_id = f"SELL_{stock_code}_{int(time.time())}"
# 执行卖出订单
async_seq = xt_trader.order_stock_async(
acc,
stock_code,
xtconstant.STOCK_SELL,
sell_volume,
xtconstant.LATEST_PRICE,
target_price,
'auto_sell_strategy',
order_id
)
# 添加到在途订单(同时记录到数据库)
add_pending_order(stock_code, order_id, sell_volume, current_price, target_price, 'sell', logger)
logger.info(f"已提交卖出订单: {stock_code} {sell_volume}股,价格{target_price}订单ID: {order_id}")
update_account_funds_periodically(xt_trader, acc, logger)
else:
logger.warning(f"卖出数量为0跳过 {stock_code}")
else:
# 非交易时段
logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待交易时段")
except Exception as e:
logger.error(f"处理 {stock_code} 卖出行情数据时出错: {str(e)}")
continue
return f
def update_account_funds_periodically(xt_trader, acc, logger):
"""每小时更新账户资金信息到数据库"""
while True:
try:
# 获取账户资金信息
account_info = xt_trader.query_stock_asset(acc)
if account_info:
# 计算相关数据
available_cash = getattr(account_info, 'm_dCash', 0.0)
frozen_cash = getattr(account_info, 'm_dFrozenCash', 0.0)
market_value = getattr(account_info, 'm_dMarketValue', 0.0)
total_asset = available_cash + frozen_cash + market_value
# 尝试获取盈亏信息如果属性不存在则设为0
try:
profit_loss = getattr(account_info, 'm_dProfitLoss', 0.0)
except AttributeError:
# 如果m_dProfitLoss不存在尝试其他可能的属性名
profit_loss = getattr(account_info, 'm_dProfit', 0.0)
# 计算盈亏比例
if total_asset > 0:
profit_loss_ratio = (profit_loss / total_asset) * 100
else:
profit_loss_ratio = 0.0
# 更新到数据库
db_manager = initialize_database_manager(logger)
success = db_manager.update_account_funds(
account_id=acc.account_id,
account_type="acc.account_type",
total_asset=total_asset,
available_cash=available_cash,
frozen_cash=frozen_cash,
market_value=market_value,
profit_loss=profit_loss,
profit_loss_ratio=profit_loss_ratio
)
if success:
logger.info(f"账户资金更新成功 - 总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}, 市值: {market_value:.2f}")
else:
logger.warning("账户资金更新失败")
else:
logger.warning("无法获取账户资金信息")
except Exception as e:
logger.error(f"更新账户资金信息时出错: {str(e)}")
# 等待1小时3600秒
time.sleep(3600)
def start_account_funds_monitor(xt_trader, acc, logger):
"""启动账户资金监控线程(每小时更新一次)"""
try:
funds_thread = threading.Thread(
target=update_account_funds_periodically,
args=(xt_trader, acc, logger),
daemon=True
)
funds_thread.start()
logger.info("账户资金监控线程已启动(每小时更新一次)")
except Exception as e:
logger.error(f"启动账户资金监控失败: {str(e)}")
# ========== 每5分钟对账查询券商持仓并反向更新订单/持仓 ==========
def reconcile_orders_and_positions(xt_trader, acc, logger):
"""一次性对账(基于当日成交对比委托):
- 仅查询MySQL今日订单submitted、filled[部分]、pending、ordered
- 通过QMT接口获取当日成交明细/汇总,按股票累计成交量对比委托量:
* 当日成交量 >= 委托量 => 状态置为 completed
* 0 < 当日成交量 < 委托量 => 状态置为 filled部分成交记录 filled_quantity
* 当日成交量 == 0 => 不处理
- 本函数不更新持仓,持仓改由快照线程覆盖更新
"""
try:
dbm = initialize_database_manager(logger)
# 1) 读取今日未完成订单四种状态submitted、filled(部分成交)、pending、委托订单
# 你提到“委托订单”第四种状态名,这里按 'ordered' 兜底;如你表里具体值不同,可替换。
candidate_statuses = ['submitted', 'filled', 'pending', 'ordered']
orders = dbm.get_todays_orders_by_statuses(candidate_statuses)
if not orders:
return
# 2) 通过QMT查询当日成交结果并按股票聚合成交数量
filled_volume_by_code = {}
trades = xt_trader.query_stock_orders(acc, cancelable_only = False)
for tr in trades or []:
code = getattr(tr, 'stock_code', None) or getattr(tr, 'm_strInstrumentID', None)
vol = int(getattr(tr, 'traded_volume', 0) or getattr(tr, 'm_nVolume', 0) or 0)
if not code or vol <= 0:
continue
filled_volume_by_code[code] = filled_volume_by_code.get(code, 0) + vol
# 3) 遍历订单并根据“当日成交累计 vs 委托量”判断状态
for od in orders:
try:
stock_code = od['stock_code']
side = od['side']
order_id = od['order_id']
qty = int(od.get('order_quantity') or 0)
day_filled = int(filled_volume_by_code.get(stock_code, 0))
if day_filled <= 0:
continue
if day_filled >= qty and qty > 0:
# 全部成交 -> completed
dbm.update_order_status(order_id, 'completed', qty, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 全部成交(>=委托): {stock_code} {side} {day_filled}/{qty}")
elif 0 < day_filled < qty:
# 部分成交 -> filled部分成交
dbm.update_order_status(order_id, 'filled', day_filled, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
logger.info(f"[对账-简] 部分成交(<委托): {stock_code} {side} {day_filled}/{qty}")
except Exception as ie:
logger.warning(f"[对账-简] 处理订单失败 {od}: {ie}")
except Exception as e:
if logger:
logger.error(f"对账过程出错: {str(e)}")
def start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
"""启动对账后台线程默认每5分钟一次"""
try:
def worker():
while True:
try:
reconcile_orders_and_positions(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"对账线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"对账线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动对账线程失败: {str(e)}")
# ========== 每5分钟持仓快照覆盖更新仅DB ==========
def refresh_positions_snapshot(xt_trader, acc, logger):
"""从QMT拉取当前持仓覆盖更新到DB的 trading_position
- 对存在的标的 upsert 所有持仓字段(数量、可用、冻结、成本价、市价、市值、盈亏等)
- 删除DB中多余的标的
本函数不操作Redis。
"""
try:
dbm = initialize_database_manager(logger)
positions = xt_trader.query_stock_positions(acc) or []
valid_codes = []
for pos in positions:
try:
code = pos.stock_code
# 获取持仓基本信息
total_qty = int(getattr(pos, 'volume', 0)) # 使用 'volume' 或 'm_nVolume'
available_qty = int(getattr(pos, 'can_use_volume', 0)) # 使用 'can_use_volume' 或 'm_nCanUseVolume'
# 直接使用API返回的冻结数量字段这比自己计算更准确
frozen_qty = int(getattr(pos, 'frozen_volume', 0)) # 使用 'frozen_volume' 或 'm_nFrozenVolume'
# 如果总持仓为0则没有处理的必要直接跳过
if total_qty == 0:
continue
# 2. 获取价格和市值信息
cost_price = float(getattr(pos, 'avg_price', 0.0)) # 使用 'avg_price' 或 'm_dAvgPrice'
market_value = float(getattr(pos, 'market_value', 0.0)) # 使用 'market_value' 或 'm_dMarketValue'
# 3. 根据现有字段计算缺失的信息
# 计算市价 (Market Price)
# 正如您所说,通过 市值 / 总数量 来计算并处理总数量为0的情况
market_price = market_value / total_qty if total_qty > 0 else 0.0
# 计算持仓成本 (Position Cost)
position_cost = cost_price * total_qty
# 计算持仓盈亏 (Profit/Loss)
# 盈亏 = 总市值 - 总成本
profit_loss = market_value - position_cost
# 计算盈亏比例 (Profit/Loss Ratio)
if position_cost > 0:
profit_loss_ratio = (profit_loss / position_cost) * 100
else:
profit_loss_ratio = 0.0 # 如果成本为0则盈亏比例也为0
# if total_qty > 0:
dbm.upsert_position_snapshot(
code, total_qty, available_qty, frozen_qty,
cost_price, market_price, market_value,
profit_loss, profit_loss_ratio
)
valid_codes.append(code)
except Exception as e:
logger.warning(f"处理持仓记录失败 {getattr(pos, 'stock_code', 'unknown')}: {e}")
continue
# 清理非持有标的
dbm.delete_positions_except(valid_codes)
logger.info(f"[持仓快照] 已覆盖更新 {len(valid_codes)} 条持仓记录")
except Exception as e:
if logger:
logger.error(f"持仓快照刷新失败: {str(e)}")
def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: int = 300):
try:
def worker():
while True:
try:
refresh_positions_snapshot(xt_trader, acc, logger)
except Exception as ie:
logger.warning(f"持仓快照线程异常: {ie}")
time.sleep(interval_seconds)
t = threading.Thread(target=worker, daemon=True)
t.start()
logger.info(f"持仓快照线程已启动(每{interval_seconds}秒)")
except Exception as e:
logger.error(f"启动持仓快照线程失败: {str(e)}")
# 为了向后兼容,保留原来的函数名
def create_strategy_callback(xt_trader, acc, buy_amount, logger):
"""创建策略回调函数(兼容性函数,实际调用买入回调)"""
return create_buy_strategy_callback(xt_trader, acc, buy_amount, logger)