Skip to content

pyagent-patterns API Reference

Base Classes

pyagent_patterns.base.Message dataclass

A single message in an agent conversation.

Attributes:

Name Type Description
role Role

The sender role (system, user, assistant, tool).

content str

The text content of the message.

name str | None

Optional agent name for multi-agent conversations.

metadata dict[str, Any]

Arbitrary key-value metadata attached to the message.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
@dataclass(frozen=True, slots=True)
class Message:
    """A single message in an agent conversation.

    Attributes:
        role: The sender role (system, user, assistant, tool).
        content: The text content of the message.
        name: Optional agent name for multi-agent conversations.
        metadata: Arbitrary key-value metadata attached to the message.
    """

    role: Role
    content: str
    name: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def system(cls, content: str, **kw: Any) -> Message:
        return cls(role=Role.SYSTEM, content=content, **kw)

    @classmethod
    def user(cls, content: str, **kw: Any) -> Message:
        return cls(role=Role.USER, content=content, **kw)

    @classmethod
    def assistant(cls, content: str, name: str | None = None, **kw: Any) -> Message:
        return cls(role=Role.ASSISTANT, content=content, name=name, **kw)

pyagent_patterns.base.Agent dataclass

An LLM-backed agent with a name, system prompt, and callable.

Parameters:

Name Type Description Default
name str

Human-readable agent name.

required
llm LLMCallable

The LLM callable to use for this agent.

required
system_prompt str

Optional system prompt prepended to every call.

''
description str

Description of the agent's purpose (for routing/selection).

''

Optional hooks (set via setter methods — no constructor change):

- _trace_bus: Emit agent_start/agent_end trace events.
- _context_ledger: Read context before LLM call; write output after.
- _compressor: Compress agent output before returning.
- _cost_tracker: Record cost per LLM call.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
@dataclass
class Agent:
    """An LLM-backed agent with a name, system prompt, and callable.

    Args:
        name: Human-readable agent name.
        llm: The LLM callable to use for this agent.
        system_prompt: Optional system prompt prepended to every call.
        description: Description of the agent's purpose (for routing/selection).

    Optional hooks (set via setter methods — no constructor change):
        _trace_bus: Emit agent_start/agent_end trace events.
        _context_ledger: Read context before LLM call; write output after.
        _compressor: Compress agent output before returning.
        _cost_tracker: Record cost per LLM call.
    """

    name: str
    llm: LLMCallable
    system_prompt: str = ""
    description: str = ""

    def __post_init__(self) -> None:
        # Hooks are plain instance attributes rather than dataclass fields, so
        # they are not constructor arguments and start out disabled.
        self._trace_bus: Any = None
        self._context_ledger: Any = None
        self._compressor: Any = None
        self._cost_tracker: Any = None

    # -- Hook setters (return self for chaining) --

    def set_trace_bus(self, bus: Any) -> Agent:
        """Attach a TraceEventBus — emits agent_start/agent_end events on run()."""
        self._trace_bus = bus
        return self

    def set_context(self, ledger: Any) -> Agent:
        """Attach a ContextLedger — reads context before LLM, writes output after."""
        self._context_ledger = ledger
        return self

    def set_compressor(self, compressor: Any) -> Agent:
        """Attach a MessageCompressor — compresses output before returning."""
        self._compressor = compressor
        return self

    def set_cost_tracker(self, tracker: Any) -> Agent:
        """Attach a CostTracker — records cost after each LLM call."""
        self._cost_tracker = tracker
        return self

    async def run(self, messages: list[Message]) -> Message:
        """Send messages to the LLM and return an assistant message.

        When hooks are wired the execution order is:
        1. Emit ``agent_start`` trace event
        2. Prepend context from ledger as messages
        3. Call LLM
        4. Record cost
        5. Compress output
        6. Write result to context ledger
        7. Emit ``agent_end`` trace event with timing/tokens

        Hook failures are logged at debug level (with traceback) and never
        abort the run. An exception raised by the LLM callable propagates.

        Args:
            messages: Conversation so far. The list is copied, not mutated.

        Returns:
            An assistant ``Message`` named after this agent. Its ``metadata``
            holds the compression statistics when a compressor ran
            successfully; otherwise it is empty.
        """
        start = time.perf_counter()

        # 1. Trace: agent_start
        if self._trace_bus is not None:
            self._emit_trace("agent_start", {})

        # 2. Context: prepend context messages
        call_messages = list(messages)
        if self._context_ledger is not None:
            try:
                ctx_messages = self._context_ledger.to_messages(max_tokens=None)
                if ctx_messages:
                    call_messages = ctx_messages + call_messages
            except Exception:
                logger.debug(
                    "Agent '%s': failed to read context ledger", self.name, exc_info=True
                )

        # The system prompt goes first, ahead of any ledger context.
        if self.system_prompt:
            call_messages.insert(0, Message.system(self.system_prompt))

        # 3. Call LLM
        content = await self.llm(call_messages)

        # 4. Cost tracking (token counts are a rough chars // 4 estimate)
        if self._cost_tracker is not None:
            try:
                input_tokens = sum(len(m.content) for m in call_messages) // 4
                output_tokens = len(content) // 4
                self._cost_tracker.record(
                    pattern_type="agent",
                    agent_name=self.name,
                    model=getattr(self.llm, "_model", "unknown"),
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    cost_usd=0.0,  # real cost comes from provider
                )
            except Exception:
                logger.debug("Agent '%s': failed to record cost", self.name, exc_info=True)

        # 5. Compression
        compression_meta: dict[str, Any] = {}
        if self._compressor is not None:
            try:
                compressed = self._compressor.compress(content)
                # Read everything off the result before touching local state,
                # so a malformed result cannot leave the metadata claiming
                # compression while the content is still the original.
                stats = {
                    "original_tokens": compressed.original_tokens,
                    "compressed_tokens": compressed.compressed_tokens,
                    "savings_pct": compressed.savings_pct,
                }
                compressed_content = compressed.compressed
                worth_tracing = stats["savings_pct"] > 0
            except Exception:
                logger.debug(
                    "Agent '%s': compression failed, using original",
                    self.name,
                    exc_info=True,
                )
            else:
                content = compressed_content
                compression_meta = {"compressed": True, **stats}
                # Emit compression trace event
                if self._trace_bus is not None and worth_tracing:
                    self._emit_trace("compression", dict(stats))

        # 6. Context: write output
        if self._context_ledger is not None:
            try:
                from pyagent_context.item import ContextItem, TrustLevel

                self._context_ledger.append(
                    ContextItem(
                        content=content,
                        source=self.name,
                        trust_level=TrustLevel.INFERRED,
                    )
                )
                # Emit context_update trace event
                if self._trace_bus is not None:
                    self._emit_trace(
                        "context_update",
                        {
                            "source": self.name,
                            "tokens": len(content) // 4,
                        },
                    )
            except Exception:
                logger.debug("Agent '%s': failed to write context", self.name, exc_info=True)

        duration = time.perf_counter() - start

        # 7. Trace: agent_end
        if self._trace_bus is not None:
            self._emit_trace(
                "agent_end",
                {
                    "duration_seconds": duration,
                    "output_tokens": len(content) // 4,
                    **compression_meta,
                },
            )

        return Message.assistant(content, name=self.name, metadata=compression_meta)

    def _emit_trace(self, event_type: str, payload: dict[str, Any]) -> None:
        """Emit a trace event to the attached bus (no-op if bus is None).

        Args:
            event_type: Event name, e.g. ``agent_start``.
            payload: Event-specific data attached to the trace event.
        """
        if self._trace_bus is None:
            return
        try:
            from pyagent_trace.events import TraceEvent

            self._trace_bus.emit(
                TraceEvent(
                    timestamp=time.time(),
                    event_type=event_type,
                    agent_name=self.name,
                    payload=payload,
                )
            )
        except Exception:
            # trace must never break agent execution — record and carry on
            logger.debug(
                "Agent '%s': failed to emit trace event '%s'",
                self.name,
                event_type,
                exc_info=True,
            )

run(messages) async

Send messages to the LLM and return an assistant message.

When hooks are wired the execution order is:

1. Emit agent_start trace event
2. Prepend context from ledger as messages
3. Call LLM
4. Record cost
5. Compress output
6. Write result to context ledger
7. Emit agent_end trace event with timing/tokens

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
async def run(self, messages: list[Message]) -> Message:
    """Send messages to the LLM and return an assistant message.

    When hooks are wired the execution order is:
    1. Emit ``agent_start`` trace event
    2. Prepend context from ledger as messages
    3. Call LLM
    4. Record cost
    5. Compress output
    6. Write result to context ledger
    7. Emit ``agent_end`` trace event with timing/tokens

    Hook failures are logged at debug level (with traceback) and never
    abort the run. An exception raised by the LLM callable propagates.

    Args:
        messages: Conversation so far. The list is copied, not mutated.

    Returns:
        An assistant ``Message`` named after this agent. Its ``metadata``
        holds the compression statistics when a compressor ran
        successfully; otherwise it is empty.
    """
    start = time.perf_counter()

    # 1. Trace: agent_start
    if self._trace_bus is not None:
        self._emit_trace("agent_start", {})

    # 2. Context: prepend context messages
    call_messages = list(messages)
    if self._context_ledger is not None:
        try:
            ctx_messages = self._context_ledger.to_messages(max_tokens=None)
            if ctx_messages:
                call_messages = ctx_messages + call_messages
        except Exception:
            logger.debug(
                "Agent '%s': failed to read context ledger", self.name, exc_info=True
            )

    # The system prompt goes first, ahead of any ledger context.
    if self.system_prompt:
        call_messages.insert(0, Message.system(self.system_prompt))

    # 3. Call LLM
    content = await self.llm(call_messages)

    # 4. Cost tracking (token counts are a rough chars // 4 estimate)
    if self._cost_tracker is not None:
        try:
            input_tokens = sum(len(m.content) for m in call_messages) // 4
            output_tokens = len(content) // 4
            self._cost_tracker.record(
                pattern_type="agent",
                agent_name=self.name,
                model=getattr(self.llm, "_model", "unknown"),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                cost_usd=0.0,  # real cost comes from provider
            )
        except Exception:
            logger.debug("Agent '%s': failed to record cost", self.name, exc_info=True)

    # 5. Compression
    compression_meta: dict[str, Any] = {}
    if self._compressor is not None:
        try:
            compressed = self._compressor.compress(content)
            # Read everything off the result before touching local state,
            # so a malformed result cannot leave the metadata claiming
            # compression while the content is still the original.
            stats = {
                "original_tokens": compressed.original_tokens,
                "compressed_tokens": compressed.compressed_tokens,
                "savings_pct": compressed.savings_pct,
            }
            compressed_content = compressed.compressed
            worth_tracing = stats["savings_pct"] > 0
        except Exception:
            logger.debug(
                "Agent '%s': compression failed, using original",
                self.name,
                exc_info=True,
            )
        else:
            content = compressed_content
            compression_meta = {"compressed": True, **stats}
            # Emit compression trace event
            if self._trace_bus is not None and worth_tracing:
                self._emit_trace("compression", dict(stats))

    # 6. Context: write output
    if self._context_ledger is not None:
        try:
            from pyagent_context.item import ContextItem, TrustLevel

            self._context_ledger.append(
                ContextItem(
                    content=content,
                    source=self.name,
                    trust_level=TrustLevel.INFERRED,
                )
            )
            # Emit context_update trace event
            if self._trace_bus is not None:
                self._emit_trace(
                    "context_update",
                    {
                        "source": self.name,
                        "tokens": len(content) // 4,
                    },
                )
        except Exception:
            logger.debug("Agent '%s': failed to write context", self.name, exc_info=True)

    duration = time.perf_counter() - start

    # 7. Trace: agent_end
    if self._trace_bus is not None:
        self._emit_trace(
            "agent_end",
            {
                "duration_seconds": duration,
                "output_tokens": len(content) // 4,
                **compression_meta,
            },
        )

    return Message.assistant(content, name=self.name, metadata=compression_meta)

set_compressor(compressor)

Attach a MessageCompressor — compresses output before returning.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def set_compressor(self, compressor: Any) -> Agent:
    """Register the compressor used to shrink this agent's output.

    Args:
        compressor: A MessageCompressor-style object; stored as given.

    Returns:
        The agent itself, so setter calls can be chained.
    """
    agent = self
    agent._compressor = compressor
    return agent

set_context(ledger)

Attach a ContextLedger — reads context before LLM, writes output after.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def set_context(self, ledger: Any) -> Agent:
    """Register the context ledger this agent reads from and writes to.

    Args:
        ledger: A ContextLedger-style object; stored as given.

    Returns:
        The agent itself, so setter calls can be chained.
    """
    agent = self
    agent._context_ledger = ledger
    return agent

set_cost_tracker(tracker)

Attach a CostTracker — records cost after each LLM call.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def set_cost_tracker(self, tracker: Any) -> Agent:
    """Register the tracker that receives a cost record per LLM call.

    Args:
        tracker: A CostTracker-style object; stored as given.

    Returns:
        The agent itself, so setter calls can be chained.
    """
    agent = self
    agent._cost_tracker = tracker
    return agent

set_trace_bus(bus)

Attach a TraceEventBus — emits agent_start/agent_end events on run().

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def set_trace_bus(self, bus: Any) -> Agent:
    """Register the bus that receives agent_start/agent_end events from run().

    Args:
        bus: A TraceEventBus-style object; stored as given.

    Returns:
        The agent itself, so setter calls can be chained.
    """
    agent = self
    agent._trace_bus = bus
    return agent

pyagent_patterns.base.Pattern

Bases: ABC

Abstract base class for all multi-agent patterns.

Subclasses must implement _execute. The run method handles timing, context creation, and metadata collection.

Optional hooks

_trace_bus: Emit pattern_start/pattern_end trace events.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
class Pattern(ABC):
    """Common base for every multi-agent pattern.

    Concrete patterns supply `_execute` and `pattern_type`. The public
    `run` wrapper prepares the context, times the execution, and fills in
    result metadata.

    Optional hooks:
        _trace_bus: Emit pattern_start/pattern_end trace events.
    """

    # Class-level default: tracing is off until set_trace_bus() is called.
    _trace_bus: Any = None

    def set_trace_bus(self, bus: Any) -> Pattern:
        """Register the bus that receives pattern_start/pattern_end from run().

        Returns:
            This pattern, so the call can be chained.
        """
        self._trace_bus = bus
        return self

    @property
    @abstractmethod
    def pattern_type(self) -> str:
        """Name of the pattern kind (e.g., 'supervisor', 'debate')."""
        ...

    async def run(self, task: str, context: Context | None = None) -> Result:
        """Run the pattern against a task and return the annotated result.

        Args:
            task: The user task or prompt.
            context: Context to reuse; a new one is built when none is given.

        Returns:
            The Result from `_execute`, with duration, pattern type, and a
            token estimate filled in.
        """
        if context:
            run_ctx = context
        else:
            run_ctx = Context(task=task)
        # The task is recorded as a user message on the context.
        run_ctx.messages.append(Message.user(task))

        if self._trace_bus is not None:
            self._emit_pattern_trace("pattern_start", {})

        # Only the _execute call itself is timed.
        started = time.perf_counter()
        result = await self._execute(run_ctx)
        result.duration_seconds = time.perf_counter() - started
        result.metadata["pattern_type"] = self.pattern_type

        # Rough token estimate: ~4 chars per token
        char_count = 0
        for msg in result.messages:
            char_count += len(msg.content)
        result.token_estimate = char_count // 4

        if self._trace_bus is not None:
            end_payload = {
                "duration_seconds": result.duration_seconds,
                "token_estimate": result.token_estimate,
                "output_length": len(result.output),
            }
            self._emit_pattern_trace("pattern_end", end_payload)

        return result

    def _emit_pattern_trace(self, event_type: str, payload: dict[str, Any]) -> None:
        """Send one trace event to the attached bus, swallowing any failure."""
        try:
            from pyagent_trace.events import TraceEvent

            event = TraceEvent(
                timestamp=time.time(),
                event_type=event_type,
                pattern_type=self.pattern_type,
                payload=payload,
            )
            self._trace_bus.emit(event)
        except Exception:
            pass  # trace must never break pattern execution

    @abstractmethod
    async def _execute(self, ctx: Context) -> Result:
        """Pattern-specific logic; invoked by `run`."""
        ...

    async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
        """Yield partial results as they become available.

        This default runs the whole pattern and yields its output once.
        Subclasses can override it for true streaming.
        """
        outcome = await self.run(task, context)
        yield outcome.output

pattern_type abstractmethod property

Return the pattern type name (e.g., 'supervisor', 'debate').

run(task, context=None) async

Execute the pattern on the given task.

Parameters:

Name Type Description Default
task str

The user task or prompt.

required
context Context | None

Optional existing context. Created automatically if None.

None

Returns:

Type Description
Result

Result with output, messages, metadata, timing, and cost estimates.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
async def run(self, task: str, context: Context | None = None) -> Result:
    """Run the pattern against a task and return the annotated result.

    Args:
        task: The user task or prompt.
        context: Context to reuse; a new one is built when none is given.

    Returns:
        The Result from `_execute`, with duration, pattern type, and a
        token estimate filled in.
    """
    if context:
        run_ctx = context
    else:
        run_ctx = Context(task=task)
    # The task is recorded as a user message on the context.
    run_ctx.messages.append(Message.user(task))

    if self._trace_bus is not None:
        self._emit_pattern_trace("pattern_start", {})

    # Only the _execute call itself is timed.
    started = time.perf_counter()
    result = await self._execute(run_ctx)
    result.duration_seconds = time.perf_counter() - started
    result.metadata["pattern_type"] = self.pattern_type

    # Rough token estimate: ~4 chars per token
    char_count = 0
    for msg in result.messages:
        char_count += len(msg.content)
    result.token_estimate = char_count // 4

    if self._trace_bus is not None:
        end_payload = {
            "duration_seconds": result.duration_seconds,
            "token_estimate": result.token_estimate,
            "output_length": len(result.output),
        }
        self._emit_pattern_trace("pattern_end", end_payload)

    return result

set_trace_bus(bus)

Attach a TraceEventBus — emits pattern_start/pattern_end on run().

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def set_trace_bus(self, bus: Any) -> Pattern:
    """Register the bus that receives pattern_start/pattern_end from run().

    Args:
        bus: A TraceEventBus-style object; stored as given.

    Returns:
        The pattern itself, so the call can be chained.
    """
    pattern = self
    pattern._trace_bus = bus
    return pattern

stream(task, context=None) async

Stream partial results as they become available.

Default implementation runs the full pattern and yields the result. Subclasses can override for true streaming.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
    """Yield partial results as they become available.

    This default runs the whole pattern and yields its output once.
    Subclasses can override it for true streaming.

    Args:
        task: The user task or prompt, forwarded to `run`.
        context: Optional context, forwarded to `run` unchanged.
    """
    outcome = await self.run(task, context)
    yield outcome.output

pyagent_patterns.base.Context dataclass

Shared execution context for a pattern run.

Attributes:

Name Type Description
task str

The original user task/prompt.

messages list[Message]

Accumulated message history.

metadata dict[str, Any]

Arbitrary shared state across agents.

parent_id str | None

ID of the parent context (for nested patterns).

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
@dataclass
class Context:
    """State shared by the agents taking part in one pattern run.

    Attributes:
        task: The original user task/prompt.
        messages: Message history accumulated so far.
        metadata: Free-form state shared across agents.
        parent_id: ID of the enclosing context when patterns are nested.
    """

    task: str
    messages: list[Message] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    parent_id: str | None = None
    # Identifier: the first 12 hex characters of a fresh UUID4.
    _id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])

    @property
    def id(self) -> str:
        """This context's identifier."""
        return self._id

    def child(self, task: str | None = None) -> Context:
        """Derive a context for a nested pattern run.

        The child starts with an empty message history, a shallow copy of
        this context's metadata, and this context's ID as its parent.

        Args:
            task: Task for the child; a falsy value reuses this context's task.
        """
        child_task = task if task else self.task
        metadata_copy = dict(self.metadata)
        return Context(task=child_task, metadata=metadata_copy, parent_id=self._id)

child(task=None)

Create a child context for nested pattern execution.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
def child(self, task: str | None = None) -> Context:
    """Derive a context for a nested pattern run.

    The child gets a shallow copy of this context's metadata and this
    context's ID as its parent.

    Args:
        task: Task for the child; a falsy value reuses this context's task.
    """
    child_task = task if task else self.task
    metadata_copy = dict(self.metadata)
    return Context(task=child_task, metadata=metadata_copy, parent_id=self._id)

pyagent_patterns.base.Result dataclass

Outcome of a pattern execution.

Attributes:

Name Type Description
output str

The final output text.

messages list[Message]

All messages generated during execution.

metadata dict[str, Any]

Pattern-specific metadata (rounds, consensus, votes, etc.).

duration_seconds float

Wall-clock execution time.

token_estimate int

Rough estimate of total tokens consumed.

cost_estimate float

Rough estimate of total cost in USD.

Source code in packages/pyagent-patterns/src/pyagent_patterns/base.py
@dataclass
class Result:
    """What a pattern execution produced.

    Attributes:
        output: Final output text.
        messages: Every message generated while the pattern ran.
        metadata: Pattern-specific details (rounds, consensus, votes, etc.).
        duration_seconds: Wall-clock execution time.
        token_estimate: Rough estimate of total tokens consumed.
        cost_estimate: Rough estimate of total cost in USD.
    """

    # Only the output is required; everything else starts empty or zero.
    output: str
    messages: list[Message] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    duration_seconds: float = 0.0
    token_estimate: int = 0
    cost_estimate: float = 0.0

Orchestration (Tier 1)

pyagent_patterns.orchestration.supervisor.Supervisor

Bases: Pattern

Classify → route → collect orchestration pattern.

Parameters:

Name Type Description Default
classifier Agent

Agent that classifies the task into a route key.

required
routes dict[str, Agent]

Mapping of route keys to specialist agents.

required
formatter Agent | None

Optional agent that formats the final response.

None
default_route str | None

Key to use when classification doesn't match any route.

None
Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/supervisor.py
class Supervisor(Pattern):
    """Classify → route → collect orchestration pattern.

    Args:
        classifier: Agent that classifies the task into a route key.
        routes: Mapping of route keys to specialist agents.
        formatter: Optional agent that formats the final response.
        default_route: Key to use when classification doesn't match any route.
    """

    def __init__(
        self,
        classifier: Agent,
        routes: dict[str, Agent],
        formatter: Agent | None = None,
        default_route: str | None = None,
    ) -> None:
        self._classifier = classifier
        self._routes = routes
        self._formatter = formatter
        self._default_route = default_route

    @property
    def pattern_type(self) -> str:
        return "supervisor"

    def _resolve_route(self, label: str) -> tuple[str, Agent]:
        """Map the classifier's raw answer to a ``(route key, agent)`` pair.

        Lookup order: exact match on the stripped, lowercased label; then a
        case- and whitespace-insensitive match against the registered keys;
        then ``default_route``; finally the first registered route.
        """
        wanted = label.strip().lower()

        agent = self._routes.get(wanted)
        if agent is not None:
            return wanted, agent

        # The label is lowercased, so a key such as "Billing" would never
        # match verbatim; compare on normalized keys as well.
        for key, candidate in self._routes.items():
            if candidate is not None and key.strip().lower() == wanted:
                return key, candidate

        if self._default_route:
            agent = self._routes.get(self._default_route)
            if agent is not None:
                return self._default_route, agent

        # Fallback: use first available route
        first_key = next(iter(self._routes))
        return first_key, self._routes[first_key]

    async def _execute(self, ctx: Context) -> Result:
        """Classify the task, run the chosen specialist, optionally format.

        Raises:
            RuntimeError: If no routes are configured.
        """
        if not self._routes:
            # Fail with a clear message instead of a StopIteration leaking
            # out of the coroutine as an opaque RuntimeError.
            raise RuntimeError("Supervisor has no routes configured")

        messages: list[Message] = []

        # Step 1: Classify the task
        classify_prompt = (
            f"Classify the following task into exactly one of these categories: "
            f"{', '.join(self._routes.keys())}.\n"
            f"Respond with ONLY the category name, nothing else.\n\n"
            f"Task: {ctx.task}"
        )
        classify_msg = Message.user(classify_prompt)
        classification = await self._classifier.run([classify_msg])
        messages.append(classification)

        # Step 2: Route to specialist
        route_key, agent = self._resolve_route(classification.content)

        # The specialist sees the context's message history, not the
        # classification prompt.
        specialist_msg = await agent.run(ctx.messages)
        messages.append(specialist_msg)

        # Step 3: Optional formatting
        output = specialist_msg.content
        if self._formatter:
            format_msgs = [
                Message.user(
                    f"Format the following response for the user:\n\n{specialist_msg.content}"
                )
            ]
            formatted = await self._formatter.run(format_msgs)
            messages.append(formatted)
            output = formatted.content

        return Result(
            output=output,
            messages=messages,
            metadata={
                "route_key": route_key,
                "classifier_output": classification.content,
            },
        )

pyagent_patterns.orchestration.pipeline.Pipeline

Bases: Pattern

Sequential stage chain — output of stage N becomes input of stage N+1.

Parameters:

Name Type Description Default
stages list[Agent]

Ordered list of agents, each processing the previous output.

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/pipeline.py
class Pipeline(Pattern):
    """Sequential stage chain — output of stage N becomes input of stage N+1.

    Args:
        stages: Ordered list of agents, each processing the previous output.

    Raises:
        ValueError: If ``stages`` is empty.
    """

    def __init__(self, stages: list[Agent]) -> None:
        if not stages:
            raise ValueError("Pipeline requires at least one stage")
        self._stages = stages

    @property
    def pattern_type(self) -> str:
        return "pipeline"

    async def _execute(self, ctx: Context) -> Result:
        """Run every stage in order and return the last stage's output."""
        messages: list[Message] = []
        current_input = ctx.task

        for stage in self._stages:
            # Each stage sees only the previous stage's output as a user message.
            response = await stage.run([Message.user(current_input)])
            messages.append(response)
            current_input = response.content

        return Result(
            output=current_input,
            messages=messages,
            metadata={"stages": len(self._stages), "stage_names": [s.name for s in self._stages]},
        )

    async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
        """Stream stage completions as they finish.

        Args:
            task: Input for the first stage.
            context: Accepted for interface compatibility; not used here.

        Yields:
            One line per completed stage: ``[Stage i/N: name] output``.
        """
        total = len(self._stages)
        current_input = task

        for position, stage in enumerate(self._stages, start=1):
            response = await stage.run([Message.user(current_input)])
            current_input = response.content
            # Separate the stage count from the stage name so the label
            # stays unambiguous (e.g. a stage named "2nd-pass").
            yield f"[Stage {position}/{total}: {stage.name}] {current_input}"

stream(task, context=None) async

Stream stage completions as they finish.

Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/pipeline.py
async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
    """Stream stage completions as they finish.

    Args:
        task: Input for the first stage.
        context: Accepted for interface compatibility; not used here.

    Yields:
        One line per completed stage: ``[Stage i/N: name] output``.
    """
    total = len(self._stages)
    current_input = task

    for position, stage in enumerate(self._stages, start=1):
        response = await stage.run([Message.user(current_input)])
        current_input = response.content
        # Separate the stage count from the stage name so the label
        # stays unambiguous (e.g. a stage named "2nd-pass").
        yield f"[Stage {position}/{total}: {stage.name}] {current_input}"

pyagent_patterns.orchestration.fan_out_fan_in.FanOutFanIn

Bases: Pattern

Parallel execution with result aggregation.

Parameters:

Name Type Description Default
agents list[Agent]

List of agents to run in parallel on the same task.

required
aggregator Agent

Agent that combines all parallel outputs into one.

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/fan_out_fan_in.py
class FanOutFanIn(Pattern):
    """Run several agents concurrently, then merge their outputs.

    Args:
        agents: Agents that each receive the same task in parallel.
        aggregator: Agent that merges the parallel outputs into one response.

    Raises:
        ValueError: If ``agents`` is empty.
    """

    def __init__(self, agents: list[Agent], aggregator: Agent) -> None:
        if not agents:
            raise ValueError("FanOutFanIn requires at least one agent")
        self._agents = agents
        self._aggregator = aggregator

    @property
    def pattern_type(self) -> str:
        return "fan_out_fan_in"

    async def _execute(self, ctx: Context) -> Result:
        """Gather every agent's answer, then ask the aggregator to merge them."""
        # Fan-out: every agent gets the same context messages.
        parallel_results = await asyncio.gather(
            *(agent.run(ctx.messages) for agent in self._agents)
        )
        messages: list[Message] = list(parallel_results)

        # Fan-in: label each answer with its agent's name before merging.
        sections = [
            f"--- {agent.name} ---\n{reply.content}"
            for agent, reply in zip(self._agents, parallel_results)
        ]
        combined = "\n\n".join(sections)
        agg_prompt = Message.user(
            f"Combine the following analyses into a unified response:\n\n{combined}"
        )
        aggregated = await self._aggregator.run([agg_prompt])
        messages.append(aggregated)

        agent_names = [a.name for a in self._agents]
        return Result(
            output=aggregated.content,
            messages=messages,
            metadata={
                "parallel_agents": len(self._agents),
                "agent_names": agent_names,
            },
        )

    async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
        """Yield each agent's answer as soon as that agent finishes.

        Results arrive in completion order, each prefixed with ``[name]``.
        The aggregator is not run by this method.
        """
        ctx = context if context else Context(task=task)
        ctx.messages.append(Message.user(task))

        async def _labelled_run(agent: Agent) -> tuple[str, str]:
            # Pair the reply with its agent's name for labelling.
            reply = await agent.run(ctx.messages)
            return agent.name, reply.content

        pending = [_labelled_run(agent) for agent in self._agents]
        for finished in asyncio.as_completed(pending):
            agent_name, text = await finished
            yield f"[{agent_name}] {text}"

stream(task, context=None) async

Stream individual agent results as they complete.

Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/fan_out_fan_in.py
async def stream(self, task: str, context: Context | None = None) -> AsyncIterator[str]:
    """Yield each agent's reply as soon as that agent finishes.

    Args:
        task: Task text; appended to the context's messages as a user message.
        context: Optional existing context. When it is falsy a new
            ``Context(task=task)`` is created.

    Yields:
        One ``"[<agent name>] <content>"`` string per agent, in completion order.
    """
    ctx = context or Context(task=task)
    ctx.messages.append(Message.user(task))

    async def _labelled_run(member: Agent) -> tuple[str, str]:
        # Pair the reply with the agent's name so the consumer can attribute it.
        reply = await member.run(ctx.messages)
        return member.name, reply.content

    pending = [_labelled_run(member) for member in self._agents]
    for finished in asyncio.as_completed(pending):
        label, text = await finished
        yield f"[{label}] {text}"

pyagent_patterns.orchestration.hierarchical.Hierarchical

Bases: Pattern

Manager → Team Leads → Workers hierarchical coordination.

Parameters:

Name Type Description Default
manager Agent

Top-level manager that decomposes work and synthesizes results.

required
teams list[Team]

List of teams, each with a lead and workers.

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/hierarchical.py
class Hierarchical(Pattern):
    """Manager → Team Leads → Workers hierarchical coordination.

    The manager first writes a plan, every team then works on that plan
    concurrently (workers side by side, followed by the team lead's synthesis),
    and finally the manager merges the leads' outputs into one response.

    Args:
        manager: Top-level manager that decomposes work and synthesizes results.
        teams: List of teams, each with a lead and workers.
    """

    def __init__(self, manager: Agent, teams: list[Team]) -> None:
        self._manager, self._teams = manager, teams

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "hierarchical"

    async def _execute(self, ctx: Context) -> Result:
        """Plan with the manager, run all teams, then synthesize with the manager.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the manager's final synthesis. Its
            ``messages`` are the plan, each team's worker replies followed by
            that team's lead reply, and the final synthesis.
        """
        team_names = [team.name for team in self._teams]

        # Step 1: the manager decomposes the task for the named teams.
        plan_reply = await self._manager.run(
            [
                Message.user(
                    f"Decompose this task into subtasks for these teams: "
                    f"{', '.join(team_names)}.\n"
                    f"For each team, provide a clear subtask description.\n\n"
                    f"Task: {ctx.task}"
                )
            ]
        )

        # Step 2: every team receives the whole plan and works concurrently.
        async def _team_round(team: Team, plan: str) -> tuple[str, list[Message]]:
            brief = f"Team {team.name} task: {plan}\nDo your part."
            worker_replies = await asyncio.gather(
                *(worker.run([Message.user(brief)]) for worker in team.workers)
            )

            # The lead only sees a bulleted digest of what the workers produced.
            digest = "\n".join(
                f"- {worker.name}: {reply.content}"
                for worker, reply in zip(team.workers, worker_replies, strict=True)
            )
            lead_reply = await team.lead.run(
                [Message.user(f"Synthesize your team's work:\n{digest}")]
            )
            return lead_reply.content, [*worker_replies, lead_reply]

        team_runs = await asyncio.gather(
            *(_team_round(team, plan_reply.content) for team in self._teams)
        )

        transcript: list[Message] = [plan_reply]
        for _, team_messages in team_runs:
            transcript.extend(team_messages)

        # Step 3: the manager merges the leads' outputs.
        team_summary = "\n\n".join(
            f"--- {team.name} Team ---\n{lead_output}"
            for team, (lead_output, _) in zip(self._teams, team_runs, strict=True)
        )
        final = await self._manager.run(
            [
                Message.user(
                    f"Synthesize these team outputs into a final response:\n\n{team_summary}"
                )
            ]
        )
        transcript.append(final)

        return Result(
            output=final.content,
            messages=transcript,
            metadata={
                "teams": len(self._teams),
                "total_workers": sum(len(team.workers) for team in self._teams),
                "team_names": team_names,
            },
        )

pyagent_patterns.orchestration.orchestrator_workers.OrchestratorWorkers

Bases: Pattern

Dynamic task delegation: orchestrator plans → workers execute → synthesize.

Parameters:

Name Type Description Default
orchestrator Agent

Agent that plans the work and synthesizes results.

required
workers list[Agent]

Pool of available worker agents. The orchestrator selects from these.

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/orchestration/orchestrator_workers.py
class OrchestratorWorkers(Pattern):
    """Dynamic task delegation: orchestrator plans → workers execute → synthesize.

    Args:
        orchestrator: Agent that plans the work and synthesizes results.
        workers: Pool of available worker agents. The orchestrator selects from these.

    Raises:
        ValueError: If ``workers`` is empty.
    """

    def __init__(self, orchestrator: Agent, workers: list[Agent]) -> None:
        # Without at least one worker the unknown-worker fallback in _execute
        # has nothing to dispatch to, so fail early with a clear message.
        if not workers:
            raise ValueError("OrchestratorWorkers requires at least one worker")
        self._orchestrator = orchestrator
        # Keyed by name for lookup; if two workers share a name the later one wins.
        self._workers = {w.name: w for w in workers}

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "orchestrator_workers"

    async def _execute(self, ctx: Context) -> Result:
        """Ask the orchestrator for a plan, run the assignments, then synthesize.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the orchestrator's synthesis. Its
            ``messages`` are the plan, one assistant message per assignment
            (named after the worker that ran it), and the synthesis.
        """
        worker_names = list(self._workers)

        # Step 1: Orchestrator plans the work
        plan_prompt = Message.user(
            f"You have these workers available: {', '.join(worker_names)}.\n"
            f"Plan how to accomplish this task by assigning subtasks to workers.\n"
            f'Respond as JSON: {{"assignments": [{{"worker": "name", "subtask": "description"}}]}}\n\n'
            f"Task: {ctx.task}"
        )
        plan_msg = await self._orchestrator.run([plan_prompt])
        messages: list[Message] = [plan_msg]

        # Step 2: Parse assignments and dispatch them concurrently
        assignments = self._parse_assignments(plan_msg.content)
        # Assignments that name an unknown worker go to the first registered one.
        fallback_worker = next(iter(self._workers.values()))

        async def _run_assignment(worker_name: str, subtask: str) -> tuple[str, str]:
            worker = self._workers.get(worker_name, fallback_worker)
            result = await worker.run([Message.user(subtask)])
            return worker.name, result.content

        worker_results = await asyncio.gather(
            *(_run_assignment(a["worker"], a["subtask"]) for a in assignments)
        )
        messages.extend(Message.assistant(content, name=name) for name, content in worker_results)

        # Step 3: Orchestrator synthesizes
        results_summary = "\n\n".join(
            f"--- {name} ---\n{content}" for name, content in worker_results
        )
        synthesis_prompt = Message.user(
            f"Synthesize these worker results into a final response:\n\n{results_summary}"
        )
        final = await self._orchestrator.run([synthesis_prompt])
        messages.append(final)

        return Result(
            output=final.content,
            messages=messages,
            metadata={
                "assignments": assignments,
                "workers_used": len(assignments),
            },
        )

    @staticmethod
    def _parse_assignments(content: str) -> list[dict[str, str]]:
        """Parse the orchestrator's JSON assignment plan, falling back gracefully.

        The text between the first ``{`` and the last ``}`` is parsed as JSON.
        Entries of its ``"assignments"`` list are kept only when they are
        objects carrying both ``"worker"`` and ``"subtask"``; those two values
        are converted to ``str`` so they are safe to use as a lookup key and as
        message text.

        Args:
            content: Raw text of the orchestrator's reply.

        Returns:
            The usable assignments. An ``"assignments"`` list that is empty (or
            absent) yields an empty list. If the JSON cannot be parsed, or the
            list contains no usable entry, a single assignment for the worker
            name ``"default"`` with the whole ``content`` as subtask is returned.
        """
        start = content.find("{")
        end = content.rfind("}") + 1
        if 0 <= start < end:
            try:
                data = json.loads(content[start:end])
            except json.JSONDecodeError:
                data = None
            if isinstance(data, dict):
                raw = data.get("assignments", [])
                if isinstance(raw, list):
                    usable = [
                        {**item, "worker": str(item["worker"]), "subtask": str(item["subtask"])}
                        for item in raw
                        if isinstance(item, dict) and "worker" in item and "subtask" in item
                    ]
                    # An explicitly empty plan is respected; a plan whose
                    # entries are all malformed falls through to the fallback.
                    if usable or not raw:
                        return usable
        # Fallback: single assignment, routed to the first worker by _execute
        return [{"worker": "default", "subtask": content}]

Resolution (Tier 2)

pyagent_patterns.resolution.self_reflection.SelfReflection

Bases: Pattern

Generate → critique → refine iterative loop.

Parameters:

Name Type Description Default
agent Agent

The agent that generates and refines output.

required
critic Agent | None

Optional separate critic agent. If None, the same agent self-critiques.

None
max_rounds int

Maximum number of generate-critique rounds.

3
stop_phrase str

If the critic's response contains this phrase, stop early.

'APPROVED'
Source code in packages/pyagent-patterns/src/pyagent_patterns/resolution/self_reflection.py
class SelfReflection(Pattern):
    """Generate → critique → refine iterative loop.

    Args:
        agent: The agent that generates and refines output.
        critic: Optional separate critic agent. If None, the same agent self-critiques.
        max_rounds: Maximum number of generate-critique rounds. Must be at least 1.
        stop_phrase: If the critic's response contains this phrase, stop early.

    Raises:
        ValueError: If ``max_rounds`` is less than 1.
    """

    def __init__(
        self,
        agent: Agent,
        critic: Agent | None = None,
        max_rounds: int = 3,
        stop_phrase: str = "APPROVED",
    ) -> None:
        # With zero rounds nothing is ever generated, so there is no output to return.
        if max_rounds < 1:
            raise ValueError("SelfReflection requires max_rounds >= 1")
        self._agent = agent
        self._critic = critic or agent
        self._max_rounds = max_rounds
        self._stop_phrase = stop_phrase

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "self_reflection"

    async def _execute(self, ctx: Context) -> Result:
        """Alternate generation and critique until approval or ``max_rounds``.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the most recent generated text.
            ``messages`` alternate generator and critic replies. Metadata holds
            the number of rounds run, ``max_rounds``, and ``early_stop`` (True
            when the last critique contained the stop phrase).
        """
        messages: list[Message] = []
        current_output = ""
        critique_text = ""
        rounds_run = 0
        approved = False

        for round_num in range(1, self._max_rounds + 1):
            rounds_run = round_num

            # Generate (or refine)
            if round_num == 1:
                gen_prompt = Message.user(ctx.task)
            else:
                gen_prompt = Message.user(
                    f"Revise your previous output based on this feedback:\n\n"
                    f"Previous output:\n{current_output}\n\n"
                    f"Feedback:\n{critique_text}"
                )
            gen_result = await self._agent.run([gen_prompt])
            messages.append(gen_result)
            current_output = gen_result.content

            # Critique
            critique_prompt = Message.user(
                f"Review the following output for errors, gaps, or improvements. "
                f"If the output is satisfactory, respond with '{self._stop_phrase}'. "
                f"Otherwise, provide specific feedback.\n\n"
                f"Output to review:\n{current_output}"
            )
            critique_result = await self._critic.run([critique_prompt])
            messages.append(critique_result)
            critique_text = critique_result.content

            if self._stop_phrase in critique_text:
                # Record the approval directly instead of re-deriving it from
                # the message list afterwards.
                approved = True
                break

        return Result(
            output=current_output,
            messages=messages,
            metadata={
                "rounds": rounds_run,
                "max_rounds": self._max_rounds,
                "early_stop": approved,
            },
        )

pyagent_patterns.resolution.cross_reflection.CrossReflection

Bases: Pattern

Peer review: one agent generates, another reviews, generator revises.

Parameters:

Name Type Description Default
generator Agent

Agent that produces the initial output and revisions.

required
reviewer Agent

Agent that reviews and provides feedback.

required
max_rounds int

Maximum number of generate-review-revise cycles.

2
stop_phrase str

If reviewer says this, stop early.

'APPROVED'
Source code in packages/pyagent-patterns/src/pyagent_patterns/resolution/cross_reflection.py
class CrossReflection(Pattern):
    """Peer review: one agent generates, another reviews, generator revises.

    Args:
        generator: Agent that produces the initial output and revisions.
        reviewer: Agent that reviews and provides feedback.
        max_rounds: Maximum number of generate-review-revise cycles. Must be at least 1.
        stop_phrase: If reviewer says this, stop early.

    Raises:
        ValueError: If ``max_rounds`` is less than 1.
    """

    def __init__(
        self,
        generator: Agent,
        reviewer: Agent,
        max_rounds: int = 2,
        stop_phrase: str = "APPROVED",
    ) -> None:
        # With zero rounds nothing is ever generated, so there is no output to return.
        if max_rounds < 1:
            raise ValueError("CrossReflection requires max_rounds >= 1")
        self._generator = generator
        self._reviewer = reviewer
        self._max_rounds = max_rounds
        self._stop_phrase = stop_phrase

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "cross_reflection"

    async def _execute(self, ctx: Context) -> Result:
        """Alternate generation and peer review until approval or ``max_rounds``.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the most recent generated text.
            ``messages`` alternate generator and reviewer replies. Metadata
            holds the number of rounds run and both agent names.
        """
        messages: list[Message] = []
        current_output = ""
        review_text = ""
        rounds_run = 0

        for round_num in range(1, self._max_rounds + 1):
            rounds_run = round_num

            # Generate or revise
            if round_num == 1:
                gen_prompt = Message.user(ctx.task)
            else:
                gen_prompt = Message.user(
                    f"Revise based on peer feedback:\n\n"
                    f"Your output:\n{current_output}\n\n"
                    f"Peer feedback:\n{review_text}"
                )
            gen_result = await self._generator.run([gen_prompt])
            messages.append(gen_result)
            current_output = gen_result.content

            # Peer review
            review_prompt = Message.user(
                f"Review this output from your peer. Provide constructive feedback. "
                f"If the output is satisfactory, respond with '{self._stop_phrase}'.\n\n"
                f"{current_output}"
            )
            review_result = await self._reviewer.run([review_prompt])
            messages.append(review_result)
            review_text = review_result.content

            if self._stop_phrase in review_text:
                break

        return Result(
            output=current_output,
            messages=messages,
            metadata={
                "rounds": rounds_run,
                "generator": self._generator.name,
                "reviewer": self._reviewer.name,
            },
        )

pyagent_patterns.resolution.debate.Debate

Bases: Pattern

Structured adversarial debate with judge resolution.

Parameters:

Name Type Description Default
debaters list[Agent]

List of agents, each arguing a different position.

required
judge Agent

Agent that evaluates arguments and renders final decision.

required
rounds int

Number of argumentation rounds.

3
positions list[str] | None

Optional list of position labels (e.g., ["BUY", "SELL"]). If not provided, positions are assigned as "Position 1", "Position 2", etc.

None
Source code in packages/pyagent-patterns/src/pyagent_patterns/resolution/debate.py
class Debate(Pattern):
    """Structured adversarial debate with judge resolution.

    Args:
        debaters: List of agents, each arguing a different position.
        judge: Agent that evaluates arguments and renders final decision.
        rounds: Number of argumentation rounds.
        positions: Optional list of position labels (e.g., ["BUY", "SELL"]).
            If not provided, positions are assigned as "Position 1", "Position 2", etc.
            When provided, there must be at least one label per debater.

    Raises:
        ValueError: If fewer than two debaters are given, or if ``positions``
            is provided with fewer labels than there are debaters.
    """

    def __init__(
        self,
        debaters: list[Agent],
        judge: Agent,
        rounds: int = 3,
        positions: list[str] | None = None,
    ) -> None:
        if len(debaters) < 2:
            raise ValueError("Debate requires at least 2 debaters")
        # Each debater is looked up by index in the positions list, so a short
        # list would otherwise fail with IndexError in the middle of a run.
        if positions and len(positions) < len(debaters):
            raise ValueError("Debate requires a position label for every debater")
        self._debaters = debaters
        self._judge = judge
        self._rounds = rounds
        self._positions = positions or [f"Position {i + 1}" for i in range(len(debaters))]

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "debate"

    async def _execute(self, ctx: Context) -> Result:
        """Run the argumentation rounds, then ask the judge for a verdict.

        In round 1 every debater gives an opening argument. In later rounds
        each debater is shown the other debaters' arguments from the previous
        round and asked to counter them. Debaters speak one after another.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the judge's verdict. ``messages``
            are all arguments in speaking order followed by the verdict;
            metadata includes the full ``debate_log``.
        """
        messages: list[Message] = []
        debate_log: list[dict[str, str | int]] = []
        round_args_prev: list[str] = []

        for round_num in range(1, self._rounds + 1):
            round_args: list[str] = []

            for i, debater in enumerate(self._debaters):
                position = self._positions[i]
                if round_num == 1:
                    prompt = Message.user(
                        f"You are arguing for '{position}' on this topic: {ctx.task}\n"
                        f"Present your opening argument."
                    )
                else:
                    # Only the previous round's arguments of the *other* debaters.
                    opponent_args = "\n".join(
                        f"- {self._positions[j]}: {a}"
                        for j, a in enumerate(round_args_prev)
                        if j != i
                    )
                    prompt = Message.user(
                        f"You are arguing for '{position}'. Round {round_num}.\n"
                        f"Counter these arguments:\n{opponent_args}\n\n"
                        f"Strengthen your position."
                    )

                arg_result = await debater.run([prompt])
                messages.append(arg_result)
                round_args.append(arg_result.content)
                debate_log.append(
                    {
                        "round": round_num,
                        "debater": debater.name,
                        "position": position,
                        "argument": arg_result.content,
                    }
                )

            round_args_prev = round_args

        # Judge evaluates all arguments
        full_debate = "\n\n".join(
            f"[Round {entry['round']}] {entry['position']} ({entry['debater']}):\n{entry['argument']}"
            for entry in debate_log
        )
        judge_prompt = Message.user(
            f"You are the judge. Evaluate these arguments and render a final decision.\n"
            f"Topic: {ctx.task}\n\n{full_debate}\n\n"
            f"State your decision and reasoning."
        )
        verdict = await self._judge.run([judge_prompt])
        messages.append(verdict)

        return Result(
            output=verdict.content,
            messages=messages,
            metadata={
                "rounds": self._rounds,
                "debaters": [d.name for d in self._debaters],
                "positions": self._positions,
                "debate_log": debate_log,
            },
        )

pyagent_patterns.resolution.voting.Voting

Bases: Pattern

Independent voting with configurable aggregation strategy.

Parameters:

Name Type Description Default
voters list[Agent]

List of agents that each cast a vote.

required
strategy VotingStrategy

Voting strategy (majority or weighted).

MAJORITY
weights list[float] | None

Optional per-agent weights for weighted voting. Must match length of voters. Defaults to equal weights.

None
normalize bool

If True, ask each voter to respond with a concise answer suitable for comparison.

True
Source code in packages/pyagent-patterns/src/pyagent_patterns/resolution/voting.py
class Voting(Pattern):
    """Independent voting with configurable aggregation strategy.

    Args:
        voters: List of agents that each cast a vote.
        strategy: Voting strategy (majority or weighted).
        weights: Optional per-agent weights for weighted voting.
            Must match length of voters. Defaults to equal weights.
        normalize: If True, ask each voter to respond with a concise answer
            suitable for comparison.

    Raises:
        ValueError: If fewer than two voters are given, or if ``weights`` is
            provided with a length different from ``voters``.
    """

    def __init__(
        self,
        voters: list[Agent],
        strategy: VotingStrategy = VotingStrategy.MAJORITY,
        weights: list[float] | None = None,
        normalize: bool = True,
    ) -> None:
        if len(voters) < 2:
            raise ValueError("Voting requires at least 2 voters")
        # A short weight list would silently drop the trailing voters from a
        # weighted tally, so enforce the documented length contract up front.
        if weights and len(weights) != len(voters):
            raise ValueError("Voting weights must match the number of voters")
        self._voters = voters
        self._strategy = strategy
        self._weights = weights or [1.0] * len(voters)
        self._normalize = normalize

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "voting"

    async def _execute(self, ctx: Context) -> Result:
        """Collect one vote per voter concurrently and tally the labels.

        A vote's label is the first line of the voter's reply, stripped of
        surrounding whitespace. Labels are compared exactly as written.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the winning label. ``messages``
            are the voters' replies; metadata holds the labels, the tally, the
            winner, the strategy value, and the voter names.
        """
        suffix = ""
        if self._normalize:
            suffix = (
                "\n\nRespond with a concise answer (one word or short phrase) first, then explain."
            )

        # All voters run in parallel
        tasks = [voter.run([Message.user(ctx.task + suffix)]) for voter in self._voters]
        votes = await asyncio.gather(*tasks)
        messages: list[Message] = list(votes)

        # Extract vote labels (first line of each response)
        vote_labels = [v.content.strip().split("\n")[0].strip() for v in votes]

        # Tally
        if self._strategy == VotingStrategy.MAJORITY:
            counter = Counter(vote_labels)
            winner = counter.most_common(1)[0][0]
            tally = dict(counter)
        else:
            # Weighted voting: sum the weight of every voter behind each label.
            weighted_counts: dict[str, float] = {}
            for label, weight in zip(vote_labels, self._weights, strict=True):
                weighted_counts[label] = weighted_counts.get(label, 0.0) + weight
            winner = max(weighted_counts, key=lambda label: weighted_counts[label])
            tally = weighted_counts

        return Result(
            output=winner,
            messages=messages,
            metadata={
                "strategy": self._strategy.value,
                "votes": vote_labels,
                "tally": tally,
                "winner": winner,
                "voter_names": [v.name for v in self._voters],
            },
        )

pyagent_patterns.resolution.evaluator_optimizer.EvaluatorOptimizer

Bases: Pattern

Generate → evaluate → revise loop with explicit evaluation criteria.

Parameters:

Name Type Description Default
generator Agent

Agent that produces and revises output.

required
evaluator Agent

Agent that scores output against criteria.

required
criteria list[str] | None

List of evaluation criteria the evaluator checks.

None
max_rounds int

Maximum optimization rounds.

3
pass_threshold int

Minimum score (1-10) at or above which the output is considered acceptable.

7
Source code in packages/pyagent-patterns/src/pyagent_patterns/resolution/evaluator_optimizer.py
class EvaluatorOptimizer(Pattern):
    """Generate → evaluate → revise loop with explicit evaluation criteria.

    Args:
        generator: Agent that produces and revises output.
        evaluator: Agent that scores output against criteria.
        criteria: List of evaluation criteria the evaluator checks.
        max_rounds: Maximum optimization rounds. Must be at least 1.
        pass_threshold: Score (1-10) at or above which the output is considered acceptable.

    Raises:
        ValueError: If ``max_rounds`` is less than 1.
    """

    def __init__(
        self,
        generator: Agent,
        evaluator: Agent,
        criteria: list[str] | None = None,
        max_rounds: int = 3,
        pass_threshold: int = 7,
    ) -> None:
        # With zero rounds nothing is ever generated, so there is no output to return.
        if max_rounds < 1:
            raise ValueError("EvaluatorOptimizer requires max_rounds >= 1")
        self._generator = generator
        self._evaluator = evaluator
        self._criteria = criteria or ["correctness", "clarity", "completeness"]
        self._max_rounds = max_rounds
        self._pass_threshold = pass_threshold

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "evaluator_optimizer"

    async def _execute(self, ctx: Context) -> Result:
        """Alternate generation and scoring until the threshold or ``max_rounds``.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the most recent generated text.
            ``messages`` alternate generator and evaluator replies. Metadata
            holds the rounds run, every parsed score, the final score, whether
            it reached the threshold, and the criteria.
        """
        messages: list[Message] = []
        current_output = ""
        scores: list[int] = []
        eval_text = ""
        rounds_run = 0

        criteria_text = "\n".join(f"- {c}" for c in self._criteria)

        for round_num in range(1, self._max_rounds + 1):
            rounds_run = round_num

            # Generate or revise
            if round_num == 1:
                gen_prompt = Message.user(ctx.task)
            else:
                gen_prompt = Message.user(
                    f"Revise your output based on evaluator feedback:\n\n"
                    f"Previous output:\n{current_output}\n\n"
                    f"Feedback:\n{eval_text}"
                )
            gen_result = await self._generator.run([gen_prompt])
            messages.append(gen_result)
            current_output = gen_result.content

            # Evaluate
            eval_prompt = Message.user(
                f"Evaluate this output against these criteria:\n{criteria_text}\n\n"
                f"Output to evaluate:\n{current_output}\n\n"
                f"Provide a score (1-10) and specific feedback for improvement. "
                f"Format: SCORE: N\nFEEDBACK: ..."
            )
            eval_result = await self._evaluator.run([eval_prompt])
            messages.append(eval_result)
            eval_text = eval_result.content

            # Parse score
            score = self._parse_score(eval_text)
            scores.append(score)

            if score >= self._pass_threshold:
                break

        return Result(
            output=current_output,
            messages=messages,
            metadata={
                "rounds": rounds_run,
                "scores": scores,
                "final_score": scores[-1] if scores else 0,
                "passed": scores[-1] >= self._pass_threshold if scores else False,
                "criteria": self._criteria,
            },
        )

    @staticmethod
    def _parse_score(content: str) -> int:
        """Extract the numeric score from an evaluator response.

        Lines are checked in order for a ``SCORE:`` label (case-insensitive,
        optionally preceded by markdown decoration such as ``**`` or ``#``).
        The first integer after the label on that line is the score, so
        ``SCORE: 8``, ``SCORE: 8/10`` and ``**Score:** 8`` all give 8, and
        ``SCORE: 7.5`` gives 7.

        Args:
            content: Raw text of the evaluator's reply.

        Returns:
            The labelled score if one is found; otherwise the first standalone
            number from 1 to 10 anywhere in the text; otherwise 5.
        """
        import re

        label = "SCORE:"
        for raw_line in content.split("\n"):
            line = raw_line.strip().lstrip("*_#>- ").upper()
            if not line.startswith(label):
                continue
            # Search only the rest of the labelled line, so an unrelated number
            # elsewhere in the reply cannot be mistaken for the score.
            found = re.search(r"-?\d+", line[len(label):])
            if found:
                return int(found.group())

        # Fallback: look for any number between 1-10
        numbers = re.findall(r"\b([1-9]|10)\b", content)
        return int(numbers[0]) if numbers else 5

Structural (Tier 3)

pyagent_patterns.structural.role_based.RoleBased

Bases: Pattern

Agents with distinct roles collaborate in structured rounds.

Parameters:

Name Type Description Default
agents list[Agent]

List of role-specialized agents (order matters for turn-taking).

required
rounds int

Number of communication rounds.

1
shared_context bool

If True, all agents see all prior messages. If False, each agent only sees the immediately preceding message.

True
Source code in packages/pyagent-patterns/src/pyagent_patterns/structural/role_based.py
class RoleBased(Pattern):
    """Agents with distinct roles collaborate in structured rounds.

    Agents take turns in list order; each reply is appended to a running
    conversation that starts with the task as a user message.

    Args:
        agents: List of role-specialized agents (order matters for turn-taking).
        rounds: Number of communication rounds.
        shared_context: If True, all agents see all prior messages. If False,
            each agent only sees the immediately preceding message.
    """

    def __init__(
        self,
        agents: list[Agent],
        rounds: int = 1,
        shared_context: bool = True,
    ) -> None:
        self._agents, self._rounds, self._shared_context = agents, rounds, shared_context

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "role_based"

    async def _execute(self, ctx: Context) -> Result:
        """Let every agent speak once per round, in order.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the content of the last message in
            the conversation and whose ``messages`` are the agents' replies.
        """
        # The conversation always starts with the task, so it is never empty.
        conversation: list[Message] = [Message.user(ctx.task)]
        replies: list[Message] = []

        for _ in range(self._rounds):
            for speaker in self._agents:
                # Full history when sharing context, otherwise only the latest message.
                visible = conversation[:] if self._shared_context else conversation[-1:]
                reply = await speaker.run(visible)
                replies.append(reply)
                conversation.append(reply)

        return Result(
            output=conversation[-1].content,
            messages=replies,
            metadata={
                "rounds": self._rounds,
                "roles": [speaker.name for speaker in self._agents],
                "shared_context": self._shared_context,
            },
        )

pyagent_patterns.structural.layered.Layered

Bases: Pattern

Hierarchical layers of agents with increasing abstraction.

Parameters:

Name Type Description Default
layers list[Layer]

Ordered list of layers from bottom (data) to top (synthesis).

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/structural/layered.py
class Layered(Pattern):
    """Hierarchical layers of agents with increasing abstraction.

    Layers run one after another; the combined output of one layer is the
    input text of the next.

    Args:
        layers: Ordered list of layers from bottom (data) to top (synthesis).

    Raises:
        ValueError: If ``layers`` is empty.
    """

    def __init__(self, layers: list[Layer]) -> None:
        if not layers:
            raise ValueError("Layered requires at least one layer")
        self._layers = layers

    @property
    def pattern_type(self) -> str:
        """Return the identifier string of this pattern."""
        return "layered"

    async def _execute(self, ctx: Context) -> Result:
        """Feed the task through every layer in order.

        Args:
            ctx: Execution context; only ``ctx.task`` is read.

        Returns:
            A ``Result`` whose ``output`` is the combined output of the last
            layer and whose ``messages`` are all agent replies, layer by layer.
        """
        transcript: list[Message] = []
        carried = ctx.task

        for layer in self._layers:
            # Every agent of the layer receives the same input, concurrently.
            replies = await asyncio.gather(
                *(member.run([Message.user(carried)]) for member in layer.agents)
            )
            transcript.extend(replies)

            # A lone reply is passed on verbatim; several are labelled by agent name.
            if len(replies) == 1:
                carried = replies[0].content
                continue
            carried = "\n\n".join(
                f"[{member.name}]: {reply.content}"
                for member, reply in zip(layer.agents, replies, strict=True)
            )

        return Result(
            output=carried,
            messages=transcript,
            metadata={
                "layer_count": len(self._layers),
                "layer_names": [layer.name for layer in self._layers],
                "agents_per_layer": [len(layer.agents) for layer in self._layers],
            },
        )

pyagent_patterns.structural.topology.Topology

Bases: Pattern

Configurable agent communication topology.

Parameters:

Name Type Description Default
agents list[Agent]

List of agents participating in the topology.

required
topology TopologyType

The communication structure (chain, star, mesh).

CHAIN
hub_index int

For star topology, the index of the hub agent. Defaults to 0.

0
rounds int

For mesh topology, number of peer-exchange rounds run after the initial independent round.

1
Source code in packages/pyagent-patterns/src/pyagent_patterns/structural/topology.py
class Topology(Pattern):
    """Configurable agent communication topology.

    Args:
        agents: List of agents participating in the topology.
        topology: The communication structure (chain, star, mesh).
        hub_index: For star topology, the index of the hub agent. Defaults to 0.
            Negative values count from the end of ``agents``.
        rounds: For mesh topology, number of peer-exchange rounds that run
            after the initial independent round.

    Raises:
        ValueError: If fewer than two agents are given.
    """

    def __init__(
        self,
        agents: list[Agent],
        topology: TopologyType = TopologyType.CHAIN,
        hub_index: int = 0,
        rounds: int = 1,
    ) -> None:
        if len(agents) < 2:
            raise ValueError("Topology requires at least 2 agents")
        self._agents = agents
        self._topology = topology
        self._hub_index = hub_index
        self._rounds = rounds

    @property
    def pattern_type(self) -> str:
        """Return ``"topology_"`` followed by the topology's value."""
        return f"topology_{self._topology.value}"

    async def _execute(self, ctx: Context) -> Result:
        """Dispatch to the chain, star, or mesh implementation."""
        if self._topology == TopologyType.CHAIN:
            return await self._chain(ctx)
        elif self._topology == TopologyType.STAR:
            return await self._star(ctx)
        else:
            return await self._mesh(ctx)

    async def _chain(self, ctx: Context) -> Result:
        """A→B→C: sequential processing.

        Each agent receives the previous agent's content (the first receives
        the task); the last agent's content is the output.
        """
        messages: list[Message] = []
        current = ctx.task

        for agent in self._agents:
            result = await agent.run([Message.user(current)])
            messages.append(result)
            current = result.content

        return Result(output=current, messages=messages, metadata={"topology": "chain"})

    async def _star(self, ctx: Context) -> Result:
        """Hub↔{spokes}: hub collects from all spokes.

        Every agent except the hub answers the task concurrently; the hub then
        synthesizes their answers.

        Raises:
            IndexError: If ``hub_index`` is out of range for the agent list.
        """
        messages: list[Message] = []
        hub = self._agents[self._hub_index]
        # Normalize a negative index to its position so the hub is excluded
        # from the spokes instead of also being run as one.
        hub_position = self._hub_index % len(self._agents)
        spokes = [a for i, a in enumerate(self._agents) if i != hub_position]

        # Spokes process in parallel
        tasks = [spoke.run([Message.user(ctx.task)]) for spoke in spokes]
        spoke_results = await asyncio.gather(*tasks)
        messages.extend(spoke_results)

        # Hub synthesizes
        summary = "\n".join(
            f"- {spoke.name}: {r.content}" for spoke, r in zip(spokes, spoke_results, strict=True)
        )
        hub_result = await hub.run(
            [Message.user(f"Synthesize these inputs:\n{summary}\n\nOriginal task: {ctx.task}")]
        )
        messages.append(hub_result)

        return Result(output=hub_result.content, messages=messages, metadata={"topology": "star"})

    async def _mesh(self, ctx: Context) -> Result:
        """Full mesh: every agent sees every other agent's output per round.

        All agents first answer the task independently and concurrently. Then,
        for ``rounds`` rounds, each agent in turn is shown the latest outputs of
        the others and asked for an updated response. Outputs are tracked per
        agent name.
        """
        messages: list[Message] = []
        outputs = {a.name: "" for a in self._agents}

        # Initial round
        tasks = [a.run([Message.user(ctx.task)]) for a in self._agents]
        initial = await asyncio.gather(*tasks)
        for agent, result in zip(self._agents, initial, strict=False):
            outputs[agent.name] = result.content
            messages.append(result)

        # Subsequent mesh rounds
        for _ in range(self._rounds):
            for agent in self._agents:
                # Updates are applied immediately, so later agents in the same
                # round already see earlier agents' refreshed output.
                peer_outputs = "\n".join(
                    f"- {name}: {content}"
                    for name, content in outputs.items()
                    if name != agent.name
                )
                result = await agent.run(
                    [
                        Message.user(
                            f"Task: {ctx.task}\n\nPeer outputs:\n{peer_outputs}\n\nProvide your updated response."
                        )
                    ]
                )
                outputs[agent.name] = result.content
                messages.append(result)

        # Final output is concatenation of all agent outputs
        final = "\n\n".join(f"[{name}]: {content}" for name, content in outputs.items())
        return Result(
            output=final, messages=messages, metadata={"topology": "mesh", "rounds": self._rounds}
        )

pyagent_patterns.structural.blackboard.Blackboard

Bases: Pattern

Shared-state communication via a blackboard store.

Parameters:

Name Type Description Default
agents list[BlackboardAgent]

List of BlackboardAgent wrappers specifying read/write keys.

required
rounds int

Number of rounds agents process the blackboard.

1
initial_state dict[str, Any] | None

Optional initial blackboard values.

None
Source code in packages/pyagent-patterns/src/pyagent_patterns/structural/blackboard.py
class Blackboard(Pattern):
    """Shared-state communication via a blackboard store.

    Each round, every agent is prompted with the blackboard entries it is
    allowed to read and asked to answer with ``KEY: VALUE`` lines. Lines whose
    key matches one of the agent's declared write keys are stored on the
    blackboard under that declared key.

    Args:
        agents: List of BlackboardAgent wrappers specifying read/write keys.
        rounds: Number of rounds agents process the blackboard.
        initial_state: Optional initial blackboard values.
    """

    def __init__(
        self,
        agents: list[BlackboardAgent],
        rounds: int = 1,
        initial_state: dict[str, Any] | None = None,
    ) -> None:
        self._agents = agents
        self._rounds = rounds
        self._initial_state = initial_state or {}

    @property
    def pattern_type(self) -> str:
        return "blackboard"

    @staticmethod
    def _normalize_key(raw: str) -> str:
        """Canonicalize a key for matching: trimmed, lower-case, spaces as underscores."""
        return raw.strip().lower().replace(" ", "_")

    async def _execute(self, ctx: Context) -> Result:
        """Run every agent against the blackboard for the configured rounds.

        Returns:
            A Result whose output is the string form of the final blackboard
            snapshot; the snapshot itself is in ``metadata["final_state"]``.
        """
        messages: list[Message] = []
        board = BlackboardState()

        # Initialize blackboard
        board.write("task", ctx.task, "system")
        for key, value in self._initial_state.items():
            board.write(key, value, "system")

        for _round_num in range(1, self._rounds + 1):
            for ba in self._agents:
                # Build prompt from readable keys (each key is read once;
                # keys with no value yet are left out of the prompt).
                readable = {}
                for read_key in ba.reads:
                    current = board.read(read_key)
                    if current is not None:
                        readable[read_key] = current
                prompt = (
                    f"Task: {ctx.task}\n\n"
                    f"Blackboard state (your readable keys):\n"
                    + "\n".join(f"  {k}: {v}" for k, v in readable.items())
                    + f"\n\nYou must produce values for: {', '.join(ba.writes)}\n"
                    f"Respond with one value per line in format KEY: VALUE"
                )

                result = await ba.agent.run([Message.user(prompt)])
                messages.append(result)

                # Response keys are normalized before matching, so the declared
                # write keys must be matched in normalized form too; otherwise
                # a declared key such as "Summary" could never be written.
                # Exact declared names take precedence over normalized aliases.
                write_targets = {self._normalize_key(name): name for name in ba.writes}
                write_targets.update({name: name for name in ba.writes})

                # Parse and write results to blackboard
                for line in result.content.split("\n"):
                    raw_key, separator, raw_value = line.partition(":")
                    if not separator:
                        continue
                    target = write_targets.get(self._normalize_key(raw_key))
                    if target is not None:
                        board.write(target, raw_value.strip(), ba.agent.name)

        final_state = board.snapshot()
        return Result(
            output=str(final_state),
            messages=messages,
            metadata={
                "rounds": self._rounds,
                "final_state": final_state,
                "agents": [a.agent.name for a in self._agents],
            },
        )

Advanced (Tier 4)

pyagent_patterns.advanced.talker_reasoner.TalkerReasoner

Bases: Pattern

Dual-process: fast intuition (System 1) + slow deliberation (System 2).

Parameters:

Name Type Description Default
talker Agent

Fast, cheap agent for routine queries (System 1).

required
reasoner Agent

Slow, expensive agent for complex queries (System 2).

required
classifier Agent | None

Optional agent that decides talker vs reasoner. If None, always starts with talker and escalates on uncertainty keywords.

None
complexity_threshold list[str] | None

Keywords in talker output that trigger escalation to reasoner.

None
Source code in packages/pyagent-patterns/src/pyagent_patterns/advanced/talker_reasoner.py
class TalkerReasoner(Pattern):
    """Dual-process: fast intuition (System 1) + slow deliberation (System 2).

    Args:
        talker: Fast, cheap agent for routine queries (System 1).
        reasoner: Slow, expensive agent for complex queries (System 2).
        classifier: Optional agent that decides talker vs reasoner.
            If None, always starts with talker and escalates on uncertainty keywords.
        complexity_threshold: Keywords in talker output that trigger escalation to reasoner.
    """

    def __init__(
        self,
        talker: Agent,
        reasoner: Agent,
        classifier: Agent | None = None,
        complexity_threshold: list[str] | None = None,
    ) -> None:
        self._talker = talker
        self._reasoner = reasoner
        self._classifier = classifier
        self._complexity_keywords = complexity_threshold or [
            "I'm not sure",
            "I don't know",
            "complex",
            "need to think",
            "uncertain",
            "ESCALATE",
        ]

    @property
    def pattern_type(self) -> str:
        return "talker_reasoner"

    async def _execute(self, ctx: Context) -> Result:
        """Answer with the talker, handing over to the reasoner when needed.

        The reasoner is used either because the classifier labelled the query
        ``COMPLEX`` (direct routing) or because the talker's answer contained
        one of the escalation keywords (escalation).

        Returns:
            A Result from whichever system produced the final answer.
            ``metadata["escalated"]`` is True only when the talker ran first
            and its answer triggered the hand-over to the reasoner.
        """
        messages: list[Message] = []
        use_reasoner = False
        escalated = False

        if self._classifier:
            # Use classifier to decide
            classify_prompt = Message.user(
                f"Is this query simple or complex? "
                f"Respond with exactly 'SIMPLE' or 'COMPLEX'.\n\n"
                f"Query: {ctx.task}"
            )
            classification = await self._classifier.run([classify_prompt])
            messages.append(classification)
            use_reasoner = "COMPLEX" in classification.content.upper()

        if not use_reasoner:
            # System 1: fast talker
            talker_result = await self._talker.run(ctx.messages)
            messages.append(talker_result)

            # Check if talker signals uncertainty → escalate to reasoner.
            # Matching is case-insensitive; lower-case the answer only once.
            talker_text = talker_result.content.lower()
            escalated = any(kw.lower() in talker_text for kw in self._complexity_keywords)

            if not escalated:
                return Result(
                    output=talker_result.content,
                    messages=messages,
                    metadata={"system": "talker", "escalated": False},
                )

        # System 2: slow reasoner
        reasoner_result = await self._reasoner.run(ctx.messages)
        messages.append(reasoner_result)

        return Result(
            output=reasoner_result.content,
            messages=messages,
            # Report what actually happened rather than inferring it from the
            # presence of a classifier: a classifier may say SIMPLE and the
            # talker may still escalate.
            metadata={"system": "reasoner", "escalated": escalated},
        )

pyagent_patterns.advanced.swarm.Swarm

Bases: Pattern

Decentralized swarm: agents interact locally, behavior emerges globally.

Parameters:

Name Type Description Default
agents list[Agent]

Pool of swarm agents (all follow same local rules).

required
rounds int

Number of interaction rounds.

3
neighbor_count int

How many random peers each agent interacts with per round.

2
aggregation str

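How to produce the final output from the swarm state: "last" concatenates every agent's last-round output; "vote" returns the most common first line among the agents' last-round outputs. Any other value behaves like "last".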

'last'
Source code in packages/pyagent-patterns/src/pyagent_patterns/advanced/swarm.py
class Swarm(Pattern):
    """Decentralized swarm: agents interact locally, behavior emerges globally.

    Args:
        agents: Pool of swarm agents (all follow same local rules).
        rounds: Number of interaction rounds.
        neighbor_count: How many random peers each agent interacts with per round.
        aggregation: How to produce final output from swarm state.
            "last" = last round's outputs, "vote" = majority vote.

    Raises:
        ValueError: If fewer than two agents are supplied.
    """

    def __init__(
        self,
        agents: list[Agent],
        rounds: int = 3,
        neighbor_count: int = 2,
        aggregation: str = "last",
    ) -> None:
        if len(agents) < 2:
            raise ValueError("Swarm requires at least 2 agents")
        self._agents = agents
        self._rounds = rounds
        # An agent can have at most len(agents) - 1 neighbors.
        self._neighbor_count = min(neighbor_count, len(agents) - 1)
        self._aggregation = aggregation

    @property
    def pattern_type(self) -> str:
        return "swarm"

    async def _execute(self, ctx: Context) -> Result:
        """Run the opening round plus ``rounds`` neighbor-interaction rounds.

        Within an interaction round all agents update concurrently against the
        states of the previous round; the new states replace the old ones only
        once the whole round has finished.
        """
        transcript: list[Message] = []

        # Round 0: each agent independently responds to the task.
        opening = await asyncio.gather(
            *[member.run([Message.user(ctx.task)]) for member in self._agents]
        )
        states: dict[str, str] = {}
        for member, reply in zip(self._agents, opening, strict=False):
            states[member.name] = reply.content
            transcript.append(reply)

        async def _refine(member: Agent, snapshot: dict[str, str]) -> tuple[str, str]:
            # Pick random neighbors among the agents with a different name.
            peers = [other for other in self._agents if other.name != member.name]
            sample_size = min(self._neighbor_count, len(peers))
            chosen = random.sample(peers, sample_size)

            neighbor_views = "\n".join(f"- {peer.name}: {snapshot[peer.name]}" for peer in chosen)
            prompt = Message.user(
                f"Task: {ctx.task}\n\n"
                f"Your current response: {snapshot[member.name]}\n\n"
                f"Neighbor responses:\n{neighbor_views}\n\n"
                f"Update your response considering your neighbors' views."
            )
            reply = await member.run([prompt])
            return member.name, reply.content

        # Subsequent rounds: agents interact with random neighbors.
        for _round_num in range(self._rounds):
            revised = await asyncio.gather(*[_refine(member, states) for member in self._agents])
            next_states: dict[str, str] = {}
            for name, content in revised:
                next_states[name] = content
                transcript.append(Message.assistant(content, name=name))
            states = next_states

        # Aggregate final output.
        if self._aggregation == "vote":
            from collections import Counter

            # The vote is taken over the first line of each agent's final state.
            ballots = Counter(text.split("\n")[0].strip() for text in states.values())
            output = ballots.most_common(1)[0][0]
        else:
            output = "\n\n".join(f"[{name}]: {content}" for name, content in states.items())

        swarm_metadata = {
            "agents": len(self._agents),
            "rounds": self._rounds,
            "aggregation": self._aggregation,
            "final_states": states,
        }
        return Result(output=output, messages=transcript, metadata=swarm_metadata)

pyagent_patterns.advanced.human_in_the_loop.HumanInTheLoop

Bases: Pattern

Agent with human approval gate.

Parameters:

Name Type Description Default
agent Agent

The LLM agent that processes the task.

required
review_fn HumanReviewFn

Callable that presents output to human and returns decision. Signature: (output: str, metadata: dict) -> HumanDecision

auto_approve
max_revisions int

Maximum number of revision attempts after rejection.

3
Source code in packages/pyagent-patterns/src/pyagent_patterns/advanced/human_in_the_loop.py
class HumanInTheLoop(Pattern):
    """Agent with human approval gate.

    Args:
        agent: The LLM agent that processes the task.
        review_fn: Callable that presents output to human and returns decision.
            Signature: (output: str, metadata: dict) -> HumanDecision
            The callable may also return an awaitable resolving to the
            decision (for example an ``async def`` reviewer).
        max_revisions: Maximum number of revision attempts after rejection.
    """

    def __init__(
        self,
        agent: Agent,
        review_fn: HumanReviewFn = auto_approve,
        max_revisions: int = 3,
    ) -> None:
        self._agent = agent
        self._review_fn = review_fn
        self._max_revisions = max_revisions

    @property
    def pattern_type(self) -> str:
        return "human_in_the_loop"

    async def _execute(self, ctx: Context) -> Result:
        """Generate an answer and loop through human review until approved.

        Returns:
            On approval, a Result with ``approved=True``, the number of
            revisions used, and ``human_modified`` telling whether the returned
            output is the reviewer's replacement text. If the reviewer never
            approves, the last agent output is returned with ``approved=False``.
        """
        import inspect

        messages: list[Message] = []

        # Initial agent response
        response = await self._agent.run(ctx.messages)
        messages.append(response)
        current_output = response.content

        for revision in range(self._max_revisions + 1):
            # Human review
            decision = self._review_fn(current_output, {"revision": revision, "task": ctx.task})
            if inspect.isawaitable(decision):
                # Asynchronous reviewers hand back a coroutine/future.
                decision = await decision

            if decision.approved:
                # An empty or missing modification means "keep the agent's
                # text"; derive the flag from the same test so it always
                # describes the output that is actually returned.
                human_modified = bool(decision.modified_output)
                final_output = decision.modified_output if human_modified else current_output
                return Result(
                    output=final_output,
                    messages=messages,
                    metadata={
                        "approved": True,
                        "revisions": revision,
                        "human_modified": human_modified,
                    },
                )

            if revision < self._max_revisions:
                # Revise based on human feedback
                revision_prompt = Message.user(
                    f"Revise your output based on this feedback:\n\n"
                    f"Your output:\n{current_output}\n\n"
                    f"Feedback:\n{decision.feedback}"
                )
                response = await self._agent.run([revision_prompt])
                messages.append(response)
                current_output = response.content

        # Max revisions exceeded — return last output with rejection flag
        return Result(
            output=current_output,
            messages=messages,
            metadata={"approved": False, "revisions": self._max_revisions},
        )

pyagent_patterns.advanced.react.ReAct

Bases: Pattern

Reasoning + Acting loop with tool use.

Parameters:

Name Type Description Default
agent Agent

The reasoning agent.

required
tools dict[str, ToolFn] | None

Mapping of tool names to callable functions.

None
max_steps int

Maximum number of Thought→Action→Observation cycles.

5
finish_token str

Token in agent response that signals task completion.

'FINISH'
Source code in packages/pyagent-patterns/src/pyagent_patterns/advanced/react.py
class ReAct(Pattern):
    """Reasoning + Acting loop with tool use.

    Args:
        agent: The reasoning agent.
        tools: Mapping of tool names to callable functions. A tool may return
            its observation directly or return an awaitable resolving to it.
        max_steps: Maximum number of Thought→Action→Observation cycles.
        finish_token: Token in agent response that signals task completion.
    """

    def __init__(
        self,
        agent: Agent,
        tools: dict[str, ToolFn] | None = None,
        max_steps: int = 5,
        finish_token: str = "FINISH",
    ) -> None:
        self._agent = agent
        self._tools = tools or {}
        self._max_steps = max_steps
        self._finish_token = finish_token

    @property
    def pattern_type(self) -> str:
        return "react"

    async def _execute(self, ctx: Context) -> Result:
        """Run the Thought→Action→Observation loop until finish or step limit.

        Returns:
            A Result whose output is the text following the finish token (with
            the ``[...]`` wrapper from the prescribed format removed). If the
            step limit is reached without a finish token, the output is the
            content of the most recent message. ``metadata["tools_used"]``
            lists, in order, the registered tools that were actually invoked.
        """
        import inspect

        messages: list[Message] = []
        trace: list[dict[str, Any]] = []
        tools_used: list[str] = []

        tool_list = ", ".join(self._tools.keys()) if self._tools else "none"
        system_prompt = (
            f"You are a ReAct agent. Available tools: {tool_list}\n\n"
            f"On each step, respond in this EXACT format:\n"
            f"Thought: [your reasoning]\n"
            f"Action: [tool_name(input)] OR {self._finish_token}[final answer]\n\n"
            f"After receiving an observation, continue reasoning.\n"
            f"When you have the final answer, use: {self._finish_token}[your answer]"
        )

        conversation: list[Message] = [
            Message.system(system_prompt),
            Message.user(ctx.task),
        ]

        for step in range(1, self._max_steps + 1):
            # Agent reasons and decides action
            response = await self._agent.run(conversation)
            messages.append(response)
            conversation.append(response)

            step_data: dict[str, Any] = {"step": step, "response": response.content}

            # Check for finish
            if self._finish_token in response.content:
                final_answer = self._extract_final_answer(response.content, self._finish_token)
                step_data["action"] = "finish"
                trace.append(step_data)
                break

            # Parse action
            action_name, action_input = self._parse_action(response.content)
            step_data["action"] = action_name
            step_data["action_input"] = action_input

            # Execute tool
            if action_name and action_name in self._tools:
                # Count the tool as used even if it fails: it was invoked.
                tools_used.append(action_name)
                try:
                    observation = self._tools[action_name](action_input)
                    if inspect.isawaitable(observation):
                        # Async tools return a coroutine; resolve it instead of
                        # feeding its repr back to the agent.
                        observation = await observation
                except Exception as e:
                    # Tool failures are reported to the agent as observations
                    # so it can react, rather than aborting the loop.
                    observation = f"Error: {e}"
            else:
                observation = f"Unknown tool: {action_name}. Available: {tool_list}"
            step_data["observation"] = observation

            trace.append(step_data)

            # Feed observation back
            obs_msg = Message.user(f"Observation: {observation}")
            conversation.append(obs_msg)
            messages.append(obs_msg)
        else:
            # Step limit reached without a finish token.
            final_answer = messages[-1].content if messages else ""

        return Result(
            output=final_answer,
            messages=messages,
            metadata={
                "steps": len(trace),
                "max_steps": self._max_steps,
                "trace": trace,
                "tools_used": tools_used,
            },
        )

    @staticmethod
    def _extract_final_answer(content: str, finish_token: str) -> str:
        """Return the text after the first *finish_token*, without its ``[...]`` wrapper."""
        answer = content.partition(finish_token)[2].strip()
        if not (answer.startswith("[") and answer.endswith("]")):
            return answer
        # Only unwrap when the opening bracket is closed by the final
        # character, so answers such as "[a] and [b]" are left untouched.
        depth = 0
        last_index = len(answer) - 1
        for index, char in enumerate(answer):
            if char == "[":
                depth += 1
            elif char == "]":
                depth -= 1
                if depth == 0 and index != last_index:
                    return answer
        return answer[1:-1].strip() if depth == 0 else answer

    @staticmethod
    def _parse_action(content: str) -> tuple[str | None, str]:
        """Parse 'Action: tool_name(input)' from agent response.

        Returns:
            ``(tool_name, input)`` taken from the first line starting with
            ``Action:`` (case-insensitive). When that line has no
            ``name(...)`` form, the whole action text is returned as the name
            with an empty input. ``(None, "")`` if no action line exists.
        """
        for line in content.split("\n"):
            line = line.strip()
            if line.lower().startswith("action:"):
                action_text = line.split(":", 1)[1].strip()
                # Parse tool_name(input)
                if "(" in action_text and action_text.endswith(")"):
                    name, _, remainder = action_text.partition("(")
                    # Drop the closing parenthesis and any quoting of the input.
                    inp = remainder[:-1]
                    return name.strip(), inp.strip().strip('"').strip("'")
                return action_text, ""
        return None, ""

Composite

pyagent_patterns.composite.CompositePattern

Bases: Pattern

Chain multiple patterns with escalation on quality failure.

Parameters:

Name Type Description Default
patterns list[Pattern]

Ordered list of patterns to try.

required
quality_check QualityCheckFn

Function that evaluates whether a pattern's result is acceptable. If it returns False, the next pattern is tried.

always_pass
combine_results bool

If True, passes previous output as context to next pattern.

True
Source code in packages/pyagent-patterns/src/pyagent_patterns/composite.py
class CompositePattern(Pattern):
    """Chain multiple patterns with escalation on quality failure.

    Args:
        patterns: Ordered list of patterns to try.
        quality_check: Function that evaluates whether a pattern's result
            is acceptable. If it returns False, the next pattern is tried.
        combine_results: If True, passes previous output as context to next pattern.

    Raises:
        ValueError: If ``patterns`` is empty.
    """

    def __init__(
        self,
        patterns: list[Pattern],
        quality_check: QualityCheckFn = always_pass,
        combine_results: bool = True,
    ) -> None:
        if not patterns:
            raise ValueError("CompositePattern requires at least one pattern")
        self._patterns = patterns
        self._quality_check = quality_check
        self._combine_results = combine_results

    @property
    def pattern_type(self) -> str:
        joined = "+".join(p.pattern_type for p in self._patterns)
        return f"composite({joined})"

    async def _execute(self, ctx: Context) -> Result:
        """Try each pattern in order until one passes the quality check.

        Returns:
            The first accepted result, or the last pattern's result flagged
            with ``fully_escalated`` when none was accepted. Messages from
            every attempted pattern are included.
        """
        collected = []
        history = []
        last_index = len(self._patterns) - 1

        for level, pattern in enumerate(self._patterns):
            # Each pattern runs in its own child context.
            scoped_ctx = ctx.child()
            result = await pattern._execute(scoped_ctx)

            kind = pattern.pattern_type
            collected.extend(result.messages)
            history.append(
                {
                    "pattern": kind,
                    "output_length": len(result.output),
                    "metadata": result.metadata,
                }
            )

            if self._quality_check(result):
                accepted_metadata = {
                    "escalation_level": level,
                    "pattern_used": kind,
                    "escalation_log": history,
                    "total_patterns_tried": level + 1,
                }
                return Result(output=result.output, messages=collected, metadata=accepted_metadata)

            # Record the rejected output on the parent context so that later
            # patterns can see it (skipped after the final pattern).
            if self._combine_results and level < last_index:
                ctx.metadata["previous_output"] = result.output
                ctx.metadata["previous_pattern"] = kind

        # Every pattern was tried and none passed: return the last result.
        exhausted_metadata = {
            "escalation_level": last_index,
            "pattern_used": self._patterns[-1].pattern_type,
            "escalation_log": history,
            "total_patterns_tried": len(self._patterns),
            "fully_escalated": True,
        }
        return Result(output=result.output, messages=collected, metadata=exhausted_metadata)

Recovery

pyagent_patterns.recovery.BoundedExecution dataclass

Wrap a pattern with resource limits.

Three-level recovery: (1) retry with the same pattern; (2) fall back to a cheaper/simpler pattern; (3) graceful degradation (return a placeholder result that reports the last error).

Parameters:

Name Type Description Default
pattern Pattern

The primary pattern to execute.

required
fallback Pattern | None

Optional simpler pattern to use on failure.

None
max_retries int

Maximum retry attempts before escalating.

2
timeout_seconds float

Maximum wall-clock time for the entire execution.

300.0
max_tokens int

Maximum total tokens before stopping.

100000
Source code in packages/pyagent-patterns/src/pyagent_patterns/recovery.py
@dataclass
class BoundedExecution:
    """Wrap a pattern with resource limits.

    Three-level recovery:
    1. Retry with same pattern
    2. Fallback to a cheaper/simpler pattern
    3. Graceful degradation (return a placeholder result reporting the last error)

    Args:
        pattern: The primary pattern to execute.
        fallback: Optional simpler pattern to use on failure.
        max_retries: Maximum retry attempts before escalating.
        timeout_seconds: Maximum wall-clock time for the entire execution.
        max_tokens: Maximum total tokens before stopping.
    """

    pattern: Pattern
    fallback: Pattern | None = None
    max_retries: int = 2
    timeout_seconds: float = 300.0
    max_tokens: int = 100_000

    async def run(self, task: str, context: Context | None = None) -> Result:
        """Execute with bounded resources and three-level recovery.

        Returns:
            The primary result (``recovery_level`` 0) when it succeeds within
            the token limit, otherwise the fallback result (``recovery_level``
            1), otherwise a degraded placeholder result (``recovery_level`` 2)
            describing the last primary error and, if the fallback ran and
            failed, its error under ``fallback_error``.
        """
        start = time.perf_counter()

        # Level 1: Try primary pattern with retries
        last_error: Exception | None = None
        for attempt in range(1, self.max_retries + 1):
            elapsed = time.perf_counter() - start
            if elapsed > self.timeout_seconds:
                break

            try:
                remaining = self.timeout_seconds - elapsed
                result = await asyncio.wait_for(
                    self.pattern.run(task, context),
                    timeout=remaining,
                )
                if result.token_estimate <= self.max_tokens:
                    result.metadata["recovery_level"] = 0
                    result.metadata["attempts"] = attempt
                    return result
                # Token limit exceeded — try next level
                last_error = Exception(
                    f"Token limit exceeded: {result.token_estimate}/{self.max_tokens}"
                )
                break
            except (asyncio.TimeoutError, TimeoutError):
                # asyncio.TimeoutError is only an alias of the builtin from
                # Python 3.11 on; name both so a wait_for timeout is always
                # treated as a timeout and never retried as a generic error.
                last_error = TimeoutError(f"Timeout after {self.timeout_seconds}s")
                break
            except Exception as e:
                last_error = e
                continue

        # Level 2: Fallback pattern
        fallback_error: Exception | None = None
        if self.fallback:
            try:
                remaining = max(0.0, self.timeout_seconds - (time.perf_counter() - start))
                result = await asyncio.wait_for(
                    self.fallback.run(task, context),
                    # With the budget exhausted the fallback still gets a
                    # fixed 30-second grace period.
                    timeout=remaining if remaining > 0 else 30.0,
                )
                result.metadata["recovery_level"] = 1
                result.metadata["fallback_reason"] = str(last_error)
                return result
            except Exception as e:
                # Best effort: degrade instead of raising, but keep the
                # failure so it is reported rather than silently lost.
                fallback_error = e

        # Level 3: Graceful degradation
        degraded_metadata = {
            "recovery_level": 2,
            "degraded": True,
            "last_error": str(last_error),
        }
        if fallback_error is not None:
            degraded_metadata["fallback_error"] = str(fallback_error)
        return Result(
            output=f"[Degraded] Unable to complete task after {self.max_retries} attempts. Last error: {last_error}",
            metadata=degraded_metadata,
        )

run(task, context=None) async

Execute with bounded resources and three-level recovery.

Source code in packages/pyagent-patterns/src/pyagent_patterns/recovery.py
async def run(self, task: str, context: Context | None = None) -> Result:
    """Execute with bounded resources and three-level recovery.

    Returns:
        The primary result (``recovery_level`` 0) when it succeeds within
        the token limit, otherwise the fallback result (``recovery_level``
        1), otherwise a degraded placeholder result (``recovery_level`` 2)
        describing the last primary error and, if the fallback ran and
        failed, its error under ``fallback_error``.
    """
    start = time.perf_counter()

    # Level 1: Try primary pattern with retries
    last_error: Exception | None = None
    for attempt in range(1, self.max_retries + 1):
        elapsed = time.perf_counter() - start
        if elapsed > self.timeout_seconds:
            break

        try:
            remaining = self.timeout_seconds - elapsed
            result = await asyncio.wait_for(
                self.pattern.run(task, context),
                timeout=remaining,
            )
            if result.token_estimate <= self.max_tokens:
                result.metadata["recovery_level"] = 0
                result.metadata["attempts"] = attempt
                return result
            # Token limit exceeded — try next level
            last_error = Exception(
                f"Token limit exceeded: {result.token_estimate}/{self.max_tokens}"
            )
            break
        except (asyncio.TimeoutError, TimeoutError):
            # asyncio.TimeoutError is only an alias of the builtin from
            # Python 3.11 on; name both so a wait_for timeout is always
            # treated as a timeout and never retried as a generic error.
            last_error = TimeoutError(f"Timeout after {self.timeout_seconds}s")
            break
        except Exception as e:
            last_error = e
            continue

    # Level 2: Fallback pattern
    fallback_error: Exception | None = None
    if self.fallback:
        try:
            remaining = max(0.0, self.timeout_seconds - (time.perf_counter() - start))
            result = await asyncio.wait_for(
                self.fallback.run(task, context),
                # With the budget exhausted the fallback still gets a
                # fixed 30-second grace period.
                timeout=remaining if remaining > 0 else 30.0,
            )
            result.metadata["recovery_level"] = 1
            result.metadata["fallback_reason"] = str(last_error)
            return result
        except Exception as e:
            # Best effort: degrade instead of raising, but keep the
            # failure so it is reported rather than silently lost.
            fallback_error = e

    # Level 3: Graceful degradation
    degraded_metadata = {
        "recovery_level": 2,
        "degraded": True,
        "last_error": str(last_error),
    }
    if fallback_error is not None:
        degraded_metadata["fallback_error"] = str(fallback_error)
    return Result(
        output=f"[Degraded] Unable to complete task after {self.max_retries} attempts. Last error: {last_error}",
        metadata=degraded_metadata,
    )

pyagent_patterns.recovery.CircuitBreaker

Prevent cascading failures in multi-agent systems.

When a pattern fails repeatedly, the circuit opens and rejects requests immediately. After a reset timeout, it enters half-open state and allows one test request through.

Parameters:

Name Type Description Default
failure_threshold int

Number of consecutive failures before opening.

3
reset_timeout_seconds float

Seconds before transitioning from open to half-open.

60.0
fallback_result str

Result to return when circuit is open.

'[Circuit Open] Service temporarily unavailable.'
Source code in packages/pyagent-patterns/src/pyagent_patterns/recovery.py
class CircuitBreaker:
    """Prevent cascading failures in multi-agent systems.

    When a pattern fails repeatedly, the circuit opens and rejects
    requests immediately. After a reset timeout, it enters half-open
    state and lets requests through again; a success closes the circuit,
    a failure re-opens it.

    Args:
        failure_threshold: Number of consecutive failures before opening.
        reset_timeout_seconds: Seconds before transitioning from open to half-open.
        fallback_result: Result to return when circuit is open.
    """

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout_seconds: float = 60.0,
        fallback_result: str = "[Circuit Open] Service temporarily unavailable.",
    ) -> None:
        self._threshold = failure_threshold
        self._reset_timeout = reset_timeout_seconds
        self._fallback_result = fallback_result
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        # Monotonic timestamp of the most recent failure; only consulted
        # while the circuit is open.
        self._last_failure_time = 0.0

    @property
    def state(self) -> CircuitState:
        """Current state; an open circuit becomes half-open once the reset timeout elapsed."""
        if (
            self._state == CircuitState.OPEN
            # The monotonic clock keeps the timeout immune to wall-clock
            # adjustments (NTP corrections, manual changes).
            and time.monotonic() - self._last_failure_time > self._reset_timeout
        ):
            self._state = CircuitState.HALF_OPEN
        return self._state

    async def execute(self, pattern: Pattern, task: str, context: Context | None = None) -> Result:
        """Execute a pattern through the circuit breaker.

        Returns:
            The pattern's result, tagged with the circuit state, or a Result
            carrying the configured fallback text when the circuit is open
            (either already, or as a consequence of this call failing).

        Raises:
            Exception: Whatever the pattern raised, as long as that failure
                did not leave the circuit open.
        """
        if self.state == CircuitState.OPEN:
            # Fail fast without touching the pattern.
            return Result(
                output=self._fallback_result,
                metadata={"circuit_state": "open", "failure_count": self._failure_count},
            )

        try:
            result = await pattern.run(task, context)
            self._on_success()
            result.metadata["circuit_state"] = self._state.value
            return result
        except Exception as exc:
            self._on_failure()
            if self._state != CircuitState.OPEN:
                # Below the threshold the caller sees the original error.
                raise
            return Result(
                output=self._fallback_result,
                metadata={
                    "circuit_state": "open",
                    "failure_count": self._failure_count,
                    "last_error": str(exc),
                },
            )

    def _on_success(self) -> None:
        """Reset the failure streak and close the circuit."""
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def _on_failure(self) -> None:
        """Record a failure and open the circuit once the threshold is reached."""
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self._threshold:
            self._state = CircuitState.OPEN

execute(pattern, task, context=None) async

Execute a pattern through the circuit breaker.

Source code in packages/pyagent-patterns/src/pyagent_patterns/recovery.py
async def execute(self, pattern: Pattern, task: str, context: Context | None = None) -> Result:
    """Execute a pattern through the circuit breaker.

    Returns:
        The pattern's result, tagged with the circuit state, or a Result
        carrying the configured fallback text when the circuit is open
        (either already, or as a consequence of this call failing).

    Raises:
        Exception: Whatever the pattern raised, as long as that failure
            did not leave the circuit open.
    """
    if self.state == CircuitState.OPEN:
        # Fail fast without touching the pattern.
        return Result(
            output=self._fallback_result,
            metadata={"circuit_state": "open", "failure_count": self._failure_count},
        )

    try:
        result = await pattern.run(task, context)
        self._on_success()
        result.metadata["circuit_state"] = self._state.value
        return result
    except Exception as exc:
        self._on_failure()
        if self._state != CircuitState.OPEN:
            # Below the threshold the caller sees the original error.
            raise
        return Result(
            output=self._fallback_result,
            metadata={
                "circuit_state": "open",
                "failure_count": self._failure_count,
                "last_error": str(exc),
            },
        )

Guardrails

pyagent_patterns.guardrails.GuardrailChain

Chain multiple guardrails together. All must pass.

Parameters:

Name Type Description Default
guardrails list[Guardrail]

List of guardrails to apply in order.

required
Source code in packages/pyagent-patterns/src/pyagent_patterns/guardrails.py
class GuardrailChain:
    """Run content through a sequence of guardrails; every one must pass.

    Args:
        guardrails: Guardrails to apply, in the order given.
    """

    def __init__(self, guardrails: list[Guardrail]) -> None:
        self._guardrails = guardrails

    def check(self, content: str) -> GuardrailResult:
        """Apply each guardrail in turn, feeding sanitized text forward.

        Args:
            content: The text to validate.

        Returns:
            The first failing guardrail's own result, if any guardrail fails.
            Otherwise a passing result, carrying ``sanitized_content`` only
            when the text that came out of the chain differs from ``content``.
        """
        text = content

        for guardrail in self._guardrails:
            verdict = guardrail.check(text)
            if not verdict.passed:
                # Stop at the first rejection and surface it unchanged.
                return verdict
            replacement = verdict.sanitized_content
            if replacement is not None:
                # Later guardrails see the sanitized text, not the original.
                text = replacement

        if text == content:
            return GuardrailResult(passed=True)
        return GuardrailResult(passed=True, sanitized_content=text)

pyagent_patterns.guardrails.LengthGuard

Bases: Guardrail

Reject messages exceeding a maximum length.

Parameters:

Name Type Description Default
max_chars int

Maximum allowed characters.

10000
truncate bool

If True, truncate instead of rejecting.

False
Source code in packages/pyagent-patterns/src/pyagent_patterns/guardrails.py
class LengthGuard(Guardrail):
    """Reject (or truncate) messages exceeding a maximum length.

    Args:
        max_chars: Maximum allowed characters.
        truncate: If True, truncate instead of rejecting.
    """

    # Marker appended to truncated text; it counts against ``max_chars``.
    _TRUNCATION_SUFFIX = "... [truncated]"

    def __init__(self, max_chars: int = 10000, truncate: bool = False) -> None:
        self._max_chars = max_chars
        self._truncate = truncate

    def check(self, content: str) -> GuardrailResult:
        """Validate the length of ``content``.

        Args:
            content: The text to measure.

        Returns:
            A passing result when ``content`` has at most ``max_chars``
            characters. For longer content: a passing result whose
            ``sanitized_content`` is the truncated text when truncation is
            enabled, otherwise a failing result with an explanatory message.
        """
        if len(content) <= self._max_chars:
            return GuardrailResult(passed=True)
        if self._truncate:
            return GuardrailResult(
                passed=True,
                message=f"Truncated from {len(content)} to {self._max_chars} chars",
                sanitized_content=self._truncated(content),
            )
        return GuardrailResult(
            passed=False,
            message=f"Content exceeds maximum length: {len(content)}/{self._max_chars} chars",
        )

    def _truncated(self, content: str) -> str:
        """Cut ``content`` so the result, marker included, fits in ``max_chars``."""
        limit = max(self._max_chars, 0)
        suffix = self._TRUNCATION_SUFFIX
        if limit <= len(suffix):
            # No room for the marker plus any real content: hard cut instead.
            return content[:limit]
        # Reserve space for the marker so the output never exceeds the limit
        # (appending it after slicing would overshoot by len(suffix)).
        return content[: limit - len(suffix)] + suffix

pyagent_patterns.guardrails.PIIGuard

Bases: Guardrail

Detect and optionally redact personally identifiable information.

Detects: email addresses, phone numbers, SSNs, credit card numbers.

Parameters:

Name Type Description Default
redact bool

If True, redact PII and pass. If False, reject on detection.

True
Source code in packages/pyagent-patterns/src/pyagent_patterns/guardrails.py
class PIIGuard(Guardrail):
    """Detect and optionally redact personally identifiable information.

    Detects: email addresses, phone numbers, SSNs, credit card numbers.

    Args:
        redact: If True, redact PII and pass. If False, reject on detection.
    """

    _PATTERNS: ClassVar[dict[str, str]] = {
        # The TLD class is [A-Za-z]: inside a character class "|" is a literal
        # pipe, not alternation, so "[A-Z|a-z]" wrongly accepted "|" in a TLD.
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "phone": r"\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
        # Hyphens are optional, so any standalone 9-digit run also matches.
        "ssn": r"\b\d{3}[-]?\d{2}[-]?\d{4}\b",
        "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
    }

    def __init__(self, redact: bool = True) -> None:
        self._redact = redact

    def check(self, content: str) -> GuardrailResult:
        """Scan ``content`` for PII and redact or reject according to ``redact``.

        Args:
            content: The text to scan.

        Returns:
            A passing result when nothing is detected. When PII is found: a
            passing result with the redacted text in ``sanitized_content`` if
            redaction is enabled, otherwise a failing result. In both cases
            the message lists each detected PII type with its match count.
        """
        detections: list[str] = []
        sanitized = content

        for pii_type, pattern in self._PATTERNS.items():
            # Detection always runs on the original text; redaction is applied
            # cumulatively to ``sanitized`` in pattern order.
            matches = re.findall(pattern, content)
            if matches:
                detections.append(f"{pii_type}: {len(matches)} found")
                if self._redact:
                    sanitized = re.sub(pattern, f"[REDACTED-{pii_type.upper()}]", sanitized)

        if not detections:
            return GuardrailResult(passed=True)

        summary = ", ".join(detections)
        if self._redact:
            return GuardrailResult(
                passed=True,
                message=f"PII redacted: {summary}",
                sanitized_content=sanitized,
            )

        return GuardrailResult(
            passed=False,
            message=f"PII detected: {summary}",
        )

pyagent_patterns.guardrails.ContentGuard

Bases: Guardrail

Block content matching configurable deny patterns.

Parameters:

Name Type Description Default
deny_patterns list[str] | None

List of regex patterns that should be blocked.

None
deny_words list[str] | None

List of words/phrases to block (matched as case-insensitive substrings).

None
Source code in packages/pyagent-patterns/src/pyagent_patterns/guardrails.py
class ContentGuard(Guardrail):
    """Block content matching configurable deny patterns.

    Args:
        deny_patterns: List of regex patterns that should be blocked. They are
            compiled case-insensitively and searched anywhere in the content.
        deny_words: List of words/phrases to block. Matching is a
            case-insensitive substring test, so a deny word also matches
            inside longer words.
    """

    def __init__(
        self,
        deny_patterns: list[str] | None = None,
        deny_words: list[str] | None = None,
    ) -> None:
        # Empty entries are dropped: "" is a substring of every string and an
        # empty regex matches everywhere, so a single blank entry (e.g. from a
        # config file) would otherwise block all content.
        self._patterns = [re.compile(p, re.IGNORECASE) for p in (deny_patterns or []) if p]
        self._words = [w.lower() for w in (deny_words or []) if w]

    def check(self, content: str) -> GuardrailResult:
        """Reject ``content`` if it contains a deny word or matches a deny pattern.

        Deny words are checked before deny patterns; the first hit determines
        the message of the failing result.

        Args:
            content: The text to inspect.

        Returns:
            A failing result naming the first deny word or pattern that
            matched, or a passing result when nothing matched.
        """
        content_lower = content.lower()

        for word in self._words:
            if word in content_lower:
                return GuardrailResult(passed=False, message=f"Blocked word detected: '{word}'")

        for pattern in self._patterns:
            if pattern.search(content):
                return GuardrailResult(
                    passed=False, message=f"Blocked pattern matched: {pattern.pattern}"
                )

        return GuardrailResult(passed=True)

Advisor

pyagent_patterns.advisor.PatternAdvisor

Recommend patterns based on task description and constraints.

Usage

advisor = PatternAdvisor()
rec = advisor.recommend("Write and review code", Constraints(quality=Quality.HIGH))
print(rec.pattern, rec.reason)

Source code in packages/pyagent-patterns/src/pyagent_patterns/advisor.py
class PatternAdvisor:
    """Suggest an agent pattern for a task, given its description and constraints.

    Usage:
        advisor = PatternAdvisor()
        rec = advisor.recommend("Write and review code", Constraints(quality=Quality.HIGH))
        print(rec.pattern, rec.reason)
    """

    @staticmethod
    def _mentions(text: str, keywords: tuple[str, ...]) -> bool:
        """Return True when any keyword occurs as a substring of ``text``."""
        return any(keyword in text for keyword in keywords)

    def recommend(self, task: str, constraints: Constraints | None = None) -> Recommendation:
        """Pick a pattern for ``task``; the first matching rule below wins.

        Args:
            task: Free-text task description; keyword checks are done on its
                lower-cased form.
            constraints: Constraints to honour. A default ``Constraints()`` is
                used when this is omitted (or falsy).

        Returns:
            A ``Recommendation`` naming the pattern, the reason, the estimated
            call count and cost range, and alternative patterns.
        """
        limits = constraints or Constraints()
        text = task.lower()

        # Decision tree based on Augment 2026 "Five Decision Rules"

        # Rule 1: simple single-step work under real-time latency.
        is_simple = not limits.multi_step and limits.quality in (Quality.DRAFT, Quality.STANDARD)
        if is_simple and limits.latency == Latency.REALTIME:
            return Recommendation(
                pattern="pipeline",
                reason="Simple task with real-time latency → minimal sequential processing",
                estimated_calls=1,
                estimated_cost_range="$0.001-0.003",
                alternatives=["talker_reasoner"],
            )

        # Rule 2: tight budget.
        if limits.max_cost_usd < 0.01:
            return Recommendation(
                pattern="talker_reasoner",
                reason="Tight budget → use cheap model for easy, expensive only when needed",
                estimated_calls=1,
                estimated_cost_range="$0.001-0.005",
                alternatives=["pipeline"],
            )

        # Rule 3: fault tolerance requested.
        if limits.fault_tolerant:
            return Recommendation(
                pattern="voting",
                reason="Fault-tolerant requirement → multiple independent agents with consensus",
                estimated_calls=3,
                estimated_cost_range="$0.006-0.012",
                alternatives=["fan_out_fan_in"],
            )

        # Rule 4: high or critical quality; keywords refine the choice.
        wants_high_quality = limits.quality in (Quality.HIGH, Quality.CRITICAL)

        if wants_high_quality and self._mentions(
            text, ("compare", "pros and cons", "debate", "argue", "versus")
        ):
            return Recommendation(
                pattern="debate",
                reason="High quality + adversarial task → structured debate with judge",
                estimated_calls=7,
                estimated_cost_range="$0.014-0.028",
                alternatives=["evaluator_optimizer", "cross_reflection"],
            )

        if wants_high_quality and self._mentions(
            text, ("write", "code", "generate", "create", "draft")
        ):
            return Recommendation(
                pattern="self_reflection",
                reason="High quality creative/code task → generate-critique-refine loop",
                estimated_calls=4,
                estimated_cost_range="$0.004-0.012",
                alternatives=["cross_reflection", "evaluator_optimizer"],
            )

        if wants_high_quality:
            return Recommendation(
                pattern="evaluator_optimizer",
                reason="High quality task → explicit evaluation criteria with optimization",
                estimated_calls=4,
                estimated_cost_range="$0.004-0.008",
                alternatives=["self_reflection"],
            )

        # Rule 5: multi-step work, flagged by the constraint or by keywords.
        is_multi_step = limits.multi_step or self._mentions(
            text, ("steps", "process", "workflow", "pipeline")
        )

        if is_multi_step and self._mentions(text, ("team", "delegate", "manage", "coordinate")):
            return Recommendation(
                pattern="hierarchical",
                reason="Multi-step with team coordination → hierarchical delegation",
                estimated_calls=7,
                estimated_cost_range="$0.010-0.020",
                alternatives=["supervisor", "orchestrator_workers"],
            )

        if is_multi_step and self._mentions(text, ("classify", "route", "triage", "categorize")):
            return Recommendation(
                pattern="supervisor",
                reason="Multi-step with classification → supervisor routes to specialists",
                estimated_calls=3,
                estimated_cost_range="$0.004-0.008",
                alternatives=["pipeline"],
            )

        if is_multi_step:
            return Recommendation(
                pattern="pipeline",
                reason="Multi-step sequential task → stage-by-stage processing",
                estimated_calls=4,
                estimated_cost_range="$0.004-0.008",
                alternatives=["supervisor"],
            )

        # Nothing matched: fall back to the general-purpose pipeline.
        return Recommendation(
            pattern="pipeline",
            reason="General-purpose task → sequential pipeline with composable stages",
            estimated_calls=2,
            estimated_cost_range="$0.002-0.004",
            alternatives=["supervisor", "self_reflection"],
        )

recommend(task, constraints=None)

Recommend the best pattern for the given task and constraints.

Source code in packages/pyagent-patterns/src/pyagent_patterns/advisor.py
def recommend(self, task: str, constraints: Constraints | None = None) -> Recommendation:
    """Choose a pattern for ``task`` under ``constraints``; rules are tried in order.

    Args:
        task: Free-text task description; keyword checks use its lower-cased
            form.
        constraints: Constraints to honour. A default ``Constraints()`` is
            used when this is omitted (or falsy).

    Returns:
        A ``Recommendation`` with the pattern name, reason, estimated calls,
        estimated cost range, and alternatives.
    """
    opts = constraints or Constraints()
    lowered = task.lower()

    def mentions(*keywords: str) -> bool:
        # Plain substring search over the lower-cased task text.
        return any(keyword in lowered for keyword in keywords)

    # Decision tree based on Augment 2026 "Five Decision Rules"

    # Rule 1: Simple single-step tasks → Pipeline or single agent
    if (
        not opts.multi_step
        and opts.quality in (Quality.DRAFT, Quality.STANDARD)
        and opts.latency == Latency.REALTIME
    ):
        return Recommendation(
            pattern="pipeline",
            reason="Simple task with real-time latency → minimal sequential processing",
            estimated_calls=1,
            estimated_cost_range="$0.001-0.003",
            alternatives=["talker_reasoner"],
        )

    # Rule 2: Cost-sensitive → Route to cheapest viable model
    if opts.max_cost_usd < 0.01:
        return Recommendation(
            pattern="talker_reasoner",
            reason="Tight budget → use cheap model for easy, expensive only when needed",
            estimated_calls=1,
            estimated_cost_range="$0.001-0.005",
            alternatives=["pipeline"],
        )

    # Rule 3: High reliability / fault tolerance → Voting or Fan-Out
    if opts.fault_tolerant:
        return Recommendation(
            pattern="voting",
            reason="Fault-tolerant requirement → multiple independent agents with consensus",
            estimated_calls=3,
            estimated_cost_range="$0.006-0.012",
            alternatives=["fan_out_fan_in"],
        )

    # Rule 4: High quality → Reflection, Debate, or Evaluator
    if opts.quality in (Quality.HIGH, Quality.CRITICAL):
        if mentions("compare", "pros and cons", "debate", "argue", "versus"):
            # Adversarial / comparison wording.
            choice = (
                "debate",
                "High quality + adversarial task → structured debate with judge",
                7,
                "$0.014-0.028",
                ["evaluator_optimizer", "cross_reflection"],
            )
        elif mentions("write", "code", "generate", "create", "draft"):
            # Code or writing that benefits from review.
            choice = (
                "self_reflection",
                "High quality creative/code task → generate-critique-refine loop",
                4,
                "$0.004-0.012",
                ["cross_reflection", "evaluator_optimizer"],
            )
        else:
            choice = (
                "evaluator_optimizer",
                "High quality task → explicit evaluation criteria with optimization",
                4,
                "$0.004-0.008",
                ["self_reflection"],
            )
        name, why, calls, cost, fallbacks = choice
        return Recommendation(
            pattern=name,
            reason=why,
            estimated_calls=calls,
            estimated_cost_range=cost,
            alternatives=fallbacks,
        )

    # Rule 5: Multi-step/complex → Supervisor, Hierarchical, or Pipeline
    if opts.multi_step or mentions("steps", "process", "workflow", "pipeline"):
        if mentions("team", "delegate", "manage", "coordinate"):
            choice = (
                "hierarchical",
                "Multi-step with team coordination → hierarchical delegation",
                7,
                "$0.010-0.020",
                ["supervisor", "orchestrator_workers"],
            )
        elif mentions("classify", "route", "triage", "categorize"):
            choice = (
                "supervisor",
                "Multi-step with classification → supervisor routes to specialists",
                3,
                "$0.004-0.008",
                ["pipeline"],
            )
        else:
            choice = (
                "pipeline",
                "Multi-step sequential task → stage-by-stage processing",
                4,
                "$0.004-0.008",
                ["supervisor"],
            )
        name, why, calls, cost, fallbacks = choice
        return Recommendation(
            pattern=name,
            reason=why,
            estimated_calls=calls,
            estimated_cost_range=cost,
            alternatives=fallbacks,
        )

    # Default: Pipeline (safest general-purpose)
    return Recommendation(
        pattern="pipeline",
        reason="General-purpose task → sequential pipeline with composable stages",
        estimated_calls=2,
        estimated_cost_range="$0.002-0.004",
        alternatives=["supervisor", "self_reflection"],
    )