当单个工具调用失败时,生产环境中的代理就会中断。后备链会依次尝试主要搜索、辅助来源和本地缓存数据,确保您的代理始终能返回有用的结果。本教程以 Scavio Search API 作为主要来源构建一个三层后备链:第二层是带本地重新排名的辅助搜索,第三层是作为最终安全网的本地缓存。
前置条件
- Python 3.11+ 或 Node.js 20+
- 来自 https://scavio.dev 的 Scavio API 密钥
- 对错误处理的基本了解(Python 的 try/except 或 JavaScript 的 try/catch)
- 用于缓存的本地 SQLite 或 JSON 文件
操作指南
步骤 1: 设置主要搜索调用
向 Scavio API 发出初始搜索请求。如果成功,则直接返回结果。将调用包装在错误处理中,以便故障级联到下一层。
import httpx
import json
from pathlib import Path
SCAVIO_API_KEY = "your-api-key"
CACHE_FILE = Path("search_cache.json")
async def primary_search(query: str) -> dict:
"""Tier 1: Live Scavio search."""
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": SCAVIO_API_KEY},
json={"query": query, "num_results": 10}
)
resp.raise_for_status()
return resp.json()
步骤 2: 添加二次搜索并重新排名
如果主要调用失败,或返回的结果少于三个,则触发更广泛的查询并在本地重新排名。这可以捕获原始查询过于狭窄的边缘情况。
async def secondary_search(query: str) -> dict:
"""Tier 2: Broader query with local re-rank."""
broad_query = " ".join(query.split()[:4]) # simplify query
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": SCAVIO_API_KEY},
json={"query": broad_query, "num_results": 20}
)
resp.raise_for_status()
data = resp.json()
# Re-rank: prefer results containing original query terms
terms = set(query.lower().split())
results = data.get("results", [])
scored = []
for r in results:
title_lower = r.get("title", "").lower()
score = sum(1 for t in terms if t in title_lower)
scored.append((score, r))
scored.sort(key=lambda x: x[0], reverse=True)
data["results"] = [r for _, r in scored[:10]]
return data
步骤 3: 实施本地缓存回退
作为最后的手段,从本地 JSON 缓存文件中读取。每次成功的搜索都会写入此缓存,因此当两个 API 层都失败时,只要该查询此前成功过一次,就仍有(可能已过时的)数据可用。
def write_cache(query: str, data: dict):
cache = json.loads(CACHE_FILE.read_text()) if CACHE_FILE.exists() else {}
cache[query] = data
CACHE_FILE.write_text(json.dumps(cache, indent=2))
def read_cache(query: str) -> dict | None:
if not CACHE_FILE.exists():
return None
cache = json.loads(CACHE_FILE.read_text())
return cache.get(query)
步骤 4: 将后备链连接在一起
将所有三层合并为一个函数。代理调用这一函数并始终获得结果,其中元数据指示哪一层为其提供服务。
async def fallback_search(query: str) -> dict:
# Tier 1: primary search
try:
data = await primary_search(query)
if len(data.get("results", [])) >= 3:
write_cache(query, data)
return {"tier": "primary", **data}
except Exception as e:
print(f"Primary search failed: {e}")
# Tier 2: broader secondary search
try:
data = await secondary_search(query)
if data.get("results"):
write_cache(query, data)
return {"tier": "secondary", **data}
except Exception as e:
print(f"Secondary search failed: {e}")
# Tier 3: local cache
cached = read_cache(query)
if cached:
return {"tier": "cache", **cached}
return {"tier": "none", "results": [], "error": "All tiers exhausted"}
Python 示例
import asyncio
import httpx
import json
from pathlib import Path
SCAVIO_API_KEY = "your-api-key"
CACHE_FILE = Path("search_cache.json")
async def primary_search(query: str) -> dict:
    """Tier 1: run a live search against the Scavio API.

    Posts *query* (asking for 10 results) to the search endpoint and
    returns the decoded JSON body. HTTP error statuses are raised by
    ``raise_for_status`` so the caller can fall through to the next tier.
    """
    endpoint = "https://api.scavio.dev/api/v1/search"
    auth_headers = {"x-api-key": SCAVIO_API_KEY}
    payload = {"query": query, "num_results": 10}
    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.post(endpoint, headers=auth_headers, json=payload)
        response.raise_for_status()
        return response.json()
