API Quick Reference
This appendix provides a quick reference for all genro-storage APIs.
Note
Async/await support (FastAPI, asyncio) is available in v0.3.0+; see Async API Reference (v0.3.0+) below.
StorageManager API
Configuration
| Method | Description |
|---|---|
| | Configure mount points from YAML/JSON file or list of dicts |
| | Get list of configured mount point names |
| | Check if a mount point is configured |
Node Creation
| Method | Description |
|---|---|
| | Create a StorageNode. If a version is specified, creates a read-only snapshot |
| | Create virtual node for lazy concatenation of multiple nodes |
| | Create virtual node for lazy unified diff between two nodes |
StorageNode API
File I/O
| Method | Description | Capabilities Required |
|---|---|---|
| | Read entire file as string (mode='r') or bytes (mode='rb') | |
| | Write string (mode='w') or bytes (mode='wb') to file | |
| | Open file for reading/writing (context manager) | |
Directory Operations
| Method | Description | Capabilities Required |
|---|---|---|
| | Create directory | |
| | List child nodes (if directory) | |
| | Get child node by path components | None (navigation) |
Properties
| Property | Description | Capabilities Required |
|---|---|---|
| | True if file/directory exists | |
| | True if node is a file | |
| | True if node is a directory | |
| | File size in bytes | |
| | Last modification time (Unix timestamp) | |
| | Filename with extension | None |
| | Filename without extension | None |
| | File extension (including dot) | None |
| | Full path including mount | None |
| | File system path (without mount prefix) | None |
| | Parent directory node | None |
| | Backend capabilities | None |
| | MD5 hash of file content | |
| | MIME type based on extension | None |
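The name-related properties listed above depend only on the path string, while the MD5 hash needs the file content. As a rough illustration of that split, here is a minimal standard-library sketch; the `describe` helper, its dictionary keys, and the sample `data:reports/summary.txt` path are assumptions made for this example and are not genro-storage code.

```python
import hashlib
import mimetypes
import posixpath


def describe(fullpath: str, content: bytes) -> dict:
    """Derive name-related values from a 'mount:path' string (hypothetical helper)."""
    mount, _, path = fullpath.partition(":")
    basename = posixpath.basename(path)
    stem, suffix = posixpath.splitext(basename)
    mimetype, _ = mimetypes.guess_type(basename)
    return {
        "mount": mount,
        "path": path,          # path without the mount prefix
        "basename": basename,  # filename with extension
        "stem": stem,          # filename without extension
        "suffix": suffix,      # extension, including the dot
        "mimetype": mimetype,  # guessed from the extension only
        "md5": hashlib.md5(content).hexdigest(),  # the only value that needs the content
    }


info = describe("data:reports/summary.txt", b"hello")
print(info["basename"], info["suffix"], info["mimetype"])
```

Everything except `md5` is computed without touching storage, which is consistent with those rows listing "None" under required capabilities.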
Copy and Move
| Method | Description | Capabilities Required |
|---|---|---|
| | Copy file/directory to destination with filtering and callbacks | |
| | Move file/directory to destination | |
| | Delete file/directory | |
Versioning
| Method/Property | Description | Capabilities Required |
|---|---|---|
| | Number of versions (0 if not supported) | |
| | List of version metadata dicts | |
| | Open specific version for reading | |
| | Remove consecutive duplicate versions | |
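To make "remove consecutive duplicate versions" concrete, the sketch below collapses runs of adjacent versions that share a content hash. It is only an illustration of the idea: the `version_id` and `etag` keys are hypothetical, and whether the library keeps the first or the last entry of a run is not stated in this reference (the sketch keeps the first).

```python
from itertools import groupby


def compact_consecutive(versions: list[dict]) -> list[dict]:
    """Keep the first entry of every run of adjacent versions with the same hash."""
    return [next(run) for _, run in groupby(versions, key=lambda v: v["etag"])]


history = [
    {"version_id": "v1", "etag": "aaa"},
    {"version_id": "v2", "etag": "aaa"},  # same content as v1 and adjacent: dropped
    {"version_id": "v3", "etag": "bbb"},
    {"version_id": "v4", "etag": "aaa"},  # same content as v1 but not adjacent: kept
]
kept = [v["version_id"] for v in compact_consecutive(history)]
print(kept)
```

Only adjacent duplicates are merged, so a file that is changed and later reverted still keeps both states in its history.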
Virtual Nodes
| Method | Description | Available On |
|---|---|---|
| | Append node to iternode | |
| | Extend iternode with multiple nodes | |
Archiving
| Method | Description | Capabilities Required |
|---|---|---|
| | Create ZIP archive: single file, directory (recursive), or iternode (multiple files) | |
Metadata
| Method | Description | Capabilities Required |
|---|---|---|
| | Get cloud metadata as dict | |
| | Set cloud metadata | |
URLs
| Method | Description | Capabilities Required |
|---|---|---|
| | Generate presigned URL (S3/GCS) | |
| | Get internal URL path | Backend-dependent |
| | Encode file as base64/data URI | |
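For readers unfamiliar with the base64/data URI form mentioned in the last row, this is a minimal sketch of the encoding using only the standard library. The `to_data_uri` helper and its default MIME type are assumptions for illustration; the library's own method name and options are not shown in this reference.

```python
import base64


def to_data_uri(content: bytes, mimetype: str = "application/octet-stream") -> str:
    """Encode raw bytes as an RFC 2397 data URI (hypothetical helper)."""
    encoded = base64.b64encode(content).decode("ascii")
    return f"data:{mimetype};base64,{encoded}"


uri = to_data_uri(b"hello", "text/plain")
print(uri)
```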
Advanced
| Method | Description | Capabilities Required |
|---|---|---|
| | Context manager for local filesystem path | |
| | Execute external command with automatic temp file handling | |
| | WSGI file serving with ETag support | |
| | Download from URL and write to node | |
Backend Capabilities
Each backend reports its capabilities via the capabilities property.
Core Capabilities
| Capability | Description | Default |
|---|---|---|
| | Can read files | |
| | Can write files | |
| | Can delete files/directories | |
| | Backend is read-only | |
| | Storage is temporary (memory) | |
Advanced Capabilities
| Capability | Description | Default |
|---|---|---|
| | Supports file versioning | |
| | Can list versions | |
| | Can access specific versions | |
| | Supports custom metadata | |
| | Can generate presigned URLs | |
Backend Capability Matrix
| Backend | Read/Write | Delete | Versioning | Metadata | URLs | Temporary |
|---|---|---|---|---|---|---|
| Local | ✓ | ✓ | ✗ | ✗ | ✗ | ✗ |
| S3 | ✓ | ✓ | ✓ * | ✓ | ✓ | ✗ |
| GCS | ✓ | ✓ | ✓ * | ✓ | ✓ | ✗ |
| Azure | ✓ | ✓ | ✓ * | ✓ | ✓ | ✗ |
| HTTP | Read only | ✗ | ✗ | ✗ | ✗ | ✗ |
| Memory | ✓ | ✓ | ✗ | ✗ | ✗ | ✓ |
| Base64 | Read/Write ** | ✗ | ✗ | ✗ | ✗ | ✗ |
* Versioning must be enabled on the bucket
** Base64 backend is writable but path changes after write
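Code that must run against several backends can check capabilities before attempting an operation. The sketch below shows one way to do that, under stated assumptions: the `Capabilities` dataclass, its field names, and the `require` helper are hypothetical stand-ins, and the flag values are transcribed from the matrix above for four of the backends.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical stand-in for a backend's capability flags."""
    read: bool = True
    write: bool = True
    delete: bool = True
    versioning: bool = False
    metadata: bool = False
    presigned_urls: bool = False
    temporary: bool = False


# Flags transcribed from the matrix above. S3 versioning additionally
# requires versioning to be enabled on the bucket.
MATRIX = {
    "local": Capabilities(),
    "s3": Capabilities(versioning=True, metadata=True, presigned_urls=True),
    "http": Capabilities(write=False, delete=False),  # listed as read only
    "memory": Capabilities(temporary=True),
}


def require(backend: str, *needed: str) -> None:
    """Raise if the backend lacks any of the named capabilities."""
    caps = MATRIX[backend]
    missing = [name for name in needed if not getattr(caps, name)]
    if missing:
        raise NotImplementedError(f"{backend} backend lacks: {', '.join(missing)}")


require("s3", "versioning", "presigned_urls")  # passes silently
```

Failing early with a clear message tends to be easier to debug than letting a backend raise deep inside a copy or URL-generation call.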
Skip Strategies
When copying files, you can specify a skip strategy to avoid unnecessary operations.
| Strategy | Behavior | Performance |
|---|---|---|
| | Always copy (overwrite existing) | Fast (no checks) |
| | Skip if destination exists | Very fast (stat only) |
| | Skip if same size | Fast (stat only) |
| | Skip if same MD5 hash | Medium (may use ETag) |
| | Use custom skip function via | Depends on function |
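The behaviors in the table can be read as a single decision function. The following is a minimal sketch of that logic and not the library's implementation: `FakeNode` and `should_skip` are hypothetical, and of the strategy names used here only `'hash'` and `'custom'` appear elsewhere in this reference; `'never'`, `'exists'`, and `'size'` are assumed labels for the remaining rows.

```python
import hashlib
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeNode:
    """In-memory stand-in for a storage node (hypothetical)."""
    content: Optional[bytes] = None  # None means the file does not exist

    @property
    def exists(self) -> bool:
        return self.content is not None

    @property
    def size(self) -> int:
        return len(self.content or b"")

    @property
    def md5hash(self) -> str:
        return hashlib.md5(self.content or b"").hexdigest()


def should_skip(src, dest, strategy: str, skip_fn: Optional[Callable] = None) -> bool:
    """Return True when copying src over dest can be skipped."""
    if strategy == "never":
        return False  # always copy, no checks at all
    if strategy == "custom":
        if skip_fn is None:
            raise ValueError("skip_fn is required with the custom strategy")
        return bool(skip_fn(src, dest))
    if not dest.exists:
        return False  # nothing to compare against, so copy
    if strategy == "exists":
        return True
    if strategy == "size":
        return src.size == dest.size
    if strategy == "hash":
        return src.md5hash == dest.md5hash
    raise ValueError(f"unknown strategy: {strategy}")


src = FakeNode(b"abc")
print(should_skip(src, FakeNode(b"xyz"), "size"), should_skip(src, FakeNode(b"xyz"), "hash"))
```

The example at the end shows the trade-off from the Performance column: a size comparison is cheap but treats two different files of equal length as identical, while a hash comparison catches the difference.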
Copy Parameters
Additional copy_to() parameters for advanced control:
| Parameter | Description | Example |
|---|---|---|
| | Callable to filter files: | Filter by size, type, etc. |
| | Custom skip function: | Required when |
| | Progress callback: | Update progress bar |
| | Called for each file: | Logging, notifications |
| | Called when file skipped: | Track skipped files |
Common Patterns
Incremental Backup
# Copy only changed files
source.copy_to(dest, skip='hash')
Progress Tracking
from tqdm import tqdm
pbar = tqdm(desc="Copying")
source.copy_to(dest, progress=lambda cur, tot: pbar.update(1))
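If tqdm is not available, a plain callback that uses both arguments works as well. This sketch assumes the callback receives a running count and a total, as the `lambda cur, tot` signature above suggests; whether those values count files or bytes is not stated here. The `make_progress` factory is a hypothetical helper, and the loop only simulates the calls a copy would make.

```python
def make_progress(report=print):
    """Build a progress callback that reports 'current/total (percent%)'."""
    def on_progress(current: int, total: int) -> None:
        percent = 100 * current // total if total else 100
        report(f"{current}/{total} ({percent}%)")
    return on_progress


lines = []
callback = make_progress(lines.append)
for done in range(1, 5):  # simulate four progress notifications
    callback(done, 4)
print(lines)
```

Passing `report` as a parameter keeps the callback easy to test and lets the same function feed a logger instead of standard output.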
Filter by Size
# Copy only files smaller than 10MB
source.copy_to(dest, filter=lambda node, path: node.size < 10_000_000)
Custom Skip Logic
# Skip if destination is newer
def skip_if_newer(src, dest):
    if not dest.exists:
        return False
    return dest.mtime > src.mtime
source.copy_to(dest, skip='custom', skip_fn=skip_if_newer)
Copy with Callbacks
# Log each file and track skips
def log_file(node):
    print(f"Copied: {node.path}")
def log_skip(node, reason):
    print(f"Skipped {node.path}: {reason}")
source.copy_to(dest, skip='hash', on_file=log_file, on_skip=log_skip)
Lazy Concatenation
# Build document from parts
builder = storage.iternode(header, body, footer)
builder.append(appendix)
builder.copy_to(storage.node('result.txt'))
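"Lazy" here means that the parts are read only when the combined content is consumed. The sketch below illustrates that idea with plain file-like objects and a generator; `iter_concat` is a hypothetical helper and does not reflect how iternode is implemented.

```python
import io
from typing import BinaryIO, Iterable, Iterator


def iter_concat(sources: Iterable[BinaryIO], chunk_size: int = 64 * 1024) -> Iterator[bytes]:
    """Yield the content of each source in order, one chunk at a time."""
    for source in sources:
        while chunk := source.read(chunk_size):
            yield chunk


parts = [io.BytesIO(b"header\n"), io.BytesIO(b"body\n")]
parts.append(io.BytesIO(b"footer\n"))  # parts can still be added before consumption
result = b"".join(iter_concat(parts))
print(result)
```

Because nothing is read until the generator is iterated, memory use is bounded by the chunk size and not by the total size of the parts.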
Generate Diff
# Compare versions
diff = storage.diffnode(version1, version2)
diff.copy_to(storage.node('changes.diff'))
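For reference, a unified diff between two text versions looks like the output of the sketch below, which uses the standard library's `difflib`. This is an illustration of the format only; the `unified` helper and the file labels are assumptions, and how diffnode produces its output is not described here.

```python
import difflib


def unified(old_text: str, new_text: str, old_name: str = "version1", new_name: str = "version2") -> str:
    """Return a unified diff between two strings (hypothetical helper)."""
    return "".join(
        difflib.unified_diff(
            old_text.splitlines(keepends=True),
            new_text.splitlines(keepends=True),
            fromfile=old_name,
            tofile=new_name,
        )
    )


patch = unified("alpha\nbeta\ngamma\n", "alpha\nbeta2\ngamma\n")
print(patch)
```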
Create ZIP Archive
# Zip a single file
file = storage.node('data:report.pdf')
zip_bytes = file.zip()
storage.node('data:report.zip').write(zip_bytes, mode='wb')
# Zip entire directory (recursive)
folder = storage.node('data:documents/')
zip_bytes = folder.zip()
# Zip multiple files (iternode)
archive = storage.iternode(file1, file2, file3)
zip_bytes = archive.zip()
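Since the examples above treat the result of `zip()` as bytes, the archive can be inspected in memory before it is written anywhere. The sketch below builds and reads back such an archive with the standard `zipfile` module; `zip_members` is a hypothetical helper that stands in for the library call so the example runs on its own.

```python
import io
import zipfile


def zip_members(members: dict[str, bytes]) -> bytes:
    """Build a ZIP archive in memory from a name -> content mapping."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for name, content in members.items():
            archive.writestr(name, content)
    return buffer.getvalue()


zip_bytes = zip_members({"a.txt": b"first", "docs/b.txt": b"second"})

# Read the archive back from the bytes to list and verify its members.
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
    names = archive.namelist()
    first = archive.read("a.txt")
print(names)
```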
Async API Reference (v0.3.0+)
Use this API in async/await contexts such as FastAPI or other asyncio applications.
AsyncStorageManager API
Configuration
| Method | Description |
|---|---|
| | Configure mount points (synchronous - call at startup) |
| | Add single mount point at runtime (synchronous) |
| | Get list of configured mount point names (synchronous) |
| | Check if a mount point is configured (synchronous) |
Node Creation
| Method | Description |
|---|---|
| | Create an AsyncStorageNode (synchronous) |
AsyncStorageNode API
All I/O operations are async. Properties without I/O remain synchronous.
Async I/O Operations
| Method | Description | Return Type |
|---|---|---|
| | Read entire file as string (mode='r') or bytes (mode='rb') | |
| | Write string (mode='w') or bytes (mode='wb') to file | |
| | Check if file exists | |
| | Get file size in bytes | |
| | Get last modification time | |
| | Check if node is a file | |
| | Check if node is a directory | |
| | Delete file or directory | |
| | Copy file to target location | |
| | Move file to target location | |
Synchronous Properties
These properties do not require I/O and remain synchronous:
| Property | Description | Type |
|---|---|---|
| | File system path (without mount prefix) | |
| | Full path including mount | |
| | Filename with extension | |
| | Filename without extension | |
| | File extension (including dot) | |
Usage Examples
Basic Usage
from genro_storage import AsyncStorageManager
storage = AsyncStorageManager()
storage.configure([
    {'name': 'uploads', 'type': 's3', 'bucket': 'my-bucket'}
])
# Async context
async def process():
    node = storage.node('uploads:file.txt')
    # Async I/O
    if await node.exists():
        data = await node.read(mode='rb')
    await node.write(b'new data', mode='wb')
    # Sync properties
    print(node.path)
    print(node.basename)
FastAPI Integration
from fastapi import FastAPI, HTTPException
from genro_storage import AsyncStorageManager
app = FastAPI()
storage = AsyncStorageManager()
@app.on_event("startup")
async def startup():
    storage.configure([
        {'name': 'uploads', 'type': 's3', 'bucket': 'my-bucket'}
    ])
@app.get("/files/{filepath:path}")
async def get_file(filepath: str):
    node = storage.node(f'uploads:{filepath}')
    if not await node.exists():
        raise HTTPException(status_code=404)
    return {
        "data": await node.read(mode='rb'),
        "size": await node.size(),
        "mime_type": node.mimetype  # Sync property
    }
Concurrent Operations
import asyncio
async def process_multiple(file_list):
    async def process_one(filepath):
        node = storage.node(f'uploads:{filepath}')
        if await node.exists():
            data = await node.read(mode='rb')
            return len(data)
        return 0
    # Process all files concurrently
    sizes = await asyncio.gather(*[process_one(f) for f in file_list])
    return sum(sizes)
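With long file lists, an unbounded `asyncio.gather` starts every read at once. One common way to cap the number of operations in flight is an `asyncio.Semaphore`, sketched below. `StubNode` is a hypothetical stand-in that mimics the async `exists()` and `read()` calls used above so the example runs without a configured backend; the limit of 4 is an arbitrary choice.

```python
import asyncio


class StubNode:
    """Hypothetical stand-in exposing async exists()/read() like the nodes above."""

    def __init__(self, content=None):
        self._content = content

    async def exists(self) -> bool:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return self._content is not None

    async def read(self, mode="rb") -> bytes:
        await asyncio.sleep(0)
        return self._content


async def total_size(nodes, limit: int = 4) -> int:
    """Sum the sizes of existing nodes with at most `limit` reads in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def size_of(node) -> int:
        async with semaphore:
            if await node.exists():
                return len(await node.read(mode="rb"))
            return 0

    return sum(await asyncio.gather(*(size_of(n) for n in nodes)))


nodes = [StubNode(b"abc"), StubNode(None), StubNode(b"hello")]
total = asyncio.run(total_size(nodes))
print(total)
```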
Implementation Notes
Built on the asyncer library for automatic sync→async conversion
Uses ThreadPoolExecutor for I/O-bound operations
No event loop blocking
Configuration methods are synchronous (call at startup)
Full compatibility with underlying sync API (81% coverage, 274 tests)
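The general technique behind these notes, running a blocking call in a worker thread so the event loop stays responsive, can be sketched with the standard library alone. The example uses `asyncio.to_thread` (Python 3.9+) in place of asyncer, so it shows the idea and not the library's actual wrapper; `blocking_read` is a hypothetical stand-in for a synchronous storage call.

```python
import asyncio
import threading
import time


def blocking_read(name: str) -> tuple[str, bool]:
    """Stand-in for a blocking storage call; reports whether it ran on the main thread."""
    time.sleep(0.05)  # simulate slow I/O
    return name, threading.current_thread() is threading.main_thread()


async def read_all(names: list[str]) -> list[tuple[str, bool]]:
    # asyncio.to_thread hands each call to the default thread pool executor,
    # so the event loop is not blocked while the calls sleep.
    return await asyncio.gather(*(asyncio.to_thread(blocking_read, n) for n in names))


results = asyncio.run(read_all(["a.txt", "b.txt", "c.txt"]))
print(results)
```

Each result carries a flag showing the call ran off the main thread, which is the property that keeps other coroutines running during the I/O.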