# Saga Coordinator
async def process_order_saga(order_data: dict):
"""
Order processing saga:
1. Reserve inventory
2. Charge payment
3. Create shipment
If any step fails, compensate previous steps.
"""
saga_id = f"saga_{uuid.uuid4()}"
correlation_id = f"order_{order_data['order_id']}"
compensations = [] # Track what to undo
try:
# Step 1: Reserve inventory
inventory_result = await execute_saga_step(
topic="inventory.reserve",
correlation_id=correlation_id,
causation_id=saga_id,
content={"items": order_data["items"]},
)
if not inventory_result.get("success"):
raise SagaFailure("Inventory reservation failed")
compensations.append({
"topic": "inventory.release",
"content": {"reservation_id": inventory_result["reservation_id"]},
})
# Step 2: Charge payment
payment_result = await execute_saga_step(
topic="payment.charge",
correlation_id=correlation_id,
causation_id=saga_id,
content={
"amount": order_data["total"],
"customer_id": order_data["customer_id"],
},
)
if not payment_result.get("success"):
raise SagaFailure("Payment failed")
compensations.append({
"topic": "payment.refund",
"content": {"transaction_id": payment_result["transaction_id"]},
})
# Step 3: Create shipment
shipment_result = await execute_saga_step(
topic="shipment.create",
correlation_id=correlation_id,
causation_id=saga_id,
content={
"order_id": order_data["order_id"],
"address": order_data["shipping_address"],
},
)
if not shipment_result.get("success"):
raise SagaFailure("Shipment creation failed")
# SUCCESS! All steps completed
return {
"status": "completed",
"saga_id": saga_id,
"inventory": inventory_result,
"payment": payment_result,
"shipment": shipment_result,
}
except SagaFailure as e:
# FAILURE! Compensate all completed steps
print(f"❌ Saga failed: {e}")
await compensate_saga(compensations, correlation_id)
return {
"status": "failed",
"saga_id": saga_id,
"error": str(e),
"compensated": len(compensations),
}
async def execute_saga_step(topic: str, correlation_id: str, causation_id: str, content: dict):
"""Execute one saga step and wait for result."""
task_id = await sdk.publish_task(
EventEnvelope(
topic=topic,
correlation_id=correlation_id,
causation_id=causation_id,
payload=PayloadBase(content=content),
)
)
# Wait for result (with timeout)
for _ in range(60): # 30 seconds
result = await sdk.get_result(task_id)
if result is not None:
return result
await asyncio.sleep(0.5)
raise TimeoutError(f"Step {topic} timed out")
async def compensate_saga(compensations: list, correlation_id: str):
"""Execute compensation steps in reverse order."""
print(f"🔄 Compensating {len(compensations)} steps...")
for compensation in reversed(compensations):
try:
await sdk.publish_task(
EventEnvelope(
topic=compensation["topic"],
correlation_id=correlation_id,
payload=PayloadBase(content=compensation["content"]),
)
)
print(f"✅ Compensated: {compensation['topic']}")
except Exception as e:
print(f"❌ Compensation failed: {e}")
# Log for manual intervention
log_compensation_failure(compensation, e)
class SagaFailure(Exception):
"""Saga step failed."""
pass