
Pipeline Pattern

Sequential stage chain — each agent's output becomes the next agent's input.

Best for: ETL, document processing, multi-step transformation.
LLM calls: N (one per stage). Latency: sum of all stages.


Sequence Diagram

sequenceDiagram
    participant U as User
    participant S1 as Stage 1
    participant S2 as Stage 2
    participant S3 as Stage 3

    U->>S1: Raw input
    S1->>S2: Transformed output
    S2->>S3: Further transformed
    S3-->>U: Final output
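The chaining in the diagram can be sketched in plain `asyncio`, independent of `pyagent_patterns`. This is a minimal illustration under stated assumptions, not the library's implementation. `run_pipeline`, `PipelineResult`, and the stub stages `extract`, `check`, and `write` are hypothetical names, and each stub stands in for one LLM call. The sketch shows the two cost properties above: N stages mean N calls, and because each stage awaits the previous one, wall-clock time is the sum of the stage times.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Awaitable, Callable, List, Tuple

# Hypothetical stage type: any async function that maps text to text.
Stage = Callable[[str], Awaitable[str]]


@dataclass
class PipelineResult:
    output: str
    stage_names: List[str] = field(default_factory=list)
    llm_calls: int = 0
    duration_seconds: float = 0.0


async def run_pipeline(stages: List[Tuple[str, Stage]], user_input: str) -> PipelineResult:
    """Run stages strictly in order, feeding each output into the next stage."""
    start = time.perf_counter()
    result = PipelineResult(output=user_input)
    current = user_input
    for name, stage in stages:
        current = await stage(current)  # one call per stage, so N stages -> N calls
        result.stage_names.append(name)
        result.llm_calls += 1
    result.output = current
    # Stages never overlap, so elapsed time is the sum of the stage times.
    result.duration_seconds = time.perf_counter() - start
    return result


# Hypothetical stub stages standing in for LLM-backed agents.
async def extract(text: str) -> str:
    await asyncio.sleep(0.01)
    return f"claims({text})"


async def check(text: str) -> str:
    await asyncio.sleep(0.01)
    return f"labelled({text})"


async def write(text: str) -> str:
    await asyncio.sleep(0.01)
    return f"brief({text})"


if __name__ == "__main__":
    res = asyncio.run(run_pipeline(
        [("extractor", extract), ("fact_checker", check), ("writer", write)],
        "raw report",
    ))
    print(res.output)                        # brief(labelled(claims(raw report)))
    print(res.stage_names, res.llm_calls)    # ['extractor', 'fact_checker', 'writer'] 3
```

The nesting in the final output makes the data flow visible: each stage only ever sees the previous stage's output, never the raw input.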

Use Case 1 — Earnings Report Processing

Route cheap extraction to Claude 3.5 Haiku, fact-checking to GPT-4o-mini, and the final brief to Claude Sonnet 4.

import asyncio
from pyagent_patterns.base import Agent
from pyagent_patterns.orchestration import Pipeline
from pyagent_providers import AnthropicLLM, OpenAILLM

pipeline = Pipeline(stages=[
    Agent(
        "extractor",
        AnthropicLLM("claude-3-5-haiku-20241022"),
        system_prompt="Extract every claim, figure, and named entity from the text. "
                      "Be exhaustive — list all revenue figures, percentages, and named companies.",
    ),
    Agent(
        "fact_checker",
        OpenAILLM("gpt-4o-mini"),
        system_prompt="Identify which extracted items are verifiable facts vs opinions or speculation. "
                      "Label each: FACT, OPINION, or UNVERIFIABLE.",
    ),
    Agent(
        "writer",
        AnthropicLLM("claude-sonnet-4-20250514"),
        system_prompt="Write a concise, structured brief for a portfolio manager. "
                      "Lead with the single most important fact. Use bullet points. Max 200 words.",
    ),
])

result = asyncio.run(pipeline.run(
    "Tesla Q3 2025 earnings: Revenue $25.2B (+8% YoY), auto gross margin 17.1%, "
    "energy storage deployments up 73% YoY, FSD miles driven passed 2B total. "
    "CEO commented margins will recover in Q4 as production ramps."
))
print(result.output)
print(f"Stages: {result.metadata['stages']}, Stage names: {result.metadata['stage_names']}")
print(f"Duration: {result.duration_seconds:.1f}s, Cost: ${result.cost_estimate:.4f}")

Stream stage completions as they arrive

async def stream_pipeline():
    async for chunk in pipeline.stream(
        "Tesla Q3 2025 earnings: Revenue $25.2B (+8% YoY)..."
    ):
        print(chunk, end="", flush=True)
    print()

asyncio.run(stream_pipeline())
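This page does not specify what each chunk from `pipeline.stream` contains. Assuming stage-level granularity, a stream of stage completions could look like the async generator below. `stream_stages`, `shout`, `exclaim`, and `main` are hypothetical names, not part of `pyagent_patterns`. Each intermediate output is yielded as soon as its stage finishes, while total latency is unchanged because later stages still wait on earlier ones.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List, Tuple

# Hypothetical stage type: any async function that maps text to text.
Stage = Callable[[str], Awaitable[str]]


async def stream_stages(
    stages: List[Tuple[str, Stage]], user_input: str
) -> AsyncIterator[Tuple[str, str]]:
    """Yield (stage_name, output) as soon as each stage finishes."""
    current = user_input
    for name, stage in stages:
        current = await stage(current)  # still waits for the previous stage
        yield name, current             # surface the intermediate result immediately


# Hypothetical stub stages.
async def shout(text: str) -> str:
    return text.upper()


async def exclaim(text: str) -> str:
    return text + "!"


async def main() -> List[Tuple[str, str]]:
    seen = []
    async for name, output in stream_stages([("shout", shout), ("exclaim", exclaim)], "q3 revenue"):
        print(f"[{name}] {output}")
        seen.append((name, output))
    return seen


if __name__ == "__main__":
    asyncio.run(main())
    # [shout] Q3 REVENUE
    # [exclaim] Q3 REVENUE!
```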

Use Case 2 — Legal Contract Review

Mix Gemini, OpenAI, and Anthropic models across stages based on their different strengths.

from pyagent_providers import GeminiLLM

legal_pipeline = Pipeline(stages=[
    Agent(
        "clause_extractor",
        GeminiLLM("gemini-2.5-flash"),
        system_prompt="Extract every contractual obligation, deadline, and liability clause. "
                      "Format as a numbered list. Include section references.",
    ),
    Agent(
        "risk_classifier",
        OpenAILLM("gpt-4o"),
        system_prompt="Classify each clause as HIGH / MEDIUM / LOW risk. "
                      "Flag anything that exposes the company to uncapped liability or IP loss. "
                      "Explain your risk rating in one sentence per clause.",
    ),
    Agent(
        "counsel_brief",
        AnthropicLLM("claude-sonnet-4-20250514"),
        system_prompt="Write an executive brief for in-house counsel. "
                      "Lead with the top 3 risks. Recommend negotiation priorities. "
                      "Keep it under 300 words.",
    ),
])

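# Read the contract with a context manager so the file handle is closed.
with open("contract.txt", encoding="utf-8") as f:
    contract_text = f.read()

result = asyncio.run(legal_pipeline.run(contract_text))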
print(result.output)
print(f"Stages: {result.metadata['stage_names']}")

Use Case 3 — Multilingual Content Pipeline (LangChain)

from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
from pyagent_providers import LangChainLLM

multilingual = Pipeline(stages=[
    Agent(
        "simplifier",
        LangChainLLM(ChatAnthropic(model="claude-3-5-haiku-20241022")),
        system_prompt="Rewrite the text at a 6th-grade reading level. "
                      "Keep all key information but remove jargon.",
    ),
    Agent(
        "translator",
        LangChainLLM(ChatOpenAI(model="gpt-4o-mini")),
        system_prompt="Translate to Spanish (Latin American). Preserve formatting and tone.",
    ),
    Agent(
        "localiser",
        LangChainLLM(ChatAnthropic(model="claude-3-5-haiku-20241022")),
        system_prompt="Adapt cultural references and idioms for a Mexican audience. "
                      "Flag any phrases that don't translate well.",
    ),
])

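# Read the manual with a context manager so the file handle is closed.
with open("product_manual.txt", encoding="utf-8") as f:
    manual_text = f.read()

result = asyncio.run(multilingual.run(manual_text))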
print(f"Cost: ${result.cost_estimate:.4f}")
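`LangChainLLM` lets a LangChain chat model serve as a stage's LLM. One way such a wrapper could work is sketched below. This is an assumption about the general technique, not the `pyagent_providers` implementation: `LangChainAdapter`, `ChatModelLike`, `FakeMessage`, and `EchoChatModel` are hypothetical names. The adapter passes the stage's system prompt and input to the model's `ainvoke` method as `(role, content)` message tuples, which LangChain chat models accept, and returns the reply's `.content`. A fake model keeps the sketch runnable without API keys.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Protocol, Tuple


class ChatModelLike(Protocol):
    """The only part of a LangChain chat model this sketch relies on."""

    async def ainvoke(self, input: Any, **kwargs: Any) -> Any: ...


@dataclass
class LangChainAdapter:
    """Hypothetical adapter: one stage = one chat call with system + human messages."""

    model: ChatModelLike

    async def complete(self, system_prompt: str, user_text: str) -> str:
        # LangChain chat models accept (role, content) tuples as messages.
        messages: List[Tuple[str, str]] = [("system", system_prompt), ("human", user_text)]
        reply = await self.model.ainvoke(messages)
        return reply.content  # the reply text, for plain-text responses


# Fake message and model so the sketch runs without langchain or API keys.
@dataclass
class FakeMessage:
    content: str


class EchoChatModel:
    async def ainvoke(self, input: Any, **kwargs: Any) -> FakeMessage:
        (_, system), (_, human) = input
        return FakeMessage(content=f"[{system}] {human}")


if __name__ == "__main__":
    adapter = LangChainAdapter(EchoChatModel())
    print(asyncio.run(adapter.complete("Translate to Spanish.", "Hello")))
    # [Translate to Spanish.] Hello
```

With LangChain installed, a real model such as `ChatAnthropic(model="claude-3-5-haiku-20241022")` should fit the same slot, since the adapter only relies on `ainvoke` and the returned message's `content` attribute.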

OTel Trace Output

Trace: pyagent.pattern.pipeline (4.1s, $0.012)
├── pyagent.agent.extractor (0.9s, claude-3-5-haiku-20241022)
├── pyagent.agent.fact_checker (1.8s, gpt-4o-mini)
└── pyagent.agent.writer (1.4s, claude-sonnet-4-20250514)
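Each stage appears as a child span of the pipeline span, and the child durations add up to the parent's 4.1s (0.9 + 1.8 + 1.4). The sketch below reproduces that nesting with a small pure-Python stand-in for a tracer so it runs without extra packages. `Span`, `span`, and `render` are hypothetical helpers, not the OpenTelemetry API; with the real OpenTelemetry SDK, the same shape would come from nesting `tracer.start_as_current_span(...)` blocks.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional


@dataclass
class Span:
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)
    children: List["Span"] = field(default_factory=list)
    duration: float = 0.0


@contextmanager
def span(name: str, parent: Optional[Span] = None, **attributes: Any) -> Iterator[Span]:
    """Time the enclosed block and attach it under `parent` (a stand-in for an OTel span)."""
    current = Span(name, dict(attributes))
    start = time.perf_counter()
    try:
        yield current
    finally:
        current.duration = time.perf_counter() - start
        if parent is not None:
            parent.children.append(current)


def render(root: Span) -> str:
    """Format the root span and its direct children as a tree, like the trace above."""
    lines = [f"Trace: {root.name} ({root.duration:.1f}s)"]
    for i, child in enumerate(root.children):
        branch = "└──" if i == len(root.children) - 1 else "├──"
        lines.append(f"{branch} {child.name} ({child.duration:.1f}s, {child.attributes.get('model', '?')})")
    return "\n".join(lines)


if __name__ == "__main__":
    stages = [
        ("extractor", "claude-3-5-haiku-20241022"),
        ("fact_checker", "gpt-4o-mini"),
        ("writer", "claude-sonnet-4-20250514"),
    ]
    with span("pyagent.pattern.pipeline") as root:
        for name, model in stages:
            with span(f"pyagent.agent.{name}", parent=root, model=model):
                time.sleep(0.05)  # stand-in for the stage's LLM call
    print(render(root))
```

Because the child spans run back to back inside the parent, the parent's duration is always at least the sum of its children; any gap is orchestration overhead.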

When to Use

| Condition | Recommendation |
|---|---|
| Task has clear sequential stages | ✅ Use Pipeline |
| Each stage transforms output for the next | ✅ Use Pipeline |
| Stages could run independently | ❌ Use Fan-Out/Fan-In |
| You need quality feedback loops | ❌ Use Self-Reflection |
| A coordinator decides which stages to run | ❌ Use Orchestrator-Workers |
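The "Stages could run independently" row rests on a latency difference: a pipeline pays the sum of its stage times, while independent stages can run concurrently and pay roughly the slowest one. The sketch below measures both with `asyncio.sleep` standing in for LLM calls. `fake_stage`, `sequential`, and `concurrent` are hypothetical names, and the delays are arbitrary illustration values, not measurements.

```python
import asyncio
import time
from typing import List


async def fake_stage(delay: float) -> None:
    """Stand-in for one LLM call that takes `delay` seconds."""
    await asyncio.sleep(delay)


async def sequential(delays: List[float]) -> float:
    """Pipeline-style: each stage starts only after the previous one finishes."""
    start = time.perf_counter()
    for d in delays:
        await fake_stage(d)
    return time.perf_counter() - start


async def concurrent(delays: List[float]) -> float:
    """Fan-out-style: independent stages are awaited together."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_stage(d) for d in delays))
    return time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.1, 0.2, 0.15]
    print(f"sequential: {asyncio.run(sequential(delays)):.2f}s (sum of delays = {sum(delays):.2f}s)")
    print(f"concurrent: {asyncio.run(concurrent(delays)):.2f}s (max delay = {max(delays):.2f}s)")
```

Each extra pipeline stage adds its full latency, so that cost is only worth paying when a stage genuinely needs the previous stage's output.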

See Also