Coverage for bc/kwai-bc-identity/src/kwai_bc_identity/user_invitations/user_invitation_db_repository.py: 100%

38 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements a user invitation repository for a database.""" 

2 

3from typing import AsyncIterator 

4 

5from kwai_core.db.database import Database 

6from kwai_core.domain.value_objects.unique_id import UniqueId 

7 

8from kwai_bc_identity.user_invitations.user_invitation import ( 

9 UserInvitationEntity, 

10 UserInvitationIdentifier, 

11) 

12from kwai_bc_identity.user_invitations.user_invitation_db_query import ( 

13 UserInvitationDbQuery, 

14) 

15from kwai_bc_identity.user_invitations.user_invitation_query import ( 

16 UserInvitationQuery, 

17) 

18from kwai_bc_identity.user_invitations.user_invitation_repository import ( 

19 UserInvitationNotFoundException, 

20 UserInvitationRepository, 

21) 

22from kwai_bc_identity.user_invitations.user_invitation_tables import ( 

23 UserInvitationRow, 

24) 

25from kwai_bc_identity.users.user_tables import UserRow 

26 

27 

def _create_entity(row) -> UserInvitationEntity:
    """Build a user invitation entity from a single result row.

    The same row is mapped twice: once with UserInvitationRow and once with
    UserRow. The entity created from the user mapping is passed to the
    invitation mapping's ``create_entity``.

    Args:
        row: A result row that both UserInvitationRow.map and UserRow.map
            accept.

    Returns:
        The entity produced by the mapped invitation row.
    """
    invitation_row = UserInvitationRow.map(row)
    user = UserRow.map(row).create_entity()
    return invitation_row.create_entity(user)

31 

32 

class UserInvitationDbRepository(UserInvitationRepository):
    """A user invitation repository for a database.

    Attributes:
        _database(Database): the database for this repository.
    """

    def __init__(self, database: Database):
        """Store the database that all repository operations will use.

        Args:
            database: The database for this repository.
        """
        self._database = database

    def create_query(self) -> UserInvitationQuery:
        """Create a new user invitation query bound to this repository's database.

        Returns:
            A UserInvitationDbQuery constructed with the repository's database.
        """
        return UserInvitationDbQuery(self._database)

    async def get_all(
        self,
        query: UserInvitationQuery,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[UserInvitationEntity]:
        """Yield a user invitation entity for each row fetched by the query.

        Args:
            query: The query to execute; its ``fetch`` is iterated
                asynchronously.
            limit: Passed through to ``query.fetch`` unchanged.
            offset: Passed through to ``query.fetch`` unchanged.

        Yields:
            One entity per fetched row.
        """
        async for row in query.fetch(limit, offset):
            yield _create_entity(row)

    async def get_invitation_by_id(
        self, id_: UserInvitationIdentifier
    ) -> UserInvitationEntity:
        """Get the user invitation with the given identifier.

        Args:
            id_: The identifier used to filter the query.

        Returns:
            The entity created from the row that was found.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return _create_entity(row)

        # Interpolate the id_ argument: a bare ``id`` refers to the Python
        # builtin function and would hide the identifier that was looked up.
        raise UserInvitationNotFoundException(
            f"User invitation with {id_} does not exist."
        )

    async def get_invitation_by_uuid(self, uuid: UniqueId) -> UserInvitationEntity:
        """Get the user invitation with the given unique id.

        Args:
            uuid: The unique id used to filter the query.

        Returns:
            The entity created from the row that was found.

        Raises:
            UserInvitationNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return _create_entity(row)

        raise UserInvitationNotFoundException(
            f"User invitation with uuid {uuid} does not exist."
        )

    async def create(self, invitation: UserInvitationEntity) -> UserInvitationEntity:
        """Insert the invitation into the user invitation table.

        Args:
            invitation: The invitation to persist.

        Returns:
            The result of ``invitation.set_id`` called with an identifier
            built from the value returned by the insert.
        """
        new_id = await self._database.insert(
            UserInvitationRow.__table_name__, UserInvitationRow.persist(invitation)
        )
        return invitation.set_id(UserInvitationIdentifier(new_id))

    async def update(self, invitation: UserInvitationEntity) -> None:
        """Update the stored row of the invitation.

        The row is addressed by ``invitation.id.value`` and replaced with the
        persisted form of the invitation.

        Args:
            invitation: The invitation holding the new values.
        """
        await self._database.update(
            invitation.id.value,
            UserInvitationRow.__table_name__,
            UserInvitationRow.persist(invitation),
        )

    async def delete(self, invitation: UserInvitationEntity) -> None:
        """Delete the stored row of the invitation.

        The row is addressed by ``invitation.id.value``.

        Args:
            invitation: The invitation to remove.
        """
        await self._database.delete(
            invitation.id.value, UserInvitationRow.__table_name__
        )