diff --git a/backend/main.py b/backend/main.py index 00920f1..52c1833 100644 --- a/backend/main.py +++ b/backend/main.py @@ -58,10 +58,11 @@ import position_cost as pc import trade_calendar as cal import data_manager as dm import paper_trading as paper +import task_manager from db import init_db, get_session from models import (DailyQuote, IndexDaily, SectorDaily, FundFlowDaily, SentimentDaily, DragonTiger, Security, JobRun, StockMetric, Trade, - AlertRule, AlertEvent, SelectorStrategy, SelectorAlert) + AlertRule, AlertEvent, SelectorStrategy, SelectorAlert, ScheduledTask) @asynccontextmanager @@ -71,8 +72,9 @@ async def lifespan(app: FastAPI): init_auth.init_default_admin() wl.init_default_groups() paper.ensure_default_account() + task_manager.init_tasks() scheduler.start_scheduler() - print("[startup] db + scheduler + auth ready") + print("[startup] db + scheduler + task_manager + auth ready") except Exception as e: print("[startup] WARN:", repr(e)[:160]) yield @@ -1636,6 +1638,50 @@ def paper_place_order(account_id: int, req: PaperOrderIn): return paper.place_order(account_id, req.code, req.side, req.qty, req.price, req.reason) +# ============ 定时任务管理 ============ +@app.get("/api/tasks") +def list_tasks(current_user = Depends(require_admin)): + """获取所有定时任务""" + return {"ok": True, "tasks": task_manager.get_all_tasks()} + + +class UpdateTaskRequest(BaseModel): + enabled: Optional[bool] = None + schedule_type: Optional[str] = None + cron_expression: Optional[str] = None + interval_seconds: Optional[int] = None + + +@app.put("/api/tasks/{task_id}") +def update_task(task_id: str, req: UpdateTaskRequest, current_user = Depends(require_admin)): + """更新任务配置""" + return task_manager.update_task( + task_id, + enabled=req.enabled, + schedule_type=req.schedule_type, + cron_expression=req.cron_expression, + interval_seconds=req.interval_seconds + ) + + +@app.post("/api/tasks/{task_id}/toggle") +def toggle_task(task_id: str, current_user = Depends(require_admin)): + """切换任务开关""" + return task_manager.toggle_task(task_id) + + +@app.get("/api/tasks/{task_id}/logs") +def task_logs(task_id: str, limit: int = Query(50, le=200), current_user = Depends(require_admin)): + """获取任务执行日志""" + return {"ok": True, "logs": task_manager.get_task_logs(task_id, limit)} + + +@app.post("/api/tasks/reload") +def reload_tasks(current_user = Depends(require_admin)): + """重新加载调度器""" + return scheduler.reload_scheduler() + + @app.get("/api/paper/accounts/{account_id}/portfolio") def paper_get_portfolio(account_id: int): return paper.get_portfolio(account_id) diff --git a/backend/models.py b/backend/models.py index aef5d18..3ae35d5 100644 --- a/backend/models.py +++ b/backend/models.py @@ -429,3 +429,22 @@ class WatchlistItem(Base): sort_order: Mapped[int] = mapped_column(Integer, default=0) note: Mapped[str] = mapped_column(String(200), default="") # 个股备注 added_at: Mapped[dt.datetime] = mapped_column(DateTime, server_default=func.now()) + + +class ScheduledTask(Base): + """定时任务配置。""" + __tablename__ = "scheduled_tasks" + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + task_id: Mapped[str] = mapped_column(String(40), unique=True, index=True) # 任务标识 + name: Mapped[str] = mapped_column(String(80)) # 任务名称 + description: Mapped[str] = mapped_column(String(200), default="") # 描述 + enabled: Mapped[bool] = mapped_column(default=True) # 是否启用 + schedule_type: Mapped[str] = mapped_column(String(20), default="cron") # cron/interval + cron_expression: Mapped[str] = mapped_column(String(50), default="") # cron表达式 + interval_seconds: Mapped[int] = mapped_column(Integer, default=0) # 间隔秒数 + category: Mapped[str] = mapped_column(String(20), default="其他") # 分类 + last_run: Mapped[dt.datetime | None] = mapped_column(DateTime, nullable=True) # 上次运行 + run_count: Mapped[int] = mapped_column(Integer, default=0) # 运行次数 + last_status: Mapped[str] = mapped_column(String(20), default="") # 上次状态 + last_message: Mapped[str] = mapped_column(String(500), default="") # 上次消息 + created_at: Mapped[dt.datetime] = mapped_column(DateTime, server_default=func.now()) diff --git a/backend/scheduler.py b/backend/scheduler.py index cb3e44c..f213bb4 100644 --- a/backend/scheduler.py +++ b/backend/scheduler.py @@ -106,11 +106,95 @@ def _job_verify(): print("[predict] verify error:", repr(e)[:160]) +def reload_scheduler(): + """重新加载调度器(应用新配置)""" + global _scheduler + if _scheduler: + _scheduler.shutdown(wait=False) + _scheduler = None + start_scheduler() + return {"ok": True, "msg": "调度器已重新加载"} + + def start_scheduler(): global _scheduler if _scheduler is not None: return _scheduler + + # 先初始化任务配置 + try: + import task_manager + task_manager.init_tasks() + except Exception as e: + print(f"[scheduler] init tasks error: {repr(e)[:120]}") + _scheduler = BackgroundScheduler(timezone="Asia/Shanghai") + + # 从数据库加载任务配置 + from db import get_session + from models import ScheduledTask + from sqlalchemy import select + + try: + with get_session() as s: + tasks = s.execute(select(ScheduledTask).where(ScheduledTask.enabled == True)).scalars().all() + + for task in tasks: + _add_job_from_config(task) + except Exception as e: + print(f"[scheduler] load tasks error: {repr(e)[:120]}") + # 降级:使用默认配置 + _add_default_jobs() + + _scheduler.start() + return _scheduler + + +def _add_job_from_config(task): + """根据配置添加任务""" + job_func = _get_job_function(task.task_id) + if not job_func: + return + + if task.schedule_type == "cron" and task.cron_expression: + # 解析 cron 表达式 (格式: "mon-fri 16:00") + parts = task.cron_expression.split() + if len(parts) == 2: + day_of_week, time_str = parts + hour, minute = map(int, time_str.split(':')) + _scheduler.add_job( + job_func, + CronTrigger(day_of_week=day_of_week, hour=hour, minute=minute), + id=task.task_id, + replace_existing=True, + misfire_grace_time=3600 + ) + elif task.schedule_type == "interval" and task.interval_seconds: + _scheduler.add_job( + job_func, + IntervalTrigger(seconds=task.interval_seconds), + id=task.task_id, + replace_existing=True, + max_instances=1 + ) + + +def _get_job_function(task_id): + """获取任务函数""" + job_map = { + "daily_ingest": _job, + "alert_check": _safe_check_alerts, + "daily_report": _job_report, + "verify_pred": _job_verify, + "signal_stats": lambda: _job_signal_stats(), + "intraday_scan": _safe_scan_intraday, + "calendar_alerts": _job_calendar_alerts + } + return job_map.get(task_id) + + +def _add_default_jobs(): + """添加默认任务配置(降级方案)""" _scheduler.add_job( _job, CronTrigger(day_of_week="mon-fri", hour=config.INGEST_HOUR, minute=config.INGEST_MINUTE), id="daily_ingest", replace_existing=True, misfire_grace_time=3600, @@ -119,37 +203,33 @@ def start_scheduler(): _safe_check_alerts, IntervalTrigger(seconds=60), id="alert_check", replace_existing=True, max_instances=1, ) - # 收盘入库之后 10 分钟生成 AI 复盘日报并推送 _rep_total = config.INGEST_HOUR * 60 + config.INGEST_MINUTE + 10 _scheduler.add_job( _job_report, CronTrigger(day_of_week="mon-fri", hour=(_rep_total // 60) % 24, minute=_rep_total % 60), id="daily_report", replace_existing=True, misfire_grace_time=3600, ) - # 收盘后核验到期预测(实测准确率) _scheduler.add_job( _job_verify, CronTrigger(day_of_week="mon-fri", hour=(_rep_total // 60) % 24, minute=(_rep_total + 5) % 60), id="verify_pred", replace_existing=True, misfire_grace_time=3600, ) - # 每周六重算信号历史胜率 _scheduler.add_job( _job_signal_stats, CronTrigger(day_of_week="sat", hour=9, minute=0), id="signal_stats", replace_existing=True, misfire_grace_time=7200, ) - # 盘中异动扫描(交易时间每分钟) _scheduler.add_job( _safe_scan_intraday, IntervalTrigger(seconds=60), id="intraday_scan", replace_existing=True, max_instances=1, ) - # 每日早盘前推送日历事件提醒(持仓股除权、解禁、财报等) _scheduler.add_job( _job_calendar_alerts, CronTrigger(day_of_week="mon-fri", hour=8, minute=30), id="calendar_alerts", replace_existing=True, misfire_grace_time=3600, ) - _scheduler.start() - return _scheduler def _safe_check_alerts(): + # 只在交易日的交易时间执行 + if not intraday_radar._is_trading_time(): + return try: alerts.check_alerts() except Exception as e: @@ -157,6 +237,9 @@ def _safe_check_alerts(): def _safe_scan_intraday(): + # 只在交易时间执行 + if not intraday_radar._is_trading_time(): + return try: result = intraday_radar.scan_all() if result.get("count", 0) > 0: diff --git a/backend/task_manager.py b/backend/task_manager.py new file mode 100644 index 0000000..6ef5e79 --- /dev/null +++ b/backend/task_manager.py @@ -0,0 +1,208 @@ +"""定时任务管理系统""" +from typing import Dict, Any, List +from sqlalchemy import select +from db import get_session +from models import ScheduledTask +import scheduler + + +# 任务配置定义 +TASK_CONFIGS = { + "daily_ingest": { + "name": "每日数据入库", + "description": "收盘后自动抓取并入库股票、板块、资金等数据", + "default_enabled": True, + "default_schedule": "cron", + "default_cron": "mon-fri 16:00", + "category": "数据入库" + }, + "alert_check": { + "name": "预警检查", + "description": "每分钟检查价格预警规则(仅交易时间)", + "default_enabled": True, + "default_schedule": "interval", + "default_interval": 60, + "category": "实时监控" + }, + "daily_report": { + "name": "AI复盘日报", + "description": "生成每日复盘报告并推送", + "default_enabled": True, + "default_schedule": "cron", + "default_cron": "mon-fri 16:10", + "category": "AI分析" + }, + "verify_pred": { + "name": "预测准确率核验", + "description": "核验到期的AI预测结果", + "default_enabled": True, + "default_schedule": "cron", + "default_cron": "mon-fri 16:15", + "category": "AI分析" + }, + "signal_stats": { + "name": "信号历史胜率", + "description": "重新计算技术信号历史统计", + "default_enabled": True, + "default_schedule": "cron", + "default_cron": "sat 09:00", + "category": "AI分析" + }, + "intraday_scan": { + "name": "盘中异动扫描", + "description": "实时扫描急涨急跌、放量突破等异动", + "default_enabled": True, + "default_schedule": "interval", + "default_interval": 60, + "category": "实时监控" + }, + "calendar_alerts": { + "name": "日历事件提醒", + "description": "推送除权、解禁、财报等重要事件", + "default_enabled": True, + "default_schedule": "cron", + "default_cron": "mon-fri 08:30", + "category": "事件提醒" + } +} + + +def init_tasks(): + """初始化任务配置到数据库""" + with get_session() as s: + for task_id, config in TASK_CONFIGS.items(): + existing = s.execute( + select(ScheduledTask).where(ScheduledTask.task_id == task_id) + ).scalar_one_or_none() + + if not existing: + task = ScheduledTask( + task_id=task_id, + name=config["name"], + description=config["description"], + enabled=config["default_enabled"], + schedule_type=config["default_schedule"], + cron_expression=config.get("default_cron"), + interval_seconds=config.get("default_interval"), + category=config["category"] + ) + s.add(task) + s.commit() + + +def get_all_tasks() -> List[Dict[str, Any]]: + """获取所有任务配置""" + with get_session() as s: + tasks = s.execute(select(ScheduledTask).order_by(ScheduledTask.id)).scalars().all() + return [{ + "id": t.id, + "task_id": t.task_id, + "name": t.name, + "description": t.description, + "enabled": t.enabled, + "schedule_type": t.schedule_type, + "cron_expression": t.cron_expression, + "interval_seconds": t.interval_seconds, + "category": t.category, + "last_run": t.last_run.strftime("%Y-%m-%d %H:%M:%S") if t.last_run else None, + "next_run": get_next_run_time(t.task_id), + "run_count": t.run_count, + "last_status": t.last_status + } for t in tasks] + + +def get_next_run_time(task_id: str) -> str: + """获取任务下次运行时间""" + if scheduler._scheduler: + job = scheduler._scheduler.get_job(task_id) + if job and job.next_run_time: + return job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") + return "未调度" + + +def update_task(task_id: str, enabled: bool = None, schedule_type: str = None, + cron_expression: str = None, interval_seconds: int = None) -> Dict[str, Any]: + """更新任务配置""" + with get_session() as s: + task = s.execute( + select(ScheduledTask).where(ScheduledTask.task_id == task_id) + ).scalar_one_or_none() + + if not task: + return {"ok": False, "msg": "任务不存在"} + + if enabled is not None: + task.enabled = enabled + if schedule_type is not None: + task.schedule_type = schedule_type + if cron_expression is not None: + task.cron_expression = cron_expression + if interval_seconds is not None: + task.interval_seconds = interval_seconds + + s.commit() + + # 重新调度 + scheduler.reload_scheduler() + + return {"ok": True, "msg": "任务配置已更新"} + + +def toggle_task(task_id: str) -> Dict[str, Any]: + """切换任务开关""" + with get_session() as s: + task = s.execute( + select(ScheduledTask).where(ScheduledTask.task_id == task_id) + ).scalar_one_or_none() + + if not task: + return {"ok": False, "msg": "任务不存在"} + + task.enabled = not task.enabled + s.commit() + + # 重新调度 + scheduler.reload_scheduler() + + return { + "ok": True, + "enabled": task.enabled, + "msg": f"任务已{'启用' if task.enabled else '禁用'}" + } + + +def record_task_run(task_id: str, status: str, message: str = ""): + """记录任务执行""" + import datetime as dt + with get_session() as s: + task = s.execute( + select(ScheduledTask).where(ScheduledTask.task_id == task_id) + ).scalar_one_or_none() + + if task: + task.last_run = dt.datetime.now() + task.run_count += 1 + task.last_status = status + task.last_message = message[:500] if message else "" + s.commit() + + +def get_task_logs(task_id: str = None, limit: int = 50) -> List[Dict[str, Any]]: + """获取任务执行日志""" + # 这里可以从 JobRun 表读取,或者创建专门的 TaskLog 表 + from models import JobRun + with get_session() as s: + stmt = select(JobRun).order_by(JobRun.id.desc()).limit(limit) + if task_id: + stmt = stmt.where(JobRun.job == task_id) + logs = s.execute(stmt).scalars().all() + + return [{ + "id": log.id, + "task_id": log.job, + "status": log.status, + "started": log.started_at.strftime("%Y-%m-%d %H:%M:%S") if log.started_at else "", + "finished": log.finished_at.strftime("%Y-%m-%d %H:%M:%S") if log.finished_at else "", + "duration": (log.finished_at - log.started_at).total_seconds() if log.finished_at and log.started_at else 0, + "message": log.message + } for log in logs] \ No newline at end of file diff --git a/prototype/app.js b/prototype/app.js index d33e9d7..fc3e501 100644 --- a/prototype/app.js +++ b/prototype/app.js @@ -53,7 +53,7 @@ function showLoginModal() { bg.id = '_login_modal'; bg.style.cssText = 'position:fixed;inset:0;background:#00000099;z-index:20000;display:flex;align-items:center;justify-content:center'; bg.innerHTML = `
| 时间 | 任务 | 状态 | 耗时 | 消息 |
|---|