本教程构建一条可扩展到数千个潜在客户的冷邮件线索丰富(enrichment)管道:批量执行搜索查询、缓存结果,并提取结构化信号用于邮件个性化。逐条丰富线索只适用于小型列表;要处理大规模列表,还需要速率限制、错误处理、缓存和并行处理。该管道分批处理潜在客户,将丰富后的数据存入本地缓存以避免重复的 API 调用,并输出可直接用于邮件合并或 CRM 导入的结构化数据。
前置条件
- 已安装 Python 3.8+
- 已安装 requests 库
- 来自 scavio.dev 的 Scavio API 密钥
- 包含公司名称的潜在客户列表(CSV 或 JSON)
操作指南
步骤 1: 设置批处理
使用本地缓存配置限速批处理以避免冗余查询。
Python
import os, requests, json, time, hashlib
# Scavio API key read from the environment; a missing SCAVIO_API_KEY raises
# KeyError as soon as this line runs.
API_KEY = os.environ['SCAVIO_API_KEY']
# Path of the JSON file that load_cache() reads and save_cache() writes.
CACHE_FILE = 'enrichment_cache.json'
def load_cache() -> dict:
    """Load the enrichment cache from ``CACHE_FILE``.

    Returns:
        The cached mapping read from disk, or an empty dict when the cache
        file is missing, cannot be decoded as JSON (for example after an
        interrupted write), or does not contain a JSON object.
    """
    try:
        with open(CACHE_FILE, encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A corrupt cache only costs re-fetching; start fresh instead of
        # aborting the whole pipeline.
        return {}
    # Callers look up and assign entries by key, so only a dict is usable.
    return loaded if isinstance(loaded, dict) else {}
def save_cache(cache: dict):
    """Persist ``cache`` to ``CACHE_FILE`` as indented JSON.

    The data is first written to a temporary sibling file and then moved over
    the real cache file with ``os.replace``, so an interruption in the middle
    of a write cannot leave a truncated cache behind.

    Args:
        cache: JSON-serializable mapping to store.
    """
    tmp_path = f'{CACHE_FILE}.tmp'
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(cache, f, indent=2)
    os.replace(tmp_path, CACHE_FILE)
def cache_key(query: str) -> str:
    """Return the hexadecimal MD5 digest of ``query``, used as its cache key."""
    digest = hashlib.md5()
    digest.update(query.encode())
    return digest.hexdigest()
def cached_search(query: str, cache: dict) -> dict:
    """Return search results for ``query``, reusing ``cache`` when possible.

    Args:
        query: Search query text sent to the search endpoint.
        cache: Mutable mapping of ``cache_key(query)`` to response payload.
            A successful response is stored in it in place.

    Returns:
        The decoded JSON response body, taken from the cache or from the API.
    """
    key = cache_key(query)
    if key in cache:
        return cache[key]
    resp = requests.post(
        'https://api.scavio.dev/api/v1/search',
        headers={'x-api-key': API_KEY},
        json={'platform': 'google', 'query': query},
        timeout=15,
    )
    data = resp.json()
    # Only remember successful responses: caching an error payload (rate
    # limit, auth failure, server error) would be served forever afterwards.
    if resp.ok:
        cache[key] = data
    return data
# Load previously saved results (empty dict when no cache file exists yet).
cache = load_cache()
print(f'Cache entries: {len(cache)}')
步骤 2: 提取富集信号
解析搜索结果,提取与冷邮件外联相关的公司信号。
Python
def extract_enrichment(company: str, data: dict) -> dict:
    """Derive outreach signals for ``company`` from a search response.

    Only the first eight entries of ``data['organic_results']`` are examined.
    Missing or null ``organic_results``, ``title``, ``snippet`` and ``link``
    values are treated as empty.

    Args:
        company: Company name; its lowercase, space-free form is matched
            against result links to guess the website.
        data: Decoded search response.

    Returns:
        A dict with keys ``company``, ``found``, ``website``, ``description``,
        ``recent_news``, ``hiring`` and ``tech_signals`` (always empty here).
    """
    results = data.get('organic_results') or []
    enrichment = {
        'company': company,
        'found': len(results) > 0,
        'website': '',
        'description': '',
        'recent_news': [],
        'hiring': False,
        'tech_signals': [],
    }
    # Loop-invariant. An empty slug must never match, otherwise the first
    # link of any result would be reported as the company website.
    company_slug = company.lower().replace(' ', '')
    hiring_words = ('hiring', 'careers', 'jobs', 'we are hiring')
    news_words = ('raises', 'funding', 'launch', 'announces')
    for r in results[:8]:
        # ``or ''`` also covers keys that are present but null.
        title = r.get('title') or ''
        snippet = r.get('snippet') or ''
        link = r.get('link') or ''
        if (company_slug and not enrichment['website']
                and company_slug in link.lower().replace(' ', '')):
            enrichment['website'] = link
        if not enrichment['description'] and len(snippet) > 50:
            enrichment['description'] = snippet[:200]
        if any(w in title.lower() for w in hiring_words):
            enrichment['hiring'] = True
        if any(w in (title + snippet).lower() for w in news_words):
            enrichment['recent_news'].append(title[:80])
    return enrichment
# Demo: fetch (or reuse cached) results for one query, extract its signals
# and print the detected website.
data = cached_search('Notion company overview', cache)
signals = extract_enrichment('Notion', data)
print(f"Website: {signals['website']}")
print(f"Hiring: {signals['hiring']}")
步骤 3: 批处理线索
通过速率限制和进度跟踪以可配置的批次处理线索。
Python
def batch_enrich(companies: list, batch_size: int = 10, delay: float = 0.5) -> list:
    """Enrich every company in ``companies`` with cached, rate-limited searches.

    Args:
        companies: Company names to look up.
        batch_size: Save the cache and print progress after every this many
            companies. ``0`` disables the intermediate checkpoints.
        delay: Seconds to sleep after each lookup that was not already cached.

    Returns:
        One enrichment dict per company, in input order.
    """
    cache = load_cache()
    enriched = []
    total = len(companies)
    try:
        for i, company in enumerate(companies):
            query = f'{company} company overview'
            # Checked before the call: a cache hit makes no request, so there
            # is nothing to rate-limit afterwards.
            needs_request = cache_key(query) not in cache
            data = cached_search(query, cache)
            enriched.append(extract_enrichment(company, data))
            if batch_size and (i + 1) % batch_size == 0:
                save_cache(cache)
                print(f'Progress: {i+1}/{total} ({len(cache)} cached)')
            if needs_request:
                time.sleep(delay)
    finally:
        # Keep whatever was fetched even if a lookup raised part-way through.
        save_cache(cache)
    print(f'Completed: {total} companies enriched')
    return enriched
# Sample company names passed to batch_enrich.
companies = ['Notion', 'Linear', 'Vercel', 'Supabase', 'Clerk']
results = batch_enrich(companies, batch_size=2, delay=0.3)
步骤 4: 为潜在客户质量评分
根据丰富信号分配质量分数,以优先考虑外展活动。
Python
def score_lead(enrichment: dict) -> int:
    """Return a quality score (0-7) summing the weights of the truthy signals.

    Raises:
        KeyError: If ``enrichment`` lacks one of the scored fields.
    """
    weights = (
        ('found', 1),
        ('website', 1),
        ('hiring', 2),       # Hiring = budget available
        ('recent_news', 2),  # Recent activity = responsive
        ('description', 1),
    )
    return sum(points for field, points in weights if enrichment[field])
def rank_leads(enriched: list) -> list:
    """Score each lead in place, print a summary and return them best-first.

    Every dict in ``enriched`` gains a ``score`` key. The returned list is a
    new list ordered by descending score; leads with equal scores keep their
    input order.
    """
    for entry in enriched:
        entry['score'] = score_lead(entry)
    # Ascending sort on the negated score is the same stable ordering as a
    # descending sort on the score.
    ranked = sorted(enriched, key=lambda entry: -entry['score'])
    for entry in ranked:
        news_count = len(entry['recent_news'])
        print(f" {entry['company']}: score={entry['score']} hiring={entry['hiring']} news={news_count}")
    return ranked
ranked = rank_leads(results)
步骤 5: 导出以进行邮件合并
将丰富和评分的潜在客户输出为 CSV,准备用于邮件合并工具。
Python
import csv
def export_enriched(leads: list, output_path: str):
    """Write ``leads`` to a UTF-8 CSV file for mail-merge tools.

    Columns are company, score, website, description, hiring and recent_news.
    Other keys are ignored and missing ones are left blank. At most the first
    two news titles are kept, joined with ``'; '``.

    Args:
        leads: Enrichment dicts, typically already scored and ranked.
        output_path: Destination CSV path; an existing file is overwritten.
    """
    fields = ['company', 'score', 'website', 'description', 'hiring', 'recent_news']
    # Explicit encoding: company names and snippets may contain non-ASCII
    # text, which the platform default encoding cannot always represent.
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction='ignore')
        writer.writeheader()
        for lead in leads:
            row = {**lead}
            # ``or []`` also covers a recent_news value that is None.
            row['recent_news'] = '; '.join((lead.get('recent_news') or [])[:2])
            writer.writerow(row)
    print(f'Exported {len(leads)} leads to {output_path}')
