Source code for exasol.bucketfs._path

from __future__ import annotations
from typing import Protocol, ByteString, BinaryIO, Iterable, Generator, Optional
from enum import Enum, auto
from pathlib import PurePath, PureWindowsPath
import errno
import os
from io import IOBase
from exasol.bucketfs._buckets import BucketLike, SaaSBucket, MountedBucket
from exasol.bucketfs._service import Service
from exasol.bucketfs._error import BucketFsError

ARCHIVE_SUFFIXES = [".tar", ".gz", ".tgz", ".zip", ".tar"]


class StorageBackend(Enum):
    onprem = auto()
    saas = auto()
    mounted = auto()


[docs] class PathLike(Protocol): """ Definition of the PathLike view of the files in a Bucket. """ @property def name(self) -> str: """ A string representing the final path component, excluding the drive and root, if any. """ @property def suffix(self) -> str: """ The file extension of the final component, if any. """ @property def root(self) -> str: """ A string representing the root, if any. """ @property def parent(self) -> PathLike: """ The logical parent of this path. """
[docs] def as_uri(self) -> str: """ Represent the path as a file URI. Can be used to reconstruct the location/path. """
[docs] def as_udf_path(self) -> str: """ This method is specific to a BucketFS flavour of the PathLike. It returns a corresponding path, as it's seen from a UDF. """
[docs] def exists(self) -> bool: """ Return True if the path points to an existing file or directory. """
[docs] def is_dir(self) -> bool: """ Return True if the path points to a directory, False if it points to another kind of file. """
[docs] def is_file(self) -> bool: """ Return True if the path points to a regular file, False if it points to another kind of file. """
[docs] def read(self, chunk_size: int = 8192) -> Iterable[ByteString]: """ Read the content of the file behind this path. Only works for PathLike objects which return True for `is_file()`. Args: chunk_size: which will be yielded by the iterator. Returns: Returns an iterator which can be used to read the contents of the path in chunks. Raises: FileNotFoundError: If the file does not exist. IsADirectoryError: if the pathlike object points to a directory. """
[docs] def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None: """ Writes data to this path. Q. Should it create the parent directory if it doesn't exit? A. Yes, it should. After successfully writing to this path `exists` will yield true for this path. If the file already existed it will be overwritten. Args: data: which shall be writen to the path. Raises: NotAFileError: if the pathlike object is not a file path. """
[docs] def rm(self) -> None: """ Remove this file. Note: If `exists()` and is_file yields true for this path, the path will be deleted, otherwise exception will be thrown. Raises: FileNotFoundError: If the file does not exist. """
[docs] def rmdir(self, recursive: bool = False) -> None: """ Removes this directory. Note: In order to stay close to pathlib, by default `rmdir` with `recursive` set to `False` won't delete non-empty directories. Args: recursive: if true the directory itself and its entire contents (files and subdirs) will be deleted. If false and the directory is not empty an error will be thrown. Raises: FileNotFoundError: If the file does not exist. PermissionError: If recursive is false and the directory is not empty. """
[docs] def joinpath(self, *path_segments) -> "PathLike": """ Calling this method is equivalent to combining the path with each of the given path segments in turn. Returns: A new pathlike object pointing the combined path. """
[docs] def walk(self, top_down: bool = True) -> Generator[tuple["PathLike", list[str], list[str]], None, None]: """ Generate the file names in a directory tree by walking the tree either top-down or bottom-up. Note: Try to mimik https://docs.python.org/3/library/pathlib.html#pathlib.Path.walk as closely as possible, except the functionality associated with the parameters of the `pathlib` walk. Yields: A 3-tuple of (dirpath, dirnames, filenames). """
[docs] def iterdir(self) -> Generator["PathLike", None, None]: """ When the path points to a directory, yield path objects of the directory contents. Note: If `path` points to a file then `iterdir()` will yield nothing. Yields: All direct children of the pathlike object. """
def __truediv__(self, other): """ Overload / for joining, see also joinpath or `pathlib.Path`. """
def _remove_archive_suffix(path: PurePath) -> PurePath: while path.suffix in ARCHIVE_SUFFIXES: path = path.with_suffix('') return path class _BucketFile: """ A node in a perceived file structure of a bucket. This can be a file, a directory or both. """ def __init__(self, name: str, parent: str = ''): self._name = name self._path = f'{parent}/{name}' if parent else name self._children: Optional[dict[str, "_BucketFile"]] = None self.is_file = False @property def name(self): return self._name @property def path(self): return self._path @property def is_dir(self): # The node can be a directory as well as a file, # hence is the is_dir property, independent of is_file. return bool(self._children) def __iter__(self): if self._children is None: return iter(()) return iter(self._children.values()) def get_child(self, child_name: str) -> "_BucketFile": """ Returns a child object with the specified name. Creates one if it hasn't been created yet. """ if self._children is None: self._children = {} child: Optional["_BucketFile"] = None else: child = self._children.get(child_name) if child is None: child = _BucketFile(child_name, self._path) self._children[child_name] = child return child class BucketPath: """ Implementation of the PathLike view for files in a bucket. """ def __init__(self, path: str | PurePath, bucket_api: BucketLike): """ :param path: A pure path of a file or directory. The path is assumed to be relative to the bucket. It is also permissible to have this path in an absolute form, e.g. '/dir1/...' or '\\\\abc\\...\\'. All Pure Path methods of the PathLike protocol will be delegated to this object. :param bucket_api: An object supporting the Bucket API protocol. """ self._path = PurePath(path) self._bucket_api = bucket_api def _get_relative_posix(self): """ Returns the pure path of this object as a string, in the format of a bucket file: 'dir/subdir/.../filename'. """ path_str = str(self._path)[len(self._path.anchor):] if isinstance(self._path, PureWindowsPath): path_str = path_str.replace('\\', '/') if path_str == '.': path_str = '' return path_str def _navigate(self) -> Optional[_BucketFile]: """ Reads the bucket file structure and navigates to the node corresponding to the pure path of this object. Returns None if such node doesn't exist, otherwise returns this node. """ path_str = self._get_relative_posix() path_len = len(path_str) path_root: Optional[_BucketFile] = None for file_name in self._bucket_api.files: if file_name.startswith(path_str): path_root = path_root or _BucketFile(self._path.name, str(self.parent)) node = path_root for part in file_name[path_len:].split('/'): if part: node = node.get_child(part) node.is_file = True return path_root @property def name(self) -> str: return self._path.name @property def suffix(self) -> str: return self._path.suffix @property def root(self) -> str: return self._path.root @property def parent(self) -> PathLike: return BucketPath(self._path.parent, self._bucket_api) def as_uri(self) -> str: return self._path.as_uri() def as_udf_path(self) -> str: return str(PurePath(self._bucket_api.udf_path) / _remove_archive_suffix(self._path)) def exists(self) -> bool: return self._navigate() is not None def is_dir(self) -> bool: current_node = self._navigate() return (current_node is not None) and current_node.is_dir def is_file(self) -> bool: current_node = self._navigate() return (current_node is not None) and current_node.is_file def read(self, chunk_size: int = 8192) -> Iterable[ByteString]: return self._bucket_api.download(str(self._path), chunk_size) def write(self, data: ByteString | BinaryIO | Iterable[ByteString]) -> None: if (not isinstance(data, IOBase) and isinstance(data, Iterable) and all(isinstance(chunk, ByteString) for chunk in data)): data = b''.join(data) self._bucket_api.upload(str(self._path), data) def rm(self) -> None: current_node = self._navigate() if current_node is None: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path)) if not current_node.is_file: raise IsADirectoryError(errno.EISDIR, os.strerror(errno.EISDIR), str(self._path)) self._bucket_api.delete(str(self._path)) def rmdir(self, recursive: bool = False) -> None: current_node = self._navigate() if current_node is None: # There is no such thing as an empty directory. So, for the sake of # compatibility with the PathLike, any directory that doesn't exist # is considered empty. return if not current_node.is_dir: raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), str(self._path)) if recursive: self._rmdir_recursive(current_node) else: raise OSError(errno.ENOTEMPTY, os.strerror(errno.ENOTEMPTY), str(self._path)) def _rmdir_recursive(self, node: _BucketFile): for child in node: self._rmdir_recursive(child) if node.is_file: self._bucket_api.delete(node.path) def joinpath(self, *path_segments) -> PathLike: # The path segments can be of either this type or an os.PathLike. cls = type(self) seg_paths = [seg._path if isinstance(seg, cls) else seg for seg in path_segments] new_path = self._path.joinpath(*seg_paths) return cls(new_path, self._bucket_api) def walk(self, top_down: bool = True) -> Generator[tuple[PathLike, list[str], list[str]], None, None]: current_node = self._navigate() if current_node is None: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path)) if current_node.is_dir: yield from self._walk_recursive(current_node, top_down) def _walk_recursive(self, node: _BucketFile, top_down: bool) -> \ Generator[tuple[PathLike, list[str], list[str]], None, None]: bucket_path = BucketPath(node.path, self._bucket_api) dir_list: list[str] = [] file_list: list[str] = [] for child in node: if child.is_file: file_list.append(child.name) if child.is_dir: dir_list.append(child.name) # The difference between the top_down and bottom_up is in the order of # yielding the current node and its children. Top down - current node first, # bottom_up - children first. if top_down: yield bucket_path, dir_list, file_list for child in node: if child.is_dir: yield from self._walk_recursive(child, top_down) if not top_down: yield bucket_path, dir_list, file_list def iterdir(self) -> Generator[PathLike, None, None]: current_node = self._navigate() if current_node is None: raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self._path)) if not current_node.is_dir: raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), str(self._path)) for child in current_node: yield BucketPath(self._path / child.name, self._bucket_api) def __truediv__(self, other): # The other object can be of either this type or an os.PathLike. cls = type(self) new_path = self._path / (other._path if isinstance(other, cls) else other) return cls(new_path, self._bucket_api) def __eq__(self, other) -> bool: if not isinstance(other, BucketPath): return False return (self._path, self._bucket_api) == (other._path, other._bucket_api) def __str__(self): return str(self._path) def _create_onprem_bucket(url: str, username: str, password: str, bucket_name: str = 'default', verify: bool | str = True, service_name: Optional[str] = None ) -> BucketLike: """ Creates an on-prem bucket. """ credentials = {bucket_name: {'username': username, 'password': password}} service = Service(url, credentials, verify, service_name) buckets = service.buckets if bucket_name not in buckets: raise BucketFsError(f'Bucket {bucket_name} does not exist.') return buckets[bucket_name] def _create_saas_bucket(account_id: str, database_id: str, pat: str, url: str = 'https://cloud.exasol.com' ) -> BucketLike: """ Creates a SaaS bucket. """ return SaaSBucket(url=url, account_id=account_id, database_id=database_id, pat=pat) def _create_mounted_bucket(service_name: str = 'bfsdefault', bucket_name: str = 'default', base_path: Optional[str] = None ) -> BucketLike: """ Creates a bucket mounted to a UDF. """ bucket = MountedBucket(service_name, bucket_name, base_path) if not bucket.root.exists(): raise BucketFsError(f'Service {service_name} or bucket {bucket_name} do not exist.') return bucket
[docs] def build_path(**kwargs) -> PathLike: """ Creates a PathLike object based on a bucket in one of the BucketFS storage backends. It provides the same interface for the following BucketFS implementations: - On-Premises - SaaS - BucketFS files mounted as read-only directory in a UDF. Arguments: backend: This is a mandatory parameter that indicates the BucketFS storage backend. The available backends are defined in the StorageBackend enumeration, Currently, these are "onprem", "saas" and "mounted". The parameter value can be provided either as a string, e.g. "onprem", or as an enum, e.g. StorageBackend.onprem. path: Optional parameter that selects a path within the bucket. If not provided the returned PathLike objects corresponds to the root of the bucket. Hence, an alternative way of creating a PathLike pointing to a particular file or directory is as in the code below. path = build_path(...) / "the_desired_path" The rest of the arguments are backend specific. On-prem arguments: url: Url of the BucketFS service, e.g. `http(s)://127.0.0.1:2580`. username: BucketFS username (generally, different from the DB username). password: BucketFS user password. bucket_name: Name of the bucket. Currently, a PathLike cannot span multiple buckets. verify: Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. Defaults to ``True``. service_name: Optional name of the BucketFS service. SaaS arguments: url: Url of the Exasol SaaS. Defaults to 'https://cloud.exasol.com'. account_id: SaaS user account ID, e.g. 'org_LVeOj4pwXhPatNz5' (given example is not a valid ID of an existing account). database_id: Database ID, e.g. 'msduZKlMR8QCP_MsLsVRwy' (given example is not a valid ID of an existing database). pat: Personal Access Token, e.g. 'exa_pat_aj39AsM3bYR9bQ4qk2wiG8SWHXbRUGNCThnep5YV73az6A' (given example is not a valid PAT). Mounted BucketFS directory arguments: service_name: Name of the BucketFS service (not a service url). Defaults to 'bfsdefault'. bucket_name: Name of the bucket. Currently, a PathLike cannot span multiple buckets. base_path: Explicitly specified root path in a file system. This is an alternative to providing the service_name and the bucket_name. """ backend = kwargs.pop('backend', StorageBackend.onprem) path = kwargs.pop('path') if 'path' in kwargs else '' if isinstance(backend, str): backend = StorageBackend[backend.lower()] if backend == StorageBackend.onprem: bucket = _create_onprem_bucket(**kwargs) elif backend == StorageBackend.saas: bucket = _create_saas_bucket(**kwargs) else: bucket = _create_mounted_bucket(**kwargs) return BucketPath(path, bucket)