通过将重试逻辑与指数退避、提供者回退链以及停止路由到不健康提供者的断路器分层,构建容错搜索管道。单个搜索提供商将遇到暂时性故障:429 速率限制、503 过载、DNS 打嗝和超时峰值。生产系统需要从所有这些情况中自动恢复而不丢失查询。该管道处理每种故障模式并跟踪提供商的运行状况,因此路由决策是数据驱动的。
前置条件
- 已安装 Python 3.8+
- 请求已安装库
- 来自 scavio.dev 的 Scavio API 密钥
- 了解重试和断路器模式
操作指南
步骤 1: 定义搜索提供商
配置主要和后备提供商及其端点和身份验证。 Scavio作为主用,可靠性高。
Python
import os, requests, time, json
from collections import defaultdict
API_KEY = os.environ['SCAVIO_API_KEY']
PROVIDERS = [
{
'name': 'scavio',
'url': 'https://api.scavio.dev/api/v1/search',
'headers': {'x-api-key': API_KEY},
'body_fn': lambda q: {'platform': 'google', 'query': q},
'parse_fn': lambda r: r.get('organic_results', []),
},
]
health = defaultdict(lambda: {'failures': 0, 'last_failure': 0, 'circuit_open': False})步骤 2: 使用指数退避实现重试
将每个提供程序调用包装在重试逻辑中,以处理延迟增加的瞬态错误(429、500、502、503)。
Python
def retry_search(provider: dict, query: str, max_retries: int = 3) -> list:
for attempt in range(max_retries):
try:
resp = requests.post(provider['url'],
headers=provider['headers'],
json=provider['body_fn'](query), timeout=10)
if resp.status_code == 429:
wait = 2 ** attempt
print(f"Rate limited by {provider['name']}, retrying in {wait}s")
time.sleep(wait)
continue
resp.raise_for_status()
results = provider['parse_fn'](resp.json())
health[provider['name']]['failures'] = 0
return results
except requests.exceptions.RequestException as e:
wait = 2 ** attempt
print(f"{provider['name']} attempt {attempt+1} failed: {e}, retrying in {wait}s")
time.sleep(wait)
health[provider['name']]['failures'] += 1
health[provider['name']]['last_failure'] = time.time()
return []步骤 3: 添加后备链
如果主提供程序在所有重试中均失败,则级联到链中的下一个提供程序。跟踪哪个提供商提供了结果。
Python
def fallback_search(query: str) -> dict:
for provider in PROVIDERS:
name = provider['name']
if health[name]['circuit_open']:
elapsed = time.time() - health[name]['last_failure']
if elapsed < 60:
print(f'Circuit open for {name}, skipping')
continue
else:
health[name]['circuit_open'] = False
print(f'Circuit half-open for {name}, retesting')
results = retry_search(provider, query)
if results:
return {'provider': name, 'results': results}
return {'provider': 'none', 'results': []}
result = fallback_search('best crm 2026')
print(f"Served by: {result['provider']}, results: {len(result['results'])}")步骤 4: 跟踪提供者的健康状况
监控故障率并在提供商超过故障阈值时打开断路器。
Python
FAILURE_THRESHOLD = 5
def update_health(provider_name: str, success: bool):
h = health[provider_name]
if success:
h['failures'] = max(0, h['failures'] - 1)
else:
h['failures'] += 1
h['last_failure'] = time.time()
if h['failures'] >= FAILURE_THRESHOLD:
h['circuit_open'] = True
print(f'Circuit OPEN for {provider_name} ({h["failures"]} consecutive failures)')
def health_report() -> dict:
report = {}
for name, h in health.items():
report[name] = {
'failures': h['failures'],
'circuit_open': h['circuit_open'],
'last_failure': datetime.datetime.fromtimestamp(h['last_failure']).isoformat() if h['last_failure'] else 'never',
}
return report
import datetime
print(json.dumps(health_report(), indent=2))步骤 5: 测试整个管道
模拟失败以验证端到端的重试、回退和断路器行为。
Python
def test_pipeline():
# Normal query
r = fallback_search('python tutorial 2026')
assert r['results'], 'Normal query should return results'
print(f'Normal: {len(r["results"])} results from {r["provider"]}')
# Multiple queries to verify consistency
queries = ['best crm', 'react vs vue', 'machine learning course']
for q in queries:
r = fallback_search(q)
print(f'{q}: {len(r["results"])} results from {r["provider"]}')
# Health check
print(f'Health: {json.dumps(health_report(), indent=2)}')
print('Pipeline test passed')
test_pipeline()Python 示例
Python
import requests, os, time
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
def resilient_search(query, retries=3):
for i in range(retries):
try:
r = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'platform': 'google', 'query': query}, timeout=10)
r.raise_for_status()
return r.json().get('organic_results', [])
except Exception as e:
time.sleep(2 ** i)
return []
print(len(resilient_search('best crm 2026')), 'results')JavaScript 示例
JavaScript
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
async function resilientSearch(query, retries = 3) {
for (let i = 0; i < retries; i++) {
try {
const r = await fetch('https://api.scavio.dev/api/v1/search', {
method: 'POST', headers: H, body: JSON.stringify({platform: 'google', query})
});
if (r.ok) return (await r.json()).organic_results || [];
} catch {}
await new Promise(r => setTimeout(r, 2 ** i * 1000));
}
return [];
}
resilientSearch('best crm 2026').then(r => console.log(r.length + ' results'));预期输出
JSON
A search pipeline that retries transient failures with backoff, falls back across providers, and opens circuit breakers on sustained failures.