Files
stock_cursor_v0/backend/portfolio.py

177 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""持仓与盈亏归因(移动加权平均成本法)。"""
from __future__ import annotations
from collections import defaultdict
from sqlalchemy import select
from db import get_session
from models import Trade, StockMetric, DailyQuote, Security, IndexDaily
def _current_prices(codes):
px = {}
if not codes:
return px
with get_session() as s:
for m in s.execute(select(StockMetric).where(StockMetric.code.in_(codes))).scalars():
px[m.code] = m.close
missing = [c for c in codes if c not in px]
for c in missing:
row = s.execute(select(DailyQuote.close).where(DailyQuote.code == c)
.order_by(DailyQuote.date.desc()).limit(1)).scalar()
if row:
px[c] = float(row)
return px
def compute():
with get_session() as s:
trades = s.execute(select(Trade).order_by(Trade.date, Trade.id)).scalars().all()
pos = defaultdict(lambda: {"qty": 0, "cost": 0.0, "name": ""}) # 持仓
realized = defaultdict(float) # 各股已实现盈亏
by_reason = defaultdict(float) # 按理由归因(已实现)
by_emotion = defaultdict(float)
closed_sells = [] # 每笔卖出的盈亏(胜率用)
for t in trades:
p = pos[t.code]
p["name"] = t.name or p["name"]
if t.side == "buy":
p["cost"] += t.price * t.qty + t.fee
p["qty"] += t.qty
else: # sell
if p["qty"] <= 0:
continue
avg = p["cost"] / p["qty"] if p["qty"] else 0.0
qty = min(t.qty, p["qty"])
pnl = (t.price - avg) * qty - t.fee
realized[t.code] += pnl
by_reason[t.reason or "未标注"] += pnl
by_emotion[t.emotion or "未标注"] += pnl
closed_sells.append(pnl)
p["cost"] -= avg * qty
p["qty"] -= qty
codes = [c for c, v in pos.items() if v["qty"] > 0]
px = _current_prices(codes)
holdings, mkt_val, hold_cost, unreal = [], 0.0, 0.0, 0.0
for c in codes:
v = pos[c]
avg = v["cost"] / v["qty"] if v["qty"] else 0.0
cur = px.get(c, avg)
mv = cur * v["qty"]
u = (cur - avg) * v["qty"]
mkt_val += mv; hold_cost += v["cost"]; unreal += u
holdings.append({"code": c, "name": v["name"], "qty": v["qty"],
"avg_cost": round(avg, 3), "cur": round(cur, 3),
"market_value": round(mv, 2), "unrealized": round(u, 2),
"unrealized_pct": round((cur / avg - 1) * 100, 2) if avg else 0.0})
holdings.sort(key=lambda x: x["unrealized"], reverse=True)
total_realized = sum(realized.values())
wins = sum(1 for x in closed_sells if x > 0)
win_rate = round(wins / len(closed_sells) * 100, 1) if closed_sells else 0.0
# 盈亏归因:按个股(已实现+浮动)
attr_codes = defaultdict(float)
for c, r in realized.items():
attr_codes[c] += r
for h in holdings:
attr_codes[h["code"]] += h["unrealized"]
name_map = {h["code"]: h["name"] for h in holdings}
name_map.update({c: pos[c]["name"] for c in realized})
by_stock = sorted([{"code": c, "name": name_map.get(c, c), "pnl": round(v, 2)}
for c, v in attr_codes.items()], key=lambda x: x["pnl"], reverse=True)
return {
"summary": {
"market_value": round(mkt_val, 2), "hold_cost": round(hold_cost, 2),
"unrealized": round(unreal, 2), "realized": round(total_realized, 2),
"total_pnl": round(unreal + total_realized, 2),
"positions": len(holdings), "closed_trades": len(closed_sells), "win_rate": win_rate,
},
"holdings": holdings,
"attribution": {
"by_stock": by_stock,
"by_reason": sorted([{"key": k, "pnl": round(v, 2)} for k, v in by_reason.items()], key=lambda x: x["pnl"], reverse=True),
"by_emotion": sorted([{"key": k, "pnl": round(v, 2)} for k, v in by_emotion.items()], key=lambda x: x["pnl"], reverse=True),
},
}
def equity_curve():
"""重建每日资金曲线:累计盈亏(已实现+浮动) + 投入本金净值并对比沪深300。"""
with get_session() as s:
trades = s.execute(select(Trade).order_by(Trade.date, Trade.id)).scalars().all()
if not trades:
return {"ok": False, "msg": "暂无交易记录"}
codes = list({t.code for t in trades})
# 各股日线 {code: {date: close}}
px = defaultdict(dict)
start = min(t.date for t in trades)
for c in codes:
for d, close in s.execute(select(DailyQuote.date, DailyQuote.close)
.where(DailyQuote.code == c, DailyQuote.date >= start)
.order_by(DailyQuote.date)).all():
px[c][d] = float(close)
# 交易日序列用沪深300的日期轴
idx = s.execute(select(IndexDaily.date, IndexDaily.close)
.where(IndexDaily.code == "sh000300", IndexDaily.date >= start)
.order_by(IndexDaily.date)).all()
days = [d for d, _ in idx] or sorted({d for c in px for d in px[c]})
idx_map = {d: float(c) for d, c in idx}
# 按日推进
from collections import defaultdict as dd
pos = dd(lambda: {"qty": 0, "cost": 0.0})
realized = 0.0
ti = 0
last_px = {}
dates, equity, invested_curve, bench = [], [], [], []
base_idx = None
max_invested = 0.0
for d in days:
# 应用当天及之前所有交易
while ti < len(trades) and trades[ti].date <= d:
t = trades[ti]; p = pos[t.code]
if t.side == "buy":
p["cost"] += t.price * t.qty + t.fee; p["qty"] += t.qty
else:
if p["qty"] > 0:
avg = p["cost"] / p["qty"]; qty = min(t.qty, p["qty"])
realized += (t.price - avg) * qty - t.fee
p["cost"] -= avg * qty; p["qty"] -= qty
ti += 1
unreal = 0.0
invested = 0.0
for c, p in pos.items():
if p["qty"] <= 0:
continue
close = px[c].get(d, last_px.get(c))
if close is None:
continue
last_px[c] = close
avg = p["cost"] / p["qty"]
unreal += (close - avg) * p["qty"]
invested += p["cost"]
max_invested = max(max_invested, invested, 1.0)
dates.append(d.isoformat())
equity.append(round(realized + unreal, 2))
invested_curve.append(round(invested, 2))
if d in idx_map:
base_idx = base_idx or idx_map[d]
bench.append(round(idx_map[d] / base_idx, 4))
else:
bench.append(bench[-1] if bench else 1.0)
# 净值:以累计最大投入为基准
nav = [round(1 + e / max_invested, 4) for e in equity]
return {"ok": True, "dates": dates, "equity": equity, "invested": invested_curve,
"nav": nav, "bench": bench,
"final_pnl": equity[-1] if equity else 0.0,
"max_invested": round(max_invested, 2)}