Skip to main content

OmniCore Agent Example

Learn how to run OmniCore Agent in production with OmniDaemon using the Supervisor pattern.
πŸ’‘ Working code: examples/agents_with_supervisors/

The Challenge

When running AI agents in production, you face a critical problem: one agent crash shouldn’t kill your entire system. Most AI frameworks run everything in a single Python process:
  • ❌ Agent crashes β†’ entire process dies
  • ❌ Memory leak in one agent β†’ affects all agents
  • ❌ No isolation β†’ agents share the same memory space
  • ❌ Exception in agent code β†’ takes down the whole system
This isn’t acceptable for production. You need fault isolation.

The Supervisor Solution

OmniDaemon solves this with Agent Supervisors - lightweight process managers for your AI agents.

What is a Supervisor?

A Supervisor is a component that runs your agent in its own isolated process and manages its lifecycle. Think of it like Docker containers, but for Python AI agents:
  • Each agent runs in a separate process (different PID)
  • Supervisor monitors the agent process health
  • Automatically restarts crashed agents
  • Handles clean shutdown and resource cleanup
  • Communicates with agent via stdio (JSON protocol)

Why This Matters

Without Supervisor (all in one process):
Main Process (PID: 1234)
β”œβ”€β”€ OmniDaemon
β”œβ”€β”€ Agent A ← crashes
└── Agent B ← also dies! ❌
With Supervisor (process isolation):
Main Process (PID: 1234)
β”œβ”€β”€ OmniDaemon
β”œβ”€β”€ Supervisor A β†’ Agent A Process (PID: 5678) ← crashes
β”‚                  ↓ auto-restarts βœ…
└── Supervisor B β†’ Agent B Process (PID: 5679) ← keeps running βœ…
Production Benefits:
  • βœ… Fault Isolation - Agent A crashes, Agent B keeps running
  • βœ… Auto-Recovery - Crashed agents restart automatically
  • βœ… Resource Isolation - Each agent has its own memory/CPU space
  • βœ… Clean Shutdown - Supervisors handle graceful termination
πŸ“š Deep Dive: Agent Supervisor Architecture explains the technical details of how supervisors work under the hood.

How It Works

Instead of running your agent callback directly in the main process (risky), you wrap it in a supervisor.

❌ Simple Pattern (Development Only)

# Agent runs in main process - one crash kills everything
async def my_agent_callback(message: dict):
    agent = OmniAgent(...)
    return await agent.run(message["content"])

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="my.topic",
        callback=my_agent_callback  # Runs in main process!
    )
)
Problems:
  • No process isolation
  • Crashes affect everything
  • No auto-restart

βœ… Supervisor Pattern (Production-Ready)

# Agent runs in separate process managed by Supervisor
supervisor = await create_supervisor_from_directory(
    agent_name="my_agent",
    agent_dir="./my_agent",           # Directory with agent code
    callback_function="agent.callback" # Function to call
)

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="my.topic",
        callback=supervisor.handle_event  # Supervisor manages everything!
    )
)
Benefits:
  • Process isolation βœ…
  • Auto-restart on crash βœ…
  • Resource cleanup βœ…
The supervisor handles process spawning, health monitoring, stdio communication, and crash recovery automatically.
πŸ“š How it works internally: Complete Agent Process Flow shows the full lifecycle from event to response.

Implementation

Now that you understand WHY supervisors exist, let’s build a production-ready OmniCore Agent.

Prerequisites

# 1. Install OmniDaemon
pip install omnidaemon

# 2. Install OmniCore Agent
pip install omnicoreagent

# 3. Start Redis (event bus)
docker run -d -p 6379:6379 --name redis redis:latest

# 4. Set environment variables
export OPENAI_API_KEY=your_key_here
export REDIS_URL=redis://localhost:6379

Directory Structure

The Supervisor pattern requires your agent code in a directory:
my_agent/
β”œβ”€β”€ __init__.py
β”œβ”€β”€ agent.py           # Your agent logic (runs in isolated process)
└── requirements.txt   # Agent dependencies (installed in isolation)
πŸ’‘ Why a directory? The supervisor needs to spawn a new process with your agent code. Organizing it in a directory allows the supervisor to install dependencies in isolation and import your callback function properly.

Step 1: Agent Code

Create my_agent/agent.py with your OmniCore Agent logic:
from omnicoreagent import OmniAgent
import logging

logger = logging.getLogger(__name__)

# Initialize agent once (runs in separate process)
_agent = None

async def get_agent():
    """Lazy initialization - agent created on first use"""
    global _agent
    if _agent is None:
        _agent = OmniAgent(
            name="filesystem_agent",
            system_instruction="Help users manage files.",
            model_config={
                "provider": "openai",
                "model": "gpt-4o",
                "temperature": 0,
            },
            mcp_tools=[{
                "name": "filesystem",
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/folder"]
            }],
        )
        await _agent.connect_mcp_servers()
    return _agent

async def call_omnicore_agent(message: dict):
    """
    This callback runs in the ISOLATED PROCESS.
    
    The supervisor will:
    1. Spawn a new Python process  
    2. Import this module
    3. Call this function with the message
    4. Return the result back via stdio
    """
    agent = await get_agent()
    query = message.get("content", "")
    
    logger.info(f"Processing: {query}")
    result = await agent.run(query)
    
    return {
        "status": "success",
        "data": result.get("response", ""),
    }
Key Points:
  • This code runs in a separate process, not the main process
  • MCP tools connect once and stay connected
  • Clean separation of concerns

Step 2: Agent Runner

