Coverage for bc/kwai-bc-portal/src/kwai_bc_portal/repositories/author_db_repository.py: 94%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements an AuthorRepository for a database.""" 

2 

3from kwai_core.db.database import Database 

4from kwai_core.domain.value_objects.unique_id import UniqueId 

5 

6from kwai_bc_portal.domain.author import AuthorEntity, AuthorIdentifier 

7from kwai_bc_portal.repositories._tables import AuthorRow 

8from kwai_bc_portal.repositories.author_db_query import ( 

9 AuthorDbQuery, 

10 AuthorQueryRow, 

11) 

12from kwai_bc_portal.repositories.author_query import AuthorQuery 

13from kwai_bc_portal.repositories.author_repository import ( 

14 AuthorNotFoundException, 

15 AuthorRepository, 

16) 

17 

18 

19class AuthorDbRepository(AuthorRepository): 

20 """A author repository for a database.""" 

21 

22 def __init__(self, database: Database): 

23 self._database = database 

24 super().__init__() 

25 

26 def create_query(self) -> AuthorQuery: 

27 """Create a base query.""" 

28 return AuthorDbQuery(self._database) 

29 

30 async def get(self, id_: AuthorIdentifier) -> AuthorEntity: 

31 query = self.create_query().filter_by_id(id_) 

32 row = await query.fetch_one() 

33 if row: 

34 return AuthorQueryRow.map(row).create_entity() 

35 

36 raise AuthorNotFoundException() 

37 

38 async def get_by_uuid(self, uuid: UniqueId) -> AuthorEntity: 

39 query = self.create_query().filter_by_uuid(uuid) 

40 row = await query.fetch_one() 

41 if row: 

42 return AuthorQueryRow.map(row).create_entity() 

43 

44 raise AuthorNotFoundException() 

45 

46 async def get_all(self, query=None, limit=0, offset=0): 

47 query = query or self.create_query() 

48 

49 async for row in query.fetch(limit, offset): 

50 yield AuthorQueryRow.map(row).create_entity() 

51 

52 async def create(self, author: AuthorEntity) -> AuthorEntity: 

53 await self._database.insert(AuthorRow.__table_name__, AuthorRow.persist(author)) 

54 return author 

55 

56 async def delete(self, author: AuthorEntity) -> None: 

57 await self._database.delete( 

58 author.id.value, AuthorRow.__table_name__, "user_id" 

59 )