Coverage for apps/kwai-api/src/kwai_api/v1/auth/presenters.py: 85%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that defines presenters for the auth api.""" 

2 

3from typing import Self 

4 

5from kwai_bc_identity.user_invitations.user_invitation import UserInvitationEntity 

6from kwai_bc_identity.users.user_account import UserAccountEntity 

7from kwai_core.domain.presenter import AsyncPresenter, IterableResult, Presenter 

8from kwai_core.json_api import JsonApiPresenter, Meta 

9 

10from kwai_api.v1.auth.schemas.revoked_user import ( 

11 RevokedUserAttributes, 

12 RevokedUserDocument, 

13 RevokedUserResource, 

14) 

15from kwai_api.v1.auth.schemas.user_account import ( 

16 UserAccountAttributes, 

17 UserAccountDocument, 

18 UserAccountResource, 

19 UserAccountsDocument, 

20) 

21from kwai_api.v1.auth.schemas.user_invitation import ( 

22 UserInvitationAttributes, 

23 UserInvitationDocument, 

24 UserInvitationResource, 

25 UserInvitationsDocument, 

26) 

27 

28 

class JsonApiUserAccountPresenter(
    JsonApiPresenter[UserAccountDocument], Presenter[UserAccountEntity]
):
    """Presenter that renders one user account entity as a JSON:API document."""

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build a `UserAccountDocument` for the given user account.

        The resulting document is stored on the presenter.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so calls can be chained.
        """
        user = user_account.user
        resource_id = str(user.uuid)
        email = str(user.email)

        # Login timestamps that report themselves as empty are emitted as None.
        if user_account.last_login.empty:
            last_login = None
        else:
            last_login = str(user_account.last_login)

        if user_account.last_unsuccessful_login.empty:
            last_unsuccessful_login = None
        else:
            last_unsuccessful_login = str(user_account.last_unsuccessful_login)

        attributes = UserAccountAttributes(
            email=email,
            last_login=last_login,
            last_unsuccessful_login=last_unsuccessful_login,
            revoked=user_account.revoked,
            admin=user_account.admin,
            first_name=user.name.first_name,
            last_name=user.name.last_name,
            remark=user.remark,
        )
        resource = UserAccountResource(id=resource_id, attributes=attributes)
        self._document = UserAccountDocument(data=resource)
        return self

55 

56 

class JsonApiUserAccountsPresenter(
    JsonApiPresenter[UserAccountsDocument],
    AsyncPresenter[IterableResult[UserAccountEntity]],
):
    """Presenter that renders an iterable result of user accounts as a JSON:API document."""

    async def present(self, result: IterableResult[UserAccountEntity]) -> Self:
        """Build a `UserAccountsDocument` from an iterable result.

        The document starts with the paging meta data and an empty data list;
        each entity from the asynchronous iterator is then presented with
        `JsonApiUserAccountPresenter` and its resource is appended.

        Args:
            result: The iterable result (count, offset, limit and iterator).

        Returns:
            This presenter, so calls can be chained.
        """
        meta = Meta(count=result.count, offset=result.offset, limit=result.limit)
        self._document = UserAccountsDocument(meta=meta, data=[])

        async for entity in result.iterator:
            presented = JsonApiUserAccountPresenter().present(entity)
            single_document = presented.get_document()
            self._document.data.append(single_document.data)

        return self

74 

75 

class JsonApiRevokedUserPresenter(
    JsonApiPresenter[RevokedUserDocument],
    Presenter[UserAccountEntity],
):
    """Presenter that renders the revoked state of a user account.

    The resulting JSON:API document is a [RevokedUserDocument].
    """

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build a `RevokedUserDocument` for the given user account.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so calls can be chained.
        """
        resource_id = str(user_account.user.uuid)
        attributes = RevokedUserAttributes(revoked=user_account.revoked)
        resource = RevokedUserResource(id=resource_id, attributes=attributes)
        self._document = RevokedUserDocument(data=resource)
        return self

93 

94 

class JsonApiUserInvitationPresenter(
    JsonApiPresenter[UserInvitationDocument], Presenter[UserInvitationEntity]
):
    """Presenter that renders one user invitation as a JSON:API document."""

    def present(self, user_invitation: UserInvitationEntity) -> Self:
        """Build a `UserInvitationDocument` for the given invitation.

        Args:
            user_invitation: The user invitation entity to present.

        Returns:
            This presenter, so calls can be chained.
        """
        resource_id = str(user_invitation.uuid)
        email = str(user_invitation.email)

        # Falsy name parts are replaced with an empty string.
        first_name = user_invitation.name.first_name or ""
        last_name = user_invitation.name.last_name or ""
        remark = user_invitation.remark

        # The mail timestamp is only emitted when the invitation was mailed.
        if user_invitation.mailed:
            mailed_at = str(user_invitation.mailed_at)
        else:
            mailed_at = None

        expired_at = str(user_invitation.expired_at)

        # The confirmation timestamp is only emitted for confirmed invitations.
        if user_invitation.confirmed:
            confirmed_at = str(user_invitation.confirmed_at)
        else:
            confirmed_at = None

        attributes = UserInvitationAttributes(
            email=email,
            first_name=first_name,
            last_name=last_name,
            remark=remark,
            mailed_at=mailed_at,
            expired_at=expired_at,
            confirmed_at=confirmed_at,
            revoked=user_invitation.revoked,
        )
        resource = UserInvitationResource(id=resource_id, attributes=attributes)
        self._document = UserInvitationDocument(data=resource)
        return self

125 

126 

class JsonApiUserInvitationsPresenter(
    JsonApiPresenter[UserInvitationsDocument],
    AsyncPresenter[IterableResult[UserInvitationEntity]],
):
    """Presenter that renders an iterable result of user invitations as a JSON:API document."""

    async def present(self, result: IterableResult[UserInvitationEntity]) -> Self:
        """Build a `UserInvitationsDocument` from an iterable result.

        The document starts with the paging meta data and an empty data list;
        each entity from the asynchronous iterator is then presented with
        `JsonApiUserInvitationPresenter` and its resource is appended.

        Args:
            result: The iterable result (count, offset, limit and iterator).

        Returns:
            This presenter, so calls can be chained.
        """
        meta = Meta(count=result.count, offset=result.offset, limit=result.limit)
        self._document = UserInvitationsDocument(meta=meta, data=[])

        async for entity in result.iterator:
            presented = JsonApiUserInvitationPresenter().present(entity)
            single_document = presented.get_document()
            self._document.data.append(single_document.data)

        return self