Create agent_runner.py that manages the supervisor:
import asyncio
import logging
from omnidaemon import OmniDaemonSDK, AgentConfig, SubscriptionConfig
from omnidaemon.agent_runner.agent_supervisor_runner import (
    create_supervisor_from_directory,
    shutdown_all_supervisors,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

sdk = OmniDaemonSDK()
_supervisor = None

async def get_supervisor():
    """Initialize supervisor - manages agent process"""
    global _supervisor
    if _supervisor is None:
        _supervisor = await create_supervisor_from_directory(
            agent_name="omnicore_filesystem_agent",
            agent_dir="./my_agent",  # Points to our agent directory
            callback_function="agent.call_omnicore_agent",  # Function to call
        )
    return _supervisor

async def call_supervised_agent(message: dict):
    """Wrapper that delegates to supervisor"""
    supervisor = await get_supervisor()
    return await supervisor.handle_event(message)

async def main():
    try:
        # Pre-initialize supervisor (spawns agent process)
        logger.info("Initializing supervisor...")
        await get_supervisor()
        
        # Register with OmniDaemon
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="omnicore_filesystem_agent",
                topic="file.tasks",
                callback=call_supervised_agent,  # Supervisor handles events
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,
                    dlq_retry_limit=3,
                    consumer_count=3,
                ),
            )
        )
        
        logger.info("Starting OmniDaemon...")
        await sdk.start()
        logger.info("βœ… Agent running in isolated process. Press Ctrl+C to stop.")
        
        await asyncio.Event().wait()
        
    except KeyboardInterrupt:
        logger.info("Shutdown signal received")
    finally:
        logger.info("Shutting down...")
        await sdk.shutdown()  # Shutdown event bus
        await shutdown_all_supervisors()  # Shutdown agent processes
        logger.info("Shutdown complete")

if __name__ == "__main__":
    asyncio.run(main())
What Happens Here:
  1. create_supervisor_from_directory() spawns a new Python process
  2. Process imports my_agent.agent module
  3. Supervisor keeps process running, waiting for events
  4. When event arrives, supervisor sends it to agent via stdio
  5. Agent processes and returns result
  6. Supervisor sends result back to OmniDaemon

Step 3: Run It

python agent_runner.py
Expected output:
INFO - Initializing supervisor...
INFO - Starting OmniDaemon...
INFO - βœ… Agent running in isolated process. Press Ctrl+C to stop.
The agent is now running in its own process, ready to handle events!

Testing Your Agent

Publish an event to test:
# publisher.py
import asyncio
from omnidaemon import OmniDaemonSDK, EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def main():
    event = EventEnvelope(
        topic="file.tasks",
        payload=PayloadBase(content="List all files in the directory")
    )
    
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"πŸ“¨ Task ID: {task_id}")
    
    await asyncio.sleep(5)  # Wait for processing
    
    result = await sdk.get_result(task_id)
    print(f"βœ… Result: {result}")

asyncio.run(main())
Run it:
python publisher.py

Production Deployment

Environment Variables

# .env
OPENAI_API_KEY=your_key
REDIS_URL=redis://prod-redis:6379
STORAGE_BACKEND=redis  # Use Redis for production
LOG_LEVEL=INFO

Horizontal Scaling

Run multiple instances for load balancing:
# Each instance runs the same code, automatic load balancing via consumer groups
python agent_runner.py &  # Instance 1
python agent_runner.py &  # Instance 2
python agent_runner.py &  # Instance 3
OmniDaemon automatically distributes work across all instances.

Monitoring

# Check health
omnidaemon health

# View metrics
omnidaemon metrics --topic file.tasks

# Inspect failures (DLQ)
omnidaemon bus dlq --topic file.tasks

Complete Working Example

See the full production implementation with Google ADK: πŸ“ examples/agents_with_supervisors/ This example includes:
  • Google ADK integration
  • MCP filesystem tools
  • Process isolation with supervisors
  • Error handling
  • Graceful shutdown
  • Multiple agents

Understanding the Architecture

This supervisor pattern is fundamental to how OmniDaemon achieves production-grade reliability.

Process Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Main Process (agent_runner.py)    β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ OmniDaemonSDK                 β”‚  β”‚
β”‚  β”‚ - Event bus (Redis)           β”‚  β”‚
β”‚  β”‚ - Agent registration          β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ AgentSupervisor               β”‚  β”‚
β”‚  β”‚ - Spawns subprocess           β”‚  β”‚
β”‚  β”‚ - Monitors health             β”‚  β”‚
β”‚  β”‚ - stdio communication         β”‚  β”‚
β”‚  β”‚ - Auto-restart on crash       β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚             β”‚ stdio (JSON)           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              ↓
       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
       β”‚ Agent Process (PID: 5678)    β”‚
       β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
       β”‚ β”‚ my_agent/agent.py        β”‚ β”‚
       β”‚ β”‚ - OmniCore Agent         β”‚ β”‚
       β”‚ β”‚ - MCP tools              β”‚ β”‚
       β”‚ β”‚ - Isolated dependencies  β”‚ β”‚
       β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
       β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Core Concepts: Implementation Guides: Other Examples:

Key Takeaways

βœ… Supervisors provide process isolation - One agent crash doesn’t affect others
βœ… Production requirement - Use supervisors for any production AI agent
βœ… Simple to use - create_supervisor_from_directory() handles everything
βœ… Auto-recovery - Crashed agents restart automatically
βœ… Resource cleanup - Graceful shutdown of MCP tools and connections
The supervisor pattern is what makes OmniDaemon production-ready. It transforms your AI agents from fragile single-process scripts into robust, fault-tolerant services.
Next: Try the Google ADK example to see supervisors with a different AI framework.