"""持仓与盈亏归因(移动加权平均成本法)。""" from __future__ import annotations from collections import defaultdict from sqlalchemy import select from db import get_session from models import Trade, StockMetric, DailyQuote, Security, IndexDaily def _current_prices(codes): px = {} if not codes: return px with get_session() as s: for m in s.execute(select(StockMetric).where(StockMetric.code.in_(codes))).scalars(): px[m.code] = m.close missing = [c for c in codes if c not in px] for c in missing: row = s.execute(select(DailyQuote.close).where(DailyQuote.code == c) .order_by(DailyQuote.date.desc()).limit(1)).scalar() if row: px[c] = float(row) return px def compute(): with get_session() as s: trades = s.execute(select(Trade).order_by(Trade.date, Trade.id)).scalars().all() pos = defaultdict(lambda: {"qty": 0, "cost": 0.0, "name": ""}) # 持仓 realized = defaultdict(float) # 各股已实现盈亏 by_reason = defaultdict(float) # 按理由归因(已实现) by_emotion = defaultdict(float) closed_sells = [] # 每笔卖出的盈亏(胜率用) for t in trades: p = pos[t.code] p["name"] = t.name or p["name"] if t.side == "buy": p["cost"] += t.price * t.qty + t.fee p["qty"] += t.qty else: # sell if p["qty"] <= 0: continue avg = p["cost"] / p["qty"] if p["qty"] else 0.0 qty = min(t.qty, p["qty"]) pnl = (t.price - avg) * qty - t.fee realized[t.code] += pnl by_reason[t.reason or "未标注"] += pnl by_emotion[t.emotion or "未标注"] += pnl closed_sells.append(pnl) p["cost"] -= avg * qty p["qty"] -= qty codes = [c for c, v in pos.items() if v["qty"] > 0] px = _current_prices(codes) holdings, mkt_val, hold_cost, unreal = [], 0.0, 0.0, 0.0 for c in codes: v = pos[c] avg = v["cost"] / v["qty"] if v["qty"] else 0.0 cur = px.get(c, avg) mv = cur * v["qty"] u = (cur - avg) * v["qty"] mkt_val += mv; hold_cost += v["cost"]; unreal += u holdings.append({"code": c, "name": v["name"], "qty": v["qty"], "avg_cost": round(avg, 3), "cur": round(cur, 3), "market_value": round(mv, 2), "unrealized": round(u, 2), "unrealized_pct": round((cur / avg - 1) * 100, 2) if avg else 0.0}) holdings.sort(key=lambda x: x["unrealized"], reverse=True) total_realized = sum(realized.values()) wins = sum(1 for x in closed_sells if x > 0) win_rate = round(wins / len(closed_sells) * 100, 1) if closed_sells else 0.0 # 盈亏归因:按个股(已实现+浮动) attr_codes = defaultdict(float) for c, r in realized.items(): attr_codes[c] += r for h in holdings: attr_codes[h["code"]] += h["unrealized"] name_map = {h["code"]: h["name"] for h in holdings} name_map.update({c: pos[c]["name"] for c in realized}) by_stock = sorted([{"code": c, "name": name_map.get(c, c), "pnl": round(v, 2)} for c, v in attr_codes.items()], key=lambda x: x["pnl"], reverse=True) return { "summary": { "market_value": round(mkt_val, 2), "hold_cost": round(hold_cost, 2), "unrealized": round(unreal, 2), "realized": round(total_realized, 2), "total_pnl": round(unreal + total_realized, 2), "positions": len(holdings), "closed_trades": len(closed_sells), "win_rate": win_rate, }, "holdings": holdings, "attribution": { "by_stock": by_stock, "by_reason": sorted([{"key": k, "pnl": round(v, 2)} for k, v in by_reason.items()], key=lambda x: x["pnl"], reverse=True), "by_emotion": sorted([{"key": k, "pnl": round(v, 2)} for k, v in by_emotion.items()], key=lambda x: x["pnl"], reverse=True), }, } def equity_curve(): """重建每日资金曲线:累计盈亏(已实现+浮动) + 投入本金净值,并对比沪深300。""" with get_session() as s: trades = s.execute(select(Trade).order_by(Trade.date, Trade.id)).scalars().all() if not trades: return {"ok": False, "msg": "暂无交易记录"} codes = list({t.code for t in trades}) # 各股日线 {code: {date: close}} px = defaultdict(dict) start = min(t.date for t in trades) for c in codes: for d, close in s.execute(select(DailyQuote.date, DailyQuote.close) .where(DailyQuote.code == c, DailyQuote.date >= start) .order_by(DailyQuote.date)).all(): px[c][d] = float(close) # 交易日序列(用沪深300的日期轴) idx = s.execute(select(IndexDaily.date, IndexDaily.close) .where(IndexDaily.code == "sh000300", IndexDaily.date >= start) .order_by(IndexDaily.date)).all() days = [d for d, _ in idx] or sorted({d for c in px for d in px[c]}) idx_map = {d: float(c) for d, c in idx} # 按日推进 from collections import defaultdict as dd pos = dd(lambda: {"qty": 0, "cost": 0.0}) realized = 0.0 ti = 0 last_px = {} dates, equity, invested_curve, bench = [], [], [], [] base_idx = None max_invested = 0.0 for d in days: # 应用当天及之前所有交易 while ti < len(trades) and trades[ti].date <= d: t = trades[ti]; p = pos[t.code] if t.side == "buy": p["cost"] += t.price * t.qty + t.fee; p["qty"] += t.qty else: if p["qty"] > 0: avg = p["cost"] / p["qty"]; qty = min(t.qty, p["qty"]) realized += (t.price - avg) * qty - t.fee p["cost"] -= avg * qty; p["qty"] -= qty ti += 1 unreal = 0.0 invested = 0.0 for c, p in pos.items(): if p["qty"] <= 0: continue close = px[c].get(d, last_px.get(c)) if close is None: continue last_px[c] = close avg = p["cost"] / p["qty"] unreal += (close - avg) * p["qty"] invested += p["cost"] max_invested = max(max_invested, invested, 1.0) dates.append(d.isoformat()) equity.append(round(realized + unreal, 2)) invested_curve.append(round(invested, 2)) if d in idx_map: base_idx = base_idx or idx_map[d] bench.append(round(idx_map[d] / base_idx, 4)) else: bench.append(bench[-1] if bench else 1.0) # 净值:以累计最大投入为基准 nav = [round(1 + e / max_invested, 4) for e in equity] return {"ok": True, "dates": dates, "equity": equity, "invested": invested_curve, "nav": nav, "bench": bench, "final_pnl": equity[-1] if equity else 0.0, "max_invested": round(max_invested, 2)}