Coverage for bc/kwai-bc-portal/src/kwai_bc_portal/applications/application_db_repository.py: 95%
39 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that implements an application repository for a database."""
3from typing import AsyncIterator
5from kwai_core.db.database import Database
7from kwai_bc_portal.applications.application_db_query import ApplicationDbQuery
8from kwai_bc_portal.applications.application_query import ApplicationQuery
9from kwai_bc_portal.applications.application_repository import (
10 ApplicationNotFoundException,
11 ApplicationRepository,
12)
13from kwai_bc_portal.applications.application_tables import ApplicationRow
14from kwai_bc_portal.domain.application import (
15 ApplicationEntity,
16 ApplicationIdentifier,
17)
20class ApplicationDbRepository(ApplicationRepository):
21 """An application database repository.
23 Attributes:
24 _database: the database for this repository.
25 """
27 def __init__(self, database: Database):
28 self._database = database
30 def create_query(self) -> ApplicationQuery:
31 return ApplicationDbQuery(self._database)
33 async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity:
34 query = self.create_query()
35 query.filter_by_id(id_)
37 if row := await query.fetch_one():
38 return ApplicationRow.map(row).create_entity()
40 raise ApplicationNotFoundException(f"Application with {id} does not exist.")
42 async def get_by_name(self, name: str) -> ApplicationEntity:
43 query = self.create_query()
44 query.filter_by_name(name)
46 if row := await query.fetch_one():
47 return ApplicationRow.map(row).create_entity()
49 raise ApplicationNotFoundException(f"Application with {name} does not exist.")
51 async def get_all(
52 self,
53 query: ApplicationQuery | None = None,
54 limit: int | None = None,
55 offset: int | None = None,
56 ) -> AsyncIterator[ApplicationEntity]:
57 if query is None:
58 query = self.create_query()
59 async for row in query.fetch(limit, offset):
60 yield ApplicationRow.map(row).create_entity()
62 async def create(self, application: ApplicationEntity) -> ApplicationEntity:
63 new_id = await self._database.insert(
64 ApplicationRow.__table_name__, ApplicationRow.persist(application)
65 )
66 await self._database.commit()
67 return application.set_id(ApplicationIdentifier(new_id))
69 async def update(self, application: ApplicationEntity) -> None:
70 await self._database.update(
71 application.id.value,
72 ApplicationRow.__table_name__,
73 ApplicationRow.persist(application),
74 )
75 await self._database.commit()
77 async def delete(self, application: ApplicationEntity) -> None:
78 await self._database.delete(application.id.value, ApplicationRow.__table_name__)
79 await self._database.commit()