
Agent Lifecycle Management

This page explains the complete lifecycle of an agent in OmniDaemon, from registration to deletion, including pausing and resuming operations.

Overview

An agent in OmniDaemon goes through several states during its lifetime:
┌──────────────┐
│ Unregistered │
└──────────────┘
       │
       │ register_agent()
       ▼
┌──────────────┐
│  Registered  │◀──────────────┐
│   & Active   │               │
└──────────────┘               │
       │                       │ restart runner
       │ unsubscribe_agent()   │
       ▼                       │
┌──────────────┐               │
│    Paused    │───────────────┘
│(Unsubscribed)│
└──────────────┘
       │
       │ delete_agent()
       ▼
┌──────────────┐
│   Deleted    │
└──────────────┘
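The diagram can be read as a small state machine. The sketch below encodes only the arrows drawn above; the AgentState enum, the TRANSITIONS table and the "restart_runner" operation name are illustrative assumptions, not part of the OmniDaemon SDK.

```python
from enum import Enum


class AgentState(Enum):
    # Hypothetical names for the four states in the diagram
    UNREGISTERED = "unregistered"
    ACTIVE = "registered_active"
    PAUSED = "paused_unsubscribed"
    DELETED = "deleted"


# (current state, operation) -> next state, one entry per arrow in the diagram
TRANSITIONS = {
    (AgentState.UNREGISTERED, "register_agent"): AgentState.ACTIVE,
    (AgentState.ACTIVE, "unsubscribe_agent"): AgentState.PAUSED,
    (AgentState.PAUSED, "restart_runner"): AgentState.ACTIVE,
    (AgentState.PAUSED, "delete_agent"): AgentState.DELETED,
}


def apply(state: AgentState, operation: str) -> AgentState:
    """Return the next state, or raise ValueError if the diagram has no such arrow."""
    try:
        return TRANSITIONS[(state, operation)]
    except KeyError:
        raise ValueError(f"{operation!r} is not valid from {state.name}") from None


state = AgentState.UNREGISTERED
for op in ["register_agent", "unsubscribe_agent", "restart_runner",
           "unsubscribe_agent", "delete_agent"]:
    state = apply(state, op)
    print(f"{op:>18} -> {state.name}")
```

The deletion section below also describes deleting an agent directly, so a fuller model would add more arrows; the table form makes that a one-line change.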
Let’s explore each stage in detail.

1. Registration

Registration is when you tell OmniDaemon about your agent and what topic it should listen to.

Simple Registration

import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import AgentConfig

sdk = OmniDaemonSDK()

async def my_agent(message: dict):
    """Your AI agent logic"""
    content = message.get("content", {})
    # Process the content
    return {"status": "processed"}

async def main():
    # Register agent - REQUIRED: topic and callback
    await sdk.register_agent(
        agent_config=AgentConfig(
            topic="file.uploaded",
            callback=my_agent,
        )
    )
    
    await sdk.start()
    
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        await sdk.shutdown()

asyncio.run(main())

What Happens During Registration

  1. Agent metadata stored in the storage backend:
    {
        "name": "file_uploaded_agent",
        "topic": "file.uploaded",
        "description": null,
        "version": "1.0.0",
        "registered_at": "2025-03-12T10:30:00Z",
        "config": {
            "max_retries": 3,
            "reclaim_idle_ms": 300000,
            "dlq_enabled": true
        }
    }
    
  2. Consumer group created on the event bus:
    Topic: file.uploaded
    Consumer Group: file_uploaded_agent
    Status: Active
    
  3. Agent starts listening for events on the topic
  4. Start time recorded in storage (for health checks and uptime tracking)
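To make step 1 concrete, the sketch below builds a record with the same shape as the stored metadata above, filling in the documented defaults. The default-name rule ("file.uploaded" becomes "file_uploaded_agent") is an assumption inferred only from that single example; build_agent_metadata and default_agent_name are hypothetical helpers, not SDK functions.

```python
from datetime import datetime, timezone


def default_agent_name(topic: str) -> str:
    # ASSUMPTION: rule inferred from the example record, not from the SDK source
    return topic.replace(".", "_") + "_agent"


def build_agent_metadata(topic, name=None, description=None, version="1.0.0",
                         max_retries=3, reclaim_idle_ms=300_000, dlq_enabled=True):
    """Build a dict shaped like the stored metadata shown above (illustrative only)."""
    return {
        "name": name or default_agent_name(topic),
        "topic": topic,
        "description": description,
        "version": version,
        "registered_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "config": {
            "max_retries": max_retries,
            "reclaim_idle_ms": reclaim_idle_ms,   # 300000 ms = 5 minutes
            "dlq_enabled": dlq_enabled,
        },
    }


record = build_agent_metadata("file.uploaded")
print(record["name"], record["config"])
```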

Registration Options

from omnidaemon import AgentConfig, SubscriptionConfig

await sdk.register_agent(
    agent_config=AgentConfig(
        # REQUIRED
        topic="file.uploaded",           # Topic to listen to
        callback=my_agent,                # Function to call
        
        # OPTIONAL
        name="file-processor",            # Default: auto-generated from topic
        description="Processes uploads",  # Default: None
        version="1.0.0",                  # Default: "1.0.0"
        tags=["file", "upload"],          # Default: []
        
        # SUBSCRIPTION CONFIG (Optional)
        config=SubscriptionConfig(
            max_retries=3,                # Default: 3
            reclaim_idle_ms=300000,       # Default: 300000 (5 min)
            dlq_enabled=True,             # Default: True
        )
    )
)

Multiple Agents

You can register multiple agents in the same runner:
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
    )
)

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="image.uploaded",
        callback=process_image,
    )
)

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="analysis.requested",
        callback=analyze_data,
    )
)

await sdk.start()  # All agents start listening
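Conceptually, one runner holding several agents routes each incoming event to the callback registered for its topic. The in-memory ToyTopicRouter below is a hypothetical stand-in used only to illustrate that routing; it is not how OmniDaemon is implemented and it ignores the event bus entirely.

```python
import asyncio


class ToyTopicRouter:
    """In-memory illustration of topic -> callback routing (not the SDK)."""

    def __init__(self):
        self.callbacks = {}  # topic -> async callback

    def register(self, topic, callback):
        self.callbacks[topic] = callback

    async def deliver(self, topic, message):
        callback = self.callbacks.get(topic)
        if callback is None:
            return None  # no agent registered for this topic
        return await callback(message)


async def process_file(message):
    return {"handled_by": "process_file", "content": message["content"]}


async def process_image(message):
    return {"handled_by": "process_image", "content": message["content"]}


async def main():
    router = ToyTopicRouter()
    router.register("file.uploaded", process_file)
    router.register("image.uploaded", process_image)
    image_result = await router.deliver("image.uploaded", {"content": {"filename": "cat.png"}})
    missing = await router.deliver("analysis.requested", {"content": {}})
    return image_result, missing


image_result, missing = asyncio.run(main())
print(image_result)
print(missing)
```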

2. Active State (Processing)

Once registered and started, your agent is active and processing events.

Event Processing Flow

1. Event published to topic
   └─▶ Event bus receives message

2. Event bus delivers to consumer group
   └─▶ One agent in the group receives it

3. Agent callback executed
   └─▶ Your agent logic runs

4. Result returned
   └─▶ Success or failure

5. If success:
   └─▶ Message acknowledged and removed
   └─▶ Result stored (if configured)
   └─▶ Metrics updated

6. If failure:
   └─▶ Retry (up to max_retries)
   └─▶ If still failing → DLQ
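Steps 5 and 6 amount to "retry on failure, then dead-letter". The sketch below models that flow with plain asyncio. It assumes that max_retries counts retries after the first attempt and that any raised exception counts as a failure; both are assumptions about OmniDaemon's semantics, and handle_with_retries is a hypothetical name.

```python
import asyncio


async def handle_with_retries(callback, message, max_retries=3, dlq=None):
    """Illustrative model: run callback, retry on exception, then dead-letter.

    ASSUMPTION: max_retries = retries after the first attempt, so at most
    max_retries + 1 calls are made before the message goes to the DLQ.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            result = await callback(message)
            return {"status": "acked", "attempts": attempts, "result": result}
        except Exception as exc:
            if attempts > max_retries:
                if dlq is not None:
                    dlq.append({"message": message, "error": repr(exc), "attempts": attempts})
                return {"status": "dead_lettered", "attempts": attempts}


async def always_fails(message):
    raise RuntimeError("model timeout")


calls = {"n": 0}


async def fails_once(message):
    # Fails on the first call only, like a transient network error
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient error")
    return {"status": "processed"}


dlq = []
ok = asyncio.run(handle_with_retries(fails_once, {"content": {}}, dlq=dlq))
bad = asyncio.run(handle_with_retries(always_fails, {"content": {}}, dlq=dlq))
print(ok)
print(bad, len(dlq))
```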

Example Processing

async def process_file(message: dict):
    """
    This function is called for each event.
    
    message contains the full EventEnvelope:
    - content: Your data
    - correlation_id: Request tracking
    - tenant_id: Multi-tenancy
    - source: Event origin
    - webhook: Callback URL
    - reply_to: Response topic
    - etc.
    """
    
    # Extract data
    content = message.get("content", {})
    filename = content.get("filename")
    
    # Your AI agent logic
    result = await ai_process_file(filename)
    
    # Return result
    return {
        "status": "success",
        "processed_file": filename,
        "result": result
    }
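Because a callback is just an async function that takes a dict, it can be exercised locally without the event bus. The sketch below stubs out ai_process_file (which the example leaves undefined) and feeds process_file a sample envelope; the field values are made up for illustration.

```python
import asyncio


async def ai_process_file(filename):
    # Hypothetical stub standing in for your real AI logic
    return f"summary of {filename}"


async def process_file(message: dict):
    content = message.get("content", {})
    filename = content.get("filename")
    result = await ai_process_file(filename)
    return {"status": "success", "processed_file": filename, "result": result}


# Sample envelope using the fields listed in the docstring; values are invented
sample_message = {
    "content": {"filename": "report.pdf"},
    "correlation_id": "req-123",
    "tenant_id": "tenant-a",
    "source": "upload-service",
    "webhook": None,
    "reply_to": None,
}

result = asyncio.run(process_file(sample_message))
print(result)
```

The same pattern works inside a test framework: call the callback directly and assert on the returned dict.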

Monitoring Active Agents

# List all registered agents
omnidaemon agent list

# Get specific agent details
omnidaemon agent get --topic file.uploaded --name file-processor

# Check metrics
omnidaemon metrics --topic file.uploaded

# System health
omnidaemon health

3. Pausing (Unsubscribe)

Sometimes you want to pause an agent without deleting it completely. This is useful for:
  • Temporary maintenance
  • Testing changes before deploying
  • Scaling down during low traffic
  • Debugging issues

How to Pause

Via SDK:
await sdk.unsubscribe_agent(
    topic="file.uploaded",
    agent_name="file-processor"
)
Via CLI:
omnidaemon agent unsubscribe \
    --topic file.uploaded \
    --name file-processor
Via API:
curl -X POST http://localhost:8765/agents/file.uploaded/file-processor/unsubscribe
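If you prefer to call the API from Python, the standard library can build the same request as the curl command. The sketch below only constructs the request; sending it assumes the OmniDaemon API is listening on localhost:8765 as in the curl example. build_unsubscribe_request is a hypothetical helper.

```python
import urllib.request
from urllib.parse import quote


def build_unsubscribe_request(topic, agent_name, base_url="http://localhost:8765"):
    """Build a POST request equivalent to the curl command above."""
    url = f"{base_url}/agents/{quote(topic, safe='')}/{quote(agent_name, safe='')}/unsubscribe"
    return urllib.request.Request(url, method="POST")


req = build_unsubscribe_request("file.uploaded", "file-processor")
print(req.get_method(), req.full_url)

# To actually send it (requires the API to be running):
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.status, resp.read().decode())
```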

What Happens When Paused

  1. Agent stops consuming new messages
    • No new events delivered to this agent
    • Other agents in the group (if any) continue
  2. Consumer group remains on event bus
    • Message history preserved
    • Position in stream maintained
  3. Agent metadata kept in storage
    • Still shows as registered
    • Can see configuration and history
  4. DLQ preserved (if it exists)
    • Failed messages not deleted
    • Can still inspect and retry

Resuming from Pause

To resume a paused agent, simply restart your agent runner:
python agent_runner.py
The agent will:
  • Re-register with same configuration
  • Resume from where it left off
  • Start processing new events
Why this works:
  • Consumer group still exists
  • Event bus remembers last processed position
  • Agent picks up from there
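The "resume from where it left off" behaviour follows from the consumer group keeping its read position independently of any running process. The ToyStream class below is a deliberately simplified, hypothetical model (real streams also track pending and acknowledged entries), but it shows why events published while an agent is paused are processed after a restart.

```python
class ToyStream:
    """Append-only log plus per-group read positions (simplified model)."""

    def __init__(self):
        self.entries = []
        self.group_positions = {}  # consumer group -> index of next unread entry

    def publish(self, event):
        self.entries.append(event)

    def create_group(self, group):
        # If the group already exists, its saved position is kept
        self.group_positions.setdefault(group, len(self.entries))

    def read_new(self, group):
        start = self.group_positions[group]
        batch = self.entries[start:]
        self.group_positions[group] = len(self.entries)
        return batch


stream = ToyStream()
stream.create_group("file-processor")
stream.publish("a.pdf")
first = stream.read_new("file-processor")   # agent active

# Agent paused: nothing reads, but events keep arriving
stream.publish("b.pdf")
stream.publish("c.pdf")

# Runner restarted: the group already exists, so its position is reused
stream.create_group("file-processor")
resumed = stream.read_new("file-processor")
print(first, resumed)
```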

4. Deletion

Deletion permanently removes an agent and optionally cleans up its infrastructure.

Simple Delete (Keep Infrastructure)

Deletes the agent from the registry but keeps its consumer group and DLQ.
Via SDK:
await sdk.delete_agent(
    topic="file.uploaded",
    agent_name="file-processor",
    delete_group=False,  # Keep consumer group
    delete_dlq=False,    # Keep DLQ
)
Via CLI:
omnidaemon agent delete \
    --topic file.uploaded \
    --name file-processor
Use case: Temporary removal, planning to add agent back later

Complete Delete (Clean Everything)

Deletes the agent AND cleans up all of its infrastructure.
Via SDK:
await sdk.delete_agent(
    topic="file.uploaded",
    agent_name="file-processor",
    delete_group=True,  # Delete consumer group
    delete_dlq=True,    # Delete DLQ
)
Via CLI:
omnidaemon agent delete \
    --topic file.uploaded \
    --name file-processor \
    --cleanup \
    --delete-dlq
Use case: Permanently removing agent, won’t use again

What Happens During Deletion

Simple Delete:
  1. ✅ Agent removed from registry
  2. ✅ Stops processing new events
  3. ❌ Consumer group NOT deleted
  4. ❌ DLQ NOT deleted
  5. ❌ Message history NOT deleted (preserved)
Complete Delete:
  1. ✅ Agent removed from registry
  2. ✅ Stops processing new events
  3. ✅ Consumer group deleted
  4. ✅ DLQ deleted (if requested)
  5. ✅ All infrastructure cleaned up
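The difference between the two delete modes comes down to which of three resources are removed. The sketch below models that with plain sets; model_delete_agent and the registry/groups/dlqs layout are hypothetical and only mirror the two lists above, not the SDK's storage.

```python
def model_delete_agent(state, topic, agent_name, delete_group=False, delete_dlq=False):
    """Remove the agent from the registry; optionally remove its group and DLQ."""
    key = (topic, agent_name)
    state["registry"].discard(key)
    if delete_group:
        state["groups"].discard(key)
    if delete_dlq:
        state["dlqs"].discard(key)
    return state


def fresh_state():
    key = ("file.uploaded", "file-processor")
    return {"registry": {key}, "groups": {key}, "dlqs": {key}}


simple = model_delete_agent(fresh_state(), "file.uploaded", "file-processor")
complete = model_delete_agent(fresh_state(), "file.uploaded", "file-processor",
                              delete_group=True, delete_dlq=True)
print({name: len(items) for name, items in simple.items()})
print({name: len(items) for name, items in complete.items()})
```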

Delete Entire Topic

Remove all agents for a topic.
Via SDK:
await sdk.delete_topic(topic="file.uploaded")
Via CLI:
omnidaemon agent delete-topic --topic file.uploaded
This removes ALL agents subscribed to the topic.

5. Graceful Shutdown

When you stop your agent runner (Ctrl+C), the finally block calls sdk.shutdown(), which performs a graceful shutdown:
try:
    while True:
        await asyncio.sleep(1)
except (KeyboardInterrupt, asyncio.CancelledError):
    pass
finally:
    await sdk.shutdown()  # Graceful shutdown

What Happens During Shutdown

  1. Stop accepting new messages
    • Agent unsubscribes from topic
    • No new events delivered
  2. Finish current processing
    • In-flight messages completed
    • Results stored
  3. Clean up resources
    • Close event bus connections
    • Close storage connections
    • Release memory
  4. Clear start time from storage
    • Health checks show “stopped”
    • Uptime resets
  5. Exit cleanly
    • No hanging processes
    • No orphaned connections
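The ordering matters: new work is refused first, so the set of in-flight messages can only shrink while you wait for it. The sketch below illustrates steps 1 to 4 with asyncio; ToyShutdownRunner and its attributes are hypothetical, and the "closed connections" are only simulated.

```python
import asyncio


class ToyShutdownRunner:
    """Hypothetical runner that follows the shutdown order listed above."""

    def __init__(self):
        self.accepting = True
        self.in_flight = set()              # tasks currently processing a message
        self.completed = []
        self.closed = []
        self.start_time = 1_700_000_000.0   # stand-in for the stored start time

    def dispatch(self, message):
        """Start processing a message; refuse it once shutdown has begun."""
        if not self.accepting:
            return False
        task = asyncio.create_task(self._process(message))
        self.in_flight.add(task)
        task.add_done_callback(self.in_flight.discard)
        return True

    async def _process(self, message):
        await asyncio.sleep(0.01)           # simulated agent work
        self.completed.append(message)

    async def shutdown(self):
        self.accepting = False                    # 1. stop accepting new messages
        await asyncio.gather(*self.in_flight)     # 2. let in-flight work finish
        self.closed += ["event_bus", "storage"]   # 3. close connections (simulated)
        self.start_time = None                    # 4. clear start time


async def main():
    runner = ToyShutdownRunner()
    runner.dispatch("job-1")
    runner.dispatch("job-2")
    await runner.shutdown()
    accepted_after = runner.dispatch("job-3")
    return runner, accepted_after


runner, accepted_after = asyncio.run(main())
print(sorted(runner.completed), runner.closed, runner.start_time, accepted_after)
```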

Handling Signals

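Handle SIGINT (Ctrl+C) and SIGTERM (sent by kill, systemd, or container orchestrators) so that sdk.shutdown() always runs. On Unix, the event loop can route both signals to an asyncio.Event:
import asyncio
import signal

async def main():
    # ... register agents with sdk.register_agent() ...
    await sdk.start()

    # Unix only: let the event loop turn SIGINT/SIGTERM into a stop event
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    try:
        await stop.wait()     # Block until a signal arrives
    finally:
        await sdk.shutdown()  # Graceful shutdown

asyncio.run(main())
loop.add_signal_handler() is not available on Windows; there, rely on the KeyboardInterrupt handling shown above.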

Lifecycle Management Strategies

Development

# Quick iteration
# Just Ctrl+C and restart
python agent_runner.py

# Make changes...
# Ctrl+C

python agent_runner.py  # Resumes automatically

Staging

# Deploy new version
omnidaemon agent unsubscribe --topic file.uploaded --name file-processor
# Old version stops processing

python new_agent_runner.py
# New version starts

# If issues, rollback:
omnidaemon agent unsubscribe --topic file.uploaded --name file-processor-v2
python old_agent_runner.py  # Old version resumes

Production

# Blue-Green Deployment

# 1. Deploy v2 (runs alongside v1)
python agent_runner_v2.py &

# 2. Monitor metrics
omnidaemon metrics --topic file.uploaded

# 3. If good, remove v1
omnidaemon agent delete --topic file.uploaded --name file-processor-v1

# 4. If issues, remove v2
omnidaemon agent delete --topic file.uploaded --name file-processor-v2

Health Monitoring

Check Agent Status

# Overall system health
omnidaemon health

# Specific agent
omnidaemon agent get --topic file.uploaded --name file-processor

# Processing metrics
omnidaemon metrics --topic file.uploaded

Real-Time Status

The health command shows real-time status:
omnidaemon health
Output:
Status: running          # Agent runner is active
Registered Agents: 3     # Total agents registered
Active Consumers: 3      # Agents actively processing
Uptime: 2h 15m           # Time since start
Status Values:
  • running - Agent runner active, processing events
  • stopped - Agent runner not running (but agents still registered)
  • ready - No agents registered, infrastructure healthy
  • degraded - Some infrastructure unhealthy
  • down - Critical infrastructure down
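One way to read the status values is as a precedence order: infrastructure problems first, then whether any agents exist, then whether the runner is active. The sketch below encodes that reading. Which components count as "critical" (here event_bus and storage) and the precedence itself are assumptions; derive_status is a hypothetical helper, not what omnidaemon health actually runs.

```python
def derive_status(runner_active: bool, agent_count: int, infra_health: dict,
                  critical=("event_bus", "storage")) -> str:
    """Map observations to the documented status values (assumed precedence)."""
    # A missing critical component is treated as unhealthy
    if any(not infra_health.get(name, False) for name in critical):
        return "down"
    if not all(infra_health.values()):
        return "degraded"
    if agent_count == 0:
        return "ready"
    return "running" if runner_active else "stopped"


healthy = {"event_bus": True, "storage": True}
print(derive_status(True, 3, healthy))
print(derive_status(False, 3, healthy))
print(derive_status(True, 3, {**healthy, "metrics": False}))
```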

Advanced Patterns

Conditional Registration

Register agents based on environment:
import os

# Only register in production
if os.getenv("ENV") == "production":
    await sdk.register_agent(
        agent_config=AgentConfig(
            topic="billing.process",
            callback=process_billing,
        )
    )

# Only in development
if os.getenv("ENV") == "development":
    await sdk.register_agent(
        agent_config=AgentConfig(
            topic="test.events",
            callback=debug_handler,
        )
    )

Dynamic Agent Registration

Register agents based on configuration:
import yaml

# Load agent config from file
with open("agents.yaml") as f:
    agents_config = yaml.safe_load(f)

# Register each agent
for agent in agents_config["agents"]:
    callback = import_callback(agent["callback_module"])
    
    await sdk.register_agent(
        agent_config=AgentConfig(
            topic=agent["topic"],
            callback=callback,
            name=agent["name"],
        )
    )
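The example relies on an import_callback helper that it does not define. A minimal sketch using importlib is shown below. It assumes each callback_module entry in agents.yaml is written as "package.module:function" (for example "myagents.files:process_file"); that format and the helper itself are hypothetical.

```python
import importlib
from typing import Callable


def import_callback(path: str) -> Callable:
    """Resolve "package.module:function" to the callable it names (hypothetical helper)."""
    module_name, sep, attr = path.partition(":")
    if not sep or not attr:
        raise ValueError(f"expected 'module:function', got {path!r}")
    module = importlib.import_module(module_name)
    callback = getattr(module, attr)
    if not callable(callback):
        raise TypeError(f"{path!r} does not name a callable")
    return callback


# Demo with a standard-library function in place of a real agent callback
dumps = import_callback("json:dumps")
print(dumps({"ok": True}))
```

Failing fast with a clear error at startup is preferable to registering an agent whose callback cannot be found.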

Health-Check-Based Registration

Only register if dependencies are healthy:
# Check dependencies
if await check_database_health():
    await sdk.register_agent(...)
else:
    logger.error("Database unhealthy, skipping registration")
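With a single check, a dependency that starts a few seconds after the runner (common during container startup) causes registration to be skipped for the whole run. A bounded retry with exponential backoff is one alternative; wait_until_healthy below is a hypothetical helper, and you would pass it your own check such as check_database_health before calling sdk.register_agent().

```python
import asyncio


async def wait_until_healthy(check, attempts=5, base_delay=0.5):
    """Poll an async health check with exponential backoff.

    Returns True as soon as check() succeeds, False after `attempts` failures.
    """
    for attempt in range(attempts):
        if await check():
            return True
        if attempt < attempts - 1:
            await asyncio.sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    return False


# Demo: a fake check that becomes healthy on its third call
calls = {"n": 0}


async def fake_db_check():
    calls["n"] += 1
    return calls["n"] >= 3


healthy = asyncio.run(wait_until_healthy(fake_db_check, base_delay=0.01))
print(healthy, calls["n"])
```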

Best Practices

1. Use Descriptive Names

# ✅ Good
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        name="file-processor-v2",
        description="Extracts text from uploaded files",
    )
)

# ❌ Bad
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        name="agent1",
    )
)

2. Version Your Agents

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=process_file,
        name="file-processor",
        version="2.1.0",  # Track versions!
    )
)

3. Handle Shutdown Gracefully

async def main():
    await sdk.register_agent(...)
    await sdk.start()
    
    try:
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass  # Graceful exit
    finally:
        await sdk.shutdown()  # Always cleanup

4. Monitor Agent Health

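import asyncio
import logging

logger = logging.getLogger(__name__)

# Periodic health check
async def monitor_health():
    while True:
        health = await sdk.health()
        if health["status"] != "running":
            logger.warning(f"System unhealthy: {health}")
        await asyncio.sleep(60)

# Run in the background from inside main() (after sdk.start());
# keep a reference so the task is not garbage-collected
monitor_task = asyncio.create_task(monitor_health())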

5. Use Unsubscribe for Maintenance

# Before deploying changes
omnidaemon agent unsubscribe --topic file.uploaded --name file-processor

# Deploy changes...
# Test...

# If good, restart
python agent_runner.py

# If bad, rollback and restart
git checkout previous-version
python agent_runner.py

Troubleshooting

Agent Not Processing Events

# 1. Check if agent is registered
omnidaemon agent list

# 2. Check if agent is active
omnidaemon health

# 3. Check consumer groups
omnidaemon bus groups --stream file.uploaded

# 4. Check for errors in DLQ
omnidaemon bus dlq --topic file.uploaded

Agent Stuck After Shutdown

# 1. Check for running processes
ps aux | grep agent_runner

# 2. Stop it: try SIGTERM first so shutdown can run, force kill only if it hangs
kill <PID>
kill -9 <PID>

# 3. Clear stale data
omnidaemon storage clear-agents

Can’t Delete Agent

# 1. Unsubscribe first
omnidaemon agent unsubscribe --topic file.uploaded --name file-processor

# 2. Then delete
omnidaemon agent delete --topic file.uploaded --name file-processor

# 3. If still issues, force cleanup
omnidaemon agent delete \
    --topic file.uploaded \
    --name file-processor \
    --cleanup \
    --delete-dlq

Summary

Agent Lifecycle States:
  1. Unregistered - Agent doesn’t exist yet
  2. Registered & Active - Agent processing events
  3. Paused (Unsubscribed) - Agent stopped, infrastructure preserved
  4. Deleted - Agent removed, with optional infrastructure cleanup
Key Operations:
  • register_agent() - Register an agent on its topic
  • start() - Start consuming events for all registered agents
  • unsubscribe_agent() - Pause agent (keep infrastructure)
  • delete_agent() - Remove agent (optionally clean up)
  • shutdown() - Graceful stop
Monitoring:
  • omnidaemon health - Real-time system status
  • omnidaemon agent list - All registered agents
  • omnidaemon metrics - Processing statistics