Skip to main content

OmniCore Agent Example

This page provides a complete, production-ready example of integrating OmniCore Agent with OmniDaemon. OmniCore Agent is an AI agent framework with MCP (Model Context Protocol) tool support, built by the same team as OmniDaemon.

Overview

What This Example Shows:
  • ✅ Full OmniCore Agent integration
  • ✅ MCP tool integration (filesystem tools)
  • ✅ Event-driven agent pattern
  • ✅ Multiple agent registration
  • ✅ Proper initialization and cleanup
  • ✅ Graceful shutdown handling
  • ✅ Production-ready error handling
Source Code: examples/omnicoreagent/agent_runner.py

Prerequisites

1. Install OmniDaemon

# Recommended: uv (fast & modern)
uv add omnidaemon

# Or with pip:
pip install omnidaemon

2. Install OmniCore Agent

# Recommended: uv
uv add omnicoreagent

# Or with pip:
pip install omnicoreagent

3. Install Node.js (for MCP tools)

MCP filesystem tools run via npx:
# Check if Node.js is installed
node --version
npm --version

# If not installed:
# macOS: brew install node
# Ubuntu: sudo apt install nodejs npm
# Windows: Download from nodejs.org

4. Set Up API Keys

OmniCore Agent requires an OpenAI API key:
# Create .env file
cat > .env << 'EOF'
OPENAI_API_KEY=your_openai_api_key_here

# Optional OmniDaemon settings
REDIS_URL=redis://localhost:6379
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.omnidaemon_data
EOF

5. Set Up Redis (Event Bus)

# Docker (easiest)
docker run -d -p 6379:6379 --name redis redis:latest

# Or install locally
# macOS: brew install redis && brew services start redis
# Ubuntu: sudo apt install redis-server && sudo systemctl start redis

Complete Working Example

File: agent_runner.py

from omnicoreagent import OmniAgent, ToolRegistry, MemoryRouter, EventRouter
from typing import Optional
import asyncio
import logging
from decouple import config
from omnidaemon import OmniDaemonSDK, start_api_server, AgentConfig, SubscriptionConfig

# Initialize SDK (uses dependency injection for storage & event bus)
sdk = OmniDaemonSDK()

logger = logging.getLogger(__name__)

# Configure MCP Tools (filesystem tools via Node.js)
MCP_TOOLS = [
    {
        "name": "filesystem",
        "command": "npx",
        "args": [
            "-y",
            "@modelcontextprotocol/server-filesystem",
            "/home/user/your_target_folder",  # CHANGE THIS!
        ],
    },
]


class OmniAgentRunner:
    """Wrapper for OmniCore Agent with initialization management."""

    def __init__(self):
        """Initialize the agent runner."""
        self.agent: Optional[OmniAgent] = None
        self.memory_router: Optional[MemoryRouter] = None
        self.event_router: Optional[EventRouter] = None
        self.connected = False
        self.session_id: Optional[str] = None

    async def initialize(self):
        """Initialize all OmniCore Agent components."""
        if self.connected:
            logger.info("OmniAgent already initialized")
            return

        logger.info("🚀 Initializing OmniAgent...")

        # Initialize routers
        self.memory_router = MemoryRouter("in_memory")
        self.event_router = EventRouter("in_memory")

        # Initialize OmniCore Agent
        self.agent = OmniAgent(
            name="filesystem_assistant_agent",
            system_instruction="Help the user manage their files. You can list files, read files, etc.",
            model_config={
                "provider": "openai",
                "model": "gpt-4o",
                "temperature": 0,
                "max_context_length": 1000,
            },
            mcp_tools=MCP_TOOLS,
            agent_config={
                "agent_name": "OmniAgent",
                "max_steps": 15,
                "tool_call_timeout": 20,
                "request_limit": 0,  # 0 = unlimited
                "total_tokens_limit": 0,
                # Memory config
                "memory_config": {"mode": "sliding_window", "value": 100},
                "memory_results_limit": 5,
                "memory_similarity_threshold": 0.5,
                # Tool retrieval config
                "enable_tools_knowledge_base": False,
                "tools_results_limit": 10,
                "tools_similarity_threshold": 0.1,
            },
            memory_router=self.memory_router,
            event_router=self.event_router,
            debug=False,
        )

        # Connect to MCP servers
        await self.agent.connect_mcp_servers()
        self.connected = True

        logger.info("✅ OmniAgent initialized successfully")

    async def handle_chat(self, message: str):
        """Process a message with the agent."""
        if not self.agent:
            logger.error("Agent not initialized")
            return None

        logger.info(f"🤖 Processing: {message}")

        # Generate session ID if needed
        if not self.session_id:
            from datetime import datetime
            self.session_id = (
                f"cli_session_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
            )

        try:
            # Run the agent
            result = await self.agent.run(message)
            response = result.get("response", "No response received")
            return response
        except Exception as e:
            logger.exception(f"Error processing message: {e}")
            raise

    async def shutdown(self):
        """Clean up OmniAgent resources."""
        if self.agent and getattr(self.agent, "mcp_tools", None):
            try:
                await self.agent.cleanup()
                logger.info("OmniAgent MCP cleanup successful")
            except Exception as e:
                logger.warning(f"OmniAgent MCP cleanup failed: {e}")


