Files

178 lines
6.0 KiB
Python
Raw Permalink Normal View History

2026-02-21 14:56:19 +08:00
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
2026-02-22 00:05:04 +08:00
from sqlmodel import select
from fastapi import APIRouter, Depends, HTTPException
from fido2.webauthn import PublicKeyCredentialDescriptor
2026-02-21 14:56:19 +08:00
from fido2 import cbor
2026-02-22 00:05:04 +08:00
from sqlmodel.ext.asyncio.session import AsyncSession
2026-02-21 14:56:19 +08:00
from app.core.database import get_session
from app.internal.auth import create_access_token, get_current_admin_user, hash_password, verify_password
from app.internal.models import Admin, AdminCreate, AdminCredential, AdminFIDO2Challenge, TokenResponse
from app.internal.security import fido2_server
# Router collecting the admin endpoints; every route registered on it is
# served under the "/admin" path prefix and tagged "admin".
router = APIRouter(prefix="/admin", tags=["admin"])
@router.post("/register")
async def register(
    user: AdminCreate,
    session: AsyncSession = Depends(get_session),
):
    """Create a new admin account from a username and password.

    Args:
        user: Payload carrying the ``username`` and ``password`` to register.
        session: Async database session injected by ``get_session``.

    Returns:
        A 201 ``JSONResponse`` with a confirmation message.

    Raises:
        HTTPException: 400 when an admin with the same username already exists.
    """
    username_query = select(Admin).where(Admin.username == user.username)
    lookup = await session.exec(username_query)
    if lookup.first() is not None:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Only the hash produced by hash_password is stored, never the raw password.
    admin_row = Admin(
        username=user.username,
        password=hash_password(user.password),
    )
    session.add(admin_row)
    await session.commit()
    await session.refresh(admin_row)

    payload = {"message": "Admin registered successfully"}
    return JSONResponse(content=payload, status_code=201)
@router.post("/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: AsyncSession = Depends(get_session),
) -> TokenResponse:
    """Authenticate an admin by username/password and issue a bearer token.

    Args:
        form_data: OAuth2 password form supplying ``username`` and ``password``.
        session: Async database session injected by ``get_session``.

    Returns:
        A ``TokenResponse`` whose access token has the admin's username as
        its ``sub`` claim and whose token type is ``"bearer"``.

    Raises:
        HTTPException: 400 when the username is unknown or the password does
            not verify; both cases share one message.
    """
    lookup = await session.exec(
        select(Admin).where(Admin.username == form_data.username)
    )
    admin_user = lookup.first()

    # Short-circuits exactly like the two-clause check: the password is only
    # verified when a matching admin row was found.
    if not (admin_user and verify_password(form_data.password, admin_user.password)):
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    token = create_access_token(data={"sub": admin_user.username})
    return TokenResponse(access_token=token, token_type="bearer")
@router.post("/register/begin")
async def register_begin(
    current_admin: Admin = Depends(get_current_admin_user),
    session: AsyncSession = Depends(get_session),
):
    """Start a FIDO2 credential registration for the authenticated admin.

    Asks ``fido2_server`` for registration options, persists the issued
    challenge as an ``AdminFIDO2Challenge`` of type ``"registration"``, and
    returns the options to the client CBOR-encoded.

    Args:
        current_admin: The admin resolved by ``get_current_admin_user``.
        session: Async database session injected by ``get_session``.

    Returns:
        A ``Response`` whose body is the CBOR encoding of the registration
        options, served with media type ``application/cbor``.
    """
    # Function-scope imports: these names are not imported at module level.
    from fastapi.responses import Response
    from fido2.webauthn import PublicKeyCredentialType

    stored_credentials = (
        await session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == current_admin.id)
        )
    ).all()
    # Hand the FIDO2 server credential descriptors rather than raw ORM rows,
    # the same way login_begin builds its allow-list.
    known_descriptors = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=cred.credential_id,
        )
        for cred in stored_credentials
    ]

    registration_data, state = fido2_server.register_begin(
        user={
            "id": current_admin.id.to_bytes(16, "big"),
            "name": current_admin.username,
            "displayName": current_admin.username,
        },
        credentials=known_descriptors,
    )

    # register_complete picks the first stored registration challenge for the
    # admin, so drop any left over from an abandoned attempt; otherwise a
    # stale challenge could be checked instead of the one issued here.
    stale_challenges = (
        await session.exec(
            select(AdminFIDO2Challenge).where(
                AdminFIDO2Challenge.admin_id == current_admin.id,
                AdminFIDO2Challenge.type == "registration",
            )
        )
    ).all()
    for stale in stale_challenges:
        await session.delete(stale)

    session.add(
        AdminFIDO2Challenge(
            admin_id=current_admin.id,
            challenge=state["challenge"],
            type="registration",
        )
    )
    await session.commit()

    # CBOR is a binary format: decoding it as UTF-8 text raises
    # UnicodeDecodeError, so the encoded bytes are returned untouched.
    return Response(
        content=cbor.encode(registration_data),
        media_type="application/cbor",
    )
