"""ETL:从数据源抽取并增量入库。 每个 ingest_* 负责一类数据;run_daily_ingest 编排全部并写任务日志。 """ from __future__ import annotations import datetime as dt from sqlalchemy.dialects.postgresql import insert as pg_insert import akshare_service as svc import config from db import get_session from models import (DailyQuote, DragonTiger, FundFlowDaily, IndexDaily, JobRun, SectorDaily, Security, SentimentDaily, StockMetric) try: import akshare as ak except Exception: ak = None def _today(): return dt.date.today() def _upsert(session, model, rows, index_elements, update_cols): if not rows: return 0 stmt = pg_insert(model).values(rows) stmt = stmt.on_conflict_do_update( index_elements=index_elements, set_={c: getattr(stmt.excluded, c) for c in update_cols}, ) session.execute(stmt) return len(rows) # ---------------- 个股日线(保留真实日期) ---------------- def fetch_daily(code: str, days: int = 400): if ak is None: return [] sym = svc._sina_symbol(code) for src in ("sina", "tx"): try: df = ak.stock_zh_a_daily(symbol=sym, adjust="qfq") if src == "sina" else ak.stock_zh_a_hist_tx(symbol=sym) if df is None or df.empty: continue df = df.tail(days) out = [] for _, r in df.iterrows(): d = r["date"] d = d.date() if hasattr(d, "date") else dt.date.fromisoformat(str(d)[:10]) out.append({ "code": code, "date": d, "open": float(r["open"]), "high": float(r["high"]), "low": float(r["low"]), "close": float(r["close"]), "volume": int(r["volume"]) if "volume" in df.columns else 0, "amount": float(r["amount"]) if "amount" in df.columns else 0.0, }) return out except Exception: continue return [] # ---------------- 因子计算 ---------------- def _mean(a): return sum(a) / len(a) if a else 0.0 def _ema(arr, n): k = 2 / (n + 1) e = arr[0] out = [] for v in arr: e = v * k + e * (1 - k) out.append(e) return out def compute_metrics(code, name, rows): """rows: 按日期升序的日线 dict 列表。返回 StockMetric 字段 dict 或 None。""" if len(rows) < 25: return None closes = [r["close"] for r in rows] highs = [r["high"] for r in rows] lows = [r["low"] for r in 
rows] vols = [r["volume"] for r in rows] n = len(closes) last = closes[-1] prev = closes[-2] def ma(k): return round(_mean(closes[-k:]), 3) if n >= k else 0.0 def ret(k): return round((last / closes[-k - 1] - 1) * 100, 2) if n > k else 0.0 ma5, ma10, ma20, ma60 = ma(5), ma(10), ma(20), ma(60) vol_ratio = round(vols[-1] / _mean(vols[-6:-1]), 2) if n >= 6 and _mean(vols[-6:-1]) else 0.0 win = min(60, n) high60, low60 = max(highs[-win:]), min(lows[-win:]) pos60 = round((last - low60) / (high60 - low60), 3) if high60 > low60 else 0.5 # RSI14 gains, losses = [], [] for i in range(max(1, n - 14), n): d = closes[i] - closes[i - 1] gains.append(max(d, 0)); losses.append(max(-d, 0)) ag, al = _mean(gains), _mean(losses) rsi14 = round(100 - 100 / (1 + ag / al), 1) if al else (100.0 if ag else 50.0) # MACD 金叉 e12, e26 = _ema(closes, 12), _ema(closes, 26) dif = [a - b for a, b in zip(e12, e26)] dea = _ema(dif, 9) macd_gold = len(dif) >= 2 and dif[-2] < dea[-2] and dif[-1] >= dea[-1] ma_bull = ma5 > ma10 > ma20 > 0 streak = 0 for i in range(n - 1, 0, -1): if closes[i] > closes[i - 1]: streak += 1 else: break return { "code": code, "name": name, "date": rows[-1]["date"], "close": last, "pct": round((last / prev - 1) * 100, 2) if prev else 0.0, "ma5": ma5, "ma10": ma10, "ma20": ma20, "ma60": ma60, "vol_ratio": vol_ratio, "ret5": ret(5), "ret20": ret(20), "ret60": ret(60), "pos60": pos60, "rsi14": rsi14, "macd_gold": bool(macd_gold), "ma_bull": bool(ma_bull), "up_streak": streak, "amount": round(rows[-1]["amount"] / 1e8, 3), } def ingest_quotes(codes, days=400, with_metrics=True, cmap=None): if with_metrics and cmap is None: cmap = svc._code_name_map() n = 0 with get_session() as s: for code in codes: rows = fetch_daily(code, days) if not rows: continue n += _upsert(s, DailyQuote, rows, ["code", "date"], ["open", "high", "low", "close", "volume", "amount"]) if with_metrics: m = compute_metrics(code, (cmap or {}).get(code, code), rows) if m: _upsert(s, StockMetric, [m], ["code"], 
["name", "date", "close", "pct", "ma5", "ma10", "ma20", "ma60", "vol_ratio", "ret5", "ret20", "ret60", "pos60", "rsi14", "macd_gold", "ma_bull", "up_streak", "amount"]) s.commit() return n def ingest_quotes_all(days=250, progress_every=300): """全市场回填:对所有证券抓取日线并计算因子。耗时较长。""" cmap = svc._code_name_map() codes = [c for c in cmap.keys() if c[:1] in ("0", "3", "6")] total = len(codes) done = 0 with get_session() as s: job = JobRun(job="ingest_all", status="running", message=f"0/{total}") s.add(job); s.commit(); job_id = job.id try: for i in range(0, total, 50): batch = codes[i:i + 50] ingest_quotes(batch, days=days, with_metrics=True, cmap=cmap) done += len(batch) if done % progress_every < 50: with get_session() as s: j = s.get(JobRun, job_id); j.message = f"{done}/{total}"; s.commit() status, msg = "success", f"{done}/{total}" except Exception as e: status, msg = "error", f"{done}/{total} | {repr(e)[:160]}" with get_session() as s: j = s.get(JobRun, job_id); j.status = status j.finished_at = dt.datetime.now(); j.message = msg; s.commit() return {"status": status, "done": done, "total": total} def ingest_securities(): cmap = svc._code_name_map() rows = [{"code": c, "name": n, "market": "A"} for c, n in cmap.items()] with get_session() as s: cnt = _upsert(s, Security, rows, ["code"], ["name", "market"]) s.commit() return cnt def ingest_indices(): if ak is None: return 0 n = 0 with get_session() as s: for code, (name, _b) in svc.MAJOR_INDEX.items(): try: df = ak.stock_zh_index_daily(symbol=code).tail(400) rows = [] for _, r in df.iterrows(): d = r["date"] d = d.date() if hasattr(d, "date") else dt.date.fromisoformat(str(d)[:10]) rows.append({"code": code, "name": name, "date": d, "open": float(r["open"]), "high": float(r["high"]), "low": float(r["low"]), "close": float(r["close"]), "volume": int(r.get("volume", 0) or 0)}) n += _upsert(s, IndexDaily, rows, ["code", "date"], ["name", "open", "high", "low", "close", "volume"]) s.commit() except Exception: continue return n 
def ingest_sectors():
    """Upsert today's industry-board snapshot into SectorDaily.

    Returns:
        The row count reported by ``_upsert``.
    """
    data = svc.get_industry_boards()
    d = _today()
    rows = [
        {
            "date": d,
            "name": b["name"],
            "pct": b["pct"],
            "amount": b.get("amount", 0),
            "count": b.get("count", 0),
            "leader": b.get("leader", ""),
        }
        for b in data["list"]
    ]
    with get_session() as s:
        n = _upsert(s, SectorDaily, rows, ["date", "name"],
                    ["pct", "amount", "count", "leader"])
        s.commit()
    return n


