Coverage for apps/kwai-api/src/kwai_api/security_dependencies.py: 65%

57 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that defines dependencies that can be used to check authentication and authorization.""" 

2 

3from typing import Annotated 

4 

5import jwt 

6 

7from fastapi import Cookie, Depends, Header 

8from kwai_bc_club.domain.club_coach import ClubCoachEntity 

9from kwai_bc_club.get_user_coach import GetUserCoach, GetUserCoachCommand 

10from kwai_bc_club.repositories.coach_db_repository import CoachDbRepository 

11from kwai_bc_club.repositories.coach_repository import CoachNotFoundException 

12from kwai_bc_identity.exceptions import AuthenticationException 

13from kwai_bc_identity.refresh_access_token import ( 

14 RefreshAccessToken, 

15 RefreshAccessTokenCommand, 

16) 

17from kwai_bc_identity.tokens.access_token_db_repository import AccessTokenDbRepository 

18from kwai_bc_identity.tokens.log_user_login_db_service import LogUserLoginDbService 

19from kwai_bc_identity.tokens.refresh_token_db_repository import RefreshTokenDbRepository 

20from kwai_bc_identity.users.user import UserEntity 

21from kwai_core.db.database import Database 

22from kwai_core.db.uow import UnitOfWork 

23from kwai_core.domain.presenter import EntityPresenter 

24from kwai_core.settings import Settings, get_settings 

25from starlette.requests import Request 

26from starlette.responses import Response 

27 

28from kwai_api.dependencies import create_database, get_optional_user 

29from kwai_api.v1.auth.cookies import create_cookies 

30 

31 

32async def check_login( 

33 request: Request, 

34 response: Response, 

35 settings: Annotated[Settings, Depends(get_settings)], 

36 db: Annotated[Database, Depends(create_database)], 

37 access_token: Annotated[str | None, Cookie()] = None, 

38 refresh_token: Annotated[str | None, Cookie()] = None, 

39 x_forwarded_for: Annotated[str | None, Header()] = None, 

40 user_agent: Annotated[str | None, Header()] = "", 

41) -> bool: 

42 """Dependency that checks for a valid login. 

43 

44 None will be returned when the access token is still valid. 

45 When the acces token is expired, it will try to generate a new refresh and access token. 

46 """ 

47 if access_token is None and refresh_token is None: 

48 return False 

49 

50 if refresh_token is None: 

51 return False 

52 

53 if access_token: 

54 try: 

55 jwt.decode( 

56 access_token, 

57 settings.security.jwt_secret, 

58 algorithms=[settings.security.jwt_algorithm], 

59 ) 

60 # The access token is still valid 

61 return True 

62 except jwt.ExpiredSignatureError: 

63 pass # ignore, we are going to recreate a new one, if possible 

64 

65 try: 

66 decoded_refresh_token = jwt.decode( 

67 refresh_token, 

68 settings.security.jwt_refresh_secret, 

69 algorithms=[settings.security.jwt_algorithm], 

70 ) 

71 except jwt.ExpiredSignatureError: 

72 return False 

73 

74 if x_forwarded_for: 

75 client_ip = x_forwarded_for 

76 else: 

77 client_ip = request.client.host if request.client else "" 

78 

79 command = RefreshAccessTokenCommand( 

80 identifier=decoded_refresh_token["jti"], 

81 access_token_expiry_minutes=settings.security.access_token_expires_in, 

82 refresh_token_expiry_minutes=settings.security.refresh_token_expires_in, 

83 ) 

84 

85 try: 

86 async with UnitOfWork(db, always_commit=True): 

87 new_refresh_token = await RefreshAccessToken( 

88 RefreshTokenDbRepository(db), 

89 AccessTokenDbRepository(db), 

90 LogUserLoginDbService( 

91 db, 

92 email="", 

93 user_agent=user_agent or "", 

94 client_ip=client_ip, 

95 ), 

96 ).execute(command) 

97 except AuthenticationException: 

98 return False 

99 

100 create_cookies(request, response, new_refresh_token, settings) 

101 return True 

102 

103 

104async def get_coach( 

105 user: Annotated[UserEntity, Depends(get_optional_user)], 

106 database: Annotated[Database, Depends(create_database)], 

107) -> ClubCoachEntity | None: 

108 """Get the coach from the current user.""" 

109 if user is None: 

110 return None 

111 

112 command = GetUserCoachCommand(uuid=str(user.uuid)) 

113 presenter = EntityPresenter[ClubCoachEntity]() 

114 try: 

115 await GetUserCoach(CoachDbRepository(database), presenter).execute(command) 

116 except CoachNotFoundException: 

117 return None 

118 

119 return presenter.entity