Table of Contents
- The Failure Taxonomy: Retryable vs Non-Retryable Errors
- Anthropic SDK Built-in Retry Behavior
- Implementing Custom Retry Logic with Exponential Backoff
- Handling Partial Streaming Failures
- Circuit Breaker Pattern for Resilience
- Graceful Degradation and Fallback Strategies
- Timeout Management Best Practices
- Building Observability Into Your Error Handling
- Multi-Region Failover Architecture
Production systems integrating Claude API must handle failures gracefully. Unlike synchronous database calls with predictable latency patterns, Claude API requests involve queuing, throttling, and downstream infrastructure that can fail in unexpected ways. This guide covers enterprise-grade error handling strategies that separate production systems from proof-of-concept implementations.
The Failure Taxonomy: Retryable vs Non-Retryable Errors
Not all errors should trigger retries. Understanding HTTP status codes is critical for building resilient systems. The Claude API returns standard HTTP semantics; the key is interpreting them correctly in context.
| Status Code | Error Type | Retryable | Recommended Action |
|---|---|---|---|
| 400 | Bad Request | No | Fix request format; log for debugging |
| 401 | Unauthorized | No | Verify API key validity and permissions |
| 403 | Forbidden | No | Check account permissions and quotas |
| 404 | Not Found | No | Verify model ID and endpoint existence |
| 429 | Rate Limited | Yes | Implement backoff; check rate limit headers |
| 500 | Server Error | Yes | Retry with exponential backoff |
| 529 | API Overloaded | Yes | Aggressive backoff; consider fallbacks |
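The taxonomy above can be made executable with a small helper. This is a sketch (the function names are illustrative, not part of the SDK) that maps status codes to a retry decision consistent with the table:

```python
# Illustrative helper mapping HTTP status codes to a retry decision.
# Treats 429 (rate limited) and all 5xx (including 529 overloaded)
# as transient; any other 4xx is a client error to fix, not retry.
def is_retryable(status_code: int) -> bool:
    return status_code == 429 or 500 <= status_code < 600

def classify(status_code: int) -> str:
    """Bucket a response for logging/metrics purposes."""
    if status_code < 400:
        return "success"
    return "retryable" if is_retryable(status_code) else "non-retryable"
```

Centralizing this decision in one function keeps retry policy consistent across every call site.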
Anthropic SDK Built-in Retry Behavior
Both the Python and Node.js Anthropic SDKs include automatic retry mechanisms with sensible defaults. Before implementing custom logic, understand what the SDK provides so you don't duplicate effort.
The Python SDK automatically retries transient failures using exponential backoff. Configure it via the max_retries parameter:
```python
import anthropic

# Configure with custom retry settings
client = anthropic.Anthropic(
    api_key="sk-ant-...",
    max_retries=3,  # Max retry attempts (default: 2)
    timeout=60.0,   # Request timeout in seconds
)

message = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain retry logic"}
    ]
)
```
The Node.js SDK provides similar configuration. Learn more about SDK integration in our Claude API Integration guide.
Node.js Retry Configuration

```javascript
const Anthropic = require("@anthropic-ai/sdk");

const client = new Anthropic({
  apiKey: process.env.ANTHROPIC_API_KEY,
  maxRetries: 3,
  timeout: 60000, // milliseconds
});

const message = await client.messages.create({
  model: "claude-3-5-sonnet-20241022",
  max_tokens: 1024,
  messages: [
    { role: "user", content: "Explain error handling" }
  ],
});
```
The SDK uses exponential backoff internally: the delay doubles after each retry, with jitter to prevent thundering herds. For most applications, the default configuration suffices. Only implement custom logic when you need behavior beyond SDK defaults (circuit breakers, fallback models, or domain-specific handling).
Implementing Custom Retry Logic with Exponential Backoff
When you need control beyond SDK defaults—such as custom logging, metrics tracking, or integration with monitoring systems—implement a retry wrapper. Exponential backoff with jitter is the standard pattern:
Python Exponential Backoff with Jitter

```python
import anthropic
import random
import time

def call_claude_with_retry(
    client,
    model,
    messages,
    max_retries=3,
    base_delay=1.0,
    max_delay=60.0
):
    """Call Claude API with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            response = client.messages.create(
                model=model,
                max_tokens=1024,
                messages=messages
            )
            return response
        except anthropic.RateLimitError:
            if attempt == max_retries:
                raise
            # Exponential backoff: 1s, 2s, 4s, 8s...
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter: ±10% randomization
            jitter = delay * 0.1 * (2 * random.random() - 1)
            wait_time = delay + jitter
            print(f"Rate limited. Attempt {attempt+1}/{max_retries+1}, "
                  f"waiting {wait_time:.2f}s before retry")
            time.sleep(wait_time)
        except anthropic.APIError as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = delay * 0.1 * (2 * random.random() - 1)
            wait_time = delay + jitter
            print(f"API error: {e}. Retrying in {wait_time:.2f}s...")
            time.sleep(wait_time)
```
Jitter prevents synchronized retries from multiple clients—a phenomenon called the "thundering herd." Without jitter, all clients retry simultaneously after a timeout, overwhelming the API again. Random delays distribute retry attempts across time, dramatically improving success rates.
The Claude Rate Limiting and Scaling guide provides advanced strategies for managing quota across distributed systems.
Handling Partial Streaming Failures
Streaming responses introduce complexity: the connection opens successfully, but errors occur mid-stream. You cannot simply retry the entire request—the client may have already processed partial output.
Streaming with Checkpoint Recovery

```python
import anthropic
import json

def stream_with_recovery(client, messages, checkpoint_file=None):
    """Stream a Claude response, checkpointing progress for recovery."""
    checkpoint = {"last_token_count": 0}
    if checkpoint_file:
        try:
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)
        except FileNotFoundError:
            pass

    # Note: the Messages API cannot resume a stream mid-response.
    # "Recovery" here means re-sending the request and using the
    # checkpoint to deduplicate output the client already processed.
    request_messages = messages.copy()
    if checkpoint.get("last_token_count", 0) > 0:
        print(f"Resuming from checkpoint: {checkpoint['last_token_count']} chunks processed")

    collected_text = ""
    chunk_count = 0  # counts text chunks, a proxy for tokens
    try:
        with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=request_messages,
        ) as stream:
            for text in stream.text_stream:
                collected_text += text
                chunk_count += 1
                # Periodic checkpoint saves (every 100 chunks)
                if chunk_count % 100 == 0 and checkpoint_file:
                    checkpoint["last_token_count"] = chunk_count
                    with open(checkpoint_file, 'w') as f:
                        json.dump(checkpoint, f)
                yield text
    except anthropic.APIConnectionError as e:
        # Connection lost mid-stream
        print(f"Stream interrupted at chunk {chunk_count}: {e}")
        # Save state for recovery
        if checkpoint_file:
            checkpoint["last_token_count"] = chunk_count
            checkpoint["partial_output"] = collected_text
            with open(checkpoint_file, 'w') as f:
                json.dump(checkpoint, f)
        raise  # Let caller decide recovery strategy
```
For streaming use cases, maintain idempotency keys and partial state. Clients should generate unique request IDs and store intermediate results. Learn detailed streaming patterns in the Claude Streaming Implementation guide.
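One way to implement the idempotency-key approach is sketched below. All names are illustrative, and the store is an in-memory stand-in for a shared store such as Redis:

```python
import hashlib
import json
import uuid

def new_request_id() -> str:
    """Unique ID attached to each logical request so retries can be deduplicated."""
    return str(uuid.uuid4())

def request_fingerprint(messages) -> str:
    """Stable hash of the payload: two retries of the same logical
    request map to the same key, enabling deduplication."""
    payload = json.dumps(messages, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]

class PartialResultStore:
    """In-memory stand-in for a shared store (e.g. Redis) holding
    partial streamed output keyed by request ID."""
    def __init__(self):
        self._data = {}

    def append(self, request_id: str, text: str):
        self._data[request_id] = self._data.get(request_id, "") + text

    def get(self, request_id: str) -> str:
        return self._data.get(request_id, "")
```

On reconnect, a client can look up its request ID, see how much output it already holds, and discard duplicated prefix text from the re-sent request.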
Circuit Breaker Pattern for Resilience
The circuit breaker pattern prevents cascading failures: after N consecutive errors, stop sending requests to the API and instead fail fast. This gives the API time to recover and prevents wasting resources on doomed requests.
A circuit breaker has three states:
- Closed: Normal operation; requests flow through to the API
- Open: Failure threshold exceeded; requests fail immediately without hitting the API
- Half-Open: Testing if the API has recovered; allow a limited number of requests
```python
import anthropic
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing fast
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=60,
        success_threshold=2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                print("Circuit breaker: entering half-open state")
            else:
                raise Exception("Circuit breaker is OPEN - API unavailable")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful request."""
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                print("Circuit breaker: recovered, returning to CLOSED state")

    def _on_failure(self):
        """Handle failed request."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker: OPEN after {self.failure_count} failures")

    def _should_attempt_reset(self):
        """Check if recovery timeout has elapsed."""
        if not self.last_failure_time:
            return True
        elapsed = datetime.now() - self.last_failure_time
        return elapsed >= timedelta(seconds=self.recovery_timeout)

# Usage
client = anthropic.Anthropic(api_key="sk-ant-...")
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

try:
    response = breaker.call(
        client.messages.create,
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
except Exception as e:
    print(f"Request failed: {e}")
    # Fall back to cached response or simplified logic
```
Circuit breakers work best in distributed systems where you have cached fallbacks. Pair them with a response cache so you can serve previously generated answers when the API is unavailable.
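A minimal sketch of that pairing follows. It assumes any breaker-style callable that raises when the circuit is open; the function and parameter names are illustrative:

```python
def call_with_cache_fallback(guarded_call, cache: dict, cache_key: str):
    """Try the live API via a circuit-breaker-guarded callable; on any
    failure (including an open circuit) serve the cached response."""
    try:
        result = guarded_call()
        cache[cache_key] = result  # refresh the cache on success
        return result, "live"
    except Exception:
        if cache_key in cache:
            return cache[cache_key], "cache"
        raise  # nothing cached for this key: surface the failure
```

Returning a `"live"`/`"cache"` marker alongside the result lets callers label degraded responses in the UI or in metrics.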
Graceful Degradation and Fallback Strategies
When Claude API is unavailable or rate-limited, implement fallback strategies that maintain service availability. Options include cached responses, simpler models, or simplified logic:
Fallback with Model Degradation

```python
import anthropic

def call_claude_with_fallback(
    messages,
    primary_model="claude-3-5-sonnet-20241022",
    fallback_model="claude-3-5-haiku-20241022"
):
    """Try primary model, fall back to a faster model on failure."""
    client = anthropic.Anthropic()
    models_to_try = [
        primary_model,
        fallback_model,  # Faster, cheaper
    ]
    for i, model in enumerate(models_to_try):
        try:
            print(f"Attempting with {model}...")
            response = client.messages.create(
                model=model,
                max_tokens=1024,
                messages=messages
            )
            print(f"Success with {model}")
            return response
        except anthropic.RateLimitError:
            if i == len(models_to_try) - 1:
                # Last model failed, return cached response
                print("All models exhausted. Using cached response.")
                return get_cached_response(messages)
            print(f"{model} rate limited, trying {models_to_try[i + 1]}")
        except anthropic.APIError as e:
            if i == len(models_to_try) - 1:
                raise
            print(f"{model} failed: {e}, trying fallback")

def get_cached_response(messages):
    """Return cached response for common queries."""
    # In production, implement an actual cache (Redis, in-memory, etc.)
    return {
        "id": "cached-msg-123",
        "content": [{"type": "text", "text": "Cached response"}],
        "stop_reason": "end_turn"
    }
```
Fallback strategies require planning: determine which operations can degrade gracefully, maintain response caches, and define acceptable quality thresholds for simplified models.
Timeout Management Best Practices
Timeouts prevent requests from hanging indefinitely. However, Claude requests have variable latency—complex prompts or large outputs take longer. Choose timeouts based on your use case:
- Short timeouts (5-10s): Simple completions, real-time chat interfaces
- Medium timeouts (30-60s): Standard API integration, moderate complexity
- Long timeouts (2-5 min): Complex analysis, large documents, batch processing
Streaming requests need longer timeouts: the connection is established quickly, but tokens arrive incrementally over a much longer window:
Timeout Configuration

```python
import anthropic

# Streaming timeout covers connection setup plus the full token stream
client_streaming = anthropic.Anthropic(
    timeout=300.0  # 5 minutes for streaming
)

# Non-streaming timeout can be shorter
client_quick = anthropic.Anthropic(
    timeout=60.0  # 1 minute for standard requests
)

# Use the appropriate client based on request type
try:
    with client_streaming.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=4096,  # Large output increases latency
        messages=[{"role": "user", "content": "Analyze this 100-page document..."}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APITimeoutError:
    print("Request timed out - document may be too large")
```
Monitor actual latencies in production. If timeouts occur for valid requests, increase them. If most requests complete quickly, decrease them to fail fast on infrastructure issues.
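One way to turn that advice into numbers is to derive the timeout from observed latency percentiles. This is a sketch; the headroom factor and floor are assumptions to tune for your workload:

```python
import math

def percentile(latencies, p):
    """Nearest-rank percentile of observed request latencies (seconds)."""
    ordered = sorted(latencies)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]

def suggested_timeout(latencies, p=99, headroom=2.0, floor=10.0):
    """Timeout = p99 latency x headroom, with a floor: slow-but-valid
    requests still succeed, while hung requests fail reasonably fast."""
    return max(floor, percentile(latencies, p) * headroom)
```

Recomputing this periodically from production samples keeps the timeout aligned with actual workload behavior instead of a guess made at launch.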
Building Observability Into Your Error Handling
Error handling is ineffective without visibility. Implement comprehensive logging and metrics to understand failure modes and optimize retry strategies. See the Claude API Enterprise Guide for detailed monitoring patterns.
Structured Error Logging

```python
import anthropic
import logging
import json
from datetime import datetime

# Configure structured logging
logger = logging.getLogger(__name__)
handler = logging.FileHandler("claude_api.log")
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)

def call_claude_with_logging(
    client,
    model,
    messages,
    request_id=None
):
    """Call Claude API with comprehensive error logging."""
    request_id = request_id or datetime.now().isoformat()
    start_time = datetime.now()
    try:
        response = client.messages.create(
            model=model,
            max_tokens=1024,
            messages=messages
        )
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(json.dumps({
            "event": "claude_api_success",
            "request_id": request_id,
            "model": model,
            "duration_seconds": duration,
            "input_tokens": response.usage.input_tokens,
            "output_tokens": response.usage.output_tokens,
            "timestamp": datetime.now().isoformat()
        }))
        return response
    except anthropic.RateLimitError as e:
        logger.warning(json.dumps({
            "event": "claude_api_rate_limit",
            "request_id": request_id,
            "model": model,
            "retry_after": e.response.headers.get("retry-after"),
            "timestamp": datetime.now().isoformat()
        }))
        raise
    except anthropic.APIError as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(json.dumps({
            "event": "claude_api_error",
            "request_id": request_id,
            "model": model,
            "error_type": type(e).__name__,
            "error_message": str(e),
            "duration_seconds": duration,
            "timestamp": datetime.now().isoformat()
        }))
        raise
```
Track these metrics in production:
- Error Rate: Percentage of requests that fail (should be <1% for healthy systems)
- Retry Success Rate: Percentage of retried requests that eventually succeed
- P95/P99 Latency: Detect slowdowns before they affect users
- Rate Limit Events: Track 429 errors to understand quota utilization
- Circuit Breaker State: Alerts when breaker opens (API degradation)
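A minimal in-process tracker for these metrics might look like the following sketch. In production you would export these counters to Prometheus, Datadog, or similar rather than hold them in memory:

```python
from collections import Counter

class ApiMetrics:
    """Tracks error rate, retry success rate, and latency samples."""
    def __init__(self):
        self.counts = Counter()
        self.latencies = []  # raw samples; compute p95/p99 from these

    def record(self, success: bool, latency_s: float, was_retried: bool = False):
        self.counts["total"] += 1
        self.counts["success" if success else "error"] += 1
        if was_retried:
            self.counts["retried"] += 1
            if success:
                self.counts["retry_success"] += 1
        self.latencies.append(latency_s)

    def error_rate(self) -> float:
        total = self.counts["total"]
        return self.counts["error"] / total if total else 0.0

    def retry_success_rate(self) -> float:
        retried = self.counts["retried"]
        return self.counts["retry_success"] / retried if retried else 0.0
```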
Multi-Region Failover Architecture
For mission-critical applications, implement multi-region failover. Route requests to different Claude API endpoints (in practice, often the Anthropic API combined with Claude on AWS Bedrock or Google Cloud Vertex AI) based on real-time health checks:
Multi-Region Failover Strategy

```python
import anthropic
from enum import Enum

class Region(Enum):
    # Illustrative endpoints - substitute the actual base URLs your
    # deployment uses
    US_EAST = "https://api.us-east.anthropic.com"
    US_WEST = "https://api.us-west.anthropic.com"
    EU = "https://api.eu.anthropic.com"

class MultiRegionClient:
    def __init__(self):
        self.regions = [Region.US_EAST, Region.US_WEST, Region.EU]
        self.current_region_idx = 0
        self.region_health = {region: True for region in self.regions}

    def get_client(self, region):
        """Create an Anthropic client for a specific region."""
        return anthropic.Anthropic(
            api_key="sk-ant-...",
            base_url=region.value
        )

    def call_with_failover(self, messages, max_attempts=3):
        """Call Claude across regions with automatic failover."""
        attempts = 0
        while attempts < max_attempts:
            region = self.regions[self.current_region_idx]
            if not self.region_health[region]:
                # Region marked unhealthy: skip to the next one. Count the
                # skip as an attempt so the loop terminates even when every
                # region is unhealthy.
                self.current_region_idx = (self.current_region_idx + 1) % len(self.regions)
                attempts += 1
                continue
            try:
                client = self.get_client(region)
                response = client.messages.create(
                    model="claude-3-5-sonnet-20241022",
                    max_tokens=1024,
                    messages=messages
                )
                # Success - mark region as healthy
                self.region_health[region] = True
                return response
            except anthropic.APIError as e:
                print(f"Region {region.name} failed: {e}")
                self.region_health[region] = False
                self.current_region_idx = (self.current_region_idx + 1) % len(self.regions)
                attempts += 1
        raise Exception("All regions exhausted")

# Usage
multi_region = MultiRegionClient()
response = multi_region.call_with_failover(
    messages=[{"role": "user", "content": "Hello"}]
)
```
Production systems use health checks to identify degraded regions. Monitor error rates and latency per region; mark regions as unhealthy when error rate exceeds thresholds. Combine multi-region failover with the circuit breaker pattern for maximum resilience.
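A sliding-window health check of the kind described above could look like this sketch; the window size and error threshold are illustrative tuning knobs:

```python
class RegionHealth:
    """Marks a region unhealthy when its error rate over the last
    `window` requests exceeds `error_threshold`."""
    def __init__(self, window: int = 50, error_threshold: float = 0.5):
        self.window = window
        self.error_threshold = error_threshold
        self.results = []  # True = success, False = failure

    def record(self, success: bool):
        self.results.append(success)
        self.results = self.results[-self.window:]  # keep only the window

    def healthy(self) -> bool:
        if not self.results:
            return True  # no data yet: assume healthy
        error_rate = self.results.count(False) / len(self.results)
        return error_rate < self.error_threshold
```

Keeping one `RegionHealth` per region lets the failover loop consult `healthy()` instead of a single boolean flag, so regions recover automatically as successes refill the window.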
Key Takeaways
- Only retry transient failures (429 and 5xx). Never retry other 4xx client errors; fix the request instead.
- Use SDK defaults (Anthropic Python/Node.js SDKs) for automatic exponential backoff with sensible tuning.
- Implement custom retry logic only when you need behavior beyond SDK defaults (custom logging, circuit breakers, model fallback).
- Add jitter to exponential backoff to prevent thundering herds and synchronized retry storms.
- Use circuit breakers to fail fast and prevent cascading failures when the API is degraded.
- Implement fallback strategies: cached responses, model degradation, or simplified logic for graceful degradation.
- Set timeouts based on request type: 5-10s for chat, 30-60s for standard API calls, 2-5 min for complex analysis.
- Build comprehensive observability: track error rates, retry success rates, latency percentiles, and circuit breaker state.
- Multi-region failover is an advanced optimization for mission-critical systems; start with single-region + SDK retries.