
Google ADK Agent Example

This page provides a complete, production-ready example of integrating the Google Agent Development Kit (ADK) with OmniDaemon. Google ADK is Google’s official framework for building AI agents with Gemini models.

Overview

What This Example Shows:
  • ✅ Full Google ADK integration
  • ✅ MCP tool integration (filesystem tools)
  • ✅ Session management with InMemorySessionService
  • ✅ Event-driven agent pattern
  • ✅ LiteLLM integration (use any LLM provider)
  • ✅ Proper cleanup and error handling
  • ✅ Production-ready patterns
Source Code: examples/google_adk/agent_runner.py

Prerequisites

1. Install OmniDaemon

# Recommended: uv (fast & modern)
uv add omnidaemon

# Or with pip:
pip install omnidaemon

2. Install Google ADK

# Recommended: uv
uv add google-adk

# Or with pip:
pip install google-adk

3. Install LiteLLM (Optional - for non-Gemini models)

# Recommended: uv
uv add litellm

# Or with pip:
pip install litellm

4. Install Node.js (for MCP tools)

# Check if installed
node --version
npm --version

# If not installed:
# macOS: brew install node
# Ubuntu: sudo apt install nodejs npm
# Windows: Download from nodejs.org

5. Set Up API Keys

Google ADK works with Gemini or other models via LiteLLM:
# Create .env file
cat > .env << 'EOF'
# For Gemini
GOOGLE_API_KEY=your_google_api_key_here

# Or for OpenAI (via LiteLLM)
OPENAI_API_KEY=your_openai_api_key_here

# Optional OmniDaemon settings
REDIS_URL=redis://localhost:6379
STORAGE_BACKEND=json
JSON_STORAGE_DIR=.omnidaemon_data
EOF
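Since the example works with either key, it helps to fail fast at startup if neither is configured. `pick_api_key` below is a hypothetical helper (not part of OmniDaemon or ADK), shown only as a sketch; the variable names match the `.env` file above:

```python
import os

def pick_api_key():
    """Return the first configured API key as (name, value).

    Hypothetical helper for illustration; checks the same variable
    names used in the .env file above.
    """
    for name in ("GOOGLE_API_KEY", "OPENAI_API_KEY"):
        value = os.getenv(name)
        if value:
            return name, value
    raise RuntimeError("Set GOOGLE_API_KEY or OPENAI_API_KEY in your .env file")
```

Calling this once before building the agent turns a confusing mid-run auth error into a clear startup failure.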

6. Set Up Redis (Event Bus)

# Docker (easiest)
docker run -d -p 6379:6379 --name redis redis:latest

# Or install locally
# macOS: brew install redis && brew services start redis
# Ubuntu: sudo apt install redis-server && sudo systemctl start redis

Complete Working Example

File: agent_runner.py

import os
from google.adk.agents import LlmAgent
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
from google.adk.tools.mcp_tool.mcp_session_manager import StdioConnectionParams
from mcp import StdioServerParameters
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from google.adk.models.lite_llm import LiteLlm
import asyncio
import logging
from decouple import config
from dotenv import load_dotenv

from omnidaemon import OmniDaemonSDK, start_api_server, AgentConfig, SubscriptionConfig

load_dotenv()

# Get API keys
# api_key = os.getenv("GOOGLE_API_KEY")  # For Gemini
api_key = os.getenv("OPENAI_API_KEY")    # For OpenAI via LiteLLM

# Initialize SDK (uses dependency injection)
sdk = OmniDaemonSDK()

logger = logging.getLogger(__name__)

# Target folder for filesystem MCP tool
TARGET_FOLDER_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "your_target_folder",  # CHANGE THIS! Must be relative to this file:
    # os.path.join discards the first argument if the second is absolute.
)

# ============================================
# GOOGLE ADK AGENT SETUP
# ============================================

# Create LlmAgent with filesystem tools
filesystem_agent = LlmAgent(
    # Option 1: Use Gemini
    # model="gemini-2.0-flash",
    
    # Option 2: Use OpenAI via LiteLLM
    model=LiteLlm(model="openai/gpt-4o"),
    
    # Option 3: Use other providers via LiteLLM
    # model=LiteLlm(model="anthropic/claude-3-opus"),
    # model=LiteLlm(model="openai/gpt-4o-mini"),
    
    name="filesystem_assistant_agent",
    instruction="Help the user manage their files. You can list files, read files, etc.",
    tools=[
        McpToolset(
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command="npx",
                    args=[
                        "-y",
                        "@modelcontextprotocol/server-filesystem",
                        os.path.abspath(TARGET_FOLDER_PATH),
                    ],
                ),
                timeout=60,
            ),
        )
    ],
)

# ============================================
# SESSION MANAGEMENT
# ============================================

# Key Concept: SessionService stores conversation history & state
# InMemorySessionService is simple, non-persistent storage
session_service = InMemorySessionService()

# Define constants for identifying the interaction context
APP_NAME = "filesystem_agent"
USER_ID = "user_1"
SESSION_ID = "session_001"


async def create_session():
    """Create session if it doesn't exist."""
    await session_service.create_session(
        app_name=APP_NAME, 
        user_id=USER_ID, 
        session_id=SESSION_ID
    )


# ============================================
# RUNNER SETUP
# ============================================

# Key Concept: Runner orchestrates the agent execution loop
runner = Runner(
    agent=filesystem_agent, 
    app_name=APP_NAME, 
    session_service=session_service
)


# ============================================
# CALLBACK FUNCTION (Where AI Runs!)
# ============================================

async def call_file_system_agent(message: dict):
    """
    Main callback for Google ADK agent.
    
    This is called automatically by OmniDaemon when an event arrives.
    
    Args:
        message: Event envelope containing:
            - content: Your data (query/command)
            - correlation_id: Request tracking
            - tenant_id: Multi-tenancy
            - source: Event origin
            - webhook: Callback URL
            - reply_to: Response topic
            - etc.
    
    Returns:
        String with agent response
    """
    # Create session for this interaction
    await create_session()
    
    # Extract query from message
    query = message.get("content")
    if not query:
        return "No content in the message payload"
    
    logger.info(f"\n>>> User Query: {query}")

    # Create content for Google ADK
    content = types.Content(role="user", parts=[types.Part(text=query)])

    # Default response
    final_response_text = "Agent did not produce a final response."

    # Run agent asynchronously (stream events)
    async for event in runner.run_async(
        user_id=USER_ID, 
        session_id=SESSION_ID, 
        new_message=content
    ):
        # Check if this is the final response
        if event.is_final_response():
            if event.content and event.content.parts:
                # Extract text from first part
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                # Handle errors/escalations
                final_response_text = (
                    f"Agent escalated: {event.error_message or 'No specific message.'}"
                )
            # Stop processing once final response found
            break

    logger.info(f"<<< Agent Response: {final_response_text}")
    return final_response_text


# ============================================
# MAIN FUNCTION
# ============================================

async def main():
    try:
        # Register Google ADK agent with OmniDaemon
        logger.info("Registering Google ADK agent...")
        
        await sdk.register_agent(
            agent_config=AgentConfig(
                name="GOOGLE_ADK_FILESYSTEM_AGENT",
                topic="file_system.tasks",
                callback=call_file_system_agent,
                description="Help the user manage their files. You can list files, read files, etc.",
                tools=[],  # MCP tools configured in agent
                config=SubscriptionConfig(
                    reclaim_idle_ms=6000,      # 6 seconds
                    dlq_retry_limit=3,         # 3 retries before DLQ
                    consumer_count=3,          # 3 parallel consumers
                ),
            )
        )
        
        logger.info("✅ Agent registered successfully")

        # Start the agent runner
        logger.info("Starting OmniDaemon agent runner...")
        await sdk.start()
        logger.info("🎧 OmniDaemon agent runner started")

        # Optional: Start API server
        enable_api = config("OMNIDAEMON_API_ENABLED", default=False, cast=bool)
        api_port = config("OMNIDAEMON_API_PORT", default=8765, cast=int)
        if enable_api:
            asyncio.create_task(start_api_server(sdk, port=api_port))
            logger.info(f"📡 OmniDaemon API running on http://127.0.0.1:{api_port}")

        # Keep running to process events
        logger.info("✅ Agent runner is now processing events. Press Ctrl+C to stop.")
        try:
            while True:
                await asyncio.sleep(1)
        except (KeyboardInterrupt, asyncio.CancelledError):
            logger.info("Received shutdown signal (Ctrl+C)...")

    except (KeyboardInterrupt, asyncio.CancelledError):
        logger.info("Received shutdown signal...")
    except Exception as e:
        logger.error(f"Error during agent runner execution: {e}", exc_info=True)
        raise

    finally:
        # ALWAYS cleanup resources
        logger.info("Shutting down OmniDaemon...")
        try:
            await sdk.shutdown()
            logger.info("✅ OmniDaemon shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}", exc_info=True)


# ============================================
# Run the example
# ============================================
if __name__ == "__main__":
    asyncio.run(main())

Code Explanation

1. Imports

from google.adk.agents import LlmAgent
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types
from google.adk.models.lite_llm import LiteLlm
from omnidaemon import OmniDaemonSDK
from omnidaemon import AgentConfig, SubscriptionConfig
Key Imports:
  • LlmAgent - Google ADK agent class
  • InMemorySessionService - Session/conversation management
  • Runner - Orchestrates agent execution
  • types.Content - Message format for Google ADK
  • LiteLlm - Use non-Gemini models (OpenAI, Anthropic, etc.)

2. Agent Configuration

filesystem_agent = LlmAgent(
    model=LiteLlm(model="openai/gpt-4o"),  # Or "gemini-2.0-flash"
    name="filesystem_assistant_agent",
    instruction="Help the user manage their files...",
    tools=[McpToolset(...)],
)
Model Options:
# Gemini (native)
model="gemini-2.0-flash"
model="gemini-1.5-pro"

# OpenAI (via LiteLLM)
model=LiteLlm(model="openai/gpt-4o")
model=LiteLlm(model="openai/gpt-4o-mini")

# Anthropic (via LiteLLM)
model=LiteLlm(model="anthropic/claude-3-opus")
model=LiteLlm(model="anthropic/claude-3-sonnet")

# Other providers (via LiteLLM)
model=LiteLlm(model="azure/gpt-4")
model=LiteLlm(model="together_ai/llama-3-70b")

3. Session Management

# Create session service
session_service = InMemorySessionService()

# Define session identifiers
APP_NAME = "filesystem_agent"
USER_ID = "user_1"
SESSION_ID = "session_001"

# Create session
await session_service.create_session(
    app_name=APP_NAME, 
    user_id=USER_ID, 
    session_id=SESSION_ID
)
Why Sessions?
  • ✅ Maintains conversation history
  • ✅ Preserves context across messages
  • ✅ Enables multi-turn conversations
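The value of a session store can be sketched with a plain dict. This is a conceptual stand-in for illustration only, not how `InMemorySessionService` works internally:

```python
# Conceptual sketch: a session store makes multi-turn references resolvable.
# Plain-dict stand-in; not InMemorySessionService's actual implementation.
sessions: dict[str, list[dict]] = {}

def append_turn(session_id: str, role: str, text: str) -> None:
    """Record one conversation turn under the given session ID."""
    sessions.setdefault(session_id, []).append({"role": role, "text": text})

def history(session_id: str) -> list[dict]:
    """Return all recorded turns for a session (empty list if unknown)."""
    return sessions.get(session_id, [])

append_turn("session_001", "user", "List my files")
append_turn("session_001", "agent", "document.txt, image.png")
# "the first one" is only meaningful because the earlier turns are retained:
append_turn("session_001", "user", "Read the first one")
```

Without the stored history, the final query has no referent; that is exactly the gap the ADK session service fills.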

4. The Callback (Where AI Runs!)

async def call_file_system_agent(message: dict):
    # Create session
    await create_session()
    
    # Extract query
    query = message.get("content")
    
    # Create Google ADK content
    content = types.Content(role="user", parts=[types.Part(text=query)])
    
    # Run agent (THIS IS WHERE AI RUNS!)
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=content
    ):
        if event.is_final_response():
            final_response_text = event.content.parts[0].text
            break
    
    return final_response_text
Key Points:
  • runner.run_async() streams events from the agent
  • Loop until is_final_response() returns True
  • Extract text from event.content.parts[0].text
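The streaming loop above can be exercised with a stand-in async generator. `FakeEvent` and `fake_run_async` are hypothetical, for illustration only; the loop body mirrors the callback in the example:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class FakeEvent:
    """Minimal stand-in for an ADK event (hypothetical, for illustration)."""
    text: str
    final: bool = False

    def is_final_response(self) -> bool:
        return self.final

async def fake_run_async():
    # Mimics runner.run_async(): intermediate events, then a final response.
    yield FakeEvent("calling list_files tool...")
    yield FakeEvent("The directory contains document.txt", final=True)

async def collect_final() -> str:
    final_response_text = "Agent did not produce a final response."
    async for event in fake_run_async():
        if event.is_final_response():
            final_response_text = event.text
            break  # stop once the final response arrives
    return final_response_text

print(asyncio.run(collect_final()))  # -> The directory contains document.txt
```

The same loop-and-break shape drops straight into the real callback, with `fake_run_async()` replaced by `runner.run_async(...)`.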

5. Agent Registration

await sdk.register_agent(
    agent_config=AgentConfig(
        name="GOOGLE_ADK_FILESYSTEM_AGENT",
        topic="file_system.tasks",
        callback=call_file_system_agent,
        config=SubscriptionConfig(
            reclaim_idle_ms=6000,
            dlq_retry_limit=3,
            consumer_count=3,
        ),
    )
)
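The `dlq_retry_limit` contract can be illustrated with a conceptual retry loop. This is not OmniDaemon's internal implementation, only a sketch of the behavior the setting configures:

```python
def deliver(handler, message, dlq_retry_limit=3):
    """Conceptual sketch: retry a handler up to dlq_retry_limit times,
    then route the message to a dead-letter queue.
    Not OmniDaemon internals; for illustration only."""
    last_error = None
    for _ in range(dlq_retry_limit):
        try:
            return ("ok", handler(message))
        except Exception as exc:
            last_error = exc
    return ("dlq", {"message": message, "error": str(last_error)})

def flaky(msg):
    raise RuntimeError("downstream unavailable")

status, payload = deliver(flaky, {"content": "list files"})
print(status)  # -> dlq
```

A handler that keeps failing exhausts its retries and lands in the DLQ, which is what `omnidaemon bus dlq` later inspects.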

Running the Example

1. Update Target Folder

Edit agent_runner.py:
TARGET_FOLDER_PATH = "/your/actual/path/here"  # CHANGE THIS!
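A wrong path only surfaces once the MCP server starts, so it is worth failing fast at startup. `validate_target` is a hypothetical helper, not part of the example above:

```python
import os

def validate_target(path: str) -> str:
    """Hypothetical startup check: resolve the MCP target folder
    and fail fast if it does not exist."""
    resolved = os.path.abspath(path)
    if not os.path.isdir(resolved):
        raise FileNotFoundError(f"MCP target folder does not exist: {resolved}")
    return resolved
```

Calling `validate_target(TARGET_FOLDER_PATH)` before building the agent gives a clear error instead of a cryptic MCP startup failure.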

2. Choose Your Model

# Option 1: Gemini (requires GOOGLE_API_KEY)
filesystem_agent = LlmAgent(
    model="gemini-2.0-flash",
    # ...
)

# Option 2: OpenAI (requires OPENAI_API_KEY)
filesystem_agent = LlmAgent(
    model=LiteLlm(model="openai/gpt-4o"),
    # ...
)

3. Run the Agent

python agent_runner.py
Expected Output:
INFO:root:Registering Google ADK agent...
INFO:root:✅ Agent registered successfully
INFO:root:Starting OmniDaemon agent runner...
[Runner abc-123] Registered agent 'GOOGLE_ADK_FILESYSTEM_AGENT' on topic 'file_system.tasks'
INFO:root:🎧 OmniDaemon agent runner started
INFO:root:✅ Agent runner is now processing events. Press Ctrl+C to stop.

4. Publish an Event

In a new terminal:
# publisher.py
import asyncio
from omnidaemon import OmniDaemonSDK
from omnidaemon import EventEnvelope, PayloadBase

sdk = OmniDaemonSDK()

async def main():
    event = EventEnvelope(
        topic="file_system.tasks",
        payload=PayloadBase(
            content="What files are in the directory?"
        ),
    )
    
    task_id = await sdk.publish_task(event_envelope=event)
    print(f"📨 Task ID: {task_id}")
    
    # Wait for processing
    await asyncio.sleep(5)
    
    # Get result
    result = await sdk.get_result(task_id)
    print(f"✅ Result: {result}")

asyncio.run(main())
Run it:
python publisher.py
Expected Output:
📨 Task ID: 550e8400-e29b-41d4-a716-446655440000
✅ Result: "The directory contains the following files:\n1. document.txt\n2. image.png\n3. folder1/\n"
In the agent terminal:
INFO:root:
>>> User Query: What files are in the directory?
INFO:root:<<< Agent Response: The directory contains the following files:
1. document.txt
2. image.png
3. folder1/
[Agent filesystem_assistant_agent] Processed task in 1.85s

Customization

Use Different LLM Providers

# Anthropic Claude
model=LiteLlm(model="anthropic/claude-3-opus")

# Azure OpenAI
model=LiteLlm(model="azure/gpt-4")

# Together AI
model=LiteLlm(model="together_ai/llama-3-70b")

# Mistral
model=LiteLlm(model="mistral/mistral-large")

Persistent Session Storage

# Instead of InMemorySessionService:
from google.adk.sessions import DatabaseSessionService

session_service = DatabaseSessionService(
    db_url="postgresql://localhost/sessions"  # google-adk uses db_url; verify against your ADK version
)

Multi-User Sessions

async def call_file_system_agent(message: dict):
    # Extract user info from message
    user_id = message.get("tenant_id", "default_user")
    session_id = f"session_{user_id}_{message.get('correlation_id')}"
    
    # Create unique session per user
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id
    )
    
    # Run with user-specific session
    async for event in runner.run_async(
        user_id=user_id,
        session_id=session_id,
        new_message=content
    ):
        # ...

Add More MCP Tools

filesystem_agent = LlmAgent(
    model="gemini-2.0-flash",
    name="assistant",
    instruction="Help with files and GitHub",
    tools=[
        # Filesystem tools
        McpToolset(
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command="npx",
                    args=["-y", "@modelcontextprotocol/server-filesystem", "/path"],
                ),
            ),
        ),
        # GitHub tools
        McpToolset(
            connection_params=StdioConnectionParams(
                server_params=StdioServerParameters(
                    command="npx",
                    args=["-y", "@modelcontextprotocol/server-github"],
                ),
            ),
        ),
    ],
)

Production Deployment

1. Use Environment Variables

# .env
GOOGLE_API_KEY=your_key
# Or:
OPENAI_API_KEY=your_key

REDIS_URL=redis://prod-redis:6379
STORAGE_BACKEND=redis
OMNIDAEMON_API_ENABLED=true
LOG_LEVEL=INFO

2. Use Persistent Sessions

# Google ADK does not ship a Redis-backed session service; use
# DatabaseSessionService (or VertexAiSessionService) for persistence:
from google.adk.sessions import DatabaseSessionService

session_service = DatabaseSessionService(
    db_url="postgresql://prod-db/sessions"
)

3. Scale with Multiple Runners

# Run multiple instances
python agent_runner.py &  # Instance 1
python agent_runner.py &  # Instance 2
python agent_runner.py &  # Instance 3

4. Monitor with CLI

# Check health
omnidaemon health

# Check metrics
omnidaemon metrics --topic file_system.tasks

# Inspect failures
omnidaemon bus dlq --topic file_system.tasks

Troubleshooting

Google ADK Not Found

# Install Google ADK (recommended: uv)
uv add google-adk

# Or with pip:
pip install google-adk

# Or with specific version
pip install google-adk==1.0.0

LiteLLM Errors

# Install LiteLLM (recommended: uv)
uv add litellm

# Or with pip:
pip install litellm

# Check API key is set
echo $OPENAI_API_KEY
echo $GOOGLE_API_KEY

Session Errors

# Add error handling
import time

try:
    await create_session()
except Exception as e:
    logger.warning(f"Session creation failed: {e}")
    # Fall back to a fresh session ID
    session_id = f"{SESSION_ID}_{int(time.time())}"

MCP Tools Not Working

# Test MCP command manually
npx -y @modelcontextprotocol/server-filesystem /your/path

# Check Node.js version
node --version  # Should be v18 or higher
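The same checks can be run from Python before constructing the toolset. `check_commands` is a hypothetical preflight helper, not part of ADK or OmniDaemon:

```python
import shutil

def check_commands(commands=("node", "npx")):
    """Return the subset of commands not found on PATH.
    Hypothetical preflight helper; run before building McpToolset."""
    return [cmd for cmd in commands if shutil.which(cmd) is None]

missing = check_commands()
if missing:
    print(f"Install Node.js first; missing commands: {missing}")
```

Running this at import time surfaces a missing Node.js install before the MCP server launch fails with a less obvious error.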

Google ADK vs OmniCore Agent

| Feature            | Google ADK       | OmniCore Agent                |
| ------------------ | ---------------- | ----------------------------- |
| Provider           | Google           | Independent                   |
| Models             | Gemini + LiteLLM | Any (OpenAI, Anthropic, etc.) |
| MCP Tools          | ✅ Yes           | ✅ Yes                        |
| Session Management | Built-in         | Built-in                      |
| Event Streaming    | ✅ Yes           | ✅ Yes                        |
| Memory Management  | Basic            | Advanced (MemoryRouter)       |
| Best For           | Gemini users     | Multi-provider                |

Both work perfectly with OmniDaemon!

Summary

What You Learned:
  • ✅ How to integrate Google ADK with OmniDaemon
  • ✅ Session management with InMemorySessionService
  • ✅ Event streaming with runner.run_async()
  • ✅ Using LiteLLM for multi-provider support
  • ✅ MCP tool configuration
  • ✅ Production deployment strategies
Key Takeaways:
  • Google ADK uses async streaming for responses
  • Sessions maintain conversation context
  • LiteLLM enables any LLM provider
  • Same event-driven pattern as OmniCore Agent
  • Clean shutdown handled automatically
Next Steps:
  • Try with different LLM providers
  • Implement persistent sessions
  • Scale with multiple runners
  • Deploy to production!
Google ADK + OmniDaemon = Powerful Gemini agents! 🚀