"""定时任务:收盘后自动入库。""" from __future__ import annotations import threading from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger import config import ingest import alerts import report import signals import intraday_radar import trade_calendar _scheduler: BackgroundScheduler | None = None _lock = threading.Lock() _running = {"flag": False} def _job(): with _lock: if _running["flag"]: return _running["flag"] = True try: ingest.run_daily_ingest() finally: _running["flag"] = False def trigger_async(): """手动触发一次入库(后台线程,不阻塞请求)。""" t = threading.Thread(target=_job, daemon=True) t.start() return {"started": True} def _job_all(): with _lock: if _running["flag"]: return _running["flag"] = True try: ingest.ingest_quotes_all() finally: _running["flag"] = False def trigger_all_async(): """手动触发全市场回填(后台线程,耗时较长)。""" if _running["flag"]: return {"started": False, "msg": "已有任务在执行"} t = threading.Thread(target=_job_all, daemon=True) t.start() return {"started": True} def is_running(): return _running["flag"] def _job_report(): try: report.generate(push=True) except Exception as e: print("[report] generate error:", repr(e)[:160]) def trigger_report_async(push=True): """手动触发生成日报(后台线程)。""" t = threading.Thread(target=lambda: report.generate(push=push), daemon=True) t.start() return {"started": True} _sig_running = {"flag": False} def _job_signal_stats(sample=500, horizon=5): if _sig_running["flag"]: return _sig_running["flag"] = True try: signals.compute_signal_stats(sample_limit=sample, horizon=horizon) except Exception as e: print("[signals] compute error:", repr(e)[:160]) finally: _sig_running["flag"] = False def trigger_signal_stats_async(sample=500, horizon=5): if _sig_running["flag"]: return {"started": False, "msg": "胜率回测进行中"} t = threading.Thread(target=lambda: _job_signal_stats(sample, horizon), daemon=True) t.start() return {"started": True} def _job_verify(): try: signals.verify_predictions() except Exception as e: print("[predict] verify error:", repr(e)[:160]) def reload_scheduler(): """重新加载调度器(应用新配置)""" global _scheduler if _scheduler: _scheduler.shutdown(wait=False) _scheduler = None start_scheduler() return {"ok": True, "msg": "调度器已重新加载"} def start_scheduler(): global _scheduler if _scheduler is not None: return _scheduler # 先初始化任务配置 try: import task_manager task_manager.init_tasks() except Exception as e: print(f"[scheduler] init tasks error: {repr(e)[:120]}") _scheduler = BackgroundScheduler(timezone="Asia/Shanghai") # 从数据库加载任务配置 from db import get_session from models import ScheduledTask from sqlalchemy import select try: with get_session() as s: tasks = s.execute(select(ScheduledTask).where(ScheduledTask.enabled == True)).scalars().all() for task in tasks: _add_job_from_config(task) except Exception as e: print(f"[scheduler] load tasks error: {repr(e)[:120]}") # 降级:使用默认配置 _add_default_jobs() _scheduler.start() return _scheduler def _add_job_from_config(task): """根据配置添加任务""" job_func = _get_job_function(task.task_id) if not job_func: return if task.schedule_type == "cron" and task.cron_expression: # 解析 cron 表达式 (格式: "mon-fri 16:00") parts = task.cron_expression.split() if len(parts) == 2: day_of_week, time_str = parts hour, minute = map(int, time_str.split(':')) _scheduler.add_job( job_func, CronTrigger(day_of_week=day_of_week, hour=hour, minute=minute), id=task.task_id, replace_existing=True, misfire_grace_time=3600 ) elif task.schedule_type == "interval" and task.interval_seconds: _scheduler.add_job( job_func, IntervalTrigger(seconds=task.interval_seconds), id=task.task_id, replace_existing=True, max_instances=1 ) def _get_job_function(task_id): """获取任务函数""" job_map = { "daily_ingest": _job, "alert_check": _safe_check_alerts, "daily_report": _job_report, "verify_pred": _job_verify, "signal_stats": lambda: _job_signal_stats(), "intraday_scan": _safe_scan_intraday, "calendar_alerts": _job_calendar_alerts } return job_map.get(task_id) def _add_default_jobs(): """添加默认任务配置(降级方案)""" _scheduler.add_job( _job, CronTrigger(day_of_week="mon-fri", hour=config.INGEST_HOUR, minute=config.INGEST_MINUTE), id="daily_ingest", replace_existing=True, misfire_grace_time=3600, ) _scheduler.add_job( _safe_check_alerts, IntervalTrigger(seconds=60), id="alert_check", replace_existing=True, max_instances=1, ) _rep_total = config.INGEST_HOUR * 60 + config.INGEST_MINUTE + 10 _scheduler.add_job( _job_report, CronTrigger(day_of_week="mon-fri", hour=(_rep_total // 60) % 24, minute=_rep_total % 60), id="daily_report", replace_existing=True, misfire_grace_time=3600, ) _scheduler.add_job( _job_verify, CronTrigger(day_of_week="mon-fri", hour=(_rep_total // 60) % 24, minute=(_rep_total + 5) % 60), id="verify_pred", replace_existing=True, misfire_grace_time=3600, ) _scheduler.add_job( _job_signal_stats, CronTrigger(day_of_week="sat", hour=9, minute=0), id="signal_stats", replace_existing=True, misfire_grace_time=7200, ) _scheduler.add_job( _safe_scan_intraday, IntervalTrigger(seconds=60), id="intraday_scan", replace_existing=True, max_instances=1, ) _scheduler.add_job( _job_calendar_alerts, CronTrigger(day_of_week="mon-fri", hour=8, minute=30), id="calendar_alerts", replace_existing=True, misfire_grace_time=3600, ) def _safe_check_alerts(): # 只在交易日的交易时间执行 if not intraday_radar._is_trading_time(): return try: alerts.check_alerts() except Exception as e: print("[alert] check error:", repr(e)[:120]) def _safe_scan_intraday(): # 只在交易时间执行 if not intraday_radar._is_trading_time(): return try: result = intraday_radar.scan_all() if result.get("count", 0) > 0: intraday_radar.notify_events() except Exception as e: print("[intraday] scan error:", repr(e)[:120]) def _job_calendar_alerts(): try: trade_calendar.check_and_push_calendar_alerts() except Exception as e: print("[calendar] alert error:", repr(e)[:120])