# Copyright (c) 2025 Softwell Srl, Milano, Italy
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base backend interface for genro-storage.
This module defines the abstract base class that all storage backends
must implement. It provides the contract for file operations across
different storage systems.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import BinaryIO, TextIO
from dataclasses import asdict
from ..capabilities import BackendCapabilities
[docs]
class StorageBackend(ABC):
"""Abstract base class for storage backends.
All storage backend implementations (Local, S3, GCS, Azure, HTTP, etc.)
must inherit from this class and implement all abstract methods.
This ensures a consistent interface across all storage types and makes
it easy to add new backends in the future.
Note:
Backend implementations should not be instantiated directly by users.
They are created internally by StorageManager based on configuration.
Capability System:
Capabilities are automatically derived from methods decorated with @capability.
The decorator populates the _capabilities set during class definition,
and __init_subclass__ ensures proper inheritance.
"""
# Class variable: maps protocol → set of capabilities
# Populated automatically by @capability decorator or declared manually
PROTOCOL_CAPABILITIES: dict[str, set[str]] = {}
[docs]
def __init_subclass__(cls, **kwargs):
"""Automatically collect and inherit capabilities when subclass is created.
This method is called when a subclass of StorageBackend is defined.
It collects PROTOCOL_CAPABILITIES from parent classes and merges them.
"""
super().__init_subclass__(**kwargs)
# Collect capabilities from all parent classes
inherited_caps = {}
for base in cls.__mro__[1:]: # Skip cls itself
if hasattr(base, 'PROTOCOL_CAPABILITIES'):
for protocol, caps in base.PROTOCOL_CAPABILITIES.items():
if protocol not in inherited_caps:
inherited_caps[protocol] = set()
inherited_caps[protocol].update(caps)
# Get capabilities defined in this class (added by @capability decorator)
own_caps = cls.__dict__.get('PROTOCOL_CAPABILITIES', {})
# Merge inherited and own capabilities
all_caps = inherited_caps.copy()
for protocol, caps in own_caps.items():
if protocol not in all_caps:
all_caps[protocol] = set()
all_caps[protocol].update(caps)
cls.PROTOCOL_CAPABILITIES = all_caps
[docs]
@classmethod
def get_capabilities(cls, protocol: str | None = None) -> set[str]:
"""Get capability set for a given protocol.
For single-protocol backends (LocalStorage, Base64Backend), protocol
parameter is optional and defaults to the backend's only protocol.
For multi-protocol backends (FsspecBackend), protocol must be specified.
Args:
protocol: Protocol name (e.g., 's3', 'gcs', 'local', 'base64')
If None, returns capabilities for the only available protocol
Returns:
set: Set of capability names
Examples:
>>> # Single-protocol backend
>>> LocalStorage.get_capabilities() # protocol auto-detected
{'read', 'write', 'delete', 'mkdir', ...}
>>> # Multi-protocol backend
>>> FsspecBackend.get_capabilities('s3')
{'read', 'write', 'metadata', 'presigned_urls', ...}
"""
if protocol is None:
# Auto-detect for single-protocol backends
if len(cls.PROTOCOL_CAPABILITIES) == 1:
protocol = list(cls.PROTOCOL_CAPABILITIES.keys())[0]
else:
raise ValueError(
f"{cls.__name__} supports multiple protocols. "
f"Please specify protocol parameter. "
f"Available: {list(cls.PROTOCOL_CAPABILITIES.keys())}"
)
if protocol not in cls.PROTOCOL_CAPABILITIES:
raise ValueError(
f"Unknown protocol '{protocol}' for {cls.__name__}. "
f"Available: {list(cls.PROTOCOL_CAPABILITIES.keys())}"
)
return cls.PROTOCOL_CAPABILITIES[protocol]
@property
def capabilities(self) -> BackendCapabilities:
"""Return the capabilities of this backend instance.
For single-protocol backends, automatically uses the only protocol.
For multi-protocol backends (FsspecBackend), uses self.protocol.
Returns:
BackendCapabilities: Object describing supported features
Examples:
>>> backend = LocalStorage('/tmp')
>>> caps = backend.capabilities
>>> if caps.versioning:
... versions = backend.get_versions('file.txt')
"""
# For multi-protocol backends, get protocol from instance
protocol = getattr(self, 'protocol', None)
caps_set = self.get_capabilities(protocol)
# Build kwargs dict for BackendCapabilities from the capability set
kwargs = {}
for field in BackendCapabilities.__dataclass_fields__:
kwargs[field] = field in caps_set
return BackendCapabilities(**kwargs)
[docs]
@classmethod
def get_json_info(cls, protocol: str | None = None) -> dict:
"""Return complete backend information in JSON format.
This classmethod can be overridden by backend subclasses to provide
complete information including configuration schema, capabilities,
and description. This is useful for UI generation and documentation.
The default implementation returns capabilities derived from @capability
decorators, but no schema information.
Args:
protocol: Protocol name for multi-protocol backends (optional for single-protocol)
Returns:
dict: Backend information with schema, capabilities, and description
Examples:
>>> # Single-protocol backend
>>> info = LocalStorage.get_json_info()
>>> print(info['schema']['fields'])
[{'name': 'path', 'type': 'text', 'required': True}]
>>> # Multi-protocol backend
>>> info = FsspecBackend.get_json_info('s3')
>>> print(info['capabilities']['metadata'])
True
"""
# Get capabilities for the protocol
caps_set = cls.get_capabilities(protocol)
# Build capabilities dict from set
caps_dict = {}
for field in BackendCapabilities.__dataclass_fields__:
caps_dict[field] = field in caps_set
# Determine backend name from protocol or class name
if protocol:
backend_name = protocol
elif len(cls.PROTOCOL_CAPABILITIES) == 1:
backend_name = list(cls.PROTOCOL_CAPABILITIES.keys())[0]
else:
backend_name = cls.__name__.lower().replace('backend', '').replace('storage', '')
return {
"backend": backend_name,
"schema": {
"fields": []
},
"capabilities": caps_dict,
"description": "No description available"
}
[docs]
@abstractmethod
def exists(self, path: str) -> bool:
"""Check if a file or directory exists.
Args:
path: Relative path within this storage backend
Returns:
bool: True if file or directory exists
Examples:
>>> exists = backend.exists('documents/report.pdf')
"""
pass
[docs]
@abstractmethod
def is_file(self, path: str) -> bool:
"""Check if path points to a file.
Args:
path: Relative path within this storage backend
Returns:
bool: True if path is a file, False otherwise
Examples:
>>> if backend.is_file('documents/report.pdf'):
... print("It's a file")
"""
pass
[docs]
@abstractmethod
def is_dir(self, path: str) -> bool:
"""Check if path points to a directory.
Args:
path: Relative path within this storage backend
Returns:
bool: True if path is a directory, False otherwise
Examples:
>>> if backend.is_dir('documents'):
... print("It's a directory")
"""
pass
[docs]
@abstractmethod
def size(self, path: str) -> int:
"""Get file size in bytes.
Args:
path: Relative path to file
Returns:
int: File size in bytes
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If path is a directory
Examples:
>>> size = backend.size('documents/report.pdf')
>>> print(f"File is {size} bytes")
"""
pass
[docs]
@abstractmethod
def mtime(self, path: str) -> float:
"""Get last modification time.
Args:
path: Relative path to file or directory
Returns:
float: Unix timestamp of last modification
Raises:
FileNotFoundError: If path doesn't exist
Examples:
>>> from datetime import datetime
>>> timestamp = backend.mtime('documents/report.pdf')
>>> mod_time = datetime.fromtimestamp(timestamp)
"""
pass
[docs]
@abstractmethod
def open(self, path: str, mode: str = 'rb') -> BinaryIO | TextIO:
"""Open a file and return file-like object.
Args:
path: Relative path to file
mode: File mode ('r', 'rb', 'w', 'wb', 'a', 'ab')
Returns:
BinaryIO | TextIO: File-like object supporting context manager
Raises:
FileNotFoundError: If file doesn't exist (in read mode)
PermissionError: If insufficient permissions
Examples:
>>> with backend.open('file.txt', 'rb') as f:
... data = f.read()
"""
pass
[docs]
@abstractmethod
def read_bytes(self, path: str) -> bytes:
"""Read entire file as bytes.
Args:
path: Relative path to file
Returns:
bytes: Complete file contents
Raises:
FileNotFoundError: If file doesn't exist
Examples:
>>> data = backend.read_bytes('image.jpg')
"""
pass
[docs]
@abstractmethod
def read_text(self, path: str, encoding: str = 'utf-8') -> str:
"""Read entire file as text.
Args:
path: Relative path to file
encoding: Text encoding
Returns:
str: Complete file contents as string
Raises:
FileNotFoundError: If file doesn't exist
UnicodeDecodeError: If encoding is incorrect
Examples:
>>> content = backend.read_text('document.txt')
"""
pass
[docs]
@abstractmethod
def write_bytes(self, path: str, data: bytes) -> None:
"""Write bytes to file.
Args:
path: Relative path to file
data: Bytes to write
Raises:
PermissionError: If insufficient permissions
FileNotFoundError: If parent directory doesn't exist
Examples:
>>> backend.write_bytes('file.bin', b'Hello')
"""
pass
[docs]
@abstractmethod
def write_text(self, path: str, text: str, encoding: str = 'utf-8') -> None:
"""Write text to file.
Args:
path: Relative path to file
text: String to write
encoding: Text encoding
Raises:
PermissionError: If insufficient permissions
FileNotFoundError: If parent directory doesn't exist
Examples:
>>> backend.write_text('file.txt', 'Hello World')
"""
pass
[docs]
@abstractmethod
def delete(self, path: str, recursive: bool = False) -> None:
"""Delete file or directory.
Args:
path: Relative path to delete
recursive: If True, delete directories recursively
Raises:
FileNotFoundError: If path doesn't exist (implementation may choose to be idempotent)
ValueError: If path is non-empty directory and recursive=False
Examples:
>>> backend.delete('file.txt')
>>> backend.delete('folder', recursive=True)
"""
pass
[docs]
@abstractmethod
def list_dir(self, path: str) -> list[str]:
"""List directory contents.
Args:
path: Relative path to directory
Returns:
list[str]: List of names (not full paths) in the directory
Raises:
FileNotFoundError: If directory doesn't exist
ValueError: If path is not a directory
Examples:
>>> names = backend.list_dir('documents')
>>> for name in names:
... print(name) # Just 'report.pdf', not 'documents/report.pdf'
"""
pass
[docs]
@abstractmethod
def mkdir(self, path: str, parents: bool = False, exist_ok: bool = False) -> None:
"""Create directory.
Args:
path: Relative path to create
parents: If True, create parent directories as needed
exist_ok: If True, don't error if directory exists
Raises:
FileExistsError: If exists and exist_ok=False
FileNotFoundError: If parent doesn't exist and parents=False
Examples:
>>> backend.mkdir('new_folder')
>>> backend.mkdir('a/b/c', parents=True)
"""
pass
[docs]
@abstractmethod
def copy(self, src_path: str, dest_backend: 'StorageBackend', dest_path: str) -> str | None:
"""Copy file/directory to another backend.
This method handles cross-backend copying efficiently, streaming
data when possible to avoid loading large files in memory.
Args:
src_path: Source path in this backend
dest_backend: Destination backend (may be different type)
dest_path: Destination path in dest_backend
Returns:
str | None: New destination path if destination backend changes it
(e.g., base64 backend), or None if path unchanged
Raises:
FileNotFoundError: If source doesn't exist
PermissionError: If insufficient permissions
Examples:
>>> # Copy within same backend
>>> backend.copy('file.txt', backend, 'backup/file.txt')
>>>
>>> # Copy to different backend
>>> backend.copy('file.txt', other_backend, 'file.txt')
"""
pass
[docs]
def get_hash(self, path: str) -> str | None:
"""Get MD5 hash from filesystem metadata if available.
This method attempts to retrieve the MD5 hash from the storage
backend's metadata without reading the file content. For cloud
storage like S3, this uses the ETag. For local storage, this
returns None and the hash must be computed by reading the file.
Args:
path: Relative path to file
Returns:
str | None: MD5 hash as hexadecimal string, or None if not available
Examples:
>>> hash_value = backend.get_hash('file.txt')
>>> if hash_value:
... print(f"MD5: {hash_value}")
"""
return None # Default: no metadata hash available
[docs]
def get_versions(self, path: str) -> list[dict]:
"""Get list of available versions for a file.
Returns version history for versioned storage. Default implementation
returns empty list (no versioning support).
Args:
path: Relative path to file
Returns:
list[dict]: List of version info dicts
Notes:
- Override in subclasses that support versioning
- S3 with versioning enabled can implement this
"""
return [] # Default: no versioning
[docs]
def open_version(self, path: str, version_id: str, mode: str = 'rb'):
"""Open a specific version of a file.
Default implementation raises PermissionError. Override in subclasses
that support versioning (e.g., S3).
Args:
path: Relative path to file
version_id: Version identifier
mode: Open mode (read-only)
Raises:
PermissionError: Always (base implementation)
"""
raise PermissionError(
f"{self.__class__.__name__} does not support versioning"
)
[docs]
def delete_version(self, path: str, version_id: str) -> None:
"""Delete a specific version of a file.
Removes a specific version from versioned storage. The current version
and other versions remain unaffected. This is useful for cleaning up
duplicate or unwanted versions.
Default implementation raises PermissionError. Override in subclasses
that support versioning (e.g., S3).
Args:
path: Relative path to file
version_id: Version identifier to delete
Raises:
PermissionError: If backend doesn't support versioning
FileNotFoundError: If version doesn't exist
ValueError: If attempting to delete the only remaining version
Examples:
>>> # Delete a specific version
>>> backend.delete_version('file.txt', 'abc123')
Notes:
- Cannot delete the current version if it's the only version
- Some backends may have restrictions on version deletion
- This operation is typically irreversible
"""
raise PermissionError(
f"{self.__class__.__name__} does not support version deletion"
)
[docs]
def url(self, path: str, expires_in: int = 3600, **kwargs) -> str | None:
"""Generate public URL for file access.
Returns a URL that can be used to access the file directly.
For cloud storage (S3, GCS, Azure), this generates a presigned URL.
For local storage, this returns None or a local file path URL.
Args:
path: Relative path to file
expires_in: URL expiration time in seconds (default: 3600 = 1 hour)
**kwargs: Backend-specific options
Returns:
str | None: Public URL or None if not supported
Examples:
>>> # S3 presigned URL (expires in 1 hour)
>>> url = backend.url('documents/report.pdf')
>>> print(url)
'https://bucket.s3.amazonaws.com/documents/report.pdf?X-Amz-...'
>>>
>>> # Custom expiration (24 hours)
>>> url = backend.url('video.mp4', expires_in=86400)
Notes:
- Cloud storage URLs are temporary and expire
- Local storage typically returns None
- HTTP storage returns the direct URL
"""
return None # Default: no URL generation
[docs]
def internal_url(self, path: str, nocache: bool = False) -> str | None:
"""Generate internal/relative URL for file access.
Returns a URL suitable for internal application use, typically
relative to the application's base URL. Optionally includes
cache busting parameters.
Args:
path: Relative path to file
nocache: If True, append mtime as query parameter for cache busting
Returns:
str | None: Internal URL or None if not supported
Examples:
>>> # Simple internal URL
>>> url = backend.internal_url('images/logo.png')
>>> print(url)
'/storage/home/images/logo.png'
>>>
>>> # With cache busting
>>> url = backend.internal_url('app.js', nocache=True)
>>> print(url)
'/storage/home/app.js?mtime=1234567890'
Notes:
- Useful for web applications
- Cache busting helps with CDN/browser caching
- Format depends on application configuration
"""
return None # Default: no internal URL
[docs]
def local_path(self, path: str, mode: str = 'r'):
"""Get a local filesystem path for the file.
Returns a context manager that provides a local filesystem path
to the file. For local storage, this returns the actual path.
For remote storage (S3, GCS, etc.), this downloads the file to
a temporary location, yields the temp path, and uploads changes
back on exit if the file was modified.
This is essential for integrating with external tools that only
work with local filesystem paths (ffmpeg, ImageMagick, etc.).
Args:
path: Relative path to file
mode: Access mode - 'r' (read-only), 'w' (write-only), 'rw' (read-write)
Returns:
Context manager yielding str (local filesystem path)
Examples:
>>> # Process remote file with external tool
>>> with backend.local_path('video.mp4', mode='r') as local_path:
... subprocess.run(['ffmpeg', '-i', local_path, 'output.mp4'])
>>>
>>> # Modify remote file in place
>>> with backend.local_path('image.jpg', mode='rw') as local_path:
... subprocess.run(['convert', local_path, '-resize', '800x600', local_path])
>>> # Changes automatically uploaded on exit
Notes:
- For read mode ('r'), the file is downloaded but not uploaded
- For write mode ('w'), the file is uploaded on exit
- For read-write mode ('rw'), both download and upload occur
- Temporary files are automatically cleaned up on exit
- For local storage, returns the original path (no copy)
"""
from contextlib import contextmanager
@contextmanager
def _local_path():
yield self._get_local_path(path, mode)
return _local_path()
def _get_local_path(self, path: str, mode: str) -> str:
"""Implementation detail for local_path. Override in subclasses if needed."""
raise NotImplementedError(f"{self.__class__.__name__} must implement _get_local_path")
[docs]
def close(self) -> None:
"""Close backend and release resources.
This method is called when the backend is no longer needed.
Implementations should close any open connections, file handles, etc.
The default implementation does nothing. Backends that manage
resources should override this method.
Examples:
>>> backend.close()
"""
pass