Créez une couche de fiabilité de recherche pour les agents IA qui inclut la logique de circuit breaker, la mise en cache des résultats, la surveillance de la santé et un comportement de repli automatique. Les agents qui dépendent d'un seul appel de recherche sans gestion des erreurs échoueront complètement lorsque l'API de recherche est lente, limitée en débit ou temporairement indisponible. Une couche de fiabilité encapsule les appels de recherche avec une logique de protection qui sert des résultats mis en cache lors des pannes, ouvre un circuit breaker après des échecs répétés et fournit des métriques de santé pour la surveillance. Cela garantit que les agents continuent de fonctionner même en cas de perturbations de l'API de recherche.
Prérequis
- Python 3.8+ installé
- bibliothèque requests installée
- Une clé API Scavio depuis scavio.dev
- Un agent IA qui utilise des outils de recherche
Parcours
Étape 1: Construire le circuit breaker
Implémentez un circuit breaker qui empêche les défaillances en cascade lorsque l'API de recherche est en panne.
import os, requests, time, json, hashlib
from datetime import datetime, timedelta
API_KEY = os.environ['SCAVIO_API_KEY']
class CircuitBreaker:
    """Three-state circuit breaker: 'closed', 'open' and 'half-open'.

    The breaker starts closed. Once ``failure_threshold`` failures have been
    recorded without an intervening success it opens, and ``can_execute``
    returns False. When more than ``reset_timeout`` seconds have elapsed since
    the last recorded failure, the next ``can_execute`` call switches the
    breaker to half-open and lets calls through again. A recorded success
    closes the breaker and resets the failure counter.
    """

    def __init__(self, failure_threshold: int = 3, reset_timeout: int = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of recorded failures that opens the circuit.
            reset_timeout: Seconds to wait after the last failure before a
                trial call is allowed again.
        """
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure = None  # datetime of the most recent failure, or None
        self.state = 'closed'  # closed = normal, open = blocking, half-open = testing

    def can_execute(self) -> bool:
        """Return True if a call may be attempted right now.

        An open breaker whose reset timeout has elapsed is moved to
        'half-open' as a side effect.
        """
        if self.state != 'open':
            return True  # closed or half-open

        if self.last_failure is None:
            return False

        # total_seconds() covers the whole interval; the `.seconds` attribute
        # only holds the sub-day remainder and wraps around every 24 hours.
        elapsed = (datetime.now() - self.last_failure).total_seconds()
        if elapsed > self.reset_timeout:
            self.state = 'half-open'
            return True
        return False

    def record_success(self):
        """Reset the failure counter and close the circuit."""
        self.failures = 0
        self.state = 'closed'

    def record_failure(self):
        """Count one failure and open the circuit once the threshold is reached."""
        self.failures += 1
        self.last_failure = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            print(f'Circuit OPEN: {self.failures} failures')
breaker = CircuitBreaker()
print(f'Circuit state: {breaker.state}')
Étape 2: Ajouter la mise en cache des résultats
Mettez en cache les résultats de recherche pour les servir lors des pannes et réduire les appels API redondants.
class SearchCache:
    """In-memory cache of search results with a time-to-live.

    Entries are keyed by an MD5 digest of ``'<platform>:<query>'`` and are
    never evicted: expired entries stay in the dict so that ``get_stale`` can
    still serve them as a fallback.
    """

    def __init__(self, ttl_seconds: int = 3600):
        """Create an empty cache whose entries count as fresh for ``ttl_seconds``."""
        self.cache = {}
        self.ttl = ttl_seconds

    def _key(self, query: str, platform: str) -> str:
        """Build the dictionary key for a (platform, query) pair."""
        # MD5 is used only to derive a compact key, not for security.
        return hashlib.md5(f'{platform}:{query}'.encode()).hexdigest()

    def get(self, query: str, platform: str = 'google') -> dict:
        """Return the cached data if present and not older than the TTL, else None."""
        entry = self.cache.get(self._key(query, platform))
        if not entry:
            return None
        # total_seconds() measures the full age; `.seconds` would wrap every
        # 24 hours and make day-old entries look fresh again.
        age = (datetime.now() - entry['timestamp']).total_seconds()
        if age > self.ttl:
            return None  # Expired
        return entry['data']

    def get_stale(self, query: str, platform: str = 'google') -> dict:
        """Return cached data even if expired (for fallback during outages).

        Returns None when nothing was ever stored for this query/platform.
        """
        entry = self.cache.get(self._key(query, platform))
        return entry['data'] if entry else None

    def set(self, query: str, platform: str, data: dict):
        """Store ``data`` for the query/platform, stamped with the current time."""
        self.cache[self._key(query, platform)] = {
            'data': data,
            'timestamp': datetime.now(),
        }

    def stats(self) -> dict:
        """Return ``{'entries': <number of stored entries>}``, expired ones included."""
        return {'entries': len(self.cache)}
cache = SearchCache(ttl_seconds=3600)
print(f'Cache initialized: {cache.stats()}')
Étape 3: Construire l'enveloppe de fiabilité
Combinez circuit breaker, mise en cache et logique de nouvelle tentative en une seule fonction de recherche fiable.
class ReliableSearch:
    """Search client combining a result cache, a circuit breaker and fallbacks.

    Every dict returned by ``search`` carries a ``'_source'`` key telling where
    the data came from:

    * ``'live'``           - fresh response from the API
    * ``'cache'``          - unexpired cache entry
    * ``'stale_cache'``    - circuit open, expired cache entry served
    * ``'circuit_open'``   - circuit open and nothing cached (empty results)
    * ``'fallback_cache'`` - the call failed, cached entry served
    * ``'no_data'``        - the call failed and nothing cached (empty results)

    ``stats`` counts cache hits, API attempts ('misses'), failed attempts
    ('errors', including HTTP 429) and transitions of the breaker to the open
    state ('circuit_opens').
    """

    def __init__(self, api_key: str):
        """Create the wrapper with its own breaker, cache and counters.

        Args:
            api_key: Value sent in the ``x-api-key`` request header.
        """
        self.api_key = api_key
        self.breaker = CircuitBreaker(failure_threshold=3, reset_timeout=60)
        self.cache = SearchCache(ttl_seconds=3600)
        self.stats = {'hits': 0, 'misses': 0, 'errors': 0, 'circuit_opens': 0}

    def search(self, query: str, platform: str = 'google') -> dict:
        """Run a search and always return a dict; failures are not raised.

        Args:
            query: Search query string.
            platform: Platform name forwarded to the API.

        Returns:
            The result payload, or ``{'organic_results': []}`` when no data is
            available, with the ``'_source'`` key added (see class docstring).
        """
        # Check cache first. `is not None` so an empty payload is still a hit.
        cached = self.cache.get(query, platform)
        if cached is not None:
            self.stats['hits'] += 1
            # Copy so the stored entry is never modified by callers.
            return {**cached, '_source': 'cache'}

        # Check circuit breaker
        if not self.breaker.can_execute():
            stale = self.cache.get_stale(query, platform)
            if stale is not None:
                return {**stale, '_source': 'stale_cache'}
            return {'organic_results': [], '_source': 'circuit_open'}

        # Make the API call
        self.stats['misses'] += 1
        try:
            resp = requests.post(
                'https://api.scavio.dev/api/v1/search',
                headers={'x-api-key': self.api_key},
                json={'platform': platform, 'query': query},
                timeout=10,
            )
            rate_limited = resp.status_code == 429
            if not rate_limited:
                resp.raise_for_status()
                data = resp.json()
                if not isinstance(data, dict):
                    # Reject before caching so a malformed body is never stored.
                    raise ValueError('search response is not a JSON object')
        except Exception:
            # Deliberate catch-all boundary: this layer must never propagate
            # a search failure to the agent.
            self._record_failure()
            return self._fallback(query, platform)

        if rate_limited:
            self._record_failure()
            # Short pause before returning, kept from the original flow; no
            # retry is attempted after it.
            time.sleep(2)
            return self._fallback(query, platform)

        self.cache.set(query, platform, data)
        self.breaker.record_success()
        # Tag a copy: mutating `data` would also relabel the cached entry.
        return {**data, '_source': 'live'}

    def _record_failure(self):
        """Count a failed attempt and note when it trips the breaker open."""
        was_open = self.breaker.state == 'open'
        self.stats['errors'] += 1
        self.breaker.record_failure()
        if self.breaker.state == 'open' and not was_open:
            self.stats['circuit_opens'] += 1

    def _fallback(self, query: str, platform: str) -> dict:
        """Return cached data regardless of age, or an empty result set."""
        stale = self.cache.get_stale(query, platform)
        if stale is not None:
            return {**stale, '_source': 'fallback_cache'}
        return {'organic_results': [], '_source': 'no_data'}
search = ReliableSearch(API_KEY)
result = search.search('test query')
print(f"Source: {result.get('_source')}, Results: {len(result.get('organic_results', []))}")
Étape 4: Ajouter la surveillance de la santé
Surveillez la santé de la couche de fiabilité et exposez des métriques pour les alertes.
class HealthMonitor:
    """Collects health snapshots of a ReliableSearch instance.

    Each ``check`` runs one probe search through the wrapped instance and
    records the breaker state, cache size and counters as they are *after*
    the probe, so the report is consistent with the probe's outcome.
    """

    def __init__(self, reliable_search: 'ReliableSearch'):
        """Wrap ``reliable_search``; ``checks`` keeps every report produced."""
        self.search = reliable_search
        self.checks = []

    def check(self) -> dict:
        """Run a probe search and return (and store) a health report.

        Returns:
            Dict with keys ``timestamp``, ``circuit_state``, ``cache_entries``,
            ``stats``, ``healthy``, ``live_test`` and ``live_results``.
            ``healthy`` is False when the circuit is open.
        """
        started = datetime.now().isoformat()

        # Probe first: it may trip the breaker or change the counters, and the
        # snapshot below has to reflect that.
        probe = self.search.search('health check test')

        state = self.search.breaker.state
        result = {
            'timestamp': started,
            'circuit_state': state,
            'cache_entries': self.search.cache.stats()['entries'],
            'stats': self.search.stats.copy(),
            'healthy': state != 'open',
            'live_test': probe.get('_source', 'unknown'),
            'live_results': len(probe.get('organic_results', [])),
        }
        self.checks.append(result)
        return result

    def summary(self) -> str:
        """Run a new ``check`` and format it as a five-line text summary."""
        latest = self.check()
        health = 'OK' if latest['healthy'] else 'DEGRADED'
        return '\n'.join([
            f"Health: {health}",
            f"Circuit: {latest['circuit_state']}",
            f"Cache: {latest['cache_entries']} entries",
            f"Errors: {latest['stats']['errors']}",
            f"Live test: {latest['live_test']} ({latest['live_results']} results)",
        ])
monitor = HealthMonitor(search)
print(monitor.summary())
Étape 5: Intégrer avec l'agent
Remplacez les appels de recherche directs dans votre agent par la couche de fiabilité.
# Replace direct API calls with ReliableSearch in your agent:
def agent_tool_search(query: str, platform: str = 'google') -> list:
    """Search through the module-level reliability layer for an agent tool.

    Args:
        query: Search query string.
        platform: Platform name passed on to ``search.search``.

    Returns:
        At most five dicts with the keys ``title``, ``url`` and ``snippet``;
        missing fields default to an empty string.
    """
    response = search.search(query, platform)

    # Report whenever the answer did not come straight from the API.
    origin = response.get('_source', 'unknown')
    if origin != 'live':
        print(f'Search served from: {origin}')

    formatted = []
    for hit in response.get('organic_results', [])[:5]:
        formatted.append({
            'title': hit.get('title', ''),
            'url': hit.get('link', ''),
            'snippet': hit.get('snippet', ''),
        })
    return formatted
# Test the integration
results = agent_tool_search('AI agent frameworks 2026')
print(f'Results: {len(results)}')
for r in results:
print(f" {r['title'][:50]}")
# Show final health
print(f'\n{monitor.summary()}')
Exemple Python
import requests, os, time
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
cache = {}
def reliable_search(query, retries=2):
    """Search with an in-memory cache and retries with exponential backoff.

    Args:
        query: Search query string; also the key in the module-level ``cache``.
        retries: Number of extra attempts after the first one fails.

    Returns:
        The ``organic_results`` list of the response, or an empty list when
        every attempt failed.
    """
    if query in cache:
        return cache[query]

    for attempt in range(retries + 1):
        try:
            r = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
                              json={'platform': 'google', 'query': query}, timeout=10)
            # Without this, an HTTP error body would be parsed and cached as [].
            r.raise_for_status()
            data = r.json().get('organic_results', [])
        except Exception:
            # Best-effort boundary: any request or parsing failure triggers a
            # retry. Unlike a bare `except:`, Ctrl-C and SystemExit still
            # propagate.
            if attempt < retries:
                time.sleep(2 ** attempt)  # no point waiting after the last try
        else:
            cache[query] = data
            return data

    return cache.get(query, [])
print(len(reliable_search('test')))
Exemple JavaScript
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
const cache = new Map();
/**
 * Search with an in-memory cache and retries with exponential backoff.
 *
 * @param {string} query - Search query; also the key in the module-level `cache`.
 * @param {number} [retries=2] - Extra attempts after the first one fails.
 * @returns {Promise<Array>} The `organic_results` array, or `[]` if every attempt failed.
 */
async function reliableSearch(query, retries = 2) {
  if (cache.has(query)) return cache.get(query);

  for (let i = 0; i <= retries; i++) {
    try {
      const r = await fetch('https://api.scavio.dev/api/v1/search', {
        method: 'POST', headers: H,
        body: JSON.stringify({platform: 'google', query})
      });
      // fetch() rejects only on network errors; an HTTP error status must be
      // checked explicitly or its body would be cached as an empty result.
      if (!r.ok) throw new Error(`Search request failed with HTTP ${r.status}`);
      const data = (await r.json()).organic_results || [];
      cache.set(query, data);
      return data;
    } catch (e) {
      // Back off before the next attempt; skip the wait after the last one.
      if (i < retries) {
        await new Promise(resolve => setTimeout(resolve, 1000 * 2 ** i));
      }
    }
  }
  return cache.get(query) || [];
}
reliableSearch('test').then(r => console.log(r.length));
Sortie attendue
A production-grade search reliability layer with circuit breaker, result caching, health monitoring, and automatic fallback that prevents agent failures during search disruptions.