Coverage for bc/kwai-bc-portal/src/kwai_bc_portal/applications/application_db_repository.py: 95%

39 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements an application repository for a database.""" 

2 

3from typing import AsyncIterator 

4 

5from kwai_core.db.database import Database 

6 

7from kwai_bc_portal.applications.application_db_query import ApplicationDbQuery 

8from kwai_bc_portal.applications.application_query import ApplicationQuery 

9from kwai_bc_portal.applications.application_repository import ( 

10 ApplicationNotFoundException, 

11 ApplicationRepository, 

12) 

13from kwai_bc_portal.applications.application_tables import ApplicationRow 

14from kwai_bc_portal.domain.application import ( 

15 ApplicationEntity, 

16 ApplicationIdentifier, 

17) 

18 

19 

class ApplicationDbRepository(ApplicationRepository):
    """An application database repository.

    Attributes:
        _database: the database for this repository.
    """

    def __init__(self, database: Database):
        """Initialize the repository.

        Args:
            database: the database used for all queries and statements.
        """
        self._database = database

    def create_query(self) -> ApplicationQuery:
        """Create a new application query bound to this repository's database."""
        return ApplicationDbQuery(self._database)

    async def get_by_id(self, id_: ApplicationIdentifier) -> ApplicationEntity:
        """Get the application with the given id.

        Args:
            id_: the identifier of the application.

        Returns:
            The entity created from the matching row.

        Raises:
            ApplicationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_id(id_)

        if row := await query.fetch_one():
            return ApplicationRow.map(row).create_entity()

        # Interpolate the parameter `id_`; the bare name `id` would refer to the
        # Python builtin and leak "<built-in function id>" into the message.
        raise ApplicationNotFoundException(f"Application with {id_} does not exist.")

    async def get_by_name(self, name: str) -> ApplicationEntity:
        """Get the application with the given name.

        Args:
            name: the name of the application.

        Returns:
            The entity created from the matching row.

        Raises:
            ApplicationNotFoundException: when the query returns no row.
        """
        query = self.create_query()
        query.filter_by_name(name)

        if row := await query.fetch_one():
            return ApplicationRow.map(row).create_entity()

        raise ApplicationNotFoundException(f"Application with {name} does not exist.")

    async def get_all(
        self,
        query: ApplicationQuery | None = None,
        limit: int | None = None,
        offset: int | None = None,
    ) -> AsyncIterator[ApplicationEntity]:
        """Iterate over the applications selected by a query.

        Args:
            query: the query to execute. When None, a fresh query from
                create_query() is used.
            limit: passed to the query's fetch as the first argument.
            offset: passed to the query's fetch as the second argument.

        Yields:
            An entity for each row fetched by the query.
        """
        if query is None:
            query = self.create_query()
        async for row in query.fetch(limit, offset):
            yield ApplicationRow.map(row).create_entity()

    async def create(self, application: ApplicationEntity) -> ApplicationEntity:
        """Insert the application into the database and commit.

        Args:
            application: the application to persist.

        Returns:
            The result of application.set_id() called with the identifier
            built from the value returned by the insert.
        """
        new_id = await self._database.insert(
            ApplicationRow.__table_name__, ApplicationRow.persist(application)
        )
        await self._database.commit()
        return application.set_id(ApplicationIdentifier(new_id))

    async def update(self, application: ApplicationEntity) -> None:
        """Update the row of the application and commit.

        Args:
            application: the application to update; its id selects the row.
        """
        await self._database.update(
            application.id.value,
            ApplicationRow.__table_name__,
            ApplicationRow.persist(application),
        )
        await self._database.commit()

    async def delete(self, application: ApplicationEntity) -> None:
        """Delete the row of the application and commit.

        Args:
            application: the application to delete; its id selects the row.
        """
        await self._database.delete(application.id.value, ApplicationRow.__table_name__)
        await self._database.commit()

79 await self._database.commit()