"""Authentication helpers: password hashing, JWT access tokens, auth cookie."""

from datetime import datetime, timedelta, timezone
from typing import Any, Optional

from fastapi import Depends, HTTPException, Request, Response, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from app.core.config import settings

# Name of the cookie carrying the token and the scheme prefix stored in it.
# Shared by set_auth_cookie() and get_cookie_token() so they cannot drift.
_AUTH_COOKIE_NAME = "access_token"
_BEARER_PREFIX = "Bearer "

# =========================
# PASSWORD HASHING
# =========================

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password``."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


# =========================
# JWT TOKEN MANAGEMENT
# =========================

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


def create_access_token(
    data: dict, expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The mapping is copied, not
            mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy),
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes is used.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    lifetime = expires_delta or timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    # Use an aware UTC timestamp: the encoder turns the datetime into an
    # epoch value via utctimetuple(), which treats a naive datetime as
    # already being UTC. A naive local time would therefore shift the
    # expiry by the server's UTC offset.
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(
        to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM
    )


def decode_access_token(token: str) -> dict:
    """Decode and verify a JWT access token.

    Args:
        token: The encoded JWT.

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 if decoding raises ``JWTError``.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        # Chain the cause so the original failure stays visible in logs.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
        ) from exc
    return payload


# =========================
# FASTAPI DEPENDENCIES
# =========================

def get_current_user(token: str) -> dict:
    """Return the token payload after checking it identifies a user.

    Args:
        token: The encoded JWT.

    Returns:
        The full decoded payload (not only the ``sub`` claim).

    Raises:
        HTTPException: 401 if the token cannot be decoded or carries no
            ``sub`` claim.
    """
    payload = decode_access_token(token)
    if payload.get("sub") is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
        )
    return payload


async def get_token_payload(request: Request) -> dict:
    """Return the payload of the token stored in the request's auth cookie.

    Raises:
        HTTPException: 401 if the cookie is missing or malformed, or if the
            token it holds is rejected by ``get_current_user``.
    """
    token = get_cookie_token(request)
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
        )
    return get_current_user(token)


# =========================
# COOKIE MANAGEMENT (optional)
# =========================

def set_auth_cookie(response: Response, token: str, max_age: int = 3600):
    """Store the JWT in an HttpOnly cookie on ``response``.

    Args:
        response: Response on which the cookie is set.
        token: Encoded JWT; stored with a ``"Bearer "`` prefix.
        max_age: Cookie lifetime in seconds.
    """
    response.set_cookie(
        key=_AUTH_COOKIE_NAME,
        value=f"{_BEARER_PREFIX}{token}",
        httponly=True,
        max_age=max_age,
        # Relaxed SameSite policy only in the dev environment.
        samesite="lax" if settings.ENV == "dev" else "strict",
        secure=settings.USE_SSL,  # should be True when served over HTTPS
    )


def get_cookie_token(request: Request) -> Optional[str]:
    """Return the JWT stored in the auth cookie.

    Returns:
        The token without its ``"Bearer "`` prefix, or ``None`` when the
        cookie is absent or does not start with that prefix.
    """
    cookie = request.cookies.get(_AUTH_COOKIE_NAME)
    if cookie and cookie.startswith(_BEARER_PREFIX):
        return cookie[len(_BEARER_PREFIX):]
    return None