# Source code for piccolo_api.token_auth.endpoints

from __future__ import annotations

import typing as t
from abc import ABCMeta, abstractmethod

from piccolo.apps.user.tables import BaseUser
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from .tables import TokenAuth


class TokenProvider(metaclass=ABCMeta):
    """
    Subclass this to provide your own custom token provider.
    """

    @abstractmethod
    async def get_token(
        self, username: str, password: str
    ) -> t.Optional[str]:
        """
        Look up the token belonging to the given credentials.

        :param username:
            The username submitted by the client.
        :param password:
            The password submitted by the client.
        :returns:
            The token as a string, or ``None`` when no token can be
            provided for these credentials.
        """
class PiccoloTokenProvider(TokenProvider):
    """
    Retrieves a token from a Piccolo table.
    """

    async def get_token(
        self, username: str, password: str
    ) -> t.Optional[str]:
        """
        Return the stored token for a user whose credentials are valid.

        :param username:
            The username to authenticate with ``BaseUser.login``.
        :param password:
            The password to authenticate with ``BaseUser.login``.
        :returns:
            The value of the ``token`` column from ``TokenAuth`` for the
            authenticated user, or ``None`` if the login fails or the user
            has no ``TokenAuth`` row.
        """
        user = await BaseUser.login(username=username, password=password)
        if not user:
            return None

        row = (
            await TokenAuth.select(TokenAuth.token)
            .first()
            .where(TokenAuth.user == user)
            .run()
        )
        # ``first()`` gives ``None`` when no row matches, i.e. the credentials
        # are valid but no token has been created for this user yet. Treat
        # that as "no token" instead of failing on ``None["token"]``.
        if not row:
            return None

        return row["token"]
class TokenAuthLoginEndpoint(HTTPEndpoint):
    """
    Endpoint which exchanges a username and password for a token.

    The request body is expected to be a JSON object with ``username`` and
    ``password`` keys.
    """

    # Looks up the token for the submitted credentials; subclasses can
    # override this attribute to plug in a different provider.
    token_provider: TokenProvider = PiccoloTokenProvider()

    async def post(self, request: Request) -> Response:
        """
        Return a token if the credentials are correct.

        :param request:
            The incoming request, whose body should be a JSON object
            containing ``username`` and ``password``.
        :returns:
            A JSON response of the form ``{"token": "..."}`` on success,
            otherwise a 401 response explaining that the credentials were
            missing or incorrect.
        """
        try:
            payload = await request.json()
        except ValueError:
            # Empty, undecodable or malformed JSON bodies raise a
            # ``ValueError`` subclass - treat them as missing credentials
            # rather than letting the request fail with a server error.
            payload = None

        # Valid JSON which isn't an object (e.g. a list or a string) can't
        # carry credentials either.
        credentials = payload if isinstance(payload, dict) else {}
        username = credentials.get("username")
        password = credentials.get("password")

        if not (username and password):
            return Response(
                content="No credentials were found.", status_code=401
            )

        token = await self.token_provider.get_token(
            username=username, password=password
        )
        if not token:
            return Response(
                content="The credentials were incorrect",
                status_code=401,
            )

        return JSONResponse({"token": str(token)})
def token_login(
    provider: TokenProvider = PiccoloTokenProvider(),
) -> t.Type[TokenAuthLoginEndpoint]:
    """
    Create an endpoint for logging in using tokens.

    :param provider:
        Used to look up the token for the credentials submitted to the
        endpoint.
    :returns:
        A ``TokenAuthLoginEndpoint`` subclass whose ``token_provider`` is
        set to ``provider``.
    """

    class TokenAuthLoginEndpoint_(TokenAuthLoginEndpoint):
        token_provider = provider

    return TokenAuthLoginEndpoint_