Coverage for bc/kwai-bc-club/src/kwai_bc_club/repositories/coach_db_repository.py: 94%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module for defining a coach repository using a database.""" 

2 

3from typing import AsyncGenerator 

4 

5from kwai_core.db.database import Database 

6 

7from kwai_bc_club.domain.club_coach import ClubCoachEntity, ClubCoachIdentifier 

8from kwai_bc_club.repositories._tables import CoachRow 

9from kwai_bc_club.repositories.coach_db_query import CoachDBQuery, CoachQueryRow 

10from kwai_bc_club.repositories.coach_query import CoachQuery 

11from kwai_bc_club.repositories.coach_repository import ( 

12 CoachNotFoundException, 

13 CoachRepository, 

14) 

15 

16 

17class CoachDbRepository(CoachRepository): 

18 """A coach repository using a database.""" 

19 

20 def __init__(self, database: Database) -> None: 

21 self._database = database 

22 

23 def create_query(self) -> CoachQuery: 

24 return CoachDBQuery(self._database) 

25 

26 async def get(self, query: CoachQuery) -> ClubCoachEntity: 

27 if coach_entity := await anext(self.get_all(query), None): 

28 return coach_entity 

29 raise CoachNotFoundException("No records found for the given query.") 

30 

31 async def get_by_id(self, id_: ClubCoachIdentifier) -> ClubCoachEntity: 

32 query = self.create_query().filter_by_id(id_) 

33 coach_iterator = self.get_all(query) 

34 try: 

35 return await anext(coach_iterator) 

36 except StopAsyncIteration: 

37 raise CoachNotFoundException(f"Coach not found with {id_}") from None 

38 

39 async def get_all( 

40 self, 

41 query: CoachQuery | None = None, 

42 limit: int | None = None, 

43 offset: int | None = None, 

44 ) -> AsyncGenerator[ClubCoachEntity, None]: 

45 query = query or self.create_query() 

46 async for row in query.fetch(limit, offset): 

47 yield CoachQueryRow.map(row).create_entity() 

48 

49 async def create(self, coach: ClubCoachEntity): 

50 new_coach_id = await self._database.insert( 

51 CoachRow.__table_name__, CoachRow.persist(coach) 

52 ) 

53 return coach.set_id(ClubCoachIdentifier(new_coach_id)) 

54 

55 async def delete(self, coach: ClubCoachEntity): 

56 await self._database.delete(coach.id.value, CoachRow.__table_name__) 

57 

58 async def update(self, coach: ClubCoachEntity) -> None: 

59 await self._database.update( 

60 coach.id.value, CoachRow.__table_name__, CoachRow.persist(coach) 

61 )