diff --git a/requirements.txt b/requirements.txt index 8feab48..093a6d6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,3 +19,4 @@ pandas==2.2.3 apscheduler==3.11.0 pymongo==4.13.0 scikit-learn==1.6.1 +dbutils==3.1.2 \ No newline at end of file diff --git a/src/QMT/config.yaml b/src/QMT/config.yaml new file mode 100644 index 0000000..345792c --- /dev/null +++ b/src/QMT/config.yaml @@ -0,0 +1,52 @@ +# QMT交易配置 +qmt_config: + # 客户端路径 + client_path: "D:\\SoftwareCenter\\中金财富QMT个人版模拟交易端38421\\userdata_mini" + + # 账户配置 + account: + account_id: "10839603" # 资金账号 + account_type: "STOCK" # 账号类型:STOCK(股票) CREDIT(信用) FUTURE(期货) + +# 数据库配置 +database: + host: "192.168.18.199" + port: 3306 + user: "root" + password: "Chlry#$.8" + database: "db_gp_cj" + +# 日志配置 +logging: + level: "INFO" + log_dir: "../logs" + log_format: "%(asctime)s - %(levelname)s - %(message)s" + +# Redis 配置 +redis: + host: "192.168.18.208" + port: 6379 + db: 13 + password: "wlkj2018" + socket_timeout: 5 + +# 计划同步配置(从MySQL同步到Redis) +plan_sync: + interval_seconds: 60 # 定时同步周期,秒 + +# 交易策略配置 +strategy: + # 买入配置 + buy: + amount: 50000 # 每只股票买入金额(元) + enabled: true # 是否启用买入策略 + + # 卖出配置 + sell: + enabled: true # 是否启用卖出策略 + # 卖出策略:当价格高于目标价时卖出全部持仓 + + # 订单配置 + order: + timeout: 300 # 订单超时时间(秒),默认5分钟 + cooldown: 60 # 同股票下单冷却时间(秒),避免频繁下单 \ No newline at end of file diff --git a/src/QMT/config_loader.py b/src/QMT/config_loader.py new file mode 100644 index 0000000..062cf79 --- /dev/null +++ b/src/QMT/config_loader.py @@ -0,0 +1,52 @@ +# coding:utf-8 +import os +import yaml +import sys + +def load_config(): + """加载配置文件""" + config_path = os.path.join(os.path.dirname(__file__), 'config.yaml') + try: + with open(config_path, 'r', encoding='utf-8') as f: + config = yaml.safe_load(f) + return config + except Exception as e: + print(f"加载配置文件失败: {str(e)}") + sys.exit(1) + +def get_qmt_config(): + """获取QMT配置""" + config = load_config() + return config['qmt_config'] + +def get_database_config(): + """获取数据库配置""" + config = load_config() + return config['database'] + +def get_logging_config(): + """获取日志配置""" + config = load_config() + return config['logging'] + +def get_strategy_config(): + """获取策略配置""" + config = load_config() + return config.get('strategy', {}) + +def get_database_config(): + """获取数据库配置""" + config = load_config() + print(config) + print("----------------------------------------------------------") + return config.get('database', {}) + +def get_redis_config(): + """获取Redis配置""" + config = load_config() + return config.get('redis', {}) + +def get_plan_sync_config(): + """获取计划同步配置""" + config = load_config() + return config.get('plan_sync', {'interval_seconds': 60}) \ No newline at end of file diff --git a/src/QMT/database_manager.py b/src/QMT/database_manager.py new file mode 100644 index 0000000..32f4347 --- /dev/null +++ b/src/QMT/database_manager.py @@ -0,0 +1,575 @@ +# coding:utf-8 +import pymysql +import logging +import datetime +import time +from typing import Dict, List, Optional, Tuple +from dbutils.pooled_db import PooledDB # 1. 引入 PooledDB +from config_loader import get_database_config + +# --- 连接池的配置 --- +# 2. 创建全局唯一的数据库连接池 +# 我们可以把连接池看作一个"连接的仓库",所有线程都从这里借用和归还连接 +try: + db_config = get_database_config() + pool = PooledDB( + creator=pymysql, # 指定使用 pymysql 作为数据库连接库 + mincached=2, # 初始化时,池中至少创建的空闲连接数 + maxcached=5, # 池中最多能存放的空闲连接数 + maxconnections=10, # 连接池允许的最大连接数 + blocking=True, # 连接池中无可用连接时,是否阻塞等待 + host=db_config['host'], + port=db_config['port'], + user=db_config['user'], + password=db_config['password'], + database=db_config['database'], + charset='utf8mb4', + autocommit=True, # 注意:这里设置 autocommit=True,每次执行完 SQL 会自动提交 + # 连接保活配置 + connect_timeout=10, + read_timeout=60, + write_timeout=60 + ) + # 获取一个初始化日志记录器 + initial_logger = logging.getLogger(__name__) + initial_logger.info("数据库连接池创建成功") +except Exception as e: + initial_logger = logging.getLogger(__name__) + initial_logger.error(f"数据库连接池创建失败: {str(e)}") + pool = None # 创建失败,则 pool 为 None + +def close_pool(): + """关闭连接池(在应用退出时调用)""" + if pool: + pool.close() + logging.getLogger(__name__).info("数据库连接池已关闭") + +class DatabaseManager: + """ + 数据库管理器类(已改造为使用连接池) + 现在这个类的每个实例都是轻量级的,并且线程安全。 + """ + + def __init__(self, logger=None): + self.logger = logger or logging.getLogger(__name__) + if pool is None: + raise Exception("数据库连接池未初始化,请检查配置") + # 3. 每个实例共享同一个连接池 + self.pool = pool + + def _get_connection(self): + """从连接池获取一个连接""" + return self.pool.connection() + + def execute_query(self, sql: str, params: tuple = None) -> List[Tuple]: + """执行查询SQL(从连接池获取连接)""" + # 4. 每次执行都从池中获取一个新连接,用完后自动归还 + conn = self._get_connection() + try: + with conn.cursor() as cursor: + cursor.execute(sql, params) + result = cursor.fetchall() + return result + except Exception as e: + self.logger.error(f"执行查询失败: {sql}, 错误: {str(e)}") + # 连接池会自动处理坏掉的连接,所以通常不需要复杂的重连逻辑 + raise + finally: + if conn: + # 5. conn.close() 在这里不是关闭连接,而是将连接【归还】给连接池 + conn.close() + + def execute_update(self, sql: str, params: tuple = None) -> int: + """执行更新SQL(从连接池获取连接)""" + conn = self._get_connection() + try: + with conn.cursor() as cursor: + affected_rows = cursor.execute(sql, params) + # 因为设置了 autocommit=True, 所以不需要手动 conn.commit() + return affected_rows + except Exception as e: + self.logger.error(f"执行更新失败: {sql}, 错误: {str(e)}") + raise + finally: + if conn: + # 同样,将连接归还给池 + conn.close() + + # ====================================================================== + # 注意:以下所有业务逻辑方法,都不需要做任何修改! + # 因为它们都依赖于 execute_query 和 execute_update,而这两个方法已经被改造好了。 + # ====================================================================== + + def get_active_buy_plans(self, strategy_id: int = 1) -> Dict[str, Dict]: + """获取激活的买入计划(仅当日最新记录)""" + sql = """ + SELECT t.stock_code, t.stock_name, t.target_price, t.buy_amount, t.trading_version + FROM trading_buy_plan t + JOIN ( + SELECT stock_code, MAX(update_time) AS max_ut + FROM trading_buy_plan + WHERE strategy_id=%s AND is_active=1 AND DATE(trading_time)=CURDATE() + GROUP BY stock_code + ) m ON t.stock_code=m.stock_code AND t.update_time=m.max_ut + WHERE t.strategy_id=%s AND t.is_active=1 + """ + try: + results = self.execute_query(sql, (strategy_id, strategy_id)) + buy_plans = {} + for row in results: + stock_code, stock_name, target_price, buy_amount, tver = row + buy_plans[stock_code] = { + 'stock_name': stock_name, + 'target_price': float(target_price), + 'buy_amount': float(buy_amount), + 'trading_version': int(tver or 0) + } + self.logger.info(f"加载买入计划: {len(buy_plans)}个") + return buy_plans + except Exception as e: + self.logger.error(f"获取买入计划失败: {str(e)}") + return {} + + def get_active_sell_plans(self, strategy_id: int = 1) -> Dict[str, Dict]: + """获取激活的卖出计划(仅当日最新记录)""" + sql = """ + SELECT t.stock_code, t.stock_name, t.target_price, t.sell_quantity, t.trading_version + FROM trading_sell_plan t + JOIN ( + SELECT stock_code, MAX(update_time) AS max_ut + FROM trading_sell_plan + WHERE strategy_id=%s AND is_active=1 AND DATE(trading_time)=CURDATE() + GROUP BY stock_code + ) m ON t.stock_code=m.stock_code AND t.update_time=m.max_ut + WHERE t.strategy_id=%s AND t.is_active=1 + """ + try: + results = self.execute_query(sql, (strategy_id, strategy_id)) + sell_plans = {} + for row in results: + stock_code, stock_name, target_price, sell_quantity, tver = row + sell_plans[stock_code] = { + 'stock_name': stock_name, + 'target_price': float(target_price), + 'sell_quantity': sell_quantity, + 'trading_version': int(tver or 0) + } + self.logger.info(f"加载卖出计划: {len(sell_plans)}个") + return sell_plans + except Exception as e: + self.logger.error(f"获取卖出计划失败: {str(e)}") + return {} + + # ===== Plan versioned read (optional trading_version column) ===== + def get_max_plan_version(self, strategy_id: int = 1) -> int: + """获取买/卖计划的最大版本号(需要表中存在 trading_version 列)。若列不存在返回0。""" + try: + buy_sql = "SELECT COALESCE(MAX(trading_version), 0) FROM trading_buy_plan WHERE strategy_id=%s AND is_active=1" + sell_sql = "SELECT COALESCE(MAX(trading_version), 0) FROM trading_sell_plan WHERE strategy_id=%s AND is_active=1" + bmax = self.execute_query(buy_sql, (strategy_id,))[0][0] + smax = self.execute_query(sell_sql, (strategy_id,))[0][0] + return max(int(bmax or 0), int(smax or 0)) + except Exception as e: + self.logger.warning(f"读取最大计划版本失败,可能缺少 trading_version 列: {str(e)}") + return 0 + + def get_buy_plans_since(self, strategy_id: int, version: int) -> Dict[str, Dict]: + """获取版本号大于给定值的买入计划(需要 trading_version 列)。列缺失时返回全部激活计划。""" + try: + sql = """ + SELECT stock_code, stock_name, target_price, buy_amount, trading_version + FROM trading_buy_plan WHERE strategy_id=%s AND is_active=1 AND trading_version>%s + """ + results = self.execute_query(sql, (strategy_id, version)) + plans = {} + for row in results: + stock_code, stock_name, target_price, buy_amount, tver = row + plans[stock_code] = { + 'stock_name': stock_name, + 'target_price': float(target_price), + 'buy_amount': float(buy_amount), + 'trading_version': int(tver or 0) + } + return plans + except Exception: + # 回退到全部激活计划 + return self.get_active_buy_plans(strategy_id) + + def get_sell_plans_since(self, strategy_id: int, version: int) -> Dict[str, Dict]: + """获取版本号大于给定值的卖出计划(需要 trading_version 列)。列缺失时返回全部激活计划。""" + try: + sql = """ + SELECT stock_code, stock_name, target_price, sell_quantity, trading_version + FROM trading_sell_plan WHERE strategy_id=%s AND is_active=1 AND trading_version>%s + """ + results = self.execute_query(sql, (strategy_id, version)) + plans = {} + for row in results: + stock_code, stock_name, target_price, sell_quantity, tver = row + plans[stock_code] = { + 'stock_name': stock_name, + 'target_price': float(target_price), + 'sell_quantity': sell_quantity, + 'trading_version': int(tver or 0) + } + return plans + except Exception: + return self.get_active_sell_plans(strategy_id) + + def insert_order(self, order_data: Dict) -> bool: + """插入订单记录""" + sql = """ + INSERT INTO trading_order + (order_id, strategy_id, stock_code, order_side, order_quantity, order_price, target_price, + order_status, order_time, order_remark) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + try: + params = ( + order_data['order_id'], + order_data['strategy_id'], + order_data['stock_code'], + order_data['order_side'], + order_data['order_quantity'], + order_data['order_price'], + order_data['target_price'], + order_data['order_status'], + order_data['order_time'], + order_data['order_remark'] + ) + self.execute_update(sql, params) + self.logger.info(f"订单记录已插入: {order_data['order_id']}") + return True + except Exception as e: + self.logger.error(f"插入订单记录失败: {str(e)}") + return False + + def update_order_status(self, order_id: str, status: str, filled_quantity: int = None, filled_time: str = None) -> bool: + """更新订单状态""" + try: + # 添加调试信息 + self.logger.info(f"准备更新订单状态: order_id={order_id} (类型: {type(order_id)}), status={status}") + + # 确保 order_id 是字符串类型 + order_id_str = str(order_id) + + if filled_quantity is not None and filled_time: + sql = """ + UPDATE trading_order + SET order_status = %s, filled_quantity = %s, filled_time = %s + WHERE order_id = %s OR qmt_order_id = %s + """ + params = (status, filled_quantity, filled_time, order_id_str, order_id_str) + else: + sql = """ + UPDATE trading_order + SET order_status = %s + WHERE order_id = %s OR qmt_order_id = %s + """ + params = (status, order_id_str, order_id_str) + + # 添加 SQL 调试信息 + self.logger.info(f"执行 SQL: {sql}") + self.logger.info(f"参数: {params}") + + affected_rows = self.execute_update(sql, params) + # if affected_rows > 0: + self.logger.info(f"订单状态已更新: {order_id_str} -> {status}") + return True + # else: + # self.logger.warning(f"订单状态更新失败,未找到订单: {order_id_str}") + # return False + except Exception as e: + self.logger.error(f"更新订单状态失败: {str(e)}") + return False + + def update_qmt_order_id(self, our_order_id: str, qmt_order_id: str) -> bool: + """更新QMT内部订单ID""" + try: + sql = """ + UPDATE trading_order + SET qmt_order_id = %s + WHERE order_id = %s + """ + params = (str(qmt_order_id), our_order_id) + + affected_rows = self.execute_update(sql, params) + # if affected_rows > 0: + self.logger.info(f"QMT订单ID已更新: {our_order_id} -> {qmt_order_id}") + return True + # else: + # self.logger.warning(f"更新QMT订单ID失败,未找到订单: {our_order_id}") + # return False + except Exception as e: + self.logger.error(f"更新QMT订单ID失败: {str(e)}") + return False + + def update_position(self, stock_code: str, quantity_change: int, price: float, is_buy: bool) -> bool: + """更新持仓""" + try: + # 先查询当前持仓 + sql_select = "SELECT total_quantity, cost_price FROM trading_position WHERE stock_code = %s" + results = self.execute_query(sql_select, (stock_code,)) + + if results: + # 更新现有持仓 + current_quantity, current_cost_price = results[0] + if is_buy: + # 买入:增加持仓 + new_quantity = current_quantity + quantity_change + if current_quantity > 0: + new_cost_price = (current_cost_price * current_quantity + price * quantity_change) / new_quantity + else: + new_cost_price = price + else: + # 卖出:减少持仓 + new_quantity = max(0, current_quantity - quantity_change) + new_cost_price = current_cost_price if new_quantity > 0 else 0 + + if new_quantity > 0: + sql_update = """ + UPDATE trading_position + SET total_quantity = %s, cost_price = %s, update_time = NOW() + WHERE stock_code = %s + """ + self.execute_update(sql_update, (new_quantity, new_cost_price, stock_code)) + else: + # 持仓为0,删除记录 + sql_delete = "DELETE FROM trading_position WHERE stock_code = %s" + self.execute_update(sql_delete, (stock_code,)) + else: + # 新增持仓(仅买入时) + if is_buy and quantity_change > 0: + sql_insert = """ + INSERT INTO trading_position + (stock_code, total_quantity, cost_price, create_time, update_time) + VALUES (%s, %s, %s, NOW(), NOW()) + """ + self.execute_update(sql_insert, (stock_code, quantity_change, price)) + + self.logger.info(f"持仓已更新: {stock_code} {'买入' if is_buy else '卖出'} {quantity_change}股") + return True + except Exception as e: + self.logger.error(f"更新持仓失败: {str(e)}") + return False + + def insert_trading_log(self, log_data: Dict) -> bool: + """插入交易日志""" + sql = """ + INSERT INTO trading_log + (order_id, stock_code, log_type, log_level, message, create_time) + VALUES (%s, %s, %s, %s, %s, %s) + """ + try: + params = ( + log_data.get('order_id'), + log_data.get('stock_code'), + log_data['log_type'], + log_data['log_level'], + log_data['message'], + log_data['create_time'] + ) + self.execute_update(sql, params) + return True + except Exception as e: + self.logger.error(f"插入交易日志失败: {str(e)}") + return False + + def get_current_positions(self) -> Dict[str, int]: + """获取当前持仓""" + sql = "SELECT stock_code, total_quantity FROM trading_position WHERE total_quantity > 0" + try: + results = self.execute_query(sql) + positions = {row[0]: row[1] for row in results} + self.logger.info(f"从数据库加载持仓: {positions}") + return positions + except Exception as e: + self.logger.error(f"获取当前持仓失败: {str(e)}") + return {} + + def get_open_orders(self) -> Dict[str, Dict]: + """获取未完成订单(pending/submitted)""" + sql = """ + SELECT order_id, stock_code, order_side, order_quantity, order_price, order_status, order_time + FROM trading_order + WHERE order_status IN ('pending','submitted') + """ + try: + results = self.execute_query(sql) + orders = {} + for row in results: + order_id, stock_code, side, qty, price, status, order_time = row + orders[stock_code] = { + 'order_id': order_id, + 'order_quantity': qty, + 'order_price': float(price) if price is not None else None, + 'status': status, + 'side': side, + 'order_time': order_time.strftime('%Y-%m-%d %H:%M:%S') if order_time else None + } + return orders + except Exception as e: + self.logger.error(f"获取未完成订单失败: {str(e)}") + return {} + + def get_todays_orders_by_statuses(self, statuses: List[str]) -> List[Dict]: + """获取今日指定状态集合的订单列表""" + if not statuses: + return [] + placeholders = ','.join(['%s'] * len(statuses)) + sql = f""" + SELECT order_id, stock_code, order_side, order_quantity, order_price, order_status, order_time + FROM trading_order + WHERE DATE(order_time)=CURDATE() AND order_status IN ({placeholders}) + ORDER BY order_time ASC + """ + try: + results = self.execute_query(sql, tuple(statuses)) + orders: List[Dict] = [] + for row in results: + order_id, stock_code, side, qty, price, status, order_time = row + orders.append({ + 'order_id': str(order_id), + 'stock_code': stock_code, + 'side': side, + 'order_quantity': int(qty) if qty is not None else 0, + 'order_price': float(price) if price is not None else None, + 'order_status': status, + 'order_time': order_time, + }) + return orders + except Exception as e: + self.logger.error(f"获取今日订单失败: {str(e)}") + return [] + + # ===== 持仓快照同步(覆盖式) ===== + def upsert_position_snapshot(self, stock_code: str, total_quantity: int, available_quantity: int, + frozen_quantity: int, cost_price: float, market_price: float, + market_value: float, profit_loss: float, profit_loss_ratio: float) -> None: + """按快照覆盖更新单条持仓(存在则更新,不存在则插入)""" + sql = """ + INSERT INTO trading_position + (stock_code, total_quantity, available_quantity, frozen_quantity, cost_price, + market_price, market_value, profit_loss, profit_loss_ratio, create_time, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) + ON DUPLICATE KEY UPDATE + total_quantity=VALUES(total_quantity), + available_quantity=VALUES(available_quantity), + frozen_quantity=VALUES(frozen_quantity), + cost_price=VALUES(cost_price), + market_price=VALUES(market_price), + market_value=VALUES(market_value), + profit_loss=VALUES(profit_loss), + profit_loss_ratio=VALUES(profit_loss_ratio), + update_time=NOW() + """ + self.execute_update(sql, (stock_code, total_quantity, available_quantity, frozen_quantity, + cost_price, market_price, market_value, profit_loss, profit_loss_ratio)) + + def delete_positions_except(self, stock_codes: List[str]) -> None: + """删除不在给定集合中的所有持仓记录(用于快照清理)""" + if not stock_codes: + # 如果为空,清空整表 + sql = "DELETE FROM trading_position" + self.execute_update(sql) + return + placeholders = ','.join(['%s'] * len(stock_codes)) + sql = f"DELETE FROM trading_position WHERE stock_code NOT IN ({placeholders})" + self.execute_update(sql, tuple(stock_codes)) + + def update_account_funds(self, account_id: str, account_type: str, total_asset: float, + available_cash: float, frozen_cash: float, market_value: float, + profit_loss: float, profit_loss_ratio: float) -> bool: + """更新账户资金信息""" + try: + sql = """ + INSERT INTO trading_account_funds + (account_id, account_type, total_asset, available_cash, frozen_cash, + market_value, profit_loss, profit_loss_ratio, update_time) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW()) + ON DUPLICATE KEY UPDATE + account_type = VALUES(account_type), + total_asset = VALUES(total_asset), + available_cash = VALUES(available_cash), + frozen_cash = VALUES(frozen_cash), + market_value = VALUES(market_value), + profit_loss = VALUES(profit_loss), + profit_loss_ratio = VALUES(profit_loss_ratio), + update_time = NOW() + """ + params = (account_id, account_type, total_asset, available_cash, + frozen_cash, market_value, profit_loss, profit_loss_ratio) + + affected_rows = self.execute_update(sql, params) + if affected_rows > 0: + self.logger.info(f"账户资金信息更新成功: {account_id}") + return True + else: + self.logger.warning(f"账户资金信息更新失败: {account_id}") + return False + except Exception as e: + self.logger.error(f"更新账户资金信息失败: {str(e)}") + return False + + def get_account_funds(self, account_id: str) -> Optional[Dict]: + """获取账户资金信息""" + try: + sql = "SELECT * FROM trading_account_funds WHERE account_id = %s ORDER BY update_time DESC LIMIT 1" + result = self.execute_query(sql, (account_id,)) + + if result: + row = result[0] + return { + 'id': row[0], + 'account_id': row[1], + 'account_type': row[2], + 'total_asset': float(row[3]), + 'available_cash': float(row[4]), + 'frozen_cash': float(row[5]), + 'market_value': float(row[6]), + 'profit_loss': float(row[7]), + 'profit_loss_ratio': float(row[8]), + 'update_time': row[9], + 'create_time': row[10] + } + return None + except Exception as e: + self.logger.error(f"获取账户资金信息失败: {str(e)}") + return None + + def get_account_funds_history(self, account_id: str, days: int = 7) -> List[Dict]: + """获取账户资金历史记录""" + try: + sql = """ + SELECT * FROM trading_account_funds + WHERE account_id = %s + AND update_time >= DATE_SUB(NOW(), INTERVAL %s DAY) + ORDER BY update_time DESC + """ + result = self.execute_query(sql, (account_id, days)) + + funds_history = [] + for row in result: + funds_history.append({ + 'id': row[0], + 'account_id': row[1], + 'account_type': row[2], + 'total_asset': float(row[3]), + 'available_cash': float(row[4]), + 'frozen_cash': float(row[5]), + 'market_value': float(row[6]), + 'profit_loss': float(row[7]), + 'profit_loss_ratio': float(row[8]), + 'update_time': row[9], + 'create_time': row[10] + }) + return funds_history + except Exception as e: + self.logger.error(f"获取账户资金历史记录失败: {str(e)}") + return [] + + def close(self): + """关闭数据库连接池(注意:这里不需要关闭,因为池是全局的)""" + # DBUtils.PooledDB 的池是全局的,不需要在这里关闭 + # 应该在应用退出时调用 close_pool() 函数 + pass \ No newline at end of file diff --git a/src/QMT/main.py b/src/QMT/main.py new file mode 100644 index 0000000..06e54b3 --- /dev/null +++ b/src/QMT/main.py @@ -0,0 +1,310 @@ +# coding:utf-8 +import time +import datetime +import logging +import os +import sys +from xtquant import xtdata +from xtquant.xttrader import XtQuantTrader +from xtquant.xttype import StockAccount + +# 导入自定义模块 +from config_loader import get_qmt_config, get_logging_config, get_strategy_config +from strategy import load_initial_positions, create_buy_strategy_callback, create_sell_strategy_callback, start_account_funds_monitor, start_reconciliation_monitor, start_position_snapshot_monitor +from redis_state_manager import RedisStateManager +from database_manager import DatabaseManager, close_pool +from trader_callback import MyXtQuantTraderCallback + +def setup_logging(): + """设置日志配置""" + log_config = get_logging_config() + + # 创建logs目录(如果不存在) + log_dir = os.path.join(os.path.dirname(__file__), '..', 'logs') + os.makedirs(log_dir, exist_ok=True) + + # 生成日志文件名(包含日期时间) + log_filename = f"qmt_auto_buy_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.log" + log_filepath = os.path.join(log_dir, log_filename) + + # 配置日志格式 + logging.basicConfig( + level=getattr(logging, log_config['level']), + format=log_config['log_format'], + handlers=[ + logging.FileHandler(log_filepath, encoding='utf-8'), + logging.StreamHandler(sys.stdout) # 同时输出到控制台 + ] + ) + + logger = logging.getLogger(__name__) + logger.info(f"日志文件已创建: {log_filepath}") + return logger + +def interact(): + """执行后进入repl模式""" + import code + code.InteractiveConsole(locals=globals()).interact() + +if __name__ == '__main__': + # 初始化日志 + logger = setup_logging() + + # 获取配置 + qmt_config = get_qmt_config() + strategy_config = get_strategy_config() + buy_amount = strategy_config.get('buy', {}).get('amount', 50000) + + logger.info("启动基于数据库的自动买入卖出策略") + logger.info("="*50) + + # 初始化计划:先从MySQL拉取“已发布计划”覆盖写入到Redis(一次性),然后读取Redis供订阅 + rsm = RedisStateManager(logger) + try: + dbm = DatabaseManager(logger) + db_buy = dbm.get_active_buy_plans(strategy_id=1) + db_sell = dbm.get_active_sell_plans(strategy_id=1) + # 清空Redis计划并全量覆盖 + rsm.clear_buy_plans() + rsm.clear_sell_plans() + for code, plan in db_buy.items(): + # 兼容数据库返回结构 + rsm.set_buy_plan(code, { + 'stock_name': plan.get('stock_name'), + 'target_price': plan['target_price'], + 'buy_amount': plan['buy_amount'], + 'is_active': 1 + }) + for code, plan in db_sell.items(): + rsm.set_sell_plan(code, { + 'stock_name': plan.get('stock_name'), + 'target_price': plan['target_price'], + 'sell_quantity': plan.get('sell_quantity'), + 'is_active': 1 + }) + logger.info(f"已用MySQL已发布计划初始化Redis,买入{len(db_buy)},卖出{len(db_sell)}") + # 设置已同步版本 + try: + max_ver = dbm.get_max_plan_version(strategy_id=1) + rsm.set_synced_version(max_ver) + logger.info(f"初始化同步版本设置为 {max_ver}") + except Exception as _e: + logger.warning(f"初始化同步版本设置失败: {_e}") + except Exception as e: + logger.warning(f"初始化Redis计划失败(将使用已有Redis计划):{str(e)}") + + buy_plans = rsm.get_all_buy_plans() + sell_plans = rsm.get_all_sell_plans() + logger.info(f"买入计划数量(Redis): {len(buy_plans)}") + logger.info(f"卖出计划数量(Redis): {len(sell_plans)}") + logger.info(f"每只股票买入金额: {buy_amount}元") + logger.info("="*50) + + # 显示买入计划 + if buy_plans: + logger.info("买入计划:") + for stock_code, plan_info in buy_plans.items(): + logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 买入金额 {plan_info['buy_amount']:.2f}元") + + # 显示卖出计划 + if sell_plans: + logger.info("卖出计划:") + for stock_code, plan_info in sell_plans.items(): + sell_qty = plan_info['sell_quantity'] if plan_info['sell_quantity'] else '全部' + logger.info(f" {stock_code} ({plan_info['stock_name']}): 目标价格 {plan_info['target_price']:.2f}, 卖出数量 {sell_qty}") + + if not buy_plans and not sell_plans: + logger.warning("警告:买入和卖出计划都为空!请检查数据库配置。") + + logger.info("="*50) + + # 从配置文件获取客户端路径 + client_path = qmt_config['client_path'] + logger.info(f"客户端路径: {client_path}") + + # 生成session id 整数类型 同时运行的策略不能重复 + session_id = int(time.time()) + + xt_trader = XtQuantTrader(client_path, session_id) + # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定 + # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程 + # xt_trader.set_relaxed_response_order_enabled(True) + + # 从配置文件获取账户信息 + account_id = qmt_config['account']['account_id'] + account_type = qmt_config['account']['account_type'] + + # 创建资金账号对象 + acc = StockAccount(account_id, account_type) + logger.info(f"账户信息: {account_id} ({account_type})") + + # 创建交易回调类对象,并声明接收回调 + callback = MyXtQuantTraderCallback(logger) + xt_trader.register_callback(callback) + # 启动交易线程 + xt_trader.start() + # 建立交易连接,返回0表示连接成功 + connect_result = xt_trader.connect() + logger.info(f'建立交易连接,返回0表示连接成功 {connect_result}') + # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 + subscribe_result = xt_trader.subscribe(acc) + logger.info(f'对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 {subscribe_result}') + + # 查询账户资金 + account_info = xt_trader.query_stock_asset(acc) + print(account_info) + print("=========================================") + available_cash = account_info.m_dCash + logger.info(f"账户可用资金: {available_cash:.2f}元") + + # 启动时一次性加载并同步持仓:券商→Redis + initial_positions = load_initial_positions(xt_trader, acc, logger) + logger.info(f"初始持仓: {initial_positions}") + # 覆盖式同步DB未完成订单到Redis + try: + rsm = RedisStateManager(logger) + dbm = DatabaseManager(logger) + open_orders = dbm.get_open_orders() + rsm.clear_pending() + for stock_code, info in open_orders.items(): + rsm.set_pending(stock_code, info) + logger.info(f"已用DB未完成订单覆盖Redis在途订单,共 {len(open_orders)} 条") + except Exception as e: + logger.warning(f"Redis状态读取失败: {str(e)}") + + # 启动账户资金监控(每小时更新一次) + start_account_funds_monitor(xt_trader, acc, logger) + + # 停用订单超时监控(策略为价格触发挂单,挂单后不自动撤单) + logger.info("已停用订单超时监控与自动撤单逻辑:挂单成功后将持续等待成交") + + # 启动每10分钟对账线程(兜底:无回调或程序中断时),可在配置里调整频率 + start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds=600) + + # 启动每10分钟持仓快照覆盖更新线程(只更新DB,不叠加) + start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds=600) + + # 下载股票数据 + xtdata.download_sector_data() + + # 创建买入策略回调函数 + buy_strategy_callback = create_buy_strategy_callback(xt_trader, acc, buy_amount, logger) + + # 创建卖出策略回调函数 + sell_strategy_callback = create_sell_strategy_callback(xt_trader, acc, logger) + + # 订阅买卖计划中的股票行情(基于Redis) + if buy_plans: + buy_code_list = list(buy_plans.keys()) + logger.info(f"开始订阅 {len(buy_code_list)} 只买入股票的行情...") + for code in buy_code_list: + xtdata.subscribe_quote(code, '1m', callback=buy_strategy_callback) + logger.info(f"已订阅买入 {code} ({buy_plans[code].get('stock_name','')}) 行情") + if sell_plans: + sell_code_list = list(sell_plans.keys()) + logger.info(f"开始订阅 {len(sell_code_list)} 只卖出股票的行情...") + for code in sell_code_list: + xtdata.subscribe_quote(code, '1m', callback=sell_strategy_callback) + logger.info(f"已订阅卖出 {code} ({sell_plans[code].get('stock_name','')}) 行情") + + # 计划同步线程:每60秒从MySQL同步到Redis,并直接更新订阅(不再分线程管理订阅) + import threading, time + def plan_sync_worker(): + interval = 60 # 固定60秒同步一次 + # 当前已订阅集合 + subscribed_buy = set(buy_plans.keys()) + subscribed_sell = set(sell_plans.keys()) + while True: + try: + time.sleep(interval) + dbm_local = DatabaseManager(logger) + # 拉取当日最新激活计划 + latest_buy = dbm_local.get_active_buy_plans(strategy_id=1) + latest_sell = dbm_local.get_active_sell_plans(strategy_id=1) + # 对比Redis,找出新增/删除/变更(版本或数值变化) + curr_buy = rsm.get_all_buy_plans() + curr_sell = rsm.get_all_sell_plans() + latest_buy_codes = set(latest_buy.keys()) + latest_sell_codes = set(latest_sell.keys()) + # 新增/删除 + add_buy = latest_buy_codes - set(curr_buy.keys()) + del_buy = set(curr_buy.keys()) - latest_buy_codes + add_sell = latest_sell_codes - set(curr_sell.keys()) + del_sell = set(curr_sell.keys()) - latest_sell_codes + # 变更(比较 trading_version 或关键字段) + changed_buy = set() + for code in (latest_buy_codes & set(curr_buy.keys())): + lb = latest_buy[code] + cb = curr_buy[code] + if ( + lb.get('trading_version', 0) != cb.get('trading_version', 0) + or lb['target_price'] != cb.get('target_price') + or lb['buy_amount'] != cb.get('buy_amount') + ): + changed_buy.add(code) + changed_sell = set() + for code in (latest_sell_codes & set(curr_sell.keys())): + ls = latest_sell[code] + cs = curr_sell[code] + if ( + ls.get('trading_version', 0) != cs.get('trading_version', 0) + or ls['target_price'] != cs.get('target_price') + or (ls.get('sell_quantity') != cs.get('sell_quantity')) + ): + changed_sell.add(code) + # 更新Redis + for code in add_buy | changed_buy: + rsm.set_buy_plan(code, latest_buy[code]) + for code in add_sell | changed_sell: + rsm.set_sell_plan(code, latest_sell[code]) + for code in del_buy: + rsm.del_buy_plan(code) + for code in del_sell: + rsm.del_sell_plan(code) + # 直接同步订阅(差分) + new_buy_sub = latest_buy_codes + new_sell_sub = latest_sell_codes + # 买入订阅差分 + for code in new_buy_sub - subscribed_buy: + xtdata.subscribe_quote(code, '1m', callback=buy_strategy_callback) + logger.info(f"[订阅新增-买入] {code}") + for code in subscribed_buy - new_buy_sub: + xtdata.unsubscribe_quote(code) + logger.info(f"[取消订阅-买入] {code}") + # 卖出订阅差分 + for code in new_sell_sub - subscribed_sell: + xtdata.subscribe_quote(code, '1m', callback=sell_strategy_callback) + logger.info(f"[订阅新增-卖出] {code}") + for code in subscribed_sell - new_sell_sub: + xtdata.unsubscribe_quote(code) + logger.info(f"[取消订阅-卖出] {code}") + subscribed_buy = new_buy_sub + subscribed_sell = new_sell_sub + logger.info("计划同步与订阅更新完成") + except Exception as e: + logger.warning(f"计划同步线程异常: {str(e)}") + + threading.Thread(target=plan_sync_worker, daemon=True).start() + + logger.info("策略启动完成,等待行情推送...") + if buy_plans: + logger.info("买入策略:当股票价格低于目标价格时,将自动买入指定金额") + if sell_plans: + logger.info("卖出策略:当股票价格高于目标价格时,将自动卖出指定数量") + logger.info("按 Ctrl+C 停止策略") + + # 阻塞主线程退出 + try: + xt_trader.run_forever() + except KeyboardInterrupt: + logger.info("策略已停止") + finally: + # 程序退出时清理连接池 + try: + close_pool() + logger.info("数据库连接池已清理") + except Exception as e: + logger.error(f"清理数据库连接池失败: {str(e)}") + + # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里) + # interact() \ No newline at end of file diff --git a/src/QMT/redis_state_manager.py b/src/QMT/redis_state_manager.py new file mode 100644 index 0000000..9bf665f --- /dev/null +++ b/src/QMT/redis_state_manager.py @@ -0,0 +1,166 @@ +# coding:utf-8 +import redis +import json +import datetime +from typing import Dict, Optional, List +from config_loader import get_redis_config + +POSITIONS_KEY = 'qmt:positions' # Hash: stock_code -> quantity +PENDING_KEY = 'qmt:pending_orders' # Hash: stock_code -> json(order_info) +META_KEY = 'qmt:meta' # Hash: various metadata +PLAN_BUY_KEY = 'qmt:plan:buy' # Hash: stock_code -> json(plan) +PLAN_SELL_KEY = 'qmt:plan:sell' # Hash: stock_code -> json(plan) +PLAN_VERSION_KEY = 'qmt:plan:version' # String/integer: version bump on change +PLAN_SYNCED_VERSION_KEY = 'qmt:plan:synced_version' # 上次从DB同步的版本 + +class RedisStateManager: + def __init__(self, logger=None): + self.logger = logger + cfg = get_redis_config() + pool = redis.ConnectionPool( + host=cfg.get('host', 'localhost'), + port=cfg.get('port', 6379), + db=cfg.get('db', 0), + password=cfg.get('password'), + socket_timeout=cfg.get('socket_timeout', 5), + decode_responses=True, + ) + self.r = redis.Redis(connection_pool=pool) + + # Positions + def get_all_positions(self) -> Dict[str, int]: + data = self.r.hgetall(POSITIONS_KEY) + return {k: int(v) for k, v in data.items()} if data else {} + + def get_position(self, stock_code: str) -> int: + v = self.r.hget(POSITIONS_KEY, stock_code) + return int(v) if v is not None else 0 + + def set_position(self, stock_code: str, quantity: int) -> None: + if quantity > 0: + self.r.hset(POSITIONS_KEY, stock_code, quantity) + else: + self.r.hdel(POSITIONS_KEY, stock_code) + self._touch_meta('positions_updated_at') + + def incr_position(self, stock_code: str, delta: int) -> int: + new_qty = self.r.hincrby(POSITIONS_KEY, stock_code, delta) + if new_qty <= 0: + self.r.hdel(POSITIONS_KEY, stock_code) + new_qty = 0 + self._touch_meta('positions_updated_at') + return new_qty + + # Pending Orders + def get_all_pending(self) -> Dict[str, Dict]: + data = self.r.hgetall(PENDING_KEY) + return {k: json.loads(v) for k, v in data.items()} if data else {} + + def get_pending(self, stock_code: str) -> Optional[Dict]: + v = self.r.hget(PENDING_KEY, stock_code) + return json.loads(v) if v else None + + def set_pending(self, stock_code: str, order_info: Dict) -> None: + self.r.hset(PENDING_KEY, stock_code, json.dumps(order_info, ensure_ascii=False)) + self._touch_meta('pending_updated_at') + + def del_pending(self, stock_code: str) -> None: + self.r.hdel(PENDING_KEY, stock_code) + self._touch_meta('pending_updated_at') + + def clear_pending(self) -> None: + self.r.delete(PENDING_KEY) + self._touch_meta('pending_cleared_at') + + # Meta + def _touch_meta(self, field: str) -> None: + self.r.hset(META_KEY, field, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + + def clear_all(self) -> None: + pipe = self.r.pipeline() + pipe.delete(POSITIONS_KEY) + pipe.delete(PENDING_KEY) + pipe.delete('qmt_order_mapping') # 清理订单映射 + pipe.execute() + self._touch_meta('cleared_at') + + # 订单ID映射管理 + def set_qmt_order_mapping(self, qmt_order_id: str, our_order_id: str) -> None: + """建立QMT内部订单ID到我们订单ID的映射""" + self.r.hset('qmt_order_mapping', qmt_order_id, our_order_id) + + def get_our_order_id(self, qmt_order_id: str) -> Optional[str]: + """根据QMT内部订单ID获取我们的订单ID""" + return self.r.hget('qmt_order_mapping', qmt_order_id) + + def del_qmt_order_mapping(self, qmt_order_id: str) -> None: + """删除订单ID映射""" + self.r.hdel('qmt_order_mapping', qmt_order_id) + + # Plans (buy/sell) + def get_buy_plan(self, stock_code: str) -> Optional[Dict]: + v = self.r.hget(PLAN_BUY_KEY, stock_code) + return json.loads(v) if v else None + + def get_sell_plan(self, stock_code: str) -> Optional[Dict]: + v = self.r.hget(PLAN_SELL_KEY, stock_code) + return json.loads(v) if v else None + + def set_buy_plan(self, stock_code: str, plan: Dict) -> None: + self.r.hset(PLAN_BUY_KEY, stock_code, json.dumps(plan, ensure_ascii=False)) + self.bump_plan_version() + + def set_sell_plan(self, stock_code: str, plan: Dict) -> None: + self.r.hset(PLAN_SELL_KEY, stock_code, json.dumps(plan, ensure_ascii=False)) + self.bump_plan_version() + + def del_buy_plan(self, stock_code: str) -> None: + self.r.hdel(PLAN_BUY_KEY, stock_code) + self.bump_plan_version() + + def del_sell_plan(self, stock_code: str) -> None: + self.r.hdel(PLAN_SELL_KEY, stock_code) + self.bump_plan_version() + + def get_all_buy_plans(self) -> Dict[str, Dict]: + data = self.r.hgetall(PLAN_BUY_KEY) + return {k: json.loads(v) for k, v in data.items()} if data else {} + + def get_all_sell_plans(self) -> Dict[str, Dict]: + data = self.r.hgetall(PLAN_SELL_KEY) + return {k: json.loads(v) for k, v in data.items()} if data else {} + + def get_buy_codes(self) -> List[str]: + return list(self.r.hkeys(PLAN_BUY_KEY)) + + def get_sell_codes(self) -> List[str]: + return list(self.r.hkeys(PLAN_SELL_KEY)) + + def get_plan_version(self) -> int: + v = self.r.get(PLAN_VERSION_KEY) + try: + return int(v) if v is not None else 0 + except Exception: + return 0 + + def bump_plan_version(self) -> int: + return int(self.r.incr(PLAN_VERSION_KEY)) + + def clear_buy_plans(self) -> None: + self.r.delete(PLAN_BUY_KEY) + self.bump_plan_version() + + def clear_sell_plans(self) -> None: + self.r.delete(PLAN_SELL_KEY) + self.bump_plan_version() + + # synced version helpers + def get_synced_version(self) -> int: + v = self.r.get(PLAN_SYNCED_VERSION_KEY) + try: + return int(v) if v is not None else 0 + except Exception: + return 0 + + def set_synced_version(self, version: int) -> None: + self.r.set(PLAN_SYNCED_VERSION_KEY, version) diff --git a/src/QMT/strategy.py b/src/QMT/strategy.py new file mode 100644 index 0000000..389f4bb --- /dev/null +++ b/src/QMT/strategy.py @@ -0,0 +1,601 @@ +# coding:utf-8 +import time +import datetime +import threading +import logging +from xtquant import xtdata +from xtquant.xttrader import XtQuantTrader +from xtquant.xttype import StockAccount +from xtquant import xtconstant +from database_manager import DatabaseManager +from redis_state_manager import RedisStateManager + +def is_call_auction_time(now_time=None): + """ + 判断当前时间是否处于开盘集合竞价时段 + """ + if not now_time: + now_time = datetime.datetime.now().time() + + # 开盘集合竞价时间段 + call_auction_start = datetime.time(9, 15) + call_auction_end = datetime.time(9, 25) + + return call_auction_start <= now_time <= call_auction_end + +def is_continuous_trading_time(now_time=None): + """ + 判断当前时间是否处于连续交易时段 + """ + if not now_time: + now_time = datetime.datetime.now().time() + + # 上午连续交易时段 + morning_start = datetime.time(9, 30) + morning_end = datetime.time(11, 30) + + # 下午连续交易时段 + afternoon_start = datetime.time(13, 0) + afternoon_end = datetime.time(14, 57) + + is_morning = morning_start <= now_time <= morning_end + is_afternoon = afternoon_start <= now_time <= afternoon_end + + return is_morning or is_afternoon + +# 定义一个类 创建类的实例 作为状态的容器 +class _a(): + pass + +A = _a() +A.hsa = xtdata.get_stock_list_in_sector('沪深A股') +A.order_timeout = 300 # 订单超时时间(秒),默认5分钟 + +# 数据库管理器实例 +db_manager = None +redis_manager = None + +def initialize_database_manager(logger=None): + """初始化数据库管理器""" + global db_manager + if db_manager is None: + db_manager = DatabaseManager(logger) + return db_manager + +def initialize_redis_manager(logger=None): + """初始化Redis状态管理器""" + global redis_manager + if redis_manager is None: + redis_manager = RedisStateManager(logger) + return redis_manager + +def load_trading_plans_from_database(strategy_id=1, logger=None): + """已弃用:计划改为直接从Redis读取,保留此函数用于日志兼容""" + if logger: + logger.info("交易计划现改为从Redis读取,数据库仅做审计。") + return {}, {} + +def load_initial_positions(xt_trader, acc, logger): + """启动时一次性加载初始持仓状态(券商→Redis)并返回Redis视图""" + try: + # 先从QMT获取持仓 + positions = xt_trader.query_stock_positions(acc) + qmt_positions = {pos.stock_code: pos.m_nVolume for pos in positions if pos.m_nVolume > 0} + + # 从数据库获取持仓 + db_manager = initialize_database_manager(logger) + db_positions = db_manager.get_current_positions() + + # 将券商持仓写入Redis为权威值;DB中但券商为0的暂不写入,避免脏数据覆盖 + r = initialize_redis_manager(logger) + # 清理Redis中不存在于券商的持仓(设为0即删除) + for code in list(r.get_all_positions().keys()): + if code not in qmt_positions: + r.set_position(code, 0) + for code, qty in qmt_positions.items(): + r.set_position(code, qty) + + logger.info(f"初始持仓加载完成,已同步至Redis") + logger.info(f" QMT持仓: {qmt_positions}") + logger.info(f" 数据库持仓: {db_positions}") + return r.get_all_positions() + except Exception as e: + logger.error(f"加载初始持仓失败: {str(e)}") + A.positions = {} + return {} + +def update_position_in_memory(stock_code, quantity_change, is_buy=True, price=None, logger=None): + """以Redis为主更新持仓,同时更新数据库(不再维护内存镜像)""" + try: + # Redis持仓更新 + r = initialize_redis_manager(logger) + delta = quantity_change if is_buy else -quantity_change + new_qty = r.incr_position(stock_code, delta) + + # 更新数据库持仓 + if price is not None: + db_manager = initialize_database_manager(logger) + db_manager.update_position(stock_code, quantity_change, price, is_buy) + + if logger: + logger.info(f"持仓状态已更新: {stock_code} -> {new_qty}股(Redis)") + except Exception as e: + if logger: + logger.error(f"更新持仓状态失败: {str(e)}") + +def add_pending_order(stock_code, order_id, order_quantity, order_price, target_price, order_side='buy', logger=None): + """添加在途订单:写Redis为主,同时记录数据库""" + try: + order_info = { + 'order_id': order_id, + 'order_quantity': order_quantity, + 'order_price': order_price, + 'target_price': target_price, + 'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'status': 'pending', + 'side': order_side + } + # Redis + r = initialize_redis_manager(logger) + r.set_pending(stock_code, order_info) + + # 记录到数据库 + db_manager = initialize_database_manager(logger) + order_data = { + 'order_id': order_id, + 'strategy_id': 1, + 'stock_code': stock_code, + 'order_side': order_side, + 'order_quantity': order_quantity, + 'order_price': order_price, + 'target_price': target_price, + 'order_status': 'pending', + 'order_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'order_remark': f"{order_side.upper()}_{stock_code}_{int(time.time())}" + } + db_manager.insert_order(order_data) + + if logger: + logger.info(f"添加在途订单: {stock_code} -> {order_id}") + except Exception as e: + if logger: + logger.error(f"添加在途订单失败: {str(e)}") + +def remove_pending_order(stock_code, logger=None): + """移除在途订单,同时更新数据库状态""" + try: + r = initialize_redis_manager(logger) + order_info = r.get_pending(stock_code) + + if order_info is None: + if logger: + logger.warning(f"未找到在途订单: {stock_code}") + return + + r.del_pending(stock_code) + + # 更新数据库订单状态 + print("====================================") + db_manager = initialize_database_manager(logger) + db_manager.update_order_status(order_info['order_id'], 'completed') + + if logger: + logger.info(f"移除在途订单: {stock_code} -> {order_info['order_id']}") + except Exception as e: + if logger: + logger.error(f"移除在途订单失败: {str(e)}") + +def is_stock_pending_order(stock_code): + """检查股票是否有在途订单(以Redis为准)""" + r = initialize_redis_manager() + return r.get_pending(stock_code) is not None + +def check_order_timeout(xt_trader, acc, logger): + """已停用:本策略不再执行超时自动撤单,仅保留占位""" + logger.info("check_order_timeout 已停用:挂单后不再按超时撤单") + return + +def start_order_timeout_monitor(xt_trader, acc, logger): + """已停用:不再启动订单超时监控线程""" + logger.info("订单超时监控已停用:不启动后台监控线程") + return + +def create_buy_strategy_callback(xt_trader, acc, buy_amount, logger): + """创建买入策略回调函数""" + def f(data): + """ + 实时行情回调函数 + 检查当前价格是否低于目标价格,如果满足条件则买入 + """ + # logger.info(f"收到买入行情数据: {datetime.datetime.now()}") + + # 获取当前时间 + current_time = datetime.datetime.now().time() + + # 判断当前交易时段 + is_call_auction = is_call_auction_time(current_time) + is_continuous = is_continuous_trading_time(current_time) + + for stock_code, current_data in data.items(): + if stock_code not in A.hsa: + continue + + # 从Redis读取买入计划 + r = initialize_redis_manager(logger) + plan_info = r.get_buy_plan(stock_code) + if not plan_info or plan_info.get('is_active', 1) != 1: + continue + + try: + # 获取当前价格和目标价格 + current_price = current_data[0]['close'] + target_price = plan_info['target_price'] + buy_amount = plan_info['buy_amount'] + + # logger.info(f"[买入监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}") + + # 判断是否满足买入条件:当前价格低于目标价格 + if current_price < target_price: + + # 检查持仓(Redis) + current_qty = r.get_position(stock_code) + if current_qty > 0: + # logger.info(f"{stock_code} 已有持仓 {current_qty}股,跳过买入") + continue + + # 检查是否有在途订单(Redis) + if is_stock_pending_order(stock_code): + logger.info(f"{stock_code} 有在途订单,跳过买入") + continue + + # 集合竞价时段:只观察,不下单 + if is_call_auction: + logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待连续交易时段下单") + continue + + # 连续交易时段:执行下单 + if is_continuous: + logger.info(f"[连续交易下单] 满足买入条件!{stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f}") + + # 计算买入数量(从数据库获取金额) + buy_volume = int(buy_amount / current_price / 100) * 100 # 取整为100的整数倍 + + if buy_volume > 0: + logger.info(f"准备买入 {stock_code} {buy_volume}股,金额约 {buy_volume * current_price:.2f}元") + + # 生成订单ID + order_id = f"BUY_{stock_code}_{int(time.time())}" + + # 执行买入订单 + async_seq = xt_trader.order_stock_async( + acc, + stock_code, + xtconstant.STOCK_BUY, + buy_volume, + xtconstant.FIX_PRICE, + target_price, + 'auto_buy_strategy', + order_id + ) + + # 添加到在途订单(同时记录到数据库) + add_pending_order(stock_code, order_id, buy_volume, current_price, target_price, 'buy', logger) + + logger.info(f"已提交买入订单: {stock_code} {buy_volume}股,价格{target_price},订单ID: {order_id}") + else: + logger.warning(f"买入数量计算为0,跳过 {stock_code}") + else: + # 非交易时段 + logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} < 目标价格 {target_price:.2f},等待交易时段") + + except Exception as e: + logger.error(f"处理 {stock_code} 买入行情数据时出错: {str(e)}") + continue + + return f + +def create_sell_strategy_callback(xt_trader, acc, logger): + """创建卖出策略回调函数""" + def f(data): + """ + 实时行情回调函数 + 检查当前价格是否高于目标价格,如果满足条件且持有该股票则卖出 + """ + # logger.info(f"收到卖出行情数据: {datetime.datetime.now()}") + + # 获取当前时间 + current_time = datetime.datetime.now().time() + + # 判断当前交易时段 + is_call_auction = is_call_auction_time(current_time) + is_continuous = is_continuous_trading_time(current_time) + + for stock_code, current_data in data.items(): + if stock_code not in A.hsa: + continue + + # 从Redis读取卖出计划 + r = initialize_redis_manager(logger) + plan_info = r.get_sell_plan(stock_code) + if not plan_info or plan_info.get('is_active', 1) != 1: + continue + + try: + # 获取当前价格和目标价格 + current_price = current_data[0]['close'] + target_price = plan_info['target_price'] + + # logger.info(f"[卖出监控] {stock_code} 当前价格: {current_price:.2f}, 目标价格: {target_price:.2f}") + + # 判断是否满足卖出条件:当前价格高于目标价格 + if current_price > target_price: + + # 检查持仓(Redis) + r = initialize_redis_manager(logger) + current_position = r.get_position(stock_code) + if current_position <= 0: + logger.info(f"{stock_code} 无持仓,跳过卖出") + continue + + # 检查是否有在途订单 + if is_stock_pending_order(stock_code): + logger.info(f"{stock_code} 有在途订单,跳过卖出") + continue + + # 集合竞价时段:只观察,不下单 + if is_call_auction: + logger.info(f"[集合竞价观察] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待连续交易时段下单") + continue + + # 连续交易时段:执行下单 + if is_continuous: + logger.info(f"[连续交易下单] 满足卖出条件!{stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f}") + + # 卖出数量:全部持仓 + sell_volume = current_position + + if sell_volume > 0: + logger.info(f"准备卖出 {stock_code} {sell_volume}股,金额约 {sell_volume * current_price:.2f}元") + + # 生成订单ID + order_id = f"SELL_{stock_code}_{int(time.time())}" + # 执行卖出订单 + async_seq = xt_trader.order_stock_async( + acc, + stock_code, + xtconstant.STOCK_SELL, + sell_volume, + xtconstant.LATEST_PRICE, + target_price, + 'auto_sell_strategy', + order_id + ) + + # 添加到在途订单(同时记录到数据库) + add_pending_order(stock_code, order_id, sell_volume, current_price, target_price, 'sell', logger) + + logger.info(f"已提交卖出订单: {stock_code} {sell_volume}股,价格{target_price},订单ID: {order_id}") + else: + logger.warning(f"卖出数量为0,跳过 {stock_code}") + else: + # 非交易时段 + logger.info(f"[非交易时段] {stock_code} 当前价格 {current_price:.2f} > 目标价格 {target_price:.2f},等待交易时段") + + except Exception as e: + logger.error(f"处理 {stock_code} 卖出行情数据时出错: {str(e)}") + continue + + return f + +def update_account_funds_periodically(xt_trader, acc, logger): + """每小时更新账户资金信息到数据库""" + while True: + try: + # 获取账户资金信息 + account_info = xt_trader.query_stock_asset(acc) + + if account_info: + + # 计算相关数据 + available_cash = getattr(account_info, 'm_dCash', 0.0) + frozen_cash = getattr(account_info, 'm_dFrozenCash', 0.0) + market_value = getattr(account_info, 'm_dMarketValue', 0.0) + total_asset = available_cash + frozen_cash + market_value + + # 尝试获取盈亏信息,如果属性不存在则设为0 + try: + profit_loss = getattr(account_info, 'm_dProfitLoss', 0.0) + except AttributeError: + # 如果m_dProfitLoss不存在,尝试其他可能的属性名 + profit_loss = getattr(account_info, 'm_dProfit', 0.0) + + # 计算盈亏比例 + if total_asset > 0: + profit_loss_ratio = (profit_loss / total_asset) * 100 + else: + profit_loss_ratio = 0.0 + + # 更新到数据库 + db_manager = initialize_database_manager(logger) + success = db_manager.update_account_funds( + account_id=acc.account_id, + account_type="acc.account_type", + total_asset=total_asset, + available_cash=available_cash, + frozen_cash=frozen_cash, + market_value=market_value, + profit_loss=profit_loss, + profit_loss_ratio=profit_loss_ratio + ) + + if success: + logger.info(f"账户资金更新成功 - 总资产: {total_asset:.2f}, 可用资金: {available_cash:.2f}, 市值: {market_value:.2f}") + else: + logger.warning("账户资金更新失败") + else: + logger.warning("无法获取账户资金信息") + + except Exception as e: + logger.error(f"更新账户资金信息时出错: {str(e)}") + + # 等待1小时(3600秒) + time.sleep(3600) + +def start_account_funds_monitor(xt_trader, acc, logger): + """启动账户资金监控线程(每小时更新一次)""" + try: + funds_thread = threading.Thread( + target=update_account_funds_periodically, + args=(xt_trader, acc, logger), + daemon=True + ) + funds_thread.start() + logger.info("账户资金监控线程已启动(每小时更新一次)") + except Exception as e: + logger.error(f"启动账户资金监控失败: {str(e)}") + +# ========== 每5分钟对账:查询券商持仓并反向更新订单/持仓 ========== +def reconcile_orders_and_positions(xt_trader, acc, logger): + """一次性对账(基于当日成交对比委托): + - 仅查询MySQL今日订单(submitted、filled[部分]、pending、ordered) + - 通过QMT接口获取当日成交明细/汇总,按股票累计成交量对比委托量: + * 当日成交量 >= 委托量 => 状态置为 completed + * 0 < 当日成交量 < 委托量 => 状态置为 filled(部分成交),记录 filled_quantity + * 当日成交量 == 0 => 不处理 + - 本函数不更新持仓,持仓改由快照线程覆盖更新 + """ + try: + dbm = initialize_database_manager(logger) + # 1) 读取今日未完成订单(四种状态:submitted、filled(部分成交)、pending、委托订单) + # 你提到“委托订单”第四种状态名,这里按 'ordered' 兜底;如你表里具体值不同,可替换。 + candidate_statuses = ['submitted', 'filled', 'pending', 'ordered'] + orders = dbm.get_todays_orders_by_statuses(candidate_statuses) + + if not orders: + return + + # 2) 通过QMT查询当日成交结果,并按股票聚合成交数量 + filled_volume_by_code = {} + trades = xt_trader.query_stock_orders(acc, cancelable_only = False) + + for tr in trades or []: + code = getattr(tr, 'stock_code', None) or getattr(tr, 'm_strInstrumentID', None) + vol = int(getattr(tr, 'traded_volume', 0) or getattr(tr, 'm_nVolume', 0) or 0) + if not code or vol <= 0: + continue + filled_volume_by_code[code] = filled_volume_by_code.get(code, 0) + vol + + # 3) 遍历订单并根据“当日成交累计 vs 委托量”判断状态 + for od in orders: + try: + stock_code = od['stock_code'] + side = od['side'] + order_id = od['order_id'] + qty = int(od.get('order_quantity') or 0) + day_filled = int(filled_volume_by_code.get(stock_code, 0)) + + if day_filled <= 0: + continue + + if day_filled >= qty and qty > 0: + # 全部成交 -> completed + dbm.update_order_status(order_id, 'completed', qty, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + logger.info(f"[对账-简] 全部成交(>=委托): {stock_code} {side} {day_filled}/{qty}") + elif 0 < day_filled < qty: + # 部分成交 -> filled(部分成交) + dbm.update_order_status(order_id, 'filled', day_filled, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')) + logger.info(f"[对账-简] 部分成交(<委托): {stock_code} {side} {day_filled}/{qty}") + except Exception as ie: + logger.warning(f"[对账-简] 处理订单失败 {od}: {ie}") + + except Exception as e: + if logger: + logger.error(f"对账过程出错: {str(e)}") + +def start_reconciliation_monitor(xt_trader, acc, logger, interval_seconds: int = 300): + """启动对账后台线程(默认每5分钟一次)""" + try: + def worker(): + while True: + try: + reconcile_orders_and_positions(xt_trader, acc, logger) + except Exception as ie: + logger.warning(f"对账线程异常: {ie}") + time.sleep(interval_seconds) + + t = threading.Thread(target=worker, daemon=True) + t.start() + logger.info(f"对账线程已启动(每{interval_seconds}秒)") + except Exception as e: + logger.error(f"启动对账线程失败: {str(e)}") + +# ========== 每5分钟持仓快照覆盖更新(仅DB) ========== +def refresh_positions_snapshot(xt_trader, acc, logger): + """从QMT拉取当前持仓,覆盖更新到DB的 trading_position: + - 对存在的标的 upsert 所有持仓字段(数量、可用、冻结、成本价、市价、市值、盈亏等) + - 删除DB中多余的标的 + 本函数不操作Redis。 + """ + try: + dbm = initialize_database_manager(logger) + positions = xt_trader.query_stock_positions(acc) or [] + valid_codes = [] + for pos in positions: + try: + code = pos.stock_code + # 获取持仓基本信息 + total_qty = int(getattr(pos, 'm_nVolume', 0) or 0) + available_qty = int(getattr(pos, 'm_nCanUseVolume', 0) or 0) + frozen_qty = max(0, total_qty - available_qty) # 冻结数量 = 总数量 - 可用数量 + + # 获取价格和市值信息 + cost_price = float(getattr(pos, 'm_dOpenPrice', 0.0) or 0.0) + market_price = float(getattr(pos, 'm_dLastPrice', 0.0) or 0.0) + market_value = float(getattr(pos, 'm_dInstrumentValue', 0.0) or 0.0) + + # 获取盈亏信息 + profit_loss = float(getattr(pos, 'm_dPositionProfit', 0.0) or 0.0) + position_cost = float(getattr(pos, 'm_dPositionCost', 0.0) or 0.0) + + # 计算盈亏比例 + if position_cost > 0: + profit_loss_ratio = (profit_loss / position_cost) * 100 + else: + profit_loss_ratio = 0.0 + + if total_qty > 0: + dbm.upsert_position_snapshot( + code, total_qty, available_qty, frozen_qty, + cost_price, market_price, market_value, + profit_loss, profit_loss_ratio + ) + valid_codes.append(code) + except Exception as e: + logger.warning(f"处理持仓记录失败 {getattr(pos, 'stock_code', 'unknown')}: {e}") + continue + # 清理非持有标的 + dbm.delete_positions_except(valid_codes) + logger.info(f"[持仓快照] 已覆盖更新 {len(valid_codes)} 条持仓记录") + except Exception as e: + if logger: + logger.error(f"持仓快照刷新失败: {str(e)}") + +def start_position_snapshot_monitor(xt_trader, acc, logger, interval_seconds: int = 300): + try: + def worker(): + while True: + try: + refresh_positions_snapshot(xt_trader, acc, logger) + except Exception as ie: + logger.warning(f"持仓快照线程异常: {ie}") + time.sleep(interval_seconds) + t = threading.Thread(target=worker, daemon=True) + t.start() + logger.info(f"持仓快照线程已启动(每{interval_seconds}秒)") + except Exception as e: + logger.error(f"启动持仓快照线程失败: {str(e)}") + +# 为了向后兼容,保留原来的函数名 +def create_strategy_callback(xt_trader, acc, buy_amount, logger): + """创建策略回调函数(兼容性函数,实际调用买入回调)""" + return create_buy_strategy_callback(xt_trader, acc, buy_amount, logger) diff --git a/src/QMT/trader_callback.py b/src/QMT/trader_callback.py new file mode 100644 index 0000000..50245b2 --- /dev/null +++ b/src/QMT/trader_callback.py @@ -0,0 +1,184 @@ +# coding:utf-8 +import datetime +import sys +from xtquant.xttrader import XtQuantTraderCallback +from strategy import update_position_in_memory, remove_pending_order +from database_manager import DatabaseManager + +class MyXtQuantTraderCallback(XtQuantTraderCallback): + def __init__(self, logger): + self.logger = logger + self.db_manager = DatabaseManager(logger) + + def on_disconnected(self): + """ + 连接断开 + :return: + """ + self.logger.warning(f"{datetime.datetime.now()} 连接断开回调") + + def on_stock_order(self, order): + """ + 委托回报推送 + :param order: XtOrder对象 + :return: + """ + self.logger.info(f"{datetime.datetime.now()} 委托回调 {order.order_remark}") + + # 更新在途订单状态和数据库 + try: + # 从订单备注中提取股票代码 + order_remark = order.order_remark + if '_' in order_remark: + stock_code = order_remark.split('_')[1] # 修正:取第二段作为股票代码 + + # 检查Redis中是否存在该在途订单 + from strategy import initialize_redis_manager + r = initialize_redis_manager(self.logger) + order_info = r.get_pending(stock_code) + + if order_info: + # 确保 order_id 是字符串类型 + qmt_order_id_str = str(order.order_id) + our_order_id = order_info.get('order_id') + + # 先更新QMT订单ID映射 + if our_order_id: + self.db_manager.update_qmt_order_id(our_order_id, qmt_order_id_str) + + # 更新数据库订单状态 + self.db_manager.update_order_status(qmt_order_id_str, 'submitted') + + # 记录交易日志 + log_data = { + 'order_id': our_order_id or qmt_order_id_str, + 'stock_code': stock_code, + 'log_type': 'order_submitted', + 'log_level': 'INFO', + 'message': f'订单已提交: {order.order_remark}', + 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + self.db_manager.insert_trading_log(log_data) + + self.logger.info(f"订单状态更新为已提交: {stock_code}") + else: + self.logger.warning(f"未找到在途订单: {stock_code}") + except Exception as e: + self.logger.error(f"更新订单状态失败: {str(e)}") + + def on_stock_trade(self, trade): + """ + 成交变动推送 + :param trade: XtTrade对象 + :return: + """ + self.logger.info(f"{datetime.datetime.now()} 成交回调 {trade.order_remark} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}") + + # 更新内存中的持仓状态和数据库 + try: + # 根据订单备注判断是买入还是卖出 + is_buy = True # 默认买入 + if trade.order_remark.startswith('SELL_'): + is_buy = False + elif trade.order_remark.startswith('BUY_'): + is_buy = True + + # 记录交易方向 + trade_direction = "买入" if is_buy else "卖出" + self.logger.info(f"识别交易方向: {trade_direction}") + + # 更新内存和数据库持仓状态 + update_position_in_memory(trade.stock_code, trade.traded_volume, is_buy, trade.traded_price, self.logger) + + # 确保 order_id 是字符串类型 + order_id_str = str(trade.order_id) + + # 更新数据库订单状态 + self.db_manager.update_order_status( + order_id_str, + 'filled', + trade.traded_volume, + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ) + + # 记录交易日志 + log_data = { + 'order_id': order_id_str, + 'stock_code': trade.stock_code, + 'log_type': 'trade_filled', + 'log_level': 'INFO', + 'message': f'{trade_direction}成交: {trade.stock_code} {trade.traded_volume}股 @ {trade.traded_price}元', + 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + self.db_manager.insert_trading_log(log_data) + + # 从在途订单中移除 + remove_pending_order(trade.stock_code, self.logger) + + except Exception as e: + self.logger.error(f"更新持仓状态失败: {str(e)}") + + def on_order_error(self, order_error): + """ + 委托失败推送 + :param order_error:XtOrderError 对象 + :return: + """ + self.logger.error(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}") + + # 从在途订单中移除失败的订单,并更新数据库 + try: + order_remark = order_error.order_remark + if '_' in order_remark: + stock_code = order_remark.split('_')[1] # 修正:取第二段作为股票代码 + + # 确保 order_id 是字符串类型 + order_id_str = str(order_error.order_id) + + # 更新数据库订单状态 + self.db_manager.update_order_status(order_id_str, 'failed') + + # 记录错误日志 + log_data = { + 'order_id': order_id_str, + 'stock_code': stock_code, + 'log_type': 'order_failed', + 'log_level': 'ERROR', + 'message': f'订单失败: {order_error.error_msg}', + 'create_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + self.db_manager.insert_trading_log(log_data) + + remove_pending_order(stock_code, self.logger) + except Exception as e: + self.logger.error(f"处理失败订单失败: {str(e)}") + + def on_cancel_error(self, cancel_error): + """ + 撤单失败推送 + :param cancel_error: XtCancelError 对象 + :return: + """ + self.logger.error(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}") + + def on_order_stock_async_response(self, response): + """ + 异步下单回报推送 + :param response: XtOrderResponse 对象 + :return: + """ + self.logger.info(f"异步委托回调 {response.order_remark}") + + def on_cancel_order_stock_async_response(self, response): + """ + :param response: XtCancelOrderResponse 对象 + :return: + """ + self.logger.info(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}") + + def on_account_status(self, status): + """ + :param response: XtAccountStatus 对象 + :return: + """ + self.logger.info(f"{datetime.datetime.now()} {sys._getframe().f_code.co_name}") \ No newline at end of file diff --git a/src/quantitative_analysis/financial_data_collector.py b/src/quantitative_analysis/financial_data_collector.py index 3b50b2f..3b1d890 100644 --- a/src/quantitative_analysis/financial_data_collector.py +++ b/src/quantitative_analysis/financial_data_collector.py @@ -127,7 +127,7 @@ class FinancialDataCollectorV2: List[str]: 股票代码列表 """ try: - query = "SELECT DISTINCT gp_code_two FROM gp_code_all_copy1 WHERE gp_code_two IS NOT NULL AND gp_code_two != ''" + query = "SELECT DISTINCT gp_code_two FROM gp_code_all WHERE gp_code_two IS NOT NULL AND gp_code_two != ''" with self.mysql_engine.connect() as conn: df = pd.read_sql(text(query), conn) @@ -224,11 +224,11 @@ class FinancialDataCollectorV2: # 简化逻辑:直接从2025年开始往前推21个季度 base_year = 2025 base_quarters = [ - (2025, 3), (2024, 12), (2024, 9), (2024, 6), (2024, 3), + (2025, 6), (2025, 3), (2024, 12), (2024, 9), (2024, 6), (2024, 3), (2023, 12), (2023, 9), (2023, 6), (2023, 3), (2022, 12), (2022, 9), (2022, 6), (2022, 3), (2021, 12), (2021, 9), (2021, 6), (2021, 3), - (2020, 12), (2020, 9), (2020, 6), (2020, 3) + (2020, 12), (2020, 9), (2020, 6) ] if i < len(base_quarters): @@ -538,125 +538,10 @@ class FinancialDataCollectorV2: except Exception as e: logger.error(f"保存数据到MongoDB失败: {str(e)}") return False - - def check_missing_data(self, stock_code: str) -> List[str]: - """ - 检查MongoDB中哪些报告期的资产负债表或现金流量表数据为空 - - Args: - stock_code: 股票代码 - - Returns: - List[str]: 需要更新的报告期列表 - """ - try: - # 查询该股票的所有记录 - records = list(self.collection.find({'stock_code': stock_code})) - - missing_periods = [] - - for record in records: - balance_empty = not record.get('balance_sheet') or record.get('balance_sheet') == {} - cash_empty = not record.get('cash_flow_statement') or record.get('cash_flow_statement') == {} - - # 如果资产负债表或现金流量表为空,则需要更新 - if balance_empty or cash_empty: - missing_periods.append(record.get('report_date')) - logger.debug(f"发现需要更新的数据: {stock_code} - {record.get('report_date')} (资产负债表空: {balance_empty}, 现金流量表空: {cash_empty})") - - if missing_periods: - logger.info(f"股票 {stock_code} 有 {len(missing_periods)} 个报告期需要更新数据") - else: - logger.info(f"股票 {stock_code} 的数据完整,无需更新") - - return missing_periods - - except Exception as e: - logger.error(f"检查缺失数据失败: {str(e)}") - return [] - - def update_missing_financial_data(self, stock_code: str, missing_periods: List[str]) -> bool: - """ - 更新缺失的财务数据(只更新资产负债表和现金流量表) - - Args: - stock_code: 股票代码 - missing_periods: 需要更新的报告期列表 - - Returns: - bool: 是否更新成功 - """ - try: - if not missing_periods: - return True - - logger.info(f"开始更新股票 {stock_code} 缺失的财务数据") - - # 获取资产负债表和现金流量表数据 - balance_data = self.fetch_balance_sheet(stock_code, periods=21) - time.sleep(1) - - cash_data = self.fetch_cash_flow_statement(stock_code, periods=21) - time.sleep(1) - - # 创建按报告日期索引的字典 - balance_dict = {item['REPORT_DATE'][:10]: item for item in balance_data if item.get('REPORT_DATE')} - cash_dict = {item['REPORT_DATE'][:10]: item for item in cash_data if item.get('REPORT_DATE')} - - updated_count = 0 - - for report_date in missing_periods: - try: - # 查找当前记录 - current_record = self.collection.find_one({ - 'stock_code': stock_code, - 'report_date': report_date - }) - - if not current_record: - logger.warning(f"未找到记录: {stock_code} - {report_date}") - continue - - # 准备更新的字段 - update_fields = {} - - # 检查是否需要更新资产负债表 - balance_empty = not current_record.get('balance_sheet') or current_record.get('balance_sheet') == {} - if balance_empty and report_date in balance_dict: - update_fields['balance_sheet'] = balance_dict[report_date] - logger.debug(f"更新资产负债表: {stock_code} - {report_date}") - - # 检查是否需要更新现金流量表 - cash_empty = not current_record.get('cash_flow_statement') or current_record.get('cash_flow_statement') == {} - if cash_empty and report_date in cash_dict: - update_fields['cash_flow_statement'] = cash_dict[report_date] - logger.debug(f"更新现金流量表: {stock_code} - {report_date}") - - # 如果有字段需要更新 - if update_fields: - update_fields['collect_time'] = datetime.datetime.now() # 更新采集时间 - - self.collection.update_one( - {'stock_code': stock_code, 'report_date': report_date}, - {'$set': update_fields} - ) - updated_count += 1 - logger.info(f"成功更新: {stock_code} - {report_date}") - - except Exception as e: - logger.error(f"更新记录失败: {stock_code} - {report_date} - {str(e)}") - continue - - logger.info(f"股票 {stock_code} 更新完成,共更新 {updated_count} 个报告期") - return True - - except Exception as e: - logger.error(f"更新缺失财务数据失败: {str(e)}") - return False def collect_financial_data(self, stock_code: str, periods: int = 21) -> bool: """ - 采集单只股票的财务数据 - 增量更新模式 + 采集单只股票的财务数据 Args: stock_code: 股票代码,如'300750.SZ' @@ -666,20 +551,37 @@ class FinancialDataCollectorV2: bool: 是否采集成功 """ try: - logger.info(f"开始检查股票 {stock_code} 的财务数据") + logger.info(f"开始采集股票 {stock_code} 的财务数据({periods}个报告期)") + + # 获取三张财务报表数据 + profit_data = self.fetch_profit_statement(stock_code, periods) + time.sleep(1) # 避免请求过于频繁 + + balance_data = self.fetch_balance_sheet(stock_code, periods) + time.sleep(1) + + cash_data = self.fetch_cash_flow_statement(stock_code, periods) + time.sleep(1) - # 检查哪些报告期的数据缺失 - missing_periods = self.check_missing_data(stock_code) + # 检查至少有一张表有数据 + if not any([profit_data, balance_data, cash_data]): + logger.error(f"股票 {stock_code} 没有获取到任何财务数据") + return False - if not missing_periods: - logger.info(f"股票 {stock_code} 数据完整,跳过") - return True + # 处理财务数据 + financial_data_list = self.process_financial_data( + stock_code, profit_data, balance_data, cash_data + ) - # 更新缺失的数据 - success = self.update_missing_financial_data(stock_code, missing_periods) + if not financial_data_list: + logger.error(f"股票 {stock_code} 的财务数据处理失败") + return False + + # 保存到MongoDB + success = self.save_to_mongodb(financial_data_list) if success: - logger.info(f"股票 {stock_code} 的财务数据更新完成") + logger.info(f"股票 {stock_code} 的财务数据采集完成") return success @@ -689,19 +591,19 @@ class FinancialDataCollectorV2: def batch_collect_financial_data(self, stock_codes: List[str], periods: int = 21) -> Dict: """ - 批量更新多只股票的缺失财务数据 + 批量采集多只股票的财务数据 Args: stock_codes: 股票代码列表 periods: 获取多少个报告期,默认21个季度 Returns: - Dict: 更新结果统计 + Dict: 采集结果统计 """ - results = {'success': 0, 'failed': 0, 'failed_stocks': [], 'skipped': 0} + results = {'success': 0, 'failed': 0, 'failed_stocks': []} total_stocks = len(stock_codes) - logger.info(f"开始批量检查和更新 {total_stocks} 只股票的财务数据") + logger.info(f"开始批量采集 {total_stocks} 只股票的财务数据") for index, stock_code in enumerate(stock_codes, 1): try: @@ -712,11 +614,11 @@ class FinancialDataCollectorV2: success = self.collect_financial_data(stock_code, periods) if success: results['success'] += 1 - logger.info(f"SUCCESS [{index}/{total_stocks}] {stock_code} 处理成功") + logger.info(f"SUCCESS [{index}/{total_stocks}] {stock_code} 采集成功") else: results['failed'] += 1 results['failed_stocks'].append(stock_code) - logger.warning(f"FAILED [{index}/{total_stocks}] {stock_code} 处理失败") + logger.warning(f"FAILED [{index}/{total_stocks}] {stock_code} 采集失败") # 每只股票之间暂停一下,避免请求过于频繁 time.sleep(2) @@ -735,7 +637,7 @@ class FinancialDataCollectorV2: continue success_rate = (results['success'] / total_stocks) * 100 - logger.info(f"批量更新完成: 成功{results['success']}只,失败{results['failed']}只,成功率: {success_rate:.2f}%") + logger.info(f"批量采集完成: 成功{results['success']}只,失败{results['failed']}只,成功率: {success_rate:.2f}%") if results['failed_stocks']: logger.info(f"失败的股票数量: {len(results['failed_stocks'])}") @@ -757,7 +659,7 @@ class FinancialDataCollectorV2: def main(): - """主函数 - 批量更新所有股票的缺失财务数据""" + """主函数 - 批量采集所有股票的财务数据""" collector = FinancialDataCollectorV2() try: @@ -771,32 +673,30 @@ def main(): logger.info(f"从数据库获取到 {len(stock_codes)} 只股票") - # 可以选择处理所有股票或者部分股票进行测试 + # 可以选择采集所有股票或者部分股票进行测试 # 如果要测试,可以取前几只股票 - # 测试模式:只处理前10只股票 - TEST_MODE = False # 设置为False将处理所有股票 + # 测试模式:只采集前10只股票 + TEST_MODE = False # 设置为False将采集所有股票 if TEST_MODE: test_count = min(10, len(stock_codes)) # 最多取10只股票测试 stock_codes = stock_codes[:test_count] - logger.info(f"TEST MODE: 仅处理前 {test_count} 只股票") + logger.info(f"TEST MODE: 仅采集前 {test_count} 只股票") else: - logger.info(f"PRODUCTION MODE: 将处理全部 {len(stock_codes)} 只股票") + logger.info(f"PRODUCTION MODE: 将采集全部 {len(stock_codes)} 只股票") - logger.info(f"开始批量检查和更新 {len(stock_codes)} 只股票的财务数据") - logger.info("注意: 本次运行为增量更新模式,只会更新缺失的资产负债表和现金流量表数据") + logger.info(f"开始批量采集 {len(stock_codes)} 只股票的财务数据") - # 批量更新 + # 批量采集 results = collector.batch_collect_financial_data(stock_codes, periods=21) # 输出最终结果 print(f"\n{'='*50}") - print(f"批量更新完成统计") + print(f"批量采集完成统计") print(f"{'='*50}") - print(f"SUCCESS 成功处理: {results['success']} 只股票") - print(f"FAILED 处理失败: {results['failed']} 只股票") + print(f"SUCCESS 成功采集: {results['success']} 只股票") + print(f"FAILED 采集失败: {results['failed']} 只股票") print(f"SUCCESS RATE 成功率: {(results['success'] / len(stock_codes) * 100):.2f}%") - print(f"\n说明: 成功处理包括数据完整(无需更新)和成功更新缺失数据的股票") if results['failed_stocks']: print(f"\n失败的股票列表:") @@ -811,7 +711,7 @@ def main(): logger.info("用户中断程序执行") print("\n警告: 程序被用户中断") except Exception as e: - logger.error(f"更新过程中出现错误: {str(e)}") + logger.error(f"采集过程中出现错误: {str(e)}") print(f"\n错误: 程序执行出错: {str(e)}") finally: collector.close_connection() diff --git a/src/quantitative_analysis/hk_stock_price_collector.py b/src/quantitative_analysis/hk_stock_price_collector.py index cd9b796..f5865f1 100644 --- a/src/quantitative_analysis/hk_stock_price_collector.py +++ b/src/quantitative_analysis/hk_stock_price_collector.py @@ -194,24 +194,35 @@ def fetch_and_store_hk_stock_data(page_size=90, max_workers=10, use_proxy=False) def format_hk_stock_code(stock_code): """ - 统一港股代码格式,支持0700.HK、HK0700等 - 返回雪球格式(如0700.HK)和Redis存储格式 + 统一港股代码格式,支持0700.HK、HK0700、00700等 + 返回雪球格式(如0700.HK)和Redis存储格式(如00700) """ stock_code = stock_code.upper() if '.HK' in stock_code: - return stock_code, stock_code + code = stock_code.replace('.HK', '') + # 确保是5位数字格式 + if code.isdigit(): + return f'{code.zfill(5)}.HK', code.zfill(5) + else: + return stock_code, code.zfill(5) elif stock_code.startswith('HK'): code = stock_code[2:] - return f'{code}.HK', f'{code}.HK' + if code.isdigit(): + return f'{code.zfill(5)}.HK', code.zfill(5) + else: + return stock_code, code.zfill(5) else: - # 假设是纯数字,添加.HK后缀 - return f'{stock_code}.HK', f'{stock_code}.HK' + # 假设是纯数字,添加.HK后缀并确保5位格式 + if stock_code.isdigit(): + return f'{stock_code.zfill(5)}.HK', stock_code.zfill(5) + else: + return stock_code, stock_code def get_hk_stock_realtime_info_from_redis(stock_code): """ 根据港股代码从Redis查询实时行情,并封装为指定结构。 - :param stock_code: 支持0700.HK、HK0700等格式 + :param stock_code: 支持0700.HK、HK0700、00700等格式 :return: dict or None """ _, redis_code = format_hk_stock_code(stock_code) @@ -246,7 +257,7 @@ def get_hk_stock_realtime_info_from_redis(stock_code): result["crawlDate"] = data.get("fetch_time") result["marketValue"] = data.get("market_capital") result["maxPrice"] = data.get("high") if "high" in data else data.get("high52w") - result["minPrice"] = data.get("low") if "low" in data else data.get("high52w") + result["minPrice"] = data.get("low") if "low" in data else data.get("low52w") result["nowPrice"] = data.get("current") result["pbRate"] = data.get("pb") result["rangeRiseAndFall"] = data.get("percent") diff --git a/src/quantitative_analysis/tech_fundamental_factor_strategy.py b/src/quantitative_analysis/tech_fundamental_factor_strategy.py index 3e454bb..1d53e67 100644 --- a/src/quantitative_analysis/tech_fundamental_factor_strategy.py +++ b/src/quantitative_analysis/tech_fundamental_factor_strategy.py @@ -7,11 +7,10 @@ """ import sys -import pymongo import pandas as pd import numpy as np import logging -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Tuple from pathlib import Path from sqlalchemy import create_engine, text from datetime import datetime @@ -190,7 +189,7 @@ class TechFundamentalFactorStrategy: logger.info(f"计算 {len(stock_codes)} 只股票的通用因子") results = [] - latest_date = "2025-03-31" # 最新季度数据 + latest_date = "2025-06-30" # 最新季度数据 annual_date = "2024-12-31" # 年报数据 for stock_code in stock_codes: @@ -214,7 +213,7 @@ class TechFundamentalFactorStrategy: # 3. 前五大供应商占比(使用年报数据) supplier_conc = self.financial_analyzer.analyze_supplier_concentration(stock_code, annual_date) factor_data['supplier_concentration'] = supplier_conc - + # 4. 前五大客户占比(使用年报数据) customer_conc = self.financial_analyzer.analyze_customer_concentration(stock_code, annual_date) factor_data['customer_concentration'] = customer_conc @@ -247,7 +246,7 @@ class TechFundamentalFactorStrategy: logger.info(f"计算 {len(stock_codes)} 只成长期股票的特色因子") results = [] - latest_date = "2025-03-31" # 使用最新数据 + latest_date = "2025-06-30" # 使用最新数据 annual_date = "2024-12-31" # 使用年度数据 for stock_code in stock_codes: @@ -306,7 +305,7 @@ class TechFundamentalFactorStrategy: try: logger.info(f"计算 {len(stock_codes)} 只成熟期股票的特色因子") - latest_date = "2025-03-31" # 使用最新数据 + latest_date = "2025-06-30" # 使用最新数据 # 在循环外获取全A股PB和ROE数据,避免重复查询 logger.info("获取全A股PB数据...") @@ -670,7 +669,7 @@ def main(): try: print("=== 科技主题基本面因子选股策略 ===") print("数据说明:") - print("- 毛利率、净利润增长率等:使用最新数据 (2025-03-31)") + print("- 毛利率、净利润增长率等:使用最新数据 (2025-06-30)") print("- 供应商客户集中度、折旧摊销、研发费用:使用年报数据 (2024-12-31)") print() diff --git a/src/scripts/config.py b/src/scripts/config.py index c662c80..ce9ed01 100644 --- a/src/scripts/config.py +++ b/src/scripts/config.py @@ -11,7 +11,7 @@ XUEQIU_HEADERS = { 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Client-Version': 'v2.44.75', - 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; Hm_lvt_1db88642e346389874251b5a1eded6e3=1754636834; xq_a_token=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xqat=4ea8af8f9cb5850af2ba654c5255cbf6bf797b39; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzU3NjM3OTA4LCJjdG0iOjE3NTUwNDU5MDg0ODksImNpZCI6ImQ5ZDBuNEFadXAifQ.jAeKlW2r1xRuyoZ3cuy2rTgfSoW_79wGJeuup7I7sZMSH5QeRDJrGx5JWXO4373YRpNW0qnAXR51Ygd8Plmko1u99iN8LifGzyMtblXDPgs17aS0zyHr6cMAsURU984wCXkmZxdvRMCHdevc8XWNHnuqeGfQNSgBSdO6Zv7Xc5-t965TJba96UOsNBpv2GghV9B2mcrUQyW3edi9kRAN_Fxmx5M1Iri4Yfppcaj-VSZYkdZtUpizrN5BbVYujcnQjj4kceUYYAl3Ccs273KVNSMFKpHMIOJcMJATY6PRgLvbEu8_ttIfBnbG4mmZ71bU7RXigleXIj1qhcDL2rDzQQ; xq_r_token=2b5db3e3897cb3e46b8fa2fa384471b334ec59cb; acw_tc=ac11000117550489614555169ef3ec63ec008e1bfba0fe5321bc8a30c2deb8; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1755050072; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=kE/XuROkIJ4APDhfxUYOb9lRiDFNJT8KxiXYwJAuoCeNlkaxlcytBSuiCXGjqxhydALLguC/FB4qIXfLut408Q%3D%3D; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAjqDsqze4GzDiLPGhDBWAFdYjw7ivq8RG5pxMBrGHuhLMEzoiqds=iCpxusTh1K/Zjw=KmoeK4xGLDY=DCTKq1QeD4S3Dt4DIDAYDDxDWU4DLDYoDY3nYxGP=xpWTcmRbD0YDzqDgUEe=xi3DA4DjnehqYiTdwDDBDGtO=9aDG4GfSmDD0wDLoGQQoDGWnCneE6mkiFIr6TTDjqPD/Shc59791vGW56CM9zo3paFDtqD90aAFn=GrvFaE_n93e4F4qibH7GYziTmrzt4xmrKi44mBDmAQQ0TKe4Bxq3DPQDhtTH7OY8FYS_Qqx4Gn/lHDcnDd7YerPCYC70bYbC42q3i8WTx3e8/rijIosDPRGrE0WdoqzGh_YPeD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0P6Dw1PtDCuuqxGeR0qxPehAe4DWhYeRonANsR7vNG4g2BxpQTxTiD', + 'Cookie': 'cookiesu=811743062689927; device_id=33fa3c7fca4a65f8f4354e10ed6b7470; smidV2=20250327160437f244626e8b47ca2a7992f30f389e4e790074ae48656a22f10; HMACCOUNT=8B64A2E3C307C8C0; s=c611ttmqlj; xq_is_login=1; u=8493411634; bid=4065a77ca57a69c83405d6e591ab5449_m8r2nhs8; __utma=1.434320573.1747189698.1747189698.1747189698.1; __utmc=1; __utmz=1.1747189698.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); snbim_minify=true; _c_WBKFRo=dsWgHR8i8KGPbIyhFlN51PHOzVuuNytvUAFppfkD; _nb_ioWEgULi=; aliyungf_tc=00c6b999835b16cea9e4a6aab36cca373a0976bf55ee74770d11f421f7119ad8; Hm_lvt_1db88642e346389874251b5a1eded6e3=1757464900; xq_a_token=975ab9f15a4965b9e557b9bc6f33bc1da20a0f49; xqat=975ab9f15a4965b9e557b9bc6f33bc1da20a0f49; xq_id_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJ1aWQiOjg0OTM0MTE2MzQsImlzcyI6InVjIiwiZXhwIjoxNzYwMjU2NzM4LCJjdG0iOjE3NTc2NjQ3Mzg4MjUsImNpZCI6ImQ5ZDBuNEFadXAifQ.TMx4-TjKx96j5h6-EGiRIM2WKtJm1xctZhYidc40Em0pRcr0UBHAKBGl3No5r1BElYa9qnEDgNYI0Zv137Inx-EMPqm5cd1Z_ZjLdWOSLzT9qqBj8zdfuqJwP2nCYvC6KLjd8BvykS0vSFKqwb-r0WhEA3OzbO8teVNsaemdKAhBoIyP3-RQCfRxJ9RLNha1ZMdg66iZvfz_SOsG41y8IA9yyl-FFFJOq4TnAiywY1yO1QIJJhkh8YQqfnDfQQdSIFgJGToU980Lw1dm4aCDY-kvn-t18KjrL_hZJ_UNN65bgZsSsuWf-VQ7wsjjczNrfBYAHdZ6kES0CGo9g8IZZw; xq_r_token=c209224335327f29fc555d9910b43c0df6d52d5a; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1757774901; .thumbcache_f24b8bbe5a5934237bbc0eda20c1b6e7=rJD0qKtipTMjRBkLBVEyXbl0CiVeY7y4AxEZC0Vf6Zkou9cxp0NPsxwSrnOyFyBMr+Ws5/nJDO1NUalRDyAPsA%3D%3D; acw_tc=3ccdc14717578973700474899e2dc1c35b6358f1af81617250f8f00b4cf31c; ssxmod_itna=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0qRHhqDylAkiETFN1t42Y5D/KlYeDZDGFdDqx0Ei6FiYHK1ezjCGbKSAQY5P53Niio89NQ7DEQm6fjL1S4K7s5h8KRDo9n4hiDB3DbqDymgY5qxGGA4GwDGoD34DiDDPDb8rDALeD7qDFnenropTDm4GWneGfDDoDYbT3xiUYDDUvbeG2iET4DDN4bIGYZ2G76=r1doBip29xKiTDjqPD/ShUoiuzZKC4icFL2/amAeGyC5GuY6mWHQ77SWcbscAV70i8hx_Bx3rKqB5YGDRYqK8o2xY9iKR0YRDxeEDW0DWnQ8EwhDDiP46iRiGDYZtgrNMhXiY4MQA7bAilP4nPkFGCmqzBqGYesQGQiT3ihKbm5CexbxxD; ssxmod_itna2=1-eqGxBDnGKYuxcD4kDRgxYq7ueYKS8DBP01Dp2xQyP08D60DB40Q0qRHhqDylAkiETFN1t42YeDA4rYnRItORCitz/D3nyGQigbiD', 'Referer': 'https://weibo.com/u/7735765253', 'Sec-Ch-Ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Google Chrome";v="122"', 'Sec-Ch-Ua-Mobile': '?0', diff --git a/src/scripts/us_stock_code_collector.py b/src/scripts/us_stock_code_collector.py new file mode 100644 index 0000000..5a0dbdb --- /dev/null +++ b/src/scripts/us_stock_code_collector.py @@ -0,0 +1,100 @@ +# coding:utf-8 +# 更新港股列表的代码 + +import requests +import pandas as pd +from sqlalchemy import create_engine, text +import sys +import os + +# 将项目根目录添加到Python路径,以便导入config +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from src.scripts.config import XUEQIU_HEADERS + +def collect_us_stock_codes(db_url): + """ + 采集雪球港股列表数据,并存储到数据库。 + """ + engine = create_engine(db_url) + headers = XUEQIU_HEADERS + base_url = "https://stock.xueqiu.com/v5/stock/screener/quote/list.json" + page = 1 + page_size = 90 + all_data = [] + + print("--- Starting to collect Hong Kong stock codes ---") + + # 采集前先清空表 + try: + with engine.begin() as conn: + conn.execute(text("TRUNCATE TABLE gp_code_us")) + print("Table `gp_code_us` has been truncated.") + except Exception as e: + print(f"Error truncating table `gp_code_us`: {e}") + return + + while True: + params = { + 'page': page, + 'size': page_size, + 'order': 'desc', + 'order_by': 'market_capital', + 'market': 'US', + 'type': 'us', + 'is_delay': 'true' + } + + print(f"Fetching page {page}...") + try: + response = requests.get(base_url, headers=headers, params=params, timeout=20) + if response.status_code != 200: + print(f"Request failed with status code {response.status_code}") + break + + data = response.json() + if data.get('error_code') != 0: + print(f"API error: {data.get('error_description')}") + break + + stock_list = data.get('data', {}).get('list', []) + if not stock_list: + print("No more data found. Collection finished.") + break + + all_data.extend(stock_list) + + # 如果获取到的数据少于每页数量,说明是最后一页 + if len(stock_list) < page_size: + print("Reached the last page. Collection finished.") + break + + page += 1 + + except requests.exceptions.RequestException as e: + print(f"Request exception on page {page}: {e}") + break + + if all_data: + print(f"--- Collected a total of {len(all_data)} stocks. Preparing to save to database. ---") + df = pd.DataFrame(all_data) + + # 数据映射和转换 + df_to_save = pd.DataFrame() + df_to_save['gp_name'] = df['name'] + df_to_save['gp_code'] = df['symbol'] + df_to_save['gp_code_two'] = 'US.' + df['symbol'].astype(str) + df_to_save['market_cap'] = df['market_capital'] + + try: + df_to_save.to_sql('gp_code_us', engine, if_exists='append', index=False) + print("--- Successfully saved all data to `gp_code_us`. ---") + except Exception as e: + print(f"Error saving data to database: {e}") + else: + print("--- No data collected. ---") + + engine.dispose() + +if __name__ == "__main__": + db_url = 'mysql+pymysql://root:Chlry#$.8@192.168.18.199:3306/db_gp_cj' + collect_us_stock_codes(db_url) \ No newline at end of file diff --git a/src/temp.csv b/src/temp.csv deleted file mode 100644 index 72b2abf..0000000 --- a/src/temp.csv +++ /dev/null @@ -1,118 +0,0 @@ -20230508_20240508下跌后快速反弹走势,,,,,,,, -take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count, -30,-4712,0,100.8,49.8,53.1,1815,405, -29,-5206,0,100,49.9,53.3,1818,405, -28,-6335,0,99.2,50,53.6,1822,405, -27,-6590,0,98,50.2,53.6,1827,405, -26,-6865,0,96.7,50.5,54.1,1833,405, -25,-6648,0,95.4,51,54.3,1836,405, -24,-6931,0,93.9,51.4,54.1,1835,403, -23,-5709,0,91.9,52.2,54.5,1837,404, -22,-6556,0,90.7,52.4,54.6,1852,405, -21,-5242,0,88.3,53.1,55.3,1858,405, -20,-5913,0,86.6,53.7,55.3,1866,405, -19,-5848,0,84.1,54.5,55.6,1876,405, -18,-4698,0,81.5,55.6,56.3,1882,405, -17,-5508,0,79.3,56.4,56.3,1888,405, -16,-4942,0,76.6,57.5,57,1897,405, -15,-5598,0,74.1,58.6,58.5,1903,405, -14,-6530,0,71,59.6,58.5,1910,405, -13,-6346,0,68.3,61,58.8,1919,405, -12,-6840,0,65.3,62.6,58.8,1926,405, -11,-7236,0,62.5,64.4,58.5,1931,405, -10,-7611,0,59.3,66.1,58.8,1939,405, -9,-9419,0,55.9,67.5,59,1949,405, -8,-9855,0,52.7,69.4,59,1958,405, -7,-10832,0,49.2,71.4,58.8,1969,405, -6,-11762,0,45.3,73.7,59.8,1973,405, -5,-13031,0,41.6,75.9,60.5,1978,405, -,,,,,,,, -,,,,,,,, -20230508_20240205下跌走势,,,,,,,, -take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count, -30,-120990,0,75.6,10.9,8.8,1465,396, -29,-120762,0,75.4,10.9,8.6,1466,396, -28,-120539,0,75.1,11.1,8.3,1468,396, -27,-119956,0,74.8,11.4,9.1,1469,396, -26,-118942,0,74.3,11.6,9.6,1471,396, -25,-117868,0,73.8,11.9,9.3,1471,396, -24,-117173,0,73.3,12.3,9.3,1471,396, -23,-114536,0,72.2,13,9.6,1474,396, -22,-113484,0,71.5,13.3,9.8,1479,396, -21,-109829,0,69.9,14.4,10.9,1482,396, -20,-108867,0,69,15,10.9,1486,396, -19,-106925,0,67.9,15.8,11.4,1492,396, -18,-104300,0,66.7,16.8,11.4,1496,396, -17,-102194,0,65.5,17.7,12.6,1499,396, -16,-99208,0,64,18.9,12.9,1506,396, -15,-97766,0,62.6,19.8,13.1,1511,396, -14,-96754,0,61.2,20.6,13.1,1515,396, -13,-93449,0,59.5,22.3,13.9,1522,396, -12,-91252,0,57.7,23.9,14.6,1526,396, -11,-88804,0,56.1,25.3,15.2,1527,396, -10,-84415,0,53.6,27.9,17.4,1535,396, -9,-80819,0,50.9,30.7,19.7,1543,396, -8,-77402,0,48.2,33.2,21.5,1547,396, -7,-72609,0,45.1,36.2,23.5,1556,396, -6,-67130,0,41.5,40.1,27,1559,396, -5,-63089,0,38.3,43.3,27,1563,396, -,,,,,,,, -,,,,,,,, -20221028_20230508上涨走势,,,,,,,, -take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count, -30,-26094,0,84.9,35.4,39.4,1022,343, -29,-24312,0,83.6,36.2,40.2,1027,343, -28,-24443,0,82.7,36.7,40.5,1031,343, -27,-23021,0,81.2,37.2,41.1,1034,343, -26,-22763,0,80,37.7,41.4,1036,343, -25,-21882,0,78,38.5,42,1040,343, -24,-22570,0,77.1,38.9,41.4,1042,343, -23,-22085,0,75.5,39.6,42,1048,343, -22,-21214,0,74.1,40.4,43.4,1053,343, -21,-20730,0,72.7,41.3,44,1059,343, -20,-18191,0,70.6,42.5,44.6,1064,343, -19,-17678,0,68.8,43.4,43.7,1068,343, -18,-18481,0,67.2,44,43.1,1072,343, -17,-16782,0,64.9,45.4,44.9,1076,343, -16,-16490,0,62.9,46.4,45.8,1081,343, -15,-15946,0,60.4,47.7,46.4,1089,343, -14,-16037,0,58.4,48.9,47.2,1095,343, -13,-16701,0,56.4,49.8,48.1,1100,343, -12,-16062,0,54,51.3,49,1106,343, -11,-16000,0,51.4,52.9,49.6,1110,343, -10,-15453,0,49.1,54.4,49.6,1114,343, -9,-14841,0,46.2,56.2,49.9,1121,343, -8,-14211,0,43,58.3,51.3,1132,343, -7,-14199,0,39.9,60.5,53.9,1138,343, -6,-15366,0,37.3,62.4,52.8,1144,343, -5,-15711,0,34.1,64.9,50.7,1152,343, -,,,,,,,, -,,,,,,,, -20210106_20220224震荡走势,,,,,,,, -take_profit_pct,avg_profit_per_trade,median_profit_per_trade,avg_holding_days,trade_win_rate,stock_win_rate,total_trades,stock_count, -30,12047,0,115.6,44.5,48.8,2508,424, -29,12857,0,113.5,45.1,48.5,2525,425, -28,13349,0,112,45.7,48.9,2538,425, -27,12644,0,110.5,46.3,49.4,2554,425, -26,12878,0,108.4,47.1,48.7,2571,425, -25,12319,0,106.9,47.7,48.5,2589,425, -24,12744,0,104.5,48.6,50.4,2608,425, -23,11949,0,102.3,49.4,51.3,2626,425, -22,11246,0,100.6,50.3,51.3,2637,425, -21,10399,0,98.7,51.1,51.1,2644,425, -20,9908,0,96.3,52,51.1,2666,425, -19,9327,0,93.8,53,52.5,2681,425, -18,7509,0,92.1,53.6,52.6,2686,424, -17,7333,0,89.4,55.1,53.9,2713,425, -16,6404,0,86.6,56.2,54.8,2745,425, -15,5296,0,83.7,57.3,55.3,2763,425, -14,5382,0,80.5,59,54.6,2788,425, -13,4527,0,77.7,60.4,56.6,2800,424, -12,3650,0,74.3,62,57.2,2826,425, -11,1583,0,71.1,63.3,57.2,2849,425, -10,-741,0,68.1,64.7,56.9,2859,425, -9,-2581,0,64.5,66.3,56.9,2879,425, -8,-3833,0,60.7,68.3,56.7,2902,425, -7,-4992,0,56.7,70.5,57.9,2924,425, -6,-4815,0,52.1,73.6,60.5,2948,425, -5,-7635,0,47.9,75.6,58.8,2965,425,