Coverage for apps/kwai-api/src/kwai_api/v1/auth/endpoints/login.py: 81%

100 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Module that implements all APIs for login.""" 

2 

3from typing import Annotated 

4 

5import jwt 

6 

7from fastapi import ( 

8 APIRouter, 

9 Cookie, 

10 Depends, 

11 Form, 

12 Header, 

13 HTTPException, 

14 Request, 

15 status, 

16) 

17from fastapi.responses import Response 

18from fastapi.security import OAuth2PasswordRequestForm 

19from kwai_bc_identity.authenticate_user import AuthenticateUser, AuthenticateUserCommand 

20from kwai_bc_identity.exceptions import AuthenticationException, NotAllowedException 

21from kwai_bc_identity.logout import Logout, LogoutCommand 

22from kwai_bc_identity.recover_user import RecoverUser, RecoverUserCommand 

23from kwai_bc_identity.refresh_access_token import ( 

24 RefreshAccessToken, 

25 RefreshAccessTokenCommand, 

26) 

27from kwai_bc_identity.reset_password import ( 

28 ResetPassword, 

29 ResetPasswordCommand, 

30 UserRecoveryConfirmedException, 

31) 

32from kwai_bc_identity.tokens.access_token_db_repository import AccessTokenDbRepository 

33from kwai_bc_identity.tokens.log_user_login_db_service import LogUserLoginDbService 

34from kwai_bc_identity.tokens.refresh_token_db_repository import RefreshTokenDbRepository 

35from kwai_bc_identity.tokens.refresh_token_repository import ( 

36 RefreshTokenNotFoundException, 

37) 

38from kwai_bc_identity.user_recoveries.user_recovery_db_repository import ( 

39 UserRecoveryDbRepository, 

40) 

41from kwai_bc_identity.user_recoveries.user_recovery_repository import ( 

42 UserRecoveryNotFoundException, 

43) 

44from kwai_bc_identity.users.user_account_db_repository import UserAccountDbRepository 

45from kwai_bc_identity.users.user_account_repository import UserAccountNotFoundException 

46from kwai_core.db.database import Database 

47from kwai_core.db.uow import UnitOfWork 

48from kwai_core.domain.exceptions import UnprocessableException 

49from kwai_core.domain.value_objects.email_address import InvalidEmailException 

50from kwai_core.events.publisher import Publisher 

51from kwai_core.settings import Settings, get_settings 

52from loguru import logger 

53 

54from kwai_api.dependencies import create_database, get_publisher 

55from kwai_api.v1.auth.cookies import create_cookies, delete_cookies 

56 

57 

58router = APIRouter() 

59 

60 

61@router.post( 

62 "/login", 

63 summary="Create access and refresh token for a user.", 

64 responses={ 

65 200: {"description": "The user is logged in successfully."}, 

66 401: { 

67 "description": "The email is invalid, authentication failed or user is unknown." 

68 }, 

69 }, 

70) 

71async def login( 

72 request: Request, 

73 settings: Annotated[Settings, Depends(get_settings)], 

74 db: Annotated[Database, Depends(create_database)], 

75 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

76 response: Response, 

77 x_forwarded_for: Annotated[str | None, Header()] = None, 

78 user_agent: Annotated[str | None, Header()] = "", 

79): 

80 """Login a user. 

81 

82 This request expects a form (application/x-www-form-urlencoded). The form 

83 must contain a `username` and `password` field. The username is 

84 the email address of the user. 

85 

86 On success, a cookie for the access token and the refresh token will be returned. 

