Build a search reliability layer for AI agents that includes circuit breaker logic, result caching, health monitoring, and automatic fallback behavior. Agents that depend on a single search call without error handling will fail completely when the search API is slow, rate-limited, or temporarily unavailable. A reliability layer wraps search calls with protection logic that serves cached results during outages, opens a circuit breaker after repeated failures, and provides health metrics for monitoring. This ensures agents continue functioning even during search API disruptions.
Prerequisites
- Python 3.8+ installed
- requests library installed
- A Scavio API key from scavio.dev
- An AI agent that uses search tools
Walkthrough
Step 1: Build the circuit breaker
Implement a circuit breaker that prevents cascading failures when the search API is down.
import os, requests, time, json, hashlib
from datetime import datetime, timedelta
API_KEY = os.environ['SCAVIO_API_KEY']
class CircuitBreaker:
    """Circuit breaker that blocks calls after repeated failures.

    States: 'closed' (normal operation), 'open' (blocking calls after
    failure_threshold consecutive failures), 'half-open' (allowing one
    trial call once reset_timeout seconds have elapsed since the last
    failure).
    """

    def __init__(self, failure_threshold: int = 3, reset_timeout: int = 60):
        # Consecutive failures that trip the breaker open.
        self.failure_threshold = failure_threshold
        # Seconds to wait before letting a trial call through.
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure = None  # datetime of the most recent failure
        self.state = 'closed'  # closed = normal, open = blocking, half-open = testing

    def can_execute(self) -> bool:
        """Return True if a call is allowed under the current state."""
        if self.state == 'closed':
            return True
        if self.state == 'open':
            # Use total_seconds(): timedelta.seconds is only the sub-day
            # component and wraps after a 24h+ outage, which could keep
            # the breaker open (or flip it early) incorrectly.
            if self.last_failure and (datetime.now() - self.last_failure).total_seconds() > self.reset_timeout:
                self.state = 'half-open'
                return True
            return False
        return True  # half-open: allow the trial call

    def record_success(self):
        """Reset the breaker to normal operation after a successful call."""
        self.failures = 0
        self.state = 'closed'

    def record_failure(self):
        """Count a failure; trip the breaker open at the threshold."""
        self.failures += 1
        self.last_failure = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            print(f'Circuit OPEN: {self.failures} failures')
breaker = CircuitBreaker()
print(f'Circuit state: {breaker.state}')

Step 2: Add result caching
Cache search results to serve during outages and reduce redundant API calls.
class SearchCache:
    """In-memory TTL cache for search results, keyed by (platform, query)."""

    def __init__(self, ttl_seconds: int = 3600):
        self.cache = {}  # key -> {'data': dict, 'timestamp': datetime}
        self.ttl = ttl_seconds

    def _key(self, query: str, platform: str) -> str:
        # md5 is fine here: it is a cache key, not a security boundary.
        return hashlib.md5(f'{platform}:{query}'.encode()).hexdigest()

    def get(self, query: str, platform: str = 'google'):
        """Return fresh cached data, or None if missing or expired."""
        key = self._key(query, platform)
        entry = self.cache.get(key)
        if not entry:
            return None
        # total_seconds(): timedelta.seconds wraps at 24h, which would let
        # a day-old entry be served as fresh.
        age = (datetime.now() - entry['timestamp']).total_seconds()
        if age > self.ttl:
            return None  # Expired
        return entry['data']

    def get_stale(self, query: str, platform: str = 'google'):
        """Return cached data even if expired (for fallback during outages)."""
        key = self._key(query, platform)
        entry = self.cache.get(key)
        return entry['data'] if entry else None

    def set(self, query: str, platform: str, data: dict):
        """Store data for (query, platform), stamped with the current time."""
        key = self._key(query, platform)
        self.cache[key] = {'data': data, 'timestamp': datetime.now()}

    def stats(self) -> dict:
        """Return basic cache metrics."""
        return {'entries': len(self.cache)}
