Workflow

n8n Enrichment Normalizer Workflow

n8n sub-workflow that normalizes enrichment API responses into a canonical schema. Prevents silent downstream failures from format changes.

Overview

This n8n sub-workflow sits between enrichment API calls and downstream processing nodes. It validates responses, maps vendor-specific fields to a canonical schema, handles missing fields with defaults, and passes uniform objects downstream. When an API changes its response format, only the normalizer needs updating.

Trigger

Called as sub-workflow from any enrichment pipeline in n8n

Schedule

Called on demand from parent workflows

Workflow Steps

1

Receive raw API response

Accept the raw JSON response from any enrichment API call (Scavio, clearbit, or other vendor).

2

Detect response format

Identify which vendor the response came from based on response structure or a passed vendor_id parameter.

3

Map fields to canonical schema

Transform vendor-specific field names (e.g., 'organic' vs 'results' vs 'items') to canonical field names (e.g., 'entries').

4

Apply type coercion and defaults

Ensure numeric fields are numbers, string fields are strings, and missing fields have sensible defaults.

5

Return normalized output

Pass the canonical schema downstream. All subsequent nodes receive identical data structure regardless of source vendor.

Python Implementation

Python
import requests
import json

API_KEY = "your_scavio_api_key"

def normalize_scavio(raw: dict, platform: str) -> list[dict]:
    """Normalize Scavio response to canonical enrichment schema."""
    entries = []
    for item in raw.get("organic", []):
        entry = {
            "title": str(item.get("title", "")),
            "url": str(item.get("link", "")),
            "description": str(item.get("snippet", "")),
            "position": int(item.get("position", 0)),
            "source": f"scavio_{platform}",
        }
        # Numeric fields with type coercion
        if item.get("price") is not None:
            entry["price"] = float(item["price"])
        if item.get("rating") is not None:
            entry["rating"] = float(item["rating"])
        if item.get("reviews") is not None:
            entry["review_count"] = int(item["reviews"])
        if item.get("views") is not None:
            entry["view_count"] = int(item["views"])
        if item.get("score") is not None:
            entry["engagement_score"] = int(item["score"])
        entries.append(entry)
    return entries

def normalize_generic(raw: dict) -> list[dict]:
    """Fallback normalizer for unknown API formats."""
    # Try common response patterns
    items = raw.get("results", raw.get("items", raw.get("data", [])))
    if not isinstance(items, list):
        return []
    return [{"title": str(item.get("title", item.get("name", ""))), "url": str(item.get("url", item.get("link", ""))), "description": str(item.get("description", item.get("snippet", ""))), "source": "unknown"} for item in items]

def enrich_and_normalize(query: str, platform: str) -> list[dict]:
    """Search and normalize in one step."""
    res = requests.post(
        "https://api.scavio.dev/api/v1/search",
        headers={"x-api-key": API_KEY},
        json={"platform": platform, "query": query},
        timeout=15,
    )
    res.raise_for_status()
    return normalize_scavio(res.json(), platform)

# Downstream code always receives same schema
google = enrich_and_normalize("CRM software", "google")
amazon = enrich_and_normalize("CRM book", "amazon")
reddit = enrich_and_normalize("CRM recommendation", "reddit")

for source_name, results in [("google", google), ("amazon", amazon), ("reddit", reddit)]:
    print(f"{source_name}: {len(results)} entries")
    if results:
        print(f"  Fields: {list(results[0].keys())}")

JavaScript Implementation

JavaScript
const API_KEY = "your_scavio_api_key";

function normalizeScavio(raw, platform) {
  return (raw.organic ?? []).map((item) => {
    const entry = {
      title: String(item.title ?? ""),
      url: String(item.link ?? ""),
      description: String(item.snippet ?? ""),
      position: Number(item.position ?? 0),
      source: `scavio_${platform}`,
    };
    if (item.price != null) entry.price = Number(item.price);
    if (item.rating != null) entry.rating = Number(item.rating);
    if (item.reviews != null) entry.reviewCount = Number(item.reviews);
    if (item.views != null) entry.viewCount = Number(item.views);
    if (item.score != null) entry.engagementScore = Number(item.score);
    return entry;
  });
}

async function enrichAndNormalize(query, platform) {
  const res = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": API_KEY, "content-type": "application/json" },
    body: JSON.stringify({ platform, query }),
  });
  if (!res.ok) throw new Error(`scavio ${res.status}`);
  return normalizeScavio(await res.json(), platform);
}

const google = await enrichAndNormalize("CRM software", "google");
const amazon = await enrichAndNormalize("CRM book", "amazon");
console.log(`Google: ${google.length} entries, Amazon: ${amazon.length} entries`);
if (google[0]) console.log("Fields:", Object.keys(google[0]).join(", "));

Platforms Used

Google

Web search with knowledge graph, PAA, and AI overviews

YouTube

Video search with transcripts and metadata

Amazon

Product search with prices, ratings, and reviews

Walmart

Product search with pricing and fulfillment data

Reddit

Community, posts & threaded comments from any subreddit

Frequently Asked Questions

This n8n sub-workflow sits between enrichment API calls and downstream processing nodes. It validates responses, maps vendor-specific fields to a canonical schema, handles missing fields with defaults, and passes uniform objects downstream. When an API changes its response format, only the normalizer needs updating.

This workflow uses a called as sub-workflow from any enrichment pipeline in n8n. Called on demand from parent workflows.

This workflow uses the following Scavio platforms: google, youtube, amazon, walmart, reddit. Each platform is called via the same unified API endpoint.

Yes. Scavio's free tier includes 250 credits per month with no credit card required. That is enough to test and validate this workflow before scaling it.

n8n Enrichment Normalizer Workflow

n8n sub-workflow that normalizes enrichment API responses into a canonical schema. Prevents silent downstream failures from format changes.