
Monitoring & Debugging

This guide covers everything you need to monitor, debug, and troubleshoot your OmniDaemon system in production.

Overview

What You’ll Learn:
  • ✅ Real-time health monitoring
  • ✅ Metrics collection and analysis
  • ✅ Agent monitoring (status, performance)
  • ✅ Event bus monitoring (streams, consumers, DLQ)
  • ✅ Storage monitoring (health, capacity)
  • ✅ Debugging failed events
  • ✅ Performance optimization
  • ✅ Production best practices

Health Monitoring

Check Overall System Health

omnidaemon health
Output:
╔══════════════════════════════════════════════════════════╗
║              📊 OmniDaemon Health Status                ║
╚══════════════════════════════════════════════════════════╝

System Status: ● RUNNING
Runner ID: runner-abc-123
Uptime: 2.5 hours

━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
                      🔌 Connections                      
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Event Bus:  ✓ Healthy (redis_stream)
Storage:    ✓ Healthy (redis)

✓ System operational and healthy
What It Shows:
  • System Status: RUNNING, READY, DEGRADED, or DOWN
  • Runner ID: Unique identifier for this runner instance
  • Uptime: How long the runner has been active
  • Event Bus: Connection status and type
  • Storage: Connection status and backend type

Health Status Meanings

Status      Meaning                                         Action
──────────────────────────────────────────────────────────────────────────────────
RUNNING     Agent runner active, all systems healthy        ✅ Normal operation
READY       No runner, but event bus and storage healthy    ℹ️ Ready to start agents
DEGRADED    One system unhealthy (bus OR storage)           ⚠️ Investigate unhealthy component
DOWN        Both event bus and storage unhealthy            ❌ Critical: Check connections

Programmatic Health Check

from omnidaemon import OmniDaemonSDK

sdk = OmniDaemonSDK()

# Get health status
health = await sdk.health()

print(f"Status: {health['status']}")
print(f"Runner: {health['runner']['id']}")
print(f"Uptime: {health['runner']['uptime']}")
print(f"Event Bus: {health['event_bus']['status']}")
print(f"Storage: {health['storage']['status']}")
Use Cases:
  • ✅ Kubernetes liveness probes
  • ✅ Load balancer health checks
  • ✅ Monitoring dashboards
  • ✅ Alerting systems
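
For Kubernetes liveness probes, a small wrapper script that exits non-zero when the system is unhealthy works well. A minimal sketch, assuming only the sdk.health() call shown above:
# probe.py: minimal liveness probe sketch (uses only sdk.health() as shown above)
import asyncio
import sys

from omnidaemon import OmniDaemonSDK

async def main() -> int:
    sdk = OmniDaemonSDK()
    health = await sdk.health()
    # Treat RUNNING and READY as healthy; DEGRADED/DOWN fail the probe
    return 0 if health["status"] in ("RUNNING", "READY") else 1

if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
Point a Kubernetes exec probe or a load balancer health check at this script and DEGRADED/DOWN states are flagged automatically.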

Agent Monitoring

List All Agents

omnidaemon agent list
Output:
╔══════════════════════════════════════════════════════════╗
║                 🤖 Registered Agents                    ║
╚══════════════════════════════════════════════════════════╝

📂 file_system.tasks
   └─ OMNICOREAGENT_FILESYSTEM_AGENT
      ├─ Callback: call_file_system_agent
      ├─ Consumers: 3
      └─ Config: reclaim=6000ms, retries=3

📂 analysis.tasks
   └─ ANALYZER_AGENT
      ├─ Callback: analyze_data
      ├─ Consumers: 1
      └─ Config: reclaim=300000ms, retries=5

✓ 2 agents across 2 topics
Tree View (default):
  • Shows topic hierarchy
  • Agent details nested under topics
  • Easy to scan
Alternative Formats:
# Compact view
omnidaemon agent list --format compact

# Table view
omnidaemon agent list --format table

# JSON view
omnidaemon agent list --format json

Get Agent Details

omnidaemon agent get --topic file_system.tasks --name OMNICOREAGENT_FILESYSTEM_AGENT
Output:
╔══════════════════════════════════════════════════════════╗
║                   🤖 Agent Details                      ║
╚══════════════════════════════════════════════════════════╝

Name:        OMNICOREAGENT_FILESYSTEM_AGENT
Topic:       file_system.tasks
Callback:    call_file_system_agent
Description: Help the user manage their files

Configuration:
  └─ reclaim_idle_ms:   6000
  └─ dlq_retry_limit:   3
  └─ consumer_count:    3

Created: 2025-11-09 10:30:45

Programmatic Agent Monitoring

# List all agents
agents = await sdk.list_agents()
for topic, agent_list in agents.items():
    print(f"Topic: {topic}")
    for agent in agent_list:
        print(f"  - {agent['name']}")

# Get specific agent
agent = await sdk.get_agent("file_system.tasks", "OMNICOREAGENT_FILESYSTEM_AGENT")
print(f"Agent: {agent['name']}")
print(f"Consumers: {agent['config']['consumer_count']}")

Metrics Monitoring

View All Metrics

omnidaemon metrics
Output:
╔══════════════════════════════════════════════════════════╗
║                  📊 Performance Metrics                 ║
╚══════════════════════════════════════════════════════════╝

Topic                 Agent             Received  Processed  Failed  Avg Time
─────────────────────────────────────────────────────────────────────────────
file_system.tasks    OMNICORE_AGENT         150        145       5   1.35s
analysis.tasks       ANALYZER_AGENT          50         48       2   0.85s
─────────────────────────────────────────────────────────────────────────────
TOTAL                                       200        193       7   1.10s

Success Rate: 96.5%
Total Processing Time: 212.3s
What It Shows:
  • Received: Tasks delivered to agent
  • Processed: Tasks completed successfully
  • Failed: Tasks that errored (sent to DLQ)
  • Avg Time: Average processing time per task
  • Success Rate: (Processed / Received) × 100%

Filter Metrics by Topic

omnidaemon metrics --topic file_system.tasks

Export Metrics

# JSON format for parsing
omnidaemon metrics --format json > metrics.json

# CSV format for spreadsheets
omnidaemon metrics --format csv > metrics.csv
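
Once exported, the JSON can be post-processed offline. A minimal sketch, assuming the file mirrors the topic -> agent -> stats structure that sdk.metrics() returns:
# analyze_metrics.py: sketch; assumes metrics.json mirrors the topic -> agent -> stats layout
import json

with open("metrics.json") as f:
    metrics = json.load(f)

for topic, agents in metrics.items():
    for agent, stats in agents.items():
        received = stats.get("received", 0)
        rate = (stats.get("processed", 0) / received * 100) if received else 0.0
        print(f"{topic}/{agent}: {rate:.1f}% success, {stats.get('avg_processing_time_ms', 0)}ms avg")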

Programmatic Metrics

# Get aggregated metrics
metrics = await sdk.metrics()

for topic, agents in metrics.items():
    print(f"Topic: {topic}")
    for agent, stats in agents.items():
        print(f"  {agent}:")
        print(f"    Received: {stats['received']}")
        print(f"    Processed: {stats['processed']}")
        print(f"    Failed: {stats['failed']}")
        print(f"    Avg Time: {stats['avg_processing_time_ms']}ms")
Use Cases:
  • ✅ Performance dashboards
  • ✅ SLA monitoring
  • ✅ Capacity planning
  • ✅ Bottleneck identification
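
The same numbers feed simple capacity planning. A back-of-the-envelope sketch; the arrival rate is an assumed input, not something the SDK reports:
# capacity_estimate.py: rough sizing sketch; arrival_rate is an assumed input
import math

avg_time_s = 1.35      # average processing time from `omnidaemon metrics`
arrival_rate = 10      # assumed incoming tasks per second for this topic

# Each consumer handles roughly 1 / avg_time_s tasks per second
per_consumer_throughput = 1 / avg_time_s
consumers_needed = math.ceil(arrival_rate / per_consumer_throughput)
print(f"~{consumers_needed} consumers needed (plus headroom for spikes)")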

Event Bus Monitoring

List All Streams

omnidaemon bus list
Output:
╔══════════════════════════════════════════════════════════╗
║                  📊 Streams Overview                    ║
╚══════════════════════════════════════════════════════════╝

Stream                    Messages
─────────────────────────────────
file_system.tasks            1,523
analysis.tasks                 450
notifications                  125
─────────────────────────────────

✓ 3 active streams

Inspect Stream Messages

omnidaemon bus inspect --stream file_system.tasks --limit 10
Output:
╔══════════════════════════════════════════════════════════╗
║         📨 Recent Messages: file_system.tasks           ║
╚══════════════════════════════════════════════════════════╝

Message ID: 1699456789012-0
{
  "content": "List all files",
  "correlation_id": "req-123",
  "tenant_id": "customer-acme",
  "delivery_attempts": 0,
  "created_at": 1699456789.012
}

Message ID: 1699456788001-0
{
  "content": "Upload file",
  "correlation_id": "req-122",
  "delivery_attempts": 1,
  "created_at": 1699456788.001
}

Showing 10 most recent messages

List Consumer Groups

omnidaemon bus groups --stream file_system.tasks
Output:
╔══════════════════════════════════════════════════════════╗
║        👥 Consumer Groups: file_system.tasks            ║
╚══════════════════════════════════════════════════════════╝

Group                                    Consumers  Pending
────────────────────────────────────────────────────────────
group:file_system.tasks:OMNICORE_AGENT        3         2
group:file_system.tasks:BACKUP_AGENT          1         0
────────────────────────────────────────────────────────────

✓ 2 consumer groups
What It Shows:
  • Group: Consumer group name
  • Consumers: Number of active consumers
  • Pending: Messages in Pending Entries List (not ack’d)
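
To cross-check pending counts directly against Redis, redis-py exposes XPENDING. A sketch; the group name is copied from the CLI output above and the Redis URL is an assumption:
# pending_check.py: sketch using redis-py's XPENDING summary form
import asyncio
import redis.asyncio as redis

async def main():
    r = redis.from_url("redis://localhost:6379")  # assumed Redis URL
    summary = await r.xpending(
        "file_system.tasks",
        "group:file_system.tasks:OMNICORE_AGENT",  # group name from the CLI output above
    )
    # 'pending' counts messages delivered to consumers but not yet acknowledged
    print(f"Pending entries: {summary['pending']}")

asyncio.run(main())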

Check Dead Letter Queue (DLQ)

omnidaemon bus dlq --topic file_system.tasks --limit 10
Output:
╔══════════════════════════════════════════════════════════╗
║                   💀 DLQ Entries                        ║
╚══════════════════════════════════════════════════════════╝

Message ID: 1699456789012-0
{
  "topic": "file_system.tasks",
  "original_id": "1699456789012-0",
  "failed_message": {
    "content": "Corrupt data",
    "correlation_id": "req-123"
  },
  "error": "Max retries (3) exceeded",
  "retry_count": 3,
  "failed_at": 1699456799.012
}

ℹ 1 entry in DLQ
Why Messages Go to DLQ:
  • ❌ Max retries exceeded (default: 3)
  • ❌ Callback raised exception repeatedly
  • ❌ Message processing timeout
  • ❌ Invalid message format

Get Bus Statistics

omnidaemon bus stats
Output:
╔══════════════════════════════════════════════════════════╗
║              📊 Event Bus Statistics                    ║
╚══════════════════════════════════════════════════════════╝

Streams:         3
Consumer Groups: 5
Total Messages:  2,098
DLQ Entries:     12

Memory Usage:    2.3 MB
Connected Clients: 4
Redis Version:   7.0.12

✓ Event bus operational

Storage Monitoring

Check Storage Health

omnidaemon storage health
Output (Redis backend):
╔══════════════════════════════════════════════════════════╗
║               💾 Storage Health Status                  ║
╚══════════════════════════════════════════════════════════╝

Backend:        redis
Status:         ✓ Healthy
URL:            redis://localhost:6379
Latency:        0.85 ms

Redis Info:
  └─ Version:   7.0.12
  └─ Memory:    15.2 MB used
  └─ Clients:   4 connected

Data Counts:
  └─ Agents:    8
  └─ Results:   156
  └─ Metrics:   2,450

✓ Storage operational
Output (JSON backend):
╔══════════════════════════════════════════════════════════╗
║               💾 Storage Health Status                  ║
╚══════════════════════════════════════════════════════════╝

Backend:        json
Status:         ✓ Healthy
Directory:      .omnidaemon_data
Connected:      true

Data Counts:
  └─ Agents:    2
  └─ Results:   45
  └─ Metrics:   890

✓ Storage operational

Programmatic Storage Monitoring

# Check storage health
health = await sdk.storage_health()

print(f"Backend: {health['backend']}")
print(f"Status: {health['status']}")

if health['backend'] == 'redis':
    print(f"Latency: {health['latency_ms']}ms")
    print(f"Memory: {health['used_memory']}")

Debugging Failed Events

Step 1: Check Metrics

# See which topic has failures
omnidaemon metrics
Look for:
  • ❌ High “Failed” count
  • ❌ Low success rate (< 95%)
  • ⏱️ High average processing time

Step 2: Inspect DLQ

# View failed messages
omnidaemon bus dlq --topic file_system.tasks --limit 20
Analyze:
  • error: Why it failed
  • retry_count: How many times it was tried
  • failed_message: Original payload

Step 3: Identify Root Cause

Common Failure Patterns:

1. Invalid Data
{
  "error": "KeyError: 'file_path'",
  "failed_message": {
    "content": {
      "action": "upload"
      // Missing 'file_path' field!
    }
  }
}
Fix: Add validation in callback (see the sketch below).

2. External Service Down
{
  "error": "ConnectionError: API unreachable",
  "retry_count": 3
}
Fix: Increase max_retries for transient errors.

3. Timeout
{
  "error": "TimeoutError: Processing took > 180s",
  "retry_count": 3
}
Fix: Increase reclaim_idle_ms or optimize callback.

4. Bad Logic
{
  "error": "ValueError: Expected integer, got string",
  "retry_count": 3
}
Fix: Correct the callback logic.
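
For pattern 1, a defensive callback that validates its payload before doing any work keeps malformed messages from burning through retries. A sketch; the payload shape and upload_file() helper are illustrative, not part of the SDK:
# Sketch of "add validation in callback"; payload shape and upload_file() are illustrative
async def call_file_system_agent(message: dict) -> dict:
    content = message.get("content") or {}

    # Fail fast with a clear error instead of a bare KeyError deep in the handler
    file_path = content.get("file_path")
    if not file_path:
        raise ValueError(f"Missing required field 'file_path' (correlation_id={message.get('correlation_id')})")

    if content.get("action") == "upload":
        return await upload_file(file_path)  # illustrative helper

    raise ValueError(f"Unsupported action: {content.get('action')!r}")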

Step 4: Test Fix

# Get failed message from DLQ
dlq_entries = await sdk.inspect_dlq("file_system.tasks")
failed_msg = dlq_entries[0]["data"]["failed_message"]

# Test callback locally
from your_agent import your_callback

try:
    result = await your_callback(failed_msg)
    print(f"✅ Fixed! Result: {result}")
except Exception as e:
    print(f"❌ Still failing: {e}")

Step 5: Republish (if needed)

# After fixing, republish from DLQ
event = EventEnvelope(
    topic="file_system.tasks",
    correlation_id=failed_msg.get("correlation_id"),
    payload=PayloadBase(
        content=failed_msg["content"]
    ),
)

task_id = await sdk.publish_task(event_envelope=event)
print(f"✅ Republished: {task_id}")

Performance Optimization

1. Monitor Processing Times

# Check average times
omnidaemon metrics
Targets:
  • ✅ Fast tasks: < 1 second
  • ✅ Medium tasks: 1-10 seconds
  • ⚠️ Slow tasks: > 10 seconds
If slow:
  • Optimize callback logic
  • Reduce external API calls
  • Use caching
  • Process asynchronously
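
If the slowdown comes from re-fetching the same external data on every task, a small in-process cache inside the callback is often enough. A sketch; fetch_user_profile() and the 60-second TTL are illustrative:
# Sketch of caching inside a callback; fetch_user_profile() and the TTL are illustrative
import time

_cache: dict[str, tuple[float, dict]] = {}
CACHE_TTL_S = 60

async def get_profile_cached(user_id: str) -> dict:
    now = time.monotonic()
    hit = _cache.get(user_id)
    if hit and now - hit[0] < CACHE_TTL_S:
        return hit[1]  # fresh enough, skip the external call

    profile = await fetch_user_profile(user_id)  # illustrative slow external call
    _cache[user_id] = (now, profile)
    return profile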

2. Monitor Pending Messages

# Check pending count
omnidaemon bus groups --stream file_system.tasks
High pending (> 100)?
  • ⚠️ Agents can’t keep up with load
  • Fix: Increase consumer_count
await sdk.register_agent(
    agent_config=AgentConfig(
        topic="file_system.tasks",
        callback=your_callback,
        config=SubscriptionConfig(
            consumer_count=10,  # Increase from 3
        ),
    )
)

3. Monitor DLQ Growth

# Check DLQ regularly
omnidaemon bus dlq --topic file_system.tasks
Growing DLQ?
  • ❌ Systematic failures
  • Fix: Identify and fix root cause (see Debugging section)

4. Monitor Memory Usage

# Redis backend
omnidaemon storage health
High memory (> 80% of limit)?
  • Clear old data
  • Reduce result TTL
  • Trim metrics
# Clear old results
omnidaemon storage clear-results

# Clear old metrics
omnidaemon storage clear-metrics

Production Monitoring Best Practices

1. Set Up Continuous Monitoring

# Cron job: Check health every minute
* * * * * omnidaemon health --format json > /var/log/omnidaemon/health.json

2. Alert on Anomalies

# Alert script (run every 5 minutes)
import asyncio
from omnidaemon import OmniDaemonSDK

sdk = OmniDaemonSDK()

async def check_and_alert():
    # Check health
    health = await sdk.health()
    if health["status"] not in ["RUNNING", "READY"]:
        send_alert(f"OmniDaemon health: {health['status']}")
    
    # Check metrics
    metrics = await sdk.metrics()
    for topic, agents in metrics.items():
        for agent, stats in agents.items():
            # Guard against division by zero when no tasks have been received yet
            received = stats["received"]
            success_rate = (stats["processed"] / received * 100) if received else 100.0
            if success_rate < 95:
                send_alert(f"{agent} success rate: {success_rate:.1f}%")
            
            if stats["avg_processing_time_ms"] > 10000:  # 10 seconds
                send_alert(f"{agent} slow: {stats['avg_processing_time_ms']}ms")
    
    # Check DLQ
    for topic in ["file_system.tasks", "analysis.tasks"]:
        dlq = await sdk.inspect_dlq(topic, limit=1)
        if len(dlq) > 0:
            send_alert(f"DLQ has entries for {topic}")

asyncio.run(check_and_alert())
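
The script assumes a send_alert() helper. A minimal sketch posting to a webhook; the URL is a placeholder for your Slack/PagerDuty/etc. endpoint:
# Minimal send_alert() sketch posting to a webhook; the URL is a placeholder
import json
import urllib.request

WEBHOOK_URL = "https://hooks.example.com/omnidaemon-alerts"  # placeholder

def send_alert(text: str) -> None:
    body = json.dumps({"text": text}).encode()
    req = urllib.request.Request(
        WEBHOOK_URL,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req, timeout=5)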

3. Log Aggregation

# Structured logging for aggregation
import logging
import json

logger = logging.getLogger("omnidaemon")

async def log_metrics():
    metrics = await sdk.metrics()
    
    for topic, agents in metrics.items():
        for agent, stats in agents.items():
            # JSON logging for Elasticsearch/Splunk
            logger.info(json.dumps({
                "type": "metrics",
                "topic": topic,
                "agent": agent,
                "received": stats["received"],
                "processed": stats["processed"],
                "failed": stats["failed"],
                "success_rate": stats["processed"] / stats["received"],
                "avg_time_ms": stats["avg_processing_time_ms"],
            }))

4. Dashboards

Grafana/Prometheus Example:
# Export metrics for Prometheus
from prometheus_client import Gauge, Counter

tasks_received = Counter("omnidaemon_tasks_received", "Tasks received", ["topic", "agent"])
tasks_processed = Counter("omnidaemon_tasks_processed", "Tasks processed", ["topic", "agent"])
tasks_failed = Counter("omnidaemon_tasks_failed", "Tasks failed", ["topic", "agent"])
processing_time = Gauge("omnidaemon_processing_time_ms", "Avg processing time", ["topic", "agent"])

async def export_metrics():
    metrics = await sdk.metrics()
    
    for topic, agents in metrics.items():
        for agent, stats in agents.items():
            tasks_received.labels(topic=topic, agent=agent).inc(stats["received"])
            tasks_processed.labels(topic=topic, agent=agent).inc(stats["processed"])
            tasks_failed.labels(topic=topic, agent=agent).inc(stats["failed"])
            processing_time.labels(topic=topic, agent=agent).set(stats["avg_processing_time_ms"])
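
To expose these, prometheus_client can serve a /metrics endpoint while the exporter refreshes on a loop. A sketch; the port and 30-second interval are arbitrary choices. Note that Counter.inc() adds deltas, so a production exporter should track previously reported totals instead of re-adding cumulative values on every pass:
# Sketch: serve /metrics and refresh on a loop; port and interval are arbitrary
import asyncio
from prometheus_client import start_http_server

async def main():
    start_http_server(8000)  # Prometheus scrapes http://<host>:8000/metrics
    while True:
        await export_metrics()
        await asyncio.sleep(30)

asyncio.run(main())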

5. Regular Maintenance

# Daily: Check and clear old data
0 2 * * * omnidaemon storage clear-results  # 2 AM daily

# Weekly: Inspect DLQ and resolve
0 9 * * 1 omnidaemon bus dlq --topic all > /var/log/omnidaemon/dlq_weekly.log

# Monthly: Review metrics and optimize
# Export and analyze metrics trends

Troubleshooting Checklist

System Not Starting

# 1. Check Redis connection
redis-cli ping  # Should return PONG

# 2. Check environment variables
echo $REDIS_URL
echo $EVENT_BUS_TYPE
echo $STORAGE_BACKEND

# 3. Check logs
tail -f /var/log/omnidaemon/agent_runner.log

# 4. Test health
omnidaemon health

Agents Not Processing

# 1. Check if agents registered
omnidaemon agent list

# 2. Check event bus
omnidaemon bus list

# 3. Check consumer groups
omnidaemon bus groups --stream your_topic

# 4. Inspect stream for messages
omnidaemon bus inspect --stream your_topic

High Latency

# 1. Check metrics
omnidaemon metrics

# 2. Check pending messages
omnidaemon bus groups --stream slow_topic

# 3. Check Redis latency
omnidaemon storage health

# 4. Increase consumers
# Edit agent registration: consumer_count=10

DLQ Growing

# 1. Inspect DLQ
omnidaemon bus dlq --topic problematic_topic --limit 50

# 2. Identify pattern
# Look for common errors

# 3. Fix callback
# Update agent code

# 4. Test locally
# Process failed message manually

# 5. Deploy fix
# Restart agents

# 6. Monitor
# Watch DLQ stop growing

Summary

Key Monitoring Commands:
omnidaemon health              # System status
omnidaemon agent list          # Agent overview
omnidaemon metrics             # Performance metrics
omnidaemon bus list            # Streams overview
omnidaemon bus dlq --topic X   # Failed messages
omnidaemon storage health      # Storage status
Key Metrics to Watch:
  • ✅ Success rate (> 95%)
  • ✅ Processing time (< 10s)
  • ✅ Pending messages (< 100)
  • ✅ DLQ growth (near 0)
  • ✅ Memory usage (< 80%)
Production Best Practices:
  • Monitor continuously (cron/systemd timers)
  • Alert on anomalies
  • Aggregate logs
  • Create dashboards
  • Regular maintenance
Debugging Workflow:
  1. Check metrics (identify problem topic)
  2. Inspect DLQ (identify failure pattern)
  3. Analyze root cause
  4. Test fix locally
  5. Deploy and monitor
Monitoring is key to production success! 📊✨