async def secondary_search(query: str) -> dict:
    """Tier 2: run a broader search and re-rank the results locally.

    The query is shortened to its first four whitespace-separated words,
    20 results are requested, and the results are then ordered by how many
    terms of the original *query* appear in each result's title. Only the
    top 10 are kept.

    Returns:
        The decoded JSON response with ``"results"`` replaced by the
        re-ranked, truncated list.

    HTTP error statuses are raised by ``raise_for_status``.
    """
    broad_query = " ".join(query.split()[:4])
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(
            "https://api.scavio.dev/api/v1/search",
            headers={"x-api-key": SCAVIO_API_KEY},
            json={"query": broad_query, "num_results": 20},
        )
        resp.raise_for_status()
        data = resp.json()

    terms = set(query.lower().split())

    def title_score(result: dict) -> int:
        # A present-but-null title must not crash the re-rank.
        title = (result.get("title") or "").lower()
        return sum(term in title for term in terms)

    # sorted() is stable, so results with equal scores keep the API's order.
    ranked = sorted(data.get("results") or [], key=title_score, reverse=True)
    data["results"] = ranked[:10]
    return data
def write_cache(query: str, data: dict):
cache = json.loads(CACHE_FILE.read_text()) if CACHE_FILE.exists() else {}
cache[query] = data
CACHE_FILE.write_text(json.dumps(cache, indent=2))
def read_cache(query: str) -> dict | None:
if not CACHE_FILE.exists():
return None
return json.loads(CACHE_FILE.read_text()).get(query)
async def fallback_search(query: str) -> dict:
    """Search through three tiers and report which one answered.

    Tier 1 is ``primary_search`` (accepted when it yields at least three
    results), tier 2 is ``secondary_search`` (accepted when it yields any
    result), and tier 3 is the local cache via ``read_cache``.

    Returns:
        The serving tier's response dict with an added ``"tier"`` key
        (``"primary"``, ``"secondary"`` or ``"cache"``). When every tier
        fails, returns ``{"tier": "none", "results": [], "error": ...}``.

    Failures of individual tiers are reported with ``print`` and never
    propagate to the caller.
    """

    def store(data: dict) -> None:
        # Caching is best-effort: a failed write must not throw away a
        # perfectly good live result by tripping the tier's except clause.
        try:
            write_cache(query, data)
        except Exception as exc:
            print(f"Cache write failed: {exc}")

    # Tier 1: primary search
    try:
        data = await primary_search(query)
        if len(data.get("results", [])) >= 3:
            store(data)
            return {"tier": "primary", **data}
    except Exception as exc:
        print(f"Primary search failed: {exc}")

    # Tier 2: broader secondary search
    try:
        data = await secondary_search(query)
        if data.get("results"):
            store(data)
            return {"tier": "secondary", **data}
    except Exception as exc:
        print(f"Secondary search failed: {exc}")

    # Tier 3: local cache; guarded so a bad cache cannot crash the chain
    try:
        cached = read_cache(query)
    except Exception as exc:
        print(f"Cache read failed: {exc}")
        cached = None
    if cached:
        return {"tier": "cache", **cached}
    return {"tier": "none", "results": [], "error": "All tiers exhausted"}
result = asyncio.run(fallback_search("best headless CMS 2026"))
print(f"Served from: {result['tier']}")
print(f"Results: {len(result.get('results', []))}")
JavaScript 示例
const SCAVIO_API_KEY = "your-api-key";
const fs = require("fs");
const CACHE_FILE = "search_cache.json";
/**
 * Tier 1: run a live search against the Scavio API.
 *
 * Posts the query (asking for 10 results) and resolves with the parsed
 * JSON body. Throws when the response status is not OK so the caller can
 * fall through to the next tier.
 */
async function primarySearch(query) {
  const requestInit = {
    method: "POST",
    headers: { "x-api-key": SCAVIO_API_KEY, "Content-Type": "application/json" },
    body: JSON.stringify({ query: query, num_results: 10 }),
  };
  const response = await fetch("https://api.scavio.dev/api/v1/search", requestInit);
  if (response.ok) {
    return response.json();
  }
  throw new Error("Primary search failed: " + response.status);
}
/**
 * Tier 2: run a broader search and re-rank the results locally.
 *
 * The query is shortened to its first four words, 20 results are requested,
 * and the results are ordered by how many terms of the original query appear
 * in each title. The top 10 are kept. Each returned result carries a `score`
 * property unless the result already defines its own `score`.
 *
 * Throws when the response status is not OK.
 */
