Why Multi-Agent Systems Matter for Enterprise
A single Claude agent with 20 tools can handle a lot. But it runs sequentially — one tool call at a time — and its context window becomes the ceiling for task complexity. Multi-agent systems remove both constraints. Multiple specialised agents can run in parallel, each with a focused tool set and its own context window. Complex enterprise workflows — the kind that touch HR, finance, legal, and operations in a single process — become architecturally tractable.
The patterns we cover here are the same ones used in our enterprise agent deployments. They work at scale, they're debuggable, and they integrate with the Claude Agent SDK's sub-agent model.
The Core Orchestration Patterns
Pattern A: Orchestrator-Worker
The most common pattern. A single orchestrator agent receives the top-level task, decomposes it into subtasks, and delegates each to a specialised worker agent. Workers complete their tasks and return results. The orchestrator assembles the final output.
This is the right pattern when: tasks can be cleanly decomposed into independent subtasks, worker agents need domain-specific tools or system prompts, and you want a single point of control and audit.
```python
import asyncio
from anthropic.lib.agents import Agent, tool, SubAgentRunner

runner = SubAgentRunner(client=client)

# Worker agents — specialist, narrow scope
financial_analyst = Agent(
    client=client,
    model="claude-sonnet-4-6",
    system="You are a financial analyst. Analyse financial data and produce structured reports.",
    tools=[get_financial_data, calculate_ratios, query_market_data]
)

legal_reviewer = Agent(
    client=client,
    model="claude-opus-4-6",
    system="You are a legal reviewer. Identify compliance issues and regulatory risks.",
    tools=[search_regulations, check_contract_terms, query_legal_database]
)

# Orchestrator tools that delegate to workers
@tool
async def run_financial_analysis(company: str, period: str) -> str:
    """Run financial analysis on a company for a given period."""
    result = await runner.arun(
        financial_analyst,
        f"Analyse {company}'s financials for {period}. Cover revenue, margins, and cash flow."
    )
    return result.output

@tool
async def run_legal_review(document_id: str, jurisdiction: str) -> str:
    """Run legal compliance review on a document."""
    result = await runner.arun(
        legal_reviewer,
        f"Review document {document_id} for compliance with {jurisdiction} regulations."
    )
    return result.output

# Orchestrator
due_diligence_orchestrator = Agent(
    client=client,
    model="claude-opus-4-6",
    system="""You coordinate due diligence investigations. For each company:
1. Run parallel financial analysis AND legal review simultaneously
2. Synthesise both reports into a unified risk assessment
3. Flag any areas where financial and legal findings interact""",
    tools=[run_financial_analysis, run_legal_review]
)
```

Pattern B: Parallel Fan-Out
When multiple independent subtasks need to run simultaneously, fan-out dramatically reduces total runtime. Instead of running 5 analyses sequentially (5 × 30s = 150s), run them in parallel (30s + synthesis overhead). For enterprise workflows with independent data sources, this is frequently the difference between a 10-minute and a 2-minute execution time.
```python
import logging

logger = logging.getLogger(__name__)

async def parallel_market_analysis(companies: list[str]) -> dict:
    """Run market analysis on multiple companies in parallel."""
    async def analyse_one(company: str) -> tuple[str, str]:
        result = await runner.arun(
            market_analyst,
            f"Analyse market position and competitive landscape for {company}"
        )
        return company, result.output

    # Fan out — all analyses run simultaneously
    tasks = [analyse_one(company) for company in companies]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Fan in — collect results, handle individual failures
    analysis_map = {}
    for item in results:
        if isinstance(item, Exception):
            logger.error(f"Analysis failed: {item}")
            continue
        company, analysis = item
        analysis_map[company] = analysis
    return analysis_map

# Synthesis step runs after all parallel tasks complete
async def full_sector_report(sector: str, companies: list[str]) -> str:
    analyses = await parallel_market_analysis(companies)
    synthesis_result = await runner.arun(
        synthesis_agent,
        f"Synthesise these {len(analyses)} company analyses into a sector report for {sector}:\n"
        + "\n\n".join(f"## {co}\n{analysis}" for co, analysis in analyses.items())
    )
    return synthesis_result.output
```

Pattern C: Pipeline with Handoffs
For tasks where each stage depends on the previous stage's output, a pipeline pattern is appropriate. Agent A processes the input and produces structured output. Agent B receives Agent A's output as its input. Each stage adds value without needing visibility into the full pipeline context.
This pattern is used in document processing workflows: extract → classify → enrich → validate → store. Each stage is a focused agent. The pipeline coordinator manages the handoffs and handles stage failures.
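A minimal coordinator sketch under stated assumptions: the stage coroutines (`extract`, `classify`, `validate`) are hypothetical stand-ins for real agent calls, and the error handling is deliberately bare.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Each stage is a coroutine taking the previous stage's output.
Stage = Callable[[Any], Awaitable[Any]]

async def extract(doc: str) -> dict:
    # Stand-in for an extraction agent call
    return {"doc": doc, "fields": doc.split()}

async def classify(data: dict) -> dict:
    # Stand-in for a classification agent call
    return {**data, "category": "invoice" if "invoice" in data["fields"] else "other"}

async def validate(data: dict) -> dict:
    if not data["fields"]:
        raise ValueError("empty document")
    return data

async def run_pipeline(stages: list[tuple[str, Stage]], initial: Any) -> Any:
    """Run stages sequentially, handing each the previous stage's output."""
    current = initial
    for name, stage in stages:
        try:
            current = await stage(current)
        except Exception as exc:
            # A real coordinator would consult the state store and
            # retry or route around the stage; here we just re-raise.
            raise RuntimeError(f"stage {name!r} failed") from exc
    return current

result = asyncio.run(run_pipeline(
    [("extract", extract), ("classify", classify), ("validate", validate)],
    "invoice 4021 total 99.00",
))
```

The coordinator owns the handoffs, so individual stages stay ignorant of the pipeline around them — which is exactly what makes them reusable.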
Pattern D: Critic-Generator Loop
A generator agent produces an initial output. A critic agent reviews it against specific criteria. If the critic identifies issues, the output is returned to the generator with the critique. The loop continues until the critic approves or a maximum iteration count is reached. This pattern produces significantly higher quality output for complex writing tasks, code generation, and structured analysis — at the cost of 2–4× the token usage.
Agent Communication: What to Pass Between Agents
The biggest operational mistake in multi-agent systems is passing raw text between agents. An orchestrator shouldn't send "here's the financial analysis, now do the legal review" as a block of prose. It should send structured data.
| What to pass | Format | Why |
|---|---|---|
| Task instructions | Structured dict with task_type, scope, constraints, output_format | Reduces ambiguity; easier to validate and log |
| Results from previous agents | Typed data structures, not prose summaries | Downstream agents parse data reliably; prose summaries lose precision |
| Context and constraints | Separate from task instructions; injected into system prompt | Persistent constraints don't compete with task instructions for attention |
| Error information | Structured error object with code, message, and suggested_action | Orchestrator can make routing decisions based on error type |
| Confidence/quality signals | Numeric score or categorical rating from the producing agent | Orchestrator can decide whether to accept or re-run |
Managing State Across Agents
Multi-agent systems need shared state. The orchestrator needs to track which subtasks are complete, which are in progress, and what their outputs are. Worker agents may need read-only access to context produced by other workers. A naive approach — passing everything in every message — blows up context windows fast.
The production pattern uses a task state store: a lightweight database (Redis works well) that holds the current state of each task in the pipeline. Agents write their outputs to the state store, and downstream agents read what they need. The orchestrator monitors the state store to decide when to advance the pipeline.
```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import json

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class PipelineState:
    pipeline_id: str
    input: dict
    stages: dict = field(default_factory=dict)  # stage_name -> {status, output, error}

    def mark_running(self, stage: str):
        self.stages[stage] = {"status": TaskStatus.RUNNING.value, "output": None}
        self._save()

    def mark_complete(self, stage: str, output: Any):
        self.stages[stage] = {"status": TaskStatus.COMPLETE.value, "output": output}
        self._save()

    def mark_failed(self, stage: str, error: str):
        self.stages[stage] = {"status": TaskStatus.FAILED.value, "error": error}
        self._save()

    def get_output(self, stage: str) -> Any:
        return self.stages.get(stage, {}).get("output")

    def _save(self):
        redis_client.set(f"pipeline:{self.pipeline_id}", json.dumps({
            "pipeline_id": self.pipeline_id,
            "input": self.input,
            "stages": self.stages
        }))
```

Failure Handling in Multi-Agent Systems
When a single agent fails, you handle it locally. When an agent in a multi-agent pipeline fails, you need a strategy: should the pipeline halt, retry the failed stage, route around it, or proceed with partial results?
The answers depend on the business logic, but the engineering approach is consistent: every agent call should be wrapped in a fault boundary that catches failures and returns structured error information to the orchestrator. The orchestrator then makes the routing decision based on predefined rules.
Design multi-agent systems to degrade gracefully. If the legal review agent fails, the financial analysis result should still be usable. Build partial result handling into your orchestrator from day one — retrofitting it is painful.
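A fault boundary can be sketched as a thin wrapper that converts exceptions into the structured error object described earlier; the names and the failing stub here are illustrative.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class AgentError:
    """Structured error returned to the orchestrator instead of an exception."""
    code: str
    message: str
    suggested_action: str

async def with_fault_boundary(name: str, coro):
    """Run an agent call; return (result, None) on success or (None, AgentError)."""
    try:
        return await coro, None
    except asyncio.TimeoutError:
        return None, AgentError("timeout", f"{name} timed out", "retry_with_backoff")
    except Exception as exc:
        return None, AgentError("agent_failure", f"{name}: {exc}", "proceed_partial")

async def failing_review():
    # Stand-in for an agent call that hits a broken dependency
    raise RuntimeError("legal database unreachable")

async def main():
    result, error = await with_fault_boundary("legal_review", failing_review())
    # The orchestrator routes on error.code / error.suggested_action,
    # never on raw exception text
    return result, error

result, error = asyncio.run(main())
```

Because every agent call returns the same `(result, error)` shape, partial-result handling becomes a routing rule in the orchestrator rather than scattered try/except blocks.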
Retry Strategies
Different failure types warrant different retry strategies:
- API rate limit errors: exponential backoff with jitter, retry up to 3 times.
- Tool execution failures: retry once with a modified prompt asking Claude to try an alternative approach.
- Context overflow: restart the agent with a summarised context.
- Validation failures (output doesn't meet schema): retry with explicit format instructions.
- Infrastructure failures: log, alert, and fail gracefully — don't retry aggressively.
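The rate-limit case can be sketched with stdlib-only backoff; the helper name and parameters are assumptions for illustration, not an SDK API.

```python
import asyncio
import random

async def retry_with_backoff(call, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry a coroutine factory with exponential backoff plus jitter.
    Suitable for rate-limit errors; not for infrastructure failures."""
    for attempt in range(max_attempts):
        try:
            return await call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # exhausted — surface the error to the fault boundary
            # 2^attempt growth with jitter to avoid synchronised retry storms
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            await asyncio.sleep(delay)

attempts = 0

async def flaky():
    # Stub that fails twice before succeeding, simulating rate limiting
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("rate limited")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky, base_delay=0.01))
```

The jitter is the important part: without it, many workers rate-limited at the same moment will all retry at the same moment and get limited again.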
Observability for Multi-Agent Systems
A single agent's behaviour is traceable — you have one conversation thread. A multi-agent system running 5–10 concurrent agents across a complex workflow is far harder to debug without proper instrumentation. You need distributed tracing that correlates all agent activity within a single pipeline execution.
The minimum viable observability stack for production multi-agent deployments includes:
- a pipeline trace ID propagated through all agent calls, so you can reconstruct the full execution path
- per-agent span logs recording input, output, tool calls, token usage, and latency for each agent invocation
- a pipeline state log tracking stage transitions with timestamps

Feed all of this into your existing observability platform — we typically use OpenTelemetry exporters to Datadog or Grafana.
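Trace ID propagation can be sketched with `contextvars` alone; a real deployment would emit OpenTelemetry spans rather than this hand-rolled log list.

```python
import asyncio
import contextvars
import uuid

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id")
span_log: list[dict] = []

def log_span(agent: str, event: str) -> None:
    # Every log line carries the pipeline's trace ID for correlation
    span_log.append({"trace_id": trace_id_var.get(), "agent": agent, "event": event})

async def run_agent(name: str) -> None:
    # contextvars flow into tasks automatically, so every concurrent
    # agent call inherits the pipeline's trace ID without plumbing
    log_span(name, "start")
    await asyncio.sleep(0)  # stand-in for the actual agent work
    log_span(name, "end")

async def run_pipeline() -> str:
    trace_id = uuid.uuid4().hex
    trace_id_var.set(trace_id)
    await asyncio.gather(run_agent("financial"), run_agent("legal"))
    return trace_id

trace_id = asyncio.run(run_pipeline())
```

Because `asyncio` tasks copy the current context at creation time, the trace ID set by the orchestrator is visible inside every fanned-out agent call — no explicit parameter threading required.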
When Multi-Agent Is (and Isn't) the Right Answer
Multi-agent systems add complexity. Don't use them unless the task justifies it.
| Use multi-agent when... | Stick with single agent when... |
|---|---|
| Task has clearly independent parallel subtasks | Task is linear and sequential |
| Different subtasks need different tool sets | 5–8 tools cover everything needed |
| Total tool count exceeds ~15 | Tool set is manageable in one context |
| Subtasks benefit from specialised system prompts | One system prompt handles all cases |
| Pipeline stages have clear input/output interfaces | Task requires tight integration between reasoning steps |
| Quality requires critic-generator review loops | First-pass quality is acceptable |
If you're unsure, start with a single agent and add specialisation only when you hit a specific limitation — context overflow, tool count, or quality ceiling. Premature multi-agent architecture creates coordination overhead without corresponding benefits.
Further Reading in This Series
- Enterprise AI Agent Architecture: Design Patterns & Security — the full architecture reference
- Claude Agent SDK Guide: Build Production AI Agents — SDK fundamentals
- AI Agent Evaluation & Testing — measuring and improving multi-agent quality
- AI Agent Development Services — work with our certified architects
- MCP Protocol Guide — connecting agents to enterprise systems
- Claude Tool Use Guide — tool design fundamentals
Architect Your Multi-Agent System
Multi-agent systems for enterprise workflows are what we specialise in. From pattern selection to production deployment, our Claude Certified Architects design systems that scale.