cache = SearchCache(ttl_seconds=3600)
print(f'Cache initialized: {cache.stats()}')

Step 3: Build the reliability wrapper
Combine circuit breaker, caching, and retry logic into a single reliable search function.
class ReliableSearch:
    """Search client combining a circuit breaker, a TTL cache, and stale fallback.

    Every result dict carries a '_source' key identifying where it came
    from: 'live', 'cache', 'stale_cache', 'fallback_cache', 'circuit_open',
    or 'no_data'.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.breaker = CircuitBreaker(failure_threshold=3, reset_timeout=60)
        self.cache = SearchCache(ttl_seconds=3600)
        self.stats = {'hits': 0, 'misses': 0, 'errors': 0, 'circuit_opens': 0}

    def search(self, query: str, platform: str = 'google') -> dict:
        """Run a search with cache-first, breaker-guarded, fallback semantics."""
        # Fresh cache hit: no network call needed.
        cached = self.cache.get(query, platform)
        if cached:
            self.stats['hits'] += 1
            # Return a tagged copy so callers cannot mutate the cached
            # entry, and so hits are not mislabeled as 'live'.
            return {**cached, '_source': 'cache'}
        # Breaker open: skip the API entirely; serve stale data if we have any.
        if not self.breaker.can_execute():
            stale = self.cache.get_stale(query, platform)
            if stale:
                return {**stale, '_source': 'stale_cache'}
            return {'organic_results': [], '_source': 'circuit_open'}
        # Cache miss with a closed (or half-open) breaker: call the API.
        self.stats['misses'] += 1
        try:
            resp = requests.post('https://api.scavio.dev/api/v1/search',
                                 headers={'x-api-key': self.api_key},
                                 json={'platform': platform, 'query': query}, timeout=10)
            if resp.status_code == 429:
                # Rate limited: count it against the breaker and back off briefly.
                self.breaker.record_failure()
                time.sleep(2)
                return self._fallback(query, platform)
            resp.raise_for_status()
            data = resp.json()
            # Cache a COPY before tagging '_source'. Caching the same object
            # and then mutating it would bake '_source': 'live' into the
            # cached entry forever.
            self.cache.set(query, platform, dict(data))
            self.breaker.record_success()
            data['_source'] = 'live'
            return data
        except Exception:
            # Any transport/HTTP/JSON error counts as a breaker failure;
            # fall back to stale cache rather than crashing the agent.
            self.stats['errors'] += 1
            self.breaker.record_failure()
            return self._fallback(query, platform)

    def _fallback(self, query: str, platform: str) -> dict:
        """Serve stale cached data if available, else an empty result set."""
        stale = self.cache.get_stale(query, platform)
        if stale:
            return {**stale, '_source': 'fallback_cache'}
        return {'organic_results': [], '_source': 'no_data'}
# Wire everything together; requires SCAVIO_API_KEY in the environment.
search = ReliableSearch(API_KEY)
result = search.search('test query')
print(f"Source: {result.get('_source')}, Results: {len(result.get('organic_results', []))}")

Step 4: Add health monitoring
Monitor the reliability layer's health and expose metrics for alerting.
class HealthMonitor:
    """Collects health snapshots of a ReliableSearch instance."""

    def __init__(self, reliable_search: ReliableSearch):
        self.search = reliable_search
        self.checks = []  # history of snapshots, newest last

    def check(self) -> dict:
        """Take one health snapshot, including a live probe search."""
        snapshot = {
            'timestamp': datetime.now().isoformat(),
            'circuit_state': self.search.breaker.state,
            'cache_entries': self.search.cache.stats()['entries'],
            'stats': self.search.stats.copy(),
            'healthy': self.search.breaker.state != 'open',
        }
        # Exercise the full search path so the snapshot reflects reality.
        probe = self.search.search('health check test')
        snapshot['live_test'] = probe.get('_source', 'unknown')
        snapshot['live_results'] = len(probe.get('organic_results', []))
        self.checks.append(snapshot)
        return snapshot

    def summary(self) -> str:
        """Run a fresh check and format it as a short multi-line report."""
        latest = self.check()
        status = 'OK' if latest['healthy'] else 'DEGRADED'
        report = [
            f"Health: {status}",
            f"Circuit: {latest['circuit_state']}",
            f"Cache: {latest['cache_entries']} entries",
            f"Errors: {latest['stats']['errors']}",
            f"Live test: {latest['live_test']} ({latest['live_results']} results)",
        ]
        return '\n'.join(report)
monitor = HealthMonitor(search)
print(monitor.summary())

Step 5: Integrate with agent
Replace direct search calls in your agent with the reliability layer.
# Replace direct API calls with ReliableSearch in your agent:
def agent_tool_search(query: str, platform: str = 'google') -> list:
    """Drop-in replacement for agent search tools."""
    response = search.search(query, platform)
    origin = response.get('_source', 'unknown')
    # Log the source for debugging
    if origin != 'live':
        print(f'Search served from: {origin}')
    # Keep only the fields the agent needs, capped at five results.
    simplified = []
    for item in response.get('organic_results', [])[:5]:
        simplified.append({
            'title': item.get('title', ''),
            'url': item.get('link', ''),
            'snippet': item.get('snippet', ''),
        })
    return simplified
# Test the integration
results = agent_tool_search('AI agent frameworks 2026')
print(f'Results: {len(results)}')
for r in results:
    print(f" {r['title'][:50]}")
# Show final health
print(f'\n{monitor.summary()}')

Python Example
import requests, os, time
# All requests share the same auth header.
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
# Module-level cache: query -> list of organic results.
cache = {}
def reliable_search(query, retries=2):
    """Search with an in-memory cache and exponential-backoff retries.

    Returns cached results when available; after all retries fail, returns
    whatever is cached for the query (possibly an empty list) rather than
    raising, so the calling agent keeps running.
    """
    if query in cache:
        return cache[query]
    for i in range(retries + 1):
        try:
            r = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
                              json={'platform': 'google', 'query': query}, timeout=10)
            # Treat HTTP errors (429/5xx) as failures so they are retried
            # instead of caching an error payload as results.
            r.raise_for_status()
            data = r.json().get('organic_results', [])
            cache[query] = data
            return data
        except (requests.RequestException, ValueError):
            # Narrow except: a bare `except:` would also swallow
            # KeyboardInterrupt/SystemExit. Back off 1s, 2s, 4s, ...
            time.sleep(2 ** i)
    return cache.get(query, [])
print(len(reliable_search('test')))

JavaScript Example
// Shared auth headers and an in-memory query -> results cache.
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
const cache = new Map();
/**
 * Search with an in-memory cache and exponential-backoff retries.
 * Returns cached results when available; after all retries fail, returns
 * whatever is cached for the query (possibly []) rather than throwing.
 */
async function reliableSearch(query, retries = 2) {
  if (cache.has(query)) return cache.get(query);
  for (let i = 0; i <= retries; i++) {
    try {
      const r = await fetch('https://api.scavio.dev/api/v1/search', {
        method: 'POST', headers: H,
        body: JSON.stringify({platform: 'google', query})
      });
      // fetch() does NOT reject on HTTP errors; throw so 429/5xx responses
      // are retried instead of caching an error payload as results.
      if (!r.ok) throw new Error(`HTTP ${r.status}`);
      const data = (await r.json()).organic_results || [];
      cache.set(query, data);
      return data;
    } catch (e) {
      // Exponential backoff: 1s, 2s, 4s, ...
      await new Promise(resolve => setTimeout(resolve, 1000 * 2 ** i));
    }
  }
  return cache.get(query) || [];
}
reliableSearch('test').then(r => console.log(r.length));

Expected Output
A production-grade search reliability layer with circuit breaker, result caching, health monitoring, and automatic fallback that prevents agent failures during search disruptions.