def ingest_fund_flow():
    """Upsert today's fund-flow snapshot into FundFlowDaily.

    Returns:
        The row count reported by ``_upsert``.
    """
    data = svc.get_fund_flow()
    d = _today()
    rows = [
        {"date": d, "name": x["name"], "net": x["net"], "pct": x.get("pct", 0)}
        for x in data["list"]
    ]
    with get_session() as s:
        n = _upsert(s, FundFlowDaily, rows, ["date", "name"], ["net", "pct"])
        s.commit()
    return n


def ingest_sentiment():
    """Upsert today's single market-sentiment row into SentimentDaily.

    Returns:
        The row count reported by ``_upsert``.
    """
    x = svc.get_sentiment()
    fields = ["up", "down", "flat", "limit_up", "limit_down"]
    row = {"date": _today()}
    for key in fields:
        row[key] = x[key]
    with get_session() as s:
        n = _upsert(s, SentimentDaily, [row], ["date"], fields)
        s.commit()
    return n


def _parse_trade_date(raw):
    """Parse a date such as "20240105" or "2024-01-05"; None if not parseable.

    Only the first eight digits found in ``str(raw)`` are used, read as
    year, month and day.
    """
    if not raw:
        return None
    digits = "".join(ch for ch in str(raw) if ch.isdigit())
    if len(digits) < 8:
        return None
    try:
        return dt.date(int(digits[:4]), int(digits[4:6]), int(digits[6:8]))
    except ValueError:
        return None


