Coverage for apps/kwai-api/src/kwai_api/v1/auth/presenters.py: 85%
34 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that defines presenters for the auth api."""
3from typing import Self
5from kwai_bc_identity.user_invitations.user_invitation import UserInvitationEntity
6from kwai_bc_identity.users.user_account import UserAccountEntity
7from kwai_core.domain.presenter import AsyncPresenter, IterableResult, Presenter
8from kwai_core.json_api import JsonApiPresenter, Meta
10from kwai_api.v1.auth.schemas.revoked_user import (
11 RevokedUserAttributes,
12 RevokedUserDocument,
13 RevokedUserResource,
14)
15from kwai_api.v1.auth.schemas.user_account import (
16 UserAccountAttributes,
17 UserAccountDocument,
18 UserAccountResource,
19 UserAccountsDocument,
20)
21from kwai_api.v1.auth.schemas.user_invitation import (
22 UserInvitationAttributes,
23 UserInvitationDocument,
24 UserInvitationResource,
25 UserInvitationsDocument,
26)
class JsonApiUserAccountPresenter(
    JsonApiPresenter[UserAccountDocument], Presenter[UserAccountEntity]
):
    """Presenter that renders a single user account as a JSON:API document."""

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build the JSON:API document for the given user account.

        A login timestamp that reports itself as empty is emitted as ``None``;
        otherwise its string form is used.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so that calls can be chained.
        """
        user = user_account.user
        resource_id = str(user.uuid)
        email = str(user.email)

        if user_account.last_login.empty:
            last_login = None
        else:
            last_login = str(user_account.last_login)

        if user_account.last_unsuccessful_login.empty:
            last_unsuccessful_login = None
        else:
            last_unsuccessful_login = str(user_account.last_unsuccessful_login)

        attributes = UserAccountAttributes(
            email=email,
            last_login=last_login,
            last_unsuccessful_login=last_unsuccessful_login,
            revoked=user_account.revoked,
            admin=user_account.admin,
            first_name=user.name.first_name,
            last_name=user.name.last_name,
            remark=user.remark,
        )
        resource = UserAccountResource(id=resource_id, attributes=attributes)
        self._document = UserAccountDocument(data=resource)
        return self
class JsonApiUserAccountsPresenter(
    JsonApiPresenter[UserAccountsDocument],
    AsyncPresenter[IterableResult[UserAccountEntity]],
):
    """Presenter that renders an iterable result of user accounts as a JSON:API document."""

    async def present(self, result: IterableResult[UserAccountEntity]) -> Self:
        """Build the JSON:API document for all user accounts in the result.

        The count, offset and limit of the result are copied into the meta
        section of the document. Each user account is rendered with
        JsonApiUserAccountPresenter and its resource is appended to the data
        of the document.

        Args:
            result: The iterable result that provides the user accounts.

        Returns:
            This presenter, so that calls can be chained.
        """
        pagination = Meta(count=result.count, offset=result.offset, limit=result.limit)
        self._document = UserAccountsDocument(meta=pagination, data=[])

        async for account in result.iterator:
            # A fresh single-entity presenter is used for every account.
            account_document = JsonApiUserAccountPresenter().present(account).get_document()
            self._document.data.append(account_document.data)

        return self
class JsonApiRevokedUserPresenter(
    JsonApiPresenter[RevokedUserDocument],
    Presenter[UserAccountEntity],
):
    """Presenter that renders the revoked state of a user account as a JSON:API document.

    The resulting document is a [RevokedUserDocument].
    """

    def present(self, user_account: UserAccountEntity) -> Self:
        """Build the JSON:API document for the given user account.

        Only the unique id of the user and the revoked flag of the account
        are part of the document.

        Args:
            user_account: The user account entity to present.

        Returns:
            This presenter, so that calls can be chained.
        """
        resource = RevokedUserResource(
            id=str(user_account.user.uuid),
            attributes=RevokedUserAttributes(revoked=user_account.revoked),
        )
        self._document = RevokedUserDocument(data=resource)
        return self
class JsonApiUserInvitationPresenter(
    JsonApiPresenter[UserInvitationDocument], Presenter[UserInvitationEntity]
):
    """Presenter that renders a single user invitation as a JSON:API document."""

    def present(self, user_invitation: UserInvitationEntity) -> Self:
        """Build the JSON:API document for the given user invitation.

        A falsy first or last name is rendered as an empty string. The
        ``mailed_at`` and ``confirmed_at`` attributes are only filled in when
        the invitation is flagged as mailed or confirmed respectively;
        otherwise they are ``None``.

        Args:
            user_invitation: The user invitation entity to present.

        Returns:
            This presenter, so that calls can be chained.
        """
        resource_id = str(user_invitation.uuid)
        email = str(user_invitation.email)
        first_name = user_invitation.name.first_name or ""
        last_name = user_invitation.name.last_name or ""
        remark = user_invitation.remark

        mailed_at = None
        if user_invitation.mailed:
            mailed_at = str(user_invitation.mailed_at)

        expired_at = str(user_invitation.expired_at)

        confirmed_at = None
        if user_invitation.confirmed:
            confirmed_at = str(user_invitation.confirmed_at)

        attributes = UserInvitationAttributes(
            email=email,
            first_name=first_name,
            last_name=last_name,
            remark=remark,
            mailed_at=mailed_at,
            expired_at=expired_at,
            confirmed_at=confirmed_at,
            revoked=user_invitation.revoked,
        )
        resource = UserInvitationResource(id=resource_id, attributes=attributes)
        self._document = UserInvitationDocument(data=resource)
        return self
class JsonApiUserInvitationsPresenter(
    JsonApiPresenter[UserInvitationsDocument],
    AsyncPresenter[IterableResult[UserInvitationEntity]],
):
    """A presenter that transforms an iterable list of user invitation entities into a JSON:API document."""

    async def present(self, result: IterableResult[UserInvitationEntity]) -> Self:
        """Build the JSON:API document for all user invitations in the result.

        The count, offset and limit of the result are copied into the meta
        section of the document. Each user invitation is rendered with
        JsonApiUserInvitationPresenter and its resource is appended to the
        data of the document.

        Args:
            result: The iterable result that provides the user invitations.

        Returns:
            This presenter, so that calls can be chained.
        """
        self._document = UserInvitationsDocument(
            meta=Meta(count=result.count, offset=result.offset, limit=result.limit),
            data=[],
        )
        async for user_invitation in result.iterator:
            # A fresh single-entity presenter is used for every invitation.
            user_invitation_document = (
                JsonApiUserInvitationPresenter().present(user_invitation).get_document()
            )
            self._document.data.append(user_invitation_document.data)
        return self