Files
stock_cursor_v0/backend/intraday_radar.py
2026-06-14 11:54:45 +08:00

416 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""盘中实时监控雷达 — 异动检测与推送。
监控类型:
1. 快速拉升(5分钟涨幅 >3%)
2. 放量突破(量比 >3 且突破关键位)
3. 涨停打开/炸板
4. 连板股追踪
5. 大单异动(单笔超百万)
"""
import datetime as dt
from typing import List, Dict, Any
from sqlalchemy import select, desc, func
from cachetools import TTLCache
import akshare_service as svc
import notifier
from db import get_session
from models import IntradayEvent, StockMetric, Security, DailyQuote
# Cache of recently detected events, used to avoid pushing the same event
# again within a short period.
_event_cache = TTLCache(maxsize=1000, ttl=300) # entries are kept for 5 minutes
def _is_trading_time(now: dt.datetime = None) -> bool:
    """Return True when ``now`` falls inside a regular trading session.

    The sessions checked are 09:30-11:30 and 13:00-15:00 (both ends
    inclusive), Monday through Friday. Exchange holidays are not taken into
    account, and the clock used is whatever ``now`` carries (local machine
    time by default).

    Args:
        now: The moment to test. When omitted, ``dt.datetime.now()`` is used,
            so existing zero-argument callers behave exactly as before.
            Passing a value makes the check deterministic.

    Returns:
        True if ``now`` is on a weekday and inside either session window.
    """
    if now is None:
        now = dt.datetime.now()
    if now.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return False
    t = now.time()
    in_morning = dt.time(9, 30) <= t <= dt.time(11, 30)
    in_afternoon = dt.time(13, 0) <= t <= dt.time(15, 0)
    return in_morning or in_afternoon
def _cache_key(code: str, event_type: str) -> str:
    """Build the de-duplication cache key in the form ``<code>:<event_type>``."""
    return "{}:{}".format(code, event_type)
def detect_surge(threshold: float = 3.0) -> List[Dict[str, Any]]:
    """Detect fast-rising stocks among the first 50 hot-stock entries.

    There is no real 5-minute window here: each entry's ``pct`` field, as
    returned by ``svc.get_hot_stocks()``, is compared against ``threshold``
    as a stand-in for a rapid rise.

    A stock whose ``surge`` key is still present in ``_event_cache`` is
    skipped, so repeated scans do not re-emit it until the cache entry
    expires.

    Args:
        threshold: Minimum ``pct`` value (percent) that counts as a surge.

    Returns:
        A list of event dicts with the keys ``code``, ``name``,
        ``event_type``, ``price``, ``pct`` and ``description``. The list is
        empty outside trading hours or when the data source returns no list.
        If an error occurs, it is printed and the events collected so far
        are returned.
    """
    if not _is_trading_time():
        return []

    events = []
    try:
        data = svc.get_hot_stocks()
        stocks = data.get("list")
        if not stocks:
            return []

        for stock in stocks[:50]:
            pct = stock.get("pct", 0)
            code = stock.get("code")
            # Skip malformed entries individually; previously a None pct or a
            # missing code raised and ended the scan for all remaining stocks.
            if pct is None or not code:
                continue
            if not pct >= threshold:
                continue

            key = _cache_key(code, "surge")
            if key in _event_cache:
                continue
            _event_cache[key] = True

            events.append({
                "code": code,
                "name": stock.get("name", code),
                "event_type": "surge",
                "price": stock.get("price", 0),
                "pct": pct,
                "description": f"快速拉升 {pct:.2f}%"
            })
    except Exception as e:
        # Best effort: report the failure and return what was collected.
        print(f"[surge] error: {e}")
    return events
def detect_volume_break(vol_ratio_threshold: float = 3.0) -> List[Dict[str, Any]]:
    """Detect stocks breaking out on high volume.

    Selects up to 20 ``StockMetric`` rows with ``vol_ratio`` at or above the
    threshold and a positive ``pct``, highest ``vol_ratio`` first. A row is
    reported when either ``pos60 >= 0.95`` or its ``close`` is above a
    positive ``ma20``.

    A stock whose ``volume_break`` key is still present in ``_event_cache``
    is skipped.

    Args:
        vol_ratio_threshold: Minimum volume ratio for a row to be considered.

    Returns:
        A list of event dicts with the keys ``code``, ``name``,
        ``event_type``, ``price``, ``pct``, ``volume_ratio`` and
        ``description``. Empty outside trading hours.
    """
    if not _is_trading_time():
        return []

    events = []
    with get_session() as s:
        rows = s.execute(
            select(StockMetric)
            .where(StockMetric.vol_ratio >= vol_ratio_threshold, StockMetric.pct > 0)
            .order_by(StockMetric.vol_ratio.desc())
            .limit(20)
        ).scalars().all()

        # Rows are processed while the session is still open.
        for r in rows:
            key = _cache_key(r.code, "volume_break")
            if key in _event_cache:
                continue

            # Missing (None) metrics count as "no breakout" instead of
            # raising TypeError on the comparison.
            near_high = r.pos60 is not None and r.pos60 >= 0.95
            above_ma20 = (
                r.close is not None
                and r.ma20 is not None
                and r.ma20 > 0
                and r.close > r.ma20
            )
            if not (near_high or above_ma20):
                continue

            _event_cache[key] = True
            events.append({
                "code": r.code,
                "name": r.name,
                "event_type": "volume_break",
                "price": r.close,
                "pct": r.pct,
                "volume_ratio": r.vol_ratio,
                "description": f"放量突破 量比{r.vol_ratio:.1f}"
            })
    return events
def detect_limit_open() -> List[Dict[str, Any]]:
    """Detect stocks sitting just below the limit-up price.

    Every entry from ``svc.get_hot_stocks()`` whose ``pct`` lies in
    ``[9.5, 9.99)`` is reported as a ``limit_open`` event. Only the current
    ``pct`` is inspected; whether the stock was sealed at the limit earlier
    is not checked.

    NOTE(review): the 9.5-9.99 band presumably targets a 10% daily limit;
    boards with a different limit would need their own band - confirm.

    A stock whose ``limit_open`` key is still present in ``_event_cache`` is
    skipped.

    Returns:
        A list of event dicts with the keys ``code``, ``name``,
        ``event_type``, ``price``, ``pct`` and ``description``. The list is
        empty outside trading hours or when the data source returns no list.
        If an error occurs, it is printed and the events collected so far
        are returned.
    """
    if not _is_trading_time():
        return []

    events = []
    try:
        data = svc.get_hot_stocks()
        stocks = data.get("list")
        if not stocks:
            return []

        # No database access is needed here, so no session is opened.
        for stock in stocks:
            pct = stock.get("pct", 0)
            code = stock.get("code")
            # Skip malformed entries individually so one bad record does not
            # end the scan for the remaining stocks.
            if pct is None or not code:
                continue
            if not (9.5 <= pct < 9.99):
                continue

            key = _cache_key(code, "limit_open")
            if key in _event_cache:
                continue
            _event_cache[key] = True

            events.append({
                "code": code,
                "name": stock.get("name", code),
                "event_type": "limit_open",
                "price": stock.get("price", 0),
                "pct": pct,
                "description": f"涨停打开 {pct:.2f}%"
            })
    except Exception as e:
        # Best effort: report the failure and return what was collected.
        print(f"[limit_open] error: {e}")
    return events
def detect_consecutive_limit() -> List[Dict[str, Any]]:
    """Track stocks on a run of two or more consecutive limit-up days.

    For every entry from ``svc.get_hot_stocks()`` with ``pct >= 9.9``, the
    five most recent ``DailyQuote`` rows are loaded. Today counts as the
    first limit-up day; the newest row is skipped and each following row
    extends the streak while ``close / open >= 1.095``.

    NOTE(review): close/open is only an approximation of a limit-up day. A
    stock that opens at the limit and stays there has close/open near 1.0
    and will not be counted. The code also assumes the newest ``DailyQuote``
    row is today's - confirm both against how the table is populated.

    A stock whose ``consecutive`` key is still present in ``_event_cache`` is
    skipped before any database query is made.

    Returns:
        A list of event dicts with the keys ``code``, ``name``,
        ``event_type``, ``price``, ``pct`` and ``description``. The list is
        empty outside trading hours or when the data source returns no list.
        If an error occurs, it is printed and the events collected so far
        are returned.
    """
    if not _is_trading_time():
        return []

    events = []
    try:
        data = svc.get_hot_stocks()
        stocks = data.get("list")
        if not stocks:
            return []

        with get_session() as s:
            for stock in stocks:
                pct = stock.get("pct", 0)
                code = stock.get("code")
                # Skip malformed entries individually.
                if pct is None or not code:
                    continue
                if not pct >= 9.9:  # not at the limit today
                    continue

                # Check the dedup cache first so a stock that was already
                # reported does not cost a database query on every scan.
                key = _cache_key(code, "consecutive")
                if key in _event_cache:
                    continue

                recent = s.execute(
                    select(DailyQuote)
                    .where(DailyQuote.code == code)
                    .order_by(DailyQuote.date.desc())
                    .limit(5)
                ).scalars().all()
                if not recent:
                    continue

                consecutive = 1  # today's limit-up
                for q in recent[1:]:
                    # A zero or missing open/close ends the streak instead of
                    # raising and aborting the whole scan.
                    if not q.open or q.close is None:
                        break
                    if q.close / q.open >= 1.095:
                        consecutive += 1
                    else:
                        break

                if consecutive < 2:
                    continue

                _event_cache[key] = True
                events.append({
                    "code": code,
                    "name": stock.get("name", code),
                    "event_type": "consecutive",
                    "price": stock.get("price", 0),
                    "pct": pct,
                    "description": f"{consecutive}连板"
                })
    except Exception as e:
        # Best effort: report the failure and return what was collected.
        print(f"[consecutive] error: {e}")
    return events
def detect_big_order(threshold: float = 1000000.0) -> List[Dict[str, Any]]:
    """Placeholder for large single-order detection.

    Not implemented: per the original note, the free AkShare interface has
    no real-time tick-by-tick data, so a paid data source would be needed.

    Args:
        threshold: Single-order amount threshold in yuan. Currently unused.

    Returns:
        Always a new empty list.
    """
    no_events: List[Dict[str, Any]] = []
    return no_events
def scan_all() -> Dict[str, Any]:
    """Run every detector once and persist whatever they found.

    Detectors run in a fixed order: surge, volume break, limit open,
    consecutive limit, big order. Each detector is isolated. If one raises,
    the error is printed and the remaining detectors still run, so events
    already collected (and already marked in the dedup cache by their
    detector) are still written to the database instead of being lost.

    Returns:
        Outside trading hours: ``{"ok": False, "msg": ..., "events": []}``.
        Otherwise a dict with ``ok`` (True), ``count``, ``events`` (the list
        of event dicts) and ``scanned_at`` (ISO-format timestamp).
    """
    if not _is_trading_time():
        return {"ok": False, "msg": "非交易时间", "events": []}

    detectors = (
        detect_surge,
        detect_volume_break,
        detect_limit_open,
        detect_consecutive_limit,
        detect_big_order,
    )

    all_events = []
    for detector in detectors:
        try:
            all_events.extend(detector())
        except Exception as e:
            # One failing detector must not discard the others' results.
            print(f"[scan_all] {detector.__name__} error: {e}")

    # Persist the collected events in a single transaction.
    if all_events:
        with get_session() as s:
            for evt in all_events:
                s.add(IntradayEvent(
                    code=evt["code"],
                    name=evt["name"],
                    event_type=evt["event_type"],
                    price=evt.get("price", 0),
                    pct=evt.get("pct", 0),
                    volume_ratio=evt.get("volume_ratio", 0),
                    amount=evt.get("amount", 0),
                    description=evt["description"]
                ))
            s.commit()

    return {
        "ok": True,
        "count": len(all_events),
        "events": all_events,
        "scanned_at": dt.datetime.now().isoformat()
    }
def get_recent_events(hours: int = 2, limit: int = 50) -> List[Dict[str, Any]]:
    """Return the events detected within the last ``hours`` hours, newest first.

    Args:
        hours: Size of the look-back window, in hours.
        limit: Maximum number of events to return.

    Returns:
        A list of dicts, one per ``IntradayEvent`` row, with the keys ``id``,
        ``code``, ``name``, ``event_type``, ``price``, ``pct``,
        ``volume_ratio``, ``amount``, ``description``, ``detected_at``
        (formatted as ``HH:MM:SS``) and ``notified``.
    """
    cutoff = dt.datetime.now() - dt.timedelta(hours=hours)
    query = (
        select(IntradayEvent)
        .where(IntradayEvent.detected_at >= cutoff)
        .order_by(desc(IntradayEvent.detected_at))
        .limit(limit)
    )

    result = []
    with get_session() as s:
        for event in s.execute(query).scalars().all():
            result.append({
                "id": event.id,
                "code": event.code,
                "name": event.name,
                "event_type": event.event_type,
                "price": event.price,
                "pct": event.pct,
                "volume_ratio": event.volume_ratio,
                "amount": event.amount,
                "description": event.description,
                "detected_at": event.detected_at.strftime("%H:%M:%S"),
                "notified": event.notified
            })
    return result
def notify_events(event_types: List[str] = None) -> Dict[str, Any]:
    """Push not-yet-notified events and mark them as notified.

    Loads up to 10 of the newest events with ``notified`` False, groups them
    by event type, and builds one message listing at most 5 events per type.
    The message is sent through ``notifier.notify`` when
    ``notifier.any_enabled()`` is true.

    Only events that were actually written into the message are marked as
    notified. Events beyond the 5-per-type cap stay pending and are picked up
    by a later call; previously they were marked without ever being sent.

    Args:
        event_types: Event types to push. None or an empty list means all.

    Returns:
        A dict with ``ok``, ``count`` (number of events marked as notified)
        and ``msg``.
    """
    with get_session() as s:
        stmt = select(IntradayEvent).where(IntradayEvent.notified.is_(False))
        if event_types:
            stmt = stmt.where(IntradayEvent.event_type.in_(event_types))
        rows = s.execute(
            stmt.order_by(desc(IntradayEvent.detected_at)).limit(10)
        ).scalars().all()
        if not rows:
            return {"ok": True, "count": 0, "msg": "无待推送事件"}

        # Group by event type, keeping the order in which types first appear.
        grouped = {}
        for r in rows:
            grouped.setdefault(r.event_type, []).append(r)

        # Display name for each event type; unknown types fall back to the
        # raw type string.
        type_names = {
            "surge": "快速拉升",
            "volume_break": "放量突破",
            "limit_open": "涨停打开",
            "consecutive": "连板追踪",
            "big_order": "大单异动"
        }

        msg_parts = ["【盘中异动雷达】\n"]
        included = []  # events that really appear in the message
        for etype, events in grouped.items():
            msg_parts.append(f"\n{type_names.get(etype, etype)}:")
            for e in events[:5]:  # at most 5 entries per type
                msg_parts.append(f"{e.name}({e.code}) {e.description}")
                included.append(e)
        msg = "\n".join(msg_parts)

        if notifier.any_enabled():
            notifier.notify("盘中异动提醒", msg)

        # Mark only what was included; the rest remains pending.
        for e in included:
            e.notified = True
        s.commit()

        return {
            "ok": True,
            "count": len(included),
            "msg": f"已推送 {len(included)} 条异动"
        }
def get_statistics(date: dt.date = None) -> Dict[str, Any]:
    """Summarize the events detected on one calendar day.

    Args:
        date: Day to summarize. None means today.

    Returns:
        A dict with ``date`` (ISO format), ``total`` (number of events),
        ``by_type`` (event type -> count) and ``top_stocks`` (up to 10
        ``{"code", "name", "count"}`` dicts, most events first).
    """
    if date is None:
        date = dt.date.today()
    start = dt.datetime.combine(date, dt.time.min)
    end = dt.datetime.combine(date, dt.time.max)
    in_day = (IntradayEvent.detected_at >= start, IntradayEvent.detected_at <= end)

    # The label is deliberately not "count", and result rows are unpacked
    # positionally: ``row.count`` on a SQLAlchemy Row resolves to the
    # sequence method ``count()``, not to a column labelled "count".
    event_count = func.count().label("event_count")

    with get_session() as s:
        # Number of events per event type.
        type_rows = s.execute(
            select(IntradayEvent.event_type, event_count)
            .where(*in_day)
            .group_by(IntradayEvent.event_type)
        ).all()
        stats = {event_type: n for event_type, n in type_rows}
        total = sum(stats.values())

        # Stocks with the most events.
        stock_rows = s.execute(
            select(IntradayEvent.code, IntradayEvent.name, event_count)
            .where(*in_day)
            .group_by(IntradayEvent.code, IntradayEvent.name)
            .order_by(desc(event_count))
            .limit(10)
        ).all()
        top_stocks = [
            {"code": code, "name": name, "count": n}
            for code, name, n in stock_rows
        ]

    return {
        "date": date.isoformat(),
        "total": total,
        "by_type": stats,
        "top_stocks": top_stocks
    }