"""Admin authentication routes: password login plus FIDO2/WebAuthn ceremonies."""

import logging
import secrets
from datetime import datetime, timezone
from typing import Any

from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from fido2 import cbor
from fido2.server import Fido2Server
from fido2.webauthn import (
    AttestationObject,
    AuthenticatorData,
    PublicKeyCredentialDescriptor,
    PublicKeyCredentialRpEntity,
)
from sqlmodel import Field, Session, SQLModel, select

from app.core.database import get_session
from app.internal.auth import (
    create_access_token,
    get_current_admin_user,
    hash_password,
    verify_password,
)
from app.internal.models import (
    Admin,
    AdminCreate,
    AdminCredential,
    AdminFIDO2Challenge,
    TokenResponse,
)
from app.internal.security import fido2_server
from app.utils.webauthn import convert_bytes_to_base64

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/admin", tags=["admin"])


def _parse_admin_id(admin_id: str) -> int:
    """Convert the client-supplied admin id to an int, or fail with HTTP 400."""
    try:
        return int(admin_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid admin id") from exc


def _pending_challenges(session: Session, admin_id: int, kind: str):
    """Return the query result of stored challenges of *kind* for *admin_id*."""
    return session.exec(
        select(AdminFIDO2Challenge).where(
            AdminFIDO2Challenge.admin_id == admin_id,
            AdminFIDO2Challenge.type == kind,
        )
    )


def _store_challenge(session: Session, admin_id: int, challenge: Any, kind: str) -> None:
    """Persist *challenge*, replacing any earlier pending one of the same kind.

    The completion endpoints read the first matching row, so leftovers from
    abandoned ceremonies must be removed or a stale challenge could be used.
    """
    for stale in _pending_challenges(session, admin_id, kind).all():
        session.delete(stale)
    session.add(
        AdminFIDO2Challenge(admin_id=admin_id, challenge=challenge, type=kind)
    )
    session.commit()


@router.post("/register")
async def register(
    user: AdminCreate,
    session: Session = Depends(get_session),
):
    """Create a new admin account with a hashed password.

    Raises:
        HTTPException: 400 if the username is already taken.
    """
    # NOTE(review): this endpoint has no auth dependency, so any caller can
    # create an admin account — confirm this is intended.
    existing = session.exec(
        select(Admin).where(Admin.username == user.username)
    ).first()
    if existing is not None:
        raise HTTPException(status_code=400, detail="Username already exists")

    new_admin = Admin(
        username=user.username,
        password=hash_password(user.password),
    )
    session.add(new_admin)
    session.commit()
    session.refresh(new_admin)
    return JSONResponse(
        content={"message": "Admin registered successfully"},
        status_code=201,
    )


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: Session = Depends(get_session),
) -> TokenResponse:
    """Exchange a username/password form for a bearer access token.

    Raises:
        HTTPException: 400 if the user is unknown or the password is wrong.
    """
    admin_user = session.exec(
        select(Admin).where(Admin.username == form_data.username)
    ).first()
    if not admin_user or not verify_password(form_data.password, admin_user.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    access_token = create_access_token(data={"sub": admin_user.username})
    return TokenResponse(access_token=access_token, token_type="bearer")


@router.post("/register/begin")
async def register_begin(
    current_admin: Admin = Depends(get_current_admin_user),
    session: Session = Depends(get_session),
):
    """Start a FIDO2 registration ceremony for the authenticated admin.

    Stores the issued challenge and returns the registration options as a
    CBOR-encoded body.
    """
    # NOTE(review): the stored AdminCredential rows are passed straight to
    # fido2_server.register_begin — confirm it accepts that type.
    registration_data, state = fido2_server.register_begin(
        user={
            "id": current_admin.id.to_bytes(16, "big"),
            "name": current_admin.username,
            "displayName": current_admin.username,
        },
        credentials=session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == current_admin.id)
        ).all(),
    )
    _store_challenge(session, current_admin.id, state["challenge"], "registration")

    logger.debug("Registration data: %s", registration_data)
    # CBOR is a binary format; decoding it as UTF-8 text fails, so the encoded
    # bytes are sent as-is with an explicit media type.
    return Response(
        content=cbor.encode(registration_data),
        media_type="application/cbor",
    )


