Publisher Example

This page provides comprehensive examples of publishing events to OmniDaemon, from simple one-liners to advanced patterns with all features.

Overview

What This Guide Covers:
  • ✅ Simple publishing (minimal code)
  • ✅ Full publishing (all parameters)
  • ✅ Batch publishing (multiple events)
  • ✅ Webhook callbacks
  • ✅ Reply-to patterns (agent-to-agent)
  • ✅ Correlation and causation tracking
  • ✅ Multi-tenancy with tenant_id
  • ✅ Custom source tracking
  • ✅ Result retrieval (24h TTL)

Prerequisites

1. Install OmniDaemon

# Recommended: uv (fast & modern)
uv add omnidaemon

# Or with pip:
pip install omnidaemon

2. Set Up Event Bus

# Docker Redis (easiest)
docker run -d -p 6379:6379 --name redis redis:latest

# Or install locally
# macOS: brew install redis && brew services start redis
# Ubuntu: sudo apt install redis-server && sudo systemctl start redis

3. Set Up Environment

# Create .env (optional - uses defaults)
cat > .env << 'EOF'
REDIS_URL=redis://localhost:6379
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.omnidaemon_data
EOF

4. Start an Agent Runner

You need an agent listening on the topic you publish to:
# In another terminal
python agent_runner.py
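The command above assumes an agent_runner.py exists; a minimal one might look like this sketch (the handler is illustrative, and the AgentConfig import path is an assumption modeled on the register_agent examples later on this page):

```python
# agent_runner.py — minimal sketch of the agent side
import asyncio

async def greet_user(message: dict) -> dict:
    # message["content"] carries the payload dict published to the topic
    name = message.get("content", {}).get("name", "stranger")
    return {"status": "success", "data": f"Hello, {name}!"}

async def main():
    # Import path is an assumption; adjust to match your installed package.
    from omnidaemon import OmniDaemonSDK, AgentConfig
    sdk = OmniDaemonSDK()
    await sdk.register_agent(
        agent_config=AgentConfig(topic="greet.user", callback=greet_user)
    )
    await asyncio.Event().wait()  # keep the runner alive

# Run with: asyncio.run(main())
```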

Simple Publishing (Minimal)

Example 1: Simplest Possible

import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def main():
    # Create event
    event = EventEnvelope(
        topic="greet.user",                    # REQUIRED: Where to send
        payload=PayloadBase(
            content={"name": "Alice"}          # REQUIRED: Your data
        ),
    )
    
    # Publish
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"✅ Published! Task ID: {task_id}")

asyncio.run(main())
Output:
✅ Published! Task ID: 550e8400-e29b-41d4-a716-446655440000
What Happens:
  1. Event published to omni-stream:greet.user
  2. Any agent subscribed to greet.user receives it
  3. Task ID returned immediately (async processing)

Full Publishing (All Parameters)

Example 2: Using All Features

import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def main():
    event = EventEnvelope(
        # REQUIRED
        topic="file_system.tasks",
        payload=PayloadBase(
            content={
                "action": "create_file",
                "filename": "report.pdf",
                "data": "..."
            },
            
            # OPTIONAL: Webhook callback
            webhook="https://your-api.com/webhook/results",
            
            # OPTIONAL: Reply to topic (agent-to-agent)
            reply_to="file_system.response",
        ),
        
        # OPTIONAL: Tracking IDs
        correlation_id="req-12345",          # Track across requests
        causation_id="evt-67890",            # Track causality chain
        source="api-gateway",                # Where event came from
        
        # OPTIONAL: Multi-tenancy
        tenant_id="tenant-acme-corp",        # Isolate by customer
    )
    
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"✅ Task ID: {task_id}")
    
    # Wait for processing
    await asyncio.sleep(3)
    
    # Get result (24h TTL)
    result = await sdk.get_result(task_id)
    print(f"✅ Result: {result}")

asyncio.run(main())
Output:
✅ Task ID: 550e8400-e29b-41d4-a716-446655440000
✅ Result: {
    'status': 'success',
    'data': 'File created successfully',
    'processed_by': 'file-processor'
}

Parameter Explanations

Required Parameters

topic (Required)

What: The topic (channel) to publish to
Format: {domain}.{action} (convention)
Example:
topic="file.uploaded"
topic="user.registered"
topic="analysis.requested"
How it works:
  • Creates/uses stream: omni-stream:{topic}
  • Delivered to all agents subscribed to this topic
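The mapping above can be captured in a tiny helper (illustrative only: the omni-stream: prefix is the one stated above, while the validation regex is an assumption based on the {domain}.{action} convention):

```python
import re

STREAM_PREFIX = "omni-stream:"
# Convention from this page: topics look like "{domain}.{action}"
TOPIC_RE = re.compile(r"^[a-z0-9_]+\.[a-z0-9_]+$")

def stream_key(topic: str) -> str:
    """Return the stream key a topic maps to, rejecting unstructured names."""
    if not TOPIC_RE.match(topic):
        raise ValueError(f"topic {topic!r} does not follow the domain.action convention")
    return STREAM_PREFIX + topic

print(stream_key("file.uploaded"))  # omni-stream:file.uploaded
```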

payload.content (Required)

What: Your actual data
Format: Any JSON-serializable value (typically a dict)
Example:
content={
    "user_id": "123",
    "action": "upload",
    "file": "doc.pdf"
}

Optional Parameters

payload.webhook (Optional)

What: HTTP URL to POST result to when processing completes
When to use: When you want push notifications instead of polling
TTL: Results stored for 24 hours
Example:
payload=PayloadBase(
    content={...},
    webhook="https://your-api.com/webhook/results"
)
What gets POSTed:
{
    "task_id": "550e8400-...",
    "topic": "file_system.tasks",
    "status": "success",
    "result": {
        "status": "success",
        "data": "..."
    },
    "processed_at": 1678901234.567
}
Use cases:
  • ✅ Real-time notifications
  • ✅ Integration with external systems
  • ✅ Async job completion callbacks
  • ✅ Monitoring/alerting systems
Implementation tip:
# In your webhook endpoint
@app.post("/webhook/results")
async def handle_result(payload: dict):
    task_id = payload["task_id"]
    result = payload["result"]
    
    # Process result
    logger.info(f"Task {task_id} completed: {result}")
    
    return {"status": "acknowledged"}

payload.reply_to (Optional)

What: Topic to publish result to (agent-to-agent communication)
When to use: When building agent pipelines/workflows
Example:
payload=PayloadBase(
    content={...},
    reply_to="file_system.response"
)
What happens:
  1. Agent processes event
  2. Result published to reply_to topic
  3. Another agent (listening to that topic) receives it
  4. Creates event chains!
Agent chain example:
# Agent 1: Process file → reply to analysis.tasks
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
    )
)

# Inside callback:
async def process_file(message: dict):
    result = {"file_data": "..."}
    
    # Publish to next agent
    if message.get("reply_to"):
        await sdk.publish_task(
            EventEnvelope(
                topic=message["reply_to"],
                payload=PayloadBase(content=result)
            )
        )
    
    return result

# Agent 2: Analyze data
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.tasks",
        callback=analyze_data,
    )
)
Use cases:
  • ✅ Multi-stage pipelines
  • ✅ Agent orchestration
  • ✅ Workflow automation
  • ✅ Fan-out/fan-in patterns
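For a linear pipeline, the reply_to wiring above can be sketched as a pure helper that pairs each stage topic with the next stage's topic (hypothetical helper; plug the pairs into EventEnvelope/PayloadBase as shown above):

```python
from typing import List, Optional, Tuple

def pipeline_stages(topics: List[str]) -> List[Tuple[str, Optional[str]]]:
    """Pair each stage topic with the reply_to of the next stage (None for the last)."""
    return [
        (topic, topics[i + 1] if i + 1 < len(topics) else None)
        for i, topic in enumerate(topics)
    ]

stages = pipeline_stages(["file.uploaded", "analysis.tasks", "notify.user"])
# → [("file.uploaded", "analysis.tasks"),
#    ("analysis.tasks", "notify.user"),
#    ("notify.user", None)]
```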

correlation_id (Optional)

What: Track a single request across multiple events
Format: String (UUID recommended)
Lifetime: Passed through the entire event chain
Example:
# Initial request
event = EventEnvelope(
    topic="user.registered",
    correlation_id="req-12345",  # Same for all related events
    payload=PayloadBase(content={...}),
)
Use cases:
  • ✅ Distributed tracing
  • ✅ Log aggregation
  • ✅ Debug workflows
  • ✅ Track user journeys
Usage in callback:
async def process_event(message: dict):
    correlation_id = message.get("correlation_id")
    
    logger.info(f"[{correlation_id}] Processing event")
    
    # Pass to downstream events
    await sdk.publish_task(
        EventEnvelope(
            topic="next.step",
            correlation_id=correlation_id,  # Propagate!
            payload=PayloadBase(content={...}),
        )
    )

causation_id (Optional)

What: Track cause-effect relationships in event chains
Format: String (usually the ID of the triggering event)
Difference from correlation_id: correlation_id groups every event in a request, while causation_id records which specific event triggered this one (parent-child)
Example:
# Event 1 (root cause)
event1 = EventEnvelope(
    id="evt-100",
    topic="file.uploaded",
    correlation_id="req-12345",
    payload=PayloadBase(content={...}),
)

# Event 2 (caused by Event 1)
event2 = EventEnvelope(
    id="evt-101",
    topic="virus.scan",
    correlation_id="req-12345",      # Same request
    causation_id="evt-100",           # Caused by evt-100
    payload=PayloadBase(content={...}),
)

# Event 3 (caused by Event 2)
event3 = EventEnvelope(
    id="evt-102",
    topic="notify.user",
    correlation_id="req-12345",      # Same request
    causation_id="evt-101",           # Caused by evt-101
    payload=PayloadBase(content={...}),
)
Use cases:
  • ✅ Event sourcing
  • ✅ Causality tracking
  • ✅ Root cause analysis
  • ✅ Event replay
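Because each event records its parent, causality chains can be reconstructed offline; a minimal sketch over hypothetical event dicts shaped like the examples above:

```python
def causality_chain(events: list, event_id: str) -> list:
    """Walk causation_id links from event_id back to the root, returning root-first IDs."""
    by_id = {e["id"]: e for e in events}
    chain = []
    current = event_id
    while current is not None:
        chain.append(current)
        current = by_id[current].get("causation_id")
    return list(reversed(chain))

events = [
    {"id": "evt-100", "causation_id": None},
    {"id": "evt-101", "causation_id": "evt-100"},
    {"id": "evt-102", "causation_id": "evt-101"},
]
print(causality_chain(events, "evt-102"))  # ['evt-100', 'evt-101', 'evt-102']
```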

source (Optional)

What: Identifies where the event originated
Format: String (system/service name)
Example:
event = EventEnvelope(
    topic="user.action",
    source="mobile-app",  # Or: "web-app", "api-gateway", "cron-job"
    payload=PayloadBase(content={...}),
)
Use cases:
  • ✅ Multi-source systems
  • ✅ Source-specific logic
  • ✅ Analytics
  • ✅ Security auditing
Usage in callback:
async def process_event(message: dict):
    source = message.get("source")
    
    if source == "mobile-app":
        # Mobile-specific logic
        pass
    elif source == "web-app":
        # Web-specific logic
        pass

tenant_id (Optional)

What: Isolate data by customer/organization
Format: String (customer identifier)
Lifetime: Passed through the entire event chain
Example:
event = EventEnvelope(
    topic="file.upload",
    tenant_id="customer-acme",  # Or: "org-123", "user-456"
    payload=PayloadBase(content={...}),
)
Use cases:
  • ✅ Multi-tenancy
  • ✅ Customer isolation
  • ✅ Per-customer processing
  • ✅ Billing/usage tracking
Usage in callback:
async def process_event(message: dict):
    tenant_id = message.get("tenant_id")
    
    # Load tenant-specific config
    config = get_tenant_config(tenant_id)
    
    # Process with tenant context
    result = process_with_config(message["content"], config)
    
    # Store result with tenant isolation
    await save_result(tenant_id, result)

Result Storage & Retrieval

24-Hour TTL

Results are stored for 24 hours by default:
# Publish event
task_id = await sdk.publish_task(event_envelope=event)

# Result available for 24 hours
result = await sdk.get_result(task_id)  # ✅ Available

# After 24 hours
result = await sdk.get_result(task_id)  # ❌ None (expired)
Why 24 hours?
  • ✅ Balance availability with storage efficiency
  • ✅ Most use cases retrieve results quickly
  • ✅ Prevents unbounded storage growth
  • ✅ Encourages proper result handling
Custom TTL (advanced):
# In storage implementation
await store.save_result(
    task_id=task_id,
    result=result,
    ttl_seconds=3600  # 1 hour
)

Retrieving Results

# Method 1: Poll for result
task_id = await sdk.publish_task(event_envelope=event)

await asyncio.sleep(2)  # Wait for processing

result = await sdk.get_result(task_id)
if result:
    print(f"✅ Result: {result}")
else:
    print("⏳ Not ready yet or expired")
# Method 2: Use webhook (push notification)
event = EventEnvelope(
    topic="file.process",
    payload=PayloadBase(
        content={...},
        webhook="https://your-api.com/webhook"  # Results pushed here
    ),
)
# Method 3: Use reply_to (agent-to-agent)
event = EventEnvelope(
    topic="file.process",
    payload=PayloadBase(
        content={...},
        reply_to="results.topic"  # Result published here
    ),
)

Common Patterns

Pattern 1: Fire-and-Forget

# Publish and don't wait
async def fire_and_forget():
    await sdk.publish_task(
        EventEnvelope(
            topic="analytics.track",
            payload=PayloadBase(content={"event": "page_view"}),
        )
    )
    # Don't retrieve result

Pattern 2: Wait for Result

# Publish and wait
async def wait_for_result():
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="file.process",
            payload=PayloadBase(content={"file": "doc.pdf"}),
        )
    )
    
    # Poll until ready
    for i in range(30):  # 30 attempts
        result = await sdk.get_result(task_id)
        if result:
            return result
        await asyncio.sleep(1)
    
    raise TimeoutError("Result not ready after 30s")

Pattern 3: Batch Publishing

# Publish multiple events
async def batch_publish():
    tasks = []
    
    for item in items:
        event = EventEnvelope(
            topic="process.item",
            payload=PayloadBase(content=item),
        )
        task_id = await sdk.publish_task(event_envelope=event)
        tasks.append(task_id)
    
    print(f"✅ Published {len(tasks)} tasks")
    return tasks
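For large batches, the sequential loop above can be parallelised with asyncio.gather plus a semaphore to cap concurrency (a sketch using a stand-in publish coroutine; substitute sdk.publish_task in real code):

```python
import asyncio

async def publish_one(item: dict) -> str:
    # Stand-in for: await sdk.publish_task(event_envelope=EventEnvelope(...))
    await asyncio.sleep(0)  # simulate I/O
    return f"task-{item['id']}"

async def batch_publish_concurrent(items: list, max_concurrency: int = 10) -> list:
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(item: dict) -> str:
        async with sem:  # never more than max_concurrency in flight
            return await publish_one(item)

    # gather preserves input order in its results
    return await asyncio.gather(*(bounded(i) for i in items))

task_ids = asyncio.run(batch_publish_concurrent([{"id": n} for n in range(5)]))
print(task_ids)  # ['task-0', 'task-1', 'task-2', 'task-3', 'task-4']
```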

Pattern 4: Webhook Callback

# Publish with webhook
async def with_webhook():
    event = EventEnvelope(
        topic="long.task",
        payload=PayloadBase(
            content={"job_id": "123"},
            webhook="https://your-api.com/webhook/completion"
        ),
    )
    
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"✅ Task {task_id} will POST to webhook when done")

Pattern 5: Agent Pipeline

# Multi-stage pipeline
async def agent_pipeline():
    # Stage 1: Upload
    event1 = EventEnvelope(
        topic="file.upload",
        correlation_id="pipeline-123",
        payload=PayloadBase(
            content={"file": "doc.pdf"},
            reply_to="file.scan"  # Next stage
        ),
    )
    await sdk.publish_task(event_envelope=event1)
    
    # Stage 2 agent (listening to file.scan) processes
    # Stage 2 replies to file.process
    # Stage 3 agent (listening to file.process) finishes

Pattern 6: Multi-Tenant Publishing

# Per-customer events
async def multi_tenant():
    for customer in customers:
        event = EventEnvelope(
            topic="invoice.generate",
            tenant_id=customer.id,
            correlation_id=f"invoice-{customer.id}",
            payload=PayloadBase(content={
                "customer_id": customer.id,
                "month": "2025-01"
            }),
        )
        await sdk.publish_task(event_envelope=event)

Complete Example

File: publisher.py (based on the actual example)
import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import EventEnvelope, PayloadBase

# SDK uses dependency injection for storage
sdk = OmniDaemonSDK()

async def publish_tasks():
    # Complex file operations task
    payload = {
        "content": """
**1. Create a Directory:** Create "test_directory".
**2. Create Files:** Create "file1.txt" and "file2.txt" in "test_directory".
**3. Write Content:** Write sample content into both files.
**4. List Directory:** List contents of "test_directory".
**5. Read File:** Read content of "file1.txt".
**6. Edit File:** Edit "file2.txt" to replace text.
**7. Move File:** Move "file1.txt" to "file3.txt".
""",
        # Optional webhook
        # "webhook": "http://localhost:8004/results",
    }
    
    topic = "file_system.tasks"
    
    event_payload = EventEnvelope(
        topic=topic,
        payload=PayloadBase(
            content=payload["content"],
            webhook=payload.get("webhook"),
        ),
    )
    
    task_id = await sdk.publish_task(event_envelope=event_payload)
    print(f"✅ Published task: {task_id}")
    
    # Wait for processing
    print("⏳ Waiting for agent to process...")
    await asyncio.sleep(10)
    
    # Get result
    result = await sdk.get_result(task_id)
    if result:
        print(f"✅ Result:\n{result}")
    else:
        print("❌ Result not found (may have expired or not ready)")

if __name__ == "__main__":
    asyncio.run(publish_tasks())
Run it:
python publisher.py
Output:
✅ Published task: 550e8400-e29b-41d4-a716-446655440000
⏳ Waiting for agent to process...
✅ Result:
{
    'status': 'success',
    'data': 'All file operations completed successfully:\n- Created test_directory\n- Created file1.txt and file2.txt\n- ...',
    'processed_by': 'OmniCore Agent'
}

Best Practices

1. Always Use correlation_id

# Good
event = EventEnvelope(
    topic="process.data",
    correlation_id=f"req-{uuid.uuid4()}",  # Track it!
    payload=PayloadBase(content={...}),
)

# Bad (hard to debug)
event = EventEnvelope(
    topic="process.data",
    payload=PayloadBase(content={...}),
)

2. Structure Your Topics

# Good (hierarchical, clear)
"user.registered"
"user.updated"
"payment.processed"
"email.sent"

# Bad (unclear, unstructured)
"do_stuff"
"handle_user"
"process"

3. Use Webhooks for Long Tasks

# Good for long-running tasks
event = EventEnvelope(
    topic="video.transcode",  # May take minutes
    payload=PayloadBase(
        content={...},
        webhook="https://api.example.com/webhook"  # Push when done
    ),
)

# Polling wastes resources for long tasks

4. Propagate IDs Through Chains

async def process_event(message: dict):
    # Extract IDs
    correlation_id = message.get("correlation_id")
    tenant_id = message.get("tenant_id")
    
    # Propagate to next event
    await sdk.publish_task(
        EventEnvelope(
            topic="next.step",
            correlation_id=correlation_id,  # Pass through!
            tenant_id=tenant_id,             # Pass through!
            causation_id=message.get("task_id"),  # Chain causality
            payload=PayloadBase(content={...}),
        )
    )

5. Handle Expiration Gracefully

# Always check if result exists
result = await sdk.get_result(task_id)
if result is None:
    # Either not ready, expired, or error
    logger.warning(f"Result {task_id} not found")
    # Implement fallback logic

Troubleshooting

Event Not Processed

# Check if agent is running
omnidaemon agent list

# Check if topic matches
# Publisher: topic="file.upload"
# Agent: topic="file.upload"  # Must match exactly!
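One way to rule out topic mismatches is to share constants between publisher and agent instead of retyping strings (illustrative pattern, not an OmniDaemon feature):

```python
# topics.py — single source of truth imported by both publisher and agent
from enum import Enum

class Topics(str, Enum):
    """Topic names shared by publisher and agent; one definition, no drift."""
    FILE_UPLOAD = "file.upload"
    FILE_RESPONSE = "file_system.response"

# Both sides pass Topics.FILE_UPLOAD.value, so the strings cannot
# diverge between the two processes.
print(Topics.FILE_UPLOAD.value)  # file.upload
```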

Result Not Found

# Common causes:
# 1. Not processed yet (wait longer)
# 2. Expired (> 24 hours)
# 3. Agent returned None
# 4. Agent crashed (check DLQ)

# Check DLQ
omnidaemon bus dlq --topic file.upload

Webhook Not Called

# Ensure webhook URL is reachable from agent
# Use https://webhook.site for testing

event = EventEnvelope(
    topic="test",
    payload=PayloadBase(
        content={...},
        webhook="https://webhook.site/your-unique-url"
    ),
)

Summary

Required Parameters:
  • topic - Where to send event
  • payload.content - Your data
Optional Parameters:
  • payload.webhook - HTTP callback URL
  • payload.reply_to - Agent-to-agent topic
  • correlation_id - Track across events
  • causation_id - Track cause-effect
  • source - Event origin
  • tenant_id - Multi-tenancy
Result Storage:
  • 24-hour TTL by default
  • Retrieve with sdk.get_result(task_id)
  • Or use webhook for push notifications
  • Or use reply_to for agent chains
Best Practices:
  • Always use correlation_id
  • Structure topics hierarchically
  • Use webhooks for long tasks
  • Propagate IDs through chains
  • Handle expiration gracefully
Publishing events to OmniDaemon is simple yet powerful! 🚀