from datetime import datetime, timedelta, timezone from pwdlib import PasswordHash import jwt from jwt.exceptions import InvalidTokenError from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlmodel import Session, select from app.core.database import get_session from app.internal.models import Admin SECRET_KEY = "CHANGE_THIS_IN_PRODUCTION" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="admin/login") def verify_password(plain_password: str, hashed_password: str) -> bool: return PasswordHash.recommended().verify(plain_password, hashed_password) def hash_password(password: str) -> str: return PasswordHash.recommended().hash(password) def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str: to_encode = data.copy() expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_admin_user( token: str = Depends(oauth2_scheme), session: Session = Depends(get_session) ): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except InvalidTokenError: raise credentials_exception admin_user = session.exec(select(Admin).where(Admin.username == username)).first() if not admin_user: raise credentials_exception return admin_user