SDK API Reference
Complete API reference for the OmniDaemon SDK (OmniDaemonSDK class).
Overview
The OmniDaemonSDK is the primary interface for interacting with OmniDaemon. It provides methods for:
- Publishing events
- Registering agents
- Managing agents (list, get, delete, unsubscribe)
- Retrieving results
- Getting metrics
- Monitoring health
- Event bus operations
- Storage operations
Constructor
OmniDaemonSDK(event_bus=None, store=None)
Create a new SDK instance with dependency injection.
Parameters:
- event_bus (Optional[BaseEventBus]): Custom event bus instance. Defaults to the module-level instance.
- store (Optional[BaseStore]): Custom storage instance. Defaults to the module-level instance.
Returns: OmniDaemonSDK instance
Example:
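The defaulting rule described above can be modelled in a few lines. This is only a sketch of the pattern, not the library's code: SketchSDK, DEFAULT_EVENT_BUS, DEFAULT_STORE and FakeStore are hypothetical stand-ins, and a real test double would presumably subclass BaseStore.

```python
# Hypothetical stand-ins for the module-level default instances.
DEFAULT_EVENT_BUS = object()
DEFAULT_STORE = object()


class SketchSDK:
    """Stand-in that mirrors only the documented constructor defaulting."""

    def __init__(self, event_bus=None, store=None):
        # Fall back to the module-level instance when nothing is injected.
        self.event_bus = DEFAULT_EVENT_BUS if event_bus is None else event_bus
        self.store = DEFAULT_STORE if store is None else store


class FakeStore:
    """Hypothetical test double injected in place of real storage."""


sdk_default = SketchSDK()                # both dependencies use the defaults
sdk_test = SketchSDK(store=FakeStore())  # storage replaced for a test
```

Injecting only the store leaves the event bus on its default, which is the usual shape of a unit test that should not touch real storage.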
Notes:
- SDK uses dependency injection for flexibility
- Environment variables configure default instances
- Custom instances useful for testing
Publishing Methods
publish_task(event_envelope)
Publish an event to a topic.
Parameters:
event_envelope(EventEnvelope): Event to publish
Returns: str - Task ID (UUID)
Raises:
- ValidationError: Invalid event envelope
- Exception: Publishing failed
Use cases:
- Trigger agent processing
- Send events to topics
- Start workflows
- Notify agents
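Because publish_task() can raise, a caller that publishes several events may want to collect task IDs and failures separately. The helper below is a minimal sketch: publish_all and _resolve are hypothetical names, and since this reference does not say whether SDK methods are coroutines, the sketch awaits the return value only when it is awaitable.

```python
import inspect
from typing import Any, Iterable, List, Tuple


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def publish_all(sdk: Any, envelopes: Iterable[Any]) -> Tuple[List[str], list]:
    """Publish each envelope and return (task_ids, failures)."""
    task_ids: List[str] = []
    failures = []
    for envelope in envelopes:
        try:
            task_ids.append(await _resolve(sdk.publish_task(envelope)))
        except Exception as exc:  # broad on purpose: validation or publish errors
            failures.append((envelope, exc))
    return task_ids, failures
```

The returned task IDs are what get_result() expects later, so they are worth persisting if results are fetched by a different process.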
Agent Management Methods
register_agent(agent_config)
Register an agent to listen on a topic.
Parameters:
agent_config(AgentConfig): Agent configuration
Returns: None
Raises:
- ValidationError: Invalid agent config
- Exception: Registration failed
AgentConfig fields:
- topic (str, required): Topic to subscribe to
- callback (Callable, required): Function to call when a message arrives
- name (str, optional): Agent name (default: auto-generated from callback name)
- description (str, optional): Human-readable description
- tools (List[str], optional): List of tools/capabilities
- config (SubscriptionConfig, optional): Subscription configuration
SubscriptionConfig fields:
- consumer_count (int, default=1): Number of parallel consumers
- reclaim_idle_ms (int, default=180000): Reclaim timeout (3 minutes)
- dlq_retry_limit (int, default=3): Max retries before DLQ
- group_name (str, optional): Custom group name (auto-generated if None)
- consumer_name (str, optional): Custom consumer name (auto-generated if None)
Use cases:
- Register AI agents
- Set up event handlers
- Configure processing pipelines
- Scale with multiple consumers
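The documented fields can be laid out as plain keyword dictionaries before handing them to the real classes. This is a sketch under assumptions: the callback's argument and return shape, the topic name, and the idea that AgentConfig and SubscriptionConfig accept these fields as keyword arguments are not confirmed by this reference.

```python
def summarize(message):
    """Hypothetical callback; the message shape is an assumption."""
    return {"status": "ok"}


# Field names and defaults are taken from the SubscriptionConfig list above.
subscription_fields = {
    "consumer_count": 3,        # three parallel consumers (default is 1)
    "reclaim_idle_ms": 180000,  # documented default, i.e. 3 minutes
    "dlq_retry_limit": 3,       # documented default
}

# Field names are taken from the AgentConfig list above; values are made up.
agent_fields = {
    "topic": "reports.summarize",  # hypothetical topic name
    "callback": summarize,
    "name": "summarizer",
    "description": "Summarizes incoming reports",
    "tools": [],
}

# With the real classes this would presumably become (assumed constructors):
#   config = AgentConfig(**agent_fields,
#                        config=SubscriptionConfig(**subscription_fields))
#   sdk.register_agent(config)
```

Raising consumer_count is the knob the "Scale with multiple consumers" use case refers to; the other two values are left at their documented defaults.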
list_agents()
List all registered agents grouped by topic.
Parameters: None
Returns: Dict[str, List[Dict]] - Agents grouped by topic
Example:
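Since list_agents() returns agents grouped by topic, a dashboard or CLI usually flattens that mapping first. The helpers below are a sketch operating on the returned Dict[str, List[Dict]]; the function names are hypothetical, and the "name" key inside each agent dict is an assumption, so a placeholder is used when it is missing.

```python
from typing import Dict, List, Tuple


def count_agents_per_topic(agents_by_topic: Dict[str, List[dict]]) -> Dict[str, int]:
    """Map each topic to the number of agents registered on it."""
    return {topic: len(agents) for topic, agents in agents_by_topic.items()}


def list_agent_names(agents_by_topic: Dict[str, List[dict]]) -> List[Tuple[str, str]]:
    """Return sorted (topic, agent name) pairs; the 'name' key is assumed."""
    return sorted(
        (topic, agent.get("name", "<unnamed>"))
        for topic, agents in agents_by_topic.items()
        for agent in agents
    )
```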
Use cases:
- Monitor registered agents
- Audit agent configuration
- Build dashboards
- CLI tools
get_agent(topic, agent_name)
Get detailed information about a specific agent.
Parameters:
- topic (str): Topic name
- agent_name (str): Agent name
Returns: Optional[Dict] - Agent details or None if not found
Example:
Use cases:
- Inspect agent configuration
- Debugging
- Health checks
- CLI tools
unsubscribe_agent(topic, agent_name)
Temporarily stop agent processing (pause).
Parameters:
- topic (str): Topic name
- agent_name (str): Agent name
Returns: bool - True if unsubscribed, False if not found
Example:
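Pausing several agents on one topic, for instance ahead of maintenance, can be wrapped in a small helper that reports which names were actually found. This is a sketch: pause_agents and _resolve are hypothetical names, and the return value of unsubscribe_agent() is awaited only if it turns out to be awaitable, since this reference does not state whether the method is a coroutine.

```python
import inspect
from typing import Any, Dict, Iterable


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def pause_agents(sdk: Any, topic: str, agent_names: Iterable[str]) -> Dict[str, bool]:
    """Unsubscribe each agent; map name -> True if paused, False if not found."""
    outcome: Dict[str, bool] = {}
    for name in agent_names:
        outcome[name] = bool(await _resolve(sdk.unsubscribe_agent(topic, name)))
    return outcome
```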
- ✅ Stops consuming new messages
- ✅ Keeps consumer group intact
- ✅ Keeps DLQ preserved
- ✅ Keeps agent data in storage
- ❌ Doesn’t delete consumer group
- ❌ Doesn’t delete DLQ
- ❌ Doesn’t delete agent from storage
Use cases:
- Temporary maintenance
- Debugging issues
- Rate limiting
- Gradual rollout
To resume:
- Simply restart the agent runner
- The agent automatically resumes from where it left off
delete_agent(topic, agent_name, delete_group=True, delete_dlq=False)
Permanently remove agent (complete cleanup).
Parameters:
- topic (str): Topic name
- agent_name (str): Agent name
- delete_group (bool, default=True): Delete consumer group from Redis
- delete_dlq (bool, default=False): Delete dead-letter queue
Returns: bool - True if deleted, False if not found
Example:
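The two flags decide how much is destroyed, so it can help to name the intent at the call site. The wrapper below is a sketch: decommission_agent, keep_failed_messages and _resolve are hypothetical names, and only the documented delete_agent() parameters are passed through.

```python
import inspect
from typing import Any


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def decommission_agent(
    sdk: Any, topic: str, agent_name: str, keep_failed_messages: bool = True
) -> bool:
    """Delete the agent and its consumer group; keep the DLQ unless told otherwise."""
    deleted = await _resolve(
        sdk.delete_agent(
            topic,
            agent_name,
            delete_group=True,
            delete_dlq=not keep_failed_messages,
        )
    )
    return bool(deleted)
```

Keeping the DLQ by default matches the documented default of delete_dlq=False and leaves failed messages available for inspection after the agent is gone.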
- ✅ Stops consuming messages (unsubscribes)
- ✅ Deletes consumer group from Redis (if delete_group=True)
- ✅ Deletes DLQ from Redis (if delete_dlq=True)
- ✅ Deletes agent from storage
Use cases:
- Agent no longer needed
- Decommissioning service
- Cleanup after testing
- Topic migration
Warning:
- Cannot resume after deletion
- Pending messages lost (if delete_group=True)
- Failed messages lost (if delete_dlq=True)
delete_topic(topic)
Delete all agents for a topic.
Parameters:
- topic (str): Topic name
Returns: int - Number of agents deleted
Example:
Use cases:
- Cleanup entire topic
- Decommission feature
- Testing cleanup
Result Methods
get_result(task_id)
Retrieve result for a published task.
Parameters:
- task_id (str): Task ID returned from publish_task()
Returns: Optional[Dict] - Result or None if not found/expired
Example:
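Because get_result() returns None until a result exists, the request-reply and polling use cases need a loop with a deadline. The helper below is a minimal sketch: wait_for_result, its timeout and interval parameters, and _resolve are hypothetical, and the SDK call is awaited only if it returns an awaitable.

```python
import asyncio
import inspect
import time
from typing import Any, Optional


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def wait_for_result(
    sdk: Any, task_id: str, timeout: float = 30.0, interval: float = 0.5
) -> Optional[dict]:
    """Poll get_result() until it returns a value or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = await _resolve(sdk.get_result(task_id))
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None  # caller cannot tell "pending" from "failed" here
        await asyncio.sleep(interval)
```

A None return after the timeout is ambiguous by design: the task may be unprocessed, expired, or failed, so a caller that needs to distinguish those cases would have to inspect the DLQ separately.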
Notes:
- Results are stored for 24 hours (default TTL)
- Returns None if:
  - Not processed yet
  - Expired (> 24 hours)
  - Agent returned None
  - Agent failed (check DLQ)
Use cases:
- Request-reply pattern
- API endpoints
- Synchronous operations
- Result polling
list_results(limit=100)
List recent task results.
Parameters:
- limit (int, default=100): Max results to return
Returns: List[Dict] - List of results (most recent first)
Example:
Use cases:
- Recent activity monitoring
- Debugging
- Result inspection
- Dashboards
delete_result(task_id)
Delete a specific task result.
Parameters:
- task_id (str): Task ID
Returns: bool - True if deleted, False if not found
Example:
Use cases:
- Manual cleanup
- Privacy compliance (GDPR)
- Test cleanup
Metrics Methods
metrics(topic=None)
Get aggregated performance metrics.
Parameters:
- topic (str, optional): Filter by topic (None = all topics)
Returns: Dict[str, Dict[str, Dict]] - Nested metrics structure
Example:
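A nested Dict[str, Dict[str, Dict]] is easier to feed into a dashboard once flattened into rows. The sketch below assumes the outer key is the topic and the inner key is the agent name, which this reference does not spell out; flatten_metrics is a hypothetical name and the stats dict is passed through untouched because its keys are not documented here.

```python
from typing import Dict, List, Tuple


def flatten_metrics(metrics: Dict[str, Dict[str, dict]]) -> List[Tuple[str, str, dict]]:
    """Turn the nested structure into (outer key, inner key, stats) rows."""
    rows = []
    for topic, agents in metrics.items():      # assumed: outer key is the topic
        for agent, stats in agents.items():    # assumed: inner key is the agent
            rows.append((topic, agent, stats))
    return rows
```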
Use cases:
- Performance monitoring
- SLA tracking
- Capacity planning
- Dashboard metrics
Health & Status Methods
health()
Get comprehensive system health status.
Parameters: None
Returns: Dict - Health status
Example:
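For Kubernetes probes or load balancer checks, the four documented status values (RUNNING, READY, DEGRADED, DOWN) have to be reduced to an HTTP status code. The mapping below is a sketch: the "status" key in the health dict is an assumption, and treating READY as healthy is a design choice rather than something this reference prescribes.

```python
# Statuses this sketch treats as passing a probe; adjust to taste.
HEALTHY_STATUSES = {"RUNNING", "READY"}


def probe_status_code(health: dict) -> int:
    """Map a health() result to 200 or 503; the 'status' key is assumed."""
    status = str(health.get("status", "DOWN")).upper()
    return 200 if status in HEALTHY_STATUSES else 503
```

A missing status is treated as DOWN so that an unexpected payload fails the probe instead of passing it silently.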
Status values:
- RUNNING: Runner active, all systems healthy
- READY: No runner, but event bus and storage healthy
- DEGRADED: One system unhealthy
- DOWN: Both event bus and storage unhealthy
Use cases:
- Health checks
- Monitoring
- Kubernetes probes
- Load balancer health
storage_health()
Get detailed storage health information.
Parameters: None
Returns: Dict - Storage health details
Example:
Use cases:
- Storage monitoring
- Capacity planning
- Performance tuning
Runner Methods
start()
Start the agent runner.
Parameters: None
Returns: None
Example:
Behavior:
- Starts consuming messages
- Activates all registered agents
- Records start time
- Sets runner status to RUNNING
Use cases:
- Start agent processing
- Production deployment
- Development testing
shutdown()
Stop the agent runner gracefully.
Parameters: None
Returns: None
Example:
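Pairing start() with shutdown() in a try/finally keeps cleanup from being skipped when the process is asked to stop. The sketch below assumes start() returns once consumption has begun, which this reference does not confirm; run_runner, stop_event and _resolve are hypothetical names, and both SDK calls are awaited only if they return awaitables.

```python
import asyncio
import inspect
from typing import Any


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def run_runner(sdk: Any, stop_event: asyncio.Event) -> None:
    """Start the runner, wait for a stop signal, and always shut down."""
    try:
        await _resolve(sdk.start())
        await stop_event.wait()  # set from a signal handler or another task
    finally:
        await _resolve(sdk.shutdown())
```

Calling start() inside the try block means shutdown() also runs when startup itself fails, so any connections opened before the failure are still closed.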
Behavior:
- Stops consuming messages
- Closes event bus connection
- Closes storage connection
- Cleans up resources
- Deletes start time from storage
Use cases:
- Graceful shutdown
- Cleanup on exit
- Deployment updates
Event Bus Monitoring Methods
(Only available when EVENT_BUS_TYPE=redis_stream)
list_streams()
List all Redis streams.
Parameters: None
Returns: List[Dict] - Stream information
Raises: ValueError if event bus is not Redis Streams
Example:
inspect_stream(topic, limit=10)
View recent messages in a stream.
Parameters:
- topic (str): Topic name
- limit (int, default=10): Max messages to return
Returns: List[Dict] - Messages
Raises: ValueError if event bus is not Redis Streams
Example:
list_groups(topic)
List consumer groups for a topic.
Parameters:
- topic (str): Topic name
Returns: List[Dict] - Consumer groups
Raises: ValueError if event bus is not Redis Streams
Example:
inspect_dlq(topic, limit=10)
Inspect dead-letter queue for a topic.
Parameters:
- topic (str): Topic name
- limit (int, default=10): Max entries to return
Returns: List[Dict] - DLQ entries
Raises: ValueError if event bus is not Redis Streams
Example:
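Since the bus monitoring methods raise ValueError on any backend other than Redis Streams, monitoring code may want to degrade gracefully instead of crashing. The helper below is a sketch: dlq_entry_counts and _resolve are hypothetical names, and the SDK call is awaited only if it returns an awaitable.

```python
import inspect
from typing import Any, Dict, Iterable, Optional


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def dlq_entry_counts(
    sdk: Any, topics: Iterable[str], limit: int = 10
) -> Optional[Dict[str, int]]:
    """Count DLQ entries per topic, or return None if the bus is unsupported."""
    counts: Dict[str, int] = {}
    for topic in topics:
        try:
            entries = await _resolve(sdk.inspect_dlq(topic, limit=limit))
        except ValueError:
            return None  # event bus is not Redis Streams
        counts[topic] = len(entries)  # capped at `limit`, not a true depth
    return counts
```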
get_bus_stats()
Get comprehensive event bus statistics.
Parameters: None
Returns: Dict - Event bus stats
Raises: ValueError if event bus is not Redis Streams
Example:
Storage Operations Methods
clear_agents()
Delete all agents from storage.
Parameters: None
Returns: int - Number of agents deleted
Example:
clear_results()
Delete all task results from storage.
Parameters: None
Returns: int - Number of results deleted
Example:
clear_metrics()
Delete all metrics from storage.
Parameters: None
Returns: int - Number of metrics deleted
Example:
clear_all()
Delete all data from storage (agents, results, metrics, config).
Parameters: None
Returns: Dict[str, int] - Counts by type
Example:
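In test teardown it is often enough to know how many records were removed in total. The helper below is a sketch that sums the Dict[str, int] returned by clear_all(); reset_storage and _resolve are hypothetical names, and the keys of the returned dict are not assumed.

```python
import inspect
from typing import Any


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def reset_storage(sdk: Any) -> int:
    """Clear everything and return the total number of deleted records."""
    counts = await _resolve(sdk.clear_all())
    return sum(counts.values())
```

Because clear_all() also removes config values, this belongs in test environments rather than in production maintenance scripts.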
Configuration Methods
save_config(key, value)
Save configuration value to storage.
Parameters:
- key (str): Configuration key
- value (Any): Configuration value (JSON-serializable)
Returns: None
Example:
get_config(key, default=None)
Get configuration value from storage.
Parameters:
- key (str): Configuration key
- default (Any, optional): Default value if key not found
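Since save_config() expects a JSON-serializable value, checking that up front gives a clearer error than a failure inside storage. The helpers below are a sketch: save_json_config, load_config and _resolve are hypothetical names, and the SDK calls are awaited only if they return awaitables.

```python
import inspect
import json
from typing import Any


async def _resolve(value: Any) -> Any:
    """Return value, awaiting it first if the SDK call produced an awaitable."""
    if inspect.isawaitable(value):
        return await value
    return value


async def save_json_config(sdk: Any, key: str, value: Any) -> None:
    """Reject values json.dumps cannot encode, then save the original value."""
    json.dumps(value)  # raises TypeError for types such as set
    await _resolve(sdk.save_config(key, value))


async def load_config(sdk: Any, key: str, default: Any = None) -> Any:
    """Read a config value, falling back to `default` when the key is missing."""
    return await _resolve(sdk.get_config(key, default=default))
```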
Complete Example
Further Reading
- Event Bus Architecture - Understanding the event bus
- Storage Architecture - Understanding storage
- OmniCore Agent Example - Complete working example
- Google ADK Example - Google ADK integration
- Publisher Example - Publishing tasks
- Common Patterns - Production patterns
Summary
Key Methods:
- Publishing: publish_task()
- Agents: register_agent(), list_agents(), get_agent(), delete_agent(), unsubscribe_agent()
- Results: get_result(), list_results(), delete_result()
- Metrics: metrics()
- Health: health(), storage_health()
- Runner: start(), shutdown()
- Bus: list_streams(), inspect_stream(), list_groups(), inspect_dlq(), get_bus_stats()
- Storage: clear_agents(), clear_results(), clear_metrics(), clear_all()
- Config: save_config(), get_config()