
Event Bus Architecture

This page provides a comprehensive deep dive into OmniDaemon’s event bus architecture, focusing on the Redis Streams implementation, the current production-ready backend.

What is the Event Bus?

The event bus is the message broker that delivers events from publishers to subscribers (agents). Think of it as the “nervous system” of your AI agent infrastructure. Key Responsibilities:
  • 📨 Receive events from publishers
  • 🚀 Deliver events to appropriate subscribers
  • 💾 Persist messages for durability
  • 🔄 Handle retries when processing fails
  • ⚖️ Load balance across multiple agent instances
  • 💀 Manage DLQ for failed messages
  • 📊 Track metrics for observability
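The responsibilities above can be sketched as an abstract interface. This is an illustrative Python sketch only — the method names are assumptions, not OmniDaemon’s actual API:

```python
from typing import Any, Awaitable, Callable, Protocol


class EventBus(Protocol):
    """Illustrative shape of an event bus backend (hypothetical names)."""

    async def publish(self, topic: str, payload: dict[str, Any]) -> str:
        """Receive an event, persist it durably, return its message ID."""
        ...

    async def subscribe(
        self,
        topic: str,
        callback: Callable[[dict[str, Any]], Awaitable[None]],
    ) -> None:
        """Deliver events for `topic` to `callback`, load-balanced
        across agent instances, with retries and DLQ on failure."""
        ...

    async def ack(self, topic: str, msg_id: str) -> None:
        """Mark a message as successfully processed."""
        ...
```

Any concrete backend (Redis Streams here, others later) would satisfy this shape.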

Redis Streams Implementation

Overview

Redis Streams is a data structure that acts as an append-only log with powerful consumption semantics, making it well suited to event-driven architectures. Why Redis Streams?
  • Durable - Messages persisted to disk
  • Reliable - At-least-once delivery guaranteed
  • Fast - Sub-millisecond latency
  • Scalable - Handles millions of messages/second
  • Consumer Groups - Built-in load balancing
  • Reclaiming - Automatic failure recovery
  • Simple - No separate broker to manage

Architecture Diagram

┌─────────────────────────────────────────────────────────────────┐
│                     RedisStreamEventBus                          │
│                                                                  │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐                │
│  │ Publisher  │  │ Publisher  │  │ Publisher  │                │
│  │    #1      │  │    #2      │  │    #3      │                │
│  └────────────┘  └────────────┘  └────────────┘                │
│         │               │               │                        │
│         └───────────────┼───────────────┘                        │
│                         │ XADD                                   │
│                         ▼                                        │
│              ┌──────────────────────┐                            │
│              │   Redis Stream       │                            │
│              │ omni-stream:{topic}  │                            │
│              │                      │                            │
│              │ • Message ID         │                            │
│              │ • Payload (JSON)     │                            │
│              │ • Metadata           │                            │
│              └──────────────────────┘                            │
│                         │                                        │
│                         │ XREADGROUP                             │
│         ┌───────────────┼───────────────┐                        │
│         ▼               ▼               ▼                        │
│  ┌────────────┐  ┌────────────┐  ┌────────────┐                │
│  │ Consumer 1 │  │ Consumer 2 │  │ Consumer 3 │                │
│  │ (Agent #1) │  │ (Agent #2) │  │ (Agent #3) │                │
│  └────────────┘  └────────────┘  └────────────┘                │
│         │               │               │                        │
│         └───────────────┼───────────────┘                        │
│                         │ Success: XACK                          │
│                         │ Failure: Pending → Reclaim → DLQ      │
│                         ▼                                        │
│              ┌──────────────────────┐                            │
│              │   Dead Letter Queue  │                            │
│              │ omni-dlq:{group}     │                            │
│              └──────────────────────┘                            │
└─────────────────────────────────────────────────────────────────┘

Core Components

1. Redis Stream

A Redis Stream is an append-only log where each entry has a unique ID and a set of field-value pairs. Structure:
omni-stream:{topic}

├─ Message ID: 1678901234567-0
│  └─ data: {"content": {...}, "correlation_id": "...", ...}

├─ Message ID: 1678901234568-0
│  └─ data: {"content": {...}, "correlation_id": "...", ...}

└─ Message ID: 1678901234569-0
   └─ data: {"content": {...}, "correlation_id": "...", ...}
Message ID Format:
{timestamp_ms}-{sequence}
Example: 1678901234567-0

• timestamp_ms: Milliseconds since epoch
• sequence: Auto-incrementing sequence (for same millisecond)
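Splitting an ID back into its two parts is straightforward; a small hypothetical helper (not part of OmniDaemon):

```python
def parse_message_id(msg_id: str) -> tuple[int, int]:
    """Split a Redis Stream ID like '1678901234567-0' into
    (milliseconds since epoch, per-millisecond sequence number)."""
    ts, seq = msg_id.split("-")
    return int(ts), int(seq)

# parse_message_id("1678901234567-0") → (1678901234567, 0)
```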
Stream Naming:
stream_name = f"omni-stream:{topic}"

Examples:
  omni-stream:file.uploaded
  omni-stream:analysis.requested
  omni-stream:user.registered
Configuration:
class RedisStreamEventBus:
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        default_maxlen: int = 10_000,              # Max messages in stream
        reclaim_interval: int = 30,                 # Reclaim check interval (seconds)
        default_reclaim_idle_ms: int = 180_000,     # Message idle time (3 min)
        default_dlq_retry_limit: int = 3,           # Max retries before DLQ
    ):
Default Values:
  • default_maxlen: 10,000 messages (approximate)
  • reclaim_interval: 30 seconds
  • default_reclaim_idle_ms: 180,000 ms (3 minutes)
  • default_dlq_retry_limit: 3 retries

2. Consumer Groups

A consumer group is a set of consumers that process messages from the same stream. Redis ensures each message is delivered to only ONE consumer in the group. Creation:
# Automatic creation during subscription
stream_name = f"omni-stream:{topic}"
group_name = f"group:{topic}:{agent_name}"

await redis.xgroup_create(
    stream_name,   # Stream to consume from
    group_name,    # Unique group name
    id="$",        # Start from NEW messages ($ = end of stream)
    mkstream=True  # Create stream if it doesn't exist
)
Group Naming Convention:
group_name = f"group:{topic}:{agent_name}"

Examples:
  group:file.uploaded:file-processor
  group:analysis.requested:analyzer-agent
  group:user.registered:welcome-emailer
Why Groups?
  • Load Balancing - Messages distributed across consumers
  • Fault Tolerance - If consumer dies, another picks up
  • At-Least-Once - Messages not lost even if consumer crashes
  • Position Tracking - Group remembers last processed message
Multiple Consumers in Group:
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        config=SubscriptionConfig(
            consumer_count=3,  # 3 consumers in this group
        )
    )
)

# Results in:
# - Consumer: consumer:file-processor-1
# - Consumer: consumer:file-processor-2
# - Consumer: consumer:file-processor-3

3. Message Publishing

Publishing Process:
  1. Event Created:
    event = EventEnvelope(
        topic="file.uploaded",
        payload=PayloadBase(content={"filename": "doc.pdf"})
    )
    
  2. Serialized to JSON:
    data = {
        "content": event.payload.content,
        "webhook": event.payload.webhook,
        "reply_to": event.payload.reply_to,
        "topic": event.topic,
        "task_id": event.id,
        "correlation_id": event.correlation_id,
        "causation_id": event.causation_id,
        "source": event.source,
        "delivery_attempts": 0,
        "created_at": time.time(),
    }
    payload = json.dumps(data)
    
  3. Added to Stream:
    msg_id = await redis.xadd(
        name="omni-stream:file.uploaded",
        fields={"data": payload},
        maxlen=10_000,        # Trim to ~10K messages
        approximate=True      # Faster trimming
    )
    
XADD Parameters:
  • maxlen: Maximum stream length (older messages trimmed)
  • approximate: If True, trimming is faster but less precise (~)
Return Value:
msg_id = "1678901234567-0"  # Timestamp-Sequence format

4. Message Consumption

Consumption Loop:
while self._running:
    # Read up to 10 new messages, block for 5 seconds
    entries = await redis.xreadgroup(
        groupname="group:file.uploaded:file-processor",
        consumername="consumer:file-processor-1",
        streams={"omni-stream:file.uploaded": ">"},  # ">" = new messages
        count=10,                                      # Batch size
        block=5000,                                    # Block timeout (ms)
    )
    
    if not entries:
        continue  # No messages, loop again
    
    for stream_name, msgs in entries:
        for msg_id, fields in msgs:
            payload = json.loads(fields["data"])
            
            # Mark in-flight (prevent duplicate processing)
            self._in_flight[group_name].add(msg_id)
            
            try:
                # Call agent callback
                await callback(payload)
                
                # Acknowledge successful processing
                await redis.xack(stream_name, group_name, msg_id)
                
                # Emit metric
                await self._emit_monitor({
                    "topic": topic,
                    "event": "processed",
                    "msg_id": msg_id,
                    "timestamp": time.time(),
                })
            
            except Exception as e:
                # Leave in pending list for reclaiming
                logger.exception(f"Callback error: {e}")
            
            finally:
                # Remove from in-flight
                self._in_flight[group_name].discard(msg_id)
XREADGROUP Parameters:
  • groupname: Consumer group name
  • consumername: Individual consumer name (unique within group)
  • streams: Dict of {stream_name: start_id}
    • ">" = Only NEW messages (never delivered to this group)
    • "0" = This consumer's pending (delivered but unacknowledged) messages
  • count: Max messages per read (batch size)
  • block: Milliseconds to block waiting (0 = block indefinitely; omit to return immediately)
Return Format:
entries = [
    (
        "omni-stream:file.uploaded",  # Stream name
        [
            ("1678901234567-0", {"data": "{...}"}),  # Message 1
            ("1678901234568-0", {"data": "{...}"}),  # Message 2
        ]
    )
]
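Flattening this nested reply into individual decoded messages is a common first step; a small hedged helper (illustrative, not OmniDaemon code):

```python
import json


def flatten_entries(entries):
    """Yield (stream_name, msg_id, payload) for each message in an
    XREADGROUP-style reply: [(stream, [(id, {"data": json}), ...]), ...]."""
    for stream_name, msgs in entries:
        for msg_id, fields in msgs:
            yield stream_name, msg_id, json.loads(fields["data"])
```

Feeding it the example reply above yields one tuple per message, with the JSON payload already decoded.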

5. Pending Entries List (PEL)

The Pending Entries List tracks messages that have been delivered to consumers but not yet acknowledged. Why PEL?
  • Failure Recovery - Know which messages weren’t processed
  • Reclaiming - Reassign stuck messages to other consumers
  • Monitoring - See what’s in-flight
PEL Entry:
Message ID: 1678901234567-0
Consumer: consumer:file-processor-1
Delivered: 1678901250000 (timestamp)
Idle Time: 3500 ms
Delivery Count: 1
Checking PEL:
pending = await redis.xpending_range(
    name="omni-stream:file.uploaded",
    groupname="group:file.uploaded:file-processor",
    min="-",       # Start of PEL
    max="+",       # End of PEL
    count=50       # Max entries
)

# Returns list of pending messages with idle time
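A PEL entry and its reclaim-eligibility check can be modeled as a small dataclass (field names are illustrative, mirroring the example entry above):

```python
from dataclasses import dataclass


@dataclass
class PendingEntry:
    """Model of one Pending Entries List entry (illustrative)."""
    message_id: str
    consumer: str
    time_since_delivered: int  # idle time, milliseconds
    times_delivered: int

    def is_reclaimable(self, reclaim_idle_ms: int = 180_000) -> bool:
        """Eligible for reclaiming once idle longer than the
        configured threshold (default: 3 minutes)."""
        return self.time_since_delivered >= reclaim_idle_ms
```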

6. Message Reclaiming

If a consumer crashes or takes too long, its messages are reclaimed by another consumer. Reclaim Loop:
while self._running:
    # Get pending messages
    pending = await redis.xpending_range(
        stream_name, group_name, "-", "+", count=50
    )
    
    for entry in pending:
        msg_id = entry.get("message_id")
        idle = entry.get("time_since_delivered", 0)  # Milliseconds
        
        # Skip if not idle long enough
        if idle < reclaim_idle_ms:  # Default: 180,000 ms (3 min)
            continue
        
        # Skip if still being processed
        if msg_id in self._in_flight[group_name]:
            continue
        
        # Reclaim message
        claimed = await redis.xclaim(
            name=stream_name,
            groupname=group_name,
            consumername=consumer,
            min_idle_time=reclaim_idle_ms,
            message_ids=[msg_id],
        )
        
        if not claimed:
            continue
        
        # Decode the payload from the claimed entry
        _, fields = claimed[0]
        payload = json.loads(fields["data"])
        
        # Track retry count
        retry_count = await redis.hincrby(
            f"retry_counts:{group_name}",
            msg_id,
            1  # Increment
        )
        
        # Check if exceeded max retries
        if retry_count > dlq_retry_limit:  # Default: 3
            # Send to Dead Letter Queue (args match _send_to_dlq)
            await self._send_to_dlq(
                group_name, stream_name, msg_id, payload,
                f"Max retries ({dlq_retry_limit}) exceeded", retry_count,
            )
            await redis.xack(stream_name, group_name, msg_id)
            await redis.hdel(f"retry_counts:{group_name}", msg_id)
        else:
            # Retry processing
            try:
                await callback(payload)
                await redis.xack(stream_name, group_name, msg_id)
                await redis.hdel(f"retry_counts:{group_name}", msg_id)
            except Exception:
                # Will be reclaimed again
                logger.exception("Retry failed")
    
    await asyncio.sleep(reclaim_interval)  # Default: 30 seconds
Reclaim Configuration:
  • reclaim_idle_ms: How long before reclaiming (default: 180,000 ms = 3 min)
  • reclaim_interval: How often to check (default: 30 seconds)
  • dlq_retry_limit: Max retries before DLQ (default: 3)
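These three settings together bound how long a permanently failing message takes to reach the DLQ: each reclaim must wait out the idle threshold plus up to one polling interval of slack. A rough upper-bound calculation (illustrative arithmetic, not OmniDaemon code):

```python
def time_to_dlq_upper_bound_s(
    dlq_retry_limit: int = 3,
    reclaim_idle_ms: int = 180_000,
    reclaim_interval_s: int = 30,
) -> float:
    """Rough upper bound (seconds) for a permanently failing message
    to reach the DLQ: it is reclaimed dlq_retry_limit + 1 times, each
    cycle waiting out the idle threshold plus one polling interval."""
    per_cycle_s = reclaim_idle_ms / 1000 + reclaim_interval_s
    return (dlq_retry_limit + 1) * per_cycle_s

# With the defaults: 4 × (180 + 30) = 840 seconds (~14 minutes)
```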
In-Flight Tracking:
# Prevent duplicate processing during reclaim
self._in_flight[group_name] = set()

# Mark as in-flight when processing starts
self._in_flight[group_name].add(msg_id)

# Remove when processing completes (success or failure)
self._in_flight[group_name].discard(msg_id)

7. Dead Letter Queue (DLQ)

Messages that fail repeatedly go to the Dead Letter Queue for manual inspection. DLQ Stream:
dlq_stream = f"omni-dlq:{group_name}"

Example:
  omni-dlq:group:file.uploaded:file-processor
DLQ Entry Structure:
{
    "topic": "file.uploaded",
    "original_stream": "omni-stream:file.uploaded",
    "original_id": "1678901234567-0",
    "failed_message": {
        "content": {"filename": "doc.pdf"},
        "correlation_id": "req-123",
        "delivery_attempts": 4
    },
    "error": "Max retries (3) exceeded",
    "retry_count": 3,
    "failed_at": 1678901300.123
}
Sending to DLQ:
async def _send_to_dlq(
    self,
    group: str,
    stream_name: str,
    msg_id: str,
    payload: dict,
    error: str,
    retry_count: int,
):
    dlq_stream = f"omni-dlq:{group}"
    dlq_payload = {
        "topic": stream_name.replace("omni-stream:", ""),
        "original_stream": stream_name,
        "original_id": msg_id,
        "failed_message": payload,
        "error": error,
        "retry_count": retry_count,
        "failed_at": time.time(),
    }
    
    await self._redis.xadd(
        dlq_stream,
        {"data": json.dumps(dlq_payload)},
        maxlen=10_000,
        approximate=True,
    )
Inspecting DLQ:
# Via CLI
omnidaemon bus dlq --topic file.uploaded --limit 10

# Via SDK
dlq_entries = await sdk.inspect_dlq("file.uploaded", limit=10)

# Via Redis directly
redis-cli XREVRANGE omni-dlq:group:file.uploaded:file-processor + - COUNT 10
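To manually retry a DLQ entry, you can re-publish its failed message onto the original stream. A hedged sketch of the pure part — building the replay payload from a decoded DLQ entry (hypothetical helper, not part of the OmniDaemon SDK; the actual XADD back onto the stream is left to the caller):

```python
import json


def build_replay(dlq_entry: dict) -> tuple[str, str]:
    """Given a decoded DLQ entry, return (original_stream, payload_json)
    ready to XADD back onto the source stream for a manual retry.
    Resets delivery_attempts so the message gets a fresh retry budget."""
    message = dict(dlq_entry["failed_message"])
    message["delivery_attempts"] = 0
    return dlq_entry["original_stream"], json.dumps(message)
```

The caller would then run `XADD <original_stream> * data <payload_json>` and delete or acknowledge the DLQ entry once the replay succeeds.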

8. Acknowledgment (XACK)

Acknowledging a message tells Redis it was successfully processed and can be removed from the PEL. When to Acknowledge:
try:
    # Process message
    await callback(payload)
    
    # Success! Acknowledge
    await redis.xack(stream_name, group_name, msg_id)
    
except RetriableError:
    # DON'T acknowledge - leave in PEL for reclaiming
    logger.warning("Retriable error, will retry")
    raise

except PermanentError:
    # Acknowledge to prevent retries
    await redis.xack(stream_name, group_name, msg_id)
    logger.error("Permanent error, acknowledged to skip")
XACK Behavior:
  • ✅ Removes message from PEL
  • ✅ Message no longer redelivered
  • ✅ Frees the PEL memory held by that entry

9. Metrics Tracking

All events are tracked in a dedicated metrics stream. Metrics Stream:
stream_name = "omni-metrics"
Metric Events:
# Message processed successfully
{
    "topic": "file.uploaded",
    "event": "processed",
    "msg_id": "1678901234567-0",
    "group": "group:file.uploaded:file-processor",
    "consumer": "consumer:file-processor-1",
    "timestamp": 1678901300.123
}

# Message reclaimed
{
    "topic": "file.uploaded",
    "event": "reclaimed",
    "msg_id": "1678901234567-0",
    "group": "group:file.uploaded:file-processor",
    "consumer": "consumer:file-processor-2",
    "timestamp": 1678901330.456
}

# Message sent to DLQ
{
    "topic": "file.uploaded",
    "event": "dlq_push",
    "msg_id": "1678901234567-0",
    "group": "group:file.uploaded:file-processor",
    "consumer": "consumer:file-processor-2",
    "timestamp": 1678901360.789
}
Emitting Metrics:
async def _emit_monitor(self, metric: dict):
    await self._redis.xadd(
        "omni-metrics",
        {"data": json.dumps(metric)},
        maxlen=1_000_000,  # Keep up to 1M metrics
        approximate=True
    )

Redis Commands Used

XADD - Add Message to Stream

XADD omni-stream:file.uploaded * data '{"content": {...}}'
Parameters:
  • Stream name
  • * = Auto-generate message ID
  • Field-value pairs
Returns: Message ID (e.g., 1678901234567-0)

XREADGROUP - Read from Group

XREADGROUP GROUP group:file.uploaded:processor consumer-1
  COUNT 10 BLOCK 5000 STREAMS omni-stream:file.uploaded >
Parameters:
  • GROUP group consumer
  • COUNT - Max messages
  • BLOCK - Timeout (ms)
  • STREAMS - Stream and start ID (> = new only)
Returns: List of messages

XACK - Acknowledge Message

XACK omni-stream:file.uploaded group:file.uploaded:processor 1678901234567-0
Parameters:
  • Stream name
  • Group name
  • Message ID(s)
Returns: Count of acknowledged messages

XPENDING - Get Pending Messages

XPENDING omni-stream:file.uploaded group:file.uploaded:processor - + 50
Parameters:
  • Stream name
  • Group name
  • Start ID (- = beginning)
  • End ID (+ = end)
  • Count
Returns: List of pending messages with idle time

XCLAIM - Reclaim Message

XCLAIM omni-stream:file.uploaded group:file.uploaded:processor consumer-2
  180000 1678901234567-0
Parameters:
  • Stream name
  • Group name
  • New consumer name
  • Min idle time (ms)
  • Message ID(s)
Returns: Claimed messages

XGROUP CREATE - Create Consumer Group

XGROUP CREATE omni-stream:file.uploaded group:file.uploaded:processor $ MKSTREAM
Parameters:
  • CREATE stream group start-id
  • MKSTREAM - Create stream if it doesn't exist
Returns: OK

XGROUP DESTROY - Delete Consumer Group

XGROUP DESTROY omni-stream:file.uploaded group:file.uploaded:processor
Returns: OK

XINFO GROUPS - List Consumer Groups

XINFO GROUPS omni-stream:file.uploaded
Returns: List of groups with stats

Performance Characteristics

Throughput

Single Redis Instance:
  • Publishing: ~100,000 messages/second
  • Consuming: ~50,000 messages/second per consumer
  • With Persistence: ~50,000 messages/second
Redis Cluster:
  • Publishing: >1,000,000 messages/second
  • Consuming: Scales linearly with consumers

Latency

End-to-End Latency:
  • Publish to Deliver: <10ms (typical)
  • Publish to Process: <50ms (typical)
  • With Network: <100ms (typical)
Breakdown:
  • XADD: <1ms
  • XREADGROUP: <1ms
  • Network: 1-10ms
  • Callback execution: Variable (depends on your agent)

Memory Usage

Per Message:
  • ~1 KB average (depends on payload size)
Stream Overhead:
  • Minimal (<10% of message size)
Pending Entries:
  • Stored in memory
  • ~100 bytes per pending message
Example:
10,000 messages in stream = ~10 MB
+ 1,000 pending messages = ~100 KB
+ Consumer group metadata = ~10 KB
─────────────────────────────────────
Total: ~10.1 MB
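The arithmetic above generalizes into a back-of-envelope estimator (illustrative helper using the rough per-message figures from this section):

```python
def estimate_stream_memory_bytes(
    stream_messages: int,
    pending_messages: int,
    avg_message_bytes: int = 1_024,      # ~1 KB per message
    pel_entry_bytes: int = 100,          # ~100 bytes per pending entry
    group_metadata_bytes: int = 10_240,  # ~10 KB of group metadata
) -> int:
    """Back-of-envelope memory estimate for one stream + one group."""
    return (
        stream_messages * avg_message_bytes
        + pending_messages * pel_entry_bytes
        + group_metadata_bytes
    )

# estimate_stream_memory_bytes(10_000, 1_000) ≈ 10.3 MB
```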

Persistence

AOF (Append-Only File):
  • Every write logged to disk
  • Slower but safer (no data loss)
  • Recommended for production
RDB (Snapshot):
  • Periodic snapshots
  • Faster but risk of data loss
  • OK for development
Configuration:
# redis.conf

# AOF (recommended for production)
appendonly yes
appendfsync everysec  # Balance between safety and speed

# RDB (optional, as backup)
save 900 1      # After 900 sec if ≥1 key changed
save 300 10     # After 300 sec if ≥10 keys changed
save 60 10000   # After 60 sec if ≥10000 keys changed

Monitoring Event Bus

Via CLI

# List all streams
omnidaemon bus list

# Inspect messages in stream
omnidaemon bus inspect --stream file.uploaded --limit 10

# List consumer groups
omnidaemon bus groups --stream file.uploaded

# Inspect DLQ
omnidaemon bus dlq --topic file.uploaded --limit 10

# Get comprehensive stats
omnidaemon bus stats

Via SDK

# List streams
streams = await sdk.list_streams()
# [{"stream": "omni-stream:file.uploaded", "length": 1500}, ...]

# Inspect stream
messages = await sdk.inspect_stream("file.uploaded", limit=10)
# [{"id": "1678901234567-0", "data": {...}}, ...]

# List groups
groups = await sdk.list_groups("file.uploaded")
# [{"name": "group:...", "consumers": 3, "pending": 5}, ...]

# Inspect DLQ
dlq = await sdk.inspect_dlq("file.uploaded", limit=10)
# [{"id": "...", "data": {...}}, ...]

# Get stats
stats = await sdk.get_bus_stats()
# {"snapshot": {...}, "redis_info": {...}}

Via Redis CLI

# List all streams (SCAN is non-blocking; avoid KEYS in production)
redis-cli --scan --pattern 'omni-stream:*'

# Get stream length
redis-cli XLEN omni-stream:file.uploaded

# Get consumer groups
redis-cli XINFO GROUPS omni-stream:file.uploaded

# Get pending count
redis-cli XPENDING omni-stream:file.uploaded group:file.uploaded:processor

# Inspect messages
redis-cli XREVRANGE omni-stream:file.uploaded + - COUNT 10

Advanced Configuration

Custom Consumer Count

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        config=SubscriptionConfig(
            consumer_count=5,  # 5 parallel consumers
        )
    )
)
Effect:
  • 5 consumers in the group
  • Load distributed across all 5
  • Higher throughput

Custom Reclaim Settings

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        config=SubscriptionConfig(
            reclaim_idle_ms=60000,  # 1 minute (faster reclaim)
        )
    )
)
Use Case: Quick reclaim for fast-failing tasks

Custom Retry Limit

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        config=SubscriptionConfig(
            max_retries=5,  # More retries before DLQ
        )
    )
)
Use Case: Transient errors (network issues, rate limits)

Custom Stream Max Length

class RedisStreamEventBus:
    def __init__(
        self,
        default_maxlen=100_000,  # Keep 100K messages
    ):
Use Case: High-volume topics needing more history

Best Practices

1. Choose Appropriate reclaim_idle_ms

# Fast tasks (< 1 minute)
reclaim_idle_ms=60000  # 1 minute

# Medium tasks (1-5 minutes)
reclaim_idle_ms=300000  # 5 minutes

# Long tasks (5+ minutes)
reclaim_idle_ms=600000  # 10 minutes
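A simple rule of thumb for choosing this value (a hypothetical heuristic, not an OmniDaemon API): give a task a multiple of its expected duration before another consumer may steal it, with a floor of one minute:

```python
def pick_reclaim_idle_ms(expected_task_s: float, safety_factor: float = 3.0) -> int:
    """Heuristic: reclaim after safety_factor × the expected task
    duration, but never sooner than 1 minute."""
    return max(60_000, int(expected_task_s * 1000 * safety_factor))

# A ~2-minute task → 360,000 ms (6 minutes) before reclaim
```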

2. Set Appropriate max_retries

# Idempotent operations (safe to retry)
max_retries=5

# Non-idempotent operations (risky to retry)
max_retries=1

# Transient errors expected
max_retries=10

3. Monitor DLQ

# Check DLQ regularly
omnidaemon bus dlq --topic file.uploaded

# Investigate failures
# Fix issues
# Manually retry if needed

4. Configure Persistence

# Production: AOF enabled
appendonly yes
appendfsync everysec

# Development: RDB only
appendonly no
save 60 1

5. Scale with Consumer Count

# Low traffic
consumer_count=1

# Medium traffic
consumer_count=3-5

# High traffic
consumer_count=10+

Troubleshooting

Messages Not Being Delivered

# Check if stream exists
redis-cli EXISTS omni-stream:file.uploaded

# Check consumer groups
redis-cli XINFO GROUPS omni-stream:file.uploaded

# Check if agents subscribed
omnidaemon agent list

Messages Stuck in Pending

# Check pending count
redis-cli XPENDING omni-stream:file.uploaded group:file.uploaded:processor

# Force reclaim
# Lower reclaim_idle_ms temporarily

High DLQ Count

# Inspect DLQ
omnidaemon bus dlq --topic file.uploaded

# Common causes:
# - Bad data (permanent errors)
# - Service down (transient errors)
# - Timeout too low
# - max_retries too low

Slow Processing

# Check consumer count
omnidaemon bus groups --stream file.uploaded

# Increase consumers
consumer_count=10

Summary

Key Components:
  • Redis Stream - Append-only log for messages
  • Consumer Group - Load balancing and fault tolerance
  • Pending Entries List (PEL) - Track unacknowledged messages
  • Message Reclaiming - Automatic failure recovery
  • Dead Letter Queue (DLQ) - Failed message storage
Key Redis Commands:
  • XADD - Publish message
  • XREADGROUP - Consume message
  • XACK - Acknowledge message
  • XCLAIM - Reclaim message
  • XPENDING - Check pending messages
Configuration:
  • default_maxlen: 10,000 messages
  • reclaim_interval: 30 seconds
  • default_reclaim_idle_ms: 180,000 ms (3 min)
  • default_dlq_retry_limit: 3 retries
Performance:
  • Throughput: ~100K msgs/sec (single instance)
  • Latency: <10ms (typical)
  • Memory: ~1KB per message
Redis Streams provides a robust, scalable foundation for OmniDaemon’s event-driven architecture! 🚀