Files
stock_cursor_v0/backend/trade_calendar.py
2026-06-15 01:26:39 +08:00

339 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""交易日历与关键事件提醒"""
import datetime as dt
from typing import List, Optional, Dict
from sqlalchemy import select, and_
from db import get_session
from models import Trade, Security, CorporateEvent, AlertEvent
import akshare_service as svc
try:
import akshare as ak
AK_OK = True
except Exception:
ak = None
AK_OK = False
def get_upcoming_dividends(days_ahead: int = 30) -> Dict:
    """Collect ex-rights / ex-dividend events due within ``days_ahead`` days.

    Current holdings are rebuilt by replaying every ``Trade`` row in date
    order (sells are clamped so a position never drops below zero). Events
    are taken first from akshare, restricted to held securities, and then
    topped up from ``CorporateEvent`` rows of type ``"dividend"``. A database
    row is skipped when an event for the same code was already collected.

    Args:
        days_ahead: Size of the look-ahead window in days, counted from
            today; both ends of the window are inclusive.

    Returns:
        ``{"ok": True, "events": [...], "count": n}`` where ``events`` is
        sorted by its ISO-formatted ``event_date``.
    """
    import logging

    logger = logging.getLogger(__name__)
    today = dt.date.today()
    end = today + dt.timedelta(days=days_ahead)

    # Replay trades while the session is still open so attribute access on
    # the ORM rows cannot hit a detached instance.
    pos = {}
    with get_session() as s:
        trades = s.execute(select(Trade).order_by(Trade.date)).scalars().all()
        for t in trades:
            # The display name comes from the first trade seen for a code.
            entry = pos.setdefault(t.code, {"qty": 0, "name": t.name})
            if t.side == "buy":
                entry["qty"] += t.qty
            else:
                entry["qty"] = max(0, entry["qty"] - t.qty)
    # A set keeps the per-row membership test below O(1).
    holding_codes = {c for c, v in pos.items() if v["qty"] > 0}

    events = []
    if AK_OK and holding_codes:
        # NOTE(review): confirm this akshare endpoint and the column names
        # read below against the installed akshare version.
        try:
            df = ak.stock_zh_a_dividend()
            if df is not None and not df.empty:
                for _, r in df.iterrows():
                    code = str(r.get("代码", ""))
                    if code not in holding_codes:
                        continue
                    ex_date_str = str(r.get("除权除息日", ""))
                    if not ex_date_str or ex_date_str == "nan":
                        continue
                    try:
                        ex_date = dt.date.fromisoformat(ex_date_str[:10])
                    except ValueError:
                        # Unparseable date (e.g. "NaT"): skip this row only.
                        continue
                    if not today <= ex_date <= end:
                        continue
                    days_left = (ex_date - today).days
                    events.append({
                        "code": code,
                        "name": pos[code]["name"],
                        "event_type": "除权除息",
                        "event_date": ex_date.isoformat(),
                        "days_left": days_left,
                        "detail": f"送股: {r.get('送股', 0)}, 转增: {r.get('转增', 0)}, 派息: {r.get('派息', 0)}",
                        "is_holding": True,
                        "urgency": "high" if days_left <= 3 else "medium",
                    })
        except Exception:
            # Best effort: the remote source is optional, but leave a trace
            # instead of failing silently.
            logger.warning("akshare dividend lookup failed", exc_info=True)

    # Top up with events stored in the database.
    seen_codes = {ev["code"] for ev in events}
    with get_session() as s:
        db_events = s.execute(
            select(CorporateEvent).where(
                and_(
                    CorporateEvent.event_type == "dividend",
                    CorporateEvent.event_date >= today,
                    CorporateEvent.event_date <= end,
                )
            ).order_by(CorporateEvent.event_date)
        ).scalars().all()
        for e in db_events:
            # Dedupe is by code only: one stored event per security at most.
            if e.code in seen_codes:
                continue
            seen_codes.add(e.code)
            days_left = (e.event_date - today).days
            events.append({
                "code": e.code,
                "name": e.name,
                "event_type": "除权除息",
                "event_date": e.event_date.isoformat(),
                "days_left": days_left,
                "detail": e.description,
                "is_holding": e.code in holding_codes,
                "urgency": "high" if days_left <= 3 else "medium",
            })

    events.sort(key=lambda x: x["event_date"])
    return {"ok": True, "events": events, "count": len(events)}
def get_unlock_calendar(days_ahead: int = 90) -> Dict:
    """Collect restricted-share unlock events due within ``days_ahead`` days.

    Events come first from akshare (only the first 50 rows of the returned
    frame are inspected) and are then topped up from ``CorporateEvent`` rows
    of type ``"unlock"``. A database row is skipped when an event for the
    same code was already collected.

    Args:
        days_ahead: Size of the look-ahead window in days, counted from
            today; both ends of the window are inclusive.

    Returns:
        ``{"ok": True, "events": [...], "count": n}`` where ``events`` is
        sorted by its ISO-formatted ``event_date``. ``amount_billion`` holds
        the market value divided by 1e8 for akshare rows and the stored
        ``amount`` for database rows.
    """
    import logging
    import math

    logger = logging.getLogger(__name__)
    today = dt.date.today()
    end = today + dt.timedelta(days=days_ahead)

    events = []
    if AK_OK:
        # NOTE(review): confirm the column names read below against the
        # frame returned by the installed akshare version.
        try:
            df = ak.stock_restricted_release_summary_em()
            if df is not None and not df.empty:
                for _, r in df.head(50).iterrows():
                    date_str = str(r.get("解禁日期", ""))
                    if not date_str or date_str == "nan":
                        continue
                    try:
                        unlock_date = dt.date.fromisoformat(date_str[:10])
                        market_val = float(r.get("解禁市值", 0) or 0)
                    except (TypeError, ValueError):
                        # Bad date or non-numeric value: skip this row only.
                        continue
                    if not today <= unlock_date <= end:
                        continue
                    # NaN is truthy, so it survives the ``or 0`` above;
                    # treat a missing market value as zero.
                    if math.isnan(market_val):
                        market_val = 0.0
                    value_yi = round(market_val / 1e8, 2)
                    events.append({
                        "code": str(r.get("代码", "")),
                        "name": str(r.get("名称", "")),
                        "event_type": "限售解禁",
                        "event_date": unlock_date.isoformat(),
                        "days_left": (unlock_date - today).days,
                        "detail": f"解禁市值: {value_yi}亿",
                        "amount_billion": value_yi,
                        "urgency": "high" if market_val >= 10e8 else "medium",
                    })
        except Exception:
            # Best effort: the remote source is optional, but leave a trace
            # instead of failing silently.
            logger.warning("akshare unlock lookup failed", exc_info=True)

    # Top up with events stored in the database.
    seen_codes = {ev["code"] for ev in events}
    with get_session() as s:
        db_events = s.execute(
            select(CorporateEvent).where(
                and_(
                    CorporateEvent.event_type == "unlock",
                    CorporateEvent.event_date >= today,
                    CorporateEvent.event_date <= end,
                )
            ).order_by(CorporateEvent.event_date)
        ).scalars().all()
        for e in db_events:
            # Dedupe is by code only: one stored event per security at most.
            if e.code in seen_codes:
                continue
            seen_codes.add(e.code)
            events.append({
                "code": e.code,
                "name": e.name,
                "event_type": "限售解禁",
                "event_date": e.event_date.isoformat(),
                "days_left": (e.event_date - today).days,
                "detail": e.description,
                "amount_billion": e.amount,
                # A NULL amount must not crash the comparison.
                "urgency": "high" if (e.amount or 0) >= 10 else "medium",
            })

    events.sort(key=lambda x: x["event_date"])
    return {"ok": True, "events": events, "count": len(events)}
def get_earnings_calendar(days_ahead: int = 30, holding_only: bool = False) -> Dict:
    """Collect earnings-disclosure events due within ``days_ahead`` days.

    Holdings are always rebuilt from the ``Trade`` table so that
    ``is_holding`` and ``urgency`` are meaningful even when
    ``holding_only`` is False; ``holding_only`` only controls filtering.
    Sells are clamped so a position never drops below zero. Events come
    first from akshare (only the first 100 rows are inspected) and are then
    topped up from ``CorporateEvent`` rows of type ``"earnings"``. A
    database row is skipped when an event for the same code was already
    collected.

    Args:
        days_ahead: Size of the look-ahead window in days, counted from
            today; both ends of the window are inclusive.
        holding_only: When True, drop events for securities not currently
            held.

    Returns:
        ``{"ok": True, "events": [...], "count": n}`` where ``events`` is
        sorted by its ISO-formatted ``event_date``.
    """
    import logging

    logger = logging.getLogger(__name__)
    today = dt.date.today()
    end = today + dt.timedelta(days=days_ahead)

    # Replay trades while the session is still open so attribute access on
    # the ORM rows cannot hit a detached instance.
    pos = {}
    with get_session() as s:
        trades = s.execute(select(Trade).order_by(Trade.date)).scalars().all()
        for t in trades:
            held = pos.get(t.code, 0)
            if t.side == "buy":
                pos[t.code] = held + t.qty
            else:
                pos[t.code] = max(0, held - t.qty)
    holding_codes = {c for c, q in pos.items() if q > 0}

    events = []
    if AK_OK:
        # NOTE(review): this is called with no arguments, so it relies on
        # akshare's own defaults -- verify they cover the intended window
        # and that the column names read below exist.
        try:
            df = ak.stock_notice_report()
            if df is not None and not df.empty:
                for _, r in df.head(100).iterrows():
                    date_str = str(r.get("公告日期", ""))
                    code = str(r.get("代码", ""))
                    if not date_str or date_str == "nan":
                        continue
                    if holding_only and code not in holding_codes:
                        continue
                    try:
                        report_date = dt.date.fromisoformat(date_str[:10])
                    except ValueError:
                        # Unparseable date: skip this row only.
                        continue
                    if not today <= report_date <= end:
                        continue
                    held_now = code in holding_codes
                    events.append({
                        "code": code,
                        "name": str(r.get("名称", "")),
                        "event_type": "财报披露",
                        "event_date": report_date.isoformat(),
                        "days_left": (report_date - today).days,
                        "report_type": str(r.get("公告类型", "")),
                        "is_holding": held_now,
                        "urgency": "high" if held_now else "low",
                    })
        except Exception:
            # Best effort: the remote source is optional, but leave a trace
            # instead of failing silently.
            logger.warning("akshare earnings lookup failed", exc_info=True)

    # Top up with events stored in the database.
    seen_codes = {ev["code"] for ev in events}
    with get_session() as s:
        db_events = s.execute(
            select(CorporateEvent).where(
                and_(
                    CorporateEvent.event_type == "earnings",
                    CorporateEvent.event_date >= today,
                    CorporateEvent.event_date <= end,
                )
            ).order_by(CorporateEvent.event_date)
        ).scalars().all()
        for e in db_events:
            # Dedupe is by code only: one stored event per security at most.
            if e.code in seen_codes:
                continue
            if holding_only and e.code not in holding_codes:
                continue
            seen_codes.add(e.code)
            held_now = e.code in holding_codes
            events.append({
                "code": e.code,
                "name": e.name,
                "event_type": "财报披露",
                "event_date": e.event_date.isoformat(),
                "days_left": (e.event_date - today).days,
                "report_type": e.title,
                "is_holding": held_now,
                "urgency": "high" if held_now else "low",
            })

    events.sort(key=lambda x: x["event_date"])
    return {"ok": True, "events": events, "count": len(events)}
def get_all_upcoming_events(days_ahead: int = 30) -> Dict:
    """Build a combined view of dividend, earnings and unlock events.

    The three per-type calendars are queried with the same ``days_ahead``
    window, merged, sorted by ``event_date`` and grouped into one calendar
    entry per day.

    Args:
        days_ahead: Look-ahead window in days, forwarded to each calendar.

    Returns:
        A dict with ``ok``, ``calendar`` (one entry per date carrying the
        weekday label, days left, the day's events and two summary flags),
        ``urgent`` (events whose ``days_left`` is at most 3), ``total`` and
        a per-type ``summary``.
    """
    weekday_labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
    today = dt.date.today()

    # Query the sources in a fixed order and flatten the results.
    merged = []
    for fetch in (get_upcoming_dividends, get_earnings_calendar, get_unlock_calendar):
        merged.extend(fetch(days_ahead).get("events", []))
    merged.sort(key=lambda ev: ev["event_date"])

    # Bucket events by their ISO date string.
    by_day = {}
    for ev in merged:
        by_day.setdefault(ev["event_date"], []).append(ev)

    # One calendar entry per day, in ascending date order.
    calendar = []
    for day_key in sorted(by_day):
        day_events = by_day[day_key]
        day = dt.date.fromisoformat(day_key)
        entry = {
            "date": day_key,
            "weekday": weekday_labels[day.weekday()],
            "days_left": (day - today).days,
            "events": day_events,
            "has_high_urgency": any(ev["urgency"] == "high" for ev in day_events),
            "has_holding": any(ev.get("is_holding", False) for ev in day_events),
        }
        calendar.append(entry)

    # Events without a days_left value default to 99 and so never count as urgent.
    urgent = [ev for ev in merged if ev.get("days_left", 99) <= 3]

    def count_of(kind):
        return sum(1 for ev in merged if ev["event_type"] == kind)

    summary = {
        "dividends": count_of("除权除息"),
        "earnings": count_of("财报披露"),
        "unlocks": count_of("限售解禁"),
        "urgent": len(urgent),
    }
    return {
        "ok": True,
        "calendar": calendar,
        "urgent": urgent,
        "total": len(merged),
        "summary": summary,
    }
def check_and_push_calendar_alerts() -> Dict:
    """Push urgent calendar events and store them as in-app alerts.

    Looks seven days ahead via ``get_all_upcoming_events``, takes the events
    that function marks as urgent, sends at most ten of them through
    ``notifier.notify`` and writes one ``AlertEvent`` row per pushed event.

    Returns:
        ``{"ok": False, "msg": ...}`` when the ``notifier`` module cannot be
        imported. Otherwise ``{"ok": True, "pushed": n, "msg": ...}``, where
        ``n`` is the number of events actually pushed (0 when nothing is
        urgent).
    """
    try:
        from notifier import notify
    except Exception:
        return {"ok": False, "msg": "推送模块不可用"}

    max_push = 10  # cap on the number of events per notification

    result = get_all_upcoming_events(days_ahead=7)
    urgent = result.get("urgent", [])
    if not urgent:
        return {"ok": True, "msg": "无紧急事件", "pushed": 0}

    # Only this slice is pushed and persisted; the headline still reports
    # the total number of urgent events.
    batch = urgent[:max_push]

    lines = [f"📅 未来7天关键事件提醒({len(urgent)}条)\n"]
    for event in batch:
        urgency_icon = "🔴" if event["urgency"] == "high" else "🟡"
        holding_icon = "💰" if event.get("is_holding") else ""
        lines.append(
            f"{urgency_icon}{holding_icon} {event['event_date']} "
            f"{event['name']}({event['code']}) "
            f"{event['event_type']} "
            f"({event['days_left']}天后)"
        )
    message = "\n".join(lines)
    notify("【Blackdata】关键事件提醒", message)

    # Mirror the pushed events into the in-app alert table.
    with get_session() as s:
        for event in batch:
            alert = AlertEvent(
                rule_id=0,
                code=event["code"],
                name=event["name"],
                message=f"{event['event_type']}: {event.get('detail', '')} ({event['days_left']}天后)",
                # The key may be present with a None value; store 0 instead.
                value=event.get("amount_billion") or 0,
            )
            s.add(alert)
        s.commit()

    pushed = len(batch)
    return {"ok": True, "pushed": pushed, "msg": f"已推送 {pushed} 条事件提醒"}