Files
stock_cursor_v0/backend/report.py
2026-06-14 11:54:45 +08:00

169 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
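"""AI automatic daily review report: summarize the day's market into an illustrated report (Markdown), persist it, and optionally push it.

All data comes from the data platform tables: sector_daily / fund_flow_daily / sentiment_daily / dragon_tiger / stock_metrics.
The AI commentary and next-day strategy reuse ai.py (LLM when a key is configured, rule-based fallback when there is no key).
"""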
from __future__ import annotations
import datetime as dt
from sqlalchemy import select, func
import ai
import llm
import notifier
from db import get_session
from models import (SectorDaily, FundFlowDaily, SentimentDaily, DragonTiger,
StockMetric, DailyReport)
def _gather(date=None):
    """Collect the raw rows needed to build one day's review report.

    Args:
        date: ISO ``YYYY-MM-DD`` string selecting the report day. When falsy,
            the greatest ``SectorDaily.date`` in the database is used.

    Returns:
        ``None`` when no report day can be resolved. Otherwise a dict with
        ``date``, ``senti``, ``secs``, ``flows`` and ``lhb`` (ORM rows as
        loaded) plus ``gainers`` and ``streak`` flattened to plain tuples.
    """
    with get_session() as session:
        if date:
            day = dt.date.fromisoformat(date)
        else:
            day = session.execute(select(func.max(SectorDaily.date))).scalar()
        if not day:
            return None

        # Sectors, best performer first.
        sector_stmt = (
            select(SectorDaily)
            .where(SectorDaily.date == day)
            .order_by(SectorDaily.pct.desc())
        )
        sector_rows = session.execute(sector_stmt).scalars().all()

        # Fund flows, largest net value first.
        flow_stmt = (
            select(FundFlowDaily)
            .where(FundFlowDaily.date == day)
            .order_by(FundFlowDaily.net.desc())
        )
        flow_rows = session.execute(flow_stmt).scalars().all()

        sentiment_stmt = select(SentimentDaily).where(SentimentDaily.date == day)
        sentiment = session.execute(sentiment_stmt).scalar_one_or_none()

        # Top six dragon-tiger entries by net value.
        lhb_stmt = (
            select(DragonTiger)
            .where(DragonTiger.date == day)
            .order_by(DragonTiger.net.desc())
            .limit(6)
        )
        lhb_rows = session.execute(lhb_stmt).scalars().all()

        # NOTE: stock metrics are read from the latest StockMetric date in the
        # table, which is not necessarily the same as the report day.
        metric_day = session.execute(select(func.max(StockMetric.date))).scalar()
        gainer_rows, streak_rows = [], []
        if metric_day:
            gainer_stmt = (
                select(StockMetric)
                .where(StockMetric.date == metric_day)
                .order_by(StockMetric.pct.desc())
                .limit(8)
            )
            gainer_rows = session.execute(gainer_stmt).scalars().all()

            streak_stmt = (
                select(StockMetric)
                .where(StockMetric.date == metric_day, StockMetric.up_streak >= 3)
                .order_by(StockMetric.up_streak.desc(), StockMetric.ret5.desc())
                .limit(8)
            )
            streak_rows = session.execute(streak_stmt).scalars().all()

        gainers = [(m.name, m.code, m.pct) for m in gainer_rows]
        streak = [(m.name, m.up_streak, m.ret5) for m in streak_rows]
        return {
            "date": day,
            "senti": sentiment,
            "secs": sector_rows,
            "flows": flow_rows,
            "lhb": lhb_rows,
            "gainers": gainers,
            "streak": streak,
        }
def _build_markdown(g):
    """Render the gathered market snapshot as the Markdown body of the report.

    Sections whose data is empty are omitted. Entries inside a list line are
    joined with the enumeration comma "、" so adjacent items stay readable.

    Args:
        g: Dict providing the keys read here: ``date``; ``senti`` (object
            exposing ``up``/``down``/``flat``/``limit_up``/``limit_down``, or
            a falsy value); ``secs`` (objects with ``name``/``pct``);
            ``flows`` and ``lhb`` (objects with ``name``/``net``); and
            ``streak`` (tuples of ``(name, up_streak, ret5)``). ``secs`` and
            ``flows`` are expected to be sorted in descending order.

    Returns:
        The Markdown text, with parts separated by newlines.
    """
    sep = "、"
    d = g["date"]
    s = g["senti"]
    secs, flows, lhb = g["secs"], g["flows"], g["lhb"]
    parts = [f"# {d} 收盘复盘日报\n"]

    # 1. Market temperature
    if s:
        # max(1, ...) avoids a division by zero when both counts are zero.
        ratio = (s.up / max(1, s.up + s.down)) * 100
        if s.up > s.down * 1.5:
            mood = "赚钱效应较好"
        elif s.down > s.up:
            mood = "情绪偏弱"
        else:
            mood = "多空分歧"
        parts.append("## 一、市场温度")
        parts.append(
            f"- 涨跌家数:上涨 **{s.up}** / 下跌 **{s.down}** / 平盘 {s.flat}"
            f"(上涨占比 {ratio:.0f}%)"
        )
        parts.append(f"- 涨停 **{s.limit_up}** 家 / 跌停 **{s.limit_down}** 家")
        parts.append(f"- 综合判断:{mood}\n")

    # 2. Sector performance
    if secs:
        top = secs[:6]
        # Worst performer first, mirroring the net-outflow line below.
        bot = secs[-4:][::-1]
        parts.append("## 二、板块表现")
        parts.append("- 领涨:" + sep.join(f"{x.name}({x.pct:+.2f}%)" for x in top))
        parts.append("- 领跌:" + sep.join(f"{x.name}({x.pct:+.2f}%)" for x in bot) + "\n")

    # 3. Fund flows
    if flows:
        parts.append("## 三、资金流向(主力净额)")
        parts.append("- 净流入前列:" + sep.join(f"{x.name}({x.net:+.1f}亿)" for x in flows[:6]))
        parts.append(
            "- 净流出前列:"
            + sep.join(f"{x.name}({x.net:+.1f}亿)" for x in flows[-4:][::-1])
            + "\n"
        )

    # 4. Strong-streak ladder
    if g["streak"]:
        parts.append("## 四、强势梯队(连涨)")
        parts.append(
            sep.join(f"{n}({k}连阳, 5日{r:+.1f}%)" for n, k, r in g["streak"]) + "\n"
        )

    # 5. Dragon-tiger list highlights
    if lhb:
        parts.append("## 五、龙虎榜焦点(净买额)")
        parts.append(sep.join(f"{x.name}({x.net:+.2f}亿)" for x in lhb) + "\n")

    return "\n".join(parts)
def generate(date=None, push=False):
    """Generate (or regenerate) the review report for one day and store it.

    Args:
        date: ISO ``YYYY-MM-DD`` string, forwarded to ``_gather``; when
            omitted the latest available day is used.
        push: When true and ``notifier.any_enabled()`` is truthy, a short
            brief is also sent through ``notifier.notify``.

    Returns:
        ``{"ok": False, "msg": ...}`` when ``_gather`` yields nothing.
        Otherwise a dict with ``ok``, ``date``, ``source``, ``title`` and
        ``content``, plus ``push`` (the notifier result, or an ``error``
        entry) when a push was attempted.
    """
    snapshot = _gather(date)
    if not snapshot:
        return {"ok": False, "msg": "暂无入库数据,请先到数据中台入库"}

    day = snapshot["date"]
    markdown = _build_markdown(snapshot)

    # AI commentary and next-day strategy both come from ai.py.
    review = ai.review_daily_comment(day.isoformat())
    strategy = ai.today_strategy()
    used_llm = review.get("source") == "llm" or strategy.get("source") == "llm"
    source = "llm" if used_llm else "rule"

    # A section keeps its heading but stays empty when the AI call was not ok.
    review_text = review.get("text", "") if review.get("ok") else ""
    strategy_text = strategy.get("text", "") if strategy.get("ok") else ""
    content = markdown + "\n## 六、AI 综合点评\n" + review_text
    content = content + "\n\n## 七、明日策略\n" + strategy_text
    title = f"{day} 收盘复盘日报"

    # Upsert the report row keyed by its date.
    with get_session() as session:
        report = session.get(DailyReport, day)
        if not report:
            report = DailyReport(date=day, source=source, title=title, content=content)
            session.add(report)
        else:
            report.source = source
            report.title = title
            report.content = content
        session.commit()

    result = {
        "ok": True,
        "date": day.isoformat(),
        "source": source,
        "title": title,
        "content": content,
    }
    if not (push and notifier.any_enabled()):
        return result

    try:
        # Push a condensed version (sentiment + leaders + first line of the review).
        brief = _push_brief(snapshot, review)
        outcome = notifier.notify("【Blackdata】" + title, brief)
        with get_session() as session:
            stored = session.get(DailyReport, day)
            if stored:
                stored.pushed = True
                session.commit()
        result["push"] = outcome
    except Exception as exc:
        # Best effort: a failed push is reported in the result, not raised.
        result["push"] = {"error": repr(exc)[:120]}
    return result
def _push_brief(g, rv):
    """Build the short plain-text brief used for push notifications.

    Args:
        g: Dict providing ``date``, ``senti`` (object with ``up``/``down``/
            ``limit_up``/``limit_down``, or falsy), ``secs`` (objects with
            ``name``/``pct``) and ``flows`` (objects with ``name``).
        rv: Dict-like AI review result; only its ``text`` entry is read.

    Returns:
        Newline-separated lines: a header, then the breadth line, up to four
        leading sectors, up to four top-inflow names and the first line of
        the review text (truncated to 80 characters). Lines whose data is
        missing are left out.
    """
    sep = "、"
    s = g["senti"]
    lines = [f"{g['date']} 收盘复盘"]
    if s:
        # Label the advancers count as well, so it pairs with the decliners count.
        lines.append(f"涨{s.up}/跌{s.down},涨停{s.limit_up}/跌停{s.limit_down}")
    if g["secs"]:
        lines.append("领涨:" + sep.join(f"{x.name}({x.pct:+.1f}%)" for x in g["secs"][:4]))
    if g["flows"]:
        lines.append("吸金:" + sep.join(x.name for x in g["flows"][:4]))
    # Only the first line of the review is kept for the brief.
    txt = (rv.get("text") or "").strip().split("\n")[0]
    if txt:
        lines.append("点评:" + txt[:80])
    return "\n".join(lines)
def latest():
    """Return the most recent stored daily report.

    Returns:
        ``{"ok": False, "msg": ...}`` when no report exists; otherwise a dict
        with ``ok``, ``date`` (ISO string), ``source``, ``title``,
        ``content`` and ``pushed``.
    """
    with get_session() as s:
        # first() only discards extra rows on the client side; limit(1) makes
        # the database return a single row instead of the whole table.
        stmt = select(DailyReport).order_by(DailyReport.date.desc()).limit(1)
        row = s.execute(stmt).scalars().first()
        if not row:
            return {"ok": False, "msg": "暂无日报,请先生成"}
        return {"ok": True, "date": row.date.isoformat(), "source": row.source,
                "title": row.title, "content": row.content, "pushed": row.pushed}
def get_by_date(date):
    """Look up the stored daily report for one day.

    Args:
        date: ISO ``YYYY-MM-DD`` string, parsed with ``date.fromisoformat``.

    Returns:
        ``{"ok": False, "msg": ...}`` when no report is stored for that day;
        otherwise a dict with ``ok``, ``date`` (ISO string), ``source``,
        ``title``, ``content`` and ``pushed``.
    """
    with get_session() as session:
        report = session.get(DailyReport, dt.date.fromisoformat(date))
        if report:
            payload = {
                "ok": True,
                "date": report.date.isoformat(),
                "source": report.source,
                "title": report.title,
                "content": report.content,
                "pushed": report.pushed,
            }
        else:
            payload = {"ok": False, "msg": "该日无日报"}
        return payload
def history(limit=30):
    """List stored daily reports, newest first, without their content.

    Args:
        limit: Maximum number of reports to return.

    Returns:
        ``{"list": [...]}`` where each item holds ``date`` (ISO string),
        ``source``, ``title`` and ``pushed``.
    """
    with get_session() as session:
        stmt = select(DailyReport).order_by(DailyReport.date.desc()).limit(limit)
        reports = session.execute(stmt).scalars().all()
        items = []
        for report in reports:
            items.append({
                "date": report.date.isoformat(),
                "source": report.source,
                "title": report.title,
                "pushed": report.pushed,
            })
        return {"list": items}