def ingest_dragon():
    """Upsert the dragon-tiger list returned by the service into DragonTiger.

    The rows are dated with the service's ``date`` field when it can be
    parsed, otherwise with today's date.

    Returns:
        The row count reported by ``_upsert``, or 0 when the list is empty.
    """
    data = svc.get_dragon_tiger()
    entries = data["list"]
    if not entries:
        return 0
    d = _parse_trade_date(data.get("date")) or _today()
    # (date, code, reason) is the conflict target, and PostgreSQL rejects an
    # upsert that hits the same target twice in one statement, so keep only
    # the last entry per (code, reason).
    unique = {}
    for x in entries:
        # A missing/None reason becomes "" instead of aborting the whole run.
        reason = str(x.get("reason") or "")[:120]
        unique[(x["code"], reason)] = {
            "date": d,
            "code": x["code"],
            "name": x["name"],
            "pct": x["pct"],
            "net": x["net"],
            "reason": reason,
        }
    rows = list(unique.values())
    with get_session() as s:
        n = _upsert(s, DragonTiger, rows, ["date", "code", "reason"],
                    ["name", "pct", "net"])
        s.commit()
    return n


# ---------------- Orchestration ----------------
def run_daily_ingest(universe=None, with_quotes=True):
    """Run every ingest step in order and record the outcome in a JobRun row.

    Args:
        universe: codes passed to ``ingest_quotes``; falls back to
            ``config.DEFAULT_UNIVERSE`` when falsy.
        with_quotes: whether to run the per-stock quote step at the end.

    Returns:
        Dict with ``status`` ("success" or "error") and ``summary``, the
        per-step counts collected before any failure.
    """
    universe = universe or config.DEFAULT_UNIVERSE
    with get_session() as s:
        job = JobRun(job="daily_ingest", status="running", message="")
        s.add(job)
        s.commit()
        job_id = job.id

    steps = (
        ("securities", ingest_securities),
        ("indices", ingest_indices),
        ("sectors", ingest_sectors),
        ("fund_flow", ingest_fund_flow),
        ("sentiment", ingest_sentiment),
        ("dragon", ingest_dragon),
    )
    summary = {}
    try:
        # The first failing step stops the run; ``summary`` keeps what was done.
        for key, step in steps:
            summary[key] = step()
        if with_quotes:
            summary["quotes"] = ingest_quotes(universe)
        status, msg = "success", str(summary)
    except Exception as e:
        status, msg = "error", f"{summary} | EXC {repr(e)[:200]}"

    with get_session() as s:
        job = s.get(JobRun, job_id)
        job.status = status
        job.finished_at = dt.datetime.now()
        job.message = msg
        s.commit()
    return {"status": status, "summary": summary}