Overview
This n8n-compatible pipeline monitors the health of API data sources used in your n8n workflows. It sends test queries to each Scavio endpoint (Google, Amazon, Reddit, TikTok) and verifies the response contains expected data. If any endpoint returns errors or empty results, the pipeline sends an alert to Slack with the failure details. Run as a scheduled n8n workflow to catch API issues before they break production workflows.
Trigger
Cron schedule (every 6 hours)
Schedule
Runs every 6 hours
Workflow Steps
Define health check endpoints
List all Scavio endpoints used in production workflows with test queries for each.
Execute test queries
Send a test query to each endpoint and record response status, latency, and result count.
Validate response quality
Check that each response contains at least 1 organic result and valid JSON structure.
Alert on failures
If any endpoint fails validation, send a Slack alert with endpoint name, error details, and timestamp.
Python Implementation
import requests
import json
from datetime import datetime
from pathlib import Path
import time
# Value sent in the "x-api-key" request header. The literal below looks like
# a placeholder to be swapped for a real key before running.
API_KEY = "your_scavio_api_key"

# TikTok endpoint URL. Nothing in the code shown in this file references it.
TIKTOK_URL = "https://api.scavio.dev/api/v1/tiktok"

# One health check per platform. Each entry carries:
#   name        - label used in the printed report and the JSON output
#   platform    - value sent as "platform" in the search request body
#   query       - test query sent as "query" in the search request body
#   min_results - minimum number of organic results for a "healthy" verdict
# Here the label is simply the platform name and every check needs >= 1 result.
HEALTH_CHECKS = [
    {"name": platform, "platform": platform, "query": query, "min_results": 1}
    for platform, query in (
        ("google", "test query"),
        ("amazon", "laptop"),
        ("reddit", "python programming"),
    )
]
def check_endpoint(check: dict) -> dict:
    """Run one health check against the Scavio search endpoint.

    Args:
        check: Check definition with "name", "platform" and "query" keys.
            "min_results" is optional and defaults to 1.

    Returns:
        A dict that always has "name", "status" and "latency_ms", plus:
          * status "error": the HTTP status was not OK; "code" holds it.
          * status "healthy" / "degraded": the request succeeded; "results"
            holds the number of organic results, and the status says
            whether that count reached the check's minimum.
          * status "down": the request or JSON decoding raised; "error"
            holds the exception text.
    """
    # Monotonic clock: elapsed-time measurement must not be affected by
    # system clock adjustments made while the request is in flight.
    start = time.monotonic()
    try:
        res = requests.post(
            "https://api.scavio.dev/api/v1/search",
            headers={"x-api-key": API_KEY},
            json={"platform": check["platform"], "query": check["query"]},
            timeout=15,
        )
        latency = int((time.monotonic() - start) * 1000)
        if not res.ok:
            return {
                "name": check["name"],
                "status": "error",
                "code": res.status_code,
                "latency_ms": latency,
            }
        data = res.json()
        # The endpoint answered, so an unexpected payload shape (non-object
        # JSON, or "organic" that is null / not a list) counts as zero results
        # and is reported as "degraded" rather than falling into the
        # exception handler and being reported as "down".
        organic = data.get("organic") if isinstance(data, dict) else None
        result_count = len(organic) if isinstance(organic, list) else 0
        healthy = result_count >= check.get("min_results", 1)
        return {
            "name": check["name"],
            "status": "healthy" if healthy else "degraded",
            "results": result_count,
            "latency_ms": latency,
        }
    except Exception as e:
        # Deliberately broad: one failing check (timeout, connection error,
        # undecodable body) must be reported, not abort the whole run.
        return {
            "name": check["name"],
            "status": "down",
            "error": str(e),
            "latency_ms": int((time.monotonic() - start) * 1000),
        }
def run():
    """Run every configured health check, print a report, and save it.

    Calls check_endpoint() for each entry in HEALTH_CHECKS, prints one line
    per endpoint plus an ALERT line when any endpoint is not "healthy", and
    writes the timestamp and all results to "api_health.json" in the current
    working directory.
    """
    # Function-scope import keeps this block self-contained: the file's
    # top-level import only brings in `datetime`.
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted string is identical (UTC, minute resolution).
    date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M")
    results = [check_endpoint(c) for c in HEALTH_CHECKS]
    failures = [r for r in results if r["status"] != "healthy"]

    print(f"Health check {date}:")
    for r in results:
        status_icon = "OK" if r["status"] == "healthy" else "FAIL"
        print(f" [{status_icon}] {r['name']}: {r['status']} ({r['latency_ms']}ms)")
    if failures:
        print(f" ALERT: {len(failures)} endpoints unhealthy")

    # Explicit encoding so the report file does not depend on the locale.
    Path("api_health.json").write_text(
        json.dumps({"timestamp": date, "results": results}, indent=2),
        encoding="utf-8",
    )
if __name__ == "__main__":
    run()
JavaScript Implementation
// Value sent in the "x-api-key" request header. The literal below looks like
// a placeholder to be swapped for a real key before running.
const API_KEY = "your_scavio_api_key";

// One health check per platform: `name` labels the check in the printed
// report, `platform` and `query` are sent in the search request body.
// Here the label is simply the platform name.
const CHECKS = [
  ["google", "test query"],
  ["amazon", "laptop"],
  ["reddit", "python programming"],
].map(([platform, query]) => ({ name: platform, platform, query }));
/**
 * Run one health check against the Scavio search endpoint.
 *
 * @param {{name: string, platform: string, query: string}} check
 *   Check definition; `platform` and `query` are sent in the request body.
 * @returns {Promise<object>} Always has `name`, `status` and `ms` (elapsed
 *   milliseconds), plus:
 *   - status "error": the HTTP status was not OK; `code` holds it.
 *   - status "healthy" / "degraded": the request succeeded; the status says
 *     whether the response's `organic` list was non-empty.
 *   - status "down": the request, the timeout or JSON decoding threw;
 *     `error` holds the exception message.
 */
async function checkEndpoint(check) {
  const start = Date.now();
  try {
    const res = await fetch("https://api.scavio.dev/api/v1/search", {
      method: "POST",
      headers: { "x-api-key": API_KEY, "content-type": "application/json" },
      body: JSON.stringify({ platform: check.platform, query: check.query }),
      // fetch has no default timeout, so a stalled connection would hang the
      // whole run. Abort after 15 s (the same limit the Python sample uses);
      // the abort lands in the catch below and is reported as "down".
      signal: AbortSignal.timeout(15000),
    });
    const ms = Date.now() - start;
    if (!res.ok) return { name: check.name, status: "error", code: res.status, ms };
    const data = await res.json();
    // A missing or null `organic` field counts as zero results.
    const hasResults = (data.organic ?? []).length > 0;
    return { name: check.name, status: hasResults ? "healthy" : "degraded", ms };
  } catch (e) {
    // One failing check must be reported, not abort the remaining checks.
    return { name: check.name, status: "down", error: e.message, ms: Date.now() - start };
  }
}
for (const c of CHECKS) {
const r = await checkEndpoint(c);
console.log(`[${r.status === "healthy" ? "OK" : "FAIL"}] ${r.name}: ${r.status} (${r.ms}ms)`);
}
Platforms Used
Google
Web search with knowledge graph, PAA, and AI overviews
Amazon
Product search with prices, ratings, and reviews
Reddit
Community, posts & threaded comments from any subreddit