Coverage for apps/kwai-api/src/kwai_api/v1/auth/presenters.py: 85%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that defines presenters for the auth api.""" 

2 

3from typing import Self 

4 

5from kwai_bc_identity.user_invitations.user_invitation import UserInvitationEntity 

6from kwai_bc_identity.users.user_account import UserAccountEntity 

7from kwai_core.domain.presenter import AsyncPresenter, IterableResult, Presenter 

8from kwai_core.json_api import JsonApiPresenter, Meta 

9 

10from kwai_api.v1.auth.schemas.revoked_user import ( 

11 RevokedUserAttributes, 

12 RevokedUserDocument, 

13 RevokedUserResource, 

14) 

15from kwai_api.v1.auth.schemas.user_account import ( 

16 UserAccountAttributes, 

17 UserAccountDocument, 

18 UserAccountResource, 

19 UserAccountsDocument, 

20) 

21from kwai_api.v1.auth.schemas.user_invitation import ( 

22 UserInvitationAttributes, 

23 UserInvitationDocument, 

24 UserInvitationResource, 

25 UserInvitationsDocument, 

26) 

27 

28 

29class JsonApiUserAccountPresenter( 

30 JsonApiPresenter[UserAccountDocument], Presenter[UserAccountEntity] 

31): 

32 """A presenter that transforms a user account entity into a JSON:API document.""" 

33 

34 def present(self, user_account: UserAccountEntity) -> Self: 

35 self._document = UserAccountDocument( 

36 data=UserAccountResource( 

37 id=str(user_account.user.uuid), 

38 attributes=UserAccountAttributes( 

39 email=str(user_account.user.email), 

40 last_login=None 

41 if user_account.last_login.empty 

42 else str(user_account.last_login), 

43 last_unsuccessful_login=None 

44 if user_account.last_unsuccessful_login.empty 

45 else str(user_account.last_unsuccessful_login), 

46 revoked=user_account.revoked, 

47 admin=user_account.user.admin, 

48 first_name=user_account.user.name.first_name, 

49 last_name=user_account.user.name.last_name, 

50 remark=user_account.user.remark, 

51 ), 

52 ) 

53 ) 

54 return self 

55 

56 

57class JsonApiUserAccountsPresenter( 

58 JsonApiPresenter[UserAccountsDocument], 

59 AsyncPresenter[IterableResult[UserAccountEntity]], 

60): 

61 """A presenter that transform an iterable list of user account entities into a JSON:API document.""" 

62 

63 async def present(self, use_case_result: IterableResult[UserAccountEntity]) -> Self: 

64 self._document = UserAccountsDocument( 

65 meta=Meta( 

66 count=use_case_result.count, 

67 offset=use_case_result.offset, 

68 limit=use_case_result.limit, 

69 ), 

70 data=[], 

71 ) 

72 async for user_account in use_case_result: 

73 user_account_document = ( 

74 JsonApiUserAccountPresenter().present(user_account).get_document() 

75 ) 

76 self._document.data.append(user_account_document.data) 

77 return self 

78 

79 

80class JsonApiRevokedUserPresenter( 

81 JsonApiPresenter[RevokedUserDocument], 

82 Presenter[UserAccountEntity], 

83): 

84 """A presenter that transforms a user account entity into a JSON:API document. 

85 

86 The document will be a [RevokedUserDocument]. 

87 """ 

88 

89 def present(self, user_account: UserAccountEntity) -> Self: 

90 self._document = RevokedUserDocument( 

91 data=RevokedUserResource( 

92 id=str(user_account.user.uuid), 

93 attributes=RevokedUserAttributes(revoked=user_account.revoked), 

94 ) 

95 ) 

96 return self 

97 

98 

99class JsonApiUserInvitationPresenter( 

100 JsonApiPresenter[UserInvitationDocument], Presenter[UserInvitationEntity] 

101): 

102 """A presenter that transforms a user invitation into a JSON:API document.""" 

103 

104 def present(self, user_invitation: UserInvitationEntity) -> Self: 

105 self._document = UserInvitationDocument( 

106 data=UserInvitationResource( 

107 id=str(user_invitation.uuid), 

108 attributes=UserInvitationAttributes( 

109 email=str(user_invitation.email), 

110 first_name=user_invitation.name.first_name or "", 

111 last_name=user_invitation.name.last_name or "", 

112 remark=user_invitation.remark, 

113 mailed_at=( 

114 str(user_invitation.mailed_at) 

115 if user_invitation.mailed 

116 else None 

117 ), 

118 expired_at=str(user_invitation.expired_at), 

119 confirmed_at=( 

120 str(user_invitation.confirmed_at) 

121 if user_invitation.confirmed 

122 else None 

123 ), 

124 revoked=user_invitation.revoked, 

125 ), 

126 ) 

127 ) 

128 return self 

129 

130 

131class JsonApiUserInvitationsPresenter( 

132 JsonApiPresenter[UserInvitationsDocument], 

133 AsyncPresenter[IterableResult[UserInvitationEntity]], 

134): 

135 """A presenter that transform an iterable list of user account entities into a JSON:API document.""" 

136 

137 async def present( 

138 self, use_case_result: IterableResult[UserInvitationEntity] 

139 ) -> Self: 

140 self._document = UserInvitationsDocument( 

141 meta=Meta( 

142 count=use_case_result.count, 

143 offset=use_case_result.offset, 

144 limit=use_case_result.limit, 

145 ), 

146 data=[], 

147 ) 

148 async for user_invitation in use_case_result: 

149 user_invitation_document = ( 

150 JsonApiUserInvitationPresenter().present(user_invitation).get_document() 

151 ) 

152 self._document.data.append(user_invitation_document.data) 

153 return self