
Common Patterns

This guide provides 10 production-ready patterns for building robust event-driven AI agent systems with OmniDaemon.

Overview

Patterns Covered:
  1. Fire-and-Forget - Async tasks without waiting
  2. Request-Reply - Synchronous-like responses
  3. Agent Pipeline - Multi-stage workflows
  4. Fan-Out/Fan-In - Parallel processing
  5. Dead Letter Queue Handler - Failed message recovery
  6. Saga Pattern - Distributed transactions
  7. Multi-Tenant Routing - Customer isolation
  8. Webhook Integration - External system callbacks
  9. Event Replay - Reprocessing historical events
  10. Circuit Breaker - Fault tolerance

1. Fire-and-Forget

Use Case: Publish events without waiting for results (logging, analytics, notifications)
When to Use:
  • ✅ Non-critical operations
  • ✅ Background tasks
  • ✅ High throughput requirements
  • ✅ Don’t need response
Pattern:
import asyncio
import time
import uuid

# AgentConfig is used by the registration examples below
from omnidaemon import OmniDaemonSDK, EventEnvelope, PayloadBase, AgentConfig

sdk = OmniDaemonSDK()

async def fire_and_forget():
    """Publish event and immediately return."""
    # Publish
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="analytics.track",
            payload=PayloadBase(
                content={
                    "event": "page_view",
                    "user_id": "user_123",
                    "page": "/dashboard"
                }
            ),
        )
    )
    
    print(f"✅ Event published: {task_id}")
    # Don't wait for result!
    return task_id

# Usage
await fire_and_forget()
# Continues immediately, agent processes in background
Agent Side:
async def track_analytics(message: dict):
    """Process analytics event."""
    content = message.get("content", {})
    
    # Store in analytics DB
    await analytics_db.insert({
        "event": content["event"],
        "user_id": content["user_id"],
        "timestamp": time.time(),
    })
    
    # No return needed (fire-and-forget)
    return None
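Registering the Agent:
The handler above only runs once it is subscribed to the topic. A minimal sketch, mirroring the registration calls used in the pipeline pattern below:
async def register_analytics_agent():
    """Subscribe track_analytics to analytics.track events."""
    await sdk.register_agent(
        AgentConfig(
            topic="analytics.track",
            callback=track_analytics,
        )
    )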
Benefits:
  • ✅ Maximum throughput
  • ✅ No blocking
  • ✅ Simple implementation
Drawbacks:
  • ❌ No result confirmation
  • ❌ No error notification (check DLQ)

2. Request-Reply

Use Case: Publish event and wait for response (API calls, synchronous operations)
When to Use:
  • ✅ Need result immediately
  • ✅ User waiting for response
  • ✅ API endpoints (see the endpoint sketch at the end of this pattern)
  • ✅ Interactive applications
Pattern:
async def request_reply(query: str, timeout: int = 30) -> dict:
    """Publish and wait for result."""
    # Publish
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="ai.query",
            payload=PayloadBase(
                content={"query": query}
            ),
        )
    )
    
    # Poll for result with timeout
    start_time = time.time()
    while time.time() - start_time < timeout:
        result = await sdk.get_result(task_id)
        if result is not None:
            return result
        
        await asyncio.sleep(0.5)  # Poll every 500ms
    
    raise TimeoutError(f"No result after {timeout}s")

# Usage
try:
    result = await request_reply("Analyze this document", timeout=30)
    print(f"✅ Result: {result}")
except TimeoutError:
    print("❌ Request timed out")
With Exponential Backoff:
async def request_reply_backoff(query: str, max_wait: int = 30):
    """Poll with exponential backoff."""
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="ai.query",
            payload=PayloadBase(content={"query": query}),
        )
    )
    
    delay = 0.1
    total_waited = 0.0
    
    while total_waited < max_wait:
        await asyncio.sleep(delay)
        total_waited += delay
        delay = min(delay * 2, 5)  # Double the delay each attempt, capped at 5s
        
        result = await sdk.get_result(task_id)
        if result is not None:
            return result
    
    raise TimeoutError(f"No result after {max_wait}s")
Benefits:
  • ✅ Get result immediately
  • ✅ Error handling
  • ✅ Timeout control
Drawbacks:
  • ❌ Blocks caller
  • ❌ Polling overhead
  • ❌ Not ideal for slow tasks
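Exposing as an API Endpoint:
Request-reply maps naturally onto an HTTP endpoint. A minimal sketch using FastAPI (FastAPI is an assumption here, not part of OmniDaemon):
from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.post("/query")
async def query_endpoint(body: dict):
    """Wrap request_reply (defined above) in a synchronous-looking route."""
    try:
        result = await request_reply(body["query"], timeout=30)
        return {"result": result}
    except TimeoutError:
        # 504 tells the client the agent did not respond in time
        raise HTTPException(status_code=504, detail="Agent timed out")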

3. Agent Pipeline

Use Case: Multi-stage processing (upload → scan → process → notify)
When to Use:
  • ✅ Complex workflows
  • ✅ Multiple processing steps
  • ✅ Each step is independent
  • ✅ Need fault tolerance at each stage
Pattern:
# Stage 1: Upload Handler
async def handle_upload(message: dict):
    """Stage 1: Validate and store file."""
    content = message.get("content", {})
    file_data = content["file"]
    
    # Validate
    if not is_valid_file(file_data):
        return {"status": "invalid"}
    
    # Store
    file_id = await storage.save(file_data)
    
    # Publish to next stage
    correlation_id = message.get("correlation_id")
    await sdk.publish_task(
        EventEnvelope(
            topic="file.scan",  # Next stage
            correlation_id=correlation_id,
            causation_id=message.get("task_id"),
            payload=PayloadBase(
                content={"file_id": file_id}
            ),
        )
    )
    
    return {"status": "uploaded", "file_id": file_id}


# Stage 2: Virus Scanner
async def handle_scan(message: dict):
    """Stage 2: Scan file for viruses."""
    content = message.get("content", {})
    file_id = content["file_id"]
    correlation_id = message.get("correlation_id")
    
    # Scan
    scan_result = await virus_scanner.scan(file_id)
    
    if scan_result.is_clean:
        # Publish to next stage
        await sdk.publish_task(
            EventEnvelope(
                topic="file.process",  # Next stage
                correlation_id=correlation_id,
                causation_id=message.get("task_id"),
                payload=PayloadBase(
                    content={"file_id": file_id}
                ),
            )
        )
        return {"status": "clean"}
    else:
        # Quarantine and notify
        await sdk.publish_task(
            EventEnvelope(
                topic="file.quarantine",
                correlation_id=correlation_id,
                payload=PayloadBase(
                    content={"file_id": file_id, "reason": "virus_detected"}
                ),
            )
        )
        return {"status": "infected"}


# Stage 3: Processor
async def handle_process(message: dict):
    """Stage 3: Process file."""
    content = message.get("content", {})
    file_id = content["file_id"]
    
    # Process
    result = await processor.process(file_id)
    
    # Publish to final stage
    correlation_id = message.get("correlation_id")
    await sdk.publish_task(
        EventEnvelope(
            topic="file.notify",  # Final stage
            correlation_id=correlation_id,
            causation_id=message.get("task_id"),
            payload=PayloadBase(
                content={"file_id": file_id, "result": result}
            ),
        )
    )
    
    return {"status": "processed", "result": result}


# Stage 4: Notifier
async def handle_notify(message: dict):
    """Stage 4: Notify user."""
    content = message.get("content", {})
    
    # Notify user
    await notification_service.send({
        "type": "file_processed",
        "file_id": content["file_id"],
        "result": content["result"],
    })
    
    return {"status": "notified"}


# Register all stages
async def register_pipeline():
    await sdk.register_agent(
        AgentConfig(topic="file.upload", callback=handle_upload)
    )
    await sdk.register_agent(
        AgentConfig(topic="file.scan", callback=handle_scan)
    )
    await sdk.register_agent(
        AgentConfig(topic="file.process", callback=handle_process)
    )
    await sdk.register_agent(
        AgentConfig(topic="file.notify", callback=handle_notify)
    )
Flow:
User Upload
    ↓
file.upload (validate, store)
    ↓
file.scan (virus check)
    ↓ (if clean)
file.process (AI processing)
    ↓
file.notify (user notification)
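Starting the Pipeline:
The pipeline begins when the initial event is published to the first stage. A minimal sketch (the correlation_id set here is carried through every downstream stage):
async def start_pipeline(file_data):
    """Publish the event that enters the pipeline at file.upload."""
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="file.upload",
            correlation_id=f"pipeline_{uuid.uuid4()}",
            payload=PayloadBase(content={"file": file_data}),
        )
    )
    return task_id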
Benefits:
  • ✅ Each stage independent
  • ✅ Fault tolerance per stage
  • ✅ Easy to scale each stage
  • ✅ Traceable with correlation_id
Drawbacks:
  • ❌ More complex
  • ❌ Latency (multiple hops)
  • ❌ Harder to debug

4. Fan-Out/Fan-In

Use Case: Parallel processing then aggregation (image variants, multi-model analysis)
When to Use:
  • ✅ Independent parallel tasks
  • ✅ Aggregate results
  • ✅ Performance optimization
  • ✅ Multiple agents for same data
Pattern:
# Fan-Out: Publish to multiple agents
async def fan_out(image_url: str):
    """Send image to multiple processors in parallel."""
    correlation_id = f"batch_{uuid.uuid4()}"
    
    tasks = []
    
    # Task 1: Generate thumbnail
    tasks.append(
        sdk.publish_task(
            EventEnvelope(
                topic="image.thumbnail",
                correlation_id=correlation_id,
                payload=PayloadBase(
                    content={"url": image_url, "size": "256x256"}
                ),
            )
        )
    )
    
    # Task 2: Detect objects
    tasks.append(
        sdk.publish_task(
            EventEnvelope(
                topic="image.detect",
                correlation_id=correlation_id,
                payload=PayloadBase(
                    content={"url": image_url}
                ),
            )
        )
    )
    
    # Task 3: Extract text (OCR)
    tasks.append(
        sdk.publish_task(
            EventEnvelope(
                topic="image.ocr",
                correlation_id=correlation_id,
                payload=PayloadBase(
                    content={"url": image_url}
                ),
            )
        )
    )
    
    # Wait for all tasks to be published
    task_ids = await asyncio.gather(*tasks)
    
    return {
        "correlation_id": correlation_id,
        "task_ids": task_ids,
    }


# Fan-In: Aggregate results
async def fan_in(correlation_id: str, task_ids: list, timeout: int = 30):
    """Wait for all results and aggregate."""
    start_time = time.time()
    results = {}
    
    while len(results) < len(task_ids):
        if time.time() - start_time > timeout:
            raise TimeoutError("Not all tasks completed")
        
        for task_id in task_ids:
            if task_id not in results:
                result = await sdk.get_result(task_id)
                if result is not None:
                    results[task_id] = result
        
        if len(results) < len(task_ids):
            await asyncio.sleep(0.5)
    
    return {
        "correlation_id": correlation_id,
        "results": results,
    }


# Usage
async def process_image_parallel(image_url: str):
    # Fan-out
    batch = await fan_out(image_url)
    
    # Fan-in
    aggregated = await fan_in(
        batch["correlation_id"],
        batch["task_ids"],
        timeout=30
    )
    
    return aggregated
With asyncio.gather (cleaner):
async def fan_out_fan_in(image_url: str):
    """Fan-out and fan-in in one function."""
    correlation_id = f"batch_{uuid.uuid4()}"
    
    # Publish all tasks
    task_ids = await asyncio.gather(
        sdk.publish_task(
            EventEnvelope(
                topic="image.thumbnail",
                correlation_id=correlation_id,
                payload=PayloadBase(content={"url": image_url}),
            )
        ),
        sdk.publish_task(
            EventEnvelope(
                topic="image.detect",
                correlation_id=correlation_id,
                payload=PayloadBase(content={"url": image_url}),
            )
        ),
        sdk.publish_task(
            EventEnvelope(
                topic="image.ocr",
                correlation_id=correlation_id,
                payload=PayloadBase(content={"url": image_url}),
            )
        ),
    )
    
    # Wait for all results (with timeout)
    async def get_result_with_retry(task_id, retries=60):
        for _ in range(retries):
            result = await sdk.get_result(task_id)
            if result is not None:
                return result
            await asyncio.sleep(0.5)
        return None
    
    results = await asyncio.gather(
        *[get_result_with_retry(tid) for tid in task_ids]
    )
    
    return {
        "thumbnail": results[0],
        "detections": results[1],
        "text": results[2],
    }
Benefits:
  • ✅ Parallel execution (faster)
  • ✅ Resource utilization
  • ✅ Scalable
Drawbacks:
  • ❌ Complex aggregation
  • ❌ Partial failure handling (see the sketch after this list)
  • ❌ Timeout management
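Handling Partial Failures:
get_result_with_retry returns None for tasks that never completed, so the aggregator can decide per slot whether to degrade or abort. A minimal sketch:
async def fan_in_partial(task_ids: list, labels: list):
    """Aggregate results while tolerating individual task failures."""
    results = await asyncio.gather(
        *[get_result_with_retry(tid) for tid in task_ids]
    )
    
    completed = {
        label: result
        for label, result in zip(labels, results)
        if result is not None
    }
    failed = [label for label, result in zip(labels, results) if result is None]
    
    if not completed:
        raise RuntimeError("All fan-out tasks failed or timed out")
    
    # Degrade gracefully: return what completed, flag what didn't
    return {"results": completed, "failed": failed}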

5. Dead Letter Queue Handler

Use Case: Recover and retry failed messages (manual inspection, republishing)
When to Use:
  • ✅ Messages failing systematically
  • ✅ Need manual intervention
  • ✅ Investigate failures
  • ✅ Retry after fixes
Pattern:
async def inspect_and_handle_dlq(topic: str):
    """Inspect DLQ and handle failed messages."""
    # Get DLQ entries
    dlq_entries = await sdk.inspect_dlq(topic, limit=100)
    
    print(f"📋 Found {len(dlq_entries)} DLQ entries for {topic}")
    
    for entry in dlq_entries:
        data = entry["data"]
        original_message = data["failed_message"]
        error = data["error"]
        retry_count = data["retry_count"]
        
        print(f"\n❌ Failed Message:")
        print(f"   Error: {error}")
        print(f"   Retries: {retry_count}")
        print(f"   Content: {original_message.get('content')}")
        
        # Analyze failure type
        if "KeyError" in error:
            # Missing field - fix data and republish
            print("   → Missing field, fixing...")
            fixed_content = fix_missing_fields(original_message["content"])
            await republish_message(topic, fixed_content)
        
        elif "ConnectionError" in error:
            # Transient error - republish
            print("   → Transient error, republishing...")
            await republish_message(topic, original_message["content"])
        
        elif "ValueError" in error:
            # Bad data - log and skip
            print("   → Bad data, logging to dead_letter.log")
            log_to_dead_letter(original_message, error)
        
        else:
            # Unknown - manual review needed
            print("   → Unknown error, needs manual review")


async def republish_message(topic: str, content: dict):
    """Republish message from DLQ."""
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic=topic,
            payload=PayloadBase(content=content),
        )
    )
    print(f"   ✅ Republished: {task_id}")


# Run periodically
async def dlq_monitor():
    """Monitor DLQ continuously."""
    topics = ["file.upload", "analysis.tasks", "notifications"]
    
    while True:
        for topic in topics:
            await inspect_and_handle_dlq(topic)
        
        # Check every 5 minutes
        await asyncio.sleep(300)
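Helper Sketches:
fix_missing_fields and log_to_dead_letter above are application-specific; minimal sketches (the default field filled in here is hypothetical) might look like:
import json
import logging

dead_letter_logger = logging.getLogger("dead_letter")

def fix_missing_fields(content: dict) -> dict:
    """Fill in defaults for fields the agent expects (app-specific)."""
    fixed = dict(content)
    fixed.setdefault("user_id", "unknown")  # hypothetical required field
    return fixed

def log_to_dead_letter(message: dict, error: str = "") -> None:
    """Persist an unrecoverable message for offline analysis."""
    dead_letter_logger.error(
        "Unrecoverable message: %s error=%s",
        json.dumps(message, default=str),
        error,
    )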
Automated DLQ Handler:
async def auto_handle_dlq(topic: str):
    """Automatically handle DLQ based on error type."""
    dlq_entries = await sdk.inspect_dlq(topic, limit=50)
    
    stats = {
        "total": len(dlq_entries),
        "republished": 0,
        "logged": 0,
        "manual": 0,
    }
    
    for entry in dlq_entries:
        data = entry["data"]
        error = data["error"]
        original_message = data["failed_message"]
        
        # Auto-retry transient errors
        transient_errors = [
            "ConnectionError",
            "TimeoutError",
            "503 Service Unavailable",
            "429 Too Many Requests",
        ]
        
        if any(err in error for err in transient_errors):
            # Wait a bit then republish
            await asyncio.sleep(5)
            await republish_message(topic, original_message["content"])
            stats["republished"] += 1
        
        # Log permanent errors
        elif "ValueError" in error or "KeyError" in error:
            log_to_dead_letter(original_message, error)
            stats["logged"] += 1
        
        # Manual review
        else:
            send_alert(f"DLQ entry needs manual review: {entry['id']}")
            stats["manual"] += 1
    
    return stats
Benefits:
  • ✅ Recover from failures
  • ✅ Prevent data loss
  • ✅ Identify systemic issues
Drawbacks:
  • ❌ Manual intervention needed
  • ❌ Can be time-consuming

6. Saga Pattern

Use Case: Distributed transactions with compensation (order processing, reservations)
When to Use:
  • ✅ Multi-step transactions
  • ✅ Need rollback capability
  • ✅ Distributed systems
  • ✅ Financial operations
Pattern:
# Saga Coordinator
async def process_order_saga(order_data: dict):
    """
    Order processing saga:
    1. Reserve inventory
    2. Charge payment
    3. Create shipment
    
    If any step fails, compensate previous steps.
    """
    saga_id = f"saga_{uuid.uuid4()}"
    correlation_id = f"order_{order_data['order_id']}"
    
    compensations = []  # Track what to undo
    
    try:
        # Step 1: Reserve inventory
        inventory_result = await execute_saga_step(
            topic="inventory.reserve",
            correlation_id=correlation_id,
            causation_id=saga_id,
            content={"items": order_data["items"]},
        )
        
        if not inventory_result.get("success"):
            raise SagaFailure("Inventory reservation failed")
        
        compensations.append({
            "topic": "inventory.release",
            "content": {"reservation_id": inventory_result["reservation_id"]},
        })
        
        # Step 2: Charge payment
        payment_result = await execute_saga_step(
            topic="payment.charge",
            correlation_id=correlation_id,
            causation_id=saga_id,
            content={
                "amount": order_data["total"],
                "customer_id": order_data["customer_id"],
            },
        )
        
        if not payment_result.get("success"):
            raise SagaFailure("Payment failed")
        
        compensations.append({
            "topic": "payment.refund",
            "content": {"transaction_id": payment_result["transaction_id"]},
        })
        
        # Step 3: Create shipment
        shipment_result = await execute_saga_step(
            topic="shipment.create",
            correlation_id=correlation_id,
            causation_id=saga_id,
            content={
                "order_id": order_data["order_id"],
                "address": order_data["shipping_address"],
            },
        )
        
        if not shipment_result.get("success"):
            raise SagaFailure("Shipment creation failed")
        
        # SUCCESS! All steps completed
        return {
            "status": "completed",
            "saga_id": saga_id,
            "inventory": inventory_result,
            "payment": payment_result,
            "shipment": shipment_result,
        }
    
    except SagaFailure as e:
        # FAILURE! Compensate all completed steps
        print(f"❌ Saga failed: {e}")
        await compensate_saga(compensations, correlation_id)
        
        return {
            "status": "failed",
            "saga_id": saga_id,
            "error": str(e),
            "compensated": len(compensations),
        }


async def execute_saga_step(topic: str, correlation_id: str, causation_id: str, content: dict):
    """Execute one saga step and wait for result."""
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic=topic,
            correlation_id=correlation_id,
            causation_id=causation_id,
            payload=PayloadBase(content=content),
        )
    )
    
    # Wait for result (with timeout)
    for _ in range(60):  # 30 seconds
        result = await sdk.get_result(task_id)
        if result is not None:
            return result
        await asyncio.sleep(0.5)
    
    raise TimeoutError(f"Step {topic} timed out")


async def compensate_saga(compensations: list, correlation_id: str):
    """Execute compensation steps in reverse order."""
    print(f"🔄 Compensating {len(compensations)} steps...")
    
    for compensation in reversed(compensations):
        try:
            await sdk.publish_task(
                EventEnvelope(
                    topic=compensation["topic"],
                    correlation_id=correlation_id,
                    payload=PayloadBase(content=compensation["content"]),
                )
            )
            print(f"✅ Compensated: {compensation['topic']}")
        except Exception as e:
            print(f"❌ Compensation failed: {e}")
            # Log for manual intervention
            log_compensation_failure(compensation, e)


class SagaFailure(Exception):
    """Saga step failed."""
    pass
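Compensation Agents:
Each compensation topic needs a handler on the other side. A minimal sketch for the inventory side (the inventory service object is an assumption):
async def handle_inventory_release(message: dict):
    """Compensation handler: undo a prior inventory.reserve."""
    content = message.get("content", {})
    await inventory.release(content["reservation_id"])
    return {"status": "released"}


async def register_compensation_agents():
    await sdk.register_agent(
        AgentConfig(topic="inventory.release", callback=handle_inventory_release)
    )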
Benefits:
  • ✅ Distributed transactions
  • ✅ Automatic rollback
  • ✅ Fault tolerance
Drawbacks:
  • ❌ Complex implementation
  • ❌ Compensation logic required
  • ❌ Eventual consistency

7. Multi-Tenant Routing

Use Case: Customer-specific processing (SaaS applications, per-tenant logic)
When to Use:
  • ✅ Multi-tenant SaaS
  • ✅ Customer isolation
  • ✅ Per-customer configuration
  • ✅ Billing/usage tracking
Pattern:
async def process_with_tenant_routing(message: dict):
    """Route processing based on tenant."""
    tenant_id = message.get("tenant_id")
    content = message.get("content", {})
    
    if not tenant_id:
        return {"error": "Missing tenant_id"}
    
    # Load tenant-specific configuration
    tenant_config = await get_tenant_config(tenant_id)
    
    if not tenant_config:
        return {"error": f"Unknown tenant: {tenant_id}"}
    
    # Check tenant limits
    if not await check_tenant_limits(tenant_id):
        return {"error": "Tenant limit exceeded"}
    
    # Process with tenant context
    try:
        result = await process_for_tenant(
            content,
            tenant_id,
            tenant_config
        )
        
        # Track usage
        await track_tenant_usage(tenant_id, {
            "operation": "process",
            "tokens_used": result.get("tokens", 0),
            "timestamp": time.time(),
        })
        
        return result
    
    except Exception as e:
        # Log with tenant context
        logger.error(f"[Tenant {tenant_id}] Processing failed: {e}")
        raise


async def get_tenant_config(tenant_id: str) -> dict | None:
    """Load tenant-specific configuration."""
    # From database or cache
    config = await config_db.get(f"tenant:{tenant_id}")
    
    if config is None:
        return None  # Unknown tenant; caller returns an error
    
    return {
        "model": config.get("ai_model", "gpt-4o"),
        "max_tokens": config.get("max_tokens", 1000),
        "features": config.get("features", []),
        "rate_limit": config.get("rate_limit", 100),
    }


async def check_tenant_limits(tenant_id: str) -> bool:
    """Check if tenant has capacity."""
    usage = await usage_db.get(f"usage:{tenant_id}:today") or 0
    config = await get_tenant_config(tenant_id)
    
    return usage < config["rate_limit"]


# Register with tenant awareness
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="ai.query",
        callback=process_with_tenant_routing,
        description="Multi-tenant AI query processor",
    )
)
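Tracking Usage:
track_tenant_usage above is left abstract; a minimal sketch, assuming the same key-value usage_db used by check_tenant_limits (its set method is an assumed counterpart to get):
async def track_tenant_usage(tenant_id: str, event: dict):
    """Increment today's usage counter for billing and rate limiting."""
    key = f"usage:{tenant_id}:today"
    current = await usage_db.get(key) or 0
    await usage_db.set(key, current + 1)  # set() assumed to mirror get()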
Publishing with Tenant ID:
# Publisher includes tenant_id
async def publish_for_tenant(tenant_id: str, query: str):
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="ai.query",
            tenant_id=tenant_id,  # Include tenant context
            payload=PayloadBase(
                content={"query": query}
            ),
        )
    )
    return task_id
Benefits:
  • ✅ Customer isolation
  • ✅ Per-tenant limits
  • ✅ Custom configuration
  • ✅ Usage tracking

8. Webhook Integration

Use Case: Notify external systems (API callbacks, third-party integrations)
When to Use:
  • ✅ External system needs results
  • ✅ Push notifications
  • ✅ Third-party integrations
  • ✅ Don’t want polling
Pattern:
import httpx

# Publisher with webhook
async def publish_with_webhook(data: dict, webhook_url: str):
    """Publish task with webhook callback."""
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="document.process",
            payload=PayloadBase(
                content=data,
                webhook=webhook_url,  # External system URL
            ),
        )
    )
    
    return {"task_id": task_id, "webhook": webhook_url}


# Agent publishes result to webhook
async def process_with_webhook(message: dict):
    """Process and POST result to webhook."""
    content = message.get("content", {})
    webhook = message.get("webhook")
    
    # Process
    result = await process_document(content)
    
    # If webhook provided, POST result
    if webhook:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    webhook,
                    json={
                        "task_id": message.get("task_id"),
                        "topic": message.get("topic"),
                        "status": "success",
                        "result": result,
                        "processed_at": time.time(),
                    },
                    timeout=10,
                )
                
                if response.status_code == 200:
                    print(f"✅ Webhook delivered: {webhook}")
                else:
                    print(f"❌ Webhook failed: {response.status_code}")
        
        except Exception as e:
            print(f"❌ Webhook error: {e}")
            # Don't fail task if webhook fails
    
    return result
With Retry:
async def post_to_webhook_with_retry(webhook_url: str, data: dict, retries: int = 3):
    """POST to webhook with exponential backoff."""
    for attempt in range(retries):
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    webhook_url,
                    json=data,
                    timeout=10,
                )
                
                if response.status_code in [200, 201, 202]:
                    return True
                
                if response.status_code >= 500:
                    # Server error - retry
                    if attempt < retries - 1:
                        await asyncio.sleep(2 ** attempt)  # Exponential backoff
                        continue
                
                # Client error - don't retry
                return False
        
        except Exception as e:
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            
            return False
    
    return False
Benefits:
  • ✅ Push notifications
  • ✅ Real-time updates
  • ✅ No polling overhead
Drawbacks:
  • ❌ Webhook endpoint must be reliable
  • ❌ Security: validate requests (see the signing sketch after this list)
  • ❌ Network errors
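Signing Webhook Requests:
To address the security drawback, sign the webhook body with a shared-secret HMAC so the receiver can verify the request's origin. A minimal sketch (the header name and secret handling are common conventions, not an OmniDaemon feature):
import hashlib
import hmac
import json

WEBHOOK_SECRET = b"shared-secret"  # hypothetical; load from config in practice

async def post_signed_webhook(webhook_url: str, data: dict):
    """POST a JSON body with an HMAC-SHA256 signature header."""
    body = json.dumps(data, sort_keys=True).encode()
    signature = hmac.new(WEBHOOK_SECRET, body, hashlib.sha256).hexdigest()
    
    async with httpx.AsyncClient() as client:
        return await client.post(
            webhook_url,
            content=body,
            headers={
                "Content-Type": "application/json",
                "X-Webhook-Signature": signature,  # receiver recomputes and compares
            },
            timeout=10,
        )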

9. Event Replay

Use Case: Reprocess historical events (bug fixes, new features, data migration)
When to Use:
  • ✅ Fixed a bug, need to reprocess
  • ✅ New feature needs old data
  • ✅ Data migration
  • ✅ Testing
Pattern:
from datetime import datetime

async def replay_events(
    topic: str,
    start_date: datetime,
    end_date: datetime,
    dry_run: bool = True,
):
    """Replay events from stream."""
    print(f"🔄 Replaying {topic} from {start_date} to {end_date}")
    
    # Get messages from stream
    messages = await sdk.inspect_stream(topic, limit=10000)
    
    replayed = 0
    skipped = 0
    
    for msg in messages:
        # Filter by date
        created_at = msg["data"].get("created_at", 0)
        msg_date = datetime.fromtimestamp(created_at)
        
        if start_date <= msg_date <= end_date:
            if dry_run:
                print(f"   [DRY RUN] Would replay: {msg['id']}")
                replayed += 1
            else:
                # Republish
                await sdk.publish_task(
                    EventEnvelope(
                        topic=topic,
                        correlation_id=f"replay_{msg['id']}",
                        payload=PayloadBase(
                            content=msg["data"]["content"]
                        ),
                    )
                )
                replayed += 1
                print(f"   ✅ Replayed: {msg['id']}")
        else:
            skipped += 1
    
    print(f"\n📊 Summary:")
    print(f"   Replayed: {replayed}")
    print(f"   Skipped: {skipped}")
    print(f"   Total: {replayed + skipped}")


# Usage
await replay_events(
    topic="document.process",
    start_date=datetime(2025, 11, 1),
    end_date=datetime(2025, 11, 7),
    dry_run=True,  # Test first!
)
Benefits:
  • ✅ Fix past mistakes
  • ✅ Apply new logic to old data
  • ✅ Data recovery
Drawbacks:
  • ❌ Can be slow
  • ❌ Duplicate processing risk (see the idempotency sketch after this list)
  • ❌ Resource intensive
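Making Handlers Idempotent:
Replays republish content under new task IDs, so deduplication must key on the content itself (or a business ID inside it). A minimal sketch (the in-memory set is a stand-in; production would use Redis or a database table, and process_document stands in for the agent's real work):
import hashlib
import json

processed_keys: set = set()  # stand-in for a durable store

def idempotency_key(content: dict) -> str:
    """Derive a stable key from message content; replayed copies share it."""
    return hashlib.sha256(
        json.dumps(content, sort_keys=True).encode()
    ).hexdigest()

async def idempotent_handler(message: dict):
    """Skip content that was already processed in an earlier run."""
    content = message.get("content", {})
    key = idempotency_key(content)
    
    if key in processed_keys:
        return {"status": "duplicate_skipped"}
    
    result = await process_document(content)
    processed_keys.add(key)
    return {"status": "success", "result": result}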

10. Circuit Breaker

Use Case: Prevent cascading failures (external API down, database overload)
When to Use:
  • ✅ Call external services
  • ✅ Prevent cascading failures
  • ✅ Fast fail when service down
  • ✅ Automatic recovery
Pattern:
class CircuitBreaker:
    """Circuit breaker for external service calls."""
    
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_duration: int = 60,
        expected_exception: type = Exception,
    ):
        self.failure_threshold = failure_threshold
        self.timeout_duration = timeout_duration
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker."""
        if self.state == "OPEN":
            # Check if timeout expired
            if time.time() - self.last_failure_time > self.timeout_duration:
                self.state = "HALF_OPEN"
            else:
                raise CircuitBreakerOpen("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            
            # Success - reset
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                print(f"❌ Circuit breaker OPENED after {self.failure_count} failures")
            
            raise


class CircuitBreakerOpen(Exception):
    """Circuit breaker is open."""
    pass


# Usage in agent
external_api_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout_duration=60,
    expected_exception=httpx.HTTPError,
)


async def call_external_api(data: dict):
    """Call external API with circuit breaker."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://external-api.com/process",
            json=data,
            timeout=10,
        )
        return response.json()


async def process_with_circuit_breaker(message: dict):
    """Process with circuit breaker protection."""
    content = message.get("content", {})
    
    try:
        result = await external_api_breaker.call(
            call_external_api,
            content
        )
        return {"status": "success", "result": result}
    
    except CircuitBreakerOpen:
        # Circuit breaker open - fast fail
        print("⚠️ Circuit breaker OPEN, skipping external API call")
        return {
            "status": "skipped",
            "reason": "circuit_breaker_open",
        }
    
    except Exception as e:
        # Other error
        print(f"❌ External API error: {e}")
        raise
Benefits:
  • ✅ Prevent cascading failures
  • ✅ Fast fail when service down
  • ✅ Automatic recovery
  • ✅ System stability
Drawbacks:
  • ❌ Complexity
  • ❌ False positives possible
  • ❌ State management

Summary

10 Production-Ready Patterns:
  1. Fire-and-Forget - Background tasks
  2. Request-Reply - Synchronous responses
  3. Agent Pipeline - Multi-stage workflows
  4. Fan-Out/Fan-In - Parallel processing
  5. DLQ Handler - Failed message recovery
  6. Saga Pattern - Distributed transactions
  7. Multi-Tenant Routing - Customer isolation
  8. Webhook Integration - External callbacks
  9. Event Replay - Reprocess historical events
  10. Circuit Breaker - Fault tolerance
Choose Pattern Based On:
  • Fire-and-Forget: High throughput, don’t need result
  • Request-Reply: Need result immediately
  • Pipeline: Complex multi-step workflows
  • Fan-Out/Fan-In: Parallel processing needed
  • DLQ Handler: Need failure recovery
  • Saga: Distributed transactions with rollback
  • Multi-Tenant: SaaS applications
  • Webhook: External system integration
  • Event Replay: Bug fixes, new features
  • Circuit Breaker: External service calls
These patterns cover 90% of production use cases! 🚀✨