Construisez un pipeline de recherche tolérant aux pannes en superposant une logique de réessai avec backoff exponentiel, des chaînes de repli de fournisseurs et un coupe-circuit qui arrête le routage vers les fournisseurs défaillants. Un fournisseur de recherche unique connaîtra des défaillances transitoires : limites de débit 429, surcharges 503, problèmes DNS et pics de délai d'attente. Les systèmes de production ont besoin d'une récupération automatique de tous ces problèmes sans abandonner les requêtes. Ce pipeline gère chaque mode de défaillance et suit l'état de santé des fournisseurs afin que les décisions de routage soient guidées par les données.
Prérequis
- Python 3.8+ installé
- bibliothèque requests installée
- Une clé API Scavio depuis scavio.dev
- Compréhension des modèles de réessai et de coupe-circuit
Parcours
Étape 1: Définir les fournisseurs de recherche
Configurez les fournisseurs principal et de repli avec leurs points de terminaison et leur authentification. Scavio sert de fournisseur principal avec une haute fiabilité.
import os, requests, time, json
from collections import defaultdict
API_KEY = os.environ['SCAVIO_API_KEY']
PROVIDERS = [
{
'name': 'scavio',
'url': 'https://api.scavio.dev/api/v1/search',
'headers': {'x-api-key': API_KEY},
'body_fn': lambda q: {'platform': 'google', 'query': q},
'parse_fn': lambda r: r.get('organic_results', []),
},
]
health = defaultdict(lambda: {'failures': 0, 'last_failure': 0, 'circuit_open': False})Étape 2: Implémenter le réessai avec backoff exponentiel
Enveloppez chaque appel de fournisseur dans une logique de réessai qui gère les erreurs transitoires (429, 500, 502, 503) avec des délais croissants.
def retry_search(provider: dict, query: str, max_retries: int = 3) -> list:
for attempt in range(max_retries):
try:
resp = requests.post(provider['url'],
headers=provider['headers'],
json=provider['body_fn'](query), timeout=10)
if resp.status_code == 429:
wait = 2 ** attempt
print(f"Rate limited by {provider['name']}, retrying in {wait}s")
time.sleep(wait)
continue
resp.raise_for_status()
results = provider['parse_fn'](resp.json())
health[provider['name']]['failures'] = 0
return results
except requests.exceptions.RequestException as e:
wait = 2 ** attempt
print(f"{provider['name']} attempt {attempt+1} failed: {e}, retrying in {wait}s")
time.sleep(wait)
health[provider['name']]['failures'] += 1
health[provider['name']]['last_failure'] = time.time()
return []Étape 3: Ajouter une chaîne de repli
Si le fournisseur principal échoue à tous les réessais, basculez vers le fournisseur suivant dans la chaîne. Suivez quel fournisseur a servi le résultat.
def fallback_search(query: str) -> dict:
for provider in PROVIDERS:
name = provider['name']
if health[name]['circuit_open']:
elapsed = time.time() - health[name]['last_failure']
if elapsed < 60:
print(f'Circuit open for {name}, skipping')
continue
else:
health[name]['circuit_open'] = False
print(f'Circuit half-open for {name}, retesting')
results = retry_search(provider, query)
if results:
return {'provider': name, 'results': results}
return {'provider': 'none', 'results': []}
result = fallback_search('best crm 2026')
print(f"Served by: {result['provider']}, results: {len(result['results'])}")Étape 4: Suivre l'état de santé des fournisseurs
Surveillez les taux d'échec et ouvrez le coupe-circuit lorsqu'un fournisseur dépasse le seuil d'échec.
FAILURE_THRESHOLD = 5
def update_health(provider_name: str, success: bool):
h = health[provider_name]
if success:
h['failures'] = max(0, h['failures'] - 1)
else:
h['failures'] += 1
h['last_failure'] = time.time()
if h['failures'] >= FAILURE_THRESHOLD:
h['circuit_open'] = True
print(f'Circuit OPEN for {provider_name} ({h["failures"]} consecutive failures)')
def health_report() -> dict:
report = {}
for name, h in health.items():
report[name] = {
'failures': h['failures'],
'circuit_open': h['circuit_open'],
'last_failure': datetime.datetime.fromtimestamp(h['last_failure']).isoformat() if h['last_failure'] else 'never',
}
return report
import datetime
print(json.dumps(health_report(), indent=2))Étape 5: Tester le pipeline complet
Simulez des défaillances pour vérifier le comportement de réessai, de repli et de coupe-circuit de bout en bout.
def test_pipeline():
# Normal query
r = fallback_search('python tutorial 2026')
assert r['results'], 'Normal query should return results'
print(f'Normal: {len(r["results"])} results from {r["provider"]}')
# Multiple queries to verify consistency
queries = ['best crm', 'react vs vue', 'machine learning course']
for q in queries:
r = fallback_search(q)
print(f'{q}: {len(r["results"])} results from {r["provider"]}')
# Health check
print(f'Health: {json.dumps(health_report(), indent=2)}')
print('Pipeline test passed')
test_pipeline()Exemple Python
import requests, os, time
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
def resilient_search(query, retries=3):
for i in range(retries):
try:
r = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'platform': 'google', 'query': query}, timeout=10)
r.raise_for_status()
return r.json().get('organic_results', [])
except Exception as e:
time.sleep(2 ** i)
return []
print(len(resilient_search('best crm 2026')), 'results')Exemple JavaScript
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
async function resilientSearch(query, retries = 3) {
for (let i = 0; i < retries; i++) {
try {
const r = await fetch('https://api.scavio.dev/api/v1/search', {
method: 'POST', headers: H, body: JSON.stringify({platform: 'google', query})
});
if (r.ok) return (await r.json()).organic_results || [];
} catch {}
await new Promise(r => setTimeout(r, 2 ** i * 1000));
}
return [];
}
resilientSearch('best crm 2026').then(r => console.log(r.length + ' results'));Sortie attendue
A search pipeline that retries transient failures with backoff, falls back across providers, and opens circuit breakers on sustained failures.