Coverage for bc/kwai-bc-teams/src/kwai_bc_teams/create_team_member.py: 100%

24 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that defines the use case for creating a team member.""" 

2 

3from dataclasses import dataclass 

4 

5from kwai_core.domain.presenter import Presenter 

6from kwai_core.domain.value_objects.unique_id import UniqueId 

7 

8from kwai_bc_teams.domain.team import TeamEntity, TeamIdentifier 

9from kwai_bc_teams.domain.team_member import TeamMember 

10from kwai_bc_teams.repositories.member_repository import MemberRepository 

11from kwai_bc_teams.repositories.team_repository import TeamRepository 

12 

13 

14@dataclass(frozen=True, kw_only=True, slots=True) 

15class CreateTeamMemberCommand: 

16 """Input for the 'Create Team Member' use case.""" 

17 

18 team_id: int 

19 member_id: str 

20 active: bool = True 

21 

22 

23class CreateTeamMember: 

24 """Implements the 'Create Team Member' use case.""" 

25 

26 def __init__( 

27 self, 

28 team_repository: TeamRepository, 

29 member_repository: MemberRepository, 

30 presenter: Presenter[tuple[TeamMember, TeamEntity]], 

31 ): 

32 """Initialize the use case. 

33 

34 Args: 

35 team_repository: A repository for retrieving the team. 

36 member_repository: A repository for retrieving the member. 

37 presenter: A Presenter for processing the result. 

38 """ 

39 self._team_repository = team_repository 

40 self._member_repository = member_repository 

41 self._presenter = presenter 

42 

43 async def execute(self, command: CreateTeamMemberCommand) -> None: 

44 """Execute the use case. 

45 

46 Raises: 

47 TeamNotFoundException: If the team does not exist. 

48 MemberNotFoundException: If the member does not exist. 

49 TeamMemberAlreadyExistException: If the member is already part of the team. 

50 """ 

51 team_query = self._team_repository.create_query().filter_by_id( 

52 TeamIdentifier(command.team_id) 

53 ) 

54 team = await self._team_repository.get(team_query) 

55 

56 member_query = self._member_repository.create_query().filter_by_uuid( 

57 UniqueId.create_from_string(command.member_id) 

58 ) 

59 member = await self._member_repository.get(member_query) 

60 

61 team_member = TeamMember(member=member, active=command.active) 

62 team = team.add_member(team_member) 

63 

64 await self._team_repository.add_team_member(team, team_member) 

65 

66 self._presenter.present((team_member, team))