
Event-Driven Architecture (EDA)

Event-Driven Architecture is the foundational pattern that makes OmniDaemon powerful. This page explains what EDA is, why it’s essential for AI agents, and how OmniDaemon implements it.

What is Event-Driven Architecture?

In traditional architectures, components talk directly to each other (request-response):
┌─────────┐                    ┌─────────┐
│Component│ ──HTTP Request──▶  │Component│
│    A    │ ◀─HTTP Response──  │    B    │
└─────────┘                    └─────────┘
In Event-Driven Architecture, components communicate through events:
┌─────────┐                    ┌─────────┐
│Component│                    │Component│
│    A    │                    │    B    │
└─────────┘                    └─────────┘
     │                              ▲
     │ Publishes Event              │ Subscribes
     ▼                              │
┌────────────────────────────────────────┐
│           Event Bus                     │
│   (Redis Streams, Kafka, RabbitMQ...)  │
└────────────────────────────────────────┘

Key Differences

| Aspect | Request-Response | Event-Driven |
|---|---|---|
| Coupling | Tight (A knows about B) | Loose (A doesn’t know who listens) |
| Synchronous | Yes (A waits for B) | No (A continues immediately) |
| Scalability | Vertical (scale both) | Horizontal (scale independently) |
| Resilience | Brittle (if B fails, A fails) | Resilient (failures isolated) |
| Flexibility | Rigid (hard to add C) | Flexible (easy to add listeners) |

Why Event-Driven for AI Agents?

The Sean Falconer article “The Future of AI Agents is Event-Driven” explains this brilliantly:
“Agents need access to data, tools, and the ability to share information across systems, with their outputs available for use by multiple services — including other agents. This isn’t an AI problem; it’s an infrastructure and data interoperability problem.”

Agents Are Like Microservices

Just as microservices transformed how we build applications, agentic AI requires similar architectural patterns.

Microservices Need:
  • Independent deployment
  • Inter-service communication
  • Loose coupling
  • Horizontal scaling
  • Resilience to failures
AI Agents Need:
  • Independent operation
  • Inter-agent communication
  • Loose coupling
  • Horizontal scaling
  • Resilience to failures
  • Plus: Context-rich information flow

The Tight Coupling Problem

If you connect agents via direct API calls:
❌ TIGHT COUPLING
┌──────────────┐
│ Agent 1      │───API Call───▶ ┌──────────────┐
│ (Researcher) │                │ Agent 2      │
└──────────────┘                │ (Analyzer)   │
                                └──────────────┘
                                       │
                                       │ API Call
                                       ▼
                                ┌──────────────┐
                                │ Agent 3      │
                                │ (Reporter)   │
                                └──────────────┘

Problems:
1. If Agent 2 fails, Agent 1 fails
2. Can't scale agents independently
3. Agent 1 needs to know Agent 2's address
4. Changes in Agent 2 affect Agent 1
5. Can't easily add new agents

The Event-Driven Solution

With EDA, agents communicate through events:
✅ EVENT-DRIVEN
                   ┌──────────────────────┐
                   │     Event Bus        │
                   │  (Redis Streams)     │
                   └──────────────────────┘
                    ↑        ↑        ↑
        ┌───────────┘        │        └───────────┐
        │                    │                    │
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│ Agent 1      │    │ Agent 2      │    │ Agent 3      │
│ (Researcher) │    │ (Analyzer)   │    │ (Reporter)   │
└──────────────┘    └──────────────┘    └──────────────┘

Benefits:
✅ Agents don't depend on each other
✅ Scale each agent independently
✅ Agents don't need to know about each other
✅ Changes in one agent don't affect others
✅ Easy to add new agents (just subscribe to events)
✅ Multiple agents can react to same event

Core EDA Concepts in OmniDaemon

1. Events

An event is a message representing something that happened:
{
    "topic": "user.registered",
    "content": {
        "user_id": "12345",
        "email": "[email protected]",
        "timestamp": "2025-03-12T10:30:00Z"
    },
    "correlation_id": "req-abc-123",
    "source": "auth-service"
}
Event Characteristics:
  • Immutable (can’t be changed once published)
  • Timestamped (when it happened)
  • Semantic (describes what happened, not what to do)
  • Context-rich (includes metadata for routing/processing)
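Immutability can be enforced in code. Here is a minimal sketch using a frozen dataclass — an illustration only, not OmniDaemon’s actual EventEnvelope class:

```python
import dataclasses
from datetime import datetime, timezone

@dataclasses.dataclass(frozen=True)
class Event:
    """Minimal immutable event record (illustrative, not OmniDaemon's EventEnvelope)."""
    topic: str
    content: dict
    correlation_id: str
    source: str
    timestamp: str = dataclasses.field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

event = Event(
    topic="user.registered",
    content={"user_id": "12345", "email": "user@example.com"},
    correlation_id="req-abc-123",
    source="auth-service",
)

# Attempting to mutate a published event fails:
try:
    event.topic = "user.deleted"
except dataclasses.FrozenInstanceError:
    print("events are immutable")
```

Because the record can’t change after creation, every subscriber sees the same facts the publisher recorded.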

2. Topics

A topic is a channel for related events:
user.registered     ← All user registration events
order.placed        ← All order placement events
file.uploaded       ← All file upload events
analysis.requested  ← All analysis request events
Topic Naming Conventions:
  • Use dot notation: resource.action
  • Past tense for events: user.created not user.create
  • Specific not generic: order.shipped not order.event
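These conventions are easy to check mechanically. A small sketch, assuming a lowercase `resource.action` rule — the regex is our own convention for illustration, not something OmniDaemon enforces:

```python
import re

# Lowercase resource.action, e.g. "user.registered" or "analysis.sentiment.completed"
# (illustrative rule, not enforced by OmniDaemon itself)
TOPIC_PATTERN = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+$")

def is_valid_topic(topic: str) -> bool:
    """Return True if the topic follows the resource.action convention."""
    return bool(TOPIC_PATTERN.match(topic))

print(is_valid_topic("user.registered"))   # True
print(is_valid_topic("User.Registered"))   # False: not lowercase
print(is_valid_topic("user"))              # False: missing the action part
```

Validating topic names at startup catches typos before a publisher and subscriber silently end up on different streams.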

3. Publishers

A publisher sends events to a topic:
# Service publishes event
await sdk.publish_task(
    event_envelope=EventEnvelope(
        topic="file.uploaded",
        payload=PayloadBase(
            content={"filename": "doc.pdf", "size": 1024}
        )
    )
)
Publisher Characteristics:
  • Doesn’t know who’s listening (loose coupling)
  • Fire and forget (doesn’t wait for response)
  • Non-blocking (continues immediately)

4. Subscribers (Agents)

A subscriber (agent) listens to topics and processes events:
# Agent subscribes to topic
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
    )
)
Subscriber Characteristics:
  • Autonomous (runs independently)
  • Reactive (responds to events)
  • Scalable (can run multiple instances)

5. Event Bus

The event bus is the message broker that delivers events:
┌──────────────────────────────────────────────┐
│             Event Bus                         │
│                                               │
│  • Receives events from publishers           │
│  • Routes events to subscribers              │
│  • Persists messages (durability)            │
│  • Handles retries                           │
│  • Manages consumer groups                   │
│  • Dead letter queue for failures            │
└──────────────────────────────────────────────┘
OmniDaemon’s Pluggable Event Bus:
  • Currently: Redis Streams (production-ready)
  • 🚧 Coming: Kafka, RabbitMQ, NATS
You provide the URL, OmniDaemon handles the rest:
# Switch event bus - NO CODE CHANGES!
EVENT_BUS_TYPE=redis_stream
REDIS_URL=redis://localhost:6379

EVENT_BUS_TYPE=kafka
KAFKA_SERVERS=localhost:9092

How OmniDaemon Implements EDA

Message Flow

Here’s what happens when you publish an event:
1. Publisher creates event
   └─▶ EventEnvelope created with topic, content, metadata

2. SDK publishes to event bus
   └─▶ Event serialized and sent to Redis Stream

3. Event bus persists message
   └─▶ Message stored durably (won't be lost)

4. Event bus notifies consumer groups
   └─▶ All groups subscribed to topic are notified

5. Agent(s) receive message
   └─▶ One agent per consumer group processes it

6. Agent executes callback
   └─▶ Your AI agent logic runs

7. Result stored (optional)
   └─▶ Output saved to storage backend

8. Success acknowledged
   └─▶ Message removed from processing queue
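The lifecycle above can be sketched with a toy in-memory bus — a deliberate simplification; the real implementation delegates persistence, delivery, and acknowledgment to Redis Streams:

```python
import uuid

class ToyBus:
    """In-memory sketch of the publish -> deliver -> ack lifecycle
    (illustrative only; OmniDaemon uses Redis Streams for this)."""

    def __init__(self):
        self.queue = []      # persisted, not-yet-delivered messages
        self.pending = {}    # delivered but not yet acknowledged

    def publish(self, topic, content):
        msg_id = str(uuid.uuid4())
        self.queue.append({"id": msg_id, "topic": topic, "content": content})
        return msg_id

    def deliver(self):
        if not self.queue:
            return None
        msg = self.queue.pop(0)
        self.pending[msg["id"]] = msg   # held until acknowledged
        return msg

    def ack(self, msg_id):
        self.pending.pop(msg_id, None)  # success: removed from processing queue

    def requeue_pending(self):
        """If an agent dies before acking, its messages return to the queue."""
        self.queue.extend(self.pending.values())
        self.pending.clear()

bus = ToyBus()
bus.publish("file.uploaded", {"filename": "doc.pdf"})
msg = bus.deliver()   # agent receives the message
bus.ack(msg["id"])    # agent succeeded: message is gone for good
```

The `pending` dict is the key idea: a message only disappears after an explicit acknowledgment, so a crash between delivery and ack never loses work.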

Consumer Groups

When multiple agents subscribe to the same topic, they form a consumer group:
Event Published to "file.uploaded"
            │
            ▼
    ┌──────────────┐
    │  Event Bus   │
    └──────────────┘
            │
            ├─────────┬─────────┬─────────┐
            ▼         ▼         ▼         ▼
       Agent 1   Agent 2   Agent 3   Agent 4
     (Instance) (Instance) (Instance) (Instance)
Load Balancing Rules:
  • ✅ Each message delivered to only ONE agent in the group
  • ✅ Load distributed automatically
  • ✅ If agent fails, another agent picks up the message
  • ✅ Consumer group persists (messages not lost if all agents stop)
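The one-message-per-group rule can be illustrated with a toy round-robin dispatcher — a sketch; real brokers like Redis Streams balance by consumer demand rather than strict rotation:

```python
import itertools

def dispatch(messages, consumers):
    """Deliver each message to exactly ONE consumer in the group,
    cycling through consumers (toy round-robin for illustration)."""
    assignments = {name: [] for name in consumers}
    rotation = itertools.cycle(consumers)
    for msg in messages:
        assignments[next(rotation)].append(msg)
    return assignments

messages = [f"msg-{i}" for i in range(6)]
result = dispatch(messages, ["agent-1", "agent-2", "agent-3"])
# Each agent handles roughly a third of the messages,
# and no message is processed twice.
print(result)
```

The invariant to notice is that the union of all assignments equals the original message list, with no overlaps — exactly what a consumer group guarantees.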

Message Durability

OmniDaemon ensures messages aren’t lost:
┌─────────────────────────────────────────┐
│  Message Lifecycle                      │
│                                         │
│  1. Published  ──▶ Persisted to disk   │
│  2. Delivered  ──▶ Agent processing    │
│  3. Acknowledged ──▶ Removed from queue │
│                                         │
│  If agent fails before acknowledging:  │
│  └─▶ Message returned to queue         │
│      └─▶ Another agent picks it up     │
└─────────────────────────────────────────┘

Retries & Dead Letter Queue

If an agent fails to process a message:
┌──────────────────────────────────────────────┐
│  Retry Logic                                 │
│                                              │
│  Attempt 1: Agent fails ──▶ Wait, retry     │
│  Attempt 2: Agent fails ──▶ Wait, retry     │
│  Attempt 3: Agent fails ──▶ Wait, retry     │
│  Attempt 4: Max retries ──▶ Send to DLQ     │
│                                              │
│  DLQ (Dead Letter Queue):                   │
│  • Failed messages stored here              │
│  • Can inspect and debug                    │
│  • Can manually retry or fix                │
└──────────────────────────────────────────────┘
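The retry flow above can be sketched in plain Python — illustrative only; OmniDaemon’s actual retry and DLQ handling lives in the event bus layer and is driven by `SubscriptionConfig`:

```python
def process_with_retries(handler, message, max_retries=3):
    """Toy retry loop: initial attempt plus max_retries retries,
    then park the message in a dead letter queue (sketch only)."""
    dlq = []
    for attempt in range(1, max_retries + 2):  # 1 initial try + max_retries retries
        try:
            return handler(message), dlq
        except Exception as exc:
            if attempt > max_retries:
                # Retries exhausted: store the failure for inspection/debugging
                dlq.append({"message": message, "error": str(exc)})
                return None, dlq
    return None, dlq

calls = {"n": 0}
def flaky(msg):
    calls["n"] += 1
    raise RuntimeError("downstream unavailable")

result, dlq = process_with_retries(flaky, {"id": "m1"}, max_retries=3)
print(calls["n"], len(dlq))  # 4 attempts total, 1 message in the DLQ
```

A real implementation would also back off between attempts (the "Wait, retry" in the diagram); that is omitted here to keep the control flow visible.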
Configure retries:
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        config=SubscriptionConfig(
            max_retries=3,          # Retry up to 3 times
            reclaim_idle_ms=300000, # Reclaim after 5 min
            dlq_enabled=True,       # Enable DLQ
        )
    )
)

Benefits of EDA for AI Agents

1. Loose Coupling

Agents don’t need to know about each other:
# Agent A publishes event
await sdk.publish_task(
    event_envelope=EventEnvelope(
        topic="analysis.completed",
        payload=PayloadBase(content={"result": "..."})
    )
)

# Agent B subscribes (Agent A doesn't know about B!)
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.completed",
        callback=create_report,
    )
)

# Agent C also subscribes (Agent A doesn't know about C either!)
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.completed",
        callback=notify_user,
    )
)

2. Horizontal Scaling

Scale agents independently:
# Start 3 instances of file processor
python file_processor_agent.py &  # Instance 1
python file_processor_agent.py &  # Instance 2
python file_processor_agent.py &  # Instance 3

# Event bus automatically distributes load
# Each instance processes ~33% of messages

3. Resilience

Failures are isolated:
Agent 1 crashes ──▶ Agent 2 picks up the work
Event bus down ──▶ Messages queued until recovery
Storage fails  ──▶ Results may be lost, but processing continues

4. Flexibility

Easy to add new functionality:
# New requirement: Send email when analysis completes
# Just add a new agent - NO CHANGES to existing code!

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.completed",  # Same topic
        callback=send_email,          # New agent
    )
)

5. Observability

See what’s happening:
# List all topics
omnidaemon bus list

# Inspect messages in a topic
omnidaemon bus inspect --stream analysis.completed

# Check metrics
omnidaemon metrics --topic analysis.completed

# Inspect failed messages
omnidaemon bus dlq --topic analysis.completed

EDA Patterns in OmniDaemon

1. Fan-Out Pattern

One event triggers multiple agents:
          ┌──────────────┐
          │ order.placed │
          └──────────────┘
                 │
      ┌──────────┼──────────┐
      ▼          ▼          ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Inventory│ │Payment  │ │Shipping │
│ Agent   │ │ Agent   │ │ Agent   │
└─────────┘ └─────────┘ └─────────┘
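A toy in-memory version of fan-out — several callbacks subscribed to one topic, each receiving its own copy of the event (with OmniDaemon you would instead call `sdk.register_agent` once per agent on the same topic):

```python
import asyncio

# Toy fan-out registry (sketch only; the real event bus does this for you)
subscribers = {}

def subscribe(topic, callback):
    subscribers.setdefault(topic, []).append(callback)

async def publish(topic, event):
    # Every subscriber reacts to the same event independently
    await asyncio.gather(*(cb(event) for cb in subscribers.get(topic, [])))

log = []
async def inventory_agent(event): log.append(("inventory", event["order_id"]))
async def payment_agent(event):   log.append(("payment", event["order_id"]))
async def shipping_agent(event):  log.append(("shipping", event["order_id"]))

for agent in (inventory_agent, payment_agent, shipping_agent):
    subscribe("order.placed", agent)

asyncio.run(publish("order.placed", {"order_id": "ord-42"}))
print(log)  # all three agents saw the one event
```

Note the contrast with consumer groups: fan-out delivers the event to every *distinct* subscriber, while a consumer group delivers it to only one *instance* of the same subscriber.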

2. Pipeline Pattern

Agents process in sequence:
┌────────┐   ┌────────┐   ┌────────┐   ┌────────┐
│ Upload │──▶│Extract │──▶│Analyze │──▶│ Report │
│ Agent  │   │ Agent  │   │ Agent  │   │ Agent  │
└────────┘   └────────┘   └────────┘   └────────┘
    ↓            ↓            ↓            ↓
file.uploaded  text.extracted analysis.done report.ready

3. Request-Reply Pattern

Agent responds to specific requester:
# Publisher includes reply_to
await sdk.publish_task(
    event_envelope=EventEnvelope(
        topic="analysis.request",
        payload=PayloadBase(
            content={"data": "..."},
            reply_to="analysis.response",  # Where to send result
            correlation_id="req-123",       # Track this request
        )
    )
)

# Agent processes and replies
async def analyze(message: dict):
    result = await process(message)
    
    # Publish result to reply_to topic
    await sdk.publish_task(
        event_envelope=EventEnvelope(
            topic=message.get("reply_to"),
            payload=PayloadBase(
                content=result,
                correlation_id=message.get("correlation_id"),
            )
        )
    )

4. Webhook Pattern

HTTP callback when processing completes:
# Publisher includes webhook URL
await sdk.publish_task(
    event_envelope=EventEnvelope(
        topic="analysis.request",
        payload=PayloadBase(
            content={"data": "..."},
            webhook="https://api.example.com/callback",
        )
    )
)

# OmniDaemon automatically POSTs result to webhook
# {
#   "task_id": "...",
#   "status": "success",
#   "result": {...},
#   "timestamp": "2025-03-12T10:30:00Z"
# }

Real-World Example

Let’s see EDA in action with a document processing system:
# ============================================
# Document Upload Service (Publisher)
# ============================================
async def handle_upload(file):
    """User uploads document"""
    # Save file
    file_path = save_file(file)
    
    # Publish event (fire and forget!)
    await sdk.publish_task(
        event_envelope=EventEnvelope(
            topic="document.uploaded",
            payload=PayloadBase(
                content={
                    "file_path": file_path,
                    "filename": file.filename,
                    "user_id": current_user.id,
                    # Carried through the pipeline so the notification
                    # agent can email the user at the end
                    "user_email": current_user.email,
                }
            )
        )
    )
    
    return {"status": "uploaded", "message": "Processing started"}

# ============================================
# Text Extraction Agent (Subscriber 1)
# ============================================
async def extract_text(message: dict):
    """Extract text from document"""
    file_path = message["content"]["file_path"]
    text = await pdf_to_text(file_path)
    
    # Publish extracted text
    await sdk.publish_task(
        event_envelope=EventEnvelope(
            topic="text.extracted",
            payload=PayloadBase(
                content={
                    "text": text,
                    "file_path": file_path,
                    # Pass the uploader's email through for downstream agents
                    "user_email": message["content"].get("user_email"),
                }
            )
        )
    )

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="document.uploaded",
        callback=extract_text,
    )
)

# ============================================
# Analysis Agent (Subscriber 2)
# ============================================
async def analyze_sentiment(message: dict):
    """Analyze sentiment of text"""
    text = message["content"]["text"]
    sentiment = await ai_analyze_sentiment(text)
    
    # Publish analysis result
    await sdk.publish_task(
        event_envelope=EventEnvelope(
            topic="analysis.completed",
            payload=PayloadBase(
                content={
                    "sentiment": sentiment,
                    "file_path": message["content"]["file_path"],
                    # Forwarded so the notification agent knows who to email
                    "user_email": message["content"].get("user_email"),
                }
            )
        )
    )

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="text.extracted",
        callback=analyze_sentiment,
    )
)

# ============================================
# Notification Agent (Subscriber 3)
# ============================================
async def notify_user(message: dict):
    """Notify user that analysis is complete"""
    result = message["content"]
    await send_email(
        to=result["user_email"],
        subject="Document Analysis Complete",
        body=f"Sentiment: {result['sentiment']}"
    )

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.completed",
        callback=notify_user,
    )
)
What Happens:
  1. User uploads document
  2. Upload service publishes document.uploaded event
  3. Text Extraction Agent processes and publishes text.extracted
  4. Analysis Agent processes and publishes analysis.completed
  5. Notification Agent sends email to user
Benefits:
  • ✅ Each service is independent
  • ✅ Can scale each agent separately
  • ✅ Easy to add new agents (e.g., thumbnail generator)
  • ✅ Failures isolated (if notification fails, analysis still works)

Best Practices

1. Design Good Events

✅ Good Event:
{
    "topic": "user.registered",
    "content": {
        "user_id": "12345",
        "email": "[email protected]",
        "timestamp": "2025-03-12T10:30:00Z"
    }
}
❌ Bad Event:
{
    "topic": "user",  # Too generic
    "content": {
        "do": "send_email",  # Commands, not events
    }
}

2. Keep Agents Focused

✅ Good:
  • One agent per concern
  • Clear, specific topics
  • Small, focused callbacks
❌ Bad:
  • One agent does everything
  • Generic topics like “process”
  • Complex, multi-step callbacks

3. Use Idempotency

Agents may receive the same message twice (network issues, retries):
async def process_payment(message: dict):
    payment_id = message["content"]["payment_id"]
    
    # Check if already processed
    if await is_processed(payment_id):
        return {"status": "already_processed"}
    
    # Process
    result = await charge_card(...)
    
    # Mark as processed
    await mark_processed(payment_id)
    
    return result

4. Handle Failures Gracefully

async def my_agent(message: dict):
    try:
        # Process
        result = await process(message)
        return {"status": "success", "result": result}
    
    except TemporaryError as e:
        # Retriable (network, rate limit)
        logger.warning(f"Temporary failure: {e}")
        raise  # Will retry automatically
    
    except PermanentError as e:
        # Not retriable (invalid data)
        logger.error(f"Permanent failure: {e}")
        return {"status": "error", "error": str(e)}  # Don't retry

5. Monitor Your System

# Check health regularly
omnidaemon health

# Monitor metrics
omnidaemon metrics

# Watch for DLQ messages
omnidaemon bus dlq --topic your.topic

# Inspect event bus
omnidaemon bus stats
