from omnicoreagent import OmniAgent, ToolRegistry, MemoryRouter, EventRouter
from typing import Optional
import asyncio
import logging
from decouple import config
from omnidaemon import OmniDaemonSDK, start_api_server, AgentConfig, SubscriptionConfig
# Initialize SDK (uses dependency injection for storage & event bus).
# A single module-level instance is shared by main() for agent registration,
# start-up and shutdown.
sdk = OmniDaemonSDK()
# Module-level logger named after this module; every function below logs through it.
logger = logging.getLogger(__name__)
# Directory handed to the filesystem MCP server as its last command-line argument.
_FILESYSTEM_ROOT = "/home/user/your_target_folder"  # CHANGE THIS!

# MCP tool servers made available to the agent (filesystem tools via Node.js).
# Each entry names a server and the command line used to launch it.
MCP_TOOLS = [
    {
        "name": "filesystem",
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", _FILESYSTEM_ROOT],
    },
]
class OmniAgentRunner:
    """Wrapper for OmniCore Agent with initialization management.

    The runner builds the agent and its routers on first use, forwards chat
    messages to the agent, and releases the agent's MCP resources on shutdown.
    """

    def __init__(self):
        """Initialize the agent runner in a not-yet-connected state."""
        self.agent: Optional[OmniAgent] = None
        self.memory_router: Optional[MemoryRouter] = None
        self.event_router: Optional[EventRouter] = None
        self.connected = False
        self.session_id: Optional[str] = None
        # Created lazily inside initialize() so that the lock is always built
        # while an event loop is running (the runner itself is instantiated at
        # import time, before asyncio.run()).
        self._init_lock: Optional[asyncio.Lock] = None

    async def initialize(self):
        """Initialize all OmniCore Agent components.

        Safe to call repeatedly and concurrently: only the first caller builds
        the agent and connects the MCP servers; everyone else waits for that
        to finish and then returns.
        """
        if self.connected:
            logger.info("OmniAgent already initialized")
            return

        # No await happens between the check and the assignment, so exactly
        # one lock object is ever created.
        if self._init_lock is None:
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            # Re-check under the lock: another coroutine may have completed
            # initialization while this one was waiting. Without this, several
            # concurrent events would each spawn their own MCP server.
            if self.connected:
                logger.info("OmniAgent already initialized")
                return

            logger.info("🚀 Initializing OmniAgent...")

            # Initialize routers
            self.memory_router = MemoryRouter("in_memory")
            self.event_router = EventRouter("in_memory")

            # Initialize OmniCore Agent
            self.agent = OmniAgent(
                name="filesystem_assistant_agent",
                system_instruction="Help the user manage their files. You can list files, read files, etc.",
                model_config={
                    "provider": "openai",
                    "model": "gpt-4o",
                    "temperature": 0,
                    "max_context_length": 1000,
                },
                mcp_tools=MCP_TOOLS,
                agent_config={
                    "agent_name": "OmniAgent",
                    "max_steps": 15,
                    "tool_call_timeout": 20,
                    "request_limit": 0,  # 0 = unlimited
                    "total_tokens_limit": 0,
                    # Memory config
                    "memory_config": {"mode": "sliding_window", "value": 100},
                    "memory_results_limit": 5,
                    "memory_similarity_threshold": 0.5,
                    # Tool retrieval config
                    "enable_tools_knowledge_base": False,
                    "tools_results_limit": 10,
                    "tools_similarity_threshold": 0.1,
                },
                memory_router=self.memory_router,
                event_router=self.event_router,
                debug=False,
            )

            # Connect to MCP servers
            await self.agent.connect_mcp_servers()

            # Only flip the flag once the connection succeeded, so a failed
            # attempt is retried on the next call.
            self.connected = True
            logger.info("✅ OmniAgent initialized successfully")

    async def handle_chat(self, message: str):
        """Process a message with the agent.

        Args:
            message: The text forwarded to the agent's ``run`` method.

        Returns:
            The ``"response"`` entry of the agent's result, the string
            ``"No response received"`` if that entry is absent, or ``None``
            when the agent has not been initialized.

        Raises:
            Exception: Any error raised while running the agent is logged and
                re-raised unchanged.
        """
        if not self.agent:
            logger.error("Agent not initialized")
            return None

        logger.info(f"🤖 Processing: {message}")

        # Generate session ID if needed
        # NOTE(review): the session ID is recorded on the runner but is not
        # forwarded to agent.run(); confirm whether that is intentional before
        # wiring it through, since doing so would share one session across
        # every event this runner handles.
        if not self.session_id:
            from datetime import datetime

            self.session_id = (
                f"cli_session_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S')}"
            )

        try:
            # Run the agent
            result = await self.agent.run(message)
            return result.get("response", "No response received")
        except Exception as e:
            logger.exception(f"Error processing message: {e}")
            raise

    async def shutdown(self):
        """Clean up OmniAgent resources.

        Cleanup is best-effort: a failure is logged as a warning and not
        re-raised. After a cleanup attempt the runner is marked as not
        connected, so a later ``initialize()`` builds a fresh agent instead of
        reusing one whose resources were released.
        """
        if self.agent and getattr(self.agent, "mcp_tools", None):
            try:
                await self.agent.cleanup()
                logger.info("OmniAgent MCP cleanup successful")
            except Exception as e:
                logger.warning(f"OmniAgent MCP cleanup failed: {e}")
            finally:
                self.connected = False
