Coverage for apps/kwai-api/src/kwai_api/v1/auth/presenters.py: 85%
34 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that defines presenters for the auth api."""
3from typing import Self
5from kwai_bc_identity.user_invitations.user_invitation import UserInvitationEntity
6from kwai_bc_identity.users.user_account import UserAccountEntity
7from kwai_core.domain.presenter import AsyncPresenter, IterableResult, Presenter
8from kwai_core.json_api import JsonApiPresenter, Meta
10from kwai_api.v1.auth.schemas.revoked_user import (
11 RevokedUserAttributes,
12 RevokedUserDocument,
13 RevokedUserResource,
14)
15from kwai_api.v1.auth.schemas.user_account import (
16 UserAccountAttributes,
17 UserAccountDocument,
18 UserAccountResource,
19 UserAccountsDocument,
20)
21from kwai_api.v1.auth.schemas.user_invitation import (
22 UserInvitationAttributes,
23 UserInvitationDocument,
24 UserInvitationResource,
25 UserInvitationsDocument,
26)
class JsonApiUserAccountPresenter(
    JsonApiPresenter[UserAccountDocument], Presenter[UserAccountEntity]
):
    """Presenter that renders a single user account as a JSON:API document."""

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build the user account document and store it on this presenter.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so the call can be chained.
        """
        user = user_account.user

        # An empty login value is presented as None instead of as a string.
        last_login = user_account.last_login
        last_login_text = None if last_login.empty else str(last_login)

        last_failure = user_account.last_unsuccessful_login
        last_failure_text = None if last_failure.empty else str(last_failure)

        attributes = UserAccountAttributes(
            email=str(user.email),
            last_login=last_login_text,
            last_unsuccessful_login=last_failure_text,
            revoked=user_account.revoked,
            admin=user.admin,
            first_name=user.name.first_name,
            last_name=user.name.last_name,
            remark=user.remark,
        )
        resource = UserAccountResource(id=str(user.uuid), attributes=attributes)
        self._document = UserAccountDocument(data=resource)
        return self
class JsonApiUserAccountsPresenter(
    JsonApiPresenter[UserAccountsDocument],
    AsyncPresenter[IterableResult[UserAccountEntity]],
):
    """Presenter that renders an iterable result of user accounts as a JSON:API document."""

    async def present(self, use_case_result: IterableResult[UserAccountEntity]) -> Self:
        """Build a document that holds one resource per user account.

        Args:
            use_case_result: The iterable result with the user accounts. Its
                count, offset and limit are copied into the meta section.

        Returns:
            This presenter, so the call can be chained.
        """
        paging = Meta(
            count=use_case_result.count,
            offset=use_case_result.offset,
            limit=use_case_result.limit,
        )
        # Start with an empty data list; resources are added while iterating.
        self._document = UserAccountsDocument(meta=paging, data=[])

        async for account in use_case_result:
            # Reuse the single-account presenter and keep only its resource.
            account_presenter = JsonApiUserAccountPresenter().present(account)
            self._document.data.append(account_presenter.get_document().data)
        return self
class JsonApiRevokedUserPresenter(
    JsonApiPresenter[RevokedUserDocument],
    Presenter[UserAccountEntity],
):
    """Presenter that renders the revoked state of a user account as JSON:API.

    The resulting document is a [RevokedUserDocument].
    """

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build the revoked user document and store it on this presenter.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so the call can be chained.
        """
        # Only the revoked flag is exposed as an attribute of the resource.
        attributes = RevokedUserAttributes(revoked=user_account.revoked)
        resource = RevokedUserResource(
            id=str(user_account.user.uuid),
            attributes=attributes,
        )
        self._document = RevokedUserDocument(data=resource)
        return self
class JsonApiUserInvitationPresenter(
    JsonApiPresenter[UserInvitationDocument], Presenter[UserInvitationEntity]
):
    """Presenter that renders a single user invitation as a JSON:API document."""

    def present(self, user_invitation: UserInvitationEntity) -> Self:
        """Build the user invitation document and store it on this presenter.

        Args:
            user_invitation: The user invitation entity to present.

        Returns:
            This presenter, so the call can be chained.
        """
        invitee = user_invitation.name

        # The mail and confirmation moments are only presented when the
        # invitation is flagged as mailed / confirmed; otherwise they are None.
        mailed_at = None
        if user_invitation.mailed:
            mailed_at = str(user_invitation.mailed_at)

        confirmed_at = None
        if user_invitation.confirmed:
            confirmed_at = str(user_invitation.confirmed_at)

        attributes = UserInvitationAttributes(
            email=str(user_invitation.email),
            # A falsy name part is presented as an empty string.
            first_name=invitee.first_name or "",
            last_name=invitee.last_name or "",
            remark=user_invitation.remark,
            mailed_at=mailed_at,
            expired_at=str(user_invitation.expired_at),
            confirmed_at=confirmed_at,
            revoked=user_invitation.revoked,
        )
        resource = UserInvitationResource(
            id=str(user_invitation.uuid),
            attributes=attributes,
        )
        self._document = UserInvitationDocument(data=resource)
        return self
class JsonApiUserInvitationsPresenter(
    JsonApiPresenter[UserInvitationsDocument],
    AsyncPresenter[IterableResult[UserInvitationEntity]],
):
    """A presenter that transforms an iterable list of user invitation entities into a JSON:API document."""

    async def present(
        self, use_case_result: IterableResult[UserInvitationEntity]
    ) -> Self:
        """Build a document that holds one resource per user invitation.

        Args:
            use_case_result: The iterable result with the user invitations. Its
                count, offset and limit are copied into the meta section.

        Returns:
            This presenter, so the call can be chained.
        """
        self._document = UserInvitationsDocument(
            meta=Meta(
                count=use_case_result.count,
                offset=use_case_result.offset,
                limit=use_case_result.limit,
            ),
            data=[],
        )
        async for user_invitation in use_case_result:
            # Reuse the single-invitation presenter and keep only its resource.
            user_invitation_document = (
                JsonApiUserInvitationPresenter().present(user_invitation).get_document()
            )
            self._document.data.append(user_invitation_document.data)
        return self