from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, HTTPException, status, Header from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt import bcrypt from sqlalchemy.orm import Session from db import SessionLocal from models import User from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, API_KEYS security = HTTPBearer(auto_error=False) def verify_password(plain_password: str, hashed_password: str) -> bool: """验证密码""" return bcrypt.checkpw(plain_password.encode(), hashed_password.encode()) def get_password_hash(password: str) -> str: """密码哈希""" return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """生成 JWT Token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: """验证用户名密码""" user = db.query(User).filter(User.username == username).first() if not user: return None if not verify_password(password, user.hashed_password): return None return user async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), x_api_key: Optional[str] = Header(None), db: Session = Depends(lambda: SessionLocal()) ) -> Optional[User]: """获取当前用户(支持 JWT Token 和 API Key 两种方式)""" # 方式1:API Key if x_api_key and x_api_key in API_KEYS: # API Key 模式,返回虚拟管理员 user = db.query(User).filter(User.username == "admin").first() if user: return user # 方式2:JWT Token if credentials: token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None user = db.query(User).filter(User.username == username).first() return user except JWTError: return None return None async def require_auth(current_user: Optional[User] = Depends(get_current_user)): """需要认证的依赖""" if not current_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="未认证,请先登录", headers={"WWW-Authenticate": "Bearer"}, ) return current_user async def require_admin(current_user: User = Depends(require_auth)): """需要管理员权限的依赖""" if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="需要管理员权限" ) return current_user