Build a cold email enrichment pipeline that scales to thousands of leads by batch-processing search queries, caching results, and extracting structured signals for email personalization. Single-lead enrichment works for small lists, but at scale you need rate limiting, error handling, caching, and parallel processing. This pipeline processes leads in batches, stores enrichment data in a local cache to avoid redundant API calls, and outputs structured data ready for mail merge or CRM import.
Prerequisites
- Python 3.8+ installed
- requests library installed
- A Scavio API key from scavio.dev
- A lead list (CSV or JSON) with company names
Walkthrough
Step 1: Set up batch processing
Configure rate-limited batch processing with a local cache to avoid redundant queries.
import os, requests, json, time, hashlib
API_KEY = os.environ['SCAVIO_API_KEY']
CACHE_FILE = 'enrichment_cache.json'
def load_cache(path=None) -> dict:
    """Load the enrichment cache from *path* (defaults to CACHE_FILE).

    Returns an empty dict when the cache file is missing OR contains
    invalid JSON — a half-written/corrupt cache must not abort a run.
    """
    if path is None:
        path = CACHE_FILE
    try:
        with open(path) as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing or corrupt cache: start fresh rather than crash.
        return {}
def save_cache(cache: dict, path=None):
    """Atomically persist *cache* as pretty-printed JSON to *path*
    (defaults to CACHE_FILE).

    Writes to a temp file then renames it into place, so an interrupted
    run never leaves a truncated cache that would crash the next load.
    """
    if path is None:
        path = CACHE_FILE
    tmp = path + '.tmp'
    with open(tmp, 'w') as f:
        json.dump(cache, f, indent=2)
    os.replace(tmp, path)  # atomic on POSIX and Windows
def cache_key(query: str) -> str:
    """Derive a stable cache key for *query*.

    MD5 is fine here: the digest only names a cache bucket, it is not a
    security boundary.
    """
    digest = hashlib.md5(query.encode())
    return digest.hexdigest()
def cached_search(query: str, cache: dict) -> dict:
    """Run a Google search through the Scavio API, memoizing in *cache*.

    Only successful responses are cached: the original version stored
    whatever came back, so one transient API error would be served stale
    forever. Raises requests.HTTPError on a non-2xx response.
    """
    key = cache_key(query)
    if key in cache:
        return cache[key]
    resp = requests.post(
        'https://api.scavio.dev/api/v1/search',
        headers={'x-api-key': API_KEY},
        json={'platform': 'google', 'query': query},
        timeout=15,
    )
    resp.raise_for_status()  # never cache (or return) an error payload
    data = resp.json()
    cache[key] = data
    return data
cache = load_cache()
print(f'Cache entries: {len(cache)}')
Step 2: Extract enrichment signals
Parse search results for company signals relevant to cold outreach.
def extract_enrichment(company: str, data: dict) -> dict:
    """Distill raw search results for *company* into outreach signals.

    Scans the top 8 organic results for: the company website (URL
    containing the company name), a usable description (first snippet
    longer than 50 chars), hiring signals, and recent-news headlines.
    Returns a dict with all keys present even when nothing was found.
    """
    results = data.get('organic_results', [])
    enrichment = {
        'company': company,
        'found': len(results) > 0,
        'website': '',
        'description': '',
        'recent_news': [],
        'hiring': False,
        'tech_signals': [],
    }
    # Hoisted loop invariant: space-stripped lowercase company name.
    compact_name = company.lower().replace(' ', '')
    for r in results[:8]:
        title = r.get('title', '')
        snippet = r.get('snippet', '')
        link = r.get('link', '')
        if not enrichment['website'] and compact_name in link.lower().replace(' ', ''):
            enrichment['website'] = link
        if not enrichment['description'] and len(snippet) > 50:
            enrichment['description'] = snippet[:200]
        # 'hiring' already matches 'we are hiring', so the longer phrase
        # in the original list was redundant.
        if any(w in title.lower() for w in ('hiring', 'careers', 'jobs')):
            enrichment['hiring'] = True
        # Join with a space so a keyword can't be fabricated across the
        # title/snippet boundary (e.g. title '...rai' + snippet 'ses...').
        text = f'{title} {snippet}'.lower()
        if any(w in text for w in ('raises', 'funding', 'launch', 'announces')):
            enrichment['recent_news'].append(title[:80])
    return enrichment
