Build a fault-tolerant search pipeline by layering retry logic with exponential backoff, provider fallback chains, and a circuit breaker that stops routing to unhealthy providers. A single search provider will experience transient failures: 429 rate limits, 503 overloads, DNS hiccups, and timeout spikes. Production systems need automatic recovery from all of these without dropping queries. This pipeline handles each failure mode and tracks provider health so routing decisions are data-driven.
Prerequisites
- Python 3.8+ installed
- requests library installed
- A Scavio API key from scavio.dev
- Understanding of retry and circuit breaker patterns
Walkthrough
Step 1: Define search providers
Configure primary and fallback providers with their endpoints and auth. Scavio serves as the highly reliable primary; fallback providers slot into the same PROVIDERS list, as sketched after the code below.
import os, requests, time, json
from collections import defaultdict

API_KEY = os.environ['SCAVIO_API_KEY']

PROVIDERS = [
    {
        'name': 'scavio',
        'url': 'https://api.scavio.dev/api/v1/search',
        'headers': {'x-api-key': API_KEY},
        'body_fn': lambda q: {'platform': 'google', 'query': q},
        'parse_fn': lambda r: r.get('organic_results', []),
    },
    # Fallback providers are appended here with the same shape:
    # name, url, headers, body_fn, parse_fn (see the sketch below).
]
health = defaultdict(lambda: {'failures': 0, 'last_failure': 0, 'circuit_open': False})
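The walkthrough ships with a single provider; a real chain needs at least one fallback. Here is a sketch of what a second entry might look like. The name, endpoint, auth header, and response field names are placeholders for whatever secondary service you use, not a real API:

# Hypothetical fallback provider; every field here is a placeholder.
PROVIDERS.append({
    'name': 'backup_search',
    'url': 'https://search.example.com/v1/query',
    'headers': {'Authorization': f"Bearer {os.environ.get('BACKUP_API_KEY', '')}"},
    'body_fn': lambda q: {'q': q},
    'parse_fn': lambda r: r.get('results', []),
})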
Step 2: Implement retry with exponential backoff
Wrap each provider call in retry logic that handles transient errors (429, 500, 502, 503) with increasing delays.
def retry_search(provider: dict, query: str, max_retries: int = 3) -> list:
    for attempt in range(max_retries):
        try:
            resp = requests.post(provider['url'],
                                 headers=provider['headers'],
                                 json=provider['body_fn'](query), timeout=10)
            if resp.status_code == 429:
                if attempt < max_retries - 1:
                    wait = 2 ** attempt
                    print(f"Rate limited by {provider['name']}, retrying in {wait}s")
                    time.sleep(wait)
                continue
            resp.raise_for_status()
            results = provider['parse_fn'](resp.json())
            health[provider['name']]['failures'] = 0  # success resets the streak
            return results
        except requests.exceptions.RequestException as e:
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"{provider['name']} attempt {attempt+1} failed: {e}, retrying in {wait}s")
                time.sleep(wait)
            else:
                print(f"{provider['name']} attempt {attempt+1} failed: {e}, giving up")
    # All retries exhausted: record the failure for the circuit breaker.
    health[provider['name']]['failures'] += 1
    health[provider['name']]['last_failure'] = time.time()
    return []
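When many workers retry in lockstep, fixed 2 ** attempt delays can synchronize into a thundering herd against a recovering provider. A common refinement is "full jitter": sleep a random duration between zero and the exponential cap. The helper below is a sketch; the name and defaults are ours, not part of any library:

import random

def backoff_with_jitter(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    # Full jitter: a random delay in [0, min(cap, base * 2**attempt)].
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Drop-in replacement for the fixed delay in retry_search:
#     wait = backoff_with_jitter(attempt)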
Step 3: Add fallback chain
If the primary provider fails all retries, cascade to the next provider in the chain. Track which provider served the result.
def fallback_search(query: str) -> dict:
    for provider in PROVIDERS:
        name = provider['name']
        if health[name]['circuit_open']:
            elapsed = time.time() - health[name]['last_failure']
            if elapsed < 60:  # cooldown before retesting an unhealthy provider
                print(f'Circuit open for {name}, skipping')
                continue
            else:
                health[name]['circuit_open'] = False
                print(f'Circuit half-open for {name}, retesting')
        results = retry_search(provider, query)
        if results:
            return {'provider': name, 'results': results}
    return {'provider': 'none', 'results': []}

result = fallback_search('best crm 2026')
print(f"Served by: {result['provider']}, results: {len(result['results'])}")
Step 4: Track provider health
Monitor failure rates and open the circuit breaker when a provider exceeds the failure threshold. Note that retry_search above updates the failure counters but never opens the circuit; once update_health is defined, call it from retry_search in place of that inline bookkeeping so the breaker actually trips after FAILURE_THRESHOLD consecutive failures.
import datetime

FAILURE_THRESHOLD = 5

def update_health(provider_name: str, success: bool):
    h = health[provider_name]
    if success:
        h['failures'] = max(0, h['failures'] - 1)
    else:
        h['failures'] += 1
        h['last_failure'] = time.time()
        if h['failures'] >= FAILURE_THRESHOLD:
            h['circuit_open'] = True
            print(f'Circuit OPEN for {provider_name} ({h["failures"]} consecutive failures)')

def health_report() -> dict:
    report = {}
    for name, h in health.items():
        report[name] = {
            'failures': h['failures'],
            'circuit_open': h['circuit_open'],
            'last_failure': datetime.datetime.fromtimestamp(h['last_failure']).isoformat()
                            if h['last_failure'] else 'never',
        }
    return report

print(json.dumps(health_report(), indent=2))
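To watch the breaker trip without waiting for a real outage, you can drive update_health directly. This only exercises the state machine defined above; no network calls are made:

# Simulate consecutive failures until the breaker opens, then one success.
for _ in range(FAILURE_THRESHOLD):
    update_health('scavio', success=False)
print(health['scavio']['circuit_open'])  # True once the threshold is reached
update_health('scavio', success=True)    # a success decays the failure count
health['scavio'] = {'failures': 0, 'last_failure': 0, 'circuit_open': False}  # reset before real traffic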
Step 5: Test the full pipeline
Verify retry, fallback, and circuit breaker behavior end-to-end. The test below covers the happy path; the sketch after it injects a deliberately failing provider to exercise the failure path.
def test_pipeline():
    # Normal query
    r = fallback_search('python tutorial 2026')
    assert r['results'], 'Normal query should return results'
    print(f'Normal: {len(r["results"])} results from {r["provider"]}')
    # Multiple queries to verify consistency
    queries = ['best crm', 'react vs vue', 'machine learning course']
    for q in queries:
        r = fallback_search(q)
        print(f'{q}: {len(r["results"])} results from {r["provider"]}')
    # Health check
    print(f'Health: {json.dumps(health_report(), indent=2)}')
    print('Pipeline test passed')

test_pipeline()
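To exercise the failure path deterministically, one option (a testing convention of this guide, not part of any API) is to prepend a provider that always fails and watch the chain fall through. The .invalid TLD is reserved and never resolves, so every call fails with a connection error:

# Hypothetical always-failing provider for failure-injection testing.
PROVIDERS.insert(0, {
    'name': 'flaky_test_provider',
    'url': 'https://search.invalid/v1/search',  # reserved TLD, never resolves
    'headers': {},
    'body_fn': lambda q: {'query': q},
    'parse_fn': lambda r: r.get('results', []),
})

r = fallback_search('failover smoke test')
print(f"Served by: {r['provider']}")  # should fall through to scavio
PROVIDERS.pop(0)  # restore the original chain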
Python Example
import requests, os, time

H = {'x-api-key': os.environ['SCAVIO_API_KEY']}

def resilient_search(query, retries=3):
    for i in range(retries):
        try:
            r = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
                              json={'platform': 'google', 'query': query}, timeout=10)
            r.raise_for_status()
            return r.json().get('organic_results', [])
        except requests.exceptions.RequestException:
            if i < retries - 1:
                time.sleep(2 ** i)  # exponential backoff: 1s, 2s, ...
    return []

print(len(resilient_search('best crm 2026')), 'results')
JavaScript Example
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};

async function resilientSearch(query, retries = 3) {
  for (let i = 0; i < retries; i++) {
    try {
      const r = await fetch('https://api.scavio.dev/api/v1/search', {
        method: 'POST', headers: H, body: JSON.stringify({platform: 'google', query})
      });
      if (r.ok) return (await r.json()).organic_results || [];
    } catch (e) {
      console.error(`Attempt ${i + 1} failed: ${e.message}`);
    }
    // Exponential backoff; skip the sleep after the final attempt.
    if (i < retries - 1) await new Promise(res => setTimeout(res, 2 ** i * 1000));
  }
  return [];
}

resilientSearch('best crm 2026').then(r => console.log(r.length + ' results'));
Expected Output
A search pipeline that retries transient failures with backoff, falls back across providers, and opens circuit breakers on sustained failures.