# Create agent runner instance.
# One module-level runner is shared by the event callback and by main()'s
# shutdown path; the agent itself is only built on the first initialize() call.
filesystem_agent_runner = OmniAgentRunner()
# ============================================
# CALLBACK FUNCTIONS (Where AI Agent Runs!)
# ============================================
async def call_file_system_agent(message: dict):
    """
    Event callback that hands an incoming event to the file system agent.

    Registered in ``main()`` as the callback for the ``file_system.tasks``
    topic.

    Args:
        message: Event envelope. Only ``content`` (the query/command) is read
            here; the envelope is also described as carrying
            ``correlation_id``, ``tenant_id``, ``source``, ``webhook``,
            ``reply_to`` and similar routing fields.

    Returns:
        Dict with ``status`` (always ``"success"``), ``data`` (whatever the
        runner's ``handle_chat`` returned) and ``processed_by``.
    """
    logger.info(f"Received event: {message}")

    # The runner makes this a no-op once the agent is already connected.
    await filesystem_agent_runner.initialize()

    # Forward the event's payload to the agent and wrap its answer.
    agent_reply = await filesystem_agent_runner.handle_chat(
        message=message.get("content")
    )
    return dict(
        status="success",
        data=agent_reply,
        processed_by="OmniCore Agent",
    )
async def call_response_handler(message: dict):
    """
    Event callback for responses produced by the file system agent.

    Registered in ``main()`` for the ``file_system.response`` topic. It
    currently only logs the event and acknowledges it.

    Args:
        message: The response event as delivered to this callback.

    Returns:
        ``{"status": "acknowledged"}``
    """
    logger.info(f"Received response: {message}")

    # Placeholder: add response-processing logic here.
    acknowledgement = {"status": "acknowledged"}
    return acknowledgement
# ============================================
# MAIN FUNCTION
# ============================================
def _log_api_task_result(task):
    """Log an unexpected failure of the background API server task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(
            f"OmniDaemon API server stopped unexpectedly: {exc}", exc_info=exc
        )


async def main():
    """Register both agents, start OmniDaemon and run until interrupted.

    The optional API server is started as a background task when
    ``OMNIDAEMON_API_ENABLED`` is true. On exit, whether normal, interrupted
    or failed, the API task is cancelled, the SDK is shut down and the
    OmniCore agent's resources are released.

    Raises:
        Exception: Any non-cancellation error raised during registration or
            start-up is logged and re-raised after cleanup has run.
    """
    # Strong reference to the API server task: the event loop only keeps weak
    # references to tasks, so an unreferenced task can be garbage-collected
    # while it is still running.
    api_task = None
    try:
        # Register agents with OmniDaemon
        logger.info("Registering agents...")

        # Agent 1: File System Agent (processes file operations)
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="OMNICOREAGENT_FILESYSTEM_AGENT",
                topic="file_system.tasks",
                callback=call_file_system_agent,
                description="Help the user manage their files. You can list files, read files, etc.",
                tools=[],  # MCP tools configured in agent
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,  # 6 seconds
                    dlq_retry_limit=3,  # 3 retries before DLQ
                    consumer_count=3,  # 3 parallel consumers
                ),
            )
        )

        # Agent 2: Response Handler (processes responses)
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="OMNICOREAGENT_RESPONSE",
                topic="file_system.response",
                callback=call_response_handler,
                description="Response handler for file system agent.",
                tools=[],
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,
                    dlq_retry_limit=3,
                    consumer_count=1,
                ),
            )
        )
        logger.info("✅ Agents registered successfully")

        # Start the agent runner
        logger.info("Starting OmniDaemon agent runner...")
        await sdk.start()
        logger.info("🎧 OmniDaemon agent runner started")

        # Optional: Start API server
        enable_api = config("OMNIDAEMON_API_ENABLED", default=False, cast=bool)
        api_port = config("OMNIDAEMON_API_PORT", default=8765, cast=int)
        if enable_api:
            api_task = asyncio.create_task(start_api_server(sdk, port=api_port))
            # Surface a crash of the server immediately rather than at exit.
            api_task.add_done_callback(_log_api_task_result)
            logger.info(f"📡 OmniDaemon API running on http://127.0.0.1:{api_port}")

        # Keep running to process events
        logger.info("✅ Agent runner is now processing events. Press Ctrl+C to stop.")
        try:
            while True:
                await asyncio.sleep(1)
        except (KeyboardInterrupt, asyncio.CancelledError):
            logger.info("Received shutdown signal (Ctrl+C)...")
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Interrupted during registration or start-up, before the wait loop.
        logger.info("Received shutdown signal...")
    except Exception as e:
        logger.error(f"Error during agent runner execution: {e}", exc_info=True)
        raise
    finally:
        # ALWAYS cleanup resources
        # Stop the API server first so it is not left running against an SDK
        # that is being shut down.
        if api_task is not None and not api_task.done():
            api_task.cancel()
            try:
                await api_task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                logger.error(f"Error while stopping API server: {e}", exc_info=True)

        logger.info("Shutting down OmniDaemon...")
        try:
            await sdk.shutdown()
            logger.info("✅ OmniDaemon shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}", exc_info=True)

        # Cleanup OmniCore Agent resources
        try:
            await filesystem_agent_runner.shutdown()
            logger.info("✅ OmniAgent cleanup complete")
        except Exception as e:
            logger.error(f"Error during OmniAgent cleanup: {e}", exc_info=True)
# ============================================
# Run the example
# ============================================
if __name__ == "__main__":
    # Run main() on a fresh event loop when this file is executed as a script.
    asyncio.run(main())