Pluggable Architecture

One of OmniDaemon’s most powerful features is its pluggable architecture. You can swap event bus and storage backends without changing a single line of your agent code.

The Simple Truth

You provide the connection URL, OmniDaemon handles EVERYTHING else!
# Your agent code NEVER changes
# Just update environment variables:

EVENT_BUS_TYPE=redis_stream
REDIS_URL=redis://localhost:6379

# Or switch to Kafka (when available):
EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=localhost:9092

# Or RabbitMQ (when available):
EVENT_BUS_TYPE=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
Your agent code stays exactly the same!

How It Works

OmniDaemon uses Dependency Injection to automatically configure backends based on environment variables.

Your Code

from omnidaemon import OmniDaemonSDK

# That's it! No backend configuration in code
sdk = OmniDaemonSDK()

# Works with ANY backend!
await sdk.register_agent(...)
await sdk.publish_task(...)

Behind the Scenes

When you create an OmniDaemonSDK instance, OmniDaemon:
  1. Reads environment variables
    EVENT_BUS_TYPE = os.getenv("EVENT_BUS_TYPE", "redis_stream")
    STORAGE_BACKEND = os.getenv("STORAGE_BACKEND", "json")
    
  2. Loads the appropriate backend classes
    if EVENT_BUS_TYPE == "redis_stream":
        event_bus = RedisStreamEventBus(redis_url=REDIS_URL)
    elif EVENT_BUS_TYPE == "kafka":
        event_bus = KafkaEventBus(servers=KAFKA_SERVERS)
    # etc...
    
  3. Injects them into the SDK
    sdk.event_bus = event_bus
    sdk.store = store
    
  4. You just use the SDK! 🎉
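Steps 1–3 above can be sketched as a self-contained function. This is illustrative only: the class names stand in for the real backend classes, and the real wiring lives inside OmniDaemon.

```python
import os

# Stand-ins for the real backend classes (illustrative only).
class RedisStreamEventBus:
    def __init__(self, redis_url):
        self.redis_url = redis_url

class JSONStore:
    def __init__(self, storage_dir):
        self.storage_dir = storage_dir

def configure(env=None):
    """Steps 1-3: read env vars, pick backend classes, build instances."""
    env = dict(os.environ if env is None else env)
    bus_type = env.get("EVENT_BUS_TYPE", "redis_stream")
    store_type = env.get("STORAGE_BACKEND", "json")

    if bus_type == "redis_stream":
        event_bus = RedisStreamEventBus(env.get("REDIS_URL", "redis://localhost:6379"))
    else:
        raise ValueError(f"Unsupported backend: {bus_type}")

    if store_type == "json":
        store = JSONStore(env.get("JSON_STORAGE_DIR", ".omnidaemon_data"))
    else:
        raise ValueError(f"Unsupported backend: {store_type}")

    return event_bus, store
```

With no variables set, the defaults (`redis_stream` + `json`) are chosen, which is why `OmniDaemonSDK()` needs no arguments.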

Event Bus Backends

Currently Supported

Redis Streams ✅

Configuration:
EVENT_BUS_TYPE=redis_stream
REDIS_URL=redis://localhost:6379
Features:
  • ✅ Message persistence
  • ✅ Consumer groups
  • ✅ At-least-once delivery
  • ✅ Dead letter queue
  • ✅ Message reclaiming
  • ✅ Horizontal scaling
Use Case: Production-ready for most workloads
Performance:
  • Throughput: ~100K msgs/sec per instance
  • Latency: <10ms
  • Storage: RAM + optional disk persistence
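The delivery guarantees above (at-least-once, dead letter queue, message reclaiming) can be illustrated with a toy in-memory model. This is not the Redis Streams API, just a sketch of the semantics:

```python
from collections import deque

class MiniStream:
    """Toy model of consumer-group semantics: messages stay pending until
    acked (at-least-once); reclaim requeues unacked messages and parks
    repeatedly failing ones in a dead letter queue."""
    def __init__(self, max_deliveries=3):
        self.queue = deque()
        self.pending = {}      # msg_id -> message, delivered but not acked
        self.deliveries = {}   # msg_id -> delivery count
        self.dead_letter = []
        self.max_deliveries = max_deliveries
        self._next_id = 0

    def add(self, message):
        self.queue.append((self._next_id, message))
        self._next_id += 1

    def read(self):
        """Deliver the next message; it stays pending until acked."""
        if not self.queue:
            return None
        msg_id, message = self.queue.popleft()
        self.pending[msg_id] = message
        self.deliveries[msg_id] = self.deliveries.get(msg_id, 0) + 1
        return msg_id, message

    def ack(self, msg_id):
        self.pending.pop(msg_id, None)
        self.deliveries.pop(msg_id, None)

    def reclaim(self):
        """Requeue unacked messages; move poison messages to the DLQ."""
        for msg_id, message in list(self.pending.items()):
            del self.pending[msg_id]
            if self.deliveries[msg_id] >= self.max_deliveries:
                self.dead_letter.append((msg_id, message))
            else:
                self.queue.appendleft((msg_id, message))
```

In real Redis Streams the same roles are played by XADD, XREADGROUP, XACK, and XAUTOCLAIM, with the pending entries list (PEL) tracking delivery counts.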

Coming Soon

Apache Kafka 🚧

Configuration:
EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=localhost:9092
Features:
  • 🚧 Extremely high throughput
  • 🚧 Log-based persistence
  • 🚧 Exactly-once semantics
  • 🚧 Topic partitioning
  • 🚧 Long-term message retention
Use Case: High-throughput, mission-critical workloads
Performance:
  • Throughput: >1M msgs/sec per broker
  • Latency: <50ms
  • Storage: Disk-based, configurable retention

RabbitMQ 🚧

Configuration:
EVENT_BUS_TYPE=rabbitmq
RABBITMQ_URL=amqp://localhost:5672
Features:
  • 🚧 Flexible routing
  • 🚧 AMQP protocol
  • 🚧 Multiple exchange types
  • 🚧 Priority queues
  • 🚧 Message TTL
Use Case: Complex routing requirements
Performance:
  • Throughput: ~50K msgs/sec per node
  • Latency: <20ms
  • Storage: RAM or disk

NATS JetStream 🚧

Configuration:
EVENT_BUS_TYPE=nats
NATS_URL=nats://localhost:4222
Features:
  • 🚧 Cloud-native design
  • 🚧 Very lightweight
  • 🚧 Geographic distribution
  • 🚧 At-least-once and exactly-once
  • 🚧 Stream and KV storage
Use Case: Edge computing, IoT, cloud-native apps
Performance:
  • Throughput: ~500K msgs/sec per server
  • Latency: <5ms
  • Storage: Memory or file-based

Storage Backends

Currently Supported

JSON ✅

Configuration:
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.omnidaemon_data
Features:
  • ✅ File-based storage
  • ✅ Human-readable
  • ✅ No external dependencies
  • ✅ Easy to backup
Use Case: Development, testing, single-machine deployments
Limitations:
  • ❌ Not distributed
  • ❌ No concurrent access from multiple machines
  • ❌ Limited scalability

Redis ✅

Configuration:
STORAGE_BACKEND=redis
REDIS_URL=redis://localhost:6379
REDIS_KEY_PREFIX=omni
Features:
  • ✅ In-memory performance
  • ✅ Distributed access
  • ✅ TTL support (24h for results)
  • ✅ Atomic operations
  • ✅ Replication & clustering
Use Case: Production deployments, multi-machine setups
Performance:
  • Read: <1ms
  • Write: <1ms
  • Storage: RAM (with optional persistence)

Coming Soon

PostgreSQL 🚧

Configuration:
STORAGE_BACKEND=postgresql
POSTGRES_URL=postgresql://user:pass@localhost:5432/omnidaemon
Features:
  • 🚧 ACID transactions
  • 🚧 Complex queries
  • 🚧 JSON columns (for flexibility)
  • 🚧 Full-text search
  • 🚧 Mature tooling
Use Case: Enterprise deployments, compliance requirements
Performance:
  • Read: <10ms
  • Write: <20ms
  • Storage: Disk-based

MongoDB 🚧

Configuration:
STORAGE_BACKEND=mongodb
MONGODB_URI=mongodb://localhost:27017/omnidaemon
Features:
  • 🚧 Schema flexibility
  • 🚧 Document storage
  • 🚧 Horizontal scaling
  • 🚧 Aggregation pipelines
  • 🚧 TTL indexes
Use Case: Flexible schemas, document-oriented data
Performance:
  • Read: <5ms
  • Write: <10ms
  • Storage: Disk-based with memory mapping

Amazon S3 🚧

Configuration:
STORAGE_BACKEND=s3
S3_BUCKET=omnidaemon-results
S3_REGION=us-east-1
Features:
  • 🚧 Unlimited storage
  • 🚧 High durability (99.999999999%)
  • 🚧 Cost-effective for large volumes
  • 🚧 Lifecycle policies
  • 🚧 Cross-region replication
Use Case: Long-term result storage, large-scale deployments
Performance:
  • Read: <100ms
  • Write: <200ms
  • Storage: Object storage (pay per GB)

Switching Backends

Example: Development to Production

Development (Local Machine):
# .env.dev
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.dev_data
EVENT_BUS_TYPE=redis_stream
REDIS_URL=redis://localhost:6379
Staging (Test Environment):
# .env.staging
STORAGE_BACKEND=redis
REDIS_URL=redis://staging-redis.example.com:6379
EVENT_BUS_TYPE=redis_stream
Production (Multi-Region):
# .env.prod
STORAGE_BACKEND=postgresql
POSTGRES_URL=postgresql://prod-db.example.com:5432/omnidaemon

EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=kafka1.example.com:9092,kafka2.example.com:9092
Your agent code? Exactly the same!
# Works in dev, staging, AND production
from omnidaemon import OmniDaemonSDK

sdk = OmniDaemonSDK()  # Auto-configured!

await sdk.register_agent(...)  # Same code
await sdk.publish_task(...)     # Same code

Architecture Overview

┌────────────────────────────────────────────────────────┐
│                  Your Application                       │
│  (Same code works with ALL backends!)                  │
└────────────────────────────────────────────────────────┘

                         │ Uses OmniDaemonSDK

┌────────────────────────────────────────────────────────┐
│                  OmniDaemon Core                        │
│               (Dependency Injection)                    │
└────────────────────────────────────────────────────────┘

         ┌───────────────┴───────────────┐
         │                               │
         ▼                               ▼
┌──────────────────┐          ┌──────────────────┐
│   Event Bus      │          │    Storage       │
│  (Pluggable!)    │          │  (Pluggable!)    │
├──────────────────┤          ├──────────────────┤
│ • Redis Streams  │          │ • JSON           │
│ • Kafka          │          │ • Redis          │
│ • RabbitMQ       │          │ • PostgreSQL     │
│ • NATS           │          │ • MongoDB        │
└──────────────────┘          └──────────────────┘

Benefits of Pluggability

1. No Vendor Lock-In

Start with Redis → Grow to Kafka → Add PostgreSQL
         ↓              ↓                  ↓
    Same code      Same code         Same code

2. Optimize for Your Workload

Development:    JSON (simple, no dependencies)
Staging:        Redis (fast, distributed)
Production:     Kafka + PostgreSQL (high-throughput, durable)

3. Easy Migration

# Phase 1: Run both backends simultaneously
STORAGE_BACKEND=redis        # Write to Redis
STORAGE_BACKEND_BACKUP=postgresql  # Also write to PostgreSQL

# Phase 2: Switch reads to new backend
# (Validate data integrity)

# Phase 3: Switch completely
STORAGE_BACKEND=postgresql   # Now primary
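The three phases hinge on the dual-write step. One way to sketch it is a wrapper store; note that `DualWriteStore`, `DictStore`, and the `set`/`get` method names are hypothetical illustrations, not OmniDaemon API:

```python
class DictStore:
    """Stand-in for a real backend (Redis, PostgreSQL, ...)."""
    def __init__(self):
        self.data = {}

    def set(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)

class DualWriteStore:
    """Phase 1: every write goes to both backends; reads stay on the
    primary until the backup is validated (phase 2), then cut over."""
    def __init__(self, primary, backup):
        self.primary = primary
        self.backup = backup

    def set(self, key, value):
        self.primary.set(key, value)
        self.backup.set(key, value)  # shadow write for later validation

    def get(self, key):
        return self.primary.get(key)  # reads stay on the primary (phase 1)
```

Comparing the two backends' contents at the end of phase 1 is the data-integrity check that gates phase 2.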

4. Cost Optimization

Dev/Test: Local Redis (free)

Production Small: Managed Redis ($50/month)

Production Large: Kafka cluster ($500/month)

Enterprise: Kafka + PostgreSQL + S3 (optimized per use case)

5. Compliance & Data Residency

# US deployment
EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=us-east-1.kafka.example.com:9092
POSTGRES_URL=postgresql://us-east-1.db.example.com:5432/omni

# EU deployment (data stays in EU)
EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=eu-west-1.kafka.example.com:9092
POSTGRES_URL=postgresql://eu-west-1.db.example.com:5432/omni

# Same agent code!

How to Choose a Backend

Event Bus Selection

If you need…                                    Choose…
Simple setup, good performance                  Redis Streams
Extremely high throughput (>100K msgs/sec)      Kafka
Complex routing patterns                        RabbitMQ
Cloud-native, edge deployment                   NATS
Cost-effective, fast                            Redis Streams

Storage Selection

If you need…                                    Choose…
Local development                               JSON
Fast, distributed access                        Redis
Complex queries, reporting                      PostgreSQL
Flexible schemas                                MongoDB
Long-term, cheap storage                        S3

Hybrid Approach

You can use different backends for different purposes:
# Fast event bus
EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=localhost:9092

# Fast storage for agents & config
STORAGE_BACKEND=redis
REDIS_URL=redis://localhost:6379

# Long-term result storage (not yet implemented)
RESULT_STORAGE=s3
S3_BUCKET=omnidaemon-results

Implementation Details

Backend Registry

OmniDaemon maintains registries of available backends:
# Event Bus Registry
EVENT_BUS_BACKENDS = {
    "redis_stream": RedisStreamEventBus,
    "kafka": KafkaEventBus,         # Coming soon
    "rabbitmq": RabbitMQEventBus,   # Coming soon
    "nats": NATSEventBus,           # Coming soon
}

# Storage Registry
STORAGE_BACKENDS = {
    "json": JSONStore,
    "redis": RedisStore,
    "postgresql": PostgreSQLStore,   # Coming soon
    "mongodb": MongoDBStore,         # Coming soon
    "s3": S3Store,                   # Coming soon
}

Factory Pattern

Backends are created using a factory:
def create_event_bus(backend_name: str, **kwargs) -> BaseEventBus:
    backend_cls = EVENT_BUS_BACKENDS.get(backend_name.lower())
    if not backend_cls:
        raise ValueError(f"Unsupported backend: {backend_name}")
    return backend_cls(**kwargs)

def create_store(backend_name: str, **kwargs) -> BaseStore:
    backend_cls = STORAGE_BACKENDS.get(backend_name.lower())
    if not backend_cls:
        raise ValueError(f"Unsupported backend: {backend_name}")
    return backend_cls(**kwargs)
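Assuming the registry shape above, the factory is straightforward to exercise. The stub class here stands in for the real backend:

```python
# Stub standing in for the real backend class.
class RedisStreamEventBus:
    def __init__(self, redis_url):
        self.redis_url = redis_url

EVENT_BUS_BACKENDS = {"redis_stream": RedisStreamEventBus}

def create_event_bus(backend_name: str, **kwargs):
    backend_cls = EVENT_BUS_BACKENDS.get(backend_name.lower())
    if not backend_cls:
        raise ValueError(f"Unsupported backend: {backend_name}")
    return backend_cls(**kwargs)

# Lookup is case-insensitive; unknown names fail fast with a clear error.
bus = create_event_bus("REDIS_STREAM", redis_url="redis://localhost:6379")
```

Failing fast on an unknown name is what surfaces configuration typos at startup rather than at first publish.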

Module-Level Instances

Pre-configured instances are created at module load:
# omnidaemon/event_bus/__init__.py
from decouple import config  # assumes a python-decouple-style config() helper

_backend_type = config("EVENT_BUS_TYPE", default="redis_stream")

if _backend_type == "redis_stream":
    redis_url = config("REDIS_URL", default="redis://localhost:6379")
    event_bus = create_event_bus("redis_stream", redis_url=redis_url)
# ... etc

# omnidaemon/storage/__init__.py
from decouple import config  # assumes a python-decouple-style config() helper

_backend_type = config("STORAGE_BACKEND", default="json")

if _backend_type == "json":
    storage_dir = config("JSON_STORAGE_DIR", default=".omnidaemon_data")
    store = create_store("json", storage_dir=storage_dir)
# ... etc

Best Practices

1. Use Environment-Specific Configs

# .env.dev
STORAGE_BACKEND=json
EVENT_BUS_TYPE=redis_stream

# .env.staging
STORAGE_BACKEND=redis
EVENT_BUS_TYPE=redis_stream

# .env.prod
STORAGE_BACKEND=postgresql
EVENT_BUS_TYPE=kafka

2. Test with Production-Like Setup

# Even in staging, use production backends
STORAGE_BACKEND=postgresql  # Same as prod
EVENT_BUS_TYPE=kafka        # Same as prod

# Just smaller/cheaper instances
POSTGRES_URL=staging-db.example.com
KAFKA_SERVERS=staging-kafka.example.com

3. Monitor Backend Performance

# Check event bus stats
omnidaemon bus stats

# Check storage health
omnidaemon storage health

# Overall health
omnidaemon health

4. Plan for Migration

# Start with simple
STORAGE_BACKEND=json

# Move to distributed when needed
STORAGE_BACKEND=redis

# Optimize for scale when ready
STORAGE_BACKEND=postgresql
EVENT_BUS_TYPE=kafka

5. Document Your Backend Choices

# infrastructure.yaml
environments:
  development:
    event_bus: redis_stream  # Simple, fast
    storage: json            # No dependencies
    
  staging:
    event_bus: redis_stream  # Same as dev
    storage: redis           # Test distributed
    
  production:
    event_bus: kafka         # High throughput
    storage: postgresql      # Durable, queryable

Troubleshooting

Backend Not Found

# Error: "Unsupported backend: kafk"
EVENT_BUS_TYPE=kafk  # Typo!

# Fix: check the backend name against the registry
EVENT_BUS_TYPE=kafka  # Correct

Connection Refused

# Check if backend service is running
redis-cli ping                    # For Redis
kafka-topics.sh --list ...        # For Kafka
psql -h localhost -U user -d db   # For PostgreSQL

Performance Issues

# Profile your backend
omnidaemon metrics                # Task processing times
omnidaemon bus stats              # Event bus stats
omnidaemon storage health         # Storage health

Summary

Key Points:
  • No Code Changes - Your agent code never changes
  • Environment Variables - Just update config
  • Dependency Injection - OmniDaemon handles wiring
  • Multiple Backends - Redis, Kafka, PostgreSQL, and more
  • Easy Migration - Start simple, scale when needed
  • No Vendor Lock-In - Switch anytime
Current Options:
  • Event Bus: Redis Streams
  • Storage: JSON, Redis
Coming Soon:
  • Event Bus: Kafka, RabbitMQ, NATS 🚧
  • Storage: PostgreSQL, MongoDB, S3 🚧
The Promise:
“You provide the URL, OmniDaemon handles EVERYTHING else!”