Coverage for bc/kwai-bc-identity/src/kwai_bc_identity/authenticate_user.py: 90%

39 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements the use case: authenticate user.""" 

2 

3from dataclasses import dataclass 

4 

5from kwai_core.domain.value_objects.email_address import EmailAddress 

6from kwai_core.domain.value_objects.timestamp import Timestamp 

7 

8from kwai_bc_identity.exceptions import AuthenticationException 

9from kwai_bc_identity.tokens.access_token import AccessTokenEntity 

10from kwai_bc_identity.tokens.access_token_repository import AccessTokenRepository 

11from kwai_bc_identity.tokens.log_user_login_service import LogUserLoginService 

12from kwai_bc_identity.tokens.refresh_token import RefreshTokenEntity 

13from kwai_bc_identity.tokens.refresh_token_repository import RefreshTokenRepository 

14from kwai_bc_identity.users.user_account_repository import UserAccountRepository 

15 

16 

17@dataclass(kw_only=True, frozen=True) 

18class AuthenticateUserCommand: 

19 """Input for the (AuthenticateUser) use case. 

20 

21 Attributes: 

22 username: The email address of the user. 

23 password: The password of the user. 

24 access_token_expiry_minutes: Minutes before expiring the access token. 

25 Default is 2 hours. 

26 refresh_token_expiry_minutes: Minutes before expiring the refresh token. 

27 Default is 2 months. 

28 """ 

29 

30 username: str 

31 password: str | None = None 

32 access_token_expiry_minutes: int = 60 * 2 # 2 hours 

33 refresh_token_expiry_minutes: int = 60 * 24 * 60 # 2 months 

34 

35 

class AuthenticateUser:
    """Use case to authenticate a user.

    On success an access token and a refresh token are created and stored,
    the user account is updated, and the login is reported to the login log
    service. On failure the account is updated, the failure is reported to
    the login log service, and an AuthenticationException is raised.
    """

    def __init__(
        self,
        user_account_repo: UserAccountRepository,
        access_token_repo: AccessTokenRepository,
        refresh_token_repo: RefreshTokenRepository,
        log_user_login_service: LogUserLoginService,
    ):
        """Initialize the use case.

        Args:
            user_account_repo: Repository used to look up and update the
                user account.
            access_token_repo: Repository used to create the access token.
            refresh_token_repo: Repository used to create the refresh token.
            log_user_login_service: Service that is notified about successful
                and failed logins.
        """
        self._user_account_repo = user_account_repo
        self._access_token_repo = access_token_repo
        self._refresh_token_repo = refresh_token_repo
        self._log_user_login_service = log_user_login_service

    async def _record_failure(self, user_account, message: str) -> None:
        """Persist the user account and report a failed login.

        Args:
            user_account: The account the failed login belongs to.
            message: The reason for the failure, passed to the log service.
        """
        await self._user_account_repo.update(user_account)
        await self._log_user_login_service.notify_failure(
            message, user_account=user_account
        )

    async def execute(self, command: AuthenticateUserCommand) -> RefreshTokenEntity:
        """Execute the use case.

        Args:
            command: The input for this use case.

        Returns:
            RefreshTokenEntity: On success, a refresh token entity will be
                returned. It references the newly created access token.

        Raises:
            InvalidEmailException: Raised when the username
                contains an invalid email address.
            UserAccountNotFoundException: Raised when the user with the given
                email address doesn't exist.
            AuthenticationException: Raised when the user account is revoked
                or when the login with the given password did not succeed.
        """
        user_account = await self._user_account_repo.get_user_by_email(
            EmailAddress(command.username)
        )
        if user_account.revoked:
            # NOTE(review): the account is persisted here although login()
            # has not been called yet — confirm whether this update is needed.
            message = "User account is revoked"
            await self._record_failure(user_account, message)
            raise AuthenticationException(message)

        user_account = user_account.login(command.password)
        if not user_account.logged_in:
            # Save the last unsuccessful login before reporting it.
            message = "Invalid password"
            await self._record_failure(user_account, message)
            raise AuthenticationException(message)

        access_token = await self._access_token_repo.create(
            AccessTokenEntity(
                expiration=Timestamp.create_with_delta(
                    minutes=command.access_token_expiry_minutes
                ),
                user_account=user_account,
            )
        )

        # The refresh token is linked to the access token created above.
        refresh_token = await self._refresh_token_repo.create(
            RefreshTokenEntity(
                expiration=Timestamp.create_with_delta(
                    minutes=command.refresh_token_expiry_minutes
                ),
                access_token=access_token,
            )
        )

        # Save the last successful login.
        await self._user_account_repo.update(user_account)

        await self._log_user_login_service.notify_success(
            user_account=user_account, refresh_token=refresh_token
        )

        return refresh_token