This commit is contained in:
2026-02-22 00:05:04 +08:00
parent 6ad5256df9
commit ed8ba1ae7d
5 changed files with 54 additions and 40 deletions

View File

@@ -1,29 +1,26 @@
from datetime import datetime, timezone
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import SQLModel, Field, Session, select
from fastapi import APIRouter, Depends, HTTPException, Request
from fido2.server import Fido2Server
from fido2.webauthn import PublicKeyCredentialDescriptor, PublicKeyCredentialRpEntity, AttestationObject, AuthenticatorData
from sqlmodel import select
from fastapi import APIRouter, Depends, HTTPException
from fido2.webauthn import PublicKeyCredentialDescriptor
from fido2 import cbor
import secrets
from sqlmodel.ext.asyncio.session import AsyncSession
from app.core.database import get_session
from typing import Any
from app.internal.auth import create_access_token, get_current_admin_user, hash_password, verify_password
from app.internal.models import Admin, AdminCreate, AdminCredential, AdminFIDO2Challenge, TokenResponse
from app.internal.security import fido2_server
from app.utils.webauthn import convert_bytes_to_base64
router = APIRouter(prefix="/admin", tags=["admin"])
@router.post("/register")
async def register(
user: AdminCreate,
session: Session = Depends(get_session)
session: AsyncSession = Depends(get_session)
):
existing = session.exec(select(Admin).where(Admin.username == user.username)).first()
existing = (await session.exec(select(Admin).where(Admin.username == user.username))).first()
if existing is not None:
raise HTTPException(status_code=400, detail="Username already exists")
@@ -34,17 +31,17 @@ async def register(
)
session.add(new_admin)
session.commit()
session.refresh(new_admin)
await session.commit()
await session.refresh(new_admin)
return JSONResponse(content={"message": "Admin registered successfully"}, status_code=201)
@router.post("/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
session: Session = Depends(get_session)
session: AsyncSession = Depends(get_session)
) -> TokenResponse:
admin_user = session.exec(select(Admin).where(Admin.username == form_data.username)).first()
admin_user = (await session.exec(select(Admin).where(Admin.username == form_data.username))).first()
if not admin_user or not verify_password(form_data.password, admin_user.password):
raise HTTPException(status_code=400, detail="Incorrect username or password")
@@ -58,7 +55,7 @@ async def login(
@router.post("/register/begin")
async def register_begin(
current_admin: Admin = Depends(get_current_admin_user),
session: Session = Depends(get_session)
session: AsyncSession = Depends(get_session)
):
registration_data, state = fido2_server.register_begin(
@@ -67,7 +64,7 @@ async def register_begin(
"name": current_admin.username,
"displayName": current_admin.username
},
credentials=session.exec(select(AdminCredential).where(AdminCredential.admin_id == current_admin.id)).all(),
credentials=(await session.exec(select(AdminCredential).where(AdminCredential.admin_id == current_admin.id))).all(),
)
challenge = AdminFIDO2Challenge(
@@ -77,7 +74,7 @@ async def register_begin(
)
session.add(challenge)
session.commit()
await session.commit()
print("Registration data:", registration_data)
return cbor.encode(registration_data).decode("utf-8")
@@ -87,12 +84,12 @@ async def register_begin(
async def register_complete(
admin_id: str,
credential: dict,
session: Session = Depends(get_session)
session: AsyncSession = Depends(get_session)
):
challenge = session.exec(
challenge = (await session.exec(
select(AdminFIDO2Challenge)
.where(AdminFIDO2Challenge.admin_id == int(admin_id), AdminFIDO2Challenge.type == "registration")
).first()
)).first()
if not challenge:
raise HTTPException(status_code=400, detail="No registration challenge found")
@@ -102,7 +99,7 @@ async def register_complete(
client_data=credential,
)
user = session.exec(select(Admin).where(Admin.id == int(admin_id))).first()
user = (await session.exec(select(Admin).where(Admin.id == int(admin_id)))).first()
new_credential = AdminCredential(
admin_id=user.id,
credential_id=auth_data.credential_id,
@@ -110,13 +107,13 @@ async def register_complete(
sign_count=auth_data.sign_count
)
session.add(new_credential)
session.delete(challenge)
session.commit()
await session.delete(challenge)
await session.commit()
return JSONResponse(content={"message": "FIDO2 registration successful"}, status_code=200)
@router.post("/login/begin")
async def login_begin(username: str, session: Session = Depends(get_session)):
async def login_begin(username: str, session: AsyncSession = Depends(get_session)):
admin_user = session.exec(select(Admin).where(Admin.username == username)).first()
if not admin_user:
raise HTTPException(status_code=400, detail="User not found")
@@ -140,23 +137,23 @@ async def login_begin(username: str, session: Session = Depends(get_session)):
)
session.add(challenge)
session.commit()
await session.commit()
return authentication_data
@router.post("/login/complete")
async def login_complete(admin_id: str, credential: dict, session: Session = Depends(get_session)):
challenge = session.exec(
async def login_complete(admin_id: str, credential: dict, session: AsyncSession = Depends(get_session)):
challenge = (await session.exec(
select(AdminFIDO2Challenge)
.where(AdminFIDO2Challenge.admin_id == int(admin_id))
.where(AdminFIDO2Challenge.type == "authentication")
).first()
)).first()
if not challenge:
raise HTTPException(status_code=400, detail="No authentication challenge found")
user = session.exec(select(Admin).where(Admin.id == int(admin_id))).first()
credentials = session.exec(select(AdminCredential).where(AdminCredential.admin_id == user.id)).all()
user = (await session.exec(select(Admin).where(Admin.id == int(admin_id)))).first()
credentials = (await session.exec(select(AdminCredential).where(AdminCredential.admin_id == user.id))).all()
auth_data = fido2_server.authenticate_complete(
state={"challenge": challenge.challenge},
@@ -170,12 +167,12 @@ async def login_complete(admin_id: str, credential: dict, session: Session = Dep
],
)
credential = session.exec(
credential = (await session.exec(
select(AdminCredential)
.where(AdminCredential.credential_id == auth_data.credential_id)
).first()
)).first()
credential.sign_count = auth_data.sign_count
session.delete(challenge)
session.commit()
await session.delete(challenge)
await session.commit()
return JSONResponse(content={"message": "FIDO2 authentication successful"}, status_code=200)