export_enriched(ranked, 'enriched_leads.csv')
Python 示例
Python
import requests, os
# Request headers carrying the API key from the SCAVIO_API_KEY environment
# variable (KeyError if it is unset).
H = {'x-api-key': os.environ['SCAVIO_API_KEY']}
def enrich(company):
    """Run one overview search for ``company`` and summarize the top results.

    Returns:
        A dict with ``company``, ``hiring`` (True when any of the first three
        result titles contains "hiring") and ``results`` (number of results
        inspected, at most three).
    """
    resp = requests.post(
        'https://api.scavio.dev/api/v1/search',
        headers=H,
        json={'platform': 'google', 'query': f'{company} company overview'},
        timeout=15,  # without a timeout a stalled connection blocks forever
    )
    data = resp.json()
    # ``or`` fallbacks also cover fields that are present but null.
    results = (data.get('organic_results') or [])[:3]
    hiring = any('hiring' in (r.get('title') or '').lower() for r in results)
    return {'company': company, 'hiring': hiring, 'results': len(results)}
for c in ['Notion', 'Linear', 'Vercel']:
print(enrich(c))
JavaScript 示例
JavaScript
// Request headers: API key from the SCAVIO_API_KEY environment variable plus a JSON content type.
const H = {'x-api-key': process.env.SCAVIO_API_KEY, 'Content-Type': 'application/json'};
/**
 * Run one overview search for a company and summarize the results.
 *
 * @param {string} company Company name to look up.
 * @returns {Promise<{company: string, results: number, hiring: boolean}>}
 *   Result count and whether any result title mentions hiring or careers.
 */
async function enrich(company) {
  const payload = {platform: 'google', query: `${company} company overview`};
  const response = await fetch('https://api.scavio.dev/api/v1/search', {
    method: 'POST',
    headers: H,
    body: JSON.stringify(payload),
  });
  const body = await response.json();
  const organic = body.organic_results || [];
  const hiringPattern = /hiring|careers/i;
  const hiring = organic.some((entry) => hiringPattern.test(entry.title || ''));
  return {company, results: organic.length, hiring};
}
Promise.all(['Notion', 'Linear'].map(enrich)).then(console.log);
预期输出
JSON
A scalable cold email enrichment pipeline with caching, batch processing, lead scoring, and CSV export for mail merge integration.