Storage Architecture

This page provides a comprehensive deep dive into OmniDaemon’s storage architecture, covering what is stored, how it’s stored, and the two built-in backends: JSON (development) and Redis (production).

What is Stored?

OmniDaemon stores four types of data:

1. Agent Registry

What: Metadata about registered agents
Lifetime: Persistent (until explicitly deleted)
Purpose: Track which agents are registered, their configuration, and subscription details
Data Structure:
{
    "name": "file-processor",           # Unique agent name
    "callback_name": "process_file",    # Callback function name
    "tools": ["file_reader", "..."],    # Available tools
    "description": "Processes uploads",  # Human description
    "config": {                          # Subscription config
        "max_retries": 3,
        "reclaim_idle_ms": 300000,
        "dlq_enabled": True
    },
    "topic": "file.uploaded",            # Topic subscribed to
    "created_at": 1678901234.567         # Registration timestamp
}

2. Task Results

What: Outputs from agent callbacks
Lifetime: 24 hours (configurable TTL)
Purpose: Store results for retrieval by publishers or other systems
Data Structure:
{
    "task_id": "550e8400-e29b-41d4-a716-446655440000",
    "result": {                          # Agent return value
        "status": "success",
        "processed_file": "doc.pdf",
        "output": "..."
    },
    "saved_at": 1678901234.567           # When saved
}
TTL (Time-To-Live):
  • Default: 24 hours (86,400 seconds)
  • Auto-expires (Redis) or manual cleanup (JSON)
  • Balances storage efficiency with result availability

3. Metrics

What: Performance and operational metrics
Lifetime: Capped by entry count (last 10,000 entries for JSON, last 100,000 for Redis)
Purpose: Track agent performance, debug issues, analyze trends
Data Structure:
{
    "topic": "file.uploaded",
    "agent_name": "file-processor",
    "event_type": "task_received",      # received, processed, failed
    "task_id": "550e8400-...",
    "timestamp": 1678901234.567,
    "processing_time_ms": 150.5,        # For task_processed
    "error": "...",                      # For task_failed
    "saved_at": 1678901234.567
}
Event Types:
  • task_received - Agent received task
  • task_processed - Agent completed successfully
  • task_failed - Agent failed to process
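As an illustration of how these events fit together, the sketch below wraps a callback and emits all three event types through save_metric. The helper name and arguments are hypothetical, not OmniDaemon's actual runner code:
import time

async def run_with_metrics(store, topic, agent_name, task_id, callback, payload):
    """Hypothetical helper: emit task_received/processed/failed around a callback."""
    base = {"topic": topic, "agent_name": agent_name, "task_id": task_id}
    await store.save_metric({**base, "event_type": "task_received", "timestamp": time.time()})
    start = time.time()
    try:
        result = await callback(payload)
    except Exception as exc:
        await store.save_metric({
            **base,
            "event_type": "task_failed",
            "timestamp": time.time(),
            "error": str(exc),
        })
        raise
    await store.save_metric({
        **base,
        "event_type": "task_processed",
        "timestamp": time.time(),
        "processing_time_ms": (time.time() - start) * 1000,
    })
    return result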

4. Configuration

What: System configuration and state
Lifetime: Persistent
Purpose: Store runtime settings, start time, runner ID, custom settings
Data Structure:
{
    "_omnidaemon_start_time": 1678901234.567,    # When runner started
    "_omnidaemon_runner_id": "abc-123",           # Current runner ID
    "custom_key": "custom_value",                 # User config
    # ... other config
}
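For example, a runner could record its identity at startup through the configuration API described below. This is a sketch only; the real call sites live inside OmniDaemon:
import time
import uuid

async def record_runner_identity(store):
    """Sketch: persist runner start time and ID via the store's config API."""
    await store.save_config("_omnidaemon_start_time", time.time())
    await store.save_config("_omnidaemon_runner_id", str(uuid.uuid4()))

    # Read the values back later, with a default if missing
    runner_id = await store.get_config("_omnidaemon_runner_id", default="unknown")
    return runner_id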

Storage Interface (BaseStore)

All storage backends implement the BaseStore abstract class:

Connection Management

async def connect() -> None:
    """Establish connection to storage backend."""

async def close() -> None:
    """Close connection and cleanup resources."""

async def health_check() -> Dict[str, Any]:
    """Check storage health and return stats."""

Agent Management

async def add_agent(topic: str, agent_data: Dict[str, Any]) -> None:
    """Add or update an agent (upsert)."""

async def get_agent(topic: str, agent_name: str) -> Optional[Dict[str, Any]]:
    """Get specific agent."""

async def get_agents_by_topic(topic: str) -> List[Dict[str, Any]]:
    """Get all agents for a topic."""

async def list_all_agents() -> Dict[str, List[Dict[str, Any]]]:
    """Get all agents grouped by topic."""

async def delete_agent(topic: str, agent_name: str) -> bool:
    """Delete specific agent."""

async def delete_topic(topic: str) -> int:
    """Delete all agents for a topic."""

Result Management

async def save_result(
    task_id: str,
    result: Dict[str, Any],
    ttl_seconds: Optional[int] = None
) -> None:
    """Save task result with optional TTL."""

async def get_result(task_id: str) -> Optional[Dict[str, Any]]:
    """Get task result."""

async def delete_result(task_id: str) -> bool:
    """Delete task result."""

async def list_results(limit: int = 100) -> List[Dict[str, Any]]:
    """List recent results."""

Metrics Management

async def save_metric(metric_data: Dict[str, Any]) -> None:
    """Save a metric entry."""

async def get_metrics(
    topic: Optional[str] = None,
    limit: int = 1000
) -> List[Dict[str, Any]]:
    """Get recent metrics."""

Configuration Management

async def save_config(key: str, value: Any) -> None:
    """Save configuration value."""

async def get_config(key: str, default: Any = None) -> Any:
    """Get configuration value."""

Bulk Operations

async def clear_agents() -> int:
    """Clear all agents."""

async def clear_results() -> int:
    """Clear all results."""

async def clear_metrics() -> int:
    """Clear all metrics."""

async def clear_all() -> Dict[str, int]:
    """Clear all data (returns counts)."""

JSON Storage (Development)

JSONStore is a file-based storage implementation perfect for local development and testing.

Configuration

from omnidaemon.storage import JSONStore

store = JSONStore(
    storage_dir=".omnidaemon_data"  # Directory for JSON files
)
Environment Variables:
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.omnidaemon_data  # Default

File Structure

.omnidaemon_data/
├── agents.json      # Agent registry
├── results.json     # Task results
├── metrics.json     # Metrics
└── config.json      # Configuration

How It Works

In-Memory Cache:
class JSONStore:
    def __init__(self, storage_dir: str):
        # In-memory caches (fast reads)
        self._agents: Dict[str, List[Dict[str, Any]]] = {}
        self._results: Dict[str, Dict[str, Any]] = {}
        self._metrics: List[Dict[str, Any]] = []
        self._config: Dict[str, Any] = {}
        
        # Thread safety
        self._lock = RLock()
Atomic Writes:
def _save_agents(self):
    """Save agents to file atomically."""
    # Write to temporary file first
    tmp_file = self.agents_file.with_suffix(".tmp")
    with open(tmp_file, "w", encoding="utf-8") as f:
        json.dump(self._agents, f, indent=2, default=str)
    
    # Atomic rename (OS-level atomicity)
    os.replace(tmp_file, self.agents_file)
Load on Connect:
async def connect(self):
    if self._connected:
        return
    
    with self._lock:
        self._load_agents()      # Load agents.json
        self._load_results()     # Load results.json
        self._load_metrics()     # Load metrics.json
        self._load_config()      # Load config.json
        self._connected = True
Save on Close:
async def close(self):
    with self._lock:
        self._save_agents()      # Save agents.json
        self._save_results()     # Save results.json
        self._save_metrics()     # Save metrics.json
        self._save_config()      # Save config.json
        self._connected = False

Agent Storage (agents.json)

Structure:
{
    "file.uploaded": [
        {
            "name": "file-processor",
            "callback_name": "process_file",
            "tools": ["file_reader"],
            "description": "Processes uploads",
            "config": {"max_retries": 3},
            "topic": "file.uploaded",
            "created_at": 1678901234.567
        }
    ],
    "analysis.requested": [
        {
            "name": "analyzer",
            "callback_name": "analyze",
            "tools": [],
            "description": "Analyzes data",
            "config": {},
            "topic": "analysis.requested",
            "created_at": 1678901250.123
        }
    ]
}
Operations:
Add Agent (Upsert):
async def add_agent(self, topic: str, agent_data: Dict[str, Any]):
    with self._lock:
        agent_name = agent_data.get("name")
        
        if topic not in self._agents:
            self._agents[topic] = []
        
        # Remove old (if exists)
        self._agents[topic] = [
            a for a in self._agents[topic]
            if a.get("name") != agent_name
        ]
        
        # Add new
        self._agents[topic].append(agent_data)
        
        # Save immediately
        self._save_agents()
Get Agent:
async def get_agent(self, topic: str, agent_name: str):
    with self._lock:
        for agent in self._agents.get(topic, []):
            if agent.get("name") == agent_name:
                return agent.copy()  # Return a copy so callers can't mutate the cache
        return None

Result Storage (results.json)

Structure:
{
    "550e8400-e29b-41d4-a716-446655440000": {
        "task_id": "550e8400-e29b-41d4-a716-446655440000",
        "result": {
            "status": "success",
            "output": "..."
        },
        "saved_at": 1678901234.567,
        "expires_at": 1678987634.567
    }
}
TTL Handling (Manual):
async def save_result(self, task_id: str, result: Dict, ttl_seconds: Optional[int]):
    with self._lock:
        result_data = {
            "task_id": task_id,
            "result": result,
            "saved_at": time.time(),
        }
        
        if ttl_seconds:
            result_data["expires_at"] = time.time() + ttl_seconds
        
        self._results[task_id] = result_data
        self._save_results()

async def get_result(self, task_id: str):
    with self._lock:
        result_data = self._results.get(task_id)
        
        if not result_data:
            return None
        
        # Check TTL expiration
        expires_at = result_data.get("expires_at")
        if expires_at and time.time() > expires_at:
            # Expired! Remove and return None
            del self._results[task_id]
            self._save_results()
            return None
        
        return result_data.get("result")

Metrics Storage (metrics.json)

Structure:
[
    {
        "topic": "file.uploaded",
        "agent_name": "file-processor",
        "event_type": "task_received",
        "task_id": "550e8400-...",
        "timestamp": 1678901234.567,
        "saved_at": 1678901234.567
    },
    {
        "topic": "file.uploaded",
        "agent_name": "file-processor",
        "event_type": "task_processed",
        "task_id": "550e8400-...",
        "timestamp": 1678901234.700,
        "processing_time_ms": 133.5,
        "saved_at": 1678901234.700
    }
]
Operations:
async def save_metric(self, metric_data: Dict[str, Any]):
    with self._lock:
        metric_data["saved_at"] = time.time()
        self._metrics.append(metric_data)
        
        # Keep only last 10K metrics
        if len(self._metrics) > 10_000:
            self._metrics = self._metrics[-10_000:]
        
        self._save_metrics()

async def get_metrics(self, topic: Optional[str] = None, limit: int = 1000):
    with self._lock:
        metrics = list(reversed(self._metrics))  # Most recent first
        
        if topic:
            metrics = [m for m in metrics if m.get("topic") == topic]
        
        return metrics[:limit]

Configuration Storage (config.json)

Structure:
{
    "_omnidaemon_start_time": 1678901234.567,
    "_omnidaemon_runner_id": "abc-123",
    "custom_setting": "value"
}
Operations:
async def save_config(self, key: str, value: Any):
    with self._lock:
        self._config[key] = value
        self._save_config()

async def get_config(self, key: str, default: Any = None):
    with self._lock:
        return self._config.get(key, default)

Performance Characteristics

Throughput:
  • Writes: ~1,000 ops/sec (depends on disk speed)
  • Reads: ~10,000 ops/sec (in-memory cache)
Latency:
  • Reads: <1ms (in-memory)
  • Writes: 1-10ms (disk I/O)
Memory Usage:
  • Proportional to data size (all data in memory)
  • ~1KB per agent
  • ~1KB per result
  • ~500 bytes per metric
Storage:
  • Human-readable JSON files
  • Indented for easy inspection
  • ~10-20% overhead vs binary

Pros & Cons

✅ Pros:
  • ✅ No external dependencies (pure Python)
  • ✅ Human-readable (easy to inspect/debug)
  • ✅ Simple setup (just a directory)
  • ✅ Atomic writes (safe file operations)
  • ✅ Fast reads (in-memory cache)
  • ✅ Easy backup (copy directory)
❌ Cons:
  • ❌ Not distributed (single machine only)
  • ❌ No concurrent access from multiple processes
  • ❌ Manual TTL handling (cleanup not automatic)
  • ❌ All data in memory (RAM usage)
  • ❌ Slower writes (disk I/O)
  • ❌ No advanced queries
Use Cases:
  • Local development
  • Testing
  • Single-instance deployments
  • Small-scale production (< 10K agents)

Redis Storage (Production)

RedisStore is a Redis-based storage implementation optimized for production deployments.

Configuration

from omnidaemon.storage import RedisStore

store = RedisStore(
    redis_url="redis://localhost:6379",
    key_prefix="omni"  # Namespace all keys
)
Environment Variables:
STORAGE_BACKEND=redis
REDIS_URL=redis://localhost:6379
REDIS_KEY_PREFIX=omni  # Default

Key Naming Convention

All Redis keys are namespaced with a prefix:
def _key(self, *parts: str) -> str:
    """Build namespaced key."""
    return f"{self.key_prefix}:{':'.join(parts)}"

# Examples:
_key("agent", "file.uploaded", "file-processor")
# → "omni:agent:file.uploaded:file-processor"

_key("result", "550e8400-...")
# → "omni:result:550e8400-..."

_key("metrics", "stream")
# → "omni:metrics:stream"
Why Prefix?
  • ✅ Namespace separation (multiple OmniDaemon instances)
  • ✅ Easy cleanup (delete all omni:* keys)
  • ✅ Organized key space
  • ✅ Avoid conflicts with other Redis users
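For example, a whole namespace can be wiped by iterating the prefixed keys. The sketch below uses SCAN instead of KEYS so a large keyspace is not blocked; it assumes the redis.asyncio client (use whatever async Redis client your deployment already has):
import redis.asyncio as redis

async def clear_namespace(redis_url: str, key_prefix: str = "omni") -> int:
    """Delete every key under the given prefix using incremental SCAN."""
    client = redis.from_url(redis_url, decode_responses=True)
    deleted = 0
    try:
        async for key in client.scan_iter(match=f"{key_prefix}:*", count=500):
            deleted += await client.delete(key)
    finally:
        await client.close()
    return deleted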

Redis Data Structures Used

1. Hashes - For agents
2. Strings - For results (with TTL)
3. Sets - For topic/agent indexes
4. Sorted Sets - For result index
5. Streams - For metrics

Agent Storage (Redis Hashes)

Keys:
omni:agent:{topic}:{agent_name}     # Agent data (Hash)
omni:agents:topic:{topic}            # Agent names for topic (Set)
omni:topics                          # All topics (Set)
Data Structure:
HGETALL omni:agent:file.uploaded:file-processor
Output:
name: "file-processor"
callback_name: "process_file"
tools: "[\"file_reader\"]"      # JSON-encoded
description: "Processes uploads"
config: "{\"max_retries\": 3}"  # JSON-encoded
topic: "file.uploaded"
created_at: "1678901234.567"
Operations:
Add Agent (Atomic):
async def add_agent(self, topic: str, agent_data: Dict[str, Any]):
    agent_name = agent_data.get("name")
    agent_key = self._key("agent", topic, agent_name)
    
    # Flatten agent data
    agent_flat = {
        "name": agent_name,
        "callback_name": agent_data.get("callback_name", ""),
        "tools": json.dumps(agent_data.get("tools", [])),
        "description": agent_data.get("description", ""),
        "config": json.dumps(agent_data.get("config", {})),
        "topic": topic,
        "created_at": time.time(),
    }
    
    # Atomic pipeline (all or nothing)
    async with self._redis.pipeline(transaction=True) as pipe:
        # Set agent data
        await pipe.hset(agent_key, mapping=agent_flat)
        
        # Add to topic index
        await pipe.sadd(self._key("agents", "topic", topic), agent_name)
        
        # Add topic to global set
        await pipe.sadd(self._key("topics"), topic)
        
        # Execute atomically
        await pipe.execute()
Get Agent:
async def get_agent(self, topic: str, agent_name: str):
    agent_key = self._key("agent", topic, agent_name)
    data = await self._redis.hgetall(agent_key)
    
    if not data:
        return None
    
    # Reconstruct agent
    return {
        "name": data.get("name"),
        "callback_name": data.get("callback_name"),
        "tools": json.loads(data.get("tools", "[]")),
        "description": data.get("description"),
        "config": json.loads(data.get("config", "{}")),
        "topic": data.get("topic"),
        "created_at": float(data.get("created_at", 0)),
    }
List Agents for Topic:
async def get_agents_by_topic(self, topic: str):
    # Get agent names from set
    agent_names = await self._redis.smembers(
        self._key("agents", "topic", topic)
    )
    
    # Fetch each agent
    agents = []
    for agent_name in agent_names:
        agent = await self.get_agent(topic, agent_name)
        if agent:
            agents.append(agent)
    
    return agents
Delete Agent (Atomic):
async def delete_agent(self, topic: str, agent_name: str) -> bool:
    agent_key = self._key("agent", topic, agent_name)
    
    async with self._redis.pipeline(transaction=True) as pipe:
        # Delete agent data
        await pipe.delete(agent_key)
        
        # Remove from topic index
        await pipe.srem(
            self._key("agents", "topic", topic),
            agent_name
        )
        
        # Execute atomically and keep the per-command results
        results = await pipe.execute()
    
    # If no more agents for topic, clean up
    count = await self._redis.scard(self._key("agents", "topic", topic))
    if count == 0:
        await self._redis.srem(self._key("topics"), topic)
        await self._redis.delete(self._key("agents", "topic", topic))
    
    # DEL count > 0 means the agent existed and was removed
    return bool(results[0])

Result Storage (Redis Strings + TTL)

Keys:
omni:result:{task_id}       # Result data (String with TTL)
omni:results:index           # Sorted set (task_id → timestamp)
Operations:
Save Result (with Native TTL):
async def save_result(self, task_id: str, result: Dict, ttl_seconds: Optional[int]):
    result_key = self._key("result", task_id)
    
    result_data = {
        "task_id": task_id,
        "result": result,
        "saved_at": time.time(),
    }
    
    result_json = json.dumps(result_data, default=str)
    
    if ttl_seconds:
        # Set with TTL (Redis native expiration!)
        await self._redis.setex(result_key, ttl_seconds, result_json)
    else:
        await self._redis.set(result_key, result_json)
    
    # Add to sorted set (for listing)
    await self._redis.zadd(
        self._key("results", "index"),
        {task_id: time.time()}
    )
Get Result (TTL handled by Redis):
async def get_result(self, task_id: str):
    result_key = self._key("result", task_id)
    data = await self._redis.get(result_key)
    
    if not data:
        # Remove from index if missing
        await self._redis.zrem(self._key("results", "index"), task_id)
        return None
    
    result_data = json.loads(data)
    return result_data.get("result")
List Recent Results:
async def list_results(self, limit: int = 100):
    # Get most recent task IDs from sorted set (reverse order)
    task_ids = await self._redis.zrevrange(
        self._key("results", "index"),
        0,              # Start
        limit - 1       # End
    )
    
    # Fetch each result
    results = []
    for task_id in task_ids:
        result_key = self._key("result", task_id)
        data = await self._redis.get(result_key)
        if data:
            results.append(json.loads(data))
    
    return results

Metrics Storage (Redis Streams)

Stream:
omni:metrics:stream
Why Redis Streams for Metrics?
  • ✅ Time-ordered (perfect for time-series)
  • ✅ Efficient (append-only log)
  • ✅ Durable (persisted)
  • ✅ Queryable (by time range)
  • ✅ Auto-trimming (MAXLEN)
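Because each stream entry ID starts with a millisecond timestamp, metrics can also be queried by time range directly, which the get_metrics helper shown below does not expose. A sketch, assuming the redis.asyncio client and the default omni prefix:
import json
import time
import redis.asyncio as redis

async def metrics_since(redis_url: str, seconds_ago: int = 3600):
    """Read metrics newer than the given age straight from the stream by ID range."""
    client = redis.from_url(redis_url, decode_responses=True)
    try:
        start_ms = int((time.time() - seconds_ago) * 1000)
        entries = await client.xrange("omni:metrics:stream", min=f"{start_ms}-0", max="+")
        return [json.loads(fields["data"]) for _entry_id, fields in entries]
    finally:
        await client.close()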
Save Metric:
async def save_metric(self, metric_data: Dict[str, Any]):
    metric_data["saved_at"] = time.time()
    
    await self._redis.xadd(
        self._key("metrics", "stream"),
        {"data": json.dumps(metric_data, default=str)},
        maxlen=100_000,      # Keep last 100K metrics
        approximate=True     # Faster trimming
    )
Get Metrics:
async def get_metrics(self, topic: Optional[str] = None, limit: int = 1000):
    # Read from stream (most recent first)
    entries = await self._redis.xrevrange(
        self._key("metrics", "stream"),
        count=limit
    )
    
    metrics = []
    for entry_id, fields in entries:
        metric = json.loads(fields.get("data", "{}"))
        
        # Filter by topic if specified
        if topic and metric.get("topic") != topic:
            continue
        
        metric["stream_id"] = entry_id
        metrics.append(metric)
    
    return metrics

Configuration Storage (Redis Strings)

Keys:
omni:config:{key}
Operations:
async def save_config(self, key: str, value: Any):
    config_key = self._key("config", key)
    await self._redis.set(config_key, json.dumps(value, default=str))

async def get_config(self, key: str, default: Any = None):
    config_key = self._key("config", key)
    data = await self._redis.get(config_key)
    
    if data is None:
        return default
    
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return default

Health Check

async def health_check(self):
    try:
        # Measure latency
        start = time.time()
        await self._redis.ping()
        latency = (time.time() - start) * 1000  # ms
        
        # Get Redis stats
        info = await self._redis.info()
        
        return {
            "status": "healthy",
            "backend": "redis",
            "redis_url": self.redis_url,
            "connected": self._connected,
            "latency_ms": round(latency, 2),
            "redis_version": info.get("redis_version"),
            "used_memory": info.get("used_memory_human"),
            "connected_clients": info.get("connected_clients"),
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "backend": "redis",
            "error": str(e),
        }

Performance Characteristics

Throughput:
  • Writes: ~50,000 ops/sec (single instance)
  • Reads: ~100,000 ops/sec (single instance)
  • Cluster: >500,000 ops/sec
Latency:
  • Reads: <1ms
  • Writes: <1ms
  • Network: +1-10ms
Memory Usage:
  • In-memory (RAM)
  • Persistent to disk (optional: AOF, RDB)
  • ~1KB per agent
  • ~1KB per result
  • ~500 bytes per metric
Persistence Options:
# redis.conf

# AOF (Append-Only File) - Every write logged
appendonly yes
appendfsync everysec

# RDB (Snapshot) - Periodic snapshots
save 900 1
save 300 10
save 60 10000

Pros & Cons

✅ Pros:
  • ✅ High performance (sub-millisecond)
  • ✅ Distributed access (multiple processes/machines)
  • ✅ Native TTL (automatic expiration)
  • ✅ Atomic operations (thread-safe)
  • ✅ Persistence (AOF/RDB)
  • ✅ Replication & clustering
  • ✅ Efficient data structures
  • ✅ Battle-tested & mature
❌ Cons:
  • ❌ Requires Redis server
  • ❌ RAM usage (all data in memory)
  • ❌ Not human-readable (binary protocol)
  • ❌ Network latency (for remote Redis)
Use Cases:
  • Production deployments
  • Multi-instance setups
  • High throughput requirements
  • Distributed systems
  • Large-scale production (10K+ agents)

Comparison: JSON vs Redis

Feature           | JSON                                           | Redis
Setup             | No dependencies                                | Requires Redis server
Performance       | Fast reads (in-memory), slow writes (disk I/O) | Very fast reads and writes
Distributed       | ❌ No                                          | ✅ Yes
TTL               | Manual cleanup                                 | Native (automatic)
Concurrent Access | ❌ Single process                              | ✅ Multiple processes/machines
Persistence       | Files                                          | AOF/RDB
Human-Readable    | ✅ Yes (JSON)                                  | ❌ No (binary)
Memory Usage      | All in RAM                                     | All in RAM
Backup            | Copy directory                                 | Redis SAVE/BGSAVE
Use Case          | Development, Testing                           | Production

Data Lifecycle

Agent Persistence

Agents persist until explicitly deleted:
  • Created: sdk.register_agent()
  • Updated: sdk.register_agent() (same name)
  • Deleted: sdk.delete_agent() or sdk.delete_topic()
State Across Restarts:
  • ✅ Agent metadata persisted
  • ✅ Subscription configuration saved
  • ✅ Can restart runner and resume

Result Expiration (24 Hours)

Results auto-expire after 24 hours:
await sdk.publish_task(event_envelope)  # Returns task_id

# Result available for 24 hours
await sdk.get_result(task_id)  # ✅ Available

# After 24 hours
await sdk.get_result(task_id)  # ❌ None (expired)
Why 24 Hours?
  • ✅ Balance availability with storage efficiency
  • ✅ Most use cases retrieve results quickly
  • ✅ Prevents unbounded storage growth
  • ✅ Encourages proper result handling
Customizable:
# Custom TTL (1 hour)
await store.save_result(task_id, result, ttl_seconds=3600)

# No expiration
await store.save_result(task_id, result, ttl_seconds=None)

Metric Retention

Metrics kept up to limit:
  • JSON: Last 10,000 metrics
  • Redis: Last 100,000 metrics
Oldest metrics trimmed:
# JSON
if len(self._metrics) > 10_000:
    self._metrics = self._metrics[-10_000:]

# Redis
await self._redis.xadd(
    "omni:metrics:stream",
    {...},
    maxlen=100_000,      # Trim to 100K
    approximate=True
)

Monitoring Storage

Via CLI

# Check storage health
omnidaemon storage health

# Clear data
omnidaemon storage clear-agents
omnidaemon storage clear-results
omnidaemon storage clear-metrics
omnidaemon storage clear-all

Via SDK

# Health check
health = await sdk.storage_health()
print(health)

# Clear operations
agents_count = await sdk.clear_agents()
results_count = await sdk.clear_results()
metrics_count = await sdk.clear_metrics()
all_counts = await sdk.clear_all()

Via Redis CLI (Redis backend only)

# List all OmniDaemon keys
redis-cli KEYS omni:*

# Get agent
redis-cli HGETALL omni:agent:file.uploaded:file-processor

# Get result
redis-cli GET omni:result:550e8400-...

# Check result TTL
redis-cli TTL omni:result:550e8400-...

# Get metrics stream length
redis-cli XLEN omni:metrics:stream

# Inspect metrics
redis-cli XREVRANGE omni:metrics:stream + - COUNT 10

Best Practices

1. Choose Right Backend

# Development
STORAGE_BACKEND=json

# Production
STORAGE_BACKEND=redis
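If the backend is chosen at startup, a small factory based on the environment variables above might look like this (a sketch; OmniDaemon may already ship its own factory):
import os
from omnidaemon.storage import JSONStore, RedisStore

def store_from_env():
    """Sketch: pick a storage backend from STORAGE_BACKEND and related variables."""
    backend = os.getenv("STORAGE_BACKEND", "json").lower()
    if backend == "redis":
        return RedisStore(
            redis_url=os.getenv("REDIS_URL", "redis://localhost:6379"),
            key_prefix=os.getenv("REDIS_KEY_PREFIX", "omni"),
        )
    return JSONStore(storage_dir=os.getenv("JSON_STORAGE_DIR", ".omnidaemon_data"))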

2. Monitor Storage Health

# Regular health checks
omnidaemon storage health
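The same check is available programmatically through health_check(); a small polling sketch (the latency threshold is illustrative):
import asyncio
import logging

logger = logging.getLogger("storage-monitor")

async def monitor_storage(store, interval_seconds: int = 60):
    """Periodically call health_check() and log problems."""
    while True:
        health = await store.health_check()
        if health.get("status") != "healthy":
            logger.error("Storage unhealthy: %s", health.get("error"))
        elif health.get("latency_ms", 0) > 50:  # illustrative threshold
            logger.warning("Storage latency high: %s ms", health["latency_ms"])
        await asyncio.sleep(interval_seconds)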

3. Handle Result Expiration

# Always check if result exists
result = await sdk.get_result(task_id)
if result is None:
    # Result expired or not found
    logger.warning(f"Result {task_id} not found")

4. Configure Appropriate TTL

# Quick tasks: Short TTL
await store.save_result(task_id, result, ttl_seconds=3600)  # 1 hour

# Long-lived results: Default TTL
await store.save_result(task_id, result)  # 24 hours

# Permanent results: No TTL
await store.save_result(task_id, result, ttl_seconds=None)

5. Clean Up Old Data

# Periodic cleanup
omnidaemon storage clear-results  # Clears all stored results
omnidaemon storage clear-metrics  # Clears all stored metrics

6. Backup Regularly

JSON:
# Backup directory
cp -r .omnidaemon_data backups/omnidaemon_$(date +%Y%m%d)
Redis:
# Trigger backup
redis-cli BGSAVE

# Copy RDB file
cp /var/lib/redis/dump.rdb backups/redis_$(date +%Y%m%d).rdb

Future Backends

PostgreSQL 🚧

Configuration:
STORAGE_BACKEND=postgresql
POSTGRES_URL=postgresql://user:pass@localhost:5432/omnidaemon
Benefits:
  • ACID transactions
  • Complex queries
  • Full-text search
  • Mature ecosystem

MongoDB 🚧

Configuration:
STORAGE_BACKEND=mongodb
MONGODB_URI=mongodb://localhost:27017/omnidaemon
Benefits:
  • Schema flexibility
  • Document storage
  • Horizontal scaling
  • Aggregation pipelines

Amazon S3 🚧

Configuration:
STORAGE_BACKEND=s3
S3_BUCKET=omnidaemon-results
S3_REGION=us-east-1
Benefits:
  • Unlimited storage
  • Very cost-effective
  • High durability
  • Lifecycle policies

Summary

What’s Stored:
  1. Agent Registry - Metadata about registered agents (persistent)
  2. Task Results - Agent outputs (24-hour TTL)
  3. Metrics - Performance data (last 10K-100K entries)
  4. Configuration - System settings (persistent)
Backends:
  • JSON - File-based, development, single-instance
  • Redis - In-memory, production, distributed
Key Differences:
Feature     | JSON                    | Redis
Setup       | Simple                  | Requires server
Performance | Fast reads, slow writes | Very fast reads and writes
Distributed | No                      | Yes
TTL         | Manual                  | Automatic
Storage Size Estimates:
  • 1 agent = ~1KB
  • 1 result = ~1KB
  • 1 metric = ~500 bytes
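As a rough upper bound, a deployment holding 1,000 agents (~1 MB), 100,000 unexpired results (~100 MB), and 100,000 metrics (~50 MB) needs on the order of 150 MB of RAM with either backend.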
Best Practices:
  • Use JSON for development
  • Use Redis for production
  • Monitor health regularly
  • Handle result expiration
  • Backup regularly
Storage is the foundation of OmniDaemon’s persistence layer! 💾✨