Source code for piccolo_api.media.s3
from __future__ import annotations
import asyncio
import functools
import pathlib
import sys
import typing as t
from concurrent.futures import ThreadPoolExecutor
from piccolo.apps.user.tables import BaseUser
from piccolo.columns.column_types import Array, Text, Varchar
from .base import ALLOWED_CHARACTERS, ALLOWED_EXTENSIONS, MediaStorage
from .content_type import CONTENT_TYPE
if t.TYPE_CHECKING: # pragma: no cover
    from concurrent.futures import Executor
class S3MediaStorage(MediaStorage):
def __init__(
self,
column: t.Union[Text, Varchar, Array],
bucket_name: str,
folder_name: t.Optional[str] = None,
connection_kwargs: t.Optional[t.Dict[str, t.Any]] = None,
sign_urls: bool = True,
signed_url_expiry: int = 3600,
upload_metadata: t.Optional[t.Dict[str, t.Any]] = None,
executor: t.Optional[Executor] = None,
allowed_extensions: t.Optional[t.Sequence[str]] = ALLOWED_EXTENSIONS,
allowed_characters: t.Optional[t.Sequence[str]] = ALLOWED_CHARACTERS,
):
"""
        Stores media files in S3-compatible storage. This is a good option
        when you have lots of files to store, and don't want them stored
        locally on a server. Many cloud providers offer S3-compatible
        storage, besides Amazon Web Services.
:param column:
The Piccolo :class:`Column <piccolo.columns.base.Column>` which the
storage is for.
:param bucket_name:
Which S3 bucket the files are stored in.
:param folder_name:
The files will be stored in this folder within the bucket. S3
            buckets don't really have folders, but if ``folder_name`` is
``'movie_screenshots'``, then we store the file at
``'movie_screenshots/my-file-abc-123.jpeg'``, to simulate it being
in a folder.
:param connection_kwargs:
These kwargs are passed directly to the boto3 :meth:`client <boto3.session.Session.client>`.
For example::
S3MediaStorage(
...,
connection_kwargs={
'aws_access_key_id': 'abc123',
'aws_secret_access_key': 'xyz789',
                        'endpoint_url': 'https://s3.cloudprovider.com',
'region_name': 'uk'
}
)
:param sign_urls:
            Whether to sign the URLs. By default this is ``True``, as it's
            highly recommended that you store your files in a private bucket.
:param signed_url_expiry:
Files are accessed via signed URLs, which are only valid for this
number of seconds.
:param upload_metadata:
You can provide additional metadata to the uploaded files. To
see all available options see :class:`S3Transfer.ALLOWED_UPLOAD_ARGS <boto3.s3.transfer.S3Transfer>`.
Below we show examples of common use cases.
To set the ACL::
S3MediaStorage(
...,
upload_metadata={'ACL': 'my_acl'}
)
            To set the content disposition (i.e. whether the file is
            downloaded, or shown in the browser, when opened)::
S3MediaStorage(
...,
# Shows the file within the browser:
upload_metadata={'ContentDisposition': 'inline'}
)
To attach `user defined metadata <https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html>`_
to the file::
S3MediaStorage(
...,
upload_metadata={'Metadata': {'myfield': 'abc123'}}
)
To specify how long browsers should cache the file for::
S3MediaStorage(
...,
# Cache the file for 24 hours:
upload_metadata={'CacheControl': 'max-age=86400'}
)
Note: We automatically add the ``ContentType`` field based on the
file type.
:param executor:
An executor, which file save operations are run in, to avoid
blocking the event loop. If not specified, we use a sensibly
configured :class:`ThreadPoolExecutor <concurrent.futures.ThreadPoolExecutor>`.
:param allowed_extensions:
Which file extensions are allowed. If ``None``, then all extensions
are allowed (not recommended unless the users are trusted).
:param allowed_characters:
Which characters are allowed in the file name. By default, it's
very strict. If set to ``None`` then all characters are allowed.
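
        Example usage (a minimal sketch; the ``Movie`` table, bucket name
        and credentials below are illustrative placeholders)::

            from piccolo.columns.column_types import Varchar
            from piccolo.table import Table

            class Movie(Table):
                screenshot = Varchar()

            MEDIA_STORAGE = S3MediaStorage(
                column=Movie.screenshot,
                bucket_name='my-bucket',
                folder_name='movie_screenshots',
                connection_kwargs={
                    'aws_access_key_id': 'abc123',
                    'aws_secret_access_key': 'xyz789',
                },
            )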
""" # noqa: E501
try:
import boto3 # noqa
except ImportError: # pragma: no cover
sys.exit(
"Please install boto3 to use this feature "
"`pip install 'piccolo_api[s3]'`"
)
else:
self.boto3 = boto3
self.bucket_name = bucket_name
self.upload_metadata = upload_metadata or {}
self.folder_name = folder_name
self.connection_kwargs = connection_kwargs or {}
self.sign_urls = sign_urls
self.signed_url_expiry = signed_url_expiry
self.executor = executor or ThreadPoolExecutor(max_workers=10)
super().__init__(
column=column,
allowed_extensions=allowed_extensions,
allowed_characters=allowed_characters,
)
def get_client(self, config=None): # pragma: no cover
"""
Returns an S3 client.
"""
session = self.boto3.session.Session()
extra_kwargs = {"config": config} if config else {}
client = session.client("s3", **self.connection_kwargs, **extra_kwargs)
return client
async def store_file(
self, file_name: str, file: t.IO, user: t.Optional[BaseUser] = None
) -> str:
loop = asyncio.get_running_loop()
blocking_function = functools.partial(
self.store_file_sync, file_name=file_name, file=file, user=user
)
file_key = await loop.run_in_executor(self.executor, blocking_function)
return file_key
def _prepend_folder_name(self, file_key: str) -> str:
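        """
        Prepends ``folder_name`` (if configured) to the file key, to
        simulate a folder within the bucket.
        """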
folder_name = self.folder_name
if folder_name:
return str(pathlib.Path(folder_name, file_key))
else:
return file_key
def store_file_sync(
self, file_name: str, file: t.IO, user: t.Optional[BaseUser] = None
) -> str:
"""
        A synchronous version of :meth:`store_file`.
"""
file_key = self.generate_file_key(file_name=file_name, user=user)
extension = file_key.rsplit(".", 1)[-1]
client = self.get_client()
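        # Take a copy of the configured upload metadata (so the shared dict
        # isn't mutated), and add a ``ContentType`` inferred from the file
        # extension.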
        upload_metadata: t.Dict[str, t.Any] = dict(self.upload_metadata)
if extension in CONTENT_TYPE:
upload_metadata["ContentType"] = CONTENT_TYPE[extension]
client.upload_fileobj(
file,
self.bucket_name,
self._prepend_folder_name(file_key),
ExtraArgs=upload_metadata,
)
return file_key
async def generate_file_url(
self, file_key: str, root_url: str, user: t.Optional[BaseUser] = None
) -> str:
"""
This retrieves an absolute URL for the file.
"""
loop = asyncio.get_running_loop()
blocking_function: t.Callable = functools.partial(
self.generate_file_url_sync,
file_key=file_key,
root_url=root_url,
user=user,
)
return await loop.run_in_executor(self.executor, blocking_function)
def generate_file_url_sync(
self, file_key: str, root_url: str, user: t.Optional[BaseUser] = None
) -> str:
"""
        A synchronous version of :meth:`generate_file_url`.
"""
if self.sign_urls:
config = None
else:
from botocore import UNSIGNED
from botocore.config import Config
config = Config(signature_version=UNSIGNED)
s3_client = self.get_client(config=config)
return s3_client.generate_presigned_url(
ClientMethod="get_object",
Params={
"Bucket": self.bucket_name,
"Key": self._prepend_folder_name(file_key),
},
ExpiresIn=self.signed_url_expiry,
)
###########################################################################
async def get_file(self, file_key: str) -> t.Optional[t.IO]:
"""
Returns the file object matching the ``file_key``.
"""
loop = asyncio.get_running_loop()
func = functools.partial(self.get_file_sync, file_key=file_key)
return await loop.run_in_executor(self.executor, func)
def get_file_sync(self, file_key: str) -> t.Optional[t.IO]:
"""
Returns the file object matching the ``file_key``.
"""
s3_client = self.get_client()
response = s3_client.get_object(
Bucket=self.bucket_name,
Key=self._prepend_folder_name(file_key),
)
return response["Body"]
async def delete_file(self, file_key: str):
"""
Deletes the file object matching the ``file_key``.
"""
loop = asyncio.get_running_loop()
func = functools.partial(
self.delete_file_sync,
file_key=file_key,
)
return await loop.run_in_executor(self.executor, func)
def delete_file_sync(self, file_key: str):
"""
Deletes the file object matching the ``file_key``.
"""
s3_client = self.get_client()
return s3_client.delete_object(
Bucket=self.bucket_name,
Key=self._prepend_folder_name(file_key),
)
async def bulk_delete_files(self, file_keys: t.List[str]):
loop = asyncio.get_running_loop()
func = functools.partial(
self.bulk_delete_files_sync,
file_keys=file_keys,
)
await loop.run_in_executor(self.executor, func)
def bulk_delete_files_sync(self, file_keys: t.List[str]):
s3_client = self.get_client()
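        # ``delete_objects`` accepts multiple keys per request, so the keys
        # are deleted in fixed-size batches rather than one call per file.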
batch_size = 100
iteration = 0
while True:
            batch = file_keys[
                (iteration * batch_size) : (  # noqa: E203
                    (iteration + 1) * batch_size
                )
            ]
if not batch:
# https://github.com/nedbat/coveragepy/issues/772
break # pragma: no cover
s3_client.delete_objects(
Bucket=self.bucket_name,
Delete={
"Objects": [
{
"Key": self._prepend_folder_name(file_key),
}
                        for file_key in batch
],
},
)
iteration += 1
def get_file_keys_sync(self) -> t.List[str]:
"""
Returns the file key for each file we have stored.
"""
s3_client = self.get_client()
keys = []
start_after = None
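        # ``list_objects_v2`` returns keys in pages, so keep requesting,
        # passing the last key seen as ``StartAfter``, until a response
        # comes back with no contents.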
while True:
extra_kwargs: t.Dict[str, t.Any] = {}
if start_after:
extra_kwargs["StartAfter"] = start_after
if self.folder_name:
extra_kwargs["Prefix"] = f"{self.folder_name}/"
response = s3_client.list_objects_v2(
Bucket=self.bucket_name,
**extra_kwargs,
)
contents = response.get("Contents")
if contents:
for obj in contents:
keys.append(obj["Key"])
start_after = keys[-1]
else:
# https://github.com/nedbat/coveragepy/issues/772
break # pragma: no cover
if self.folder_name:
prefix = f"{self.folder_name}/"
            return [i[len(prefix):] for i in keys]
else:
return keys
async def get_file_keys(self) -> t.List[str]:
"""
Returns the file key for each file we have stored.
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor, self.get_file_keys_sync
)
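    # Two instances pointing at the same endpoint, bucket and folder are
    # treated as equal, so equality and hashing are based on those fields.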
def __hash__(self):
return hash(
(
"s3",
self.connection_kwargs.get("endpoint_url"),
self.bucket_name,
self.folder_name,
)
)
def __eq__(self, value):
if not isinstance(value, S3MediaStorage):
return False
return value.__hash__() == self.__hash__()