Coverage for apps/kwai-api/src/kwai_api/v1/auth/endpoints/sso.py: 54%

41 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that defines endpoints for SSO logins.""" 

2 

3from typing import Annotated 

4 

5from fastapi import APIRouter, HTTPException, Request, status 

6from fastapi.params import Depends, Header 

7from fastapi.responses import RedirectResponse 

8from fastapi_sso import GoogleSSO 

9from kwai_bc_identity.authenticate_user import AuthenticateUser, AuthenticateUserCommand 

10from kwai_bc_identity.tokens.access_token_db_repository import AccessTokenDbRepository 

11from kwai_bc_identity.tokens.log_user_login_db_service import LogUserLoginDbService 

12from kwai_bc_identity.tokens.refresh_token_db_repository import RefreshTokenDbRepository 

13from kwai_bc_identity.users.user_account_db_repository import UserAccountDbRepository 

14from kwai_bc_identity.users.user_account_repository import UserAccountNotFoundException 

15from kwai_core.db.database import Database 

16from kwai_core.db.uow import UnitOfWork 

17from kwai_core.settings import Settings, get_settings 

18 

19from kwai_api.dependencies import create_database 

20from kwai_api.v1.auth.cookies import create_cookies 

21 

22 

23router = APIRouter() 

24 

25 

async def get_google_sso(
    settings: Annotated[Settings, Depends(get_settings)],
) -> GoogleSSO:
    """Build the Google SSO client from the application settings.

    Args:
        settings: The application settings, injected by FastAPI.

    Returns:
        A GoogleSSO instance whose redirect URI points at the
        ``/api/v1/auth/sso/google/callback`` route of the configured website.

    Raises:
        HTTPException: 501 when the settings contain no Google configuration.
    """
    google_settings = settings.security.google
    if google_settings is None:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Google SSO is not configured",
        )

    callback_uri = f"{settings.website.url}/api/v1/auth/sso/google/callback"
    return GoogleSSO(
        google_settings.client_id,
        google_settings.client_secret,
        redirect_uri=callback_uri,
        # Plain http is accepted only when the frontend test flag is set.
        allow_insecure_http=settings.frontend.test,
    )

42 

43 

@router.get("/google/login")
async def google_login(
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    return_url: str | None = None,
):
    """Start the Google login flow.

    Args:
        google_sso: The Google SSO client, injected by FastAPI.
        return_url: Optional value forwarded as the ``state`` of the login
            redirect.

    Returns:
        The login redirect produced by the SSO client.
    """
    async with google_sso:
        login_redirect = await google_sso.get_login_redirect(state=return_url)
        return login_redirect

52 

53 

@router.get("/google/callback")
async def google_callback(
    request: Request,
    google_sso: Annotated[GoogleSSO, Depends(get_google_sso)],
    db: Annotated[Database, Depends(create_database)],
    settings: Annotated[Settings, Depends(get_settings)],
    state: str | None = None,
    x_forwarded_for: Annotated[str | None, Header()] = None,
    user_agent: Annotated[str | None, Header()] = "",
):
    """Implement the Google login callback.

    Verifies the callback request with the SSO client, authenticates the user
    account that matches the returned email address, and answers with a
    redirect that carries the authentication cookies.

    Args:
        request: The incoming callback request.
        google_sso: The Google SSO client, injected by FastAPI.
        db: The database used for the repositories and the login log.
        settings: The application settings.
        state: Optional redirect target; the website url is used when absent.
        x_forwarded_for: Value of the ``X-Forwarded-For`` header, if present.
        user_agent: Value of the ``User-Agent`` header.

    Returns:
        A RedirectResponse with the authentication cookies set.

    Raises:
        HTTPException: 401 when the SSO verification yields no user or when
            no user account exists for the email address.
    """
    async with google_sso:
        openid = await google_sso.verify_and_process(request)
        if not openid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication failed"
            )

    # Prefer the forwarded header. ``request.client`` can be None (Starlette
    # does not guarantee a client address), so guard it before reading the host
    # instead of failing with an AttributeError.
    if x_forwarded_for is not None:
        client_ip = x_forwarded_for
    elif request.client is not None:
        client_ip = request.client.host
    else:
        client_ip = ""

    email = str(openid.email)

    async with UnitOfWork(db, always_commit=True):
        try:
            refresh_token = await AuthenticateUser(
                UserAccountDbRepository(db),
                AccessTokenDbRepository(db),
                RefreshTokenDbRepository(db),
                LogUserLoginDbService(
                    db,
                    email=email,
                    user_agent=user_agent,
                    client_ip=client_ip,
                    open_id_sub=openid.id if openid.id else "",
                    open_id_provider=openid.provider,
                ),
            ).execute(AuthenticateUserCommand(username=email))
        except UserAccountNotFoundException as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown user account"
            ) from exc

    # NOTE(review): ``state`` comes straight from the query string and is used
    # as the redirect target without validation. Confirm whether it should be
    # restricted to trusted origins to avoid an open redirect.
    redirect_target = state if state is not None else settings.website.url
    response = RedirectResponse(redirect_target)

    create_cookies(response, refresh_token, settings)

    return response