Coverage for bc/kwai-bc-identity/src/kwai_bc_identity/users/user_account_db_repository.py: 98%

46 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements a user account repository for a database.""" 

2 

3from collections.abc import AsyncGenerator 

4from dataclasses import replace 

5 

6from kwai_core.db.database import Database 

7from kwai_core.domain.value_objects.email_address import EmailAddress 

8from kwai_core.domain.value_objects.unique_id import UniqueId 

9 

10from kwai_bc_identity.users.user import UserIdentifier 

11from kwai_bc_identity.users.user_account import ( 

12 UserAccountEntity, 

13 UserAccountIdentifier, 

14) 

15from kwai_bc_identity.users.user_account_db_query import UserAccountDbQuery 

16from kwai_bc_identity.users.user_account_query import UserAccountQuery 

17from kwai_bc_identity.users.user_account_repository import ( 

18 UserAccountNotFoundException, 

19 UserAccountRepository, 

20) 

21from kwai_bc_identity.users.user_tables import ( 

22 UserAccountRow, 

23) 

24 

25 

class UserAccountDbRepository(UserAccountRepository):
    """User account repository for a database."""

    def __init__(self, database: Database) -> None:
        """Initialize the repository.

        Args:
            database: The database used for all queries and statements.
        """
        self._database = database

    def create_query(self) -> UserAccountQuery:
        """Create a new user account query bound to this repository's database."""
        return UserAccountDbQuery(self._database)

    async def get_all(
        self,
        query: UserAccountQuery | None = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncGenerator[UserAccountEntity, None]:
        """Yield the user accounts selected by a query.

        Args:
            query: The query to execute. When None, a query created with
                create_query is used.
            limit: Passed to the query's fetch as the first argument.
            offset: Passed to the query's fetch as the second argument.

        Yields:
            One user account entity for each fetched row.
        """
        # Test for None explicitly: a query object that happens to evaluate
        # as falsy must not be silently replaced by a fresh one.
        if query is None:
            query = self.create_query()

        async for row in query.fetch(limit, offset):
            yield UserAccountRow.map(row).create_entity()

    async def get_user_by_email(self, email: EmailAddress) -> UserAccountEntity:
        """Get the user account with the given email address.

        Args:
            email: The email address to filter on.

        Returns:
            The entity created from the fetched row.

        Raises:
            UserAccountNotFoundException: When the query returns no row.
        """
        query = self.create_query().filter_by_email(email)
        if row := await query.fetch_one():
            return UserAccountRow.map(row).create_entity()

        raise UserAccountNotFoundException()

    async def exists_with_email(self, email: EmailAddress) -> bool:
        """Check whether a user account with the given email address exists.

        Args:
            email: The email address to look for.

        Returns:
            True when get_user_by_email finds an account, False when it
            raises UserAccountNotFoundException.
        """
        try:
            await self.get_user_by_email(email)
        except UserAccountNotFoundException:
            return False

        return True

    async def get_user_by_uuid(self, uuid: UniqueId) -> UserAccountEntity:
        """Get the user account with the given unique id.

        Args:
            uuid: The unique id to filter on.

        Returns:
            The entity created from the fetched row.

        Raises:
            UserAccountNotFoundException: When the query returns no row.
        """
        query = self.create_query()
        # NOTE(review): the return value is not used here, so this relies on
        # filter_by_uuid modifying the query in place -- confirm against
        # UserAccountQuery.
        query.filter_by_uuid(uuid)

        if row := await query.fetch_one():
            return UserAccountRow.map(row).create_entity()

        raise UserAccountNotFoundException()

    async def create(self, user_account: UserAccountEntity) -> UserAccountEntity:
        """Insert a new user account.

        Args:
            user_account: The user account to persist.

        Returns:
            The user account with the id returned by the insert set on both
            the embedded user and the account itself.
        """
        new_id = await self._database.insert(
            UserAccountRow.__table_name__, UserAccountRow.persist(user_account)
        )
        # The same generated id identifies the user and the user account.
        user = user_account.user.set_id(UserIdentifier(new_id))
        return replace(user_account, user=user).set_id(UserAccountIdentifier(new_id))

    async def update(self, user_account: UserAccountEntity) -> None:
        """Update the row of an existing user account.

        Args:
            user_account: The user account to persist; its id selects the row.
        """
        await self._database.update(
            user_account.id.value,
            UserAccountRow.__table_name__,
            UserAccountRow.persist(user_account),
        )

    async def delete(self, user_account: UserAccountEntity) -> None:
        """Delete the row of a user account.

        Args:
            user_account: The user account to remove; its id selects the row.
        """
        await self._database.delete(
            user_account.id.value, UserAccountRow.__table_name__
        )

86 user_account.id.value, UserAccountRow.__table_name__ 

87 )