# Create agent runner instance
filesystem_agent_runner = OmniAgentRunner()


# ============================================
# CALLBACK FUNCTIONS (Where AI Agent Runs!)
# ============================================

async def call_file_system_agent(message: dict):
    """
    Main callback for file system agent.
    
    This is called automatically by OmniDaemon when an event arrives.
    
    Args:
        message: Event envelope containing:
            - content: Your data (query/command)
            - correlation_id: Request tracking
            - tenant_id: Multi-tenancy
            - source: Event origin
            - webhook: Callback URL
            - reply_to: Response topic
            - etc.
    
    Returns:
        Dict with status and result
    """
    logger.info(f"Received event: {message}")
    
    # Initialize agent if needed
    await filesystem_agent_runner.initialize()
    
    # Extract query from content
    query = message.get("content")
    
    # Run OmniCore Agent
    result = await filesystem_agent_runner.handle_chat(message=query)
    
    return {
        "status": "success",
        "data": result,
        "processed_by": "OmniCore Agent"
    }


async def call_response_handler(message: dict):
    """
    Response handler for file system agent responses.
    
    This agent listens to responses from the file system agent.
    """
    logger.info(f"Received response: {message}")
    
    # Process response
    # (Add your logic here)
    
    return {"status": "acknowledged"}


# ============================================
# MAIN FUNCTION
# ============================================

async def main():
    try:
        # Register agents with OmniDaemon
        logger.info("Registering agents...")
        
        # Agent 1: File System Agent (processes file operations)
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="OMNICOREAGENT_FILESYSTEM_AGENT",
                topic="file_system.tasks",
                callback=call_file_system_agent,
                description="Help the user manage their files. You can list files, read files, etc.",
                tools=[],  # MCP tools configured in agent
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,      # 6 seconds
                    dlq_retry_limit=3,         # 3 retries before DLQ
                    consumer_count=3,          # 3 parallel consumers
                ),
            )
        )
        
        # Agent 2: Response Handler (processes responses)
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="OMNICOREAGENT_RESPONSE",
                topic="file_system.response",
                callback=call_response_handler,
                description="Response handler for file system agent.",
                tools=[],
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,
                    dlq_retry_limit=3,
                    consumer_count=1,
                ),
            )
        )
        
        logger.info("✅ Agents registered successfully")

        # Start the agent runner
        logger.info("Starting OmniDaemon agent runner...")
        await sdk.start()
        logger.info("🎧 OmniDaemon agent runner started")

        # Optional: Start API server
        enable_api = config("OMNIDAEMON_API_ENABLED", default=False, cast=bool)
        api_port = config("OMNIDAEMON_API_PORT", default=8765, cast=int)
        if enable_api:
            asyncio.create_task(start_api_server(sdk, port=api_port))
            logger.info(f"📡 OmniDaemon API running on http://127.0.0.1:{api_port}")

        # Keep running to process events
        logger.info("✅ Agent runner is now processing events. Press Ctrl+C to stop.")
        try:
            while True:
                await asyncio.sleep(1)
        except (KeyboardInterrupt, asyncio.CancelledError):
            logger.info("Received shutdown signal (Ctrl+C)...")

    except (KeyboardInterrupt, asyncio.CancelledError):
        logger.info("Received shutdown signal...")
    except Exception as e:
        logger.error(f"Error during agent runner execution: {e}", exc_info=True)
        raise

    finally:
        # ALWAYS cleanup resources
        logger.info("Shutting down OmniDaemon...")
        try:
            await sdk.shutdown()
            logger.info("✅ OmniDaemon shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}", exc_info=True)

        # Cleanup OmniCore Agent resources
        try:
            await filesystem_agent_runner.shutdown()
            logger.info("✅ OmniAgent cleanup complete")
        except Exception as e:
            logger.error(f"Error during OmniAgent cleanup: {e}", exc_info=True)


