Coverage for apps/kwai-api/src/kwai_api/v1/auth/endpoints/sso.py: 54%
41 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that defines endpoints for SSO logins."""
3from typing import Annotated
5from fastapi import APIRouter, HTTPException, Request, status
6from fastapi.params import Depends, Header
7from fastapi.responses import RedirectResponse
8from fastapi_sso import GoogleSSO
9from kwai_bc_identity.authenticate_user import AuthenticateUser, AuthenticateUserCommand
10from kwai_bc_identity.tokens.access_token_db_repository import AccessTokenDbRepository
11from kwai_bc_identity.tokens.log_user_login_db_service import LogUserLoginDbService
12from kwai_bc_identity.tokens.refresh_token_db_repository import RefreshTokenDbRepository
13from kwai_bc_identity.users.user_account_db_repository import UserAccountDbRepository
14from kwai_bc_identity.users.user_account_repository import UserAccountNotFoundException
15from kwai_core.db.database import Database
16from kwai_core.db.uow import UnitOfWork
17from kwai_core.settings import Settings, get_settings
19from kwai_api.dependencies import create_database
20from kwai_api.v1.auth.cookies import create_cookies
23router = APIRouter()
26async def get_google_sso(
27 settings: Annotated[Settings, Depends(get_settings)],
28) -> GoogleSSO:
29 """Google SSO dependency."""
30 if settings.security.google is None:
31 raise HTTPException(
32 status_code=status.HTTP_501_NOT_IMPLEMENTED,
33 detail="Google SSO is not configured",
34 )
36 return GoogleSSO(
37 settings.security.google.client_id,
38 settings.security.google.client_secret,
39 redirect_uri=f"{settings.website.url}/api/v1/auth/sso/google/callback",
40 allow_insecure_http=settings.frontend.test,
41 )
44@router.get("/google/login")
45async def google_login(
46 google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
47 return_url: str | None = None,
48):
49 """Initiate the Google login process."""
50 async with google_sso:
51 return await google_sso.get_login_redirect(state=return_url)
54@router.get("/google/callback")
55async def google_callback(
56 request: Request,
57 google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
58 db: Annotated[Database, Depends(create_database)],
59 settings: Annotated[Settings, Depends(get_settings)],
60 state: str | None = None,
61 x_forwarded_for: Annotated[str | None, Header()] = None,
62 user_agent: Annotated[str | None, Header()] = "",
63):
64 """Implement the Google login callback."""
65 async with google_sso:
66 openid = await google_sso.verify_and_process(request)
67 if not openid:
68 raise HTTPException(
69 status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication failed"
70 )
72 async with UnitOfWork(db, always_commit=True):
73 try:
74 refresh_token = await AuthenticateUser(
75 UserAccountDbRepository(db),
76 AccessTokenDbRepository(db),
77 RefreshTokenDbRepository(db),
78 LogUserLoginDbService(
79 db,
80 email=str(openid.email),
81 user_agent=user_agent,
82 client_ip=request.client.host
83 if x_forwarded_for is None
84 else x_forwarded_for,
85 open_id_sub=openid.id if openid.id else "",
86 open_id_provider=openid.provider,
87 ),
88 ).execute(AuthenticateUserCommand(username=str(openid.email)))
89 except UserAccountNotFoundException as exc:
90 raise HTTPException(
91 status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown user account"
92 ) from exc
94 if state is not None:
95 response = RedirectResponse(state)
96 else:
97 response = RedirectResponse(settings.website.url)
99 create_cookies(response, refresh_token, settings)
101 return response