87 """ 

88 command = AuthenticateUserCommand( 

89 username=form_data.username, 

90 password=form_data.password, 

91 access_token_expiry_minutes=settings.security.access_token_expires_in, 

92 refresh_token_expiry_minutes=settings.security.refresh_token_expires_in, 

93 ) 

94 

95 try: 

96 if x_forwarded_for: 

97 client_ip = x_forwarded_for 

98 else: 

99 client_ip = request.client.host if request.client else "" 

100 async with UnitOfWork(db, always_commit=True): 

101 refresh_token = await AuthenticateUser( 

102 UserAccountDbRepository(db), 

103 AccessTokenDbRepository(db), 

104 RefreshTokenDbRepository(db), 

105 LogUserLoginDbService( 

106 db, 

107 email=form_data.username, 

108 user_agent=user_agent or "", 

109 client_ip=client_ip, 

110 ), 

111 ).execute(command) 

112 except InvalidEmailException as exc: 

113 raise HTTPException( 

114 status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid email address" 

115 ) from exc 

116 except AuthenticationException as exc: 

117 raise HTTPException( 

118 status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc) 

119 ) from exc 

120 except UserAccountNotFoundException as exc: 

121 raise HTTPException( 

122 status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc) 

123 ) from exc 

124 

125 create_cookies(response, refresh_token, settings) 

126 response.status_code = status.HTTP_200_OK 

127 

128 return response 

129 

130 

131@router.post( 

132 "/logout", 

133 summary="Logout the current user", 

134 responses={200: {"description": "The user is logged out successfully."}}, 

135) 

136async def logout( 

137 settings: Annotated[Settings, Depends(get_settings)], 

138 db: Annotated[Database, Depends(create_database)], 

139 response: Response, 

140 refresh_token: Annotated[str | None, Cookie()] = None, 

141) -> None: 

142 """Log out the current user. 

143 

144 A user is logged out by revoking the refresh token. The associated access token 

145 will also be revoked. 

146 

147 This request expects a form (application/x-www-form-urlencoded). The form 

148 must contain a **refresh_token** field. 

149 

150 Even when a token could not be found, the cookies will be deleted. 

151 """ 

152 if refresh_token: 

153 decoded_refresh_token = jwt.decode( 

154 refresh_token, 

155 key=settings.security.jwt_refresh_secret, 

156 algorithms=[settings.security.jwt_algorithm], 

157 ) 

158 command = LogoutCommand(identifier=decoded_refresh_token["jti"]) 

159 try: 

160 async with UnitOfWork(db): 

161 await Logout( 

162 refresh_token_repository=RefreshTokenDbRepository(db), 

163 access_token_repository=AccessTokenDbRepository(db), 

164 ).execute(command) 

165 except RefreshTokenNotFoundException: 

166 pass 

167 

168 delete_cookies(response) 

169 response.status_code = status.HTTP_200_OK 

170 

171 

172@router.post( 

173 "/access_token", 

174 summary="Renew an access token using a refresh token.", 

175 responses={ 

176 200: {"description": "The access token is renewed."}, 

177 401: {"description": "The refresh token is expired."}, 

178 }, 

179) 

180async def renew_access_token( 

181 request: Request, 

182 settings: Annotated[Settings, Depends(get_settings)], 

183 db: Annotated[Database, Depends(create_database)], 

184 refresh_token: Annotated[str, Cookie()], 

185 response: Response, 

186 x_forwarded_for: Annotated[str | None, Header()] = None, 

187 user_agent: Annotated[str | None, Header()] = "", 

188): 

189 """Refresh the access token. 

190 

191 On success, a new access token / refresh token cookie will be sent. 

192 

193 When the refresh token is expired, the user needs to log in again. 

194 """ 

195 try: 

196 decoded_refresh_token = jwt.decode( 

197 refresh_token, 

198 key=settings.security.jwt_refresh_secret, 

199 algorithms=[settings.security.jwt_algorithm], 

200 ) 

201 except jwt.ExpiredSignatureError as exc: 

202 raise HTTPException( 

203 status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc) 

204 ) from exc 

205 

206 command = RefreshAccessTokenCommand( 

207 identifier=decoded_refresh_token["jti"], 

208 access_token_expiry_minutes=settings.security.access_token_expires_in, 

209 refresh_token_expiry_minutes=settings.security.refresh_token_expires_in, 

210 ) 

211 

212 try: 

213 if x_forwarded_for: 

214 client_ip = x_forwarded_for 

215 else: 

216 client_ip = request.client.host if request.client else "" 

217 

218 async with UnitOfWork(db, always_commit=True): 

219 new_refresh_token = await RefreshAccessToken( 

220 RefreshTokenDbRepository(db), 

221 AccessTokenDbRepository(db), 

222 LogUserLoginDbService( 

223 db, 

224 email="", 

225 user_agent=user_agent or "", 

226 client_ip=client_ip, 

227 ), 

228 ).execute(command) 

229 except AuthenticationException as exc: 

230 raise HTTPException( 

231 status_code=status.HTTP_401_UNAUTHORIZED, detail=str(exc) 

232 ) from exc 

233 

234 create_cookies(response, new_refresh_token, settings) 

235 response.status_code = status.HTTP_200_OK 

236 

237 

238@router.post( 

239 "/recover", 

240 summary="Initiate a password reset flow", 

241 responses={ 

242 200: {"description": "Ok."}, 

243 }, 

244) 

245async def recover_user( 

246 db: Annotated[Database, Depends(create_database)], 

247 publisher: Annotated[Publisher, Depends(get_publisher)], 

248 email: Annotated[str, Form()], 

249) -> None: 

250 """Start a recover password flow for the given email address. 

251 

252 A mail with a unique id will be sent using the message bus. 

253 

254 This request expects a form (application/x-www-form-urlencoded). The form 

255 must contain an **email** field. 

256 

257 !!! Note 

258 To avoid leaking information, this api will always respond with 200 

259 """ 

260 command = RecoverUserCommand(email=email) 

261 try: 

262 async with UnitOfWork(db): 

263 await RecoverUser( 

264 UserAccountDbRepository(db), UserRecoveryDbRepository(db), publisher 

265 ).execute(command) 

266 except UserAccountNotFoundException: 

267 logger.warning(f"Unknown email address used for a password recovery: {email}") 

268 except UnprocessableException as ex: 

269 logger.warning(f"User recovery could not be started: {ex}") 

270 

271 

272@router.post( 

273 "/reset", 

274 summary="Reset the password of a user.", 

275 responses={ # noqa B006 

276 200: {"description": "The password is reset successfully."}, 

277 400: {"description": "The reset code was already used."}, 

278 403: {"description": "This request is forbidden."}, 

279 404: {"description": "The uniqued id of the recovery could not be found."}, 

280 422: {"description": "The user could not be found."}, 

281 }, 

282) 

283async def reset_password( 

284 uuid: Annotated[str, Form()], 

285 password: Annotated[str, Form()], 

286 db: Annotated[Database, Depends(create_database)], 

287): 

288 """Reset the password of the user. 

289 

290 Http code 200 on success, 404 when the unique id is invalid, 422 when the 

291 request can't be processed, 403 when the request is forbidden. 

292 

293 This request expects a form (application/x-www-form-urlencoded). The form 

294 must contain an **uuid** and **password** field. The unique id must be valid 

295 and is retrieved by [/api/v1/auth/recover][post_/recover]. 

296 """ 

297 command = ResetPasswordCommand(uuid=uuid, password=password) 

298 try: 

299 async with UnitOfWork(db): 

300 await ResetPassword( 

301 user_account_repo=UserAccountDbRepository(db), 

302 user_recovery_repo=UserRecoveryDbRepository(db), 

303 ).execute(command) 

304 except UserRecoveryNotFoundException as exc: 

305 raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) from exc 

306 except UserAccountNotFoundException as exc: 

307 raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY) from exc 

308 except UserRecoveryConfirmedException as exc: 

309 raise HTTPException( 

310 status_code=status.HTTP_400_BAD_REQUEST, 

311 detail="Reset code was already used.", 

312 ) from exc 

313 except NotAllowedException as exc: 

314 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from exc