# ============================================
# Run the example
# ============================================
if __name__ == "__main__":
    asyncio.run(main())

Code Explanation

1. Imports

from omnicoreagent import OmniAgent, ToolRegistry, MemoryRouter, EventRouter
from omnidaemon import OmniDaemonSDK
from omnidaemon import AgentConfig, SubscriptionConfig
Key Imports:
  • OmniAgent - Main OmniCore Agent class
  • MemoryRouter, EventRouter - Memory and event management
  • OmniDaemonSDK - OmniDaemon SDK (uses DI for storage/event bus)
  • AgentConfig, SubscriptionConfig - Configuration schemas

2. OmniCore Agent Initialization

class OmniAgentRunner:
    async def initialize(self):
        # Create routers
        self.memory_router = MemoryRouter("in_memory")
        self.event_router = EventRouter("in_memory")
        
        # Initialize agent
        self.agent = OmniAgent(
            name="filesystem_assistant_agent",
            system_instruction="...",
            model_config={"provider": "openai", "model": "gpt-4o"},
            mcp_tools=MCP_TOOLS,
            agent_config={...},
            memory_router=self.memory_router,
            event_router=self.event_router,
        )
        
        # Connect MCP servers
        await self.agent.connect_mcp_servers()
Why This Pattern?
  • Lazy initialization - Only initialize when first message arrives
  • Reusable - Same agent instance for all messages
  • Efficient - MCP servers stay connected

3. The Callback (Where AI Runs!)

async def call_file_system_agent(message: dict):
    """This is called when an event arrives."""
    
    # Initialize if needed
    await filesystem_agent_runner.initialize()
    
    # Extract query
    query = message.get("content")
    
    # Run OmniCore Agent (THIS IS WHERE AI RUNS!)
    result = await filesystem_agent_runner.handle_chat(message=query)
    
    return {"status": "success", "data": result}
Key Points:
  • message contains full EventEnvelope (content, metadata, etc.)
  • agent.run() executes the OmniCore Agent
  • Return value saved to storage (24h TTL)

4. Agent Registration

await sdk.register_agent(
    agent_config=AgentConfig(
        name="OMNICOREAGENT_FILESYSTEM_AGENT",
        topic="file_system.tasks",
        callback=call_file_system_agent,
        config=SubscriptionConfig(
            reclaim_idle_ms=6000,
            dlq_retry_limit=3,
            consumer_count=3,
        ),
    )
)
Configuration Explained:
  • name - Unique agent identifier
  • topic - Events to listen to
  • callback - Function to call (where AI runs)
  • reclaim_idle_ms - How long before reclaiming (6 sec for fast tasks)
  • dlq_retry_limit - Max retries before DLQ (3)
  • consumer_count - Parallel consumers (3 for load balancing)

5. Graceful Shutdown

finally:
    # OmniDaemon cleanup
    await sdk.shutdown()
    
    # OmniCore Agent cleanup
    await filesystem_agent_runner.shutdown()
Why Both?
  • sdk.shutdown() - Closes event bus, storage connections
  • agent.shutdown() - Closes MCP tool connections

Running the Example

1. Update Target Folder

Edit agent_runner.py and change the filesystem path:
MCP_TOOLS = [
    {
        "name": "filesystem",
        "command": "npx",
        "args": [
            "-y",
            "@modelcontextprotocol/server-filesystem",
            "/your/actual/path/here",  # CHANGE THIS!
        ],
    },
]

2. Run the Agent

python agent_runner.py
Expected Output:
INFO:root:Registering agents...
INFO:root:✅ Agents registered successfully
INFO:root:Starting OmniDaemon agent runner...
[Runner abc-123] Registered agent 'OMNICOREAGENT_FILESYSTEM_AGENT' on topic 'file_system.tasks'
[Runner abc-123] Registered agent 'OMNICOREAGENT_RESPONSE' on topic 'file_system.response'
INFO:root:🎧 OmniDaemon agent runner started
INFO:root:✅ Agent runner is now processing events. Press Ctrl+C to stop.

