# Copyright (c) 2025 Softwell Srl, Milano, Italy
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""StorageNode - Represents a file or directory in a storage backend.
This module provides the StorageNode class which is the main interface for
interacting with files and directories across different storage backends.
"""
from __future__ import annotations
from typing import BinaryIO, TextIO, TYPE_CHECKING, Callable, Literal, Annotated
from pathlib import PurePosixPath
from enum import Enum
from datetime import datetime
# TODO: Replace with genro_core.decorators.api.apiready when available
from .decorators import apiready
if TYPE_CHECKING:
from .manager import StorageManager
class SkipStrategy(str, Enum):
    """Policy deciding whether a copy skips a destination that already exists.

    Every member is also a plain ``str``, so it compares equal to its value
    (``SkipStrategy.HASH == 'hash'``).

    Attributes:
        NEVER: Never skip; existing destination files are overwritten.
        EXISTS: Skip whenever the destination already exists.
        SIZE: Skip when the destination exists and has the same size.
        HASH: Skip when the destination exists and has the same MD5 hash.
        CUSTOM: Let a caller-supplied function take the decision.
    """

    NEVER = "never"
    EXISTS = "exists"
    SIZE = "size"
    HASH = "hash"
    CUSTOM = "custom"
[docs]
@apiready(path="/storage/nodes")
class StorageNode:
"""Represents a file or directory in a storage backend.
StorageNode provides a unified interface for file operations across
different storage backends (local, S3, GCS, Azure, HTTP, etc.).
Note:
Users should not instantiate StorageNode directly. Use
``StorageManager.node()`` instead.
The node can represent either a file or a directory. Use the properties
``isfile`` and ``isdir`` to determine the type.
Examples:
>>> # Get a node via StorageManager
>>> node = storage.node('home:documents/report.pdf')
>>>
>>> # Check if it exists
>>> if node.exists:
... print(f"File size: {node.size} bytes")
>>>
>>> # Read content
>>> content = node.read_text()
>>>
>>> # Write content
>>> node.write_text("Hello World")
Attributes:
fullpath: Full path including mount point (e.g., "home:documents/file.txt")
exists: True if file or directory exists
isfile: True if node points to a file
isdir: True if node points to a directory
size: File size in bytes
mtime: Last modification time as Unix timestamp
basename: Filename with extension
stem: Filename without extension
suffix: File extension including dot
parent: Parent directory as StorageNode
"""
[docs]
def __init__(self, manager: StorageManager, mount_name: str | None, path: str | None, version: int | str | None = None):
"""Initialize a StorageNode.
Args:
manager: The StorageManager instance that owns this node
mount_name: Name of the mount point (e.g., "home", "uploads"), or None for dummy node
path: Relative path within the mount (e.g., "documents/file.txt"), or None for dummy node
version: Optional version specifier for versioned storage.
If set, the node becomes a read-only snapshot of that version.
Note:
This should not be called directly. Use ``StorageManager.node()`` instead.
"""
self._manager = manager
self._mount_name = mount_name
self._path = path
self._version = version # None = current version, int/str = specific version
self._posix_path = PurePosixPath(path) if path else PurePosixPath('.')
# Virtual node support (set by iternode()/diffnode())
self._is_virtual = False
self._virtual_type = None # 'iter' or 'diff'
self._sources: list[StorageNode] = [] # For virtual nodes
# Get backend from manager (None for virtual nodes)
self._backend = manager._mounts[mount_name] if mount_name else None
# ==================== Properties ====================
@property
def fullpath(self) -> str:
"""Full path including mount point.
Returns:
str: Full path in format "mount:path/to/file"
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.fullpath)
'home:documents/report.pdf'
"""
if self._path:
return f"{self._mount_name}:{self._path}"
return f"{self._mount_name}:"
@property
def path(self) -> str:
"""Relative path within the mount.
Returns:
str: Path relative to mount point (without mount prefix)
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.path)
'documents/report.pdf'
>>> # For base64 backend, this is the base64-encoded content
>>> node = storage.node('b64:SGVsbG8=')
>>> print(node.path)
'SGVsbG8='
"""
return self._path
@property
def exists(self) -> bool:
"""True if file or directory exists.
Returns:
bool: True if the file or directory exists on the storage backend.
Virtual nodes always return False.
Examples:
>>> if node.exists:
... print("File exists!")
... else:
... print("File not found")
"""
# Virtual nodes don't have physical storage
if self._is_virtual:
return False
return self._backend.exists(self._path)
@property
def isfile(self) -> bool:
"""True if node points to a file.
Returns:
bool: True if this node is a file, False if directory or doesn't exist
Examples:
>>> if node.isfile:
... data = node._read_bytes()
"""
return self._backend.is_file(self._path)
@property
def isdir(self) -> bool:
"""True if node points to a directory.
Returns:
bool: True if this node is a directory, False if file or doesn't exist
Examples:
>>> if node.isdir:
... for child in node.children():
... print(child.basename)
"""
return self._backend.is_dir(self._path)
@property
def size(self) -> int:
"""File size in bytes.
Returns:
int: Size of the file in bytes
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If node is a directory (directories don't have size)
Examples:
>>> print(f"File size: {node.size} bytes")
>>> print(f"File size: {node.size / 1024:.1f} KB")
"""
return self._backend.size(self._path)
@property
def mtime(self) -> float:
"""Last modification time as Unix timestamp.
Returns:
float: Unix timestamp of last modification time
Examples:
>>> from datetime import datetime
>>> mod_time = datetime.fromtimestamp(node.mtime)
>>> print(f"Modified: {mod_time}")
"""
return self._backend.mtime(self._path)
@property
def basename(self) -> str:
"""Filename with extension.
Returns:
str: The filename including extension
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.basename)
'report.pdf'
"""
return self._posix_path.name
@property
def stem(self) -> str:
"""Filename without extension.
Returns:
str: The filename without extension
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.stem)
'report'
"""
return self._posix_path.stem
@property
def suffix(self) -> str:
"""File extension including dot.
Returns:
str: The file extension including the leading dot (e.g., ".pdf")
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.suffix)
'.pdf'
"""
return self._posix_path.suffix
@property
def parent(self) -> StorageNode:
"""Parent directory as StorageNode.
Returns:
StorageNode: A new StorageNode pointing to the parent directory
Examples:
>>> node = storage.node('home:documents/reports/q4.pdf')
>>> parent = node.parent
>>> print(parent.fullpath)
'home:documents/reports'
"""
parent_path = str(self._posix_path.parent)
if parent_path == '.':
parent_path = ''
return self._create_node(self._manager, self._mount_name, parent_path)
@property
def dirname(self) -> str:
"""Parent directory fullpath as string.
Convenience property that returns the fullpath of the parent directory
as a string, equivalent to ``parent.fullpath``.
Returns:
str: Parent directory fullpath (e.g., 'home:documents/reports')
Examples:
>>> node = storage.node('home:documents/reports/q4.pdf')
>>> print(node.dirname)
'home:documents/reports'
>>>
>>> # Compare with parent property
>>> print(node.parent.fullpath)
'home:documents/reports'
>>> # dirname is a shortcut for the above
"""
return self.parent.fullpath
@property
def ext(self) -> str:
"""File extension without leading dot.
Convenience property for getting the file extension without the dot prefix,
which is more convenient for comparisons and type checking than ``suffix``.
Returns:
str: Extension without dot (e.g., 'pdf', 'txt'), or empty string if no extension
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> print(node.ext)
'pdf'
>>> print(node.suffix) # Compare with suffix
'.pdf'
>>>
>>> # More convenient for comparisons
>>> if node.ext == 'pdf':
... process_pdf(node)
>>>
>>> # Instead of remembering the dot
>>> if node.suffix == '.pdf':
... process_pdf(node)
"""
return self.suffix.lstrip('.') if self.suffix else ''
[docs]
def splitext(self) -> tuple[str, str]:
"""Split path into filename and extension.
Similar to ``os.path.splitext()``, returns a tuple of (filename, extension).
The extension includes the leading dot. The filename includes the full path
without the extension.
Returns:
tuple[str, str]: (filename, extension) where extension includes the dot
Examples:
>>> node = storage.node('home:documents/report.pdf')
>>> name, ext = node.splitext()
>>> print(name)
'documents/report'
>>> print(ext)
'.pdf'
>>>
>>> # Useful for renaming with different extension
>>> name, _ = node.splitext()
>>> new_path = f'{name}.docx'
>>> new_node = storage.node(f'home:{new_path}')
"""
if self.suffix:
name = self._path.rsplit(self.suffix, 1)[0]
return name, self.suffix
return self._path, ''
@property
def ext_attributes(self) -> tuple[float | None, int | None, bool]:
"""Commonly-used file attributes as a tuple.
Convenience property for getting (mtime, size, isdir) together in one call.
Returns None values if file doesn't exist. Size is None for directories.
Returns:
tuple: (mtime, size, isdir) where:
- mtime: Modification time as Unix timestamp or None
- size: File size in bytes or None (None for directories)
- isdir: True if directory, False otherwise
Examples:
>>> node = storage.node('home:document.pdf')
>>> mtime, size, isdir = node.ext_attributes
>>> if mtime and size:
... print(f'File: {size} bytes, modified at {mtime}')
>>>
>>> # More concise than
>>> mtime = node.mtime
>>> size = node.size
>>> isdir = node.isdir
"""
if not self.exists:
return None, None, False
is_directory = self.isdir
file_size = None if is_directory else self.size
return self.mtime, file_size, is_directory
@property
def md5hash(self) -> str:
"""MD5 hash of file content.
For cloud storage (S3, GCS, Azure), retrieves hash from metadata (fast).
For local storage, computes hash by reading file in blocks (slower).
Returns:
str: MD5 hash as lowercase hexadecimal string (32 characters)
Raises:
FileNotFoundError: If file doesn't exist
ValueError: If node is a directory
Examples:
>>> hash1 = node1.md5hash
>>> hash2 = node2.md5hash
>>> if hash1 == hash2:
... print("Files have identical content")
"""
# Check if exists first
if not self.exists:
raise FileNotFoundError(f"File not found: {self.fullpath}")
# Check if it's a file (not a directory)
if not self.isfile:
raise ValueError(f"Cannot compute hash of directory: {self.fullpath}")
# Try to get hash from backend metadata first (S3 ETag, etc.)
metadata_hash = self._backend.get_hash(self._path)
if metadata_hash:
return metadata_hash.lower()
# Fallback: compute MD5 by reading file in blocks
import hashlib
hasher = hashlib.md5()
# Use 64KB blocks like Genropy legacy code
BLOCKSIZE = 65536
with self.open('rb') as f:
while True:
chunk = f.read(BLOCKSIZE)
if not chunk:
break
hasher.update(chunk)
return hasher.hexdigest()
@property
def mimetype(self) -> str:
"""Get MIME type from file extension.
Uses Python's mimetypes module to guess the MIME type based on
the file extension. Returns 'application/octet-stream' if type
cannot be determined.
Returns:
str: MIME type string (e.g., 'image/png', 'application/pdf')
Examples:
>>> jpg = storage.node('photos:image.jpg')
>>> jpg.mimetype
'image/jpeg'
>>>
>>> pdf = storage.node('documents:report.pdf')
>>> pdf.mimetype
'application/pdf'
>>>
>>> # Use for HTTP responses
>>> response.headers['Content-Type'] = node.mimetype
"""
import mimetypes
mime, _ = mimetypes.guess_type(self.path)
return mime or 'application/octet-stream'
@property
def capabilities(self):
"""Get capabilities of underlying backend.
Returns backend capabilities which describe what features are supported,
such as versioning, metadata, presigned URLs, etc.
If this node is a versioned snapshot (created with version parameter),
the versioning capabilities are disabled since the node is read-only.
Returns:
BackendCapabilities: Object describing supported features
Examples:
>>> if node.capabilities.versioning:
... versions = node.versions
>>> if node.capabilities.presigned_urls:
... url = node.get_presigned_url()
"""
caps = self._backend.capabilities
# If node has a fixed version, it's a snapshot and loses versioning capabilities
if self._version is not None:
from dataclasses import replace
caps = replace(caps,
versioning=False,
version_listing=False,
version_access=False)
return caps
# ==================== File I/O Methods ====================
[docs]
def open(self,
mode: str = 'r',
version: int | str | None = None,
as_of: datetime | None = None) -> BinaryIO | TextIO:
"""Open file with optional version control support.
Args:
mode: File mode ('r', 'rb', 'w', 'wb', 'a', 'ab')
version: Version to open:
- None: Latest version (default)
- str: Specific version_id (e.g., 'abc123...')
- int: Version index with negative indexing support:
- -1: Latest version
- -2: Previous version
- 0: Oldest version
- 1: Second oldest version
as_of: Open file as it was at this datetime
Returns:
BinaryIO | TextIO: File-like object (context manager)
Raises:
ValueError: If both version and as_of provided, or invalid mode for historical versions
IndexError: If version index out of range
FileNotFoundError: If no version found for as_of date
PermissionError: If backend doesn't support versioning
Examples:
>>> # Latest version
>>> with node.open() as f:
... data = f.read()
>>> # Previous version (pythonic!)
>>> with node.open(version=-2) as f:
... previous = f.read()
>>> # Specific version by ID
>>> with node.open(version='abc123xyz') as f:
... old_content = f.read()
>>> # Version at date
>>> from datetime import datetime
>>> with node.open(as_of=datetime(2024, 1, 15)) as f:
... historical = f.read()
"""
# If node has a fixed version, cannot specify another version
if self._version is not None and (version is not None or as_of is not None):
raise ValueError(
"This node is a versioned snapshot. "
"Cannot specify version parameter on an already-versioned node."
)
# If node has a fixed version, it's read-only
if self._version is not None and mode in ('w', 'wb', 'a', 'ab'):
raise ValueError(
"Cannot write to versioned snapshot. "
"Create a new node without version parameter to write."
)
# Use node's version if set, otherwise use parameter
effective_version = self._version if self._version is not None else version
# Validazione parametri
if version is not None and as_of is not None:
raise ValueError(
"Cannot specify both version and as_of. "
"Use version for ID/index or as_of for date-based access."
)
# Check versioning capability FIRST
if effective_version is not None or as_of is not None:
# Check backend capabilities (not node capabilities, since we might be using self._version)
if not self._backend.capabilities.versioning:
raise PermissionError(
f"{self._mount_name} backend does not support versioning. "
f"Supported features: {self._list_supported_features()}"
)
# Accesso per data
if as_of is not None:
if 'w' in mode or 'a' in mode or '+' in mode:
raise ValueError("Cannot write to historical versions (read-only)")
version_id = self._resolve_version_at_date(as_of)
if not version_id:
raise FileNotFoundError(
f"No version found before {as_of} for {self.fullpath}"
)
return self._backend.open_version(self._path, version_id, mode)
# Accesso per version
if effective_version is not None:
if 'w' in mode or 'a' in mode or '+' in mode:
raise ValueError("Cannot write to historical versions (read-only)")
# Se è un intero, risolvi l'indice
if isinstance(effective_version, int):
version_id = self._resolve_version_index(effective_version)
else:
# È già un version_id stringa
version_id = effective_version
return self._backend.open_version(self._path, version_id, mode)
# Accesso normale (latest)
return self._backend.open(self._path, mode)
def _read_bytes(self) -> bytes:
"""Internal method: Read entire file as bytes.
If node has a fixed version, reads that version.
For virtual nodes, materializes content.
"""
# Virtual node: materialize content
if self._is_virtual:
if self._virtual_type == 'iter':
# Concatenate all sources as bytes
return b''.join(node._read_bytes() for node in self._sources)
elif self._virtual_type == 'diff':
# Diff as bytes (encode UTF-8)
return self._read_text().encode('utf-8')
else:
raise ValueError(f"Unknown virtual type: {self._virtual_type}")
# Versioned node
if self._version is not None:
with self.open(mode='rb') as f:
return f.read()
# Normal node
return self._backend.read_bytes(self._path)
def _read_text(self, encoding: str = 'utf-8') -> str:
"""Internal method: Read entire file as string.
If node has a fixed version, reads that version.
For virtual nodes, materializes content.
"""
# Virtual node: materialize content
if self._is_virtual:
if self._virtual_type == 'iter':
# Concatenate all sources as text
return ''.join(node._read_text(encoding) for node in self._sources)
elif self._virtual_type == 'diff':
# Generate unified diff
if len(self._sources) != 2:
raise ValueError("diffnode requires exactly 2 nodes")
node1, node2 = self._sources
# Check if binary by reading bytes first
bytes1 = node1._read_bytes()
bytes2 = node2._read_bytes()
# Check for null bytes (binary indicator)
if b'\x00' in bytes1 or b'\x00' in bytes2:
raise ValueError("Cannot diff binary files")
# Try to decode
try:
text1 = bytes1.decode(encoding)
text2 = bytes2.decode(encoding)
except UnicodeDecodeError:
raise ValueError("Cannot diff binary files")
# Generate unified diff
import difflib
lines1 = text1.splitlines(keepends=True)
lines2 = text2.splitlines(keepends=True)
diff_lines = difflib.unified_diff(
lines1, lines2,
fromfile=node1.fullpath or 'file1',
tofile=node2.fullpath or 'file2'
)
return ''.join(diff_lines)
else:
raise ValueError(f"Unknown virtual type: {self._virtual_type}")
# Versioned node
if self._version is not None:
with self.open(mode='r') as f:
content = f.read()
# If we got bytes, decode them
if isinstance(content, bytes):
return content.decode(encoding)
return content
# Normal node
return self._backend.read_text(self._path, encoding)
[docs]
@apiready
def read(
    self,
    mode: Annotated[str, "Read mode: 'r' for text, 'rb' for binary"] = 'r',
    encoding: Annotated[str, "Text encoding (only for text mode)"] = 'utf-8'
) -> Annotated[str | bytes, "File content as text or bytes"]:
    """Return the file content as text or as bytes.

    Args:
        mode: 'r' returns text (default), 'rb' returns bytes.
        encoding: Text encoding; only used in text mode.

    Returns:
        str | bytes: Text for mode 'r', bytes for mode 'rb'.

    Raises:
        ValueError: If ``mode`` is neither 'r' nor 'rb'.

    Examples:
        >>> content = node.read()
        >>> data = node.read(mode='rb')
    """
    if mode == 'rb':
        return self._read_bytes()
    if mode != 'r':
        raise ValueError(f"Invalid read mode '{mode}'. Use 'r' for text or 'rb' for binary")
    return self._read_text(encoding)
def _write_bytes(self, data: bytes, skip_if_unchanged: bool = False) -> bool:
"""Internal method: Write bytes to file.
Args:
data: Bytes to write
skip_if_unchanged: If True, skip writing if content identical to current version.
Uses MD5 hash comparison with existing file's ETag (S3) or computed hash.
Returns:
bool: True if written, False if skipped (only when skip_if_unchanged=True)
Raises:
ValueError: If node is a versioned snapshot (read-only)
Note:
For base64 backend, this updates the node's path to the new base64-encoded content.
Examples:
>>> # Simple write
>>> node.write_bytes(b'Hello World')
True
>>> # Skip if unchanged (efficient for versioned storage)
>>> written = node.write_bytes(data, skip_if_unchanged=True)
>>> if written:
... print("File updated")
... else:
... print("Content unchanged, skipped")
"""
# Cannot write to virtual nodes
if self._is_virtual:
raise ValueError(
"Cannot write to virtual node (no path). "
"Virtual nodes are read-only."
)
# Cannot write to versioned snapshots
if self._version is not None:
raise ValueError(
"Cannot write to versioned snapshot. "
"Create a new node without version parameter to write."
)
# Check if we should skip
if skip_if_unchanged:
import hashlib
# Try to compare with existing content
if self.capabilities.versioning and self.exists:
# Calculate MD5 of new content
new_md5 = hashlib.md5(data).hexdigest()
# Get latest version ETag
versions = self.versions
if versions:
# Find latest version
latest = next((v for v in versions if v.get('is_latest')), versions[0])
current_etag = latest.get('etag', '')
# Compare (S3 ETag is MD5 for simple uploads)
if current_etag and new_md5 == current_etag:
return False # Skip: content identical
elif self.exists:
# Non-versioned backend: compare with current content
try:
current_data = self._read_bytes()
if current_data == data:
return False # Skip: content identical
except Exception:
pass # If we can't read, write anyway
# Write the data
result = self._backend.write_bytes(self._path, data)
# If backend returns a new path (e.g., base64), update it
if result is not None:
self._path = result
self._posix_path = PurePosixPath(result) if result else PurePosixPath('.')
return True
def _write_text(self, text: str, encoding: str = 'utf-8', skip_if_unchanged: bool = False) -> bool:
"""Internal method: Write string to file.
Args:
text: String to write
encoding: Text encoding (default: 'utf-8')
skip_if_unchanged: If True, skip writing if content identical to current version.
Uses MD5 hash comparison with existing file's ETag (S3) or computed hash.
Returns:
bool: True if written, False if skipped (only when skip_if_unchanged=True)
Note:
For base64 backend, this updates the node's path to the new base64-encoded content.
Examples:
>>> # Simple write
>>> node.write_text('Hello World')
True
>>> # Skip if unchanged
>>> written = node.write_text(content, skip_if_unchanged=True)
>>> if not written:
... print("Content unchanged, skipped")
"""
return self._write_bytes(text.encode(encoding), skip_if_unchanged=skip_if_unchanged)
[docs]
@apiready
def write(
    self,
    data: Annotated[str | bytes, "Data to write (str for text, bytes for binary)"],
    mode: Annotated[str, "Write mode: 'w' for text, 'wb' for binary"] = 'w',
    encoding: Annotated[str, "Text encoding (only for text mode)"] = 'utf-8',
    skip_if_unchanged: Annotated[bool, "Skip writing if content is identical"] = False
) -> Annotated[bool, "True if written, False if skipped"]:
    """Write text or binary data to the file.

    Args:
        data: ``str`` for mode 'w', ``bytes`` for mode 'wb'.
        mode: 'w' for text (default), 'wb' for binary.
        encoding: Text encoding; only used in text mode.
        skip_if_unchanged: If True, skip writing when content is identical.

    Returns:
        bool: True if written, False if skipped.

    Raises:
        ValueError: If ``mode`` is neither 'w' nor 'wb'.
        TypeError: If the type of ``data`` does not match ``mode``.

    Examples:
        >>> node.write('Hello World')
        >>> node.write(b'binary data', mode='wb')
        >>> written = node.write('content', skip_if_unchanged=True)
    """
    if mode not in ('w', 'wb'):
        raise ValueError(f"Invalid write mode '{mode}'. Use 'w' for text or 'wb' for binary")

    if mode == 'w':
        if not isinstance(data, str):
            raise TypeError(f"Text mode 'w' requires str, got {type(data).__name__}")
        return self._write_text(data, encoding, skip_if_unchanged)

    if not isinstance(data, bytes):
        raise TypeError(f"Binary mode 'wb' requires bytes, got {type(data).__name__}")
    return self._write_bytes(data, skip_if_unchanged)
# ==================== Convenience Methods (Pythonic API) ====================
[docs]
@apiready
def read_text(self, encoding: str = 'utf-8') -> str:
    """Return the file content as text (``pathlib.Path``-style helper).

    Same as ``read(mode='r', encoding=encoding)``.

    Args:
        encoding: Text encoding (default: 'utf-8').

    Returns:
        str: File content as text.

    Examples:
        >>> content = node.read_text()
        >>> content = node.read_text(encoding='latin-1')
    """
    content = self.read(mode='r', encoding=encoding)
    return content
[docs]
@apiready
def read_bytes(self) -> bytes:
    """Return the file content as bytes (``pathlib.Path``-style helper).

    Same as ``read(mode='rb')``.

    Returns:
        bytes: File content as bytes.

    Examples:
        >>> data = node.read_bytes()
    """
    content = self.read(mode='rb')
    return content
[docs]
@apiready
def write_text(self, text: str, encoding: str = 'utf-8',
               skip_if_unchanged: bool = False) -> bool:
    """Write text to the file (``pathlib.Path``-style helper).

    Same as ``write(text, mode='w', encoding=encoding,
    skip_if_unchanged=skip_if_unchanged)``.

    Args:
        text: Text content to write.
        encoding: Text encoding (default: 'utf-8').
        skip_if_unchanged: Skip the write if content is identical
            (default: False).

    Returns:
        bool: True if the file was written, False if skipped.

    Raises:
        TypeError: If ``text`` is not a ``str``.

    Examples:
        >>> node.write_text("Hello World")
        >>> written = node.write_text("New", skip_if_unchanged=True)
    """
    written = self.write(text, mode='w', encoding=encoding, skip_if_unchanged=skip_if_unchanged)
    return written
[docs]
@apiready
def write_bytes(self, data: bytes, skip_if_unchanged: bool = False) -> bool:
    """Write binary content to the file (``pathlib.Path``-style helper).

    Same as ``write(data, mode='wb', skip_if_unchanged=skip_if_unchanged)``.

    Args:
        data: Binary content to write.
        skip_if_unchanged: Skip the write if content is identical
            (default: False).

    Returns:
        bool: True if the file was written, False if skipped.

    Raises:
        TypeError: If ``data`` is not ``bytes``.

    Examples:
        >>> node.write_bytes(b"Binary data")
        >>> written = node.write_bytes(data, skip_if_unchanged=True)
    """
    written = self.write(data, mode='wb', skip_if_unchanged=skip_if_unchanged)
    return written
# ==================== File Operations ====================
[docs]
@apiready
def delete(self) -> None:
    """Delete the file or directory this node points to.

    Delegates to the backend's ``delete`` with ``recursive=True``.
    """
    target = self._path
    self._backend.delete(target, recursive=True)
def _should_skip_file(self, dest: StorageNode,
skip: SkipStrategy | str,
skip_fn: Callable[[StorageNode, StorageNode], bool] | None) -> tuple[bool, str]:
"""Determine if file should be skipped during copy.
Args:
dest: Destination node
skip: Skip strategy to use
skip_fn: Custom skip function (required if skip='custom')
Returns:
Tuple of (should_skip: bool, reason: str)
"""
# Never skip if destination doesn't exist
if not dest.exists:
return (False, '')
# Check skip strategy
if skip == 'never' or skip == SkipStrategy.NEVER:
return (False, '')
elif skip == 'exists' or skip == SkipStrategy.EXISTS:
return (True, 'destination exists')
elif skip == 'size' or skip == SkipStrategy.SIZE:
try:
if self.size == dest.size:
return (True, f'same size ({self.size} bytes)')
else:
return (False, '')
except Exception:
# If size comparison fails, don't skip
return (False, '')
elif skip == 'hash' or skip == SkipStrategy.HASH:
try:
# Use MD5 hash comparison (with cloud metadata optimization)
if self.md5hash == dest.md5hash:
return (True, f'same content (MD5: {self.md5hash[:8]}...)')
else:
return (False, '')
except Exception:
# If hash comparison fails, don't skip
return (False, '')
elif skip == 'custom' or skip == SkipStrategy.CUSTOM:
try:
if skip_fn and skip_fn(self, dest):
return (True, 'custom function returned True')
else:
return (False, '')
except Exception as e:
# If custom function fails, don't skip
return (False, '')
return (False, '')
def _copy_file_with_skip(self, dest: StorageNode,
skip: SkipStrategy | str,
skip_fn: Callable[[StorageNode, StorageNode], bool] | None,
on_file: Callable[[StorageNode], None] | None,
on_skip: Callable[[StorageNode, str], None] | None) -> StorageNode:
"""Copy single file with skip logic.
Args:
dest: Destination node
skip: Skip strategy
skip_fn: Custom skip function
on_file: Callback after file copied
on_skip: Callback when file skipped
Returns:
Destination node
"""
# Check if we should skip
should_skip, reason = self._should_skip_file(dest, skip, skip_fn)
if should_skip:
if on_skip:
on_skip(self, reason)
return dest
# Perform actual copy
new_path = self._backend.copy(self._path, dest._backend, dest._path)
# Update destination path if backend returned new path
if new_path is not None:
dest._path = new_path
dest._posix_path = PurePosixPath(new_path) if new_path else PurePosixPath('.')
# Call on_file callback
if on_file:
on_file(self)
return dest
def _copy_dir_with_skip(self, dest: StorageNode,
                        skip: SkipStrategy | str,
                        skip_fn: Callable[[StorageNode, StorageNode], bool] | None,
                        progress: Callable[[int, int], None] | None,
                        on_file: Callable[[StorageNode], None] | None,
                        on_skip: Callable[[StorageNode, str], None] | None,
                        include_patterns: list[str] | None = None,
                        exclude_patterns: list[str] | None = None,
                        filter_fn: Callable[[StorageNode, str], bool] | None = None) -> StorageNode:
    """Copy a directory tree with source filtering, skip logic and callbacks.

    Works in two phases: first the source tree is walked and every file that
    passes the include/exclude/filter checks is collected (destination
    directories are created during the walk); then the collected files are
    copied one by one, consulting ``_should_skip_file`` for each.

    Args:
        dest: Destination node (created as a directory if it does not exist)
        skip: Skip strategy, forwarded to ``_should_skip_file``
        skip_fn: Custom skip function, forwarded to ``_should_skip_file``
        progress: Callback(current, total) invoked after every collected
            file, whether it was copied or skipped
        on_file: Callback(src_node) invoked after each file actually copied
        on_skip: Callback(src_node, reason) invoked for files rejected by the
            filters and for files skipped by the skip strategy
        include_patterns: Glob patterns; when given, a file's relative path
            must match at least one of them
        exclude_patterns: Glob patterns; a file whose relative path matches
            any of them is rejected
        filter_fn: Custom filter function(node, relpath) -> bool. A falsy
            result or a raised exception rejects the file.

    Returns:
        Destination node
    """
    # Imported once per call rather than once per inspected file.
    from fnmatch import fnmatch

    # Create destination directory if needed
    if not dest.exists:
        dest.mkdir(parents=True, exist_ok=True)

    # (src_node, dest_node, relpath) triples accepted by the filters
    files_to_process = []

    def matches_filters(node: StorageNode, relpath: str) -> tuple[bool, str]:
        """Return (should_include, reason_if_excluded) for one source file."""
        # Whitelist: with include patterns, at least one must match
        if include_patterns and not any(
                fnmatch(relpath, pattern) for pattern in include_patterns):
            return False, "not matching include patterns"

        # Blacklist: the first matching exclude pattern rejects the file
        for pattern in exclude_patterns or ():
            if fnmatch(relpath, pattern):
                return False, f"matching exclude pattern '{pattern}'"

        # Custom filter: best effort, a failing filter rejects the file
        if filter_fn:
            try:
                if not filter_fn(node, relpath):
                    return False, "filtered by custom function"
            except Exception as e:
                return False, f"filter error: {e}"

        return True, ""

    def collect_files(src_node: StorageNode, dest_node: StorageNode, relpath: str = ""):
        """Recursively collect all files that match the filters."""
        if src_node.isfile:
            should_include, reason = matches_filters(src_node, relpath)
            if should_include:
                files_to_process.append((src_node, dest_node, relpath))
            elif on_skip:
                # Notify about filtered files
                on_skip(src_node, reason)
        elif src_node.isdir:
            # Ensure destination dir exists before descending
            if not dest_node.exists:
                dest_node.mkdir(parents=True, exist_ok=True)
            for child in src_node.children():
                child_relpath = f"{relpath}/{child.basename}" if relpath else child.basename
                collect_files(child, dest_node.child(child.basename), child_relpath)

    collect_files(self, dest)

    # Process files with progress tracking
    total = len(files_to_process)
    for idx, (src, dst, relpath) in enumerate(files_to_process, 1):
        # Skip logic is destination-based
        should_skip, reason = src._should_skip_file(dst, skip, skip_fn)
        if should_skip:
            if on_skip:
                on_skip(src, reason)
        else:
            new_path = src._backend.copy(src._path, dst._backend, dst._path)
            # The backend may hand back a different destination path
            if new_path is not None:
                dst._path = new_path
                dst._posix_path = PurePosixPath(new_path) if new_path else PurePosixPath('.')
            if on_file:
                on_file(src)

        if progress:
            progress(idx, total)

    return dest
[docs]
def copy_to(self, dest: StorageNode | str,
            # Filtering (source-based)
            include: str | list[str] | None = None,
            exclude: str | list[str] | None = None,
            filter: Callable[[StorageNode, str], bool] | None = None,
            # Skip logic (destination-based)
            skip: SkipStrategy | Literal['never', 'exists', 'size', 'hash', 'custom'] = 'never',
            skip_fn: Callable[[StorageNode, StorageNode], bool] | None = None,
            # Callbacks
            progress: Callable[[int, int], None] | None = None,
            on_file: Callable[[StorageNode], None] | None = None,
            on_skip: Callable[[StorageNode, str], None] | None = None) -> StorageNode:
    """Copy file or directory to destination with filtering and skip logic.

    Supports filtering which files to copy (source-based) and skipping
    existing files (destination-based) for efficient incremental backups.

    Filtering (applied to source files):
        - 'include': Glob patterns for files to include (whitelist)
        - 'exclude': Glob patterns for files to exclude (blacklist)
        - 'filter': Custom function(node, relpath) -> bool

    Skip strategies (applied to destination files):
        - 'never': Always copy (overwrite existing files) - default
        - 'exists': Skip if destination file exists
        - 'size': Skip if destination exists and has same size
        - 'hash': Skip if destination exists and has same content/MD5
        - 'custom': Use custom skip function

    When no filter, skip strategy or callback is requested the copy is
    delegated directly to the backend. Otherwise files go through
    ``_copy_file_with_skip`` and directories through ``_copy_dir_with_skip``.
    A source that exists but reports neither ``isfile`` nor ``isdir`` is
    copied with the plain backend copy instead of being silently ignored.

    Args:
        dest: Destination node or path string
        include: Glob pattern(s) for files to include. If specified, only
            matching files are copied (whitelist mode). String or list.
        exclude: Glob pattern(s) for files to exclude. Applied after include.
            String or list.
        filter: Custom filter function(node, relative_path) -> bool.
            Return True to include file, False to exclude.
            Applied after include/exclude patterns.
        skip: Skip strategy (default: 'never' = always copy)
        skip_fn: Custom skip function(src, dest) -> bool (required if
            skip='custom')
        progress: Callback(current, total); forwarded to the directory copy
        on_file: Callback(src_node) called after each file copied
        on_skip: Callback(src_node, reason) called when file is skipped

    Returns:
        Destination StorageNode

    Raises:
        FileNotFoundError: If source doesn't exist
        ValueError: If skip='custom' but no skip_fn provided

    Examples:
        >>> # Simple copy (overwrite) - default behavior
        >>> src.copy_to(dest)
        >>>
        >>> # Copy only Python files
        >>> src.copy_to(dest, include='*.py')
        >>>
        >>> # Copy all except logs and temp files
        >>> src.copy_to(dest, exclude=['*.log', '*.tmp', '__pycache__/**'])
        >>>
        >>> # Combine include and exclude
        >>> src.copy_to(dest, include='*.py', exclude='test_*.py')
        >>>
        >>> # Custom filter: only files smaller than 10MB
        >>> src.copy_to(dest, filter=lambda node, path: node.size < 10_000_000)
        >>>
        >>> # Filter by modification time
        >>> from datetime import datetime, timedelta
        >>> cutoff = datetime.now() - timedelta(days=7)
        >>> src.copy_to(dest, filter=lambda n, p: n.mtime > cutoff.timestamp())
        >>>
        >>> # Combine filtering and skip strategy
        >>> src.copy_to(dest,
        ...             include=['*.py', '*.json'],
        ...             exclude='__pycache__/**',
        ...             skip='hash')  # Skip if content identical
        >>>
        >>> # Full-featured backup with tracking
        >>> src.copy_to(dest,
        ...             exclude=['*.log', '*.tmp', 'node_modules/**'],
        ...             filter=lambda n, p: n.size < 100_000_000,
        ...             skip='hash',
        ...             progress=lambda c, t: print(f"{c}/{t}"))

    Performance Notes (indicative figures):
        - Filtering is applied before copying (saves bandwidth)
        - skip='exists': ~1-2ms per file (only existence check)
        - skip='size': ~2-5ms per file (existence + size read)
        - skip='hash':
            * S3/GCS: ~5-10ms per file (ETag from metadata)
            * Local: ~100ms per MB (must read file to compute MD5)

    Note:
        - Include/exclude patterns match against relative paths from source
        - For a single-file source the include/exclude/filter arguments are
          not applied (there is no relative path to match against)
        - If the backend returns a new path (e.g. base64 backend), the
          destination node is updated to point at it
        - Filtering is source-based (which files to copy)
        - Skip logic is destination-based (whether to overwrite)
    """
    def _as_pattern_list(patterns):
        """Normalize None / a single pattern / an iterable to a list."""
        if patterns is None:
            return []
        return [patterns] if isinstance(patterns, str) else list(patterns)

    # Convert string to StorageNode if needed
    if isinstance(dest, str):
        dest = self._manager.node(dest)

    # Virtual node: copy materialized content
    if self._is_virtual:
        dest._write_bytes(self._read_bytes())
        return dest

    if not self.exists:
        raise FileNotFoundError(f"Source not found: {self.fullpath}")

    # Validate skip strategy
    if skip == 'custom' and skip_fn is None:
        raise ValueError("skip='custom' requires skip_fn parameter")

    include_patterns = _as_pattern_list(include)
    exclude_patterns = _as_pattern_list(exclude)

    # Enhanced copy is needed as soon as any filter, skip strategy or
    # callback is requested.
    has_filters = bool(include_patterns or exclude_patterns or filter)
    needs_enhanced = skip != 'never' or progress or on_file or on_skip or has_filters

    if needs_enhanced:
        if self.isfile:
            # For single files, filters don't apply (no relative path context)
            return self._copy_file_with_skip(dest, skip, skip_fn, on_file, on_skip)
        if self.isdir:
            return self._copy_dir_with_skip(
                dest, skip, skip_fn, progress, on_file, on_skip,
                include_patterns, exclude_patterns, filter
            )
        # Neither file nor directory: fall through to the plain backend copy
        # so the call never returns None.

    # Simple copy via backends (backward compatible)
    new_path = self._backend.copy(self._path, dest._backend, dest._path)
    # If destination backend returned a new path, update dest
    if new_path is not None:
        dest._path = new_path
        dest._posix_path = PurePosixPath(new_path) if new_path else PurePosixPath('.')
    return dest
[docs]
def move_to(self, dest: StorageNode | str) -> StorageNode:
    """Move file/directory to destination.

    Implemented as ``copy_to(dest)`` followed by ``delete()``; afterwards
    this node is re-pointed at the destination's mount, path and backend.

    Moving a node onto its own location is a no-op: copying and then
    deleting would otherwise remove the only copy of the data.

    Args:
        dest: Destination node or path string

    Returns:
        StorageNode: This node, now referring to the destination location
    """
    # Convert string to StorageNode if needed
    if isinstance(dest, str):
        dest = self._manager.node(dest)

    # Guard against destroying the data when source and destination are the
    # same location (same object, or same mount and path).
    same_location = dest is self or (
        not self._is_virtual
        and self._mount_name == dest._mount_name
        and self._path == dest._path
    )
    if same_location:
        return self

    # Copy then delete
    self.copy_to(dest)
    self.delete()

    # Update self to point to new location (dest._path may have been
    # rewritten by the copy)
    self._mount_name = dest._mount_name
    self._path = dest._path
    self._posix_path = dest._posix_path
    self._backend = dest._backend
    return self
# ==================== Virtual Node Methods ====================
[docs]
def append(self, node: StorageNode) -> None:
    """Add one node to this iternode's list of sources.

    Only valid on virtual nodes whose virtual type is ``'iter'`` (created
    with ``storage.iternode()``). The node reference is stored in
    ``_sources``; nothing is read at this point.

    Args:
        node: StorageNode to append

    Raises:
        ValueError: If this node is not a virtual iternode

    Examples:
        >>> iternode = storage.iternode()
        >>> n1 = storage.node('mem:part1.txt')
        >>> iternode.append(n1)
        >>> content = iternode.read_text()  # Materializes here
    """
    is_iternode = self._is_virtual and self._virtual_type == 'iter'
    if not is_iternode:
        raise ValueError("append() is only available for iternode virtual nodes")
    self._sources.append(node)
[docs]
def extend(self, *nodes: StorageNode) -> None:
    """Add several nodes to this iternode's list of sources.

    Only valid on virtual nodes whose virtual type is ``'iter'`` (created
    with ``storage.iternode()``). The node references are stored in
    ``_sources`` in the order given; nothing is read at this point.

    Args:
        *nodes: StorageNodes to append

    Raises:
        ValueError: If this node is not a virtual iternode

    Examples:
        >>> iternode = storage.iternode(n1)
        >>> iternode.extend(n2, n3, n4)
        >>> content = iternode.read_text()  # Materializes all
    """
    if self._is_virtual and self._virtual_type == 'iter':
        self._sources.extend(nodes)
        return
    raise ValueError("extend() is only available for iternode virtual nodes")
[docs]
def zip(self) -> bytes:
    """Create ZIP archive from node content.

    Behavior depends on node type:
        - Virtual iternode: every accumulated source node becomes a separate
          member, named after its basename (``'file'`` when the basename is
          empty). If several sources share a basename, later ones get a
          numeric suffix (``report.txt``, ``report_1.txt``, ...) so that no
          member shadows another on extraction.
        - Regular file: the archive contains that single file
        - Regular directory: all files are added recursively via
          ``_zip_directory``

    Returns:
        bytes: ZIP archive (deflate-compressed) as bytes

    Raises:
        ValueError: If the node is not an iternode, a file or a directory
            (e.g. it does not exist)

    Examples:
        >>> # ZIP a directory
        >>> docs = storage.node('home:documents')
        >>> zip_bytes = docs.zip()
        >>>
        >>> # ZIP accumulated files
        >>> iternode = storage.iternode(n1, n2, n3)
        >>> zip_bytes = iternode.zip()
        >>>
        >>> # Save ZIP
        >>> archive = storage.node('backup.zip')
        >>> archive.write_bytes(zip_bytes)
    """
    import io
    import zipfile
    from os.path import splitext

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        if self._is_virtual and self._virtual_type == 'iter':
            # Virtual iternode: add each source node as separate file
            used_names = set()
            for node in self._sources:
                base_name = node.basename if node.basename else 'file'
                arcname = base_name
                attempt = 0
                # Duplicate member names would overwrite each other when the
                # archive is extracted, so make each one unique.
                while arcname in used_names:
                    attempt += 1
                    stem, ext = splitext(base_name)
                    arcname = f"{stem}_{attempt}{ext}"
                used_names.add(arcname)
                zf.writestr(arcname, node._read_bytes())
        elif self.isfile:
            # Single file: add to ZIP
            zf.writestr(self.basename, self._read_bytes())
        elif self.isdir:
            # Directory: recursively add all files
            self._zip_directory(zf, self, '')
        else:
            raise ValueError("Cannot create ZIP: node doesn't exist or is invalid type")
    return buffer.getvalue()
def _zip_directory(self, zf: 'zipfile.ZipFile', dir_node: StorageNode, arc_prefix: str) -> None:
    """Write every file below ``dir_node`` into an open ZIP archive.

    Children are visited in the order returned by ``dir_node.children()``.
    Files are written under ``arc_prefix/basename`` (or just ``basename``
    when the prefix is empty); subdirectories are descended into with the
    extended prefix. Directories themselves get no archive entry.

    Args:
        zf: ZipFile object to write to
        dir_node: Directory node to process
        arc_prefix: Archive path prefix for this directory
    """
    for entry in dir_node.children():
        entry_name = entry.basename
        member_path = entry_name if not arc_prefix else f"{arc_prefix}/{entry_name}"

        if entry.isfile:
            zf.writestr(member_path, entry._read_bytes())
            continue

        if entry.isdir:
            # Descend, using this entry's archive path as the new prefix
            self._zip_directory(zf, entry, member_path)
# ==================== Directory Operations ====================
def _create_node(self, manager: StorageManager, mount_name: str, path: str) -> "StorageNode":
    """Build a new node of the same class as ``self``.

    Internal factory used wherever a node creates another node, so that
    subclasses keep their own type. The default implementation calls the
    class of ``self`` with ``(manager, mount_name, path)``; subclasses with
    a different constructor signature can override it.

    Args:
        manager: StorageManager instance
        mount_name: Mount point name
        path: Path within the mount

    Returns:
        StorageNode: New node instance of the same class as self

    Examples:
        >>> # In a subclass with custom constructor
        >>> class CustomNode(StorageNode):
        ...     def __init__(self, manager, mount, path, extra_param):
        ...         super().__init__(manager, mount, path)
        ...         self.extra = extra_param
        ...
        ...     def _create_node(self, manager, mount, path):
        ...         # Preserve extra_param when creating children
        ...         return self.__class__(manager, mount, path, self.extra)

    Note:
        This is a protected method (leading underscore), not part of the
        public API; it is meant to be overridden only by subclasses that
        need custom node creation logic.
    """
    node_class = self.__class__
    return node_class(manager, mount_name, path)
[docs]
@apiready
def children(self) -> Annotated[list["StorageNode"], "List of child nodes in this directory"]:
    """List child nodes (if directory).

    Asks the backend for the entry names under this node's path and turns
    each name into a node via ``child()``, preserving the backend's order.
    """
    nodes = []
    for entry_name in self._backend.list_dir(self._path):
        nodes.append(self.child(entry_name))
    return nodes
[docs]
@apiready
def child(
    self,
    *parts: Annotated[str, "Path components to append"]
) -> Annotated["StorageNode", "Child node at the specified path"]:
    """Get a child node by path components.

    The parts are joined with ``/`` and appended to this node's path; the
    new node is created through ``_create_node`` on the same manager and
    mount. Leading slashes on the joined path are stripped: an absolute
    segment would otherwise make ``PurePosixPath`` discard this node's path
    and the result would no longer be a child of this node.

    Args:
        *parts: Path components to append. Can be:
            - Single string with path separators: 'aaa/bbb/ccc'
            - Multiple strings: 'aaa', 'bbb', 'ccc'

    Returns:
        StorageNode: Child node with combined path

    Examples:
        >>> docs = storage.node('home:documents')
        >>>
        >>> # Single path string
        >>> report = docs.child('2024/reports/q4.pdf')
        >>>
        >>> # Multiple components
        >>> report = docs.child('2024', 'reports', 'q4.pdf')
        >>>
        >>> # Both produce: 'home:documents/2024/reports/q4.pdf'
    """
    # Join all parts into a single, relative path
    child_path = '/'.join(parts).lstrip('/')
    # Combine with current path
    full_child_path = str(self._posix_path / child_path)
    return self._create_node(self._manager, self._mount_name, full_child_path)
[docs]
@apiready
def mkdir(
    self,
    parents: Annotated[bool, "Create parent directories if needed"] = False,
    exist_ok: Annotated[bool, "Don't raise error if directory exists"] = False
) -> None:
    """Create directory.

    Delegates to the backend's ``mkdir`` for this node's path, passing
    ``parents`` and ``exist_ok`` through unchanged.
    """
    options = {'parents': parents, 'exist_ok': exist_ok}
    self._backend.mkdir(self._path, **options)
# ==================== Advanced Methods ====================
[docs]
def local_path(self, mode: str = 'r'):
    """Get local filesystem path for this file.

    Thin wrapper around the backend's ``local_path``; whatever the backend
    returns is handed back unchanged. As documented for the backends, that
    is a context manager yielding a local filesystem path: the real path
    for local storage, a temporary file for remote storage (downloaded
    and/or uploaded according to ``mode``).

    Useful for external tools that only work with local filesystem paths
    (ffmpeg, ImageMagick, etc.).

    Args:
        mode: Access mode
            - 'r': Read-only (download, no upload)
            - 'w': Write-only (no download, upload on exit)
            - 'rw': Read-write (download and upload)

    Returns:
        Context manager yielding str (local filesystem path)

    Examples:
        >>> # Process video with ffmpeg
        >>> video = storage.node('s3:videos/input.mp4')
        >>> with video.local_path(mode='r') as path:
        ...     subprocess.run(['ffmpeg', '-i', path, 'output.mp4'])
        >>>
        >>> # Modify image in place
        >>> image = storage.node('s3:photos/pic.jpg')
        >>> with image.local_path(mode='rw') as path:
        ...     subprocess.run(['convert', path, '-resize', '800x600', path])
        >>> # Changes automatically uploaded to S3
    """
    backend = self._backend
    return backend.local_path(self._path, mode=mode)
[docs]
def call(self, *args,
         callback: Callable[[], None] | None = None,
         async_mode: bool = False,
         return_output: bool = False,
         **subprocess_kwargs) -> str | None:
    """Execute external command with automatic local_path management.

    Every StorageNode among ``args`` is replaced by a local filesystem path
    obtained from ``arg.local_path(mode='rw')``; all other arguments are
    converted with ``str()``. The local-path contexts stay open while the
    command runs and are released afterwards.

    A node object that appears several times in ``args`` is opened only
    once and the same local path is substituted at every position, so a
    command that reads and writes the same node (``convert img ... img``)
    operates on a single local file.

    Args:
        *args: Command arguments (str or StorageNode)
        callback: Function called with no arguments after the command
            succeeded and the local-path contexts have been released
        async_mode: Run in a background daemon thread (default: False)
        return_output: Return subprocess output as string (default: False)
        **subprocess_kwargs: Additional arguments passed to
            ``subprocess.check_output`` / ``subprocess.check_call``
            (e.g., cwd, env, timeout, shell, etc.)

    Returns:
        str | None: Command output if return_output=True, None otherwise.
            In async mode, returns None immediately.

    Raises:
        subprocess.CalledProcessError: If command exits with non-zero status
        FileNotFoundError: If command executable not found

    Examples:
        >>> # Video conversion (cloud storage)
        >>> input_video = storage.node('s3:videos/input.mp4')
        >>> output_video = storage.node('s3:videos/output.mp4')
        >>> input_video.call('ffmpeg', '-i', input_video, '-vcodec', 'h264', output_video)

        >>> # Image resize in place (same node used as input and output)
        >>> image = storage.node('home:photos/photo.jpg')
        >>> image.call('convert', image, '-resize', '800x600', image)

        >>> # With callback (async)
        >>> def on_complete():
        ...     print("Processing complete!")
        >>> video.call('ffmpeg', '-i', video, 'output.mp4',
        ...            callback=on_complete, async_mode=True)

        >>> # Capture output
        >>> pdf = storage.node('documents:report.pdf')
        >>> info = pdf.call('pdfinfo', pdf, return_output=True)

        >>> # With subprocess options
        >>> script = storage.node('scripts:process.py')
        >>> script.call('python', script, 'arg1', 'arg2',
        ...             cwd='/tmp', timeout=60, env={'DEBUG': '1'})

    Notes:
        - In async mode exceptions are raised inside the background thread
          and the callback is not invoked on failure
        - Use return_output=False for commands with large output
        - For shell commands, use shell=True in subprocess_kwargs
    """
    from contextlib import ExitStack
    import subprocess
    import threading

    def _execute():
        with ExitStack() as stack:
            # id(node) -> local path, so a repeated node is opened only once.
            # Keyed by identity because nodes define a content-based __eq__.
            opened_paths = {}
            cmd_args = []
            for arg in args:
                if isinstance(arg, StorageNode):
                    key = id(arg)
                    if key not in opened_paths:
                        opened_paths[key] = stack.enter_context(
                            arg.local_path(mode='rw'))
                    cmd_args.append(opened_paths[key])
                else:
                    cmd_args.append(str(arg))

            # Execute command
            if return_output:
                result = subprocess.check_output(cmd_args, **subprocess_kwargs)
                output = result.decode('utf-8') if isinstance(result, bytes) else result
            else:
                subprocess.check_call(cmd_args, **subprocess_kwargs)
                output = None

        # Contexts are closed here, so the callback observes the final state.
        if callback:
            callback()
        return output

    if async_mode:
        # Run in background thread
        thread = threading.Thread(target=_execute)
        thread.daemon = True
        thread.start()
        return None

    # Run synchronously
    return _execute()
[docs]
def serve(self,
          environ: dict,
          start_response: callable,
          download: bool = False,
          download_name: str | None = None,
          cache_max_age: int | None = None) -> list[bytes]:
    """Serve file via WSGI interface with caching support.

    Behavior:
        - Missing node: responds ``404 Not Found`` with a plain-text body
        - ETag is ``"{mtime}-{size}"``; if the request's ``If-None-Match``
          header contains it (strong or weak ``W/`` form, possibly within a
          comma-separated list) or is ``*``, responds ``304 Not Modified``
        - Otherwise responds ``200 OK`` with ETag, Content-Type,
          Content-Length and, when requested, Content-Disposition and
          Cache-Control headers, followed by the file content

    The file is read through ``local_path(mode='r')`` in 64KB chunks; all
    chunks are collected into the returned list before returning.

    Args:
        environ: WSGI environment dict (contains HTTP headers, request info)
        start_response: WSGI start_response callable
        download: If True, force download with Content-Disposition: attachment
        download_name: Custom filename for downloads (default: basename of
            file). Giving a name also triggers the attachment header.
        cache_max_age: Cache-Control max-age in seconds (default: no header)

    Returns:
        list[bytes]: Response body as list of byte chunks (WSGI response)

    Examples:
        >>> # Plain WSGI application
        >>> def application(environ, start_response):
        ...     path = environ['PATH_INFO']
        ...     node = storage.node(f'static:{path}')
        ...     return node.serve(environ, start_response, cache_max_age=86400)
        >>>
        >>> # Download endpoint
        >>> node.serve(environ, start_response,
        ...            download=True, download_name='report.pdf')
    """
    if not self.exists:
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return [b'Not Found']

    # Computed once and reused for both the 304 check and the 200 headers
    mtime = self.mtime
    size = self.size
    etag = f"{mtime}-{size}"

    # Check ETag for 304 Not Modified
    if_none_match = environ.get('HTTP_IF_NONE_MATCH')
    if if_none_match:
        client_tags = set()
        for candidate in if_none_match.split(','):
            candidate = candidate.strip()
            # If-None-Match uses weak comparison, so W/"x" matches "x"
            if candidate.startswith('W/'):
                candidate = candidate[2:]
            client_tags.add(candidate.replace('"', ''))
        if '*' in client_tags or etag in client_tags:
            # Client has current version, return 304
            start_response('304 Not Modified', [('ETag', f'"{etag}"')])
            return [b'']

    # Build response headers
    headers = [
        ('ETag', f'"{etag}"'),
        ('Content-Type', self.mimetype),
        ('Content-Length', str(size)),
    ]

    # Content-Disposition (download)
    if download or download_name:
        filename = download_name or self.basename
        # Drop CR/LF and escape backslash/quote so the name cannot break out
        # of the quoted string or inject additional headers.
        filename = filename.replace('\r', '').replace('\n', '')
        filename = filename.replace('\\', '\\\\').replace('"', '\\"')
        headers.append(('Content-Disposition', f'attachment; filename="{filename}"'))

    # Cache-Control
    if cache_max_age is not None:
        headers.append(('Cache-Control', f'max-age={cache_max_age}'))

    start_response('200 OK', headers)

    # Read file content through a local path (the backend provides it)
    chunk_size = 64 * 1024  # 64KB chunks
    with self.local_path(mode='r') as local_path:
        with open(local_path, 'rb') as f:
            chunks = list(iter(lambda: f.read(chunk_size), b''))
    return chunks
[docs]
def url(self, expires_in: int = 3600, **kwargs) -> str | None:
    """Generate public URL for accessing this file.

    Delegates to the backend's ``url`` for this node's path. What comes
    back is backend-specific; as documented for the backends: a
    presigned/signed URL for cloud storage (S3, GCS), the direct URL for
    HTTP storage, and None for local storage.

    Args:
        expires_in: URL expiration time in seconds (default: 3600 = 1 hour)
        **kwargs: Backend-specific options, passed through unchanged

    Returns:
        str | None: Public URL or None if not supported

    Examples:
        >>> # S3 presigned URL
        >>> file = storage.node('s3:documents/report.pdf')
        >>> url = file.url()
        >>> print(url)
        'https://bucket.s3.amazonaws.com/documents/report.pdf?X-Amz-...'
        >>>
        >>> # Custom expiration (24 hours)
        >>> url = file.url(expires_in=86400)
    """
    backend = self._backend
    return backend.url(self._path, expires_in=expires_in, **kwargs)
[docs]
def internal_url(self, nocache: bool = False) -> str | None:
    """Generate internal/relative URL for this file.

    Delegates to the backend's ``internal_url`` for this node's path and
    returns its result unchanged.

    Args:
        nocache: Passed to the backend; documented as "append mtime for
            cache busting" when True

    Returns:
        str | None: Internal URL or None if not supported

    Examples:
        >>> file = storage.node('home:static/app.js')
        >>> url = file.internal_url(nocache=True)
        >>> print(url)
        '/storage/home/static/app.js?mtime=1234567890'
    """
    backend = self._backend
    return backend.internal_url(self._path, nocache=nocache)
@property
def versions(self) -> list[dict]:
    """Get list of available versions for this file.

    Returns whatever the backend's ``get_versions`` reports for this
    node's path. As documented for the backends, versioned storage (S3
    with versioning enabled) yields one info dict per version and
    non-versioned storage yields an empty list.

    Returns:
        list[dict]: List of version info dicts

    Examples:
        >>> file = storage.node('s3:documents/report.pdf')
        >>> for v in file.versions:
        ...     print(f"Version {v['version_id']}: {v['last_modified']}")
    """
    history = self._backend.get_versions(self._path)
    return history
def _resolve_version_index(self, index: int) -> str:
    """Resolve version index to version_id.

    Indexes directly into ``self.versions`` with Python list semantics, so
    negative indices count from the end. The documented convention is:

        -1 = latest (most recent)
        -2 = previous version
         0 = oldest version
         1 = second oldest

    NOTE(review): this relies on ``self.versions`` being ordered oldest
    first; the list is not sorted here - confirm against the backend.

    Args:
        index: Version index

    Returns:
        version_id string

    Raises:
        IndexError: If there are no versions or the index is out of range
    """
    history = self.versions
    if not history:
        raise IndexError(f"No versions available for {self.fullpath}")

    try:
        # List indexing handles negative indices natively
        return history[index]['version_id']
    except IndexError:
        count = len(history)
        raise IndexError(
            f"Version index {index} out of range. "
            f"Available versions: 0 to {count-1} or -1 to -{count}"
        )
def _resolve_version_at_date(self, target_date: datetime) -> str | None:
    """Find version_id closest to (but not after) target_date.

    Naive datetimes - both ``target_date`` and the versions'
    ``last_modified`` values - are interpreted as UTC before comparing, so
    backends that report naive timestamps do not trigger a naive/aware
    ``TypeError``.

    Args:
        target_date: Target datetime

    Returns:
        version_id of the most recent version whose ``last_modified`` is at
        or before ``target_date``, or None if there is no such version
    """
    from datetime import timezone

    def _as_aware(moment):
        """Attach UTC to a naive datetime; leave aware ones untouched."""
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    cutoff = _as_aware(target_date)

    # Keep only versions up to the target date
    eligible = [
        version for version in self.versions
        if _as_aware(version['last_modified']) <= cutoff
    ]
    if not eligible:
        return None

    # Most recent version at or before the target date
    newest = max(eligible, key=lambda version: _as_aware(version['last_modified']))
    return newest['version_id']
def _list_supported_features(self) -> str:
    """Describe what this node's backend supports.

    Returns:
        The string form (``str()``) of ``self.capabilities``
    """
    features = self.capabilities
    return str(features)
@property
def version_count(self) -> int:
    """Get total number of versions available.

    Returns:
        int: Length of ``self.versions`` (0 when that list is empty)

    Examples:
        >>> print(f"File has {node.version_count} versions")
    """
    history = self.versions
    return len(history)
[docs]
def compact_versions(self, dry_run: bool = False) -> int:
    """Compact version history by removing consecutive duplicates.

    Versions are ordered by ``last_modified`` (oldest first). Whenever two
    neighbouring versions both carry a non-empty, identical ``etag``, the
    newer one is marked for deletion and the older one is kept.
    Non-consecutive duplicates are preserved to maintain history (e.g.,
    reverting to an earlier state). Versions without an etag are never
    treated as duplicates.

    Deletion is best effort: a failure to delete one version emits a
    ``RuntimeWarning`` and processing continues with the remaining ones.

    Args:
        dry_run: If True, only report what would be deleted without
            actually deleting

    Returns:
        int: Number of versions removed (or that would be removed if
            dry_run=True)

    Raises:
        PermissionError: If the backend's capabilities report no versioning

    Examples:
        >>> # Check what would be removed
        >>> count = node.compact_versions(dry_run=True)
        >>> print(f"Would remove {count} duplicate versions")

        >>> # Actually compact the history
        >>> removed = node.compact_versions()
        >>> print(f"Removed {removed} redundant versions")

    Example scenario:
        v1: content A (etag: xxx)
        v2: content A (etag: xxx)  <- REMOVED (consecutive duplicate)
        v3: content B (etag: yyy)
        v4: content B (etag: yyy)  <- REMOVED (consecutive duplicate)
        v5: content A (etag: xxx)  <- KEPT (not consecutive to v1, shows revert)
    """
    import warnings

    if not self.capabilities.versioning:
        raise PermissionError(
            f"{self._mount_name} backend does not support versioning"
        )

    versions = self.versions
    if len(versions) < 2:
        return 0  # Nothing to clean up

    # Oldest first, so each version is compared with its predecessor
    chronological = sorted(versions, key=lambda v: v['last_modified'])

    to_delete = []
    previous = chronological[0]
    for version in chronological[1:]:
        previous_etag = previous.get('etag', '')
        etag = version.get('etag', '')
        # Same content as the immediately preceding version: drop the newer
        if previous_etag and etag and previous_etag == etag:
            to_delete.append(version['version_id'])
        previous = version

    if dry_run:
        return len(to_delete)

    # Delete marked versions
    deleted_count = 0
    for version_id in to_delete:
        try:
            self._backend.delete_version(self._path, version_id)
        except Exception as e:
            # Report but continue with other deletions
            warnings.warn(
                f"Failed to delete version {version_id}: {e}",
                RuntimeWarning,
                stacklevel=2
            )
        else:
            deleted_count += 1
    return deleted_count
[docs]
def fill_from_url(self, url: str, timeout: int = 30) -> None:
    """Download content from URL and write to this file.

    The URL is fetched with ``urllib.request.urlopen``, the whole response
    body is read into memory and then written to this node with
    ``_write_bytes``.

    Args:
        url: URL to download from; must start with http:// or https://
        timeout: Request timeout in seconds (default: 30)

    Raises:
        ValueError: If URL is empty or does not start with http:// or
            https://
        IOError: If the download fails for any reason

    Examples:
        >>> # Download image from internet
        >>> img = storage.node('s3:downloads/logo.png')
        >>> img.fill_from_url('https://example.com/logo.png')
        >>>
        >>> # Download with custom timeout
        >>> file = storage.node('local:data.json')
        >>> file.fill_from_url('https://api.example.com/data', timeout=60)

    Notes:
        - Uses urllib for HTTP requests (no external dependencies)
        - Whether an existing file is overwritten or a missing parent
          directory is created is up to ``_write_bytes`` / the backend
    """
    import urllib.error
    import urllib.request

    # Validate URL
    scheme_ok = bool(url) and url.startswith(('http://', 'https://'))
    if not scheme_ok:
        raise ValueError(f"Invalid URL: {url}. Must start with http:// or https://")

    # Download content
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            payload = response.read()
    except urllib.error.URLError as exc:
        raise IOError(f"Failed to download from {url}: {exc}") from exc
    except Exception as exc:
        raise IOError(f"Error downloading from {url}: {exc}") from exc

    # Write to storage
    self._write_bytes(payload)
[docs]
def to_base64(self, mime: str | None = None, include_uri: bool = True) -> str:
    """Encode file content as base64 string.

    Reads the whole file and base64-encodes it. With ``include_uri`` the
    result is wrapped as a data URI (``data:<mime>;base64,<data>``);
    otherwise the bare base64 text is returned.

    Args:
        mime: MIME type to include in data URI. When None it is guessed
            from the basename with ``mimetypes.guess_type``, falling back
            to ``application/octet-stream``.
        include_uri: If True, format as data URI; if False, return raw base64

    Returns:
        str: Base64-encoded string or data URI

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If node is not a file (e.g. a directory)

    Examples:
        >>> # Data URI with auto-detected MIME type
        >>> img = storage.node('images:logo.png')
        >>> data_uri = img.to_base64()
        >>> print(data_uri)
        'data:image/png;base64,iVBORw0KGgo...'
        >>>
        >>> # Raw base64 without URI wrapper
        >>> b64 = img.to_base64(include_uri=False)
        >>> print(b64)
        'iVBORw0KGgo...'
        >>>
        >>> # Custom MIME type
        >>> data_uri = img.to_base64(mime='image/x-icon')

    Notes:
        - Large files will result in very long strings
    """
    import base64
    import mimetypes

    # Must be an existing file
    if not self.exists:
        raise FileNotFoundError(f"File not found: {self.fullpath}")
    if not self.isfile:
        raise ValueError(f"Cannot encode directory as base64: {self.fullpath}")

    encoded = base64.b64encode(self._read_bytes()).decode('ascii')

    if not include_uri:
        return encoded

    # Only an absent (None) MIME type triggers auto-detection
    if mime is None:
        guessed, _encoding = mimetypes.guess_type(self.basename)
        mime = 'application/octet-stream' if guessed is None else guessed

    return f'data:{mime};base64,{encoded}'
# ==================== Special Methods ====================
[docs]
def __repr__(self) -> str:
    """Debug representation: ``StorageNode('<fullpath>')``."""
    location = self.fullpath
    return f"StorageNode('{location}')"
[docs]
def __str__(self) -> str:
    """String form of the node: its ``fullpath``."""
    text = self.fullpath
    return text
[docs]
def __eq__(self, other: object) -> bool:
    """Compare nodes by content (MD5 hash).

    Decision order:
        1. ``other`` is not a StorageNode -> ``NotImplemented``
        2. Both nodes have the same ``fullpath`` -> True (no content or
           existence check is made in this case)
        3. Either node is not a file -> False
        4. Otherwise the ``md5hash`` values are compared; a
           ``FileNotFoundError`` or ``ValueError`` raised while obtaining
           them counts as "not equal"

    Args:
        other: Another StorageNode or object to compare

    Returns:
        bool: True if the nodes share a path or have identical content

    Examples:
        >>> file1 = storage.node('home:original.txt')
        >>> file2 = storage.node('backup:copy.txt')
        >>> if file1 == file2:
        ...     print("Files have identical content")
    """
    if not isinstance(other, StorageNode):
        return NotImplemented

    # Same location: equal by definition
    if self.fullpath == other.fullpath:
        return True

    # Content comparison only makes sense between two files
    if not self.isfile or not other.isfile:
        return False

    try:
        same_content = self.md5hash == other.md5hash
    except (FileNotFoundError, ValueError):
        same_content = False
    return same_content
[docs]
def __ne__(self, other: object) -> bool:
    """Compare nodes for inequality.

    Negates the result of ``__eq__``; when ``__eq__`` declines the
    comparison with ``NotImplemented``, that value is passed on unchanged.

    Args:
        other: Another StorageNode or object to compare

    Returns:
        bool: True if ``__eq__`` reports the nodes as not equal

    Examples:
        >>> if file1 != file2:
        ...     print("Files differ")
    """
    outcome = self.__eq__(other)
    return outcome if outcome is NotImplemented else not outcome