Source code for piccolo_api.jwt_auth.endpoints
from __future__ import annotations
import typing as t
from abc import abstractproperty
from datetime import datetime, timedelta, timezone
import jwt
from piccolo.apps.user.tables import BaseUser
from starlette.endpoints import HTTPEndpoint
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse
class JWTLoginBase(HTTPEndpoint):
    """
    Endpoint which exchanges a username and password for a signed JWT.

    Subclasses provide ``_auth_table``, ``_secret`` and ``_expiry``; plain
    class attributes are enough to override the placeholder properties.
    """

    @abstractproperty
    def _auth_table(self) -> t.Type[BaseUser]:
        """The table whose ``login`` coroutine checks the credentials."""
        raise NotImplementedError

    @abstractproperty
    def _secret(self) -> str:
        """The key handed to ``jwt.encode`` when creating the token."""
        raise NotImplementedError

    @abstractproperty
    def _expiry(self) -> timedelta:
        """Added to the current UTC time to give the token's ``exp`` value."""
        raise NotImplementedError

    async def post(self, request: Request) -> JSONResponse:
        """
        Authenticate the posted credentials and return a JWT.

        The request body is expected to be a JSON object with ``username``
        and ``password`` keys.

        :param request:
            The incoming request, whose body holds the credentials.
        :returns:
            A JSON response of the form ``{"token": <encoded JWT>}``. The
            token's payload contains ``user_id`` and ``exp``.
        :raises HTTPException:
            With status 400 if the body isn't a JSON object, or status 401
            if the credentials are missing, malformed, or rejected by the
            auth table.
        """
        try:
            body = await request.json()
        except ValueError as exc:
            # ``json.JSONDecodeError`` and ``UnicodeDecodeError`` both derive
            # from ``ValueError``; report a client error rather than letting
            # a malformed body surface as a server error.
            raise HTTPException(
                status_code=400, detail="Invalid JSON body"
            ) from exc

        if not isinstance(body, dict):
            # Valid JSON such as a list or string has no ``get`` method.
            raise HTTPException(
                status_code=400, detail="The request body must be a JSON object"
            )

        username = body.get("username", None)
        password = body.get("password", None)

        # Missing or non-string credentials are treated as a failed login
        # rather than being handed to the auth table.
        # NOTE(review): assumes ``login`` only expects ``str`` arguments.
        if not isinstance(username, str) or not isinstance(password, str):
            raise HTTPException(status_code=401, detail="Login failed")

        user_id = await self._auth_table.login(
            username=username, password=password
        )
        if not user_id:
            raise HTTPException(status_code=401, detail="Login failed")

        # Use an aware UTC datetime so the expiry doesn't depend on the
        # server's local timezone.
        expiry = datetime.now(tz=timezone.utc) + self._expiry
        payload = jwt.encode({"user_id": user_id, "exp": expiry}, self._secret)
        return JSONResponse({"token": payload})
[docs]
def jwt_login(
    secret: str,
    auth_table: t.Type[BaseUser] = BaseUser,
    expiry: timedelta = timedelta(days=1),
) -> t.Type[JWTLoginBase]:
    """
    Build an endpoint class which issues JWT tokens.

    :param secret:
        Used to sign the JWT tokens.
    :param auth_table:
        The Piccolo table used to authenticate the user.
    :param expiry:
        How long a JWT token remains valid after it is issued.
    :returns:
        A ``JWTLoginBase`` subclass configured with the given values.
    """

    class JWTLogin(JWTLoginBase):
        # Plain class attributes take the place of the placeholder
        # properties declared on the base class.
        _secret = secret
        _expiry = expiry
        _auth_table = auth_table

    return JWTLogin