"""涨跌停分析 — 连板股追踪、炸板率统计、敢死队排行。 功能: 1. 连板股追踪器 2. 炸板率统计 3. 涨停敢死队排行 """ import datetime as dt from typing import List, Dict, Any, Optional from collections import defaultdict, Counter import numpy as np from sqlalchemy import select, and_, func, desc from db import get_session from models import DailyQuote, StockMetric def get_limit_stocks(date: Optional[dt.date] = None, limit_type: str = "up") -> Dict[str, Any]: """获取涨停/跌停股票 Args: date: 日期(None表示最新) limit_type: up涨停/down跌停 Returns: 涨跌停股票列表 """ with get_session() as s: if date is None: date = s.execute(select(func.max(DailyQuote.date))).scalar() if not date: return {"ok": False, "msg": "暂无数据"} # 查询当日股票 quotes = s.execute( select(DailyQuote) .where(DailyQuote.date == date) ).scalars().all() if not quotes: return {"ok": False, "msg": "暂无数据"} # 筛选涨停/跌停股(涨跌幅接近±10%) threshold = 9.8 # 考虑精度问题,用9.8%作为阈值 results = [] for q in quotes: if q.open == 0: continue pct = (float(q.close) - float(q.open)) / float(q.open) * 100 if limit_type == "up" and pct >= threshold: results.append({ "code": q.code, "name": q.name, "close": float(q.close), "pct": round(pct, 2), "volume": float(q.volume), "amount": float(q.amount) }) elif limit_type == "down" and pct <= -threshold: results.append({ "code": q.code, "name": q.name, "close": float(q.close), "pct": round(pct, 2), "volume": float(q.volume), "amount": float(q.amount) }) return { "ok": True, "date": date.isoformat(), "type": limit_type, "count": len(results), "stocks": sorted(results, key=lambda x: x["pct"], reverse=(limit_type == "up")) } def track_consecutive_limits(days: int = 10) -> Dict[str, Any]: """连板股追踪器 Args: days: 追踪天数 Returns: 连板股列表 """ with get_session() as s: latest_date = s.execute(select(func.max(DailyQuote.date))).scalar() if not latest_date: return {"ok": False, "msg": "暂无数据"} start_date = latest_date - dt.timedelta(days=days) # 获取期间所有股票的日线数据 quotes = s.execute( select(DailyQuote) .where(DailyQuote.date >= start_date) .order_by(DailyQuote.code, DailyQuote.date) ).scalars().all() # 按股票分组 stock_data = defaultdict(list) for q in quotes: stock_data[q.code].append(q) # 统计连板 consecutive_limits = [] for code, data in stock_data.items(): if not data: continue # 倒序遍历,统计从最新日期开始的连续涨停天数 data_sorted = sorted(data, key=lambda x: x.date, reverse=True) consecutive_days = 0 for q in data_sorted: if q.open == 0: break pct = (float(q.close) - float(q.open)) / float(q.open) * 100 if pct >= 9.8: # 涨停 consecutive_days += 1 else: break if consecutive_days >= 2: # 至少2连板 latest = data_sorted[0] consecutive_limits.append({ "code": code, "name": latest.name, "consecutive_days": consecutive_days, "close": float(latest.close), "amount": float(latest.amount), "status": f"{consecutive_days}连板" }) # 按连板天数排序 consecutive_limits.sort(key=lambda x: x["consecutive_days"], reverse=True) return { "ok": True, "date": latest_date.isoformat(), "days": days, "count": len(consecutive_limits), "stocks": consecutive_limits } def analyze_limit_break_rate(days: int = 60) -> Dict[str, Any]: """炸板率统计 分析涨停后次日的表现(继续涨停/上涨/下跌/跌停) Args: days: 统计天数 Returns: 炸板率统计 """ with get_session() as s: latest_date = s.execute(select(func.max(DailyQuote.date))).scalar() if not latest_date: return {"ok": False, "msg": "暂无数据"} start_date = latest_date - dt.timedelta(days=days) # 获取期间所有股票的日线数据 quotes = s.execute( select(DailyQuote) .where(DailyQuote.date >= start_date) .order_by(DailyQuote.code, DailyQuote.date) ).scalars().all() # 按股票和日期组织数据 stock_data = defaultdict(dict) for q in quotes: stock_data[q.code][q.date] = q # 统计涨停后次日表现 next_day_stats = { "limit_up": 0, # 继续涨停 "up": 0, # 上涨但未涨停 "down": 0, # 下跌但未跌停 "limit_down": 0, # 跌停 } stock_break_rates = {} # 个股炸板率 for code, data in stock_data.items(): dates = sorted(data.keys()) stock_limits = 0 stock_breaks = 0 for i in range(len(dates) - 1): today = dates[i] tomorrow = dates[i + 1] today_q = data[today] tomorrow_q = data[tomorrow] # 判断今日是否涨停 if today_q.open == 0: continue today_pct = (float(today_q.close) - float(today_q.open)) / float(today_q.open) * 100 if today_pct >= 9.8: # 今日涨停 stock_limits += 1 # 判断次日表现 if tomorrow_q.open == 0: continue tomorrow_pct = (float(tomorrow_q.close) - float(tomorrow_q.open)) / float(tomorrow_q.open) * 100 if tomorrow_pct >= 9.8: next_day_stats["limit_up"] += 1 elif tomorrow_pct > 0: next_day_stats["up"] += 1 stock_breaks += 1 # 炸板 elif tomorrow_pct > -9.8: next_day_stats["down"] += 1 stock_breaks += 1 # 炸板 else: next_day_stats["limit_down"] += 1 stock_breaks += 1 # 炸板 # 计算个股炸板率 if stock_limits > 0: break_rate = stock_breaks / stock_limits * 100 if stock_limits >= 3: # 至少3次涨停才有统计意义 stock_break_rates[code] = { "name": list(data.values())[0].name, "limits": stock_limits, "breaks": stock_breaks, "break_rate": round(break_rate, 2) } total = sum(next_day_stats.values()) if total == 0: return {"ok": False, "msg": "统计样本不足"} # 计算比例 stats_with_pct = { "limit_up": { "count": next_day_stats["limit_up"], "pct": round(next_day_stats["limit_up"] / total * 100, 2) }, "up": { "count": next_day_stats["up"], "pct": round(next_day_stats["up"] / total * 100, 2) }, "down": { "count": next_day_stats["down"], "pct": round(next_day_stats["down"] / total * 100, 2) }, "limit_down": { "count": next_day_stats["limit_down"], "pct": round(next_day_stats["limit_down"] / total * 100, 2) } } # 炸板率 = (上涨未涨停 + 下跌 + 跌停) / 总数 break_rate = (next_day_stats["up"] + next_day_stats["down"] + next_day_stats["limit_down"]) / total * 100 # 个股炸板率排行(从高到低) stock_rankings = sorted( [(code, data) for code, data in stock_break_rates.items()], key=lambda x: x[1]["break_rate"], reverse=True )[:30] return { "ok": True, "days": days, "total_samples": total, "overall_break_rate": round(break_rate, 2), "next_day_stats": stats_with_pct, "stock_rankings": [{ "code": code, "name": data["name"], "limits": data["limits"], "breaks": data["breaks"], "break_rate": data["break_rate"] } for code, data in stock_rankings] } def get_limit_squad_rankings(days: int = 30, min_limits: int = 5) -> Dict[str, Any]: """涨停敢死队排行 统计期间内涨停次数最多的股票(俗称"妖股") Args: days: 统计天数 min_limits: 最少涨停次数 Returns: 敢死队排行 """ with get_session() as s: latest_date = s.execute(select(func.max(DailyQuote.date))).scalar() if not latest_date: return {"ok": False, "msg": "暂无数据"} start_date = latest_date - dt.timedelta(days=days) # 获取期间所有股票的日线数据 quotes = s.execute( select(DailyQuote) .where(DailyQuote.date >= start_date) .order_by(DailyQuote.code, DailyQuote.date) ).scalars().all() # 统计每只股票的涨停次数 limit_counts = defaultdict(lambda: { "name": "", "count": 0, "dates": [], "total_days": 0, "max_consecutive": 0 }) # 按股票分组 stock_data = defaultdict(list) for q in quotes: stock_data[q.code].append(q) for code, data in stock_data.items(): if not data: continue data_sorted = sorted(data, key=lambda x: x.date) limit_days = [] for q in data_sorted: if q.open == 0: continue pct = (float(q.close) - float(q.open)) / float(q.open) * 100 if pct >= 9.8: # 涨停 limit_days.append(q.date) if len(limit_days) >= min_limits: # 计算最大连板数 max_consecutive = 1 current_consecutive = 1 for i in range(1, len(limit_days)): if (limit_days[i] - limit_days[i-1]).days == 1: current_consecutive += 1 max_consecutive = max(max_consecutive, current_consecutive) else: current_consecutive = 1 limit_counts[code] = { "name": data_sorted[0].name, "count": len(limit_days), "dates": [d.isoformat() for d in limit_days], "total_days": len(data_sorted), "max_consecutive": max_consecutive, "frequency": round(len(limit_days) / len(data_sorted) * 100, 2) } # 排序 rankings = sorted( [(code, data) for code, data in limit_counts.items()], key=lambda x: (x[1]["count"], x[1]["max_consecutive"]), reverse=True )[:50] return { "ok": True, "days": days, "start_date": start_date.isoformat(), "end_date": latest_date.isoformat(), "count": len(rankings), "rankings": [{ "code": code, "name": data["name"], "limit_count": data["count"], "max_consecutive": data["max_consecutive"], "frequency": data["frequency"], "dates": data["dates"] } for code, data in rankings] }