
SDK API Reference

Complete API reference for the OmniDaemon SDK (OmniDaemonSDK class).

Overview

The OmniDaemonSDK is the primary interface for interacting with OmniDaemon. It provides methods for:
  • Publishing events
  • Registering agents
  • Managing agents (list, get, delete, unsubscribe)
  • Retrieving results
  • Getting metrics
  • Monitoring health
  • Event bus operations
  • Storage operations
  • Controlling the runner (start, shutdown)
  • Saving and reading configuration
Import:
from omnidaemon import OmniDaemonSDK

Constructor

OmniDaemonSDK(event_bus=None, store=None)

Create a new SDK instance with dependency injection.
Parameters:
  • event_bus (Optional[BaseEventBus]): Custom event bus instance. Defaults to module-level instance.
  • store (Optional[BaseStore]): Custom storage instance. Defaults to module-level instance.
Returns: OmniDaemonSDK instance
Example:
# Default (uses environment variables)
sdk = OmniDaemonSDK()

# Custom event bus and storage
from omnidaemon.event_bus.redis_stream_bus import RedisStreamEventBus
from omnidaemon.storage.redis_store import RedisStore

custom_bus = RedisStreamEventBus(redis_url="redis://custom:6379")
custom_store = RedisStore(redis_url="redis://custom:6379")

sdk = OmniDaemonSDK(event_bus=custom_bus, store=custom_store)
Notes:
  • SDK uses dependency injection for flexibility
  • Environment variables configure default instances
  • Custom instances useful for testing

Publishing Methods

publish_task(event_envelope)

Publish an event to a topic.
Parameters:
  • event_envelope (EventEnvelope): Event to publish
Returns: str - Task ID (UUID)
Raises:
  • ValidationError: Invalid event envelope
  • Exception: Publishing failed
Example:
from omnidaemon import EventEnvelope, PayloadBase

# Simple publish
event = EventEnvelope(
    topic="file.uploaded",
    payload=PayloadBase(
        content={"filename": "doc.pdf"}
    ),
)

task_id = await sdk.publish_task(event_envelope=event)
print(f"Task ID: {task_id}")
# Output: Task ID: 550e8400-e29b-41d4-a716-446655440000
Full Example:
# All parameters
event = EventEnvelope(
    topic="file.uploaded",
    id="custom-task-id-123",  # Optional: auto-generated if None
    correlation_id="req-456",
    causation_id="evt-789",
    source="api-gateway",
    tenant_id="customer-acme",
    payload=PayloadBase(
        content={"filename": "doc.pdf", "size": 1024},
        webhook="https://api.example.com/callback",
        reply_to="file.processed",
    ),
)

task_id = await sdk.publish_task(event_envelope=event)
Use Cases:
  • Trigger agent processing
  • Send events to topics
  • Start workflows
  • Notify agents

Agent Management Methods

register_agent(agent_config)

Register an agent to listen on a topic.
Parameters:
  • agent_config (AgentConfig): Agent configuration
Returns: None
Raises:
  • ValidationError: Invalid agent config
  • Exception: Registration failed
Example:
from omnidaemon import AgentConfig, SubscriptionConfig

async def my_callback(message: dict):
    content = message.get("content", {})
    # Process...
    return {"status": "success"}

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=my_callback,
        name="file-processor",  # Optional: auto-generated
        description="Processes uploaded files",
        tools=["file_reader", "ocr"],
        config=SubscriptionConfig(
            consumer_count=3,
            reclaim_idle_ms=300000,
            dlq_retry_limit=3,
        ),
    )
)
AgentConfig Parameters:
  • topic (str, required): Topic to subscribe to
  • callback (Callable, required): Function to call when message arrives
  • name (str, optional): Agent name (default: auto-generated from callback name)
  • description (str, optional): Human-readable description
  • tools (List[str], optional): List of tools/capabilities
  • config (SubscriptionConfig, optional): Subscription configuration
SubscriptionConfig Parameters:
  • consumer_count (int, default=1): Number of parallel consumers
  • reclaim_idle_ms (int, default=180000): Reclaim timeout (3 minutes)
  • dlq_retry_limit (int, default=3): Max retries before DLQ
  • group_name (str, optional): Custom group name (auto-generated if None)
  • consumer_name (str, optional): Custom consumer name (auto-generated if None)
Use Cases:
  • Register AI agents
  • Set up event handlers
  • Configure processing pipelines
  • Scale with multiple consumers

list_agents()

List all registered agents grouped by topic.
Parameters: None
Returns: Dict[str, List[Dict]] - Agents grouped by topic
Example:
agents = await sdk.list_agents()

# Output structure:
# {
#     "file.uploaded": [
#         {
#             "name": "file-processor",
#             "tools": ["file_reader", "ocr"],
#             "description": "Processes uploaded files",
#             "callback": "my_callback",
#             "config": {"consumer_count": 3, ...}
#         }
#     ],
#     "analysis.requested": [...]
# }

# Iterate
for topic, agent_list in agents.items():
    print(f"Topic: {topic}")
    for agent in agent_list:
        print(f"  - {agent['name']}: {agent['description']}")
Use Cases:
  • Monitor registered agents
  • Audit agent configuration
  • Build dashboards
  • CLI tools
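
For auditing or dashboards, the grouped structure returned by list_agents() can be reduced to a per-topic summary. The sketch below is illustrative only: consumers_per_topic is a hypothetical helper, not an SDK method, and it assumes each agent dict carries a "config" dict as in the output structure above, falling back to the documented default consumer_count of 1 when the key is absent.

```python
from typing import Dict, List


def consumers_per_topic(agents: Dict[str, List[Dict]]) -> Dict[str, int]:
    """Sum consumer_count across the agents registered on each topic."""
    totals = {}
    for topic, agent_list in agents.items():
        total = 0
        for agent in agent_list:
            config = agent.get("config") or {}
            # Assumption: a missing consumer_count means the documented default of 1.
            total += config.get("consumer_count", 1)
        totals[topic] = total
    return totals


# Sample data in the documented shape; "thumbnailer" is a made-up agent.
sample_agents = {
    "file.uploaded": [
        {"name": "file-processor", "config": {"consumer_count": 3}},
        {"name": "thumbnailer", "config": {}},
    ],
    "analysis.requested": [],
}
print(consumers_per_topic(sample_agents))
```

In real use the argument would be the value returned by await sdk.list_agents().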

get_agent(topic, agent_name)

Get detailed information about a specific agent.
Parameters:
  • topic (str): Topic name
  • agent_name (str): Agent name
Returns: Optional[Dict] - Agent details or None if not found
Example:
agent = await sdk.get_agent("file.uploaded", "file-processor")

if agent:
    print(f"Name: {agent['name']}")
    print(f"Description: {agent['description']}")
    print(f"Callback: {agent['callback']}")
    print(f"Config: {agent['config']}")
    print(f"Created: {agent['created_at']}")
else:
    print("Agent not found")
Use Cases:
  • Inspect agent configuration
  • Debugging
  • Health checks
  • CLI tools

unsubscribe_agent(topic, agent_name)

Temporarily stop agent processing (pause).
Parameters:
  • topic (str): Topic name
  • agent_name (str): Agent name
Returns: bool - True if unsubscribed, False if not found
Example:
# Pause agent
success = await sdk.unsubscribe_agent("file.uploaded", "file-processor")

if success:
    print("✅ Agent paused")
else:
    print("❌ Agent not found")

# To resume: restart the agent runner
# Agent will automatically resume processing
What It Does:
  • ✅ Stops consuming new messages
  • ✅ Keeps consumer group intact
  • ✅ Preserves the DLQ
  • ✅ Keeps agent data in storage
What It Doesn’t Do:
  • ❌ Doesn’t delete consumer group
  • ❌ Doesn’t delete DLQ
  • ❌ Doesn’t delete agent from storage
Use Cases:
  • Temporary maintenance
  • Debugging issues
  • Rate limiting
  • Gradual rollout
To Resume:
  • Simply restart the agent runner
  • Agent automatically resumes from where it left off

delete_agent(topic, agent_name, delete_group=True, delete_dlq=False)

Permanently remove agent (complete cleanup).
Parameters:
  • topic (str): Topic name
  • agent_name (str): Agent name
  • delete_group (bool, default=True): Delete consumer group from Redis
  • delete_dlq (bool, default=False): Delete dead-letter queue
Returns: bool - True if deleted, False if not found
Example:
# Simple delete (keeps DLQ for inspection)
success = await sdk.delete_agent("file.uploaded", "file-processor")

# Complete cleanup (delete everything)
success = await sdk.delete_agent(
    topic="file.uploaded",
    agent_name="file-processor",
    delete_group=True,
    delete_dlq=True
)

if success:
    print("✅ Agent deleted")
else:
    print("❌ Agent not found")
What It Does:
  • ✅ Stops consuming messages (unsubscribes)
  • ✅ Deletes consumer group from Redis (if delete_group=True)
  • ✅ Deletes DLQ from Redis (if delete_dlq=True)
  • ✅ Deletes agent from storage
When to Use:
  • Agent no longer needed
  • Decommissioning service
  • Cleanup after testing
  • Topic migration
Warning:
  • Cannot resume after deletion
  • Pending messages lost (if delete_group=True)
  • Failed messages lost (if delete_dlq=True)

delete_topic(topic)

Delete all agents for a topic.
Parameters:
  • topic (str): Topic name
Returns: int - Number of agents deleted
Example:
count = await sdk.delete_topic("file.uploaded")
print(f"Deleted {count} agents")
Use Cases:
  • Cleanup entire topic
  • Decommission feature
  • Testing cleanup

Result Methods

get_result(task_id)

Retrieve result for a published task.
Parameters:
  • task_id (str): Task ID returned from publish_task()
Returns: Optional[Dict] - Result or None if not found/expired
Example:
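# Publish task (event is an EventEnvelope, built as in the publish_task() example)
task_id = await sdk.publish_task(event_envelope=event)

# Give the agent time to process (requires `import asyncio`)
await asyncio.sleep(2)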

# Get result
result = await sdk.get_result(task_id)

if result is not None:
    print(f"✅ Result: {result}")
else:
    print("❌ Result not ready or expired")
Notes:
  • Results stored for 24 hours (default TTL)
  • Returns None if:
    • Not processed yet
    • Expired (> 24 hours)
    • Agent returned None
    • Agent failed (check DLQ)
Use Cases:
  • Request-reply pattern
  • API endpoints
  • Synchronous operations
  • Result polling
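
Because get_result() returns None until a result has been stored, request-reply callers usually poll rather than sleep for a fixed time. The following is a minimal sketch, not part of the SDK: wait_for_result is a hypothetical helper, and its fetch argument stands in for sdk.get_result. Since None is also returned for expired results, agents that returned None, and failed tasks, a timeout here cannot tell those cases apart.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def wait_for_result(
    fetch: Callable[[str], Awaitable[Optional[Any]]],
    task_id: str,
    timeout: float = 30.0,
    interval: float = 0.5,
) -> Optional[Any]:
    """Poll fetch(task_id) until it returns non-None or the timeout elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await fetch(task_id)
        if result is not None:
            return result
        remaining = deadline - loop.time()
        if remaining <= 0:
            return None  # Timed out: not ready, expired, None result, or failed.
        # Never sleep past the deadline.
        await asyncio.sleep(min(interval, remaining))
```

Usage would look like: result = await wait_for_result(sdk.get_result, task_id, timeout=10).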

list_results(limit=100)

List recent task results.
Parameters:
  • limit (int, default=100): Max results to return
Returns: List[Dict] - List of results (most recent first)
Example:
results = await sdk.list_results(limit=50)

for result_data in results:
    print(f"Task: {result_data['task_id']}")
    print(f"Result: {result_data['result']}")
    print(f"Saved: {result_data['saved_at']}")
Use Cases:
  • Recent activity monitoring
  • Debugging
  • Result inspection
  • Dashboards

delete_result(task_id)

Delete a specific task result.
Parameters:
  • task_id (str): Task ID
Returns: bool - True if deleted, False if not found
Example:
success = await sdk.delete_result(task_id)

if success:
    print("✅ Result deleted")
else:
    print("❌ Result not found")
Use Cases:
  • Manual cleanup
  • Privacy compliance (GDPR)
  • Test cleanup

Metrics Methods

metrics(topic=None)

Get aggregated performance metrics.
Parameters:
  • topic (str, optional): Filter by topic (None = all topics)
Returns: Dict[str, Dict[str, Dict]] - Nested metrics structure
Example:
# All metrics
metrics = await sdk.metrics()

# Structure:
# {
#     "file.uploaded": {
#         "file-processor": {
#             "received": 150,
#             "processed": 145,
#             "failed": 5,
#             "avg_processing_time_ms": 1350.5
#         }
#     }
# }

# Filter by topic
file_metrics = await sdk.metrics(topic="file.uploaded")

# Iterate
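for topic, agents in metrics.items():
    for agent, stats in agents.items():
        received = stats["received"]
        # Guard against agents that have not received any messages yet
        success_rate = stats["processed"] / received * 100 if received else 0.0
        print(f"{agent}: {success_rate:.1f}% success rate")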
Use Cases:
  • Performance monitoring
  • SLA tracking
  • Capacity planning
  • Dashboard metrics
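
For SLA tracking, the nested metrics structure can be scanned for agents whose success rate falls below a threshold. This is a sketch only: agents_below_sla and the 99% default are assumptions for illustration, and the function works on a plain dict in the shape shown above rather than calling the SDK.

```python
from typing import Dict, List, Tuple


def agents_below_sla(
    metrics: Dict[str, Dict[str, Dict]], min_success_pct: float = 99.0
) -> List[Tuple[str, str, float]]:
    """Return (topic, agent, success_pct) for agents under the threshold."""
    offenders = []
    for topic, agents in metrics.items():
        for agent, stats in agents.items():
            received = stats.get("received", 0)
            if received == 0:
                continue  # No traffic yet, so there is nothing to judge.
            success_pct = stats.get("processed", 0) / received * 100
            if success_pct < min_success_pct:
                offenders.append((topic, agent, success_pct))
    return offenders


# Sample data in the documented shape; "idle-agent" is a made-up agent.
sample_metrics = {
    "file.uploaded": {
        "file-processor": {"received": 150, "processed": 145, "failed": 5},
        "idle-agent": {"received": 0, "processed": 0, "failed": 0},
    }
}
for topic, agent, pct in agents_below_sla(sample_metrics):
    print(f"{topic}/{agent}: {pct:.1f}% success")
```

In real use the argument would be the value returned by await sdk.metrics().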

Health & Status Methods

health()

Get comprehensive system health status.
Parameters: None
Returns: Dict - Health status
Example:
health = await sdk.health()

# Structure:
# {
#     "status": "RUNNING",  # RUNNING, READY, DEGRADED, DOWN
#     "runner": {
#         "id": "runner-abc-123",
#         "uptime": 3600.5,  # seconds
#         "running": True
#     },
#     "event_bus": {
#         "type": "redis_stream",
#         "status": "healthy",
#         "error": None
#     },
#     "storage": {
#         "backend": "redis",
#         "status": "healthy",
#         "error": None
#     }
# }

print(f"Status: {health['status']}")
print(f"Uptime: {health['runner']['uptime']} seconds")
print(f"Event Bus: {health['event_bus']['status']}")
print(f"Storage: {health['storage']['status']}")
Status Values:
  • RUNNING: Runner active, all systems healthy
  • READY: No runner, but event bus and storage healthy
  • DEGRADED: One system unhealthy
  • DOWN: Both event bus and storage unhealthy
Use Cases:
  • Health checks
  • Monitoring
  • Kubernetes probes
  • Load balancer health
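
For Kubernetes probes or load balancer checks, the status string usually has to be turned into an HTTP status code. The mapping below is one possible policy, not something the SDK prescribes: probe_http_status is a hypothetical helper, and treating DEGRADED as alive but not ready is an assumption you may want to change.

```python
from typing import Dict

# Assumed policy: which documented status values pass each kind of probe.
LIVENESS_OK = {"RUNNING", "READY", "DEGRADED"}
READINESS_OK = {"RUNNING"}


def probe_http_status(health: Dict, probe: str = "readiness") -> int:
    """Translate a health() payload into 200 (pass) or 503 (fail)."""
    if probe == "readiness":
        allowed = READINESS_OK
    elif probe == "liveness":
        allowed = LIVENESS_OK
    else:
        raise ValueError(f"unknown probe type: {probe!r}")
    # A missing or unrecognized status is treated as a failure.
    return 200 if health.get("status") in allowed else 503
```

A web handler would call health = await sdk.health() and return probe_http_status(health, "liveness") as the response code.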

storage_health()

Get detailed storage health information.
Parameters: None
Returns: Dict - Storage health details
Example:
health = await sdk.storage_health()

# Redis backend:
# {
#     "status": "healthy",
#     "backend": "redis",
#     "redis_url": "redis://localhost:6379",
#     "latency_ms": 0.85,
#     "redis_version": "7.0.12",
#     "used_memory": "15.2 MB",
#     "connected_clients": 4
# }

# JSON backend:
# {
#     "status": "healthy",
#     "backend": "json",
#     "storage_dir": ".omnidaemon_data",
#     "agents_count": 2,
#     "results_count": 45,
#     "metrics_count": 890
# }
Use Cases:
  • Storage monitoring
  • Capacity planning
  • Performance tuning

Runner Methods

start()

Start the agent runner.
Parameters: None
Returns: None
Example:
# Register agents first
await sdk.register_agent(agent_config)

# Start runner
await sdk.start()

# Keep alive
while True:
    await asyncio.sleep(1)
What It Does:
  • Starts consuming messages
  • Activates all registered agents
  • Records start time
  • Sets runner status to RUNNING
Use Cases:
  • Start agent processing
  • Production deployment
  • Development testing

shutdown()

Stop the agent runner gracefully.
Parameters: None
Returns: None
Example:
try:
    await sdk.start()
    while True:
        await asyncio.sleep(1)
except KeyboardInterrupt:
    print("Shutting down...")
finally:
    await sdk.shutdown()
    print("Shutdown complete")
What It Does:
  • Stops consuming messages
  • Closes event bus connection
  • Closes storage connection
  • Cleans up resources
  • Deletes start time from storage
Use Cases:
  • Graceful shutdown
  • Cleanup on exit
  • Deployment updates
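
During deployment updates, process managers and orchestrators typically stop a process with SIGTERM rather than Ctrl+C, so catching KeyboardInterrupt alone may not trigger shutdown(). The sketch below shows one way to wire signals to a graceful stop. Both function names are hypothetical, sdk is any object with async start() and shutdown() methods as documented here, and loop.add_signal_handler is only available on Unix event loops.

```python
import asyncio
import signal


async def run_until_stopped(sdk, stop: asyncio.Event) -> None:
    """Start the runner, wait until stop is set, then always shut down."""
    try:
        await sdk.start()
        await stop.wait()
    finally:
        # Runs on normal stop, on errors, and on task cancellation.
        await sdk.shutdown()


def install_signal_handlers(stop: asyncio.Event) -> None:
    """Set the stop event on SIGINT or SIGTERM (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)
```

Inside main(), create stop = asyncio.Event(), call install_signal_handlers(stop), then await run_until_stopped(sdk, stop).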

Event Bus Monitoring Methods

(Only available when EVENT_BUS_TYPE=redis_stream)

list_streams()

List all Redis streams.
Parameters: None
Returns: List[Dict] - Stream information
Raises: ValueError if event bus is not Redis Streams
Example:
streams = await sdk.list_streams()

# [
#     {"stream": "omni-stream:file.uploaded", "length": 1523},
#     {"stream": "omni-stream:analysis.requested", "length": 450}
# ]

for stream in streams:
    topic = stream["stream"].replace("omni-stream:", "")
    print(f"{topic}: {stream['length']} messages")

inspect_stream(topic, limit=10)

View recent messages in a stream.
Parameters:
  • topic (str): Topic name
  • limit (int, default=10): Max messages to return
Returns: List[Dict] - Messages
Raises: ValueError if event bus is not Redis Streams
Example:
messages = await sdk.inspect_stream("file.uploaded", limit=10)

for msg in messages:
    print(f"ID: {msg['id']}")
    print(f"Data: {msg['data']}")

list_groups(topic)

List consumer groups for a topic.
Parameters:
  • topic (str): Topic name
Returns: List[Dict] - Consumer groups
Raises: ValueError if event bus is not Redis Streams
Example:
groups = await sdk.list_groups("file.uploaded")

# [
#     {
#         "name": "group:file.uploaded:file-processor",
#         "consumers": 3,
#         "pending": 2
#     }
# ]

for group in groups:
    print(f"{group['name']}: {group['pending']} pending")
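
The pending count is a natural signal for backlog alerts. As a sketch, the hypothetical helper below filters the list_groups() output for groups above a threshold. The name backlogged_groups and the default of 100 are assumptions, and the input is a plain list in the shape shown above.

```python
from typing import Dict, List


def backlogged_groups(groups: List[Dict], max_pending: int = 100) -> List[str]:
    """Return names of groups whose pending count exceeds max_pending, worst first."""
    over = [g for g in groups if g.get("pending", 0) > max_pending]
    over.sort(key=lambda g: g["pending"], reverse=True)
    return [g["name"] for g in over]


# Sample data in the documented shape; the second group is made up.
sample_groups = [
    {"name": "group:file.uploaded:file-processor", "consumers": 3, "pending": 2},
    {"name": "group:file.uploaded:archiver", "consumers": 1, "pending": 250},
]
print(backlogged_groups(sample_groups))
```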

inspect_dlq(topic, limit=10)

Inspect dead-letter queue for a topic.
Parameters:
  • topic (str): Topic name
  • limit (int, default=10): Max entries to return
Returns: List[Dict] - DLQ entries
Raises: ValueError if event bus is not Redis Streams
Example:
dlq = await sdk.inspect_dlq("file.uploaded", limit=10)

for entry in dlq:
    data = entry["data"]
    print(f"Failed: {data['error']}")
    print(f"Retries: {data['retry_count']}")
    print(f"Content: {data['failed_message']['content']}")
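
When a DLQ holds many entries, grouping them by error message helps spot the dominant failure. The sketch below assumes each entry has the data["error"] field used in the example above. count_dlq_errors is a hypothetical helper, and entries without an error field are counted under a placeholder key.

```python
from collections import Counter
from typing import Dict, List


def count_dlq_errors(entries: List[Dict]) -> Counter:
    """Count DLQ entries per error message."""
    counts = Counter()
    for entry in entries:
        data = entry.get("data") or {}
        counts[data.get("error", "<unknown>")] += 1
    return counts


# Made-up entries in the documented shape.
sample_dlq = [
    {"data": {"error": "timeout", "retry_count": 3}},
    {"data": {"error": "timeout", "retry_count": 3}},
    {"data": {"error": "invalid file", "retry_count": 3}},
]
for error, count in count_dlq_errors(sample_dlq).most_common():
    print(f"{count} x {error}")
```

In real use the argument would be the list returned by await sdk.inspect_dlq(topic, limit=...).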

get_bus_stats()

Get comprehensive event bus statistics.
Parameters: None
Returns: Dict - Event bus stats
Raises: ValueError if event bus is not Redis Streams
Example:
stats = await sdk.get_bus_stats()

# {
#     "snapshot": {
#         "streams": 3,
#         "consumer_groups": 5,
#         "total_messages": 2098,
#         "dlq_entries": 12
#     },
#     "redis_info": {
#         "version": "7.0.12",
#         "used_memory": "2.3 MB",
#         "connected_clients": 4
#     }
# }

Storage Operations Methods

clear_agents()

Delete all agents from storage.
Parameters: None
Returns: int - Number of agents deleted
Example:
count = await sdk.clear_agents()
print(f"Deleted {count} agents")
Warning: This only clears storage. Consumer groups remain in Redis.

clear_results()

Delete all task results from storage.
Parameters: None
Returns: int - Number of results deleted
Example:
count = await sdk.clear_results()
print(f"Deleted {count} results")

clear_metrics()

Delete all metrics from storage.
Parameters: None
Returns: int - Number of metrics deleted
Example:
count = await sdk.clear_metrics()
print(f"Deleted {count} metrics")

clear_all()

Delete all data from storage (agents, results, metrics, config).
Parameters: None
Returns: Dict[str, int] - Counts by type
Example:
counts = await sdk.clear_all()

# {
#     "agents": 8,
#     "results": 156,
#     "metrics": 2450,
#     "config": 3
# }

print("Deleted:")
print(f"  Agents: {counts['agents']}")
print(f"  Results: {counts['results']}")
print(f"  Metrics: {counts['metrics']}")
print(f"  Config: {counts['config']}")
Warning: This is destructive and cannot be undone!

Configuration Methods

save_config(key, value)

Save configuration value to storage.
Parameters:
  • key (str): Configuration key
  • value (Any): Configuration value (JSON-serializable)
Returns: None
Example:
await sdk.save_config("webhook_url", "https://api.example.com/callback")
await sdk.save_config("max_retries", 5)
await sdk.save_config("features", ["ocr", "nlp"])

get_config(key, default=None)

Get configuration value from storage.
Parameters:
  • key (str): Configuration key
  • default (Any, optional): Default value if key not found
Returns: Configuration value or default
Example:
webhook = await sdk.get_config("webhook_url")
max_retries = await sdk.get_config("max_retries", default=3)
features = await sdk.get_config("features", default=[])

print(f"Webhook: {webhook}")
print(f"Max Retries: {max_retries}")
print(f"Features: {features}")

Complete Example

import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import AgentConfig, EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def process_file(message: dict):
    """Agent callback."""
    content = message.get("content", {})
    filename = content.get("filename")
    
    # Process file
    result = f"Processed {filename}"
    
    return {"status": "success", "result": result}

async def main():
    # Register agent
    await sdk.register_agent(
        agent_config=AgentConfig(
            topic="file.uploaded",
            callback=process_file,
        )
    )
    
    # Start runner
    await sdk.start()
    
    # Publish event
    task_id = await sdk.publish_task(
        EventEnvelope(
            topic="file.uploaded",
            payload=PayloadBase(
                content={"filename": "doc.pdf"}
            ),
        )
    )
    
    # Wait for result
    await asyncio.sleep(2)
    result = await sdk.get_result(task_id)
    print(f"Result: {result}")
    
    # Get metrics
    metrics = await sdk.metrics()
    print(f"Metrics: {metrics}")
    
    # Health check
    health = await sdk.health()
    print(f"Status: {health['status']}")
    
    # Cleanup
    await sdk.shutdown()

asyncio.run(main())

Summary

Key Methods:
  • Publishing: publish_task()
  • Agents: register_agent(), list_agents(), get_agent(), unsubscribe_agent(), delete_agent(), delete_topic()
  • Results: get_result(), list_results(), delete_result()
  • Metrics: metrics()
  • Health: health(), storage_health()
  • Runner: start(), shutdown()
  • Bus: list_streams(), inspect_stream(), list_groups(), inspect_dlq(), get_bus_stats()
  • Storage: clear_agents(), clear_results(), clear_metrics(), clear_all()
  • Config: save_config(), get_config()
All methods are async and should be awaited! 🚀