import os
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from google.adk.models.lite_llm import LiteLlm
import asyncio
import logging
from decouple import config
from dotenv import load_dotenv
from omnidaemon import OmniDaemonSDK, start_api_server, AgentConfig, SubscriptionConfig
# Read the .env file (if any) before the environment lookups below
load_dotenv()

# Provider API key
# api_key = os.environ.get("GOOGLE_API_KEY")  # For Gemini
api_key = os.environ.get("OPENAI_API_KEY")  # For OpenAI via LiteLLM

# OmniDaemon SDK instance (uses dependency injection)
sdk = OmniDaemonSDK()

logger = logging.getLogger(__name__)

# Folder exposed to the filesystem MCP tool.
# The placeholder is joined onto this script's directory: a relative value
# resolves next to this file, while an absolute value (like the placeholder)
# replaces the script directory.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TARGET_FOLDER_PATH = os.path.join(
    _SCRIPT_DIR,
    "/home/user/your_target_folder",  # CHANGE THIS!
)
# ============================================
# GOOGLE ADK AGENT SETUP
# ============================================
# Filesystem MCP server: started with `npx` and handed the absolute form of
# TARGET_FOLDER_PATH as its last argument.
_filesystem_server_params = StdioServerParameters(
    command="npx",
    args=[
        "-y",
        "@modelcontextprotocol/server-filesystem",
        os.path.abspath(TARGET_FOLDER_PATH),
    ],
)

# Toolset that talks to that server over stdio.
_filesystem_toolset = McpToolset(
    connection_params=StdioConnectionParams(
        server_params=_filesystem_server_params,
        timeout=60,  # presumably seconds -- TODO confirm against ADK docs
    ),
)

# The agent itself. Model alternatives:
#   Gemini:                    model="gemini-2.0-flash"
#   OpenAI via LiteLLM:        model=LiteLlm(model="openai/gpt-4o")  (active)
#   Other providers (LiteLLM): model=LiteLlm(model="anthropic/claude-3-opus")
#                              model=LiteLlm(model="openai/gpt-4o-mini")
filesystem_agent = LlmAgent(
    name="filesystem_assistant_agent",
    model=LiteLlm(model="openai/gpt-4o"),
    instruction="Help the user manage their files. You can list files, read files, etc.",
    tools=[_filesystem_toolset],
)
# ============================================
# SESSION MANAGEMENT
# ============================================
# Fixed identifiers for the single interaction context used by this example
APP_NAME = "filesystem_agent"
USER_ID = "user_1"
SESSION_ID = "session_001"

# The session service keeps conversation history and state.
# The in-memory variant is the simple option: nothing is persisted.
session_service = InMemorySessionService()
async def create_session() -> None:
    """Ensure the example's session exists, creating it only when missing.

    The session is identified by the module-level APP_NAME, USER_ID and
    SESSION_ID. This function is invoked for every incoming event, so it
    first looks the session up and only calls ``create_session`` on the
    service when the lookup returns ``None``; an existing session (and the
    history stored in it) is left untouched.

    NOTE(review): the lookup and the creation are two separate awaits, so
    two callbacks running concurrently could both observe "missing" --
    confirm whether that matters for the configured consumer count.
    """
    existing = await session_service.get_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )
    if existing is not None:
        return

    await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
    )
# ============================================
# RUNNER SETUP
# ============================================
# The Runner orchestrates the agent execution loop; it is wired to the
# agent and the session service under this application's name.
runner = Runner(
    app_name=APP_NAME,
    session_service=session_service,
    agent=filesystem_agent,
)
# ============================================
# CALLBACK FUNCTION (Where AI Runs!)
# ============================================
async def call_file_system_agent(message: dict) -> str:
    """Run the Google ADK filesystem agent for one OmniDaemon event.

    Registered as the agent callback, so it is invoked once per incoming
    event.

    Args:
        message: Event envelope. Only ``content`` (the user query/command)
            is read here. The envelope may also carry ``correlation_id``,
            ``tenant_id``, ``source``, ``webhook``, ``reply_to`` and other
            fields, which this callback ignores.

    Returns:
        The agent's final response text. When the payload has no content a
        fixed notice is returned without running the agent; when the agent
        finishes without any text a fixed fallback message is returned.
    """
    # Validate the payload first so an empty event does not touch the
    # session store or the agent at all.
    query = message.get("content")
    if not query:
        return "No content in the message payload"

    # Make sure the session the runner will use is available
    await create_session()

    logger.info("\n>>> User Query: %s", query)

    # Wrap the query in the content structure the ADK runner expects
    content = types.Content(role="user", parts=[types.Part(text=query)])

    # Returned unchanged if no final event yields usable text
    final_response_text = "Agent did not produce a final response."

    # Stream events from the agent until the final response arrives
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=content,
    ):
        if not event.is_final_response():
            continue

        if event.content and event.content.parts:
            # Take the first part that actually carries text: the leading
            # part is not guaranteed to be a text part, and returning its
            # ``None`` text would break the string return contract.
            reply = next(
                (part.text for part in event.content.parts if part.text),
                None,
            )
            if reply is not None:
                final_response_text = reply
        elif event.actions and event.actions.escalate:
            # Escalation without content: surface the error message
            final_response_text = (
                f"Agent escalated: {event.error_message or 'No specific message.'}"
            )
        # Stop processing once the final response has been seen
        break

    logger.info("<<< Agent Response: %s", final_response_text)
    return final_response_text