async function secondarySearch(query) {
  // Split on any whitespace run so repeated spaces do not produce empty terms
  // (an empty term would match every title).
  const words = query.split(/\s+/).filter(Boolean);
  const broad = words.slice(0, 4).join(" ");
  const resp = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": SCAVIO_API_KEY, "Content-Type": "application/json" },
    body: JSON.stringify({ query: broad, num_results: 20 }),
  });
  if (!resp.ok) throw new Error("Secondary search failed: " + resp.status);
  const data = await resp.json();

  const terms = [...new Set(words.map((w) => w.toLowerCase()))];
  const relevanceOf = (r) => {
    const title = (r.title || "").toLowerCase();
    return terms.filter((t) => title.includes(t)).length;
  };

  data.results = (data.results || [])
    .map((r) => {
      const relevance = relevanceOf(r);
      // Sort on the computed relevance kept beside the item: a `score` field
      // coming from the API would otherwise override it via the spread.
      return { relevance, item: { score: relevance, ...r } };
    })
    .sort((a, b) => b.relevance - a.relevance) // stable: ties keep API order
    .slice(0, 10)
    .map((entry) => entry.item);
  return data;
}
/**
 * Store `data` under `query` in the JSON cache file.
 *
 * A missing, unreadable, corrupt, or non-object cache file is treated as
 * empty, so one bad file cannot permanently disable caching.
 */
function writeCache(query, data) {
  let cache = {};
  try {
    const parsed = JSON.parse(fs.readFileSync(CACHE_FILE, "utf8"));
    if (parsed !== null && typeof parsed === "object" && !Array.isArray(parsed)) {
      cache = parsed;
    }
  } catch {
    // Missing or corrupt cache: start over with an empty one.
  }
  cache[query] = data;
  // Write to a sibling temp file and rename it into place so a crash
  // mid-write never leaves a truncated cache behind.
  const tmpFile = CACHE_FILE + ".tmp";
  fs.writeFileSync(tmpFile, JSON.stringify(cache, null, 2));
  fs.renameSync(tmpFile, CACHE_FILE);
}
/**
 * Return the cached response for `query`, or null if unavailable.
 *
 * Returns null when the file is missing, unreadable, not valid JSON, not a
 * JSON object, or has no own entry for `query`. This function is the last
 * tier of the chain, so it never throws on a bad cache file.
 */
function readCache(query) {
  let cache;
  try {
    cache = JSON.parse(fs.readFileSync(CACHE_FILE, "utf8"));
  } catch {
    return null;
  }
  if (cache === null || typeof cache !== "object") return null;
  // Own-property check: a query such as "constructor" must not resolve to
  // something inherited from Object.prototype.
  return Object.hasOwn(cache, query) ? cache[query] || null : null;
}
/**
 * Search through three tiers and report which one answered.
 *
 * Tier 1 is primarySearch (accepted with at least three results), tier 2 is
 * secondarySearch (accepted with any result), tier 3 is the local cache.
 * Resolves with the serving tier's response plus a `tier` property
 * ("primary", "secondary" or "cache"), or with
 * `{ tier: "none", results: [], error }` when every tier fails.
 * Tier failures are logged with console.warn and never propagate.
 */
async function fallbackSearch(query) {
  // Caching is best-effort: a failed write must not throw away a good
  // live result by tripping the tier's catch block.
  const store = (data) => {
    try {
      writeCache(query, data);
    } catch (err) {
      console.warn("Cache write failed:", err);
    }
  };

  // Tier 1: primary search
  try {
    const data = await primarySearch(query);
    if ((data.results || []).length >= 3) {
      store(data);
      return { tier: "primary", ...data };
    }
  } catch (err) {
    console.warn("Primary search failed:", err);
  }

  // Tier 2: broader secondary search
  try {
    const data = await secondarySearch(query);
    if (data.results?.length) {
      store(data);
      return { tier: "secondary", ...data };
    }
  } catch (err) {
    console.warn("Secondary search failed:", err);
  }

  // Tier 3: local cache; guarded so a bad cache cannot reject the promise
  let cached = null;
  try {
    cached = readCache(query);
  } catch (err) {
    console.warn("Cache read failed:", err);
  }
  if (cached) return { tier: "cache", ...cached };
  return { tier: "none", results: [], error: "All tiers exhausted" };
}
fallbackSearch("best headless CMS 2026").then(r => {
console.log("Served from:", r.tier);
console.log("Results:", (r.results || []).length);
});
预期输出
Served from: primary
Results: 10