Coverage for bc/kwai-bc-identity/src/kwai_bc_identity/user_invitations/user_invitation_db_repository.py: 100%
38 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that implements a user invitation repository for a database."""
3from typing import AsyncIterator
5from kwai_core.db.database import Database
6from kwai_core.domain.value_objects.unique_id import UniqueId
8from kwai_bc_identity.user_invitations.user_invitation import (
9 UserInvitationEntity,
10 UserInvitationIdentifier,
11)
12from kwai_bc_identity.user_invitations.user_invitation_db_query import (
13 UserInvitationDbQuery,
14)
15from kwai_bc_identity.user_invitations.user_invitation_query import (
16 UserInvitationQuery,
17)
18from kwai_bc_identity.user_invitations.user_invitation_repository import (
19 UserInvitationNotFoundException,
20 UserInvitationRepository,
21)
22from kwai_bc_identity.user_invitations.user_invitation_tables import (
23 UserInvitationRow,
24)
25from kwai_bc_identity.users.user_tables import UserRow
def _create_entity(row) -> UserInvitationEntity:
    """Build a user invitation entity from a row.

    The row is mapped twice: once as a user invitation row and once as a
    user row. The entity created from the user row is handed to the
    invitation row when it creates the invitation entity.
    """
    invitation_row = UserInvitationRow.map(row)
    user = UserRow.map(row).create_entity()
    return invitation_row.create_entity(user)
class UserInvitationDbRepository(UserInvitationRepository):
    """A user invitation repository for a database.

    Attributes:
        _database(Database): the database for this repository.
    """

    def __init__(self, database: Database) -> None:
        """Store the database used by all operations of this repository."""
        self._database = database

    def create_query(self) -> UserInvitationQuery:
        """Create a new user invitation query bound to this repository's database."""
        return UserInvitationDbQuery(self._database)

    async def get_all(
        self,
        query: UserInvitationQuery,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[UserInvitationEntity]:
        """Yield a user invitation entity for each row fetched by the query.

        Args:
            query: the query to execute.
            limit: passed to the query's fetch as the first argument.
            offset: passed to the query's fetch as the second argument.
        """
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def get_invitation_by_id(
        self, id_: UserInvitationIdentifier
    ) -> UserInvitationEntity:
        """Get the user invitation with the given id.

        Args:
            id_: the identifier of the user invitation.

        Raises:
            UserInvitationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Use the id_ argument here: a bare `id` would interpolate the Python
        # builtin function instead of the identifier that was searched for.
        raise UserInvitationNotFoundException(
            f"User invitation with {id_} does not exist."
        )

    async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity:
        """Get the user invitation with the given unique id.

        Args:
            uuid: the unique id of the user invitation.

        Raises:
            UserInvitationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise UserInvitationNotFoundException(
            f"User invitation with uuid {uuid} does not exist."
        )

    async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity:
        """Insert the user invitation into the database.

        Args:
            invitation: the user invitation to persist.

        Returns:
            The result of `invitation.set_id` called with an identifier built
            from the value returned by the database insert.
        """
        new_id = await self._database.insert(
            UserInvitationRow.__table_name__, UserInvitationRow.persist(invitation)
        )
        return invitation.set_id(UserInvitationIdentifier(new_id))

    async def update(self, invitation: UserInvitationEntity) -> None:
        """Update the database record of the user invitation.

        Args:
            invitation: the user invitation to update; its id value selects
                the record.
        """
        await self._database.update(
            invitation.id.value,
            UserInvitationRow.__table_name__,
            UserInvitationRow.persist(invitation),
        )

    async def delete(self, invitation: UserInvitationEntity) -> None:
        """Delete the database record of the user invitation.

        Args:
            invitation: the user invitation to delete; its id value selects
                the record.
        """
        await self._database.delete(
            invitation.id.value, UserInvitationRow.__table_name__
        )