@router.post("/register/complete")
async def register_complete(
    admin_id: str,
    credential: dict,
    session: AsyncSession = Depends(get_session),
):
    """Finish a FIDO2 registration and store the new credential.

    Looks up the pending ``"registration"`` challenge for ``admin_id``, asks
    ``fido2_server`` to verify the client's response against it, saves an
    ``AdminCredential`` for the admin and deletes the used challenge.

    Args:
        admin_id: Primary key of the admin, received as a string.
        credential: The registration response produced by the client.
        session: Async database session injected by ``get_session``.

    Returns:
        A 200 ``JSONResponse`` with a confirmation message.

    Raises:
        HTTPException: 400 when ``admin_id`` is not an integer, no pending
            registration challenge exists, the admin does not exist, or the
            FIDO2 server rejects the response.
    """
    # Parse once up front so malformed input is a client error, not a 500.
    try:
        admin_pk = int(admin_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid admin id") from None

    challenge = (
        await session.exec(
            select(AdminFIDO2Challenge).where(
                AdminFIDO2Challenge.admin_id == admin_pk,
                AdminFIDO2Challenge.type == "registration",
            )
        )
    ).first()
    if not challenge:
        raise HTTPException(status_code=400, detail="No registration challenge found")

    user = (await session.exec(select(Admin).where(Admin.id == admin_pk))).first()
    if user is None:
        raise HTTPException(status_code=400, detail="User not found")

    # Only the challenge was persisted, so the server state is rebuilt here.
    # register_begin in this module requests no user-verification policy,
    # hence None. NOTE(review): python-fido2 reads a "user_verification"
    # entry from the state -- confirm against the pinned version.
    state = {"challenge": challenge.challenge, "user_verification": None}
    try:
        # The client payload is passed positionally: that is the response
        # slot of the current python-fido2 API.
        auth_data = fido2_server.register_complete(state, credential)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="FIDO2 registration verification failed"
        ) from exc

    # NOTE(review): confirm these attribute names against the object that
    # register_complete returns; python-fido2's AuthenticatorData exposes
    # `credential_data` and `counter` -- verify before relying on this.
    new_credential = AdminCredential(
        admin_id=user.id,
        credential_id=auth_data.credential_id,
        public_key=auth_data.credential_public_key,
        sign_count=auth_data.sign_count,
    )
    session.add(new_credential)
    # The challenge is single-use: remove it in the same commit that stores
    # the credential.
    await session.delete(challenge)
    await session.commit()

    return JSONResponse(content={"message": "FIDO2 registration successful"}, status_code=200)
@router.post("/login/begin")
async def login_begin(username: str, session: AsyncSession = Depends(get_session)):
    """Start a FIDO2 authentication for the admin named ``username``.

    Builds an allow-list from the admin's stored credentials, asks
    ``fido2_server`` for authentication options (user verification
    ``"preferred"``), persists the issued challenge as an
    ``AdminFIDO2Challenge`` of type ``"authentication"`` and returns the
    options.

    Args:
        username: Username of the admin who wants to authenticate.
        session: Async database session injected by ``get_session``.

    Returns:
        The authentication options produced by
        ``fido2_server.authenticate_begin``.

    Raises:
        HTTPException: 400 when no admin with that username exists.
    """
    # Function-scope import: this name is not imported at module level.
    from fido2.webauthn import PublicKeyCredentialType

    # AsyncSession.exec is a coroutine; it must be awaited before .first() or
    # .all() can be called on its result, as the other handlers here do.
    admin_user = (
        await session.exec(select(Admin).where(Admin.username == username))
    ).first()
    if not admin_user:
        raise HTTPException(status_code=400, detail="User not found")

    credentials = (
        await session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == admin_user.id)
        )
    ).all()
    allow_credentials = [
        PublicKeyCredentialDescriptor(
            type=PublicKeyCredentialType.PUBLIC_KEY,
            id=cred.credential_id,
        )
        for cred in credentials
    ]

    authentication_data, state = fido2_server.authenticate_begin(
        allow_credentials=allow_credentials,
        user_verification="preferred",
    )

    # login_complete picks the first stored authentication challenge for the
    # admin, so drop any left over from an abandoned attempt; otherwise a
    # stale challenge could be checked instead of the one issued here.
    stale_challenges = (
        await session.exec(
            select(AdminFIDO2Challenge).where(
                AdminFIDO2Challenge.admin_id == admin_user.id,
                AdminFIDO2Challenge.type == "authentication",
            )
        )
    ).all()
    for stale in stale_challenges:
        await session.delete(stale)

    session.add(
        AdminFIDO2Challenge(
            admin_id=admin_user.id,
            challenge=state["challenge"],
            type="authentication",
        )
    )
    await session.commit()

    return authentication_data
