Source code for piccolo_api.token_auth.endpoints
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from typing import Optional
from piccolo.apps.user.tables import BaseUser
from starlette.endpoints import HTTPEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.status import HTTP_401_UNAUTHORIZED
from .tables import TokenAuth
[docs]
class TokenProvider(metaclass=ABCMeta):
    """
    Abstract base class for token providers.

    Subclass this and implement :meth:`get_token` to provide your own
    custom token provider.
    """

    @abstractmethod
    async def get_token(self, username: str, password: str) -> Optional[str]:
        """
        Return the token for the given credentials.

        :param username:
            The username supplied by the client.
        :param password:
            The password supplied by the client.
        :returns:
            A token string, or ``None`` when no token is available for
            these credentials.
        """
[docs]
class PiccoloTokenProvider(TokenProvider):
    """
    Token provider which looks tokens up in the ``TokenAuth`` Piccolo table.
    """

    async def get_token(self, username: str, password: str) -> Optional[str]:
        """
        Check the credentials with ``BaseUser.login`` and, if they are
        accepted, fetch the matching token from ``TokenAuth``.

        :param username:
            The username to log in with.
        :param password:
            The password to log in with.
        :returns:
            The stored token, or ``None`` if the login yields a falsy
            result or no ``TokenAuth`` row matches the user.
        """
        user = await BaseUser.login(username=username, password=password)
        if not user:
            return None

        # Only the ``token`` column is selected, so the row is a mapping
        # keyed by "token".
        token_query = TokenAuth.select(TokenAuth.token).where(
            TokenAuth.user == user
        )
        row = await token_query.first()
        return row["token"] if row else None
class TokenAuthLoginEndpoint(HTTPEndpoint):
    """
    HTTP endpoint which exchanges a username and password for a token.
    """

    # Provider used to turn credentials into a token. Subclasses can
    # override this attribute to plug in a different provider.
    token_provider: TokenProvider = PiccoloTokenProvider()

    async def post(self, request: Request) -> Response:
        """
        Return a token if the credentials are correct.

        The request body is expected to be a JSON object containing
        ``username`` and ``password`` keys.

        :param request:
            The incoming request.
        :returns:
            A JSON response of the form ``{"token": "..."}`` on success.
            Otherwise a 401 response: "No credentials were found." when
            the body is not valid JSON, is not a JSON object, or lacks a
            username / password, and "The credentials were incorrect"
            when the provider returns no token.
        """
        try:
            payload = await request.json()
        except ValueError:
            # ``json.JSONDecodeError`` (and a failed body decode) are
            # ``ValueError`` subclasses. A malformed body carries no usable
            # credentials, so don't let it surface as a server error.
            payload = None

        # Valid JSON is not necessarily an object (it may be a list, string,
        # number or null), in which case there is nothing to look up.
        if not isinstance(payload, dict):
            payload = {}

        username = payload.get("username")
        password = payload.get("password")

        if not (username and password):
            return Response(
                content="No credentials were found.",
                status_code=HTTP_401_UNAUTHORIZED,
            )

        token = await self.token_provider.get_token(
            username=username, password=password
        )
        if not token:
            return Response(
                content="The credentials were incorrect",
                status_code=HTTP_401_UNAUTHORIZED,
            )

        return JSONResponse({"token": str(token)})
[docs]
def token_login(
    provider: TokenProvider = PiccoloTokenProvider(),
) -> type[TokenAuthLoginEndpoint]:
    """
    Create an endpoint for logging in using tokens.

    :param provider:
        The token provider which the endpoint uses to look up a token for
        the submitted username and password.
    :returns:
        A ``TokenAuthLoginEndpoint`` subclass whose ``token_provider`` is
        set to ``provider``.
    """

    class TokenAuthLoginEndpoint_(TokenAuthLoginEndpoint):
        # Bind the requested provider as a class attribute, leaving the
        # base endpoint class untouched.
        token_provider = provider

    endpoint_class = TokenAuthLoginEndpoint_
    return endpoint_class