# ============================================
# MAIN FUNCTION
# ============================================
def _log_api_server_exit(task: "asyncio.Task") -> None:
    """Log the exception, if any, that ended the API server task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(
            "OmniDaemon API server stopped unexpectedly: %s", exc, exc_info=exc
        )


async def main() -> None:
    """Register the agent, start the OmniDaemon runner, and run until stopped.

    Steps:
        1. Register ``call_file_system_agent`` on the ``file_system.tasks``
           topic.
        2. Start the SDK's agent runner.
        3. Optionally start the API server as a background task, controlled
           by the ``OMNIDAEMON_API_ENABLED`` / ``OMNIDAEMON_API_PORT``
           settings (defaults: disabled, port 8765).
        4. Sleep in a loop until interrupted or cancelled.

    On the way out the API server task (if any) is cancelled and
    ``sdk.shutdown()`` is always attempted.

    Raises:
        Exception: any non-cancellation error from registration, startup or
            the run loop is logged and re-raised after cleanup.
    """
    # Keep a strong reference to the background task: the event loop only
    # holds weak references, so an unreferenced task may be garbage
    # collected before it finishes.
    api_task = None
    try:
        # Register Google ADK agent with OmniDaemon
        logger.info("Registering Google ADK agent...")
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="GOOGLE_ADK_FILESYSTEM_AGENT",
                topic="file_system.tasks",
                callback=call_file_system_agent,
                description="Help the user manage their files. You can list files, read files, etc.",
                tools=[],  # MCP tools configured in agent
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,  # 6 seconds
                    dlq_retry_limit=3,  # 3 retries before DLQ
                    consumer_count=3,  # 3 parallel consumers
                ),
            )
        )
        logger.info("✅ Agent registered successfully")

        # Start the agent runner
        logger.info("Starting OmniDaemon agent runner...")
        await sdk.start()
        logger.info("🎧 OmniDaemon agent runner started")

        # Optional: Start API server
        enable_api = config("OMNIDAEMON_API_ENABLED", default=False, cast=bool)
        api_port = config("OMNIDAEMON_API_PORT", default=8765, cast=int)
        if enable_api:
            api_task = asyncio.create_task(start_api_server(sdk, port=api_port))
            # Surface server failures instead of losing them silently
            api_task.add_done_callback(_log_api_server_exit)
            logger.info("📡 OmniDaemon API running on http://127.0.0.1:%s", api_port)

        # Keep running to process events
        logger.info("✅ Agent runner is now processing events. Press Ctrl+C to stop.")
        try:
            while True:
                await asyncio.sleep(1)
        except (KeyboardInterrupt, asyncio.CancelledError):
            logger.info("Received shutdown signal (Ctrl+C)...")
    except (KeyboardInterrupt, asyncio.CancelledError):
        logger.info("Received shutdown signal...")
    except Exception as e:
        logger.error("Error during agent runner execution: %s", e, exc_info=True)
        raise
    finally:
        # ALWAYS cleanup resources
        if api_task is not None:
            # Request cancellation up front (no-op if the task already ended)
            api_task.cancel()
        logger.info("Shutting down OmniDaemon...")
        try:
            await sdk.shutdown()
            logger.info("✅ OmniDaemon shutdown complete")
        except Exception as e:
            logger.error("Error during shutdown: %s", e, exc_info=True)
        if api_task is not None:
            # Wait for the server task to finish unwinding;
            # return_exceptions=True collects its CancelledError/exception
            # here instead of raising it out of the cleanup path.
            await asyncio.gather(api_task, return_exceptions=True)
# ============================================
# Run the example
# ============================================
if __name__ == "__main__":
    # All status output in this script goes through `logger.info`. Without a
    # configured handler the default WARNING threshold hides those messages.
    # basicConfig does nothing if the root logger already has handlers.
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())