Coverage for src/tests/modules/identity/user_invitations/test_user_invitation_db_query.py: 75%
48 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module for testing UserInvitationDbQuery."""
3import pytest
5from kwai_bc_identity.user_invitations.user_invitation import (
6 UserInvitationIdentifier,
7)
8from kwai_bc_identity.user_invitations.user_invitation_db_query import (
9 UserInvitationDbQuery,
10)
11from kwai_bc_identity.user_invitations.user_invitation_query import (
12 UserInvitationQuery,
13)
14from kwai_core.db.database import Database
15from kwai_core.domain.value_objects.email_address import EmailAddress
16from kwai_core.domain.value_objects.timestamp import Timestamp
17from kwai_core.domain.value_objects.unique_id import UniqueId
20pytestmark = pytest.mark.db
23@pytest.fixture(scope="module")
24def query(database: Database) -> UserInvitationQuery:
25 """Fixture for creating the invitation query."""
26 return UserInvitationDbQuery(database)
29async def test_filter_by_id(query: UserInvitationQuery):
30 """Test the filter by id."""
31 query.filter_by_id(UserInvitationIdentifier(1))
32 try:
33 await query.fetch_one()
34 except Exception as exc:
35 pytest.fail(f"An exception occurred: {exc}")
38async def test_filter_by_uuid(query: UserInvitationQuery):
39 """Test filter by the unique id."""
40 query.filter_by_uuid(UniqueId.generate())
41 try:
42 await query.fetch_one()
43 except Exception as exc:
44 pytest.fail(f"An exception occurred: {exc}")
47async def test_query_filter_by_email(query: UserInvitationQuery):
48 """Test the filter by email query."""
49 query.filter_by_email(EmailAddress("ichiro.abe@kwai.com"))
50 try:
51 await query.fetch_one()
52 except Exception as exc:
53 pytest.fail(f"An exception occurred: {exc}")
56async def test_query_filter_active(query: UserInvitationQuery):
57 """Test the filter active query."""
58 query.filter_active()
59 try:
60 await query.fetch_one()
61 except Exception as exc:
62 pytest.fail(f"An exception occurred: {exc}")
65async def test_query_filter_not_expired(query: UserInvitationQuery):
66 """Test the filter on not expired."""
67 query.filter_not_expired(Timestamp.create_now())
68 try:
69 await query.fetch_one()
70 except Exception as exc:
71 pytest.fail(f"An exception occurred: {exc}")
74async def test_query_filter_not_confirmed(query: UserInvitationQuery):
75 """Test the filter on not confirmed."""
76 query.filter_not_confirmed()
77 try:
78 await query.fetch_one()
79 except Exception as exc:
80 pytest.fail(f"An exception occurred: {exc}")