Coverage for bc/kwai-bc-club/src/kwai_bc_club/repositories/person_db_repository.py: 94%
34 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that implements a person repository for a database."""
3from dataclasses import dataclass, replace
5from kwai_core.db.database import Database
6from kwai_core.db.table_row import JoinedTableRow
7from sql_smith.functions import alias, on
9from kwai_bc_club.domain.person import PersonEntity, PersonIdentifier
10from kwai_bc_club.repositories._tables import (
11 ContactRow,
12 CountryRow,
13 PersonRow,
14)
15from kwai_bc_club.repositories.contact_db_repository import ContactDbRepository
16from kwai_bc_club.repositories.person_repository import (
17 PersonNotFoundException,
18 PersonRepository,
19)
22@dataclass(kw_only=True, frozen=True, slots=True)
23class PersonQueryRow(JoinedTableRow):
24 """A data transfer object for a Contact query."""
26 person: PersonRow
27 contact: ContactRow
28 country: CountryRow
29 nationality: CountryRow
31 def create_entity(self) -> PersonEntity:
32 """Create a Contact entity from a row."""
33 return self.person.create_entity(
34 self.nationality.create_country(),
35 self.contact.create_entity(self.country.create_country()),
36 )
39class PersonDbRepository(PersonRepository):
40 """A person repository for a database."""
42 def __init__(self, database: Database):
43 self._database = database
45 async def create(self, person: PersonEntity) -> PersonEntity:
46 if person.contact.id.is_empty():
47 new_contact = await ContactDbRepository(self._database).create(
48 person.contact
49 )
50 person = replace(person, contact=new_contact)
51 new_id = await self._database.insert(
52 PersonRow.__table_name__, PersonRow.persist(person)
53 )
54 return person.set_id(PersonIdentifier(new_id))
56 async def update(self, person: PersonEntity) -> None:
57 await ContactDbRepository(self._database).update(person.contact)
58 await self._database.update(
59 person.id.value, PersonRow.__table_name__, PersonRow.persist(person)
60 )
62 async def delete(self, person: PersonEntity):
63 await ContactDbRepository(self._database).delete(person.contact)
64 await self._database.delete(person.id.value, PersonRow.__table_name__)
66 async def get(self, id_: PersonIdentifier) -> PersonEntity:
67 query = Database.create_query_factory().select()
68 query.from_(PersonRow.__table_name__).columns(
69 *PersonQueryRow.get_aliases()
70 ).inner_join(
71 ContactRow.__table_name__,
72 on(ContactRow.column("id"), PersonRow.column("contact_id")),
73 ).inner_join(
74 CountryRow.__table_name__,
75 on(CountryRow.column("id"), ContactRow.column("country_id")),
76 ).inner_join(
77 alias(CountryRow.__table_name__, "nationality"),
78 on("nationality.id", PersonRow.column("nationality_id")),
79 ).where(PersonRow.field("id").eq(id_.value))
81 row = await self._database.fetch_one(query)
82 if row:
83 return PersonQueryRow.map(row).create_entity()
85 raise PersonNotFoundException(f"Person with {id_} not found")