"""
This module contains a python api to programmatically access exasol bucketfs service(s).
.. attention:
If no python api is required, one can also use CLI tools like CURL and HTTPIE to access bucketfs services.
Example's using CURL and HTTPIE
-------------------------------
1. Listing buckets of a bucketfs service
HTTPIE:
$ http GET http://127.0.0.1:6666/
CURL:
$ curl -i http://127.0.0.1:6666/
2. List all files in the bucket "default"
HTTPIE:
$ http --auth w:write --auth-type basic GET http://127.0.0.1:6666/default
CURL:
$ curl -i -u "w:write" http://127.0.0.1:6666/default
3. Upload file into a bucket
HTTPIE:
$ http --auth w:write --auth-type basic PUT http://127.0.0.1:6666/default/myfile.txt @some-file.txt
CURL:
$ curl -i -u "w:write" -X PUT --binary-data @some-file.txt http://127.0.0.1:6666/default/myfile.txt
4. Download a file from a bucket
HTTPIE:
$ http --auth w:write --auth-type basic --download GET http://127.0.0.1:6666/default/myfile.txt
CURL:
$ curl -u "w:write" --output myfile.txt http://127.0.0.1:6666/default/myfile.txt
"""
from __future__ import annotations
import hashlib
from collections import defaultdict
from pathlib import Path
from typing import (
BinaryIO,
ByteString,
Iterable,
Iterator,
Mapping,
MutableMapping,
Union,
)
from urllib.parse import urlparse
import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth
__all__ = [
"Service",
"Bucket",
"MappedBucket",
"as_bytes",
"as_string",
"as_file",
"as_hash",
]
[docs]
class BucketFsError(Exception):
"""Error occurred while interacting with the bucket fs service."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _lines(response):
lines = (line for line in response.text.split("\n") if not line.isspace())
return (line for line in lines if line != "")
def _build_url(service_url, bucket=None, path=None) -> str:
info = urlparse(service_url)
url = f"{info.scheme}://{info.hostname}:{info.port}"
if bucket is not None:
url += f"/{bucket}"
if path is not None:
url += f"/{path}"
return url
def _parse_service_url(url: str) -> str:
supported_schemes = ("http", "https")
elements = urlparse(url)
if elements.scheme not in supported_schemes:
raise BucketFsError(
f"Invalid scheme: {elements.scheme}. Supported schemes [{', '.join(supported_schemes)}]"
)
if not elements.netloc:
raise BucketFsError(f"Invalid location: {elements.netloc}")
# use bucket fs default port if no explicit port was specified
port = elements.port if elements.port else 2580
return f"{elements.scheme}://{elements.hostname}:{port}"
[docs]
class Service:
"""Provides a simple to use api to access a bucketfs service.
Attributes:
buckets: lists all available buckets.
"""
[docs]
def __init__(
self,
url: str,
credentials: Mapping[str, Mapping[str, str]] = None,
verify: bool | str = True,
):
"""Create a new Service instance.
Args:
url:
Url of the bucketfs service, e.g. `http(s)://127.0.0.1:2580`.
credentials:
A mapping containing credentials (username and password) for buckets.
E.g. {"bucket1": { "username": "foo", "password": "bar" }}
verify:
Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use. Defaults to ``True``.
"""
self._url = _parse_service_url(url)
self._authenticator = defaultdict(
lambda: {"username": "r", "password": "read"},
credentials if credentials is not None else {},
)
self._verify = verify
@property
def buckets(self) -> MutableMapping[str, Bucket]:
"""List all available buckets."""
url = _build_url(service_url=self._url)
response = requests.get(url, verify=self._verify)
try:
response.raise_for_status()
except HTTPError as ex:
raise BucketFsError(
f"Couldn't list of all buckets from: {self._url}"
) from ex
buckets = _lines(response)
return {
name: Bucket(
name=name,
service=self._url,
username=self._authenticator[name]["username"],
password=self._authenticator[name]["password"],
)
for name in buckets
}
def __str__(self) -> str:
return f"Service<{self._url}>"
def __iter__(self) -> Iterator[Bucket]:
yield from self.buckets
def __getitem__(self, item: str) -> Bucket:
return self.buckets[item]
[docs]
class Bucket:
[docs]
def __init__(
self,
name: str,
service: str,
username: str,
password: str,
verify: bool | str = True,
):
"""
Create a new bucket instance.
Args:
name:
Name of the bucket.
service:
Url where this bucket is hosted on.
username:
Username used for authentication.
password:
Password used for authentication.
verify:
Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use. Defaults to ``True``.
"""
self._name = name
self._service = _parse_service_url(service)
self._username = username
self._password = password
self._verify = verify
def __str__(self):
return f"Bucket<{self.name} | on: {self._service}>"
@property
def name(self) -> str:
return self._name
@property
def _auth(self) -> HTTPBasicAuth:
return HTTPBasicAuth(username=self._username, password=self._password)
@property
def files(self) -> Iterable[str]:
url = _build_url(service_url=self._service, bucket=self.name)
response = requests.get(url, auth=self._auth, verify=self._verify)
try:
response.raise_for_status()
except HTTPError as ex:
raise BucketFsError(
f"Couldn't retrieve file list form bucket: {self.name}"
) from ex
return {line for line in _lines(response)}
def __iter__(self) -> Iterator[str]:
yield from self.files
[docs]
def upload(
self, path: str, data: ByteString | BinaryIO | Iterable[ByteString]
) -> None:
"""
Uploads a file onto this bucket
Args:
path: in the bucket the file shall be associated with.
data: raw content of the file.
"""
url = _build_url(service_url=self._service, bucket=self.name, path=path)
response = requests.put(url, data=data, auth=self._auth, verify=self._verify)
try:
response.raise_for_status()
except HTTPError as ex:
raise BucketFsError(f"Couldn't upload file: {path}") from ex
[docs]
def delete(self, path) -> None:
"""
Deletes a specific file in this bucket.
Args:
path: points to the file which shall be deleted.
Raises:
A BucketFsError if the operation couldn't be executed successfully.
"""
url = _build_url(service_url=self._service, bucket=self.name, path=path)
response = requests.delete(url, auth=self._auth, verify=self._verify)
try:
response.raise_for_status()
except HTTPError as ex:
raise BucketFsError(f"Couldn't delete: {path}") from ex
[docs]
def download(self, path: str, chunk_size: int = 8192) -> Iterable[ByteString]:
"""
Downloads a specific file of this bucket.
Args:
path: which shall be downloaded.
chunk_size: which shall be used for downloading.
Returns:
An iterable of binary chunks representing the downloaded file.
"""
url = _build_url(service_url=self._service, bucket=self.name, path=path)
with requests.get(
url, stream=True, auth=self._auth, verify=self._verify
) as response:
try:
response.raise_for_status()
except HTTPError as ex:
raise BucketFsError(f"Couldn't download: {path}") from ex
yield from response.iter_content(chunk_size=chunk_size)
[docs]
class MappedBucket:
"""
Wraps a bucket and provides various convenience features to it (e.g. index based access).
Attention:
Even though this class provides a very convenient interface,
the functionality of this class should be used with care.
Even though it may not be obvious, all the provided features do involve interactions with a bucketfs service
in the background (upload, download, sync, etc.).
Keep this in mind when using this class.
"""
[docs]
def __init__(self, bucket: Bucket, chunk_size: int = 8192):
"""
Creates a new MappedBucket.
Args:
bucket: which shall be wrapped.
chunk_size: which shall be used for downloads.
"""
self._bucket = bucket
self._chunk_size = chunk_size
@property
def chunk_size(self) -> int:
"""Chunk size which will be used for downloads."""
return self._chunk_size
@chunk_size.setter
def chunk_size(self, value: int) -> None:
self._chunk_size = value
def __iter__(self) -> Iterable[str]:
yield from self._bucket.files
def __setitem__(
self, key: str, value: ByteString | BinaryIO | Iterable[ByteString]
) -> None:
"""
Uploads a file onto this bucket.
See also Bucket:upload
"""
self._bucket.upload(path=key, data=value)
def __delitem__(self, key: str) -> None:
"""
Deletes a file from the bucket.
See also Bucket:delete
"""
self._bucket.delete(path=key)
def __getitem__(self, item: str) -> Iterable[ByteString]:
"""
Downloads a file from this bucket.
See also Bucket::download
"""
return self._bucket.download(item, self._chunk_size)
def __str__(self):
return f"MappedBucket<{self._bucket}>"
def _chunk_as_bytes(chunk: int | ByteString) -> ByteString:
"""
In some scenarios python converts single bytes to integers:
>>> chunks = [type(chunk) for chunk in b"abc"]
>>> chunks
... [<class 'int'>, <class 'int'>, <class 'int'>]
in order to cope with this transparently this wrapper can be used.
"""
if not isinstance(chunk, Iterable):
chunk = bytes([chunk])
return chunk
def _bytes(chunks: Iterable[ByteString]) -> ByteString:
chunks = (_chunk_as_bytes(c) for c in chunks)
data = bytearray()
for chunk in chunks:
data.extend(chunk)
return data
[docs]
def as_bytes(chunks: Iterable[ByteString]) -> ByteString:
"""
Transforms a set of byte chunks into a bytes like object.
Args:
chunks: which shall be concatenated.
Return:
A single continues byte like object.
"""
return _bytes(chunks)
[docs]
def as_string(chunks: Iterable[ByteString], encoding: str = "utf-8") -> str:
"""
Transforms a set of byte chunks into a string.
Args:
chunks: which shall be converted into a single string.
encoding: which shall be used to convert the bytes to a string.
Return:
A string representation of the converted bytes.
"""
return _bytes(chunks).decode(encoding)
[docs]
def as_file(chunks: Iterable[ByteString], filename: str | Path) -> Path:
"""
Transforms a set of byte chunks into a string.
Args:
chunks: which shall be written to file.
filename: for the file which is to be created.
Return:
A path to the created file.
"""
chunks = (_chunk_as_bytes(c) for c in chunks)
filename = Path(filename)
with open(filename, "wb") as f:
for chunk in chunks:
f.write(chunk)
return filename
[docs]
def as_hash(chunks: Iterable[ByteString], algorithm: str = "sha1") -> ByteString:
"""
Calculate the hash for a set of byte chunks.
Args:
chunks: which shall be used as input for the checksum.
algorithm: which shall be used for calculating the checksum.
Return:
A string representing the hex digest.
"""
try:
hasher = hashlib.new(algorithm)
except ValueError as ex:
raise BucketFsError(
"Algorithm ({algorithm}) is not available, please use [{algorithms}]".format(
algorithm=algorithm, algorithms=",".join(hashlib.algorithms_available)
)
) from ex
chunks = (_chunk_as_bytes(c) for c in chunks)
for chunk in chunks:
hasher.update(chunk)
return hasher.digest()