data = cached_search('Notion company overview', cache)
signals = extract_enrichment('Notion', data)
print(f"Website: {signals['website']}")
print(f"Hiring: {signals['hiring']}")
Step 3: Batch process leads
Process leads in configurable batches with rate limiting and progress tracking.
def batch_enrich(companies: list, batch_size: int = 10, delay: float = 0.5) -> list:
    """Enrich *companies* with caching, rate limiting, and progress output.

    The cache is checkpointed to disk after every *batch_size* leads and
    a *delay*-second pause is taken between batches. A failure on one
    lead is logged and recorded as an empty enrichment (found=False)
    instead of aborting the whole run — essential at thousand-lead scale.
    """
    cache = load_cache()
    enriched = []
    total = len(companies)
    for i, company in enumerate(companies):
        query = f'{company} company overview'
        try:
            data = cached_search(query, cache)
        except Exception as exc:
            # One bad lead must not kill the run; an empty payload still
            # yields a complete record so downstream scoring sees every lead.
            print(f'  Warning: {company} failed ({exc}); recording empty result')
            data = {}
        enriched.append(extract_enrichment(company, data))
        if (i + 1) % batch_size == 0:
            save_cache(cache)  # checkpoint: a crash loses at most one batch
            print(f'Progress: {i+1}/{total} ({len(cache)} cached)')
            time.sleep(delay)
    save_cache(cache)
    print(f'Completed: {total} companies enriched')
    return enriched
companies = ['Notion', 'Linear', 'Vercel', 'Supabase', 'Clerk']
results = batch_enrich(companies, batch_size=2, delay=0.3)
Step 4: Score lead quality
Assign quality scores based on enrichment signals to prioritize outreach.
def score_lead(enrichment: dict) -> int:
    """Map enrichment signals to a 0-7 priority score (higher = warmer).

    Hiring and recent news are weighted double: hiring implies budget is
    available, recent activity implies the company is responsive.
    """
    weights = (
        ('found', 1),
        ('website', 1),
        ('hiring', 2),       # Hiring = budget available
        ('recent_news', 2),  # Recent activity = responsive
        ('description', 1),
    )
    return sum(points for field, points in weights if enrichment[field])
def rank_leads(enriched: list) -> list:
    """Score every lead in place, print a summary, and return them
    sorted best-first (highest score at the top)."""
    for entry in enriched:
        entry['score'] = score_lead(entry)
    ordered = sorted(enriched, key=lambda lead: lead['score'], reverse=True)
    for lead in ordered:
        summary = f" {lead['company']}: score={lead['score']} hiring={lead['hiring']} news={len(lead['recent_news'])}"
        print(summary)
    return ordered
ranked = rank_leads(results)
Step 5: Export for mail merge
Output enriched and scored leads as a CSV ready for mail merge tools.
import csv
def export_enriched(leads: list, output_path: str):
fields = ['company', 'score', 'website', 'description', 'hiring', 'recent_news']
with open(output_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fields, extrasaction='ignore')
writer.writeheader()
for lead in leads:
row = {**lead}
row['recent_news'] = '; '.join(lead.get('recent_news', [])[:2])
writer.writerow(row)
print(f'Exported {len(leads)} leads to {output_path}')
export_enriched(ranked, 'enriched_leads.csv')
Python Example
import requests, os
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
def enrich(company):
    """Minimal one-call enrichment: top-3 results plus a hiring flag."""
    data = requests.post(
        'https://api.scavio.dev/api/v1/search',
        headers=H,
        json={'platform': 'google', 'query': f'{company} company overview'},
        timeout=15,  # match the main pipeline; never hang a batch on one lead
    ).json()
    results = data.get('organic_results', [])[:3]
    hiring = any('hiring' in r.get('title', '').lower() for r in results)
    return {'company': company, 'hiring': hiring, 'results': len(results)}
for c in ['Notion', 'Linear', 'Vercel']:
print(enrich(c))
JavaScript Example
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
// Fetch top search results for `company` and flag hiring/careers signals.
async function enrich(company) {
  const body = JSON.stringify({platform: 'google', query: `${company} company overview`});
  const response = await fetch('https://api.scavio.dev/api/v1/search', {
    method: 'POST', headers: H, body
  });
  const payload = await response.json();
  const results = payload.organic_results || [];
  // `item` instead of `r` — the original shadowed the response variable.
  const hiring = results.some(item => /hiring|careers/i.test(item.title || ''));
  return {company, results: results.length, hiring};
}
Promise.all(['Notion', 'Linear'].map(enrich)).then(console.log);
Expected Output
A scalable cold email enrichment pipeline with caching, batch processing, lead scoring, and CSV export for mail merge integration.