@router.post("/login/complete")
async def login_complete(admin_id: str, credential: dict, session: AsyncSession = Depends(get_session)):
    """Finish a FIDO2 authentication for the given admin.

    Looks up the pending ``"authentication"`` challenge for ``admin_id``,
    asks ``fido2_server`` to verify the client's assertion against the
    admin's stored credentials, updates the matched credential's signature
    counter and deletes the used challenge.

    Args:
        admin_id: Primary key of the admin, received as a string.
        credential: The authentication response (assertion) from the client.
        session: Async database session injected by ``get_session``.

    Returns:
        A 200 ``JSONResponse`` with a confirmation message.

    Raises:
        HTTPException: 400 when ``admin_id`` is not an integer, no pending
            authentication challenge exists, the admin does not exist, the
            FIDO2 server rejects the assertion, or the verified credential is
            not found in the database.
    """
    # Parse once up front so malformed input is a client error, not a 500.
    try:
        admin_pk = int(admin_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid admin id") from None

    challenge = (
        await session.exec(
            select(AdminFIDO2Challenge)
            .where(AdminFIDO2Challenge.admin_id == admin_pk)
            .where(AdminFIDO2Challenge.type == "authentication")
        )
    ).first()
    if not challenge:
        raise HTTPException(status_code=400, detail="No authentication challenge found")

    user = (await session.exec(select(Admin).where(Admin.id == admin_pk))).first()
    if user is None:
        raise HTTPException(status_code=400, detail="User not found")

    credentials = (
        await session.exec(
            select(AdminCredential).where(AdminCredential.admin_id == user.id)
        )
    ).all()

    # Only the challenge was persisted, so the server state is rebuilt here.
    # "preferred" mirrors the policy login_begin requests. NOTE(review):
    # python-fido2 reads a "user_verification" entry from the state --
    # confirm against the pinned version.
    state = {"challenge": challenge.challenge, "user_verification": "preferred"}
    # NOTE(review): python-fido2 documents `credentials` as credential-data
    # objects (with a verifiable public key) rather than plain dicts --
    # confirm this shape is accepted by the configured server.
    stored_credentials = [
        {
            "id": cred.credential_id,
            "public_key": cred.public_key,
            "sign_count": cred.sign_count,
        }
        for cred in credentials
    ]
    try:
        # The client's assertion has to be handed to the server; without it
        # there is nothing to verify against the stored challenge.
        auth_data = fido2_server.authenticate_complete(
            state,
            stored_credentials,
            credential,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="FIDO2 authentication verification failed"
        ) from exc

    # Use a separate name so the request's `credential` payload is not
    # shadowed by the database row.
    matched_credential = (
        await session.exec(
            select(AdminCredential).where(
                AdminCredential.credential_id == auth_data.credential_id
            )
        )
    ).first()
    if matched_credential is None:
        raise HTTPException(status_code=400, detail="Credential not found")

    # NOTE(review): confirm the object returned by authenticate_complete
    # actually exposes `sign_count`; verify against the fido2 version in use.
    matched_credential.sign_count = auth_data.sign_count
    # The challenge is single-use: remove it in the same commit that stores
    # the new counter.
    await session.delete(challenge)
    await session.commit()

    return JSONResponse(content={"message": "FIDO2 authentication successful"}, status_code=200)