功能细节优化

This commit is contained in:
2026-06-15 01:26:39 +08:00
parent e524a3589a
commit 964c17c200
33 changed files with 6990 additions and 210 deletions

88
backend/auth.py Normal file
View File

@@ -0,0 +1,88 @@
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
import bcrypt
from sqlalchemy.orm import Session
from db import SessionLocal
from models import User
from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, API_KEYS
security = HTTPBearer(auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(plain_password.encode(), hashed_password.encode())
def get_password_hash(password: str) -> str:
"""密码哈希"""
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""生成 JWT Token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""验证用户名密码"""
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
x_api_key: Optional[str] = Header(None),
db: Session = Depends(lambda: SessionLocal())
) -> Optional[User]:
"""获取当前用户(支持 JWT Token 和 API Key 两种方式)"""
# 方式1API Key
if x_api_key and x_api_key in API_KEYS:
# API Key 模式,返回虚拟管理员
user = db.query(User).filter(User.username == "admin").first()
if user:
return user
# 方式2JWT Token
if credentials:
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
user = db.query(User).filter(User.username == username).first()
return user
except JWTError:
return None
return None
async def require_auth(current_user: Optional[User] = Depends(get_current_user)):
"""需要认证的依赖"""
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="未认证,请先登录",
headers={"WWW-Authenticate": "Bearer"},
)
return current_user
async def require_admin(current_user: User = Depends(require_auth)):
"""需要管理员权限的依赖"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="需要管理员权限"
)
return current_user