Overview
This n8n sub-workflow sits between enrichment API calls and downstream processing nodes. It validates responses, maps vendor-specific fields to a canonical schema, handles missing fields with defaults, and passes uniform objects downstream. When an API changes its response format, only the normalizer needs updating.
Trigger
Called as a sub-workflow from any enrichment pipeline in n8n
Schedule
Called on demand from parent workflows
Workflow Steps
Receive raw API response
Accept the raw JSON response from any enrichment API call (Scavio, Clearbit, or another vendor).
Detect response format
Identify which vendor the response came from based on response structure or a passed vendor_id parameter.
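A minimal sketch of the detection step, assuming an explicit vendor_id takes priority and that the top-level list key is enough to tell vendors apart otherwise. The `SIGNATURE_KEYS` table, the `detect_vendor` name, and the vendor labels are hypothetical and not part of the workflow itself.

```python
from typing import Optional

# Hypothetical table: top-level list key -> vendor label (assumption for illustration).
SIGNATURE_KEYS = {
    "organic": "scavio",
    "results": "generic_results",
    "items": "generic_items",
}


def detect_vendor(raw: object, vendor_id: Optional[str] = None) -> str:
    """Return a vendor label from an explicit id or from the response structure."""
    # An explicitly passed vendor_id always wins over structural guessing.
    if vendor_id:
        return vendor_id.strip().lower()
    if not isinstance(raw, dict):
        return "unknown"
    # Otherwise look for the first known list-valued key.
    for key, vendor in SIGNATURE_KEYS.items():
        if isinstance(raw.get(key), list):
            return vendor
    return "unknown"
```

Checking the explicit parameter first keeps detection deterministic when two vendors happen to share a key such as 'results'.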
Map fields to canonical schema
Transform vendor-specific field names (e.g., 'organic' vs 'results' vs 'items') to canonical field names (e.g., 'entries').
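The mapping step could be sketched as a small alias table, assuming each vendor wraps its records in one of the list keys named above. `LIST_KEYS`, `FIELD_ALIASES`, and `to_canonical` are illustrative names; the aliases mirror the title/name, link/url, and snippet/description pairs used elsewhere on this page.

```python
# Vendor list keys that all map to the canonical 'entries' key.
LIST_KEYS = ("organic", "results", "items")

# Canonical field name -> vendor field names to try, in order (illustrative).
FIELD_ALIASES = {
    "title": ("title", "name"),
    "url": ("link", "url"),
    "description": ("snippet", "description"),
}


def to_canonical(raw: dict) -> dict:
    """Rename the vendor list key to 'entries' and rename fields inside each record."""
    items = []
    for key in LIST_KEYS:
        if isinstance(raw.get(key), list):
            items = raw[key]
            break
    entries = []
    for item in items:
        if not isinstance(item, dict):
            continue  # skip malformed records rather than failing the whole batch
        entry = {}
        for canonical, aliases in FIELD_ALIASES.items():
            # Take the first alias that is present and not None, else an empty string.
            entry[canonical] = next(
                (item[a] for a in aliases if item.get(a) is not None), ""
            )
        entries.append(entry)
    return {"entries": entries}
```

Keeping the aliases in data rather than in code means a vendor format change usually needs only a table edit.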
Apply type coercion and defaults
Ensure numeric fields are numbers, string fields are strings, and missing fields have sensible defaults.
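One way to make coercion tolerant of bad input is to wrap each conversion so that an unparseable or missing value falls back to a default instead of raising. The helper names and the chosen defaults (0, 0.0, empty string) are assumptions for illustration, not the workflow's documented behavior.

```python
def to_int(value, default=0):
    """Coerce to int, accepting numeric strings such as '3' or '3.0'."""
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default


def to_float(value, default=0.0):
    """Coerce to float, falling back to the default on None or non-numeric text."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def to_str(value, default=""):
    """Coerce to str, treating None as missing."""
    return default if value is None else str(value)


def coerce_entry(item: dict) -> dict:
    """Apply type coercion and defaults to a few representative fields."""
    return {
        "title": to_str(item.get("title")),
        "position": to_int(item.get("position")),
        "rating": to_float(item.get("rating")),
    }
```

For example, `coerce_entry({"position": None})` yields defaults for all three fields rather than raising a `TypeError`, which a bare `int(None)` would do.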
Return normalized output
Pass the normalized objects downstream. All subsequent nodes receive an identical data structure regardless of source vendor.
Python Implementation
import requests
import json

API_KEY = "your_scavio_api_key"


def normalize_scavio(raw: dict, platform: str) -> list[dict]:
    """Normalize Scavio response to canonical enrichment schema."""
    entries = []
    for item in raw.get("organic", []):
        entry = {
            "title": str(item.get("title", "")),
            "url": str(item.get("link", "")),
            "description": str(item.get("snippet", "")),
            "position": int(item.get("position", 0)),
            "source": f"scavio_{platform}",
        }
        # Numeric fields with type coercion
        if item.get("price") is not None:
            entry["price"] = float(item["price"])
        if item.get("rating") is not None:
            entry["rating"] = float(item["rating"])
        if item.get("reviews") is not None:
            entry["review_count"] = int(item["reviews"])
        if item.get("views") is not None:
            entry["view_count"] = int(item["views"])
        if item.get("score") is not None:
            entry["engagement_score"] = int(item["score"])
        entries.append(entry)
    return entries


def normalize_generic(raw: dict) -> list[dict]:
    """Fallback normalizer for unknown API formats."""
    # Try common response patterns
    items = raw.get("results", raw.get("items", raw.get("data", [])))
    if not isinstance(items, list):
        return []
    return [
        {
            "title": str(item.get("title", item.get("name", ""))),
            "url": str(item.get("url", item.get("link", ""))),
            "description": str(item.get("description", item.get("snippet", ""))),
            "source": "unknown",
        }
        for item in items
    ]


def enrich_and_normalize(query: str, platform: str) -> list[dict]:
    """Search and normalize in one step."""
    res = requests.post(
        "https://api.scavio.dev/api/v1/search",
        headers={"x-api-key": API_KEY},
        json={"platform": platform, "query": query},
        timeout=15,
    )
    res.raise_for_status()
    return normalize_scavio(res.json(), platform)


# Downstream code always receives same schema
google = enrich_and_normalize("CRM software", "google")
amazon = enrich_and_normalize("CRM book", "amazon")
reddit = enrich_and_normalize("CRM recommendation", "reddit")

for source_name, results in [("google", google), ("amazon", amazon), ("reddit", reddit)]:
    print(f"{source_name}: {len(results)} entries")
    if results:
        print(f"  Fields: {list(results[0].keys())}")
JavaScript Implementation
const API_KEY = "your_scavio_api_key";

function normalizeScavio(raw, platform) {
  return (raw.organic ?? []).map((item) => {
    const entry = {
      title: String(item.title ?? ""),
      url: String(item.link ?? ""),
      description: String(item.snippet ?? ""),
      position: Number(item.position ?? 0),
      source: `scavio_${platform}`,
    };
    if (item.price != null) entry.price = Number(item.price);
    if (item.rating != null) entry.rating = Number(item.rating);
    if (item.reviews != null) entry.reviewCount = Number(item.reviews);
    if (item.views != null) entry.viewCount = Number(item.views);
    if (item.score != null) entry.engagementScore = Number(item.score);
    return entry;
  });
}

async function enrichAndNormalize(query, platform) {
  const res = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": API_KEY, "content-type": "application/json" },
    body: JSON.stringify({ platform, query }),
  });
  if (!res.ok) throw new Error(`scavio ${res.status}`);
  return normalizeScavio(await res.json(), platform);
}

const google = await enrichAndNormalize("CRM software", "google");
const amazon = await enrichAndNormalize("CRM book", "amazon");
console.log(`Google: ${google.length} entries, Amazon: ${amazon.length} entries`);
if (google[0]) console.log("Fields:", Object.keys(google[0]).join(", "));
Platforms Used
Google
Web search with knowledge graph, PAA, and AI overviews
YouTube
Video search with transcripts and metadata
Amazon
Product search with prices, ratings, and reviews
Walmart
Product search with pricing and fulfillment data
Reddit
Community, posts & threaded comments from any subreddit