Recovery Guide

Protect multi-agent workflows from cascading failures with bounded execution, circuit breakers, and graceful degradation. pyagent-patterns ships recovery primitives that compose with every orchestration pattern.


The Problem

LLM calls fail. Models time out, exceed context limits, return empty responses, or raise rate-limit errors. In a single-agent setup that is manageable. In a five-agent pipeline it is not: one failure propagates to every downstream stage, and the whole workflow either crashes or produces garbage.

Recovery gives you bounded behaviour: retry with the same pattern, fall back to a simpler one, or degrade gracefully to a cached or static response.


Three-Level Recovery

flowchart TD
    T[Task] --> L1[Level 0: Primary Pattern]
    L1 -->|Success| R[Result]
    L1 -->|Fail / Timeout / Token Limit| L2[Level 1: Fallback Pattern]
    L2 -->|Success| R
    L2 -->|Fail| L3[Level 2: Graceful Degradation]
    L3 --> R

    style L1 fill:#4CAF50,color:#fff
    style L2 fill:#FF9800,color:#fff
    style L3 fill:#f44336,color:#fff
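
The flow above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `run_with_recovery`, `Runner`, `flaky`, and `simple` are hypothetical names, and it assumes that `max_retries` counts retries on top of one initial attempt.

```python
import asyncio
from typing import Awaitable, Callable, Tuple

# Hypothetical names for illustration; not part of pyagent-patterns.
Runner = Callable[[str], Awaitable[str]]


async def run_with_recovery(
    task: str,
    primary: Runner,
    fallback: Runner,
    degraded_response: str,
    max_retries: int = 2,
    timeout_seconds: float = 30.0,
) -> Tuple[int, str]:
    """Return (recovery_level, output) for the first level that succeeds."""
    # Level 0: one initial attempt plus max_retries retries (an assumption).
    for _ in range(max_retries + 1):
        try:
            return 0, await asyncio.wait_for(primary(task), timeout_seconds)
        except Exception:
            continue  # covers timeouts and ordinary failures alike

    # Level 1: a single attempt at the simpler fallback.
    try:
        return 1, await asyncio.wait_for(fallback(task), timeout_seconds)
    except Exception:
        pass

    # Level 2: graceful degradation to a static response.
    return 2, degraded_response


async def flaky(task: str) -> str:
    raise RuntimeError("model unavailable")


async def simple(task: str) -> str:
    return f"summary of: {task}"


level, output = asyncio.run(
    run_with_recovery("Q3 earnings", flaky, simple, "Service busy, try later.")
)
print(level, output)  # 1 summary of: Q3 earnings
```

Level 2 never raises, so the caller always receives a result and only has to check the level.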

BoundedExecution

The core recovery primitive. It wraps any pattern with:

- Retry: re-runs the primary up to max_retries times on failure
- Timeout: aborts if the pattern takes longer than timeout_seconds
- Token limit: stops if the workflow consumes more than max_tokens
- Fallback: switches to a simpler pattern if all retries fail

import asyncio
from pyagent_patterns.base import Agent
from pyagent_patterns.orchestration import Pipeline
from pyagent_patterns.recovery import BoundedExecution
from pyagent_providers import AnthropicLLM, OpenAILLM

# Primary: thorough multi-stage pipeline
primary = Pipeline(stages=[
    Agent("extractor", AnthropicLLM("claude-sonnet-4-20250514"),
          system_prompt="Extract all facts, figures, and entities."),
    Agent("analyst",   OpenAILLM("gpt-4o"),
          system_prompt="Deep analysis of the extracted data."),
    Agent("writer",    AnthropicLLM("claude-sonnet-4-20250514"),
          system_prompt="Write a detailed investment brief."),
])

# Fallback: single cheap agent
fallback = Pipeline(stages=[
    Agent("quick_analyst", OpenAILLM("gpt-4o-mini"),
          system_prompt="Give a concise summary of the key points."),
])

bounded = BoundedExecution(
    pattern=primary,
    fallback=fallback,
    max_retries=2,
    timeout_seconds=30.0,
    max_tokens=50_000,
)

result = asyncio.run(bounded.run("Analyse Tesla Q3 2025 earnings"))
print(f"Recovery level: {result.metadata.get('recovery_level', 0)}")
# 0 = primary succeeded
# 1 = fallback used (primary failed or timed out)
# 2 = degraded (both failed)
print(result.output)

Recovery metadata

Every result from BoundedExecution carries recovery metadata you can inspect or log:

meta = result.metadata
print(meta.get("recovery_level"))     # 0, 1, or 2
print(meta.get("retries_used"))       # 0–max_retries
print(meta.get("timeout_hit"))        # True if timeout caused fallback
print(meta.get("token_limit_hit"))    # True if token budget caused fallback
print(meta.get("primary_error"))      # Exception message if primary failed
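
For logging, it can help to collapse these keys into one line. The helper below is a sketch that works on a plain dict with the keys listed above; `describe_recovery` and `LEVEL_NAMES` are hypothetical names, not part of the library.

```python
from typing import Any, Mapping

# Labels follow the level numbering used in this guide.
LEVEL_NAMES = {0: "primary", 1: "fallback", 2: "degraded"}


def describe_recovery(meta: Mapping[str, Any]) -> str:
    """Build a one-line summary from the recovery metadata keys."""
    level = meta.get("recovery_level", 0)
    parts = [f"level={level} ({LEVEL_NAMES.get(level, 'unknown')})"]
    parts.append(f"retries={meta.get('retries_used', 0)}")
    # Report at most one cause, preferring the most specific flag.
    if meta.get("timeout_hit"):
        parts.append("cause=timeout")
    elif meta.get("token_limit_hit"):
        parts.append("cause=token_limit")
    elif meta.get("primary_error"):
        parts.append(f"cause=error: {meta['primary_error']}")
    return " ".join(parts)


sample = {"recovery_level": 1, "retries_used": 2, "timeout_hit": True}
print(describe_recovery(sample))  # level=1 (fallback) retries=2 cause=timeout
```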

CircuitBreaker

Prevent repeated calls to a pattern that's consistently failing. After failure_threshold consecutive failures, the circuit opens and rejects all requests for reset_timeout_seconds. After the timeout, it allows one test request through.

import asyncio
from pyagent_patterns.recovery import CircuitBreaker

cb = CircuitBreaker(
    failure_threshold=3,        # open after 3 consecutive failures
    reset_timeout_seconds=60,   # try again after 60s
)

# expensive_pattern is any pattern built elsewhere, e.g. the primary Pipeline above
result = asyncio.run(cb.execute(expensive_pattern, "Do something"))
state = result.metadata.get("circuit_state")
# "closed"    — normal operation
# "open"      — circuit tripped, request rejected immediately
# "half_open" — one test request allowed through
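
The three states can be modelled as a small state machine. The class below is an illustrative sketch, not the library's CircuitBreaker: `SimpleBreaker` and its methods are hypothetical, and the clock is injectable so the example runs without waiting. It assumes that a failed test request re-opens the circuit and a successful one closes it.

```python
import time
from typing import Callable, Optional


class SimpleBreaker:
    """Illustrative three-state breaker; not the library's CircuitBreaker."""

    def __init__(self, failure_threshold: int, reset_timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout_seconds = reset_timeout_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None
        self._probing = False

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout_seconds:
            return "half_open"
        return "open"

    def allow_request(self) -> bool:
        state = self.state
        if state == "closed":
            return True
        if state == "half_open" and not self._probing:
            self._probing = True  # only one test request at a time
            return True
        return False

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None
        self._probing = False

    def record_failure(self) -> None:
        self._failures += 1
        self._probing = False
        if self._opened_at is not None or self._failures >= self.failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed probe


now = [0.0]  # fake clock so the demo does not sleep
breaker = SimpleBreaker(failure_threshold=3, reset_timeout_seconds=60,
                        clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.state)            # open
now[0] = 61.0
print(breaker.state)            # half_open
print(breaker.allow_request())  # True  (the single test request)
print(breaker.allow_request())  # False (probe already in flight)
breaker.record_success()
print(breaker.state)            # closed
```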

Combining BoundedExecution and CircuitBreaker

from pyagent_patterns.recovery import BoundedExecution, CircuitBreaker

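# primary_pipeline and cheap_fallback are patterns built elsewhere
# (see the BoundedExecution example above).
# The breaker is not tied to a pattern here; it guards whatever is passed to execute().
primary_with_breaker = CircuitBreaker(failure_threshold=3, reset_timeout_seconds=120)

bounded = BoundedExecution(
    pattern=primary_pipeline,
    fallback=cheap_fallback,
    max_retries=1,
    timeout_seconds=20.0,
)

async def run_safe(task: str):
    # The breaker rejects the call immediately while the circuit is open;
    # otherwise it runs the bounded workflow (primary, then fallback).
    cb_result = await primary_with_breaker.execute(bounded, task)
    return cb_result.output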

Recovery with CompositePattern

CompositePattern escalates through a series of patterns when quality checks fail. Wrap it with BoundedExecution to also handle hard failures (timeouts, API errors).

import asyncio
from pyagent_patterns.base import Agent
from pyagent_patterns.orchestration import Pipeline
from pyagent_patterns.composite import CompositePattern, min_length_check
from pyagent_patterns.resolution import SelfReflection, Debate, Voting
from pyagent_patterns.recovery import BoundedExecution
from pyagent_providers import AnthropicLLM, OpenAILLM

cheap_llm     = OpenAILLM("gpt-4o-mini")
expensive_llm = AnthropicLLM("claude-sonnet-4-20250514")

# Escalation: quality-based (cheap → moderate → expensive)
escalation = CompositePattern(
    patterns=[
        SelfReflection(
            agent=Agent("coder", cheap_llm),
            max_rounds=2,
        ),
        Debate(
            debaters=[Agent("pro", cheap_llm), Agent("con", cheap_llm)],
            judge=Agent("judge", expensive_llm),
            rounds=1,
        ),
        Voting(
            voters=[Agent(f"voter_{i}", cheap_llm) for i in range(3)]
        ),
    ],
    quality_check=min_length_check(200),
)

# Recovery: failure-based fallback (API down, timeout, token limit)
safe_workflow = BoundedExecution(
    pattern=escalation,
    fallback=Pipeline(stages=[Agent("last_resort", cheap_llm)]),
    max_retries=1,
    timeout_seconds=60.0,
)

result = asyncio.run(safe_workflow.run("Design a rate-limiting system"))
print(f"Escalation level: {result.metadata.get('escalation_level', 0)}")
print(f"Recovery level:   {result.metadata.get('recovery_level', 0)}")
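
The two mechanisms answer different questions: escalation reacts to an output that exists but is not good enough, while recovery reacts to a call that failed outright. The sketch below shows the escalation half with plain functions. `escalate` and `min_length` are hypothetical stand-ins, not the library's CompositePattern or min_length_check, and returning the last output when no step passes is an assumption.

```python
from typing import Callable, Sequence, Tuple

Step = Callable[[str], str]
QualityCheck = Callable[[str], bool]


def min_length(n: int) -> QualityCheck:
    """Hypothetical stand-in for a length-based quality check."""
    return lambda output: len(output) >= n


def escalate(task: str, steps: Sequence[Step], check: QualityCheck) -> Tuple[int, str]:
    """Run steps in order; return (escalation_level, output) for the first that passes.

    If no step passes, the last step's output is returned as-is.
    Exceptions are deliberately not caught: hard failures belong to the outer
    recovery layer.
    """
    if not steps:
        raise ValueError("at least one step is required")
    output = ""
    for level, step in enumerate(steps):
        output = step(task)
        if check(output):
            return level, output
    return len(steps) - 1, output


steps = [lambda t: "ok", lambda t: f"A longer answer about {t}."]
print(escalate("rate limiting", steps, min_length(10)))
# (1, 'A longer answer about rate limiting.')
```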

Recovery Patterns by Failure Mode

API rate limits

import asyncio
from pyagent_patterns.recovery import BoundedExecution

# Generous retries with a fixed delay between attempts, for rate-limited APIs
bounded = BoundedExecution(
    pattern=primary,
    fallback=fallback,
    max_retries=3,
    retry_delay_seconds=5.0,   # wait 5s between retries
    timeout_seconds=120.0,
)
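
The snippet above waits a fixed retry_delay_seconds between attempts. If you write your own retry loop and want the delay to grow instead, an exponential schedule with optional jitter is a common choice. The function below is a sketch of that schedule only; `backoff_delays` and its parameters are hypothetical and are not options of BoundedExecution.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base_seconds: float = 5.0,
                   factor: float = 2.0, cap_seconds: float = 60.0,
                   jitter: float = 0.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the wait before each retry: base * factor**attempt, capped."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(cap_seconds, base_seconds * factor ** attempt)
        # Spread retries by up to +/- jitter, expressed as a fraction of the delay.
        delay *= 1.0 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays


print(backoff_delays(3))  # [5.0, 10.0, 20.0]
```

A non-zero jitter keeps many clients that were rate-limited at the same moment from retrying in lockstep.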

Context window exceeded

from pyagent_compress import CompressMiddleware, TokenBudget
from pyagent_patterns.base import Agent
from pyagent_patterns.orchestration import Pipeline
from pyagent_patterns.recovery import BoundedExecution

# Compress first, recover if compression isn't enough
budget = TokenBudget(workflow_limit=100_000)
middleware = CompressMiddleware(target_ratio=0.5, budget=budget)

bounded = BoundedExecution(
    pattern=middleware.wrap_all(primary_stages),
    fallback=Pipeline(stages=[Agent("summariser", cheap_llm,
                                    system_prompt="Give a 3-sentence summary.")]),
    max_tokens=90_000,
)

Model unavailability

from pyagent_providers.router import ProviderRouter, RoutingStrategy
from pyagent_patterns.orchestration import Pipeline
from pyagent_patterns.recovery import BoundedExecution

# Provider-level fallback (OpenAI → Anthropic → Gemini)
# registry is your provider registry, configured elsewhere
provider_router = ProviderRouter(registry, strategy=RoutingStrategy.FALLBACK_CHAIN)

# Pattern-level fallback (expensive → cheap)
bounded = BoundedExecution(
    pattern=Pipeline(stages=expensive_agents),
    fallback=Pipeline(stages=cheap_agents),
    max_retries=2,
    timeout_seconds=30.0,
)

Monitoring and Alerting

Track recovery events to know when your system is under stress:

import logging
from pyagent_patterns.recovery import BoundedExecution

logger = logging.getLogger(__name__)

bounded = BoundedExecution(
    pattern=primary,
    fallback=fallback,
    max_retries=2,
    timeout_seconds=30.0,
)

async def monitored_run(task: str):
    result = await bounded.run(task)
    level = result.metadata.get("recovery_level", 0)

    if level == 1:
        logger.warning("primary_failed", extra={
            "error": result.metadata.get("primary_error"),
            "retries": result.metadata.get("retries_used"),
        })
    elif level == 2:
        logger.error("full_degradation", extra={
            "task_preview": task[:100],
        })

    return result
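
Individual log lines show single incidents; alerting usually needs a rate. One possible approach is to tally recovery levels and alert when the fallback or degraded share crosses a threshold. `RecoveryStats` below is a hypothetical in-memory sketch, not a library class; a production setup would more likely export these counts to a metrics system.

```python
from collections import Counter


class RecoveryStats:
    """Tally recovery levels so a rising fallback rate can trigger an alert."""

    def __init__(self) -> None:
        self.levels: Counter = Counter()

    def record(self, recovery_level: int) -> None:
        self.levels[recovery_level] += 1

    def rate(self, level: int) -> float:
        """Share of recorded runs that ended at the given level."""
        total = sum(self.levels.values())
        return self.levels[level] / total if total else 0.0


stats = RecoveryStats()
for level in [0, 0, 0, 1, 0, 2, 0, 0, 1, 0]:
    stats.record(level)

print(f"fallback rate: {stats.rate(1):.0%}")  # fallback rate: 20%
print(f"degraded rate: {stats.rate(2):.0%}")  # degraded rate: 10%
```

In monitored_run, this would amount to calling stats.record(level) right after the level is read from the metadata.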

See Also