Callback Pattern

The callback is where your AI agent runs! This page explains the callback pattern in depth, including what data you receive, how to return results, and best practices.

What is a Callback?

A callback is the function that OmniDaemon calls when an event arrives for your agent.

# This is a callback
async def my_agent(message: dict):
    """
    CALLBACK = Where your AI agent runs!
    
    This function is called automatically when an event arrives.
    """
    # Your logic here
    return {"status": "processed"}

Key Points:
  • Must be async (async def)
  • Receives one parameter (message: dict)
  • Returns a dict (or None)
  • Called automatically by OmniDaemon

The Message Parameter

The message parameter contains the full EventEnvelope sent by the publisher.

What’s Inside

async def my_agent(message: dict):
    # YOUR DATA
    content = message.get("content", {})        # The actual data
    
    # METADATA
    correlation_id = message.get("correlation_id")  # Track requests
    causation_id = message.get("causation_id")      # Event chain
    tenant_id = message.get("tenant_id")            # Multi-tenancy
    source = message.get("source")                  # Event origin
    
    # RESPONSE HANDLING
    webhook = message.get("webhook")                # HTTP callback
    reply_to = message.get("reply_to")              # Response topic
    
    # TIMESTAMPS
    created_at = message.get("created_at")          # When published
    
    # Use this data to make smart decisions!
    return {"status": "processed"}

Example Message

{
    "content": {
        "filename": "document.pdf",
        "user_id": "12345",
        "action": "analyze"
    },
    "correlation_id": "req-abc-123",
    "causation_id": "event-xyz-789",
    "tenant_id": "tenant-acme",
    "source": "web-app",
    "webhook": "https://api.example.com/callback",
    "reply_to": "analysis.response",
    "created_at": "2025-03-12T10:30:00Z"
}
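Given the example envelope above, a callback can unpack everything with plain dict access — no SDK needed. A minimal sketch (the envelope below is a trimmed copy of the example; `inspect_envelope` is an illustrative helper, not an OmniDaemon API):

```python
import asyncio

# Trimmed copy of the example envelope above
message = {
    "content": {"filename": "document.pdf", "user_id": "12345", "action": "analyze"},
    "correlation_id": "req-abc-123",
    "tenant_id": "tenant-acme",
    "reply_to": "analysis.response",
}

async def inspect_envelope(message: dict) -> dict:
    """Pull out the fields a typical callback cares about."""
    content = message.get("content", {})
    return {
        "filename": content.get("filename"),
        "correlation_id": message.get("correlation_id"),
        "tenant_id": message.get("tenant_id", "default"),  # fall back for single-tenant setups
        "has_reply_to": message.get("reply_to") is not None,
    }

print(asyncio.run(inspect_envelope(message)))
```

Because the envelope is just a dict, `.get()` with a default keeps the callback robust when optional metadata (tenant, webhook, reply_to) is absent.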

Where Your AI Agent Runs

The callback is WHERE YOUR AI AGENT RUNS!

OmniCore Agent Example

from omnicoreagent import OmniAgent

# Initialize your AI agent OUTSIDE the callback
agent = OmniAgent(
    name="document_analyzer",
    system_instruction="Analyze documents and extract insights",
    model_config={"provider": "openai", "model": "gpt-4o"},
)

# Callback = WHERE you RUN your AI agent
async def analyze_document(message: dict):
    """Process document with OmniCore Agent"""
    content = message.get("content", {})
    filename = content.get("filename")
    
    # THIS is where your AI agent runs!
    result = await agent.run(f"Analyze: {filename}")
    
    return {
        "status": "success",
        "analysis": result,
        "filename": filename
    }

# Register
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="document.uploaded",
        callback=analyze_document,  # Your callback
    )
)

Google ADK Example

from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService

# Initialize your AI agent OUTSIDE the callback
agent = LlmAgent(
    model="gemini-2.0-flash",
    name="query_assistant",
    instruction="Help users with queries",
)
session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="my_app", session_service=session_service)

# Callback = WHERE you RUN your AI agent
async def handle_query(message: dict):
    """Process query with Google ADK"""
    content = message.get("content", {})
    query = content.get("query")
    
    # THIS is where your AI agent runs!
    async for event in runner.run_async(
        user_id="user",
        session_id="session",
        new_message=query
    ):
        if event.is_final_response():
            return {
                "status": "success",
                "response": event.content.parts[0].text
            }

Plain Python Example

# Callback can also be plain Python (no AI framework needed!)
async def process_data(message: dict):
    """Simple data processing"""
    content = message.get("content", {})
    data = content.get("data")
    
    # Your logic here
    processed = data * 2
    
    return {
        "status": "success",
        "result": processed
    }

Smart Callback Patterns

Use the metadata in the message to make intelligent decisions!

1. Multi-Tenant Routing

async def multi_tenant_agent(message: dict):
    """Route processing based on tenant"""
    tenant_id = message.get("tenant_id", "default")
    content = message.get("content", {})
    
    # Load tenant-specific configuration
    tenant_config = await load_tenant_config(tenant_id)
    
    # Process with tenant settings
    result = await process_with_config(content, tenant_config)
    
    return {
        "status": "success",
        "tenant_id": tenant_id,
        "result": result
    }

2. Request Tracking

async def tracking_agent(message: dict):
    """Track requests using correlation_id"""
    correlation_id = message.get("correlation_id")
    content = message.get("content", {})
    
    # Log request start
    logger.info(f"[{correlation_id}] Processing started")
    
    try:
        result = await process(content)
        logger.info(f"[{correlation_id}] Success")
        
        return {
            "status": "success",
            "correlation_id": correlation_id,
            "result": result
        }
    except Exception as e:
        logger.error(f"[{correlation_id}] Failed: {e}")
        raise

3. Source-Based Processing

async def source_aware_agent(message: dict):
    """Different logic based on event source"""
    source = message.get("source", "unknown")
    content = message.get("content", {})
    
    if source == "web-app":
        # Web uploads need virus scanning
        await scan_for_viruses(content)
    
    elif source == "api":
        # API uploads are pre-validated
        pass
    
    elif source == "mobile-app":
        # Mobile uploads need compression
        await compress_file(content)
    
    result = await process(content)
    
    return {"status": "success", "source": source, "result": result}

4. Causation Chain Tracking

async def chain_tracking_agent(message: dict):
    """Track event chains"""
    correlation_id = message.get("correlation_id")
    causation_id = message.get("causation_id")
    content = message.get("content", {})
    
    # Log the chain
    logger.info(f"Event chain: {causation_id} → {correlation_id}")
    
    result = await process(content)
    
    # Publish next event in chain
    await sdk.publish_task(
        event_envelope=EventEnvelope(
            topic="next.step",
            payload=PayloadBase(
                content=result,
                correlation_id=correlation_id,  # Keep same correlation
                causation_id=correlation_id,    # Current event caused next
            )
        )
    )
    
    return {"status": "success"}

5. Conditional Processing

async def conditional_agent(message: dict):
    """Only process if conditions met"""
    tenant_id = message.get("tenant_id")
    source = message.get("source")
    content = message.get("content", {})
    
    # Check if we should process
    if not await should_process(tenant_id, source):
        return {
            "status": "skipped",
            "reason": "Conditions not met"
        }
    
    # Process
    result = await process(content)
    
    return {"status": "processed", "result": result}

6. Webhook Notification

async def webhook_agent(message: dict):
    """Automatic webhook notification"""
    webhook = message.get("webhook")
    content = message.get("content", {})
    
    # Process
    result = await process(content)
    
    # OmniDaemon automatically POSTs result to webhook
    # But you can also do it manually for custom logic:
    if webhook:
        await http_post(webhook, result)
    
    return {"status": "success", "result": result}

Return Values

Your callback should return a dictionary (or None).

Success Response

async def my_agent(message: dict):
    result = await process(message)
    
    return {
        "status": "success",
        "result": result,
        "metadata": {"processed_at": datetime.now().isoformat()}
    }

Error Response

async def my_agent(message: dict):
    try:
        result = await process(message)
        return {"status": "success", "result": result}
    
    except ValueError as e:
        # Permanent error - don't retry
        return {
            "status": "error",
            "error": str(e),
            "error_type": "validation_error"
        }

No Return (None)

async def my_agent(message: dict):
    # Process but don't need to return anything
    await log_event(message)
    # Implicitly returns None

What Happens to Return Values?

  1. Stored in the storage backend (with a 24-hour TTL)
  2. Sent to webhook (if webhook URL provided)
  3. Published to reply_to topic (if specified)
  4. Tracked in metrics (success/failure counts)
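As a rough mental model of the four steps above, the routing of a return value can be sketched as a pure function from the envelope to a list of destinations. This is an illustration only — the real daemon does this internally, and `result_destinations` is not part of the SDK:

```python
def result_destinations(message: dict) -> list:
    """Sketch of where a callback's return value goes (mirrors the 4 steps above)."""
    destinations = ["storage"]           # 1. always stored (24h TTL)
    if message.get("webhook"):
        destinations.append("webhook")   # 2. POSTed to the webhook URL
    if message.get("reply_to"):
        destinations.append("reply_to")  # 3. published to the reply_to topic
    destinations.append("metrics")       # 4. success/failure counters
    return destinations

print(result_destinations({"webhook": "https://api.example.com/callback"}))
```

Note that storage and metrics always apply, while webhook and reply_to delivery are opt-in via the envelope.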

Error Handling

Retriable vs Non-Retriable Errors

async def my_agent(message: dict):
    try:
        result = await process(message)
        return {"status": "success", "result": result}
    
    except NetworkError as e:
        # RETRIABLE: Network issues, rate limits, timeouts
        logger.warning(f"Temporary failure: {e}")
        raise  # OmniDaemon will retry
    
    except ValidationError as e:
        # NON-RETRIABLE: Bad data, invalid input
        logger.error(f"Permanent failure: {e}")
        return {"status": "error", "error": str(e)}  # Don't retry

Retry Configuration

await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file.uploaded",
        callback=my_agent,
        config=SubscriptionConfig(
            max_retries=3,          # Retry up to 3 times
            reclaim_idle_ms=300000, # Reclaim after 5 min
            dlq_enabled=True,       # Send to DLQ after max retries
        )
    )
)

Custom Retry Logic

async def smart_retry_agent(message: dict):
    """Custom retry logic on top of OmniDaemon's retries"""
    content = message.get("content", {})
    retry_count = content.get("_retry_count", 0)
    
    try:
        result = await process(content)
        return {"status": "success", "result": result}
    
    except TemporaryError as e:
        # Retriable error
        if retry_count < 3:
            content["_retry_count"] = retry_count + 1
            # OmniDaemon will auto-retry
            raise
        else:
            # Max retries reached
            return {
                "status": "failed",
                "error": str(e),
                "retries": retry_count
            }
    
    except PermanentError as e:
        # Non-retriable error
        return {"status": "error", "error": str(e)}

Async Best Practices

Use async/await Properly

# ✅ Good - properly async
async def my_agent(message: dict):
    result = await async_api_call()
    data = await another_async_call()
    return {"result": result + data}

# ❌ Bad - blocking calls in async function
async def bad_agent(message: dict):
    result = blocking_api_call()  # Blocks the event loop!
    return {"result": result}

# ✅ Good - wrap blocking calls
async def good_agent(message: dict):
    import asyncio
    result = await asyncio.to_thread(blocking_api_call)
    return {"result": result}

Don’t Block the Event Loop

# ❌ Bad - long-running CPU-bound work
async def bad_agent(message: dict):
    result = expensive_computation()  # Blocks event loop
    return {"result": result}

# ✅ Good - use thread pool for CPU-bound work
async def good_agent(message: dict):
    import asyncio
    result = await asyncio.to_thread(expensive_computation)
    return {"result": result}

Handle Timeouts

async def timeout_agent(message: dict):
    """Add timeout to prevent hanging"""
    import asyncio
    
    try:
        # Timeout after 30 seconds
        result = await asyncio.wait_for(
            process(message),
            timeout=30.0
        )
        return {"status": "success", "result": result}
    
    except asyncio.TimeoutError:
        logger.error("Processing timeout")
        return {
            "status": "timeout",
            "error": "Processing took too long"
        }

Performance Tips

1. Initialize Outside Callback

# ✅ Good - initialize once
agent = OmniAgent(...)  # Initialize OUTSIDE

async def callback(message: dict):
    result = await agent.run(...)  # Reuse agent
    return {"result": result}

# ❌ Bad - initialize every time
async def callback(message: dict):
    agent = OmniAgent(...)  # NEW agent every call!
    result = await agent.run(...)
    return {"result": result}

2. Connection Pooling

# ✅ Good - reuse connections
import aiohttp

# Create session OUTSIDE callback
session = aiohttp.ClientSession()

async def callback(message: dict):
    async with session.get("...") as resp:  # Reuse connection
        data = await resp.json()
    return {"data": data}

# ❌ Bad - new session every time
async def callback(message: dict):
    async with aiohttp.ClientSession() as session:  # NEW session!
        async with session.get("...") as resp:
            data = await resp.json()
    return {"data": data}

3. Batch Operations

async def batch_agent(message: dict):
    """Batch multiple items together"""
    content = message.get("content", {})
    items = content.get("items", [])
    
    # Process in batches
    batch_size = 10
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]
        batch_result = await process_batch(batch)
        results.extend(batch_result)
    
    return {"status": "success", "results": results}

4. Parallel Processing

async def parallel_agent(message: dict):
    """Process multiple items in parallel"""
    import asyncio
    
    content = message.get("content", {})
    items = content.get("items", [])
    
    # Process all items concurrently
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    
    return {"status": "success", "results": results}
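asyncio.gather launches every task at once, which can overwhelm a downstream API for large item lists. A sketch that caps in-flight work with asyncio.Semaphore (the limit of 5 is arbitrary, and process_item here is a stand-in for your real per-item work):

```python
import asyncio

async def process_item(item):
    # Stand-in for real per-item work (an API call, a model run, ...)
    await asyncio.sleep(0)
    return item * 2

async def bounded_parallel_agent(message: dict, limit: int = 5) -> dict:
    """Like parallel_agent, but never more than `limit` items in flight."""
    items = message.get("content", {}).get("items", [])
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item):
        async with semaphore:  # wait for a free slot before starting
            return await process_item(item)

    # gather preserves input order even with bounded concurrency
    results = await asyncio.gather(*(guarded(item) for item in items))
    return {"status": "success", "results": results}

print(asyncio.run(bounded_parallel_agent({"content": {"items": [1, 2, 3]}})))
```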

Testing Callbacks

Unit Testing

import pytest

async def my_agent(message: dict):
    content = message.get("content", {})
    return {"result": content["value"] * 2}

@pytest.mark.asyncio
async def test_my_agent():
    # Arrange
    message = {
        "content": {"value": 5},
        "correlation_id": "test-123"
    }
    
    # Act
    result = await my_agent(message)
    
    # Assert
    assert result["result"] == 10

Mocking

from unittest.mock import patch, AsyncMock

async def my_agent(message: dict):
    content = message.get("content", {})
    result = await external_api_call(content)
    return {"result": result}

@pytest.mark.asyncio
@patch("my_module.external_api_call", new_callable=AsyncMock)
async def test_my_agent_mocked(mock_api):
    # Mock external API
    mock_api.return_value = "mocked_result"
    
    # Test
    message = {"content": {"data": "test"}}
    result = await my_agent(message)
    
    assert result["result"] == "mocked_result"
    mock_api.assert_called_once()


Summary

Key Points:
  • Callback = Where your AI agent runs
  • Receives full EventEnvelope (content + metadata)
  • Use metadata for smart decisions (tenant, source, correlation)
  • Return dict or None
  • Handle errors properly (retriable vs non-retriable)
  • Must be async function
  • Initialize outside, reuse inside
Metadata You Get:
  • content - Your data
  • correlation_id - Request tracking
  • causation_id - Event chain
  • tenant_id - Multi-tenancy
  • source - Event origin
  • webhook - HTTP callback
  • reply_to - Response topic
Smart Patterns:
  • Multi-tenant routing
  • Request tracking
  • Source-based processing
  • Event chain tracking
  • Conditional processing
  • Webhook notifications
The callback is your agent’s brain - use it wisely! 🧠✨