Fan-Out / Fan-In Pattern¶
Broadcast the same task to N agents in parallel; a dedicated aggregator synthesises all results.
Best for: Multi-perspective analysis, parallel research, ensemble fact-checking.
LLM calls: N parallel + 1 aggregator. Wall-clock time: max(agent latencies) + aggregator.
Sequence Diagram¶
sequenceDiagram
participant U as User
participant O as Orchestrator
participant A1 as Agent 1
participant A2 as Agent 2
participant A3 as Agent 3
participant Agg as Aggregator
U->>O: Task
par Parallel execution
O->>A1: Same task
O->>A2: Same task
O->>A3: Same task
end
A1-->>Agg: Perspective 1
A2-->>Agg: Perspective 2
A3-->>Agg: Perspective 3
Agg-->>U: Synthesised output
Use Case 1 — Investment Analysis (Bull / Bear / Neutral)¶
import asyncio
from pyagent_patterns.base import Agent
from pyagent_patterns.orchestration import FanOutFanIn
from pyagent_providers import GeminiLLM
analysis = FanOutFanIn(
agents=[
Agent(
"bull",
GeminiLLM("gemini-2.5-flash"),
system_prompt="Argue the strongest possible bullish investment case. "
"Focus on: revenue growth trajectory, market share expansion, "
"competitive moats, and catalysts for multiple expansion. "
"Use specific data points and comparable valuations.",
),
Agent(
"bear",
GeminiLLM("gemini-2.5-flash"),
system_prompt="Argue the strongest possible bearish case. "
"Focus on: valuation vs peers, competitive threats, macro risks, "
"execution risks, and downside scenarios. "
"Use specific data and counter the bull case directly.",
),
Agent(
"neutral",
GeminiLLM("gemini-2.5-flash"),
system_prompt="Give a balanced, probability-weighted assessment. "
"Acknowledge both bull and bear cases explicitly. "
"Identify the 2-3 key swing factors. "
"End with a base case, bull case, and bear case price target.",
),
],
aggregator=Agent(
"analyst",
GeminiLLM("gemini-2.5-pro"),
system_prompt="Synthesise all three perspectives into a structured investment memo: "
"1) Executive Summary (2 sentences), "
"2) Bull Case (key points), "
"3) Bear Case (key points), "
"4) Key Risks & Swing Factors, "
"5) Verdict with conviction level (High/Medium/Low).",
),
)
result = asyncio.run(analysis.run(
"Investment thesis for Nvidia at current valuation — $3.2T market cap, "
"P/E 45x forward, data center revenue growing 150% YoY"
))
print(result.output)
print(f"Parallel agents: {result.metadata['parallel_agents']}")
print(f"Duration: {result.duration_seconds:.1f}s, Cost: ${result.cost_estimate:.4f}")
Stream individual agent results as they complete¶
async def stream_analysis():
async for chunk in analysis.stream(
"Investment thesis for Nvidia at $3.2T market cap"
):
print(chunk, end="", flush=True)
asyncio.run(stream_analysis())
Use Case 2 — Multi-Provider Fact Checking¶
Different providers catch different error types — use all three together.
from pyagent_providers import OpenAILLM, AnthropicLLM
fact_checker = FanOutFanIn(
agents=[
Agent(
"openai_checker",
OpenAILLM("gpt-4o"),
system_prompt="Check factual accuracy. List every claim and mark each: "
"VERIFIED (with source), DISPUTED (with counter-evidence), "
"or UNVERIFIABLE. Be specific about what you can and cannot verify.",
),
Agent(
"anthropic_checker",
AnthropicLLM("claude-sonnet-4-20250514"),
system_prompt="Check logical consistency and internal contradictions. "
"Does the argument hold together? Are there gaps or non-sequiturs? "
"List any logical fallacies or unsupported leaps.",
),
Agent(
"gemini_checker",
GeminiLLM("gemini-2.5-pro"),
system_prompt="Check for statistical and numerical accuracy. "
"Are the numbers plausible? Do percentages add up? "
"Are comparisons fair? Flag any misleading use of statistics.",
),
],
aggregator=Agent(
"verdict",
AnthropicLLM("claude-sonnet-4-20250514"),
system_prompt="Combine all checker reports into a single fact-check verdict. "
"Give an overall reliability score (0-10). "
"List the top 3 concerns and which claims are safe to trust.",
),
)
result = asyncio.run(fact_checker.run(open("article_to_check.txt").read()))
print(result.output)
Use Case 3 — Parallel Code Review (LangChain)¶
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from pyagent_providers import LangChainLLM
code_review = FanOutFanIn(
agents=[
Agent(
"security_reviewer",
LangChainLLM(ChatOpenAI(model="gpt-4o")),
system_prompt="Review for security vulnerabilities: injection, auth bypass, "
"data exposure, insecure dependencies. Be exhaustive.",
),
Agent(
"performance_reviewer",
LangChainLLM(ChatAnthropic(model="claude-sonnet-4-20250514")),
system_prompt="Review for performance issues: N+1 queries, memory leaks, "
"unnecessary allocations, blocking I/O in async contexts.",
),
Agent(
"correctness_reviewer",
LangChainLLM(ChatOpenAI(model="gpt-4o")),
system_prompt="Review for logic errors, edge cases, off-by-one errors, "
"race conditions, and missing error handling.",
),
],
aggregator=Agent(
"lead_reviewer",
LangChainLLM(ChatAnthropic(model="claude-sonnet-4-20250514")),
system_prompt="Combine all reviews into a structured PR feedback comment. "
"Categorise findings as: BLOCKER, WARNING, SUGGESTION. "
"List blockers first. Be specific with line references.",
),
)
result = asyncio.run(code_review.run(open("pr_diff.py").read()))
print(f"Review complete. Cost: ${result.cost_estimate:.4f}")
OTel Trace Output¶
Trace: pyagent.pattern.fan_out_fan_in (2.8s, $0.018)
├── [parallel]
│ ├── pyagent.agent.bull (2.1s, gemini-2.5-flash)
│ ├── pyagent.agent.bear (2.4s, gemini-2.5-flash)
│ └── pyagent.agent.neutral (2.8s, gemini-2.5-flash)
└── pyagent.agent.analyst (1.6s, gemini-2.5-pro)
When to Use¶
| Condition | Recommendation |
|---|---|
| Multiple independent perspectives improve quality | ✅ Use Fan-Out/Fan-In |
| Wall-clock latency matters (parallel is fast) | ✅ Use Fan-Out/Fan-In |
| Analyses depend on each other | ❌ Use Pipeline |
| Agents need fixed hierarchy | ❌ Use Hierarchical |
| Agents should influence each other iteratively | ❌ Use Swarm |