# Admin authentication routes: password registration/login plus the
# FIDO2/WebAuthn registration and authentication ceremonies.

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse, Response
from fastapi.security import OAuth2PasswordRequestForm
from fido2 import cbor
from fido2.webauthn import PublicKeyCredentialDescriptor
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession

from app.core.database import get_session
from app.internal.auth import (
    create_access_token,
    get_current_admin_user,
    hash_password,
    verify_password,
)
from app.internal.models import (
    Admin,
    AdminCreate,
    AdminCredential,
    AdminFIDO2Challenge,
    TokenResponse,
)
from app.internal.security import fido2_server

router = APIRouter(prefix="/admin", tags=["admin"])


def _parse_admin_id(admin_id: str) -> int:
    """Convert the ``admin_id`` request parameter to an int.

    Raises:
        HTTPException: 400 when the value is not an integer literal, instead
            of letting ``ValueError`` surface as an HTTP 500.
    """
    try:
        return int(admin_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid admin_id") from None


async def _get_admin_by_id(session: AsyncSession, admin_id: int) -> Admin:
    """Return the admin with the given primary key, or raise HTTP 400."""
    admin = (await session.exec(select(Admin).where(Admin.id == admin_id))).first()
    if admin is None:
        raise HTTPException(status_code=400, detail="User not found")
    return admin


async def _store_challenge(
    session: AsyncSession, admin_id: int, challenge, kind: str
) -> None:
    """Persist ``challenge`` as the only pending challenge of ``kind``.

    Older pending challenges of the same kind for this admin are deleted
    first; otherwise repeated ``begin`` calls pile up rows and the unordered
    ``.first()`` lookup in the ``complete`` step may pick a stale one.
    """
    stale_rows = (
        await session.exec(
            select(AdminFIDO2Challenge)
            .where(AdminFIDO2Challenge.admin_id == admin_id)
            .where(AdminFIDO2Challenge.type == kind)
        )
    ).all()
    for stale in stale_rows:
        await session.delete(stale)
    session.add(
        AdminFIDO2Challenge(admin_id=admin_id, challenge=challenge, type=kind)
    )
    await session.commit()


async def _get_challenge(
    session: AsyncSession, admin_id: int, kind: str
) -> AdminFIDO2Challenge:
    """Return the pending challenge of ``kind`` for the admin, or raise HTTP 400."""
    challenge = (
        await session.exec(
            select(AdminFIDO2Challenge)
            .where(AdminFIDO2Challenge.admin_id == admin_id)
            .where(AdminFIDO2Challenge.type == kind)
        )
    ).first()
    if challenge is None:
        raise HTTPException(status_code=400, detail=f"No {kind} challenge found")
    return challenge


@router.post("/register")
async def register(
    user: AdminCreate,
    session: AsyncSession = Depends(get_session),
):
    """Create a new admin account with a hashed password.

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    # NOTE(review): this endpoint has no authentication dependency, so any
    # caller can create an admin account - confirm this is intended.
    existing = (
        await session.exec(select(Admin).where(Admin.username == user.username))
    ).first()
    if existing is not None:
        raise HTTPException(status_code=400, detail="Username already exists")

    session.add(Admin(username=user.username, password=hash_password(user.password)))
    await session.commit()
    return JSONResponse(
        content={"message": "Admin registered successfully"}, status_code=201
    )


@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: AsyncSession = Depends(get_session),
) -> TokenResponse:
    """Verify username/password and return a bearer access token.

    Raises:
        HTTPException: 400 when the user does not exist or the password
            does not verify.
    """
    admin_user = (
        await session.exec(select(Admin).where(Admin.username == form_data.username))
    ).first()
    if not admin_user or not verify_password(form_data.password, admin_user.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    access_token = create_access_token(data={"sub": admin_user.username})
    return TokenResponse(access_token=access_token, token_type="bearer")


@router.post("/register/begin")
async def register_begin(
    current_admin: Admin = Depends(get_current_admin_user),
    session: AsyncSession = Depends(get_session),
):
    """Start FIDO2 credential registration for the authenticated admin.

    Stores the generated challenge and returns the registration options as
    a CBOR-encoded response body.
    """
    existing_credentials = (
        await session.exec(
            select(AdminCredential).where(
                AdminCredential.admin_id == current_admin.id
            )
        )
    ).all()
    registration_data, state = fido2_server.register_begin(
        user={
            "id": current_admin.id.to_bytes(16, "big"),
            "name": current_admin.username,
            "displayName": current_admin.username,
        },
        credentials=existing_credentials,
    )
    await _store_challenge(
        session, current_admin.id, state["challenge"], "registration"
    )
    # CBOR is binary, not UTF-8 text: a CBOR map starts with a byte in
    # 0xA0-0xBF, so decoding it as UTF-8 always fails. Send the raw bytes.
    return Response(
        content=cbor.encode(registration_data), media_type="application/cbor"
    )


@router.post("/register/complete")
async def register_complete(
    admin_id: str,
    credential: dict,
    session: AsyncSession = Depends(get_session),
):
    """Finish FIDO2 registration and store the new credential.

    Raises:
        HTTPException: 400 when ``admin_id`` is not an integer, no
            registration challenge is pending, or the admin does not exist.
    """
    # NOTE(review): the admin is identified by a caller-supplied ``admin_id``
    # rather than the authenticated user - confirm this is intended.
    admin_pk = _parse_admin_id(admin_id)
    challenge = await _get_challenge(session, admin_pk, "registration")
    user = await _get_admin_by_id(session, admin_pk)

    # NOTE(review): only the challenge is persisted, so ``state`` is rebuilt
    # with that single key, and the attribute names read from ``auth_data``
    # are kept as written - verify both against the installed python-fido2.
    auth_data = fido2_server.register_complete(
        state={"challenge": challenge.challenge},
        response=credential,
    )

    session.add(
        AdminCredential(
            admin_id=user.id,
            credential_id=auth_data.credential_id,
            public_key=auth_data.credential_public_key,
            sign_count=auth_data.sign_count,
        )
    )
    await session.delete(challenge)
    await session.commit()
    return JSONResponse(
        content={"message": "FIDO2 registration successful"}, status_code=200
    )


@router.post("/login/begin")
async def login_begin(username: str, session: AsyncSession = Depends(get_session)):
    """Start FIDO2 authentication for ``username`` and return the options.

    Raises:
        HTTPException: 400 when no admin has that username.
    """
    # ``AsyncSession.exec`` is a coroutine and must be awaited before
    # ``.first()`` / ``.all()`` can be called on its result.
    admin_user = (
        await session.exec(select(Admin).where(Admin.username == username))
    ).first()
    if not admin_user:
        raise HTTPException(status_code=400, detail="User not found")

    credentials = (
        await session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == admin_user.id)
        )
    ).all()
    allow_credentials = [
        PublicKeyCredentialDescriptor(id=cred.credential_id) for cred in credentials
    ]
    authentication_data, state = fido2_server.authenticate_begin(
        allow_credentials=allow_credentials,
        user_verification="preferred",
    )
    await _store_challenge(
        session, admin_user.id, state["challenge"], "authentication"
    )
    return authentication_data


@router.post("/login/complete")
async def login_complete(
    admin_id: str,
    credential: dict,
    session: AsyncSession = Depends(get_session),
):
    """Finish FIDO2 authentication and update the credential's sign count.

    Raises:
        HTTPException: 400 when ``admin_id`` is not an integer, no
            authentication challenge is pending, the admin does not exist,
            or the asserted credential is not one of the admin's.
    """
    # NOTE(review): the admin is identified by a caller-supplied ``admin_id``
    # - confirm this is intended.
    admin_pk = _parse_admin_id(admin_id)
    challenge = await _get_challenge(session, admin_pk, "authentication")
    user = await _get_admin_by_id(session, admin_pk)

    stored_credentials = (
        await session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == user.id)
        )
    ).all()

    # The client's assertion must be handed to the server for verification;
    # previously the request body was never passed on.
    # NOTE(review): the reduced ``state`` dict, the dict-shaped credentials
    # and the attribute names read from ``auth_data`` are kept as written -
    # verify them against the installed python-fido2.
    auth_data = fido2_server.authenticate_complete(
        state={"challenge": challenge.challenge},
        credentials=[
            {
                "id": cred.credential_id,
                "public_key": cred.public_key,
                "sign_count": cred.sign_count,
            }
            for cred in stored_credentials
        ],
        response=credential,
    )

    # Pick the row from this admin's already-loaded credentials instead of
    # re-querying (and instead of shadowing the ``credential`` parameter).
    matched = next(
        (
            cred
            for cred in stored_credentials
            if cred.credential_id == auth_data.credential_id
        ),
        None,
    )
    if matched is None:
        raise HTTPException(status_code=400, detail="Unknown credential")

    matched.sign_count = auth_data.sign_count
    session.add(matched)
    await session.delete(challenge)
    await session.commit()
    return JSONResponse(
        content={"message": "FIDO2 authentication successful"}, status_code=200
    )