Coverage for bc/kwai-bc-portal/src/kwai_bc_portal/repositories/author_db_repository.py: 94%
34 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that implements an AuthorRepository for a database."""
3from kwai_core.db.database import Database
4from kwai_core.domain.value_objects.unique_id import UniqueId
6from kwai_bc_portal.domain.author import AuthorEntity, AuthorIdentifier
7from kwai_bc_portal.repositories._tables import AuthorRow
8from kwai_bc_portal.repositories.author_db_query import (
9 AuthorDbQuery,
10 AuthorQueryRow,
11)
12from kwai_bc_portal.repositories.author_query import AuthorQuery
13from kwai_bc_portal.repositories.author_repository import (
14 AuthorNotFoundException,
15 AuthorRepository,
16)
class AuthorDbRepository(AuthorRepository):
    """An author repository backed by a database."""

    def __init__(self, database: Database):
        """Keep the database used by every query and statement of this repository."""
        self._database = database
        super().__init__()

    def create_query(self) -> AuthorQuery:
        """Return a new author query bound to this repository's database."""
        return AuthorDbQuery(self._database)

    async def get(self, id_: AuthorIdentifier) -> AuthorEntity:
        """Fetch the author selected by the given identifier.

        Raises:
            AuthorNotFoundException: when the query yields no row.
        """
        row = await self.create_query().filter_by_id(id_).fetch_one()
        if not row:
            raise AuthorNotFoundException()
        return AuthorQueryRow.map(row).create_entity()

    async def get_by_uuid(self, uuid: UniqueId) -> AuthorEntity:
        """Fetch the author selected by the given unique id.

        Raises:
            AuthorNotFoundException: when the query yields no row.
        """
        row = await self.create_query().filter_by_uuid(uuid).fetch_one()
        if not row:
            raise AuthorNotFoundException()
        return AuthorQueryRow.map(row).create_entity()

    async def get_all(self, query=None, limit=0, offset=0):
        """Yield an author entity for each row fetched by the query.

        Args:
            query: The query to run; a base query is created when this is falsy.
            limit: Forwarded as the first argument of the query's ``fetch``.
            offset: Forwarded as the second argument of the query's ``fetch``.
        """
        if not query:
            query = self.create_query()

        async for record in query.fetch(limit, offset):
            entity = AuthorQueryRow.map(record).create_entity()
            yield entity

    async def create(self, author: AuthorEntity) -> AuthorEntity:
        """Insert the persisted form of the author and return the same entity."""
        table_name = AuthorRow.__table_name__
        author_row = AuthorRow.persist(author)
        await self._database.insert(table_name, author_row)
        return author

    async def delete(self, author: AuthorEntity) -> None:
        """Ask the database to delete the author.

        The author's id value is passed together with the author table name
        and "user_id" as the column name.
        """
        author_id = author.id.value
        await self._database.delete(author_id, AuthorRow.__table_name__, "user_id")