Overview
Every 15 minutes, run a known query through your agent pipeline and verify the response contains expected data. Catches API degradation, auth expiry, and pipeline breaks before they affect users.
Trigger
Every 15 minutes
Schedule
Every 15 minutes
Workflow Steps
Run canary query
Execute a known query that should always return specific results.
Validate response shape
Check that response contains expected fields and result count.
Check latency
Measure response time against baseline threshold.
Alert on failure
Send alert if validation fails or latency exceeds threshold.
Python Implementation
import requests, os, time
H = {"x-api-key": os.environ["SCAVIO_API_KEY"]}
CANARY_QUERY = "python programming language"
EXPECTED_DOMAIN = "python.org"
MAX_LATENCY_MS = 3000
def health_check():
start = time.time()
try:
r = requests.post("https://api.scavio.dev/api/v1/search",
headers=H, json={"platform": "google", "query": CANARY_QUERY}, timeout=10)
latency_ms = (time.time() - start) * 1000
data = r.json()
results = data.get("organic_results", [])
has_expected = any(EXPECTED_DOMAIN in r.get("link", "") for r in results)
return {
"status": "healthy" if has_expected and latency_ms < MAX_LATENCY_MS else "degraded",
"latency_ms": round(latency_ms),
"result_count": len(results),
"expected_found": has_expected
}
except Exception as e:
return {"status": "down", "error": str(e)}
print(health_check())JavaScript Implementation
const start = Date.now();
const r = await fetch("https://api.scavio.dev/api/v1/search", {
method: "POST", headers: {"x-api-key": process.env.SCAVIO_API_KEY, "Content-Type": "application/json"},
body: JSON.stringify({platform: "google", query: "python programming language"})
});
const ms = Date.now() - start;
const data = await r.json();
const ok = data.organic_results?.some(r => r.link?.includes("python.org"));
console.log({status: ok && ms < 3000 ? "healthy" : "degraded", latency_ms: ms});Platforms Used
Web search with knowledge graph, PAA, and AI overviews