Coverage for src/tests/modules/identity/user_invitations/test_user_invitation_db_query.py: 75%

48 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module for testing UserInvitationDbQuery.""" 

2 

3import pytest 

4 

5from kwai_bc_identity.user_invitations.user_invitation import ( 

6 UserInvitationIdentifier, 

7) 

8from kwai_bc_identity.user_invitations.user_invitation_db_query import ( 

9 UserInvitationDbQuery, 

10) 

11from kwai_bc_identity.user_invitations.user_invitation_query import ( 

12 UserInvitationQuery, 

13) 

14from kwai_core.db.database import Database 

15from kwai_core.domain.value_objects.email_address import EmailAddress 

16from kwai_core.domain.value_objects.timestamp import Timestamp 

17from kwai_core.domain.value_objects.unique_id import UniqueId 

18 

19 

# Module-level marker: pytest applies the ``db`` mark to every test collected
# from this module, so the whole file can be selected or deselected with
# ``-m db`` / ``-m "not db"``.
pytestmark = pytest.mark.db

21 

22 

@pytest.fixture
def query(database: Database) -> UserInvitationQuery:
    """Fixture for creating a fresh invitation query for each test.

    The fixture is function-scoped on purpose. The tests in this module call
    the ``filter_*`` methods for their effect on the query object (the return
    value is discarded) and then fetch from that same object. Sharing one
    instance across the module would let the filters of earlier tests leak
    into later ones and make the outcome depend on execution order.

    Args:
        database: The database fixture the query is executed against.

    Returns:
        A new database-backed query without any filter applied.
    """
    return UserInvitationDbQuery(database)

27 

28 

async def test_filter_by_id(query: UserInvitationQuery):
    """Test that a query filtered on the id can be executed.

    The fetched result is not inspected: the test only checks that running
    the query raises nothing. Exceptions are left to propagate so that pytest
    reports the failure at the place where it originated, with the full
    traceback, instead of a stringified message.
    """
    query.filter_by_id(UserInvitationIdentifier(1))
    await query.fetch_one()

36 

37 

async def test_filter_by_uuid(query: UserInvitationQuery):
    """Test that a query filtered on the unique id can be executed.

    A freshly generated unique id is used as the filter value. The fetched
    result is not inspected: the test only checks that running the query
    raises nothing. Exceptions are left to propagate so that pytest reports
    the failure where it originated, with the full traceback.
    """
    query.filter_by_uuid(UniqueId.generate())
    await query.fetch_one()

45 

46 

async def test_query_filter_by_email(query: UserInvitationQuery):
    """Test that a query filtered on the email address can be executed.

    The fetched result is not inspected: the test only checks that running
    the query raises nothing. Exceptions are left to propagate so that pytest
    reports the failure where it originated, with the full traceback.
    """
    query.filter_by_email(EmailAddress("ichiro.abe@kwai.com"))
    await query.fetch_one()

54 

55 

async def test_query_filter_active(query: UserInvitationQuery):
    """Test that a query with the active filter can be executed.

    The fetched result is not inspected: the test only checks that running
    the query raises nothing. Exceptions are left to propagate so that pytest
    reports the failure where it originated, with the full traceback.
    """
    query.filter_active()
    await query.fetch_one()

63 

64 

async def test_query_filter_not_expired(query: UserInvitationQuery):
    """Test that a query with the not-expired filter can be executed.

    The current timestamp is passed as the filter argument. The fetched
    result is not inspected: the test only checks that running the query
    raises nothing. Exceptions are left to propagate so that pytest reports
    the failure where it originated, with the full traceback.
    """
    query.filter_not_expired(Timestamp.create_now())
    await query.fetch_one()

72 

73 

async def test_query_filter_not_confirmed(query: UserInvitationQuery):
    """Test that a query with the not-confirmed filter can be executed.

    The fetched result is not inspected: the test only checks that running
    the query raises nothing. Exceptions are left to propagate so that pytest
    reports the failure where it originated, with the full traceback.
    """
    query.filter_not_confirmed()
    await query.fetch_one()