Files
stock_cursor_v0/backend/ai.py

233 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 分析层:基于中台数据构造上下文,调用大模型生成点评;无 key 时规则降级。"""
from __future__ import annotations
import datetime as dt
from sqlalchemy import select, func, desc
import llm
import signals
import rag
from db import get_session
from models import (SectorDaily, FundFlowDaily, SentimentDaily, DragonTiger,
StockMetric, DailyQuote, Security)
DISCLAIMER = "(注:以上为基于数据的程序化分析,不构成投资建议,市场有风险。)"
# ============ 每日复盘点评 ============
def _daily_context(date=None):
with get_session() as s:
d = dt.date.fromisoformat(date) if date else s.execute(select(func.max(SectorDaily.date))).scalar()
if not d:
return None
secs = s.execute(select(SectorDaily).where(SectorDaily.date == d).order_by(SectorDaily.pct.desc())).scalars().all()
flows = s.execute(select(FundFlowDaily).where(FundFlowDaily.date == d).order_by(FundFlowDaily.net.desc())).scalars().all()
senti = s.execute(select(SentimentDaily).where(SentimentDaily.date == d)).scalar_one_or_none()
lhb = s.execute(select(DragonTiger).where(DragonTiger.date == d).order_by(DragonTiger.net.desc()).limit(6)).scalars().all()
return {
"date": d, "senti": senti,
"top": [(x.name, x.pct) for x in secs[:6]],
"bot": [(x.name, x.pct) for x in secs[-4:]],
"inflow": [(x.name, x.net) for x in flows[:6]],
"outflow": [(x.name, x.net) for x in flows[-4:][::-1]],
"lhb": [(x.name, x.net) for x in lhb],
}
def review_daily_comment(date=None):
ctx = _daily_context(date)
if not ctx:
return {"ok": False, "msg": "暂无入库数据,请先到数据中台入库"}
s = ctx["senti"]
senti_line = (f"上涨{s.up}家/下跌{s.down}家,涨停{s.limit_up}家/跌停{s.limit_down}" if s else "无情绪数据")
facts = (
f"日期:{ctx['date']}\n"
f"市场情绪:{senti_line}\n"
f"领涨板块:{', '.join(f'{n}({p:+.2f}%)' for n,p in ctx['top'])}\n"
f"领跌板块:{', '.join(f'{n}({p:+.2f}%)' for n,p in ctx['bot'])}\n"
f"主力净流入前列:{', '.join(f'{n}({v:+.1f}亿)' for n,v in ctx['inflow'])}\n"
f"主力净流出前列:{', '.join(f'{n}({v:+.1f}亿)' for n,v in ctx['outflow'])}\n"
f"龙虎榜净买额居前:{', '.join(f'{n}({v:+.2f}亿)' for n,v in ctx['lhb'])}\n"
)
if llm.enabled():
try:
prompt = ("请根据以下当日A股盘面数据撰写一篇 250 字以内的收盘复盘点评,"
"包含:① 今日市场情绪与赚钱效应判断;② 资金与题材主线;③ 明日需关注的方向与风险。"
"分段清晰。\n\n" + facts)
text = llm.ask(prompt, temperature=0.6)
return {"ok": True, "source": "llm", "date": ctx["date"].isoformat(), "facts": facts, "text": text}
except Exception as e:
pass
# 规则降级
tone = "情绪偏暖、赚钱效应尚可" if (s and s.up > s.down) else "情绪偏弱、需控制仓位"
text = (f"今日{tone}{senti_line}\n"
f"领涨主线集中在 {', '.join(n for n,_ in ctx['top'][:3])}"
f"主力资金净流入居前的是 {', '.join(n for n,_ in ctx['inflow'][:3])}"
f"{', '.join(n for n,_ in ctx['outflow'][:2])} 遭资金流出,需回避。\n"
f"明日关注领涨板块的持续性与量能配合,注意高位股退潮风险。\n{DISCLAIMER}")
return {"ok": True, "source": "rule", "date": ctx["date"].isoformat(), "facts": facts, "text": text}
# ============ 个股诊断 ============
def _stock_context(symbol):
with get_session() as s:
m = s.get(StockMetric, symbol)
sec = s.get(Security, symbol)
rows = s.execute(select(DailyQuote).where(DailyQuote.code == symbol)
.order_by(DailyQuote.date.desc()).limit(60)).scalars().all()
return m, sec, list(reversed(rows))
def _build_evidence(m, stats, news_tone):
"""构造证据链:每条含 维度/事实/方向/历史命中率。"""
def hr(key):
s = stats.get(key)
return (s["win_rate"], s["avg_ret"], s["samples"]) if s else (None, None, None)
ev = []
# 趋势
wr, ar, ns = hr("ma_bull")
ev.append({"dim": "趋势", "fact": f"均线{'多头排列(MA5>MA10>MA20)' if m.ma_bull else '未呈多头'}20日{m.ret20:+.1f}%",
"signal": "bull" if m.ma_bull else ("bear" if m.ret20 < -3 else "neutral"),
"win_rate": wr, "avg_ret": ar, "samples": ns, "weight": 1.2})
# 技术(MACD)
wr, ar, ns = hr("macd_gold")
ev.append({"dim": "技术", "fact": f"{'MACD金叉' if m.macd_gold else 'MACD未金叉'}RSI14={m.rsi14:.0f}"
+ ("(超买)" if m.rsi14 >= 80 else ("(超卖)" if m.rsi14 < 30 else "")),
"signal": "bull" if m.macd_gold else ("bear" if m.rsi14 >= 80 else "neutral"),
"win_rate": wr, "avg_ret": ar, "samples": ns, "weight": 1.0})
# 动量
wr, ar, ns = hr("up_streak3")
streak_sig = "bull" if m.up_streak >= 3 else ("bear" if m.ret5 < -5 else "neutral")
ev.append({"dim": "动量", "fact": f"连涨{m.up_streak}5日{m.ret5:+.1f}%",
"signal": streak_sig, "win_rate": wr if m.up_streak >= 3 else None,
"avg_ret": ar if m.up_streak >= 3 else None, "samples": ns if m.up_streak >= 3 else None, "weight": 1.0})
# 资金
wr, ar, ns = hr("vol_breakout")
vol_sig = "bull" if (m.vol_ratio > 2 and m.pct > 0) else ("bear" if (m.vol_ratio > 2 and m.pct < 0) else "neutral")
ev.append({"dim": "资金", "fact": f"量比{m.vol_ratio},当日{m.pct:+.2f}%,成交额{m.amount}亿",
"signal": vol_sig, "win_rate": wr if vol_sig != "neutral" else None,
"avg_ret": ar if vol_sig != "neutral" else None, "samples": ns if vol_sig != "neutral" else None, "weight": 1.1})
# 位置
key = "new_high60" if m.pos60 >= 0.99 else ("rsi_oversold" if m.rsi14 < 30 else None)
wr, ar, ns = hr(key) if key else (None, None, None)
pos_sig = "bear" if m.pos60 > 0.92 else ("bull" if m.pos60 < 0.2 else "neutral")
ev.append({"dim": "位置", "fact": f"处于60日 {m.pos60*100:.0f}% 分位" + ("(创新高)" if m.pos60 >= 0.99 else ("(低位)" if m.pos60 < 0.2 else "")),
"signal": pos_sig, "win_rate": wr, "avg_ret": ar, "samples": ns, "weight": 0.8})
# 消息面RAG
ev.append({"dim": "消息", "fact": f"近期资讯情绪:{news_tone}",
"signal": {"利好": "bull", "利空": "bear"}.get(news_tone, "neutral"),
"win_rate": None, "avg_ret": None, "samples": None, "weight": 0.9})
return ev
def _confidence_direction(ev):
sval = {"bull": 1, "bear": -1, "neutral": 0}
net = sum(sval[e["signal"]] * e["weight"] for e in ev)
tw = sum(e["weight"] for e in ev if e["signal"] != "neutral")
agreement = abs(net) / tw if tw else 0.0
conf = int(max(5, min(96, 35 + agreement * 55 + (8 if tw >= 3 else 0))))
thr = 0.15 * sum(e["weight"] for e in ev)
direction = "up" if net > thr else ("down" if net < -thr else "flat")
return conf, direction, round(net, 2)
def diagnose(symbol):
m, sec, rows = _stock_context(symbol)
if not m:
return {"ok": False, "msg": "该股票无因子数据,请先入库(数据中台→全市场回填或指定入库)"}
name = (sec.name if sec else m.name) or symbol
# 规则打分0~100
trend = 50 + (15 if m.ma_bull else -10) + min(20, max(-20, m.ret20))
momentum = 50 + min(25, max(-25, m.ret5)) + (10 if 50 < m.rsi14 < 75 else (-10 if m.rsi14 >= 80 else 0))
capital = 50 + min(30, (m.vol_ratio - 1) * 20) * (1 if m.pct > 0 else -1)
position = 100 - abs(m.pos60 * 100 - 45) # 价格分位适中得分高
clamp = lambda x: int(max(5, min(98, x)))
scores = {"趋势": clamp(trend), "动量": clamp(momentum), "资金": clamp(capital), "位置": clamp(position)}
total = clamp(sum(scores.values()) / 4)
# 证据链 + 历史命中率 + RAG 资讯
stats = signals.get_stats(horizon=5)
rctx = rag.stock_context(symbol, limit=5)
evidence = _build_evidence(m, stats, rctx["tone"])
confidence, direction, net = _confidence_direction(evidence)
# 留痕本次诊断写入预测N 日后核验
try:
signals.record_prediction(symbol, name, m.date, total, confidence, direction, m.close, horizon=5)
except Exception:
pass
facts = (
f"股票:{name}({symbol})\n"
f"最新价:{m.close},当日{m.pct:+.2f}%\n"
f"均线MA5={m.ma5} MA10={m.ma10} MA20={m.ma20} MA60={m.ma60}{'多头排列' if m.ma_bull else '非多头'}\n"
f"涨幅5日{m.ret5:+.2f}% / 20日{m.ret20:+.2f}% / 60日{m.ret60:+.2f}%\n"
f"量比:{m.vol_ratio}RSI14{m.rsi14}60日价格分位{m.pos60*100:.0f}%"
f"{'MACD金叉' if m.macd_gold else 'MACD未金叉'},连涨{m.up_streak}日,成交额{m.amount}亿\n"
)
hit_lines = "\n".join(
f"- {e['dim']}信号「{e['fact']}」历史5日上涨概率 {e['win_rate']}%、平均{e['avg_ret']:+.2f}%(样本{e['samples']}"
for e in evidence if e.get("win_rate") is not None)
base = {"ok": True, "symbol": symbol, "name": name, "scores": scores, "total": total,
"confidence": confidence, "direction": direction, "evidence": evidence,
"news": rctx["items"], "news_tone": rctx["tone"], "facts": facts}
if llm.enabled():
try:
prompt = ("请基于以下个股量化数据、信号历史命中率与相关资讯,输出结构化诊断:\n"
"1) 一句话结论含方向与置信判断2) 技术面/资金面/趋势面分别点评(引用历史命中率);"
"3) 操作建议含参考性关注位与止损思路4) 主要风险。200~320字。\n\n"
f"{facts}\n历史命中率:\n{hit_lines or '暂无回测数据请先在AI准确率页计算'}\n\n"
f"{rctx['block'] or '近期无相关资讯。'}\n")
text = llm.ask(prompt, temperature=0.4)
base.update({"source": "llm", "text": text})
return base
except Exception:
pass
judge = "偏强" if total >= 60 else ("偏弱" if total < 45 else "中性")
dir_cn = {"up": "看多", "down": "看空", "flat": "中性观望"}[direction]
text = (f"综合评分 {total}{judge}),方向{dir_cn},置信度 {confidence}%。"
f"趋势{'多头向上' if m.ma_bull else '尚未走强'}20日{m.ret20:+.1f}%"
f"动量 RSI {m.rsi14:.0f}{',超买防回调' if m.rsi14>=80 else ''}"
f"资金{'放量' if m.vol_ratio>=1.5 else '量能一般'}(量比{m.vol_ratio}"
f"价格处于60日 {m.pos60*100:.0f}% 分位;消息面{rctx['tone']}\n"
+ (f"依据信号历史表现:\n{hit_lines}\n" if hit_lines else "")
+ f"操作:{'回踩不破MA20可关注' if m.ma_bull else '等待站稳均线再观察'}跌破MA20或前低则减仓。"
f"风险:{'高位放量谨防见顶' if m.pos60>0.9 else '大盘系统性波动'}\n{DISCLAIMER}")
base.update({"source": "rule", "text": text})
return base
# ============ 今日策略 ============
def today_strategy():
with get_session() as s:
d = s.execute(select(func.max(SectorDaily.date))).scalar()
secs = s.execute(select(SectorDaily).where(SectorDaily.date == d).order_by(SectorDaily.pct.desc()).limit(5)).scalars().all() if d else []
flows = s.execute(select(FundFlowDaily).where(FundFlowDaily.date == d).order_by(FundFlowDaily.net.desc()).limit(5)).scalars().all() if d else []
strong = s.execute(select(StockMetric).where(StockMetric.up_streak >= 3).order_by(StockMetric.up_streak.desc()).limit(8)).scalars().all()
macd = s.execute(select(StockMetric).where(StockMetric.macd_gold.is_(True)).order_by(StockMetric.ret5.desc()).limit(8)).scalars().all()
facts = (
f"领涨板块:{', '.join(f'{x.name}({x.pct:+.2f}%)' for x in secs)}\n"
f"主力净流入板块:{', '.join(f'{x.name}({x.net:+.1f}亿)' for x in flows)}\n"
f"强势连涨个股:{', '.join(f'{x.name}({x.up_streak}连阳)' for x in strong)}\n"
f"MACD金叉个股{', '.join(x.name for x in macd)}\n"
)
if llm.enabled():
try:
prompt = ("请基于以下盘面数据,给出『今天/接下来怎么做』的策略观点:"
"① 当前资金与题材主线;② 值得关注的方向(给理由);③ 需要回避的风险。"
"220字以内。\n\n" + facts)
text = llm.ask(prompt, temperature=0.6)
return {"ok": True, "source": "llm", "facts": facts, "text": text}
except Exception:
pass
main = "".join(x.name for x in secs[:3]) or "暂无数据"
text = (f"当前资金主线集中在 {main}"
f"可重点跟踪净流入居前的 {', '.join(x.name for x in flows[:3])} 板块内的强势个股;"
f"强势连涨股需注意分歧与退潮节奏,避免追高。\n{DISCLAIMER}")
return {"ok": True, "source": "rule", "facts": facts, "text": text}