@router.post("/register/complete")
async def register_complete(
    admin_id: str,
    credential: dict,
    session: Session = Depends(get_session),
):
    """Finish a FIDO2 registration and store the new credential.

    Raises:
        HTTPException: 400 if ``admin_id`` is not an integer, no registration
            challenge is pending, or the admin does not exist.
    """
    # NOTE(review): the admin is identified by a client-supplied id and no auth
    # dependency is declared here, unlike /register/begin — confirm.
    admin_pk = _parse_admin_id(admin_id)

    challenge = _pending_challenges(session, admin_pk, "registration").first()
    if not challenge:
        raise HTTPException(status_code=400, detail="No registration challenge found")

    user = session.exec(select(Admin).where(Admin.id == admin_pk)).first()
    if user is None:
        raise HTTPException(status_code=400, detail="Admin not found")

    # NOTE(review): the state shape, the ``client_data`` keyword and the
    # attributes read from the result depend on the project's fido2_server —
    # verify against its API.
    auth_data = fido2_server.register_complete(
        state={"challenge": challenge.challenge},
        client_data=credential,
    )

    session.add(
        AdminCredential(
            admin_id=user.id,
            credential_id=auth_data.credential_id,
            public_key=auth_data.credential_public_key,
            sign_count=auth_data.sign_count,
        )
    )
    session.delete(challenge)  # a challenge is single-use
    session.commit()
    return JSONResponse(
        content={"message": "FIDO2 registration successful"},
        status_code=200,
    )


@router.post("/login/begin")
async def login_begin(username: str, session: Session = Depends(get_session)):
    """Start a FIDO2 authentication ceremony for ``username``.

    Stores the issued challenge and returns the authentication options.

    Raises:
        HTTPException: 400 if no admin has that username.
    """
    admin_user = session.exec(
        select(Admin).where(Admin.username == username)
    ).first()
    if not admin_user:
        raise HTTPException(status_code=400, detail="User not found")

    credentials = session.exec(
        select(AdminCredential).where(AdminCredential.admin_id == admin_user.id)
    ).all()
    allow_credentials = [
        PublicKeyCredentialDescriptor(id=cred.credential_id) for cred in credentials
    ]

    authentication_data, state = fido2_server.authenticate_begin(
        allow_credentials=allow_credentials,
        user_verification="preferred",
    )
    _store_challenge(session, admin_user.id, state["challenge"], "authentication")
    return authentication_data


@router.post("/login/complete")
async def login_complete(
    admin_id: str,
    credential: dict,
    session: Session = Depends(get_session),
):
    """Finish a FIDO2 authentication and update the credential's sign count.

    Raises:
        HTTPException: 400 if ``admin_id`` is not an integer, no authentication
            challenge is pending, the admin does not exist, or the verified
            credential is not one of the admin's stored credentials.
    """
    admin_pk = _parse_admin_id(admin_id)

    challenge = _pending_challenges(session, admin_pk, "authentication").first()
    if not challenge:
        raise HTTPException(status_code=400, detail="No authentication challenge found")

    user = session.exec(select(Admin).where(Admin.id == admin_pk)).first()
    if user is None:
        raise HTTPException(status_code=400, detail="Admin not found")

    stored_credentials = session.exec(
        select(AdminCredential).where(AdminCredential.admin_id == user.id)
    ).all()

    # The client's assertion must reach the verifier; previously ``credential``
    # was never used, so nothing the client sent was checked.
    # NOTE(review): ``response=`` assumes fido2_server follows python-fido2
    # >= 1.0 (authenticate_complete(state, credentials, response)); the state
    # shape, the dict-based credentials and the result attributes also need
    # confirming against the project's fido2_server.
    auth_data = fido2_server.authenticate_complete(
        state={"challenge": challenge.challenge},
        credentials=[
            {
                "id": cred.credential_id,
                "public_key": cred.public_key,
                "sign_count": cred.sign_count,
            }
            for cred in stored_credentials
        ],
        response=credential,
    )

    # Resolve the match within this admin's own credentials only.
    matched = next(
        (
            cred
            for cred in stored_credentials
            if cred.credential_id == auth_data.credential_id
        ),
        None,
    )
    if matched is None:
        raise HTTPException(status_code=400, detail="Unknown credential")

    matched.sign_count = auth_data.sign_count
    session.add(matched)
    session.delete(challenge)  # a challenge is single-use
    session.commit()
    return JSONResponse(
        content={"message": "FIDO2 authentication successful"},
        status_code=200,
    )