3. Publish an Event

In a new terminal:
# publisher.py
import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def main():
    event = EventEnvelope(
        topic="file_system.tasks",
        payload=PayloadBase(
            content="List all files in the directory"
        ),
    )
    
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"📨 Task ID: {task_id}")
    
    # Wait for processing
    await asyncio.sleep(5)
    
    # Get result
    result = await sdk.get_result(task_id)
    print(f"✅ Result: {result}")

asyncio.run(main())
Run it:
python publisher.py
Expected Output:
📨 Task ID: 550e8400-e29b-41d4-a716-446655440000
✅ Result: {
    'status': 'success',
    'data': 'Here are the files in the directory:\n1. file1.txt\n2. file2.pdf\n3. folder1/',
    'processed_by': 'OmniCore Agent'
}
In the agent terminal:
INFO:root:Received event: {'content': 'List all files in the directory', 'topic': 'file_system.tasks', ...}
INFO:root:🚀 Initializing OmniAgent...
INFO:root:✅ OmniAgent initialized successfully
INFO:root:🤖 Processing: List all files in the directory
[Agent filesystem_assistant_agent] Processed task in 2.35s

Customization

Change AI Model

self.agent = OmniAgent(
    model_config={
        "provider": "openai",
        "model": "gpt-4o-mini",  # Cheaper model
        # Or: "gpt-4-turbo", "gpt-3.5-turbo"
        "temperature": 0.7,      # More creative
    },
)

Add More MCP Tools

MCP_TOOLS = [
    {
        "name": "filesystem",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path1"],
    },
    {
        "name": "github",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-github"],
    },
]

Adjust Memory Config

agent_config={
    "memory_config": {
        "mode": "full_history",  # Keep all history
        # Or: "sliding_window" with "value": 50
    },
    "memory_results_limit": 10,  # More memory results
}

Multiple Agents

# Agent 1: File operations
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.operations",
        callback=file_agent_callback,
    )
)

# Agent 2: Analysis
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.analysis",
        callback=analysis_agent_callback,
    )
)

Production Deployment

1. Use Environment Variables

# .env
OPENAI_API_KEY=your_key
REDIS_URL=redis://prod-redis:6379
STORAGE_BACKEND=redis
OMNIDAEMON_API_ENABLED=true
OMNIDAEMON_API_PORT=8765
LOG_LEVEL=INFO

2. Use Redis Storage

STORAGE_BACKEND=redis
REDIS_URL=redis://prod-redis:6379

3. Scale with Multiple Runners

# Terminal 1
python agent_runner.py

# Terminal 2 (same code, automatic load balancing!)
python agent_runner.py

# Terminal 3
python agent_runner.py

4. Monitor with CLI

# Check health
omnidaemon health

# Check metrics
omnidaemon metrics --topic file_system.tasks

# Inspect DLQ
omnidaemon bus dlq --topic file_system.tasks

Troubleshooting

MCP Tools Not Working

# Check Node.js
node --version
npm --version

# Test MCP command manually
npx -y @modelcontextprotocol/server-filesystem /your/path

Agent Not Processing

# Check if agent registered
omnidaemon agent list

# Check event bus
omnidaemon bus list

# Check Redis
redis-cli ping

OpenAI API Errors

# Add better error handling
try:
    result = await self.agent.run(message)
    return {"status": "success", "data": result}
except Exception as e:
    logger.exception(f"Agent error: {e}")
    return {
        "status": "error",
        "error": str(e),
        "error_type": type(e).__name__
    }

Further Reading


Summary

What You Learned:
  • ✅ How to integrate OmniCore Agent with OmniDaemon
  • ✅ MCP tool configuration (filesystem tools)
  • ✅ Proper initialization and cleanup
  • ✅ Multiple agent registration
  • ✅ Event-driven agent pattern
  • ✅ Production deployment strategies
Key Takeaways:
  • Initialize agent INSIDE callback (lazy initialization)
  • Reuse agent instance for all messages
  • Always cleanup resources in finally block
  • Use SubscriptionConfig for retries and scaling
  • Monitor with OmniDaemon CLI tools
Next Steps:
  • Try with your own MCP tools
  • Add more agents for different topics
  • Scale with multiple runners
  • Deploy to production!
OmniCore Agent + OmniDaemon = Powerful event-driven AI! 🚀