416 lines
13 KiB
Python
416 lines
13 KiB
Python
"""盘中实时监控雷达 — 异动检测与推送。
|
||
|
||
监控类型:
|
||
1. 快速拉升(5分钟涨幅 >3%)
|
||
2. 放量突破(量比 >3 且突破关键位)
|
||
3. 涨停打开/炸板
|
||
4. 连板股追踪
|
||
5. 大单异动(单笔超百万)
|
||
"""
|
||
import datetime as dt
|
||
from typing import List, Dict, Any
|
||
from sqlalchemy import select, desc, func
|
||
from cachetools import TTLCache
|
||
|
||
import akshare_service as svc
|
||
import notifier
|
||
from db import get_session
|
||
from models import IntradayEvent, StockMetric, Security, DailyQuote
|
||
|
||
# 缓存最近检测到的事件,避免短时间内重复推送
|
||
_event_cache = TTLCache(maxsize=1000, ttl=300) # 5分钟缓存
|
||
|
||
|
||
def _is_trading_time() -> bool:
|
||
"""判断是否为交易时间(9:30-11:30, 13:00-15:00)。"""
|
||
now = dt.datetime.now()
|
||
if now.weekday() >= 5: # 周末
|
||
return False
|
||
t = now.time()
|
||
morning = dt.time(9, 30) <= t <= dt.time(11, 30)
|
||
afternoon = dt.time(13, 0) <= t <= dt.time(15, 0)
|
||
return morning or afternoon
|
||
|
||
|
||
def _cache_key(code: str, event_type: str) -> str:
|
||
"""生成事件缓存键。"""
|
||
return f"{code}:{event_type}"
|
||
|
||
|
||
def detect_surge(threshold: float = 3.0) -> List[Dict[str, Any]]:
|
||
"""快速拉升检测(基于实时报价,模拟5分钟涨幅)。
|
||
|
||
Args:
|
||
threshold: 涨幅阈值(%)
|
||
|
||
Returns:
|
||
检测到的异动列表
|
||
"""
|
||
if not _is_trading_time():
|
||
return []
|
||
|
||
events = []
|
||
try:
|
||
# 获取涨幅榜前50(模拟快速拉升)
|
||
data = svc.get_hot_stocks()
|
||
if not data.get("list"):
|
||
return []
|
||
|
||
for stock in data["list"][:50]:
|
||
pct = stock.get("pct", 0)
|
||
if pct >= threshold:
|
||
code = stock["code"]
|
||
key = _cache_key(code, "surge")
|
||
if key in _event_cache:
|
||
continue
|
||
|
||
_event_cache[key] = True
|
||
events.append({
|
||
"code": code,
|
||
"name": stock.get("name", code),
|
||
"event_type": "surge",
|
||
"price": stock.get("price", 0),
|
||
"pct": pct,
|
||
"description": f"快速拉升 {pct:.2f}%"
|
||
})
|
||
except Exception as e:
|
||
print(f"[surge] error: {e}")
|
||
|
||
return events
|
||
|
||
|
||
def detect_volume_break(vol_ratio_threshold: float = 3.0) -> List[Dict[str, Any]]:
|
||
"""放量突破检测(量比 >3 且价格突破)。
|
||
|
||
Args:
|
||
vol_ratio_threshold: 量比阈值
|
||
|
||
Returns:
|
||
检测到的异动列表
|
||
"""
|
||
if not _is_trading_time():
|
||
return []
|
||
|
||
events = []
|
||
with get_session() as s:
|
||
# 查询高量比且上涨的股票
|
||
rows = s.execute(
|
||
select(StockMetric)
|
||
.where(StockMetric.vol_ratio >= vol_ratio_threshold, StockMetric.pct > 0)
|
||
.order_by(StockMetric.vol_ratio.desc())
|
||
.limit(20)
|
||
).scalars().all()
|
||
|
||
for r in rows:
|
||
key = _cache_key(r.code, "volume_break")
|
||
if key in _event_cache:
|
||
continue
|
||
|
||
# 判断是否突破关键位(60日新高或MA20)
|
||
is_break = r.pos60 >= 0.95 or (r.close > r.ma20 and r.ma20 > 0)
|
||
if is_break:
|
||
_event_cache[key] = True
|
||
events.append({
|
||
"code": r.code,
|
||
"name": r.name,
|
||
"event_type": "volume_break",
|
||
"price": r.close,
|
||
"pct": r.pct,
|
||
"volume_ratio": r.vol_ratio,
|
||
"description": f"放量突破 量比{r.vol_ratio:.1f}"
|
||
})
|
||
|
||
return events
|
||
|
||
|
||
def detect_limit_open() -> List[Dict[str, Any]]:
|
||
"""涨停打开/炸板检测。
|
||
|
||
Returns:
|
||
检测到的异动列表
|
||
"""
|
||
if not _is_trading_time():
|
||
return []
|
||
|
||
events = []
|
||
try:
|
||
# 获取涨停股
|
||
data = svc.get_hot_stocks()
|
||
if not data.get("list"):
|
||
return []
|
||
|
||
with get_session() as s:
|
||
for stock in data["list"]:
|
||
pct = stock.get("pct", 0)
|
||
# 涨停附近但未封死(9.5%-9.99%)
|
||
if 9.5 <= pct < 9.99:
|
||
code = stock["code"]
|
||
key = _cache_key(code, "limit_open")
|
||
if key in _event_cache:
|
||
continue
|
||
|
||
_event_cache[key] = True
|
||
events.append({
|
||
"code": code,
|
||
"name": stock.get("name", code),
|
||
"event_type": "limit_open",
|
||
"price": stock.get("price", 0),
|
||
"pct": pct,
|
||
"description": f"涨停打开 {pct:.2f}%"
|
||
})
|
||
except Exception as e:
|
||
print(f"[limit_open] error: {e}")
|
||
|
||
return events
|
||
|
||
|
||
def detect_consecutive_limit() -> List[Dict[str, Any]]:
|
||
"""连板股追踪(2连板及以上)。
|
||
|
||
Returns:
|
||
检测到的异动列表
|
||
"""
|
||
if not _is_trading_time():
|
||
return []
|
||
|
||
events = []
|
||
try:
|
||
data = svc.get_hot_stocks()
|
||
if not data.get("list"):
|
||
return []
|
||
|
||
with get_session() as s:
|
||
for stock in data["list"]:
|
||
pct = stock.get("pct", 0)
|
||
if pct >= 9.9: # 涨停
|
||
code = stock["code"]
|
||
|
||
# 查询历史连板数
|
||
recent = s.execute(
|
||
select(DailyQuote)
|
||
.where(DailyQuote.code == code)
|
||
.order_by(DailyQuote.date.desc())
|
||
.limit(5)
|
||
).scalars().all()
|
||
|
||
if not recent:
|
||
continue
|
||
|
||
# 统计连续涨停天数
|
||
consecutive = 1 # 今天涨停
|
||
for q in recent[1:]:
|
||
if q.close / q.open >= 1.095: # 近似判断涨停
|
||
consecutive += 1
|
||
else:
|
||
break
|
||
|
||
if consecutive >= 2:
|
||
key = _cache_key(code, "consecutive")
|
||
if key in _event_cache:
|
||
continue
|
||
|
||
_event_cache[key] = True
|
||
events.append({
|
||
"code": code,
|
||
"name": stock.get("name", code),
|
||
"event_type": "consecutive",
|
||
"price": stock.get("price", 0),
|
||
"pct": pct,
|
||
"description": f"{consecutive}连板"
|
||
})
|
||
except Exception as e:
|
||
print(f"[consecutive] error: {e}")
|
||
|
||
return events
|
||
|
||
|
||
def detect_big_order(threshold: float = 1000000.0) -> List[Dict[str, Any]]:
|
||
"""大单异动检测(单笔超百万)。
|
||
|
||
注意:AkShare 免费接口无实时逐笔数据,此处返回空列表,可接入付费数据源。
|
||
|
||
Args:
|
||
threshold: 单笔金额阈值(元)
|
||
|
||
Returns:
|
||
检测到的异动列表
|
||
"""
|
||
# 需要付费数据源支持,暂不实现
|
||
return []
|
||
|
||
|
||
def scan_all() -> Dict[str, Any]:
|
||
"""执行全部异动扫描。
|
||
|
||
Returns:
|
||
扫描结果,包含各类异动事件
|
||
"""
|
||
if not _is_trading_time():
|
||
return {"ok": False, "msg": "非交易时间", "events": []}
|
||
|
||
all_events = []
|
||
|
||
# 执行各类检测
|
||
all_events.extend(detect_surge())
|
||
all_events.extend(detect_volume_break())
|
||
all_events.extend(detect_limit_open())
|
||
all_events.extend(detect_consecutive_limit())
|
||
all_events.extend(detect_big_order())
|
||
|
||
# 写入数据库
|
||
if all_events:
|
||
with get_session() as s:
|
||
for evt in all_events:
|
||
record = IntradayEvent(
|
||
code=evt["code"],
|
||
name=evt["name"],
|
||
event_type=evt["event_type"],
|
||
price=evt.get("price", 0),
|
||
pct=evt.get("pct", 0),
|
||
volume_ratio=evt.get("volume_ratio", 0),
|
||
amount=evt.get("amount", 0),
|
||
description=evt["description"]
|
||
)
|
||
s.add(record)
|
||
s.commit()
|
||
|
||
return {
|
||
"ok": True,
|
||
"count": len(all_events),
|
||
"events": all_events,
|
||
"scanned_at": dt.datetime.now().isoformat()
|
||
}
|
||
|
||
|
||
def get_recent_events(hours: int = 2, limit: int = 50) -> List[Dict[str, Any]]:
|
||
"""获取最近N小时的异动事件。
|
||
|
||
Args:
|
||
hours: 时间范围(小时)
|
||
limit: 最大返回数量
|
||
|
||
Returns:
|
||
异动事件列表
|
||
"""
|
||
since = dt.datetime.now() - dt.timedelta(hours=hours)
|
||
with get_session() as s:
|
||
rows = s.execute(
|
||
select(IntradayEvent)
|
||
.where(IntradayEvent.detected_at >= since)
|
||
.order_by(desc(IntradayEvent.detected_at))
|
||
.limit(limit)
|
||
).scalars().all()
|
||
|
||
return [{
|
||
"id": r.id,
|
||
"code": r.code,
|
||
"name": r.name,
|
||
"event_type": r.event_type,
|
||
"price": r.price,
|
||
"pct": r.pct,
|
||
"volume_ratio": r.volume_ratio,
|
||
"amount": r.amount,
|
||
"description": r.description,
|
||
"detected_at": r.detected_at.strftime("%H:%M:%S"),
|
||
"notified": r.notified
|
||
} for r in rows]
|
||
|
||
|
||
def notify_events(event_types: List[str] = None) -> Dict[str, Any]:
|
||
"""推送未通知的异动事件。
|
||
|
||
Args:
|
||
event_types: 需要推送的事件类型列表,None表示全部
|
||
|
||
Returns:
|
||
推送结果
|
||
"""
|
||
with get_session() as s:
|
||
stmt = select(IntradayEvent).where(IntradayEvent.notified.is_(False))
|
||
if event_types:
|
||
stmt = stmt.where(IntradayEvent.event_type.in_(event_types))
|
||
|
||
rows = s.execute(stmt.order_by(desc(IntradayEvent.detected_at)).limit(10)).scalars().all()
|
||
|
||
if not rows:
|
||
return {"ok": True, "count": 0, "msg": "无待推送事件"}
|
||
|
||
# 按事件类型分组
|
||
grouped = {}
|
||
for r in rows:
|
||
if r.event_type not in grouped:
|
||
grouped[r.event_type] = []
|
||
grouped[r.event_type].append(r)
|
||
|
||
# 构造推送消息
|
||
type_names = {
|
||
"surge": "快速拉升",
|
||
"volume_break": "放量突破",
|
||
"limit_open": "涨停打开",
|
||
"consecutive": "连板追踪",
|
||
"big_order": "大单异动"
|
||
}
|
||
|
||
msg_parts = ["【盘中异动雷达】\n"]
|
||
for etype, events in grouped.items():
|
||
msg_parts.append(f"\n{type_names.get(etype, etype)}:")
|
||
for e in events[:5]: # 每类最多5条
|
||
msg_parts.append(f"• {e.name}({e.code}) {e.description}")
|
||
|
||
msg = "\n".join(msg_parts)
|
||
|
||
# 推送
|
||
if notifier.any_enabled():
|
||
notifier.notify("盘中异动提醒", msg)
|
||
|
||
# 标记已推送
|
||
for r in rows:
|
||
r.notified = True
|
||
s.commit()
|
||
|
||
return {"ok": True, "count": len(rows), "msg": f"已推送 {len(rows)} 条异动"}
|
||
|
||
|
||
def get_statistics(date: dt.date = None) -> Dict[str, Any]:
|
||
"""获取异动统计数据。
|
||
|
||
Args:
|
||
date: 统计日期,None表示今天
|
||
|
||
Returns:
|
||
统计结果
|
||
"""
|
||
if date is None:
|
||
date = dt.date.today()
|
||
|
||
start = dt.datetime.combine(date, dt.time.min)
|
||
end = dt.datetime.combine(date, dt.time.max)
|
||
|
||
with get_session() as s:
|
||
# 按事件类型统计
|
||
stmt = (
|
||
select(IntradayEvent.event_type, func.count().label("count"))
|
||
.where(IntradayEvent.detected_at >= start, IntradayEvent.detected_at <= end)
|
||
.group_by(IntradayEvent.event_type)
|
||
)
|
||
rows = s.execute(stmt).all()
|
||
|
||
stats = {row.event_type: row.count for row in rows}
|
||
total = sum(stats.values())
|
||
|
||
# 最活跃股票
|
||
stmt = (
|
||
select(IntradayEvent.code, IntradayEvent.name, func.count().label("count"))
|
||
.where(IntradayEvent.detected_at >= start, IntradayEvent.detected_at <= end)
|
||
.group_by(IntradayEvent.code, IntradayEvent.name)
|
||
.order_by(desc("count"))
|
||
.limit(10)
|
||
)
|
||
top_stocks = s.execute(stmt).all()
|
||
|
||
return {
|
||
"date": date.isoformat(),
|
||
"total": total,
|
||
"by_type": stats,
|
||
"top_stocks": [{"code": r.code, "name": r.name, "count": r.count} for r in top_stocks]
|
||
} |