Coverage for bc/kwai-bc-club/src/kwai_bc_club/repositories/person_db_repository.py: 94%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements a person repository for a database.""" 

2 

3from dataclasses import dataclass, replace 

4 

5from kwai_core.db.database import Database 

6from kwai_core.db.table_row import JoinedTableRow 

7from sql_smith.functions import alias, on 

8 

9from kwai_bc_club.domain.person import PersonEntity, PersonIdentifier 

10from kwai_bc_club.repositories._tables import ( 

11 ContactRow, 

12 CountryRow, 

13 PersonRow, 

14) 

15from kwai_bc_club.repositories.contact_db_repository import ContactDbRepository 

16from kwai_bc_club.repositories.person_repository import ( 

17 PersonNotFoundException, 

18 PersonRepository, 

19) 

20 

21 

22@dataclass(kw_only=True, frozen=True, slots=True) 

23class PersonQueryRow(JoinedTableRow): 

24 """A data transfer object for a Contact query.""" 

25 

26 person: PersonRow 

27 contact: ContactRow 

28 country: CountryRow 

29 nationality: CountryRow 

30 

31 def create_entity(self) -> PersonEntity: 

32 """Create a Contact entity from a row.""" 

33 return self.person.create_entity( 

34 self.nationality.create_country(), 

35 self.contact.create_entity(self.country.create_country()), 

36 ) 

37 

38 

39class PersonDbRepository(PersonRepository): 

40 """A person repository for a database.""" 

41 

42 def __init__(self, database: Database): 

43 self._database = database 

44 

45 async def create(self, person: PersonEntity) -> PersonEntity: 

46 if person.contact.id.is_empty(): 

47 new_contact = await ContactDbRepository(self._database).create( 

48 person.contact 

49 ) 

50 person = replace(person, contact=new_contact) 

51 new_id = await self._database.insert( 

52 PersonRow.__table_name__, PersonRow.persist(person) 

53 ) 

54 return person.set_id(PersonIdentifier(new_id)) 

55 

56 async def update(self, person: PersonEntity) -> None: 

57 await ContactDbRepository(self._database).update(person.contact) 

58 await self._database.update( 

59 person.id.value, PersonRow.__table_name__, PersonRow.persist(person) 

60 ) 

61 

62 async def delete(self, person: PersonEntity): 

63 await ContactDbRepository(self._database).delete(person.contact) 

64 await self._database.delete(person.id.value, PersonRow.__table_name__) 

65 

66 async def get(self, id_: PersonIdentifier) -> PersonEntity: 

67 query = Database.create_query_factory().select() 

68 query.from_(PersonRow.__table_name__).columns( 

69 *PersonQueryRow.get_aliases() 

70 ).inner_join( 

71 ContactRow.__table_name__, 

72 on(ContactRow.column("id"), PersonRow.column("contact_id")), 

73 ).inner_join( 

74 CountryRow.__table_name__, 

75 on(CountryRow.column("id"), ContactRow.column("country_id")), 

76 ).inner_join( 

77 alias(CountryRow.__table_name__, "nationality"), 

78 on("nationality.id", PersonRow.column("nationality_id")), 

79 ).where(PersonRow.field("id").eq(id_.value)) 

80 

81 row = await self._database.fetch_one(query) 

82 if row: 

83 return PersonQueryRow.map(row).create_entity() 

84 

85 raise PersonNotFoundException(f"Person with {id_} not found")