Coverage for apps/kwai-api/src/kwai_api/security_dependencies.py: 65%
57 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Module that defines dependencies that can be used to check authentication and authorization."""
3from typing import Annotated
5import jwt
7from fastapi import Cookie, Depends, Header
8from kwai_bc_club.domain.club_coach import ClubCoachEntity
9from kwai_bc_club.get_user_coach import GetUserCoach, GetUserCoachCommand
10from kwai_bc_club.repositories.coach_db_repository import CoachDbRepository
11from kwai_bc_club.repositories.coach_repository import CoachNotFoundException
12from kwai_bc_identity.exceptions import AuthenticationException
13from kwai_bc_identity.refresh_access_token import (
14 RefreshAccessToken,
15 RefreshAccessTokenCommand,
16)
17from kwai_bc_identity.tokens.access_token_db_repository import AccessTokenDbRepository
18from kwai_bc_identity.tokens.log_user_login_db_service import LogUserLoginDbService
19from kwai_bc_identity.tokens.refresh_token_db_repository import RefreshTokenDbRepository
20from kwai_bc_identity.users.user import UserEntity
21from kwai_core.db.database import Database
22from kwai_core.db.uow import UnitOfWork
23from kwai_core.domain.presenter import EntityPresenter
24from kwai_core.settings import Settings, get_settings
25from starlette.requests import Request
26from starlette.responses import Response
28from kwai_api.dependencies import create_database, get_optional_user
29from kwai_api.v1.auth.cookies import create_cookies
async def check_login(
    request: Request,
    response: Response,
    settings: Annotated[Settings, Depends(get_settings)],
    db: Annotated[Database, Depends(create_database)],
    access_token: Annotated[str | None, Cookie()] = None,
    refresh_token: Annotated[str | None, Cookie()] = None,
    x_forwarded_for: Annotated[str | None, Header()] = None,
    user_agent: Annotated[str | None, Header()] = "",
) -> bool:
    """Dependency that checks for a valid login.

    True is returned right away when the access token is still valid.
    When the access token is missing, expired or otherwise invalid, the refresh
    token is used to try to generate a new refresh and access token.

    Args:
        request: The current request; used for the client address when there
            is no X-Forwarded-For header, and passed on to create_cookies.
        response: The response that is passed on to create_cookies.
        settings: The application settings (JWT secrets, algorithm, expiry).
        db: The database used by the token repositories and the login log.
        access_token: The access token cookie, if any.
        refresh_token: The refresh token cookie, if any.
        x_forwarded_for: The X-Forwarded-For header, if any.
        user_agent: The User-Agent header, if any.

    Returns:
        True when the access token is valid or when the tokens could be
        renewed with the refresh token, False otherwise.
    """
    # A refresh token is always required, even when an access token is present.
    if refresh_token is None:
        return False

    if access_token:
        try:
            jwt.decode(
                access_token,
                settings.security.jwt_secret,
                algorithms=[settings.security.jwt_algorithm],
            )
        except jwt.InvalidTokenError:
            # Expired or unusable (bad signature, malformed, ...): ignore it,
            # we are going to recreate a new one with the refresh token, if
            # possible. The cookie is client-controlled, so it must never
            # result in an unhandled exception.
            pass
        else:
            # The access token is still valid
            return True

    try:
        decoded_refresh_token = jwt.decode(
            refresh_token,
            settings.security.jwt_refresh_secret,
            algorithms=[settings.security.jwt_algorithm],
        )
    except jwt.InvalidTokenError:
        # Covers an expired token as well as a tampered or malformed one.
        return False

    # The refresh token is looked up by its "jti" claim; without it there is
    # nothing to refresh.
    identifier = decoded_refresh_token.get("jti")
    if identifier is None:
        return False

    if x_forwarded_for:
        client_ip = x_forwarded_for
    else:
        client_ip = request.client.host if request.client else ""

    command = RefreshAccessTokenCommand(
        identifier=identifier,
        access_token_expiry_minutes=settings.security.access_token_expires_in,
        refresh_token_expiry_minutes=settings.security.refresh_token_expires_in,
    )

    try:
        async with UnitOfWork(db, always_commit=True):
            new_refresh_token = await RefreshAccessToken(
                RefreshTokenDbRepository(db),
                AccessTokenDbRepository(db),
                LogUserLoginDbService(
                    db,
                    email="",
                    user_agent=user_agent or "",
                    client_ip=client_ip,
                ),
            ).execute(command)
    except AuthenticationException:
        return False

    # NOTE(review): create_cookies presumably stores the renewed tokens as
    # cookies on the response - confirm in kwai_api.v1.auth.cookies.
    create_cookies(request, response, new_refresh_token, settings)
    return True
async def get_coach(
    user: Annotated[UserEntity | None, Depends(get_optional_user)],
    database: Annotated[Database, Depends(create_database)],
) -> ClubCoachEntity | None:
    """Get the coach from the current user.

    Args:
        user: The current user, or None when there is no user.
        database: The database used to look up the coach.

    Returns:
        The coach linked to the user, or None when there is no user or when
        no coach was found for the user.
    """
    if user is None:
        return None

    command = GetUserCoachCommand(uuid=str(user.uuid))
    # The use case hands its result to the presenter instead of returning it.
    presenter = EntityPresenter[ClubCoachEntity]()
    try:
        await GetUserCoach(CoachDbRepository(database), presenter).execute(command)
    except CoachNotFoundException:
        return None

    return presenter.entity