| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 |
- import inspect
- from datetime import datetime, timedelta, timezone
- from typing import Any, Awaitable, Callable, Collection, Dict, Optional, Type, Union
- import jwt
- from anyio.to_thread import run_sync
- from fastapi import FastAPI, Request, Response
- from fastapi.security import OAuth2PasswordBearer, SecurityScopes
- from starlette.middleware.base import BaseHTTPMiddleware
- from fastapi_login.exceptions import InsufficientScopeException, InvalidCredentialsException
- from fastapi_login.secrets import to_secret
- from fastapi_login.utils import ordered_partial
- SECRET_TYPE = Union[str, bytes]
- CUSTOM_EXCEPTION = Union[Type[Exception], Exception]
- class LoginManager(OAuth2PasswordBearer):
- def __init__(
- self,
- secret: Union[SECRET_TYPE, Dict[str, SECRET_TYPE]],
- token_url: str,
- algorithm="HS256",
- use_cookie=False,
- use_header=True,
- cookie_name: str = "access-token",
- not_authenticated_exception: CUSTOM_EXCEPTION = InvalidCredentialsException,
- default_expiry: timedelta = timedelta(minutes=15),
- scopes: Optional[Dict[str, str]] = None,
- out_of_scope_exception: CUSTOM_EXCEPTION = InsufficientScopeException,
- ):
- """
- Initializes LoginManager
- Args:
- algorithm (str): Should be "HS256" or "RS256" used to decrypt the JWT
- token_url (str): The url where the user can login to get the token
- use_cookie (bool): Set if cookies should be checked for the token
- use_header (bool): Set if headers should be checked for the token
- cookie_name (str): Name of the cookie to check for the token
- not_authenticated_exception (Union[Type[Exception], Exception]): Exception to raise when the user is not authenticated
- this defaults to `fastapi_login.exceptions.InvalidCredentialsException`
- default_expiry (datetime.timedelta): The default expiry time of the token, defaults to 15 minutes
- scopes (Dict[str, str]): Scopes argument of OAuth2PasswordBearer for more information see
- `https://fastapi.tiangolo.com/advanced/security/oauth2-scopes/#oauth2-security-scheme`
- out_of_scope_exception (Union[Type[Exception], Exception]): Exception to raise when the user is out of scopes,
- if not set, default is `fastapi_login.exceptions.InsufficientScopeException`
- """
- if use_cookie is False and use_header is False:
- raise AttributeError(
- "use_cookie and use_header are both False one of them needs to be True"
- )
- if isinstance(secret, str):
- secret = secret.encode()
- self.secret = to_secret({"algorithms": algorithm, "secret": secret})
- self.algorithm = algorithm
- self.oauth_scheme = None
- self.use_cookie = use_cookie
- self.use_header = use_header
- self.cookie_name = cookie_name
- self.default_expiry = default_expiry
- # private
- self._user_callback: Optional[ordered_partial] = None
- self._not_authenticated_exception = not_authenticated_exception
- self._out_of_scope_exception = out_of_scope_exception
- # we take over the exception raised possibly by setting auto_error to False
- super().__init__(tokenUrl=token_url, auto_error=False, scopes=scopes)
- @property
- def out_of_scope_exception(self):
- """
- Exception raised when the user is out of scope.
- Defaults to `fastapi_login.exceptions.InsufficientScopeException`
- """
- return self._out_of_scope_exception
- @property
- def not_authenticated_exception(self):
- """
- Exception raised when no (valid) token is present.
- Defaults to `fastapi_login.exceptions.InvalidCredentialsException`
- """
- return self._not_authenticated_exception
- def user_loader(self, *args, **kwargs) -> Union[Callable, Callable[..., Awaitable]]:
- """
- This sets the callback to retrieve the user.
- The function should take an unique identifier like an email
- and return the user object or None.
- Basic usage:
- >>> from fastapi import FastAPI
- >>> from fastapi_login import LoginManager
- >>> app = FastAPI()
- >>> # use import secrets; print(secrets.token_hex(24)) to get a suitable secret key
- >>> SECRET = "super-secret"
- >>> manager = LoginManager(SECRET, token_url="Login")
- >>> manager.user_loader()(get_user)
- >>> @manager.user_loader(...) # Arguments and keyword arguments declared here are passed on
- >>> def get_user(user_identifier, ...):
- ... # get user logic here
- Args:
- args: Positional arguments to pass on to the decorated method
- kwargs: Keyword arguments to pass on to the decorated method
- Returns:
- The callback
- """
- def decorator(callback: Union[Callable, Callable[..., Awaitable]]):
- """
- The actual setter of the load_user callback
- Args:
- callback (Callable or Awaitable): The callback which returns the user
- Returns:
- Partial of the callback with given args and keyword arguments already set
- """
- self._user_callback = ordered_partial(callback, *args, **kwargs)
- return callback
- return decorator
- def _get_payload(self, token: str) -> Dict[str, Any]:
- """
- Returns the decoded token payload.
- If failed, raises `LoginManager.not_authenticated_exception`
- Args:
- token (str): The token to decode
- Returns:
- Payload of the token
- Raises:
- LoginManager.not_authenticated_exception: The token is invalid or None was returned by `_load_user`
- """
- try:
- payload = jwt.decode(
- token, self.secret.secret_for_decode, algorithms=[self.algorithm]
- )
- return payload
- # This includes all errors raised by pyjwt
- except jwt.PyJWTError:
- raise self.not_authenticated_exception
- def _has_scopes(
- self, payload: Dict[str, Any], required_scopes: Optional[SecurityScopes]
- ) -> bool:
- """
- Returns true if the required scopes are present in the token
- Args:
- payload (Dict[str, Any]): The decoded JWT payload
- required_scopes: The scopes required to access this route
- Returns:
- True if the required scopes are contained in the tokens payload
- """
- if required_scopes is None or not required_scopes.scopes:
- # According to RFC 6749, the scopes are optional
- return True
- # when the manager was invoked using fastapi.Security(manager, scopes=[...])
- # we have to check if all required scopes are contained in the token
- provided_scopes = payload.get("scopes", [])
- # Check if enough scopes are present
- if len(provided_scopes) < len(required_scopes.scopes):
- return False
- # Check if all required scopes are present
- elif any(scope not in provided_scopes for scope in required_scopes.scopes):
- return False
- return True
- def has_scopes(self, token: str, required_scopes: SecurityScopes) -> bool:
- """
- Combines `_get_payload` and `_has_scopes` to check if the token has the required scopes
- Args:
- token (str): The token to decode
- required_scopes: The scopes required to access this route
- Returns:
- True if the required scopes are contained in the tokens payload
- """
- payload = self._get_payload(token)
- return self._has_scopes(payload, required_scopes)
- async def _get_current_user(self, payload: Dict[str, Any]):
- """
- This decodes the jwt based on the secret and the algorithm set on the instance.
- If the token is correctly formatted and the user is found the user object
- is returned else this raises `LoginManager.not_authenticated_exception`
- Args:
- payload (Dict[str, Any]): The decoded JWT payload
- Returns:
- The user object returned by the instances `_user_callback`
- Raises:
- LoginManager.not_authenticated_exception: The token is invalid or None was returned by `_load_user`
- """
- # the identifier should be stored under the sub (subject) key
- user_identifier = payload.get("sub")
- if user_identifier is None:
- raise self.not_authenticated_exception
- user = await self._load_user(user_identifier)
- if user is None:
- raise self.not_authenticated_exception
- return user
- async def get_current_user(self, token: str) -> Any:
- """
- Combines `_get_payload` and `_get_current_user` to get the user object
- Args:
- token (str): The encoded jwt token
- Returns:
- The user object returned by the instances `_user_callback`
- Raises:
- LoginManager.not_authenticated_exception: The token is invalid or None was returned by `_load_user`
- """
- payload = self._get_payload(token)
- return await self._get_current_user(payload)
- async def _load_user(self, identifier: Any):
- """
- This loads the user using the user_callback
- Args:
- identifier (Any): The user identifier expected by `_user_callback`
- Returns:
- The user object returned by `_user_callback` or None
- Raises:
- Exception: When no ``user_loader`` has been set
- """
- if self._user_callback is None:
- raise Exception("Missing user_loader callback")
- if inspect.iscoroutinefunction(self._user_callback):
- user = await self._user_callback(identifier)
- else:
- user = await run_sync(self._user_callback, identifier)
- return user
- def create_access_token(
- self,
- *,
- data: dict,
- expires: Optional[timedelta] = None,
- scopes: Optional[Collection[str]] = None,
- ) -> str:
- """
- Helper function to create the encoded access token using
- the provided secret and the algorithm of the LoginManager instance
- Args:
- data (dict): The data which should be stored in the token
- expires (datetime.timedelta): An optional timedelta in which the token expires.
- Defaults to 15 minutes
- scopes (Collection): Optional scopes the token user has access to.
- Returns:
- The encoded JWT with the data and the expiry. The expiry is
- available under the 'exp' key
- """
- to_encode = data.copy()
- if expires:
- expires_in = datetime.now(timezone.utc) + expires
- else:
- expires_in = datetime.now(timezone.utc) + self.default_expiry
- to_encode.update({"exp": expires_in})
- if scopes is not None:
- unique_scopes = set(scopes)
- to_encode.update({"scopes": list(unique_scopes)})
- return jwt.encode(to_encode, self.secret.secret_for_encode, self.algorithm)
- def set_cookie(self, response: Response, token: str) -> None:
- """
- Utility function to set a cookie containing token on the response
- Args:
- response (fastapi.Response): The response which is send back
- token (str): The created JWT
- """
- response.set_cookie(key=self.cookie_name, value=token, httponly=True)
- def _token_from_cookie(self, request: Request) -> Optional[str]:
- """
- Checks the requests cookies for cookies with the value of`self.cookie_name` as name
- Args:
- request (fastapi.Request): The request to the route, normally filled in automatically
- Returns:
- The access token found in the cookies of the request or None
- """
- return request.cookies.get(self.cookie_name) or None
- async def _get_token(self, request: Request):
- """
- Tries to extract the token from the request, based on self.use_header and self.use_cookie
- Args:
- request: The request containing the token
- Returns:
- The in the request contained encoded JWT token
- Raises:
- LoginManager.not_authenticated_exception if no token is present
- """
- token = None
- if self.use_cookie:
- token = self._token_from_cookie(request)
- if not token and self.use_header:
- token = await super(LoginManager, self).__call__(request)
- if not token:
- raise self.not_authenticated_exception
- return token
- async def __call__(
- self,
- request: Request,
- security_scopes: SecurityScopes = None, # type: ignore
- ) -> Any:
- """
- Provides the functionality to act as a Dependency
- Args:
- request (fastapi.Request):The incoming request, this is set automatically
- by FastAPI
- Returns:
- The user object or None
- Raises:
- LoginManager.not_authenticated_exception: If set by the user and `self.auto_error` is set to False
- """
- token = await self._get_token(request)
- payload = self._get_payload(token)
- if not self._has_scopes(payload, security_scopes):
- raise self._out_of_scope_exception
- return await self._get_current_user(payload)
- async def optional(self, request: Request, security_scopes: SecurityScopes = None): # type: ignore
- """
- Acts as a dependency which catches all errors and returns `None` instead
- """
- try:
- user = await self.__call__(request, security_scopes)
- except Exception:
- return None
- else:
- return user
- def attach_middleware(self, app: FastAPI):
- """
- Add the instance as a middleware, which adds the user object, if present,
- to the request state
- Args:
- app (fastapi.FastAPI): FastAPI application
- """
- async def __set_user(request: Request, call_next):
- try:
- request.state.user = await self.__call__(request)
- except Exception:
- # An error occurred while getting the user
- # as middlewares are called for every incoming request
- # it's not a good idea to return the Exception
- # so we set the user to None
- request.state.user = None
- return await call_next(request)
- app.add_middleware(BaseHTTPMiddleware, dispatch=__set_user)
|