Source code for piccolo_api.encryption.providers

from __future__ import annotations

import logging
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import nacl
    from cryptography.fernet import Fernet


logger = logging.getLogger(__name__)


def get_fernet_class() -> type[Fernet]:  # type: ignore
    try:
        from cryptography.fernet import Fernet
    except ImportError as e:
        print(
            "Install pip install piccolo_api[cryptography] to use this "
            "feature."
        )
        raise e

    return Fernet


[docs] class EncryptionProvider(metaclass=ABCMeta): """ Base class for encryption providers. Don't use it directly, it must be subclassed. """ def __init__(self, prefix: str): self.prefix = prefix @abstractmethod def encrypt(self, value: str, add_prefix: bool = True) -> str: """ :param value: The value to encrypt. :param add_prefix: For example, with ``FernetProvider``, it will return a value like: ``'fernet-abc123'`` if ``add_prefix=True``. It can be useful to have some idea of how the value was encrypted if stored in a database. """ raise NotImplementedError() @abstractmethod def decrypt(self, encrypted_value: str, has_prefix: bool = True) -> str: """ :param encrypted_value: The value to decrypt. :param has_prefix: If the value has a prefix or not, indicating the algorithm used, i.e. ``'fernet-abc123'`` or just ``'abc123'``. """ raise NotImplementedError() def remove_prefix(self, encrypted_value: str) -> str: if encrypted_value.startswith(self.prefix): return encrypted_value.replace(f"{self.prefix}-", "", 1) else: raise ValueError( "Unable to identify which encryption was used - if moving " "to a new encryption provider, use " "`migrate_encrypted_value`." ) def add_prefix(self, encrypted_value: str) -> str: return f"{self.prefix}-{encrypted_value}"
[docs] class PlainTextProvider(EncryptionProvider): """ The values aren't encrypted - can be useful for testing. """ def __init__(self): super().__init__(prefix="plain") def encrypt(self, value: str, add_prefix: bool = True) -> str: return self.add_prefix(value) if add_prefix else value def decrypt(self, encrypted_value: str, has_prefix: bool = True) -> str: return ( self.remove_prefix(encrypted_value) if has_prefix else encrypted_value )
[docs] class FernetProvider(EncryptionProvider): def __init__(self, encryption_key: bytes): """ Uses the Fernet algorithm for encryption. :param encryption_key: This can be generated using ``FernetEncryption.get_new_key()``. """ self.encryption_key = encryption_key super().__init__(prefix="fernet") @staticmethod def get_new_key() -> bytes: Fernet = get_fernet_class() return Fernet.generate_key() # type: ignore def encrypt(self, value: str, add_prefix: bool = True) -> str: Fernet = get_fernet_class() fernet = Fernet(self.encryption_key) # type: ignore encrypted_value = fernet.encrypt(value.encode("utf-8")).decode("utf-8") return ( self.add_prefix(encrypted_value=encrypted_value) if add_prefix else encrypted_value ) def decrypt(self, encrypted_value: str, has_prefix: bool = True) -> str: if has_prefix: encrypted_value = self.remove_prefix(encrypted_value) Fernet = get_fernet_class() fernet = Fernet(self.encryption_key) # type: ignore return fernet.decrypt(encrypted_value.encode("utf-8")).decode("utf-8")
def get_nacl_encoding() -> nacl.encoding: # type: ignore try: import nacl.encoding except ImportError as e: print("Install pip install piccolo_api[pynacl] to use this feature.") raise e return nacl.encoding def get_nacl_utils() -> nacl.utils: # type: ignore try: import nacl.utils except ImportError as e: print("Install pip install piccolo_api[pynacl] to use this feature.") raise e return nacl.utils def get_nacl_secret() -> nacl.secret: # type: ignore try: import nacl.secret except ImportError as e: print("Install pip install piccolo_api[pynacl] to use this feature.") raise e return nacl.secret
[docs] class XChaCha20Provider(EncryptionProvider): def __init__(self, encryption_key: bytes): """ Uses the XChaCha20-Poly1305 algorithm for encryption. This is more secure than ``FernetProvider``. :param encryption_key: This can be generated using ``XChaCha20Provider.get_new_key()``. """ self.encryption_key = encryption_key super().__init__(prefix="xchacha20") @staticmethod def get_new_key() -> bytes: nacl_utils = get_nacl_utils() nacl_secret = get_nacl_secret() return nacl_utils.random(nacl_secret.Aead.KEY_SIZE) # type: ignore def _get_nacl_box(self) -> nacl.secret.Aead: nacl_secret = get_nacl_secret() return nacl_secret.Aead(self.encryption_key) # type: ignore def encrypt(self, value: str, add_prefix: bool = True) -> str: box = self._get_nacl_box() encrypted_value = box.encrypt(value.encode()).hex() return ( self.add_prefix(encrypted_value=encrypted_value) if add_prefix else encrypted_value ) def decrypt(self, encrypted_value: str, has_prefix: bool = True) -> str: if has_prefix: encrypted_value = self.remove_prefix(encrypted_value) box = self._get_nacl_box() return box.decrypt(bytes.fromhex(encrypted_value)).decode("utf-8")
def migrate_encrypted_value( old_provider: EncryptionProvider, new_provider: EncryptionProvider, encrypted_value: str, ): """ If you're migrating from one form of encryption to another, you can use this utility. """ return new_provider.encrypt(old_provider.decrypt(encrypted_value))