Publisher Example
This page provides comprehensive examples of publishing events to OmniDaemon, from simple one-liners to advanced patterns that use every feature.
Overview
What This Guide Covers:
- ✅ Simple publishing (minimal code)
- ✅ Full publishing (all parameters)
- ✅ Batch publishing (multiple events)
- ✅ Webhook callbacks
- ✅ Reply-to patterns (agent-to-agent)
- ✅ Correlation and causation tracking
- ✅ Multi-tenancy with tenant_id
- ✅ Custom source tracking
- ✅ Result retrieval (24h TTL)
Prerequisites
1. Install OmniDaemon
2. Set Up Event Bus
3. Set Up Environment
4. Start an Agent Runner
You need an agent listening on the topic you publish to.
Simple Publishing (Minimal)
Example 1: Simplest Possible
- Event published to omni-stream:greet.user
- Any agent subscribed to greet.user receives it
- Task ID returned immediately (async processing)
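As a minimal sketch of the flow above, the snippet below builds the smallest possible event (a topic plus payload.content) and derives the stream it would land on. The helpers build_event and stream_name are hypothetical and only model the naming shown in this section; they are not OmniDaemon SDK calls, and the dict layout is an assumption.

```python
def stream_name(topic: str) -> str:
    """Return the stream for a topic, following the omni-stream:{topic} pattern."""
    return f"omni-stream:{topic}"


def build_event(topic: str, content: dict) -> dict:
    """Build the smallest event: a topic and a payload with content (assumed shape)."""
    return {"topic": topic, "payload": {"content": content}}


event = build_event("greet.user", {"name": "Alice"})
print(stream_name(event["topic"]))  # omni-stream:greet.user
```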
Full Publishing (All Parameters)
Example 2: Using All Features
Parameter Explanations
Required Parameters
topic (Required)
What: The topic (channel) to publish to
Format: {domain}.{action} (convention)
Example: greet.user
- Creates/uses stream: omni-stream:{topic}
- Delivered to all agents subscribed to this topic
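The {domain}.{action} convention can be checked before publishing. The sketch below is one possible check, assuming the domain is everything before the first dot; parse_topic is a hypothetical helper, not part of the SDK, and OmniDaemon itself may accept topics that do not follow the convention.

```python
def parse_topic(topic: str) -> tuple:
    """Split a topic into (domain, action) at the first dot.

    Raises ValueError when either part is missing.
    """
    domain, separator, action = topic.partition(".")
    if not separator or not domain or not action:
        raise ValueError(f"expected '{{domain}}.{{action}}', got {topic!r}")
    return domain, action


print(parse_topic("greet.user"))  # ('greet', 'user')
```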
payload.content (Required)
What: Your actual data
Format: Any JSON-serializable dict
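Because the content must be a JSON-serializable dict, it can help to verify that before publishing. This is a small sketch using only the standard library; is_json_serializable is a hypothetical helper name.

```python
import json


def is_json_serializable(content) -> bool:
    """Return True when content is a dict that json.dumps can encode."""
    if not isinstance(content, dict):
        return False
    try:
        json.dumps(content)
    except (TypeError, ValueError):
        return False
    return True


print(is_json_serializable({"name": "Alice", "tags": ["new", "vip"]}))  # True
print(is_json_serializable({"tags": {"new", "vip"}}))  # False: sets are not JSON
```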
Optional Parameters
payload.webhook (Optional)
What: HTTP URL to POST the result to when processing completes
When to use: When you want push notifications instead of polling
TTL: Results stored for 24 hours
- ✅ Real-time notifications
- ✅ Integration with external systems
- ✅ Async job completion callbacks
- ✅ Monitoring/alerting systems
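A payload that carries a webhook might be assembled as in the sketch below. The dict layout (content and webhook keys) mirrors the parameter names on this page but is an assumption about the wire shape, and payload_with_webhook and the example.com URL are hypothetical. The URL check only guards against obviously malformed values.

```python
from urllib.parse import urlparse


def payload_with_webhook(content: dict, webhook_url: str) -> dict:
    """Return a payload dict with a webhook, rejecting non-HTTP(S) URLs."""
    parsed = urlparse(webhook_url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"webhook must be an http(s) URL, got {webhook_url!r}")
    return {"content": content, "webhook": webhook_url}


payload = payload_with_webhook(
    {"report": "q3"},
    "https://example.com/hooks/omnidaemon",  # placeholder endpoint
)
```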
payload.reply_to (Optional)
What: Topic to publish the result to (agent-to-agent communication)
When to use: When building agent pipelines/workflows
- Agent processes event
- Result published to reply_to topic
- Another agent (listening to that topic) receives it
- Creates event chains!
- ✅ Multi-stage pipelines
- ✅ Agent orchestration
- ✅ Workflow automation
- ✅ Fan-out/fan-in patterns
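The reply_to chain can be illustrated with a toy in-memory bus. This is not how OmniDaemon is implemented; InMemoryBus and the topic names are hypothetical, and the sketch only shows the routing idea: a handler's result is republished to the topic named in reply_to.

```python
class InMemoryBus:
    """Toy event bus that forwards each handler result to payload['reply_to']."""

    def __init__(self):
        self.handlers = {}

    def subscribe(self, topic, handler):
        self.handlers.setdefault(topic, []).append(handler)

    def publish(self, topic, payload):
        for handler in self.handlers.get(topic, []):
            result = handler(payload["content"])
            reply_to = payload.get("reply_to")
            if reply_to:
                # The result becomes the content of a new event on reply_to.
                self.publish(reply_to, {"content": result})


received = []
bus = InMemoryBus()
bus.subscribe("text.extract", lambda content: {"text": content["doc"].upper()})
bus.subscribe("text.summarize", received.append)

bus.publish("text.extract", {"content": {"doc": "hello"}, "reply_to": "text.summarize"})
print(received)  # [{'text': 'HELLO'}]
```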
correlation_id (Optional)
What: Track a single request across multiple events
Format: String (UUID recommended)
Lifetime: Passed through entire event chain
- ✅ Distributed tracing
- ✅ Log aggregation
- ✅ Debug workflows
- ✅ Track user journeys
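One way to follow the UUID recommendation is to generate the ID once per request and attach it to every event that request produces. The event dicts below are an assumed shape for illustration, and make_event is a hypothetical helper.

```python
import uuid


def make_event(topic: str, content: dict, correlation_id: str) -> dict:
    """Build an event dict that carries the shared correlation_id (assumed shape)."""
    return {
        "topic": topic,
        "payload": {"content": content},
        "correlation_id": correlation_id,
    }


# One ID per incoming request, reused for every event in that request.
correlation_id = str(uuid.uuid4())
events = [
    make_event("order.validate", {"order_id": 1}, correlation_id),
    make_event("order.charge", {"order_id": 1}, correlation_id),
]
```

Filtering logs on that one value then returns every step of the request.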
causation_id (Optional)
What: Track cause-effect relationships in event chains
Format: String (usually the ID of the triggering event)
Difference from correlation_id: Shows parent-child relationships
- ✅ Event sourcing
- ✅ Causality tracking
- ✅ Root cause analysis
- ✅ Event replay
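The difference between the two IDs is easiest to see in a chain: correlation_id stays constant, while causation_id points at the immediate parent. In this sketch the id field and the helpers new_event and caused_by are hypothetical; how OmniDaemon assigns event IDs is not specified on this page.

```python
import uuid


def new_event(topic, content, correlation_id=None, causation_id=None):
    """Create an event with its own id (hypothetical field) and tracking IDs."""
    return {
        "id": str(uuid.uuid4()),
        "topic": topic,
        "payload": {"content": content},
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "causation_id": causation_id,
    }


def caused_by(parent, topic, content):
    """Create a child event: same correlation_id, causation_id = parent's id."""
    return new_event(
        topic,
        content,
        correlation_id=parent["correlation_id"],
        causation_id=parent["id"],
    )


root = new_event("order.created", {"order_id": 1})
charge = caused_by(root, "payment.charge", {"order_id": 1})
receipt = caused_by(charge, "email.receipt", {"order_id": 1})
```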
source (Optional)
What: Identifies where the event originated
Format: String (system/service name)
- ✅ Multi-source systems
- ✅ Source-specific logic
- ✅ Analytics
- ✅ Security auditing
tenant_id (Optional)
What: Isolate data by customer/organization
Format: String (customer identifier)
Lifetime: Passed through entire event chain
- ✅ Multi-tenancy
- ✅ Customer isolation
- ✅ Per-customer processing
- ✅ Billing/usage tracking
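As an illustration of the billing/usage use case, events tagged with tenant_id can be counted per tenant. The event dicts and tenant names are made up for the example, and usage_by_tenant is a hypothetical helper rather than an SDK feature.

```python
from collections import Counter


def usage_by_tenant(events) -> Counter:
    """Count events per tenant_id, skipping events that carry no tenant."""
    return Counter(e["tenant_id"] for e in events if e.get("tenant_id"))


events = [
    {"topic": "report.generate", "tenant_id": "acme"},
    {"topic": "report.generate", "tenant_id": "globex"},
    {"topic": "report.export", "tenant_id": "acme"},
    {"topic": "system.health"},  # no tenant: not billed to anyone
]
usage = usage_by_tenant(events)
print(usage["acme"], usage["globex"])  # 2 1
```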
Result Storage & Retrieval
24-Hour TTL
Results are stored for 24 hours by default:
- ✅ Balance availability with storage efficiency
- ✅ Most use cases retrieve results quickly
- ✅ Prevents unbounded storage growth
- ✅ Encourages proper result handling
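The effect of a TTL can be sketched with a small in-memory store. This does not describe OmniDaemon's actual storage backend; ResultStore is hypothetical and only demonstrates that a result is readable until its expiry time and returns None afterwards. The clock is injectable so the expiry can be shown without waiting.

```python
import time

DAY_SECONDS = 24 * 60 * 60


class ResultStore:
    """In-memory result store whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds=DAY_SECONDS, clock=time.time):
        self._ttl = ttl_seconds
        self._clock = clock
        self._items = {}

    def save(self, task_id, result):
        self._items[task_id] = (result, self._clock() + self._ttl)

    def get(self, task_id):
        entry = self._items.get(task_id)
        if entry is None:
            return None
        result, expires_at = entry
        if self._clock() >= expires_at:
            del self._items[task_id]  # expired: drop it and report "not found"
            return None
        return result


now = [0.0]  # fake clock so the example runs instantly
store = ResultStore(clock=lambda: now[0])
store.save("task-1", {"status": "done"})
fresh = store.get("task-1")
now[0] = DAY_SECONDS
expired = store.get("task-1")
```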
Retrieving Results
Common Patterns
Pattern 1: Fire-and-Forget
Pattern 2: Wait for Result
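Waiting for a result usually means polling until it appears or a deadline passes. The sketch below takes the lookup as a parameter and assumes it is an async callable that returns None while the task is still running; that is an assumption about sdk.get_result(task_id), so adapt it to the real signature. wait_for_result is a hypothetical helper.

```python
import asyncio


async def wait_for_result(get_result, task_id, timeout=30.0, interval=0.5):
    """Poll get_result(task_id) until it returns a value or timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await get_result(task_id)
        if result is not None:
            return result
        if loop.time() >= deadline:
            raise TimeoutError(f"no result for {task_id} after {timeout}s")
        await asyncio.sleep(interval)
```

With a real SDK instance this might be called as await wait_for_result(sdk.get_result, task_id), provided the assumption above holds.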
Pattern 3: Batch Publishing
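Publishing many events is typically done concurrently rather than one at a time. The sketch below uses asyncio.gather with a semaphore to cap in-flight publishes. The publish argument stands in for whatever async publish call the SDK exposes (an assumption), and publish_batch is a hypothetical helper.

```python
import asyncio


async def publish_batch(publish, events, max_concurrency=10):
    """Publish events concurrently; returns task IDs in the order of events."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def publish_one(event):
        async with semaphore:  # limit how many publishes run at once
            return await publish(event)

    return await asyncio.gather(*(publish_one(event) for event in events))
```

asyncio.gather returns results in input order, so each task ID lines up with the event that produced it.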
Pattern 4: Webhook Callback
Pattern 5: Agent Pipeline
Pattern 6: Multi-Tenant Publishing
Complete Example
File: publisher.py (based on actual example)
Best Practices
1. Always Use correlation_id
2. Structure Your Topics
3. Use Webhooks for Long Tasks
4. Propagate IDs Through Chains
5. Handle Expiration Gracefully
Troubleshooting
Event Not Processed
Result Not Found
Webhook Not Called
Further Reading
- Callback Pattern - What’s in the message
- Event-Driven Architecture - EDA principles
- OmniCore Agent Example - Build agents
- Google ADK Example - Build agents
- Common Patterns - Production recipes
Summary
Required Parameters:
- topic - Where to send event
- payload.content - Your data
Optional Parameters:
- payload.webhook - HTTP callback URL
- payload.reply_to - Agent-to-agent topic
- correlation_id - Track across events
- causation_id - Track cause-effect
- source - Event origin
- tenant_id - Multi-tenancy
Result Retrieval:
- 24-hour TTL by default
- Retrieve with sdk.get_result(task_id)
- Or use webhook for push notifications
- Or use reply_to for agent chains
Best Practices:
- Always use correlation_id
- Structure topics hierarchically
- Use webhooks for long tasks
- Propagate IDs through chains
- Handle expiration gracefully