Files
stock_cursor_v0/backend/limit_analysis.py
2026-06-15 01:26:39 +08:00

614 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""涨跌停分析 — 连板股追踪、炸板率统计、敢死队排行、炸板走势统计、涨停原因分类。"""
import datetime as dt
from typing import List, Dict, Any, Optional
from collections import defaultdict, Counter
import numpy as np
from sqlalchemy import select, and_, func, desc
from db import get_session
from models import DailyQuote, StockMetric, DragonTiger
try:
import akshare as ak
AK_OK = True
except Exception:
ak = None
AK_OK = False
# Keyword table used to bucket a limit-up "reason" string into a category.
# Each key is a category label and each value lists substrings that mark it.
# classify_limit_reasons walks this dict in insertion order and stops at the
# first category with a matching keyword, so the order of the entries decides
# which category wins when a reason text matches several of them.
LIMIT_REASON_MAP = {
    "题材": ["概念", "题材", "热点", "风口", "赛道"],
    "业绩": ["业绩", "净利润", "营收", "盈利", "超预期", "预增", "扭亏"],
    "政策": ["政策", "补贴", "利好", "支持", "规划", "国家", "工信部", "发改委"],
    "技术突破": ["突破", "新高", "均线", "金叉", "放量"],
    "重组并购": ["重组", "并购", "收购", "合并", "入股"],
    "情绪": ["跟风", "连板", "情绪", "氛围", "涨停潮"],
}
def get_limit_stocks(date: Optional[dt.date] = None, limit_type: str = "up") -> Dict[str, Any]:
    """List the stocks that hit the limit-up or limit-down band on one day.

    The daily move is measured from the day's open to its close, and a stock
    qualifies once that move reaches 9.8% in the requested direction (9.8
    rather than 10 to absorb rounding).

    Args:
        date: Trading day to inspect; ``None`` selects the most recent date
            stored in ``DailyQuote``.
        limit_type: ``"up"`` for limit-up stocks, ``"down"`` for limit-down
            stocks. Any other value matches nothing.

    Returns:
        ``{"ok": False, "msg": ...}`` when no quotes are available; otherwise
        a dict with ``ok``, ``date``, ``type``, ``count`` and ``stocks``,
        where ``stocks`` is ordered with the most extreme move first.
    """
    # NOTE(review): the move is open -> close, not previous close -> close;
    # confirm this approximation is what callers expect.
    band = 9.8
    wants_up = limit_type == "up"
    wants_down = limit_type == "down"

    with get_session() as session:
        if date is None:
            date = session.execute(select(func.max(DailyQuote.date))).scalar()
        if not date:
            return {"ok": False, "msg": "暂无数据"}

        day_rows = session.execute(
            select(DailyQuote).where(DailyQuote.date == date)
        ).scalars().all()
        if not day_rows:
            return {"ok": False, "msg": "暂无数据"}

        hits = []
        for row in day_rows:
            # A zero open cannot serve as the denominator of the move.
            if row.open == 0:
                continue
            move = (float(row.close) - float(row.open)) / float(row.open) * 100

            if wants_up:
                matched = move >= band
            elif wants_down:
                matched = move <= -band
            else:
                matched = False
            if not matched:
                continue

            hits.append({
                "code": row.code,
                "name": row.name,
                "close": float(row.close),
                "pct": round(move, 2),
                "volume": float(row.volume),
                "amount": float(row.amount)
            })

        # Descending for limit-up, ascending (most negative first) otherwise.
        hits.sort(key=lambda item: item["pct"], reverse=wants_up)
        return {
            "ok": True,
            "date": date.isoformat(),
            "type": limit_type,
            "count": len(hits),
            "stocks": hits
        }
def track_consecutive_limits(days: int = 10) -> Dict[str, Any]:
    """Find stocks whose most recent rows form a run of limit-up days.

    For every stock with quotes in the look-back window, rows are walked from
    newest to oldest and counted while the open-to-close move stays at or
    above 9.8%. Stocks with a run of at least two days are reported.

    Args:
        days: Size of the look-back window in calendar days, counted back
            from the newest date in ``DailyQuote``.

    Returns:
        ``{"ok": False, "msg": ...}`` when the table is empty; otherwise a
        dict with ``ok``, ``date``, ``days``, ``count`` and ``stocks``
        (longest run first).
    """
    with get_session() as session:
        newest_date = session.execute(select(func.max(DailyQuote.date))).scalar()
        if not newest_date:
            return {"ok": False, "msg": "暂无数据"}

        window_start = newest_date - dt.timedelta(days=days)
        rows = session.execute(
            select(DailyQuote)
            .where(DailyQuote.date >= window_start)
            .order_by(DailyQuote.code, DailyQuote.date)
        ).scalars().all()

        # Bucket the rows per stock code.
        history_by_code = defaultdict(list)
        for row in rows:
            history_by_code[row.code].append(row)

        boards = []
        for code, history in history_by_code.items():
            newest_first = sorted(history, key=lambda r: r.date, reverse=True)

            # NOTE(review): the run starts at this stock's own newest row,
            # which may be older than newest_date (e.g. a suspended stock);
            # confirm that is acceptable.
            run_length = 0
            for row in newest_first:
                if row.open == 0:
                    break
                move = (float(row.close) - float(row.open)) / float(row.open) * 100
                if move >= 9.8:
                    run_length += 1
                    continue
                break

            if run_length < 2:
                continue

            head = newest_first[0]
            boards.append({
                "code": code,
                "name": head.name,
                "consecutive_days": run_length,
                "close": float(head.close),
                "amount": float(head.amount),
                "status": f"{run_length}连板"
            })

        boards.sort(key=lambda item: item["consecutive_days"], reverse=True)
        return {
            "ok": True,
            "date": newest_date.isoformat(),
            "days": days,
            "count": len(boards),
            "stocks": boards
        }
def analyze_limit_break_rate(days: int = 60) -> Dict[str, Any]:
    """Summarise what happens on the trading day after a limit-up.

    A day counts as a limit-up when its open-to-close move is at least 9.8%.
    The stock's next available row is then classified as ``limit_up``
    (>= 9.8%), ``up`` (> 0), ``down`` (> -9.8%, which includes a flat day) or
    ``limit_down``. Every outcome other than ``limit_up`` is counted as a
    "break".

    Only limit-ups whose following row has a usable (non-zero) open are
    counted, so the per-stock ``limits`` denominator and the overall
    ``total_samples`` are built from the same set of events.

    Args:
        days: Size of the look-back window in calendar days, counted back
            from the newest date in ``DailyQuote``.

    Returns:
        ``{"ok": False, "msg": ...}`` when there is no data or no sample;
        otherwise a dict with ``ok``, ``days``, ``total_samples``,
        ``overall_break_rate``, ``next_day_stats`` (count and percentage per
        outcome) and ``stock_rankings`` (up to 30 stocks with at least three
        counted limit-ups, highest break rate first).
    """
    with get_session() as s:
        latest_date = s.execute(select(func.max(DailyQuote.date))).scalar()
        if not latest_date:
            return {"ok": False, "msg": "暂无数据"}

        start_date = latest_date - dt.timedelta(days=days)
        quotes = s.execute(
            select(DailyQuote)
            .where(DailyQuote.date >= start_date)
            .order_by(DailyQuote.code, DailyQuote.date)
        ).scalars().all()

        # code -> {date -> quote}
        stock_data = defaultdict(dict)
        for q in quotes:
            stock_data[q.code][q.date] = q

        next_day_stats = {
            "limit_up": 0,    # limit-up again
            "up": 0,          # gained, but below the limit
            "down": 0,        # flat or lost, but above the lower limit
            "limit_down": 0,  # limit-down
        }
        stock_break_rates = {}

        for code, data in stock_data.items():
            dates = sorted(data)
            stock_limits = 0
            stock_breaks = 0

            # Pair every row with the stock's next available row.
            for today, tomorrow in zip(dates, dates[1:]):
                today_q = data[today]
                tomorrow_q = data[tomorrow]
                # Both moves divide by the open; an event that cannot be
                # classified must not enter the denominator either.
                if today_q.open == 0 or tomorrow_q.open == 0:
                    continue

                today_pct = (float(today_q.close) - float(today_q.open)) / float(today_q.open) * 100
                if not today_pct >= 9.8:
                    continue
                stock_limits += 1

                tomorrow_pct = (
                    (float(tomorrow_q.close) - float(tomorrow_q.open))
                    / float(tomorrow_q.open) * 100
                )
                if tomorrow_pct >= 9.8:
                    next_day_stats["limit_up"] += 1
                    continue

                # Anything short of a second limit-up is a break.
                stock_breaks += 1
                if tomorrow_pct > 0:
                    next_day_stats["up"] += 1
                elif tomorrow_pct > -9.8:
                    next_day_stats["down"] += 1
                else:
                    next_day_stats["limit_down"] += 1

            # Fewer than three limit-ups is too small a sample to rank.
            if stock_limits >= 3:
                stock_break_rates[code] = {
                    # Latest known name, as track_consecutive_limits reports.
                    "name": data[dates[-1]].name,
                    "limits": stock_limits,
                    "breaks": stock_breaks,
                    "break_rate": round(stock_breaks / stock_limits * 100, 2)
                }

        total = sum(next_day_stats.values())
        if total == 0:
            return {"ok": False, "msg": "统计样本不足"}

        stats_with_pct = {
            outcome: {"count": count, "pct": round(count / total * 100, 2)}
            for outcome, count in next_day_stats.items()
        }

        # Overall break rate = every outcome except a repeated limit-up.
        break_rate = (total - next_day_stats["limit_up"]) / total * 100

        stock_rankings = sorted(
            stock_break_rates.items(),
            key=lambda item: item[1]["break_rate"],
            reverse=True
        )[:30]

        return {
            "ok": True,
            "days": days,
            "total_samples": total,
            "overall_break_rate": round(break_rate, 2),
            "next_day_stats": stats_with_pct,
            "stock_rankings": [{
                "code": code,
                "name": info["name"],
                "limits": info["limits"],
                "breaks": info["breaks"],
                "break_rate": info["break_rate"]
            } for code, info in stock_rankings]
        }
def get_consecutive_calendar(days: int = 60) -> Dict[str, Any]:
    """Build a calendar of limit-up runs for every stock in the window.

    A run is a sequence of at least two adjacent rows (per stock, in date
    order) whose open-to-close move is 9.8% or more. A row with a zero open
    or a smaller move ends the run. Runs that have ended feed the length
    distribution; a run still open at the stock's last row is reported
    separately as a current streak and is not part of the distribution.

    Args:
        days: Size of the look-back window in calendar days, counted back
            from the newest date in ``DailyQuote``.

    Returns:
        ``{"ok": False, "msg": ...}`` when the table is empty; otherwise a
        dict with ``ok``, ``date_range``, ``current_streaks`` (top 30 by
        length), ``streak_distribution`` (run length as string -> number of
        ended runs) and ``total_streaks``.
    """
    with get_session() as session:
        newest_date = session.execute(select(func.max(DailyQuote.date))).scalar()
        if not newest_date:
            return {"ok": False, "msg": "暂无数据"}

        window_start = newest_date - dt.timedelta(days=days)
        rows = session.execute(
            select(DailyQuote)
            .where(DailyQuote.date >= window_start)
            .order_by(DailyQuote.code, DailyQuote.date)
        ).scalars().all()

        history_by_code = defaultdict(list)
        for row in rows:
            history_by_code[row.code].append(row)

        ended_runs = []
        open_runs = []
        for code, history in history_by_code.items():
            ordered = sorted(history, key=lambda r: r.date)
            name = ordered[-1].name

            run = []
            for row in ordered:
                is_limit_up = False
                if row.open != 0:
                    move = (float(row.close) - float(row.open)) / float(row.open) * 100
                    is_limit_up = move >= 9.8

                if is_limit_up:
                    run.append(row)
                    continue

                # The run is interrupted: keep it only if it spans 2+ days.
                if len(run) >= 2:
                    ended_runs.append({
                        "code": code, "name": name,
                        "days": len(run),
                        "start_date": run[0].date.isoformat(),
                        "end_date": run[-1].date.isoformat()
                    })
                run = []

            # Whatever is left is a run that reaches the stock's last row.
            if len(run) >= 2:
                open_runs.append({
                    "code": code, "name": name,
                    "days": len(run),
                    "start_date": run[0].date.isoformat(),
                    "latest_date": run[-1].date.isoformat(),
                    "latest_close": float(run[-1].close)
                })

        length_histogram = Counter(str(item["days"]) for item in ended_runs)
        open_runs.sort(key=lambda item: item["days"], reverse=True)

        return {
            "ok": True,
            "date_range": f"{window_start.isoformat()} ~ {newest_date.isoformat()}",
            "current_streaks": open_runs[:30],
            "streak_distribution": dict(length_histogram),
            "total_streaks": len(ended_runs)
        }
def analyze_post_break_performance(days: int = 90) -> Dict[str, Any]:
    """Measure returns 1, 3 and 5 rows after a limit-up streak is broken.

    A "break" day is approximated as a day whose open-to-close move is below
    9.8% while the stock's previous row was a limit-up (move >= 9.8%). For
    each break day the close-to-close return is taken against the rows 1, 3
    and 5 positions later in that stock's history. Break days within the last
    five rows of a stock are skipped so all three horizons share one sample.

    Args:
        days: Look-back window in calendar days. The query reaches back
            ``days + 10`` days from the newest date in ``DailyQuote``.

    Returns:
        ``{"ok": False, "msg": ...}`` when the table is empty; otherwise a
        dict with ``ok``, ``days``, ``after_1d`` / ``after_3d`` /
        ``after_5d`` (each either ``{}`` or a summary with ``samples``,
        ``avg_ret``, ``win_rate``, ``p25``, ``median``, ``p75``) and a
        human-readable ``conclusion``.
    """
    def summarize(perfs):
        """Return sample statistics for a list of percentage returns."""
        if not perfs:
            return {}
        arr = np.array(perfs)
        return {
            "samples": len(perfs),
            "avg_ret": round(float(arr.mean()), 2),
            "win_rate": round(float((arr > 0).mean() * 100), 1),
            "p25": round(float(np.percentile(arr, 25)), 2),
            "median": round(float(np.median(arr)), 2),
            "p75": round(float(np.percentile(arr, 75)), 2),
        }

    with get_session() as s:
        latest_date = s.execute(select(func.max(DailyQuote.date))).scalar()
        if not latest_date:
            return {"ok": False, "msg": "暂无数据"}

        # NOTE(review): the extra 10 days presumably leave room for the
        # forward-looking horizons; confirm the intended padding.
        start_date = latest_date - dt.timedelta(days=days + 10)
        quotes = s.execute(
            select(DailyQuote)
            .where(DailyQuote.date >= start_date)
            .order_by(DailyQuote.code, DailyQuote.date)
        ).scalars().all()

        # code -> {date -> quote}
        stock_data = defaultdict(dict)
        for q in quotes:
            stock_data[q.code][q.date] = q

        # horizon (rows ahead) -> collected percentage returns
        perf_by_horizon = {1: [], 3: [], 5: []}

        for date_data in stock_data.values():
            dates = sorted(date_data)
            # Start at 1 (a previous row is required) and stop five rows
            # before the end so that dates[i + 5] always exists.
            for i in range(1, len(dates) - 5):
                q_today = date_data[dates[i]]
                q_yest = date_data[dates[i - 1]]
                if q_today.open == 0 or q_yest.open == 0:
                    continue

                pct_today = (float(q_today.close) - float(q_today.open)) / float(q_today.open) * 100
                pct_yest = (float(q_yest.close) - float(q_yest.open)) / float(q_yest.open) * 100
                # Limit-up yesterday, no limit-up today -> break day.
                if not (pct_yest >= 9.8 and pct_today < 9.8):
                    continue

                base = float(q_today.close)
                if base == 0:
                    # A zero close cannot be used as the return denominator.
                    continue

                for horizon, perf_list in perf_by_horizon.items():
                    q_future = date_data[dates[i + horizon]]
                    ret = (float(q_future.close) - base) / base * 100
                    perf_list.append(round(ret, 2))

        summary_1d = summarize(perf_by_horizon[1])
        if summary_1d:
            # Each clause ends with a separator so the sentence stays readable.
            conclusion = (
                f"炸板后样本 {summary_1d['samples']} 条,"
                f"次日平均收益 {summary_1d['avg_ret']}%,"
                f"次日上涨概率 {summary_1d['win_rate']}%"
            )
        else:
            conclusion = "样本不足"

        return {
            "ok": True,
            "days": days,
            "after_1d": summary_1d,
            "after_3d": summarize(perf_by_horizon[3]),
            "after_5d": summarize(perf_by_horizon[5]),
            "conclusion": conclusion
        }
def classify_limit_reasons(date: Optional[dt.date] = None) -> Dict[str, Any]:
    """Group one day's limit-up stocks by the category of their stated reason.

    Reason texts come from two sources: the AkShare limit-up pool (only when
    the optional ``akshare`` import succeeded) and the stored ``DragonTiger``
    rows for the same date. The AkShare value wins when both have the code.
    Each reason is matched against ``LIMIT_REASON_MAP`` in dict order and
    lands in the first category with a matching keyword, or in ``"其他"``
    when nothing matches.

    Args:
        date: Day to classify; ``None`` selects the newest date present in
            ``DragonTiger``.

    Returns:
        ``{"ok": False, "msg": ...}`` when no date can be determined;
        otherwise a dict with ``ok``, ``date``, ``total`` and ``categories``
        (largest category first, each with ``category``, ``count``, ``pct``
        and up to ten ``stocks``).
    """
    import logging  # local import: the module header does not import logging

    with get_session() as s:
        if date is None:
            date = s.execute(select(func.max(DragonTiger.date))).scalar()
        if not date:
            return {"ok": False, "msg": "暂无龙虎榜数据,请先入库"}

        lhb_rows = s.execute(
            select(DragonTiger).where(DragonTiger.date == date)
        ).scalars().all()

        # code -> reason text
        reason_data = {}
        if AK_OK:
            try:
                df = ak.stock_zt_pool_em(date=date.strftime("%Y%m%d"))
                if df is not None and not df.empty:
                    for _, r in df.iterrows():
                        code = str(r.get("代码", ""))
                        reason = str(r.get("涨停原因类别", "") or r.get("上榜原因", ""))
                        reason_data[code] = reason
            except Exception:
                # Best-effort enrichment: keep going with the stored rows,
                # but leave a trace instead of hiding the failure.
                logging.getLogger(__name__).warning(
                    "AkShare limit-up pool lookup failed for %s", date, exc_info=True
                )

        # Fill in codes that AkShare did not provide.
        for row in lhb_rows:
            if row.code not in reason_data:
                # A missing reason would make the substring test below raise.
                reason_data[row.code] = row.reason or ""

        classified = defaultdict(list)
        for code, reason in reason_data.items():
            category = next(
                (
                    name
                    for name, keywords in LIMIT_REASON_MAP.items()
                    if any(kw in reason for kw in keywords)
                ),
                "其他",
            )
            classified[category].append({"code": code, "reason": reason})

        total = sum(len(items) for items in classified.values())
        summary = [
            {
                "category": cat,
                "count": len(items),
                "pct": round(len(items) / total * 100, 1) if total > 0 else 0,
                "stocks": items[:10]
            }
            for cat, items in sorted(
                classified.items(), key=lambda kv: len(kv[1]), reverse=True
            )
        ]

        return {
            "ok": True,
            "date": date.isoformat(),
            "total": total,
            "categories": summary
        }
def get_limit_squad_rankings(days: int = 30, min_limits: int = 5) -> Dict[str, Any]:
    """Rank the stocks with the most limit-up days in the window.

    A day is a limit-up when its open-to-close move is at least 9.8%; rows
    with a zero open are ignored. ``max_consecutive`` is the longest run of
    limit-ups on adjacent rows of the stock's own history, so a Friday and
    the following Monday count as consecutive.

    Args:
        days: Size of the look-back window in calendar days, counted back
            from the newest date in ``DailyQuote``.
        min_limits: Minimum number of limit-up days a stock needs to be
            ranked.

    Returns:
        ``{"ok": False, "msg": ...}`` when the table is empty; otherwise a
        dict with ``ok``, ``days``, ``start_date``, ``end_date``, ``count``
        and ``rankings`` (up to 50 entries ordered by limit-up count, then by
        longest run, each with ``code``, ``name``, ``limit_count``,
        ``max_consecutive``, ``frequency`` and ``dates``).
    """
    with get_session() as s:
        latest_date = s.execute(select(func.max(DailyQuote.date))).scalar()
        if not latest_date:
            return {"ok": False, "msg": "暂无数据"}

        start_date = latest_date - dt.timedelta(days=days)
        quotes = s.execute(
            select(DailyQuote)
            .where(DailyQuote.date >= start_date)
            .order_by(DailyQuote.code, DailyQuote.date)
        ).scalars().all()

        stock_data = defaultdict(list)
        for q in quotes:
            stock_data[q.code].append(q)

        candidates = []
        for code, data in stock_data.items():
            data_sorted = sorted(data, key=lambda x: x.date)

            # Positions (within this stock's rows) of the limit-up days.
            limit_positions = []
            for pos, q in enumerate(data_sorted):
                if q.open == 0:
                    continue
                pct = (float(q.close) - float(q.open)) / float(q.open) * 100
                if pct >= 9.8:
                    limit_positions.append(pos)

            if len(limit_positions) < min_limits:
                continue

            # Longest run of adjacent positions. Comparing row positions, not
            # calendar dates, keeps a run intact across weekends and holidays.
            max_consecutive = 0
            current_consecutive = 0
            previous_pos = None
            for pos in limit_positions:
                if previous_pos is not None and pos - previous_pos == 1:
                    current_consecutive += 1
                else:
                    current_consecutive = 1
                max_consecutive = max(max_consecutive, current_consecutive)
                previous_pos = pos

            candidates.append({
                "code": code,
                # Latest known name, as track_consecutive_limits reports.
                "name": data_sorted[-1].name,
                "limit_count": len(limit_positions),
                "max_consecutive": max_consecutive,
                "frequency": round(len(limit_positions) / len(data_sorted) * 100, 2),
                "dates": [data_sorted[pos].date.isoformat() for pos in limit_positions]
            })

        rankings = sorted(
            candidates,
            key=lambda item: (item["limit_count"], item["max_consecutive"]),
            reverse=True
        )[:50]

        return {
            "ok": True,
            "days": days,
            "start_date": start_date.isoformat(),
            "end_date": latest_date.isoformat(),
            "count": len(rankings),
            "rankings": rankings
        }