Source code for piccolo_api.token_auth.tables

from __future__ import annotations

import typing as t
import uuid

from piccolo.apps.user.tables import BaseUser
from piccolo.columns.column_types import ForeignKey, Serial, Varchar
from piccolo.table import Table
from piccolo.utils.sync import run_sync

if t.TYPE_CHECKING:  # pragma: no cover
    from piccolo.query.methods.select import First


def generate_token() -> str:
    return str(uuid.uuid4())


[docs] class TokenAuth(Table): """ Holds randomly generated tokens. Useful for mobile authentication, IOT etc. Session auth is recommended for web usage. """ id: Serial token = Varchar(default=generate_token) user = ForeignKey(references=BaseUser)
[docs] @classmethod async def create_token( cls, user_id: int, one_per_user: bool = True ) -> str: """ Create a new token. :param one_per_user: If True, a ValueError is raised if a token already exists for that user. """ if ( await cls.exists().where(cls.user.id == user_id).run() and one_per_user ): raise ValueError(f"User {user_id} already has a token.") token_auth = cls(user=user_id) await token_auth.save().run() return t.cast(str, token_auth.token)
[docs] @classmethod def create_token_sync(cls, user_id: int) -> str: return run_sync(cls.create_token(user_id))
[docs] @classmethod async def authenticate(cls, token: str) -> First: return cls.select(cls.user.id).where(cls.token == token).first()
[docs] @classmethod async def authenticate_sync(cls, token: str) -> t.Optional[int]: return run_sync(cls.authenticate(token))
[docs] @classmethod async def get_user_id(cls, token: str) -> t.Optional[int]: """ Returns the user_id if the given token is valid, otherwise None. """ data = ( await cls.select(cls.user) .where(cls.token == token) .output(as_list=True) .first() .run() ) return data.get("user", None) if data else None