"""社区情绪监控 — 爬取分析东方财富/雪球热帖,量化散户情绪。 功能: 1. 爬取社区热帖 2. 情绪分析(乐观/悲观) 3. 热议股票排行 4. 关键词提取和词云 5. 情绪与股价相关性分析 """ import datetime as dt import json import re from typing import List, Dict, Any, Optional from collections import Counter, defaultdict import requests from bs4 import BeautifulSoup import jieba import jieba.analyse from sqlalchemy import select, func, and_, desc from db import get_session from models import SocialPost, SentimentIndex, DailyQuote, StockMetric # 情绪关键词库 BULLISH_KEYWORDS = [ '看多', '看好', '买入', '加仓', '抄底', '突破', '上涨', '暴涨', '牛市', '利好', '反弹', '强势', '拉升', '涨停', '走强', '看涨', '做多' ] BEARISH_KEYWORDS = [ '看空', '看跌', '卖出', '减仓', '止损', '下跌', '暴跌', '熊市', '利空', '回调', '弱势', '下杀', '跌停', '走弱', '做空', '被套' ] # 停用词 STOP_WORDS = set([ '的', '了', '是', '在', '我', '有', '和', '就', '不', '人', '都', '一', '一个', '上', '也', '很', '到', '说', '要', '去', '你', '会', '着', '没有', '看', '好', '自己', '这', '那', '以', '为', '而', '能', '他', '对', '于' ]) def crawl_eastmoney_hot(limit: int = 50) -> List[Dict[str, Any]]: """爬取东方财富热帖(简化版,实际需要处理反爬) 注意:由于反爬限制,这里返回模拟数据 实际生产环境需要: 1. 使用代理IP 2. 模拟浏览器headers 3. 控制请求频率 4. 处理验证码 """ # 模拟数据(实际应该爬取真实数据) mock_posts = [ { 'source': 'eastmoney', 'post_id': f'em_{i}', 'title': f'模拟帖子{i}:今天大盘要反弹了', 'content': '技术分析显示底部信号明显,建议逢低买入', 'author': f'用户{i}', 'comment_count': 100 + i * 10, 'view_count': 1000 + i * 100, } for i in range(limit) ] return mock_posts def crawl_xueqiu_hot(limit: int = 50) -> List[Dict[str, Any]]: """爬取雪球热帖(简化版)""" # 雪球API(需要cookie和token) # 实际使用需要登录后获取token mock_posts = [ { 'source': 'xueqiu', 'post_id': f'xq_{i}', 'title': f'雪球热议{i}:半导体板块分析', 'content': '从产业链角度看,半导体景气度回升', 'author': f'雪球用户{i}', 'comment_count': 50 + i * 5, 'view_count': 500 + i * 50, } for i in range(limit) ] return mock_posts def analyze_sentiment(text: str) -> str: """分析文本情绪 Args: text: 待分析文本 Returns: 情绪标签:bullish/bearish/neutral """ text_lower = text.lower() bullish_score = sum(1 for kw in BULLISH_KEYWORDS if kw in text_lower) bearish_score = sum(1 for kw in BEARISH_KEYWORDS if kw in text_lower) if bullish_score > bearish_score and bullish_score >= 2: return 'bullish' elif bearish_score > bullish_score and bearish_score >= 2: return 'bearish' else: return 'neutral' def extract_keywords(text: str, top_n: int = 10) -> List[str]: """提取关键词 Args: text: 文本内容 top_n: 返回前N个关键词 Returns: 关键词列表 """ # 使用jieba提取关键词 keywords = jieba.analyse.extract_tags(text, topK=top_n, withWeight=False) # 过滤停用词 keywords = [kw for kw in keywords if kw not in STOP_WORDS and len(kw) > 1] return keywords[:top_n] def extract_stock_codes(text: str) -> List[str]: """从文本中提取股票代码 Args: text: 文本内容 Returns: 股票代码列表 """ # 匹配6位数字的股票代码 pattern = r'\b[036]\d{5}\b' codes = re.findall(pattern, text) return list(set(codes)) def collect_posts(limit_per_source: int = 50) -> Dict[str, Any]: """采集社区帖子 Args: limit_per_source: 每个来源采集数量 Returns: 采集结果 """ all_posts = [] # 采集东方财富 try: em_posts = crawl_eastmoney_hot(limit_per_source) all_posts.extend(em_posts) except Exception as e: print(f"[eastmoney] crawl error: {e}") # 采集雪球 try: xq_posts = crawl_xueqiu_hot(limit_per_source) all_posts.extend(xq_posts) except Exception as e: print(f"[xueqiu] crawl error: {e}") # 分析并存储 saved_count = 0 with get_session() as s: for post in all_posts: # 检查是否已存在 exists = s.execute( select(SocialPost).where(SocialPost.post_id == post['post_id']) ).scalar_one_or_none() if exists: continue # 情绪分析 text = post['title'] + ' ' + post.get('content', '') sentiment = analyze_sentiment(text) # 提取关键词 keywords = extract_keywords(text, top_n=5) # 提取股票代码 codes = extract_stock_codes(text) code = codes[0] if codes else '' # 存储 record = SocialPost( source=post['source'], post_id=post['post_id'], code=code, title=post['title'], content=post.get('content', ''), author=post.get('author', ''), comment_count=post.get('comment_count', 0), view_count=post.get('view_count', 0), sentiment=sentiment, keywords=','.join(keywords) ) s.add(record) saved_count += 1 s.commit() return { 'ok': True, 'collected': len(all_posts), 'saved': saved_count } def calculate_sentiment_index(date: Optional[dt.date] = None) -> Dict[str, Any]: """计算情绪指数 Args: date: 统计日期,None表示今天 Returns: 情绪指数数据 """ if date is None: date = dt.date.today() start = dt.datetime.combine(date, dt.time.min) end = dt.datetime.combine(date, dt.time.max) with get_session() as s: # 统计各情绪数量 posts = s.execute( select(SocialPost) .where( and_( SocialPost.created_at >= start, SocialPost.created_at <= end ) ) ).scalars().all() if not posts: return {'ok': False, 'msg': '暂无数据'} bullish_count = sum(1 for p in posts if p.sentiment == 'bullish') bearish_count = sum(1 for p in posts if p.sentiment == 'bearish') neutral_count = sum(1 for p in posts if p.sentiment == 'neutral') total = len(posts) bullish_ratio = bullish_count / total * 100 if total > 0 else 0 # 提取热门关键词 all_keywords = [] for p in posts: if p.keywords: all_keywords.extend(p.keywords.split(',')) keyword_counter = Counter(all_keywords) top_keywords = [ {'word': kw, 'count': cnt} for kw, cnt in keyword_counter.most_common(20) ] # 存储情绪指数 index_record = s.execute( select(SentimentIndex).where(SentimentIndex.date == date) ).scalar_one_or_none() if index_record: index_record.bullish_count = bullish_count index_record.bearish_count = bearish_count index_record.neutral_count = neutral_count index_record.bullish_ratio = bullish_ratio index_record.total_posts = total index_record.top_keywords = json.dumps(top_keywords, ensure_ascii=False) index_record.updated_at = dt.datetime.now() else: index_record = SentimentIndex( date=date, bullish_count=bullish_count, bearish_count=bearish_count, neutral_count=neutral_count, bullish_ratio=bullish_ratio, total_posts=total, top_keywords=json.dumps(top_keywords, ensure_ascii=False) ) s.add(index_record) s.commit() return { 'ok': True, 'date': date.isoformat(), 'bullish_count': bullish_count, 'bearish_count': bearish_count, 'neutral_count': neutral_count, 'bullish_ratio': round(bullish_ratio, 2), 'total_posts': total, 'top_keywords': top_keywords } def get_hot_stocks(days: int = 1, limit: int = 20) -> Dict[str, Any]: """获取热议股票排行 Args: days: 统计天数 limit: 返回数量 Returns: 热议股票列表 """ since = dt.datetime.now() - dt.timedelta(days=days) with get_session() as s: # 按股票代码分组统计 stmt = ( select( SocialPost.code, func.count().label('post_count'), func.sum(SocialPost.comment_count).label('total_comments'), func.sum(SocialPost.view_count).label('total_views') ) .where( and_( SocialPost.code != '', SocialPost.created_at >= since ) ) .group_by(SocialPost.code) .order_by(desc('post_count')) .limit(limit) ) rows = s.execute(stmt).all() if not rows: return {'ok': False, 'msg': '暂无数据'} # 获取股票名称和最新价格 codes = [r.code for r in rows] metrics = {} for m in s.execute( select(StockMetric) .where(StockMetric.code.in_(codes)) ).scalars(): metrics[m.code] = { 'name': m.name, 'close': m.close, 'pct': m.pct } results = [] for r in rows: info = metrics.get(r.code, {'name': r.code, 'close': 0, 'pct': 0}) results.append({ 'code': r.code, 'name': info['name'], 'post_count': r.post_count, 'total_comments': r.total_comments or 0, 'total_views': r.total_views or 0, 'heat_score': r.post_count * 10 + (r.total_comments or 0), 'close': info['close'], 'pct': info['pct'] }) # 按热度评分排序 results.sort(key=lambda x: x['heat_score'], reverse=True) return { 'ok': True, 'days': days, 'stocks': results } def get_sentiment_history(days: int = 30) -> Dict[str, Any]: """获取情绪指数历史 Args: days: 统计天数 Returns: 历史数据 """ since = dt.date.today() - dt.timedelta(days=days) with get_session() as s: rows = s.execute( select(SentimentIndex) .where(SentimentIndex.date >= since) .order_by(SentimentIndex.date) ).scalars().all() if not rows: return {'ok': False, 'msg': '暂无历史数据'} return { 'ok': True, 'dates': [r.date.isoformat() for r in rows], 'bullish_ratio': [round(r.bullish_ratio, 2) for r in rows], 'total_posts': [r.total_posts for r in rows] } def analyze_sentiment_correlation(code: str, days: int = 60) -> Dict[str, Any]: """分析情绪与股价相关性 Args: code: 股票代码 days: 分析天数 Returns: 相关性分析结果 """ since = dt.date.today() - dt.timedelta(days=days) with get_session() as s: # 获取该股票的讨论量和情绪 posts = s.execute( select(SocialPost) .where( and_( SocialPost.code == code, func.date(SocialPost.created_at) >= since ) ) ).scalars().all() if not posts: return {'ok': False, 'msg': '该股票暂无社区数据'} # 按日期聚合 daily_sentiment = defaultdict(lambda: {'bullish': 0, 'bearish': 0, 'neutral': 0, 'total': 0}) for p in posts: date = p.created_at.date() daily_sentiment[date][p.sentiment] += 1 daily_sentiment[date]['total'] += 1 # 获取股价数据 prices = {} for q in s.execute( select(DailyQuote) .where( and_( DailyQuote.code == code, DailyQuote.date >= since ) ) .order_by(DailyQuote.date) ).scalars(): prices[q.date] = { 'close': float(q.close), 'pct': ((float(q.close) - float(q.open)) / float(q.open) * 100) if q.open > 0 else 0 } if not prices: return {'ok': False, 'msg': '缺少股价数据'} # 计算相关性(简化版) dates = sorted(set(daily_sentiment.keys()) & set(prices.keys())) if len(dates) < 10: return {'ok': False, 'msg': '数据点不足'} sentiment_scores = [] price_changes = [] for date in dates: s_data = daily_sentiment[date] bullish_ratio = s_data['bullish'] / s_data['total'] * 100 if s_data['total'] > 0 else 50 sentiment_scores.append(bullish_ratio) price_changes.append(prices[date]['pct']) # 计算相关系数(简化版) import numpy as np if len(sentiment_scores) > 1: correlation = np.corrcoef(sentiment_scores, price_changes)[0, 1] else: correlation = 0 return { 'ok': True, 'code': code, 'days': days, 'data_points': len(dates), 'correlation': round(float(correlation), 3), 'interpretation': _interpret_correlation(correlation), 'dates': [d.isoformat() for d in dates], 'sentiment_scores': [round(s, 2) for s in sentiment_scores], 'price_changes': [round(p, 2) for p in price_changes] } def _interpret_correlation(corr: float) -> str: """解释相关系数""" if corr > 0.7: return '强正相关:情绪高涨时股价往往上涨' elif corr > 0.3: return '中度正相关:情绪与股价有一定同步性' elif corr > -0.3: return '弱相关:情绪与股价关系不明显' elif corr > -0.7: return '中度负相关:情绪高涨时股价反而下跌(反向指标)' else: return '强负相关:典型反向指标,情绪越乐观越要警惕' def get_keyword_cloud(days: int = 7, top_n: int = 50) -> Dict[str, Any]: """获取关键词云数据 Args: days: 统计天数 top_n: 返回前N个关键词 Returns: 词云数据 """ since = dt.datetime.now() - dt.timedelta(days=days) with get_session() as s: posts = s.execute( select(SocialPost) .where(SocialPost.created_at >= since) ).scalars().all() if not posts: return {'ok': False, 'msg': '暂无数据'} # 收集所有关键词 all_keywords = [] for p in posts: if p.keywords: all_keywords.extend(p.keywords.split(',')) # 统计词频 keyword_counter = Counter(all_keywords) # 格式化为词云数据 wordcloud_data = [ {'name': kw, 'value': cnt} for kw, cnt in keyword_counter.most_common(top_n) ] return { 'ok': True, 'days': days, 'keywords': wordcloud_data }