# Copyright (c) 2025 Softwell Srl, Milano, Italy
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""StorageManager - Main entry point for configuring and accessing storage.
This module provides the StorageManager class which is the primary interface
for configuring storage backends and creating StorageNode instances.
"""
from __future__ import annotations
from typing import Any, Annotated
import json
from pathlib import Path
# TODO: Replace with genro_core.decorators.api.apiready when available
from .decorators import apiready
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
from .node import StorageNode
from .exceptions import StorageConfigError, StorageNotFoundError
from .backends import StorageBackend
from .backends.local import LocalStorage
from .backends.fsspec import FsspecBackend
from .backends.base64 import Base64Backend
from .backends.relative import RelativeMountBackend
[docs]
@apiready(path="/storage")
class StorageManager:
"""Main entry point for configuring and accessing storage.
StorageManager is responsible for:
- Configuring mount points that map to storage backends
- Creating StorageNode instances for file/directory access
- Managing the lifecycle of storage backend connections
A mount point is a logical name (e.g., "home", "uploads", "s3") that
maps to an actual storage backend (local filesystem, S3 bucket, etc.).
Examples:
>>> # Create manager
>>> storage = StorageManager()
>>>
>>> # Configure from file
>>> storage.configure('/etc/app/storage.yaml')
>>>
>>> # Configure programmatically
>>> storage.configure([
... {'name': 'home', 'type': 'local', 'path': '/home/user'},
... {'name': 'uploads', 'type': 's3', 'bucket': 'my-bucket'}
... ])
>>>
>>> # Access files
>>> node = storage.node('home:documents/report.pdf')
>>> content = node.read_text()
"""
[docs]
def __init__(self):
"""Initialize a new StorageManager with no configured mounts.
After initialization, you must call ``configure()`` to set up
mount points before you can access any files.
Examples:
>>> from genro_storage import StorageManager
>>> storage = StorageManager()
"""
# Dictionary mapping mount names to backend instances
self._mounts: dict[str, Any] = {}
[docs]
@apiready
def add_mount(
    self,
    config: Annotated[dict[str, Any], "Mount configuration dictionary"],
) -> None:
    """Register one mount point, replacing any mount with the same name.

    The work is delegated to ``_configure_mount()``, which validates the
    dictionary, builds the backend and stores it under ``config['name']``.

    Args:
        config: Mount description. ``'name'`` is always required;
            ``'type'`` is required unless ``'path'`` is a
            ``"parent:subpath"`` reference to an existing mount.

    Raises:
        StorageConfigError: If the configuration is invalid.

    Examples:
        >>> storage.add_mount({
        ...     'name': 'uploads',
        ...     'type': 's3',
        ...     'bucket': 'my-bucket'
        ... })
    """
    self._configure_mount(config)
[docs]
@apiready
def delete_mount(
    self,
    name: Annotated[str, "Mount point name to delete"],
) -> None:
    """Remove a mount point from the registry.

    Only the registry entry is dropped; nothing else is done to the
    backend object here.

    Args:
        name: Name of the mount point to remove.

    Raises:
        KeyError: If no mount is registered under ``name``.

    Examples:
        >>> storage.delete_mount('uploads')
    """
    registry = self._mounts
    if name in registry:
        del registry[name]
        return
    raise KeyError(f"Mount point '{name}' not found")
def _load_config_file(self, filepath: str) -> list[dict[str, Any]]:
"""Load configuration from YAML or JSON file.
Args:
filepath: Path to configuration file
Returns:
list[dict]: List of mount configurations
Raises:
FileNotFoundError: If file doesn't exist
StorageConfigError: If file format is invalid
"""
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Configuration file not found: {filepath}")
# Determine format from extension
suffix = path.suffix.lower()
try:
with open(path, 'r') as f:
if suffix in ('.yaml', '.yml'):
if not HAS_YAML:
raise StorageConfigError(
"YAML support not available. Install PyYAML: pip install PyYAML"
)
config = yaml.safe_load(f)
elif suffix == '.json':
config = json.load(f)
else:
raise StorageConfigError(
f"Unsupported configuration file format: {suffix}. "
f"Use .yaml, .yml, or .json"
)
except Exception as e:
if HAS_YAML and isinstance(e, yaml.YAMLError):
raise StorageConfigError(f"Failed to parse YAML file: {e}")
elif isinstance(e, json.JSONDecodeError):
raise StorageConfigError(f"Failed to parse JSON file: {e}")
else:
raise
if not isinstance(config, list):
raise StorageConfigError(
f"Configuration must be a list of mount configs, got {type(config).__name__}"
)
return config
def _configure_mount(self, config: dict[str, Any]) -> None:
    """Validate one mount configuration, build its backend and register it.

    A configuration whose ``'path'`` is a string containing ``':'`` is
    handed to ``_configure_relative_mount()`` (child mount of an existing
    mount). Otherwise ``'type'`` selects the backend: ``local`` uses
    ``LocalStorage``, ``base64`` uses ``Base64Backend`` and every other
    supported type is an ``FsspecBackend`` built from the type-specific
    required and optional keys. If ``'permissions'`` is present the
    backend is wrapped via ``_apply_permissions()``. The result is stored
    in ``self._mounts`` under ``config['name']``, replacing any previous
    mount of that name.

    Args:
        config: Mount configuration dictionary.

    Raises:
        StorageConfigError: If ``'name'``/``'type'`` or a type-specific
            required field is missing, the type is unknown, or the
            requested permissions are rejected.
    """
    # Validate required fields
    if 'name' not in config:
        raise StorageConfigError("Mount configuration missing required field: 'name'")
    mount_name = config['name']

    # Relative mount (child mount referencing a parent). Only string paths
    # are inspected, because 'path' may also be a callable for local mounts.
    # NOTE(review): any ':' triggers this branch regardless of 'type', so a
    # Windows-style path such as 'C:/data' is also read as "parent:subpath".
    # Confirm whether that is intended before relying on it.
    mount_path = config.get('path')
    if isinstance(mount_path, str) and ':' in mount_path:
        self._configure_relative_mount(mount_name, config)
        return

    if 'type' not in config:
        raise StorageConfigError(
            f"Mount configuration for '{mount_name}' missing required field: 'type'"
        )
    backend_type = config['type']

    def require(label: str, *fields: str) -> None:
        # Fail on the first missing required field, in declaration order.
        for field in fields:
            if field not in config:
                raise StorageConfigError(
                    f"{label} storage '{mount_name}' missing required field: '{field}'"
                )

    def pick(*keys: str) -> dict[str, Any]:
        # Copy through only the keys actually present in the configuration.
        return {key: config[key] for key in keys if key in config}

    def join_base(root: str, sub: Any) -> str:
        # Append an optional sub-path to a root. A missing, empty or
        # slash-only sub-path leaves the root unchanged instead of
        # producing a trailing-slash base path such as "bucket/".
        sub = sub.strip('/') if sub else ''
        return f"{root}/{sub}" if sub else root

    # Create appropriate backend
    if backend_type == 'local':
        require('Local', 'path')
        # LocalStorage supports both string paths and callables
        backend = LocalStorage(path=config['path'])

    elif backend_type == 'memory':
        backend = FsspecBackend('memory', base_path=config.get('base_path', ''))

    elif backend_type == 's3':
        require('S3', 'bucket')
        kwargs: dict[str, Any] = {}
        if 'region' in config:
            kwargs['client_kwargs'] = {'region_name': config['region']}
        kwargs.update(pick('anon', 'key', 'secret', 'endpoint_url'))
        # Base path is bucket[/prefix]
        base_path = join_base(config['bucket'], config.get('prefix'))
        backend = FsspecBackend('s3', base_path=base_path, **kwargs)

    elif backend_type == 'gcs':
        require('GCS', 'bucket')
        kwargs = pick('token', 'project', 'endpoint_url')
        base_path = join_base(config['bucket'], config.get('prefix'))
        backend = FsspecBackend('gcs', base_path=base_path, **kwargs)

    elif backend_type == 'azure':
        require('Azure', 'container', 'account_name')
        kwargs = {
            'account_name': config['account_name'],
            **pick('account_key', 'sas_token', 'connection_string'),
        }
        # Base path is just the container name for Azure
        backend = FsspecBackend('az', base_path=config['container'], **kwargs)

    elif backend_type == 'http':
        require('HTTP', 'base_url')
        backend = FsspecBackend('http', base_path=config['base_url'])

    elif backend_type == 'smb':
        require('SMB', 'host', 'share')
        kwargs = {
            'host': config['host'],
            **pick('username', 'password', 'domain', 'port'),
        }
        # Base path is /share[/path]
        base_path = join_base(f"/{config['share']}", config.get('path'))
        backend = FsspecBackend('smb', base_path=base_path, **kwargs)

    elif backend_type == 'sftp':
        require('SFTP', 'host', 'username')
        kwargs = pick(
            'host', 'username', 'password', 'port',
            'key_filename', 'passphrase', 'timeout',
        )
        backend = FsspecBackend('sftp', base_path=config.get('path', '/'), **kwargs)

    elif backend_type == 'zip':
        require('ZIP', 'file')
        # Archive backends receive the archive location as 'fo'
        kwargs = {
            'fo': config['file'],
            **pick('mode', 'target_protocol', 'target_options'),
        }
        backend = FsspecBackend('zip', base_path='', **kwargs)

    elif backend_type == 'tar':
        require('TAR', 'file')
        kwargs = {
            'fo': config['file'],
            **pick('compression', 'target_protocol', 'target_options'),
        }
        backend = FsspecBackend('tar', base_path='', **kwargs)

    elif backend_type == 'git':
        require('Git', 'path')
        # 'ref' may be a commit, tag, or branch
        kwargs = pick('path', 'ref', 'fo')
        backend = FsspecBackend('git', base_path='', **kwargs)

    elif backend_type == 'github':
        require('GitHub', 'org', 'repo')
        # 'sha' may be a commit, branch, or tag; 'username'/'token' are
        # the optional credentials
        kwargs = pick('org', 'repo', 'sha', 'username', 'token')
        backend = FsspecBackend('github', base_path='', **kwargs)

    elif backend_type == 'webdav':
        require('WebDAV', 'url')
        kwargs = {'base_url': config['url']}
        # Basic auth is only set when both halves of the credential exist
        if 'username' in config and 'password' in config:
            kwargs['auth'] = (config['username'], config['password'])
        kwargs.update(pick('token', 'cert', 'verify_ssl'))
        backend = FsspecBackend('webdav', base_path='', **kwargs)

    elif backend_type == 'libarchive':
        require('LibArchive', 'file')
        kwargs = {
            'fo': config['file'],
            **pick('target_protocol', 'target_options'),
        }
        backend = FsspecBackend('libarchive', base_path='', **kwargs)

    elif backend_type == 'base64':
        # Base64 backend has no configuration parameters
        backend = Base64Backend()

    else:
        raise StorageConfigError(
            f"Unknown storage type '{backend_type}' for mount '{mount_name}'. "
            f"Supported types: local, s3, gcs, azure, http, memory, base64, smb, sftp, zip, tar, "
            f"git, github, webdav, libarchive"
        )

    # Apply permission restrictions if specified
    if 'permissions' in config:
        backend = self._apply_permissions(mount_name, backend, config['permissions'])

    self._mounts[mount_name] = backend
def _configure_relative_mount(self, mount_name: str, config: dict[str, Any]) -> None:
    """Register a child mount that points inside an already configured mount.

    ``config['path']`` has the form ``"parent:subpath"``. The parent's
    backend is wrapped in a ``RelativeMountBackend`` together with the
    sub-path and a permission level, and the wrapper is stored under
    ``mount_name``.

    Args:
        mount_name: Name for the new child mount.
        config: Configuration dict with ``'path'`` and, optionally,
            ``'permissions'`` (``'readonly'``, ``'readwrite'`` or
            ``'delete'``; defaults to ``'delete'``).

    Raises:
        StorageConfigError: If the path has no ``':'``, the parent mount
            is not registered, or the permission value is not recognised.

    Examples:
        >>> # With a parent already configured as:
        >>> #   {'name': 'data', 'type': 's3', 'bucket': 'my-bucket'}
        >>> # this child configuration:
        >>> #   {'name': 'public', 'path': 'data:public', 'permissions': 'readonly'}
        >>> # wraps the 'data' backend with sub-path 'public', read-only.
    """
    spec = config['path']
    if ':' not in spec:
        raise StorageConfigError(
            f"Relative mount path must contain ':' separator (got '{spec}')"
        )
    # Only the first ':' separates parent from sub-path.
    parent_name, _, sub_path = spec.partition(':')

    registry = self._mounts
    if parent_name not in registry:
        quoted = [f"'{known}'" for known in registry]
        available = ', '.join(quoted) if quoted else 'none'
        raise StorageConfigError(
            f"Parent mount '{parent_name}' not found for relative mount '{mount_name}'. "
            f"Available mounts: {available}"
        )
    parent_backend = registry[parent_name]

    # 'delete' is the default level when none is requested.
    allowed = ('readonly', 'readwrite', 'delete')
    level = config.get('permissions', 'delete')
    if level not in allowed:
        raise StorageConfigError(
            f"Invalid permissions '{level}' for mount '{mount_name}'. "
            f"Valid values: {', '.join(allowed)}"
        )

    registry[mount_name] = RelativeMountBackend(parent_backend, sub_path, level)
def _apply_permissions(self, mount_name: str, backend: StorageBackend, permissions: str) -> StorageBackend:
    """Wrap a backend in a permission layer after checking it can honour it.

    The requested level is validated against the known values and then
    against ``backend.capabilities`` (its ``readonly``, ``write`` and
    ``delete`` flags). On success the backend is wrapped in a
    ``RelativeMountBackend`` with an empty relative path.

    Args:
        mount_name: Name of the mount (used in error messages only).
        backend: The backend to wrap.
        permissions: ``'readonly'``, ``'readwrite'`` or ``'delete'``.

    Returns:
        StorageBackend: ``backend`` wrapped with the permission layer.

    Raises:
        StorageConfigError: If the value is not recognised or asks for
            more than the backend's capabilities allow.
    """
    allowed = ('readonly', 'readwrite', 'delete')
    if permissions not in allowed:
        raise StorageConfigError(
            f"Invalid permissions '{permissions}' for mount '{mount_name}'. "
            f"Valid values: {', '.join(allowed)}"
        )

    caps = backend.capabilities
    # Both 'readwrite' and 'delete' imply the ability to modify content.
    wants_modify = permissions in ('readwrite', 'delete')

    # Read-only backends accept nothing beyond 'readonly'.
    if caps.readonly and wants_modify:
        raise StorageConfigError(
            f"Cannot configure mount '{mount_name}' with '{permissions}' permission. "
            f"Backend type is read-only (supports only 'readonly' permission)"
        )

    # Backends without write support cannot grant write or delete.
    if not caps.write and wants_modify:
        raise StorageConfigError(
            f"Cannot configure mount '{mount_name}' with '{permissions}' permission. "
            f"Backend does not support write operations"
        )

    # Backends without delete support cannot grant delete.
    if not caps.delete and permissions == 'delete':
        raise StorageConfigError(
            f"Cannot configure mount '{mount_name}' with 'delete' permission. "
            f"Backend does not support delete operations"
        )

    # Empty relative path: the wrapper adds permissions but no path prefix.
    return RelativeMountBackend(backend, relative_path='', permissions=permissions)
[docs]
@apiready
def node(
    self,
    mount_or_path: Annotated[
        str | None,
        "Mount name or full path (mount:path format), or None for dummy node"
    ] = None,
    *path_parts: str,
    version: Annotated[
        int | str | None,
        "Optional version: int for index (-1=latest), str for version_id"
    ] = None
) -> StorageNode:
    """Create a StorageNode pointing to a file or directory.

    Paths use a ``mount:path`` format where the mount name refers to a
    configured storage backend. Called without a mount, a node with no
    mount and no path is returned (a dummy/accumulator node).

    Args:
        mount_or_path: Either:
            - Full path with mount: ``"mount:path/to/file"``
            - Just the mount name: ``"mount"``
            - ``None``: create a dummy node. ``path_parts`` are ignored
              in that case.
        *path_parts: Additional path components, joined with ``/``.
        version: Optional version specifier, passed through unchanged to
            ``StorageNode``: int for an index (-1=latest) or str for a
            version id.

    Returns:
        StorageNode: A new StorageNode instance.

    Raises:
        StorageNotFoundError: If the mount name is not configured.
        ValueError: If the path contains a ``..`` component.

    Path Normalization:
        - Multiple slashes collapsed: ``"a//b"`` becomes ``"a/b"``
        - Leading/trailing slashes stripped
        - ``..`` (parent directory) is rejected with ValueError

    Examples:
        **Full path in one string:**
        >>> node = storage.node('home:documents/report.pdf')

        **Mount + path parts:**
        >>> node = storage.node('home', 'documents', 'report.pdf')

        **Mix styles:**
        >>> node = storage.node('home:documents', 'reports', 'q4.pdf')

        **Just mount (root of storage):**
        >>> node = storage.node('home')

        **Invalid path (raises ValueError):**
        >>> node = storage.node('home:documents/../etc/passwd')

        **Dummy node:**
        >>> dummy = storage.node()
    """
    # Create dummy node if no mount specified
    if mount_or_path is None:
        return StorageNode(self, None, None, version=version)

    # Split on the first ':' only; without a ':' the whole string is the
    # mount name and the initial path is empty.
    mount_name, _, initial_path = mount_or_path.partition(':')
    path_components = [initial_path] if initial_path else []
    path_components.extend(path_parts)

    # Check if mount exists
    if mount_name not in self._mounts:
        # Say 'none' rather than leaving the list blank when nothing is
        # configured, matching the other messages in this class.
        available = ', '.join(self._mounts) if self._mounts else 'none'
        raise StorageNotFoundError(
            f"Mount point '{mount_name}' not found. "
            f"Available mounts: {available}"
        )

    # Join and normalize path
    path = self._normalize_path('/'.join(path_components))

    return StorageNode(self, mount_name, path, version=version)
[docs]
def iternode(self, *nodes: StorageNode) -> StorageNode:
    """Create a virtual node that groups several nodes as its sources.

    A mount-less, path-less ``StorageNode`` is created, flagged as
    virtual with type ``'iter'``, and given the supplied nodes as its
    source list. Nothing is read from the sources here.

    Args:
        *nodes: StorageNode instances to use as sources, in order.

    Returns:
        StorageNode: The virtual node.

    Examples:
        >>> n1 = storage.node('mem:part1.txt')
        >>> n2 = storage.node('mem:part2.txt')
        >>> combined = storage.iternode(n1, n2)
        >>> content = combined.read_text()
    """
    # StorageNode is already imported at module level; no local import
    # is needed.
    node = StorageNode(self, None, None, version=None)
    node._is_virtual = True
    node._virtual_type = 'iter'
    # Copy into a fresh list so the node owns its own source sequence.
    node._sources = list(nodes)
    return node
[docs]
def diffnode(self, node1: StorageNode, node2: StorageNode) -> StorageNode:
    """Create a virtual node representing a diff between two nodes.

    A mount-less, path-less ``StorageNode`` is created, flagged as
    virtual with type ``'diff'``, and given ``[node1, node2]`` as its
    sources. No content is read or compared here.

    Args:
        node1: First node (old version).
        node2: Second node (new version).

    Returns:
        StorageNode: The virtual node.

    Examples:
        >>> v1 = storage.node('mem:config_v1.txt')
        >>> v2 = storage.node('mem:config_v2.txt')
        >>> diff = storage.diffnode(v1, v2)
        >>> changes = diff.read_text()
    """
    # StorageNode is already imported at module level; no local import
    # is needed.
    node = StorageNode(self, None, None, version=None)
    node._is_virtual = True
    node._virtual_type = 'diff'
    # Order matters: old version first, new version second.
    node._sources = [node1, node2]
    return node
def _normalize_path(self, path: str) -> str:
"""Normalize a path.
Args:
path: Path to normalize
Returns:
str: Normalized path
Raises:
ValueError: If path contains invalid components (e.g., "..")
"""
if not path:
return ''
# Check for parent directory traversal
if '..' in path.split('/'):
raise ValueError("Parent directory traversal (..) is not supported")
# Normalize: strip leading/trailing slashes, collapse multiple slashes
parts = [p for p in path.split('/') if p]
return '/'.join(parts)
[docs]
@apiready
def get_mount_names(self) -> Annotated[list[str], "List of configured mount point names"]:
    """Return the names of all registered mount points.

    Returns:
        list[str]: A new list of mount names, in registration order.

    Examples:
        >>> storage.add_mount({'name': 'home', 'type': 'local', 'path': '/home/user'})
        >>> storage.add_mount({'name': 'uploads', 'type': 's3', 'bucket': 'my-bucket'})
        >>> print(storage.get_mount_names())
        ['home', 'uploads']
    """
    # Iterating a dict yields its keys; unpack them into a fresh list.
    return [*self._mounts]
[docs]
@apiready
def has_mount(
    self,
    name: Annotated[str, "Mount point name to check"],
) -> Annotated[bool, "True if mount exists"]:
    """Tell whether a mount point is registered under ``name``.

    Args:
        name: Mount point name to look up.

    Returns:
        bool: True if the registry contains ``name``, False otherwise.

    Examples:
        >>> if storage.has_mount('uploads'):
        ...     node = storage.node('uploads:file.txt')
        ... else:
        ...     print("Uploads storage not configured")
    """
    registry = self._mounts
    return name in registry
[docs]
def __repr__(self) -> str:
"""String representation for debugging."""
mount_names = ', '.join(self._mounts.keys()) if self._mounts else 'none'
return f"StorageManager(mounts=[{mount_names}])"