208 lines
7.0 KiB
Python
208 lines
7.0 KiB
Python
"""定时任务管理系统"""
|
|
from typing import Dict, Any, List
|
|
from sqlalchemy import select
|
|
from db import get_session
|
|
from models import ScheduledTask
|
|
import scheduler
|
|
|
|
|
|
# 任务配置定义
|
|
TASK_CONFIGS = {
|
|
"daily_ingest": {
|
|
"name": "每日数据入库",
|
|
"description": "收盘后自动抓取并入库股票、板块、资金等数据",
|
|
"default_enabled": True,
|
|
"default_schedule": "cron",
|
|
"default_cron": "mon-fri 16:00",
|
|
"category": "数据入库"
|
|
},
|
|
"alert_check": {
|
|
"name": "预警检查",
|
|
"description": "每分钟检查价格预警规则(仅交易时间)",
|
|
"default_enabled": True,
|
|
"default_schedule": "interval",
|
|
"default_interval": 60,
|
|
"category": "实时监控"
|
|
},
|
|
"daily_report": {
|
|
"name": "AI复盘日报",
|
|
"description": "生成每日复盘报告并推送",
|
|
"default_enabled": True,
|
|
"default_schedule": "cron",
|
|
"default_cron": "mon-fri 16:10",
|
|
"category": "AI分析"
|
|
},
|
|
"verify_pred": {
|
|
"name": "预测准确率核验",
|
|
"description": "核验到期的AI预测结果",
|
|
"default_enabled": True,
|
|
"default_schedule": "cron",
|
|
"default_cron": "mon-fri 16:15",
|
|
"category": "AI分析"
|
|
},
|
|
"signal_stats": {
|
|
"name": "信号历史胜率",
|
|
"description": "重新计算技术信号历史统计",
|
|
"default_enabled": True,
|
|
"default_schedule": "cron",
|
|
"default_cron": "sat 09:00",
|
|
"category": "AI分析"
|
|
},
|
|
"intraday_scan": {
|
|
"name": "盘中异动扫描",
|
|
"description": "实时扫描急涨急跌、放量突破等异动",
|
|
"default_enabled": True,
|
|
"default_schedule": "interval",
|
|
"default_interval": 60,
|
|
"category": "实时监控"
|
|
},
|
|
"calendar_alerts": {
|
|
"name": "日历事件提醒",
|
|
"description": "推送除权、解禁、财报等重要事件",
|
|
"default_enabled": True,
|
|
"default_schedule": "cron",
|
|
"default_cron": "mon-fri 08:30",
|
|
"category": "事件提醒"
|
|
}
|
|
}
|
|
|
|
|
|
def init_tasks():
|
|
"""初始化任务配置到数据库"""
|
|
with get_session() as s:
|
|
for task_id, config in TASK_CONFIGS.items():
|
|
existing = s.execute(
|
|
select(ScheduledTask).where(ScheduledTask.task_id == task_id)
|
|
).scalar_one_or_none()
|
|
|
|
if not existing:
|
|
task = ScheduledTask(
|
|
task_id=task_id,
|
|
name=config["name"],
|
|
description=config["description"],
|
|
enabled=config["default_enabled"],
|
|
schedule_type=config["default_schedule"],
|
|
cron_expression=config.get("default_cron"),
|
|
interval_seconds=config.get("default_interval"),
|
|
category=config["category"]
|
|
)
|
|
s.add(task)
|
|
s.commit()
|
|
|
|
|
|
def get_all_tasks() -> List[Dict[str, Any]]:
|
|
"""获取所有任务配置"""
|
|
with get_session() as s:
|
|
tasks = s.execute(select(ScheduledTask).order_by(ScheduledTask.id)).scalars().all()
|
|
return [{
|
|
"id": t.id,
|
|
"task_id": t.task_id,
|
|
"name": t.name,
|
|
"description": t.description,
|
|
"enabled": t.enabled,
|
|
"schedule_type": t.schedule_type,
|
|
"cron_expression": t.cron_expression,
|
|
"interval_seconds": t.interval_seconds,
|
|
"category": t.category,
|
|
"last_run": t.last_run.strftime("%Y-%m-%d %H:%M:%S") if t.last_run else None,
|
|
"next_run": get_next_run_time(t.task_id),
|
|
"run_count": t.run_count,
|
|
"last_status": t.last_status
|
|
} for t in tasks]
|
|
|
|
|
|
def get_next_run_time(task_id: str) -> str:
|
|
"""获取任务下次运行时间"""
|
|
if scheduler._scheduler:
|
|
job = scheduler._scheduler.get_job(task_id)
|
|
if job and job.next_run_time:
|
|
return job.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
return "未调度"
|
|
|
|
|
|
def update_task(task_id: str, enabled: bool = None, schedule_type: str = None,
|
|
cron_expression: str = None, interval_seconds: int = None) -> Dict[str, Any]:
|
|
"""更新任务配置"""
|
|
with get_session() as s:
|
|
task = s.execute(
|
|
select(ScheduledTask).where(ScheduledTask.task_id == task_id)
|
|
).scalar_one_or_none()
|
|
|
|
if not task:
|
|
return {"ok": False, "msg": "任务不存在"}
|
|
|
|
if enabled is not None:
|
|
task.enabled = enabled
|
|
if schedule_type is not None:
|
|
task.schedule_type = schedule_type
|
|
if cron_expression is not None:
|
|
task.cron_expression = cron_expression
|
|
if interval_seconds is not None:
|
|
task.interval_seconds = interval_seconds
|
|
|
|
s.commit()
|
|
|
|
# 重新调度
|
|
scheduler.reload_scheduler()
|
|
|
|
return {"ok": True, "msg": "任务配置已更新"}
|
|
|
|
|
|
def toggle_task(task_id: str) -> Dict[str, Any]:
|
|
"""切换任务开关"""
|
|
with get_session() as s:
|
|
task = s.execute(
|
|
select(ScheduledTask).where(ScheduledTask.task_id == task_id)
|
|
).scalar_one_or_none()
|
|
|
|
if not task:
|
|
return {"ok": False, "msg": "任务不存在"}
|
|
|
|
task.enabled = not task.enabled
|
|
s.commit()
|
|
|
|
# 重新调度
|
|
scheduler.reload_scheduler()
|
|
|
|
return {
|
|
"ok": True,
|
|
"enabled": task.enabled,
|
|
"msg": f"任务已{'启用' if task.enabled else '禁用'}"
|
|
}
|
|
|
|
|
|
def record_task_run(task_id: str, status: str, message: str = ""):
|
|
"""记录任务执行"""
|
|
import datetime as dt
|
|
with get_session() as s:
|
|
task = s.execute(
|
|
select(ScheduledTask).where(ScheduledTask.task_id == task_id)
|
|
).scalar_one_or_none()
|
|
|
|
if task:
|
|
task.last_run = dt.datetime.now()
|
|
task.run_count += 1
|
|
task.last_status = status
|
|
task.last_message = message[:500] if message else ""
|
|
s.commit()
|
|
|
|
|
|
def get_task_logs(task_id: str = None, limit: int = 50) -> List[Dict[str, Any]]:
|
|
"""获取任务执行日志"""
|
|
# 这里可以从 JobRun 表读取,或者创建专门的 TaskLog 表
|
|
from models import JobRun
|
|
with get_session() as s:
|
|
stmt = select(JobRun).order_by(JobRun.id.desc()).limit(limit)
|
|
if task_id:
|
|
stmt = stmt.where(JobRun.job == task_id)
|
|
logs = s.execute(stmt).scalars().all()
|
|
|
|
return [{
|
|
"id": log.id,
|
|
"task_id": log.job,
|
|
"status": log.status,
|
|
"started": log.started_at.strftime("%Y-%m-%d %H:%M:%S") if log.started_at else "",
|
|
"finished": log.finished_at.strftime("%Y-%m-%d %H:%M:%S") if log.finished_at else "",
|
|
"duration": (log.finished_at - log.started_at).total_seconds() if log.finished_at and log.started_at else 0,
|
|
"message": log.message
|
|
} for log in logs] |