La surveillance des prix multi-plateformes compare le même produit sur Amazon et Google Shopping pour trouver la meilleure offre et détecter les variations de prix. Au lieu de construire des scrapers séparés pour chaque plateforme, l'API Scavio fournit à la fois les données des produits Amazon et les résultats Google Shopping via un seul point d'accès. Ce tutoriel construit un pipeline de surveillance qui suit les produits sur plusieurs plateformes, stocke les prix historiques et alerte lorsque les prix descendent en dessous d'un seuil. Chaque vérification de plateforme coûte 0,005 $.
Prérequis
- Python 3.9+ installé
- bibliothèque requests installée
- Une clé API Scavio provenant de scavio.dev
- Une liste de produits à surveiller
Parcours
Étape 1: Rechercher des produits sur les deux plateformes
Créez des fonctions pour rechercher sur Amazon et Google Shopping le même produit. Les deux utilisent l'API de recherche Scavio avec des paramètres de plateforme différents.
import requests, os
API_KEY = os.environ['SCAVIO_API_KEY']
ENDPOINT = 'https://api.scavio.dev/api/v1/search'
def search_amazon(query: str) -> list:
resp = requests.post(ENDPOINT,
headers={'x-api-key': API_KEY, 'Content-Type': 'application/json'},
json={'platform': 'amazon', 'query': query, 'marketplace': 'US'})
return resp.json().get('products', [])
def search_google_shopping(query: str) -> list:
resp = requests.post(ENDPOINT,
headers={'x-api-key': API_KEY, 'Content-Type': 'application/json'},
json={'query': query, 'country_code': 'us', 'type': 'shopping'})
return resp.json().get('shopping_results', [])Étape 2: Normaliser les prix sur les plateformes
Amazon et Google Shopping renvoient les prix dans des formats différents. Normalisez-les en valeurs flottantes pour la comparaison.
import re
def parse_price(price_str) -> float:
if not price_str:
return 0.0
if isinstance(price_str, (int, float)):
return float(price_str)
cleaned = re.sub(r'[^0-9.]', '', str(price_str))
try:
return float(cleaned)
except ValueError:
return 0.0
def compare_prices(query: str) -> dict:
amazon = search_amazon(query)
google = search_google_shopping(query)
amazon_prices = [{'title': p.get('title', '')[:60], 'price': parse_price(p.get('price')),
'platform': 'amazon', 'link': p.get('link', '')}
for p in amazon if parse_price(p.get('price')) > 0]
google_prices = [{'title': p.get('title', '')[:60], 'price': parse_price(p.get('price')),
'platform': 'google_shopping', 'link': p.get('link', '')}
for p in google if parse_price(p.get('price')) > 0]
all_prices = sorted(amazon_prices + google_prices, key=lambda x: x['price'])
return {'query': query, 'total_listings': len(all_prices),
'cheapest': all_prices[0] if all_prices else None,
'all': all_prices[:10]}
result = compare_prices('Sony WH-1000XM5')
print(f'Cheapest: ${result["cheapest"]["price"]} on {result["cheapest"]["platform"]}')Étape 3: Stocker l'historique des prix en JSON
Enregistrez des instantanés de prix quotidiens pour suivre les tendances et détecter les baisses. Chaque produit obtient une série temporelle de prix par plateforme.
import json
from datetime import date
HISTORY_FILE = 'price_history.json'
def load_history() -> dict:
try:
with open(HISTORY_FILE) as f:
return json.load(f)
except FileNotFoundError:
return {}
def save_snapshot(query: str, prices: list) -> None:
history = load_history()
today = date.today().isoformat()
if query not in history:
history[query] = []
history[query].append({
'date': today,
'prices': [{'platform': p['platform'], 'price': p['price'],
'title': p['title'][:40]} for p in prices[:5]]
})
with open(HISTORY_FILE, 'w') as f:
json.dump(history, f, indent=2)
# Save today's data:
result = compare_prices('Sony WH-1000XM5')
save_snapshot('Sony WH-1000XM5', result['all'])
print(f'Saved snapshot with {len(result["all"])} listings')Étape 4: Détecter les baisses de prix à partir de l'historique
Comparez les prix d'aujourd'hui avec l'instantané précédent. Alertez lorsqu'un produit descend en dessous d'un seuil ou présente une diminution significative en pourcentage.
def detect_drops(query: str, drop_threshold_pct: float = 10.0) -> list:
history = load_history()
snapshots = history.get(query, [])
if len(snapshots) < 2:
return []
prev = {p['title']: p['price'] for p in snapshots[-2]['prices']}
current = {p['title']: p['price'] for p in snapshots[-1]['prices']}
drops = []
for title, curr_price in current.items():
if title in prev and prev[title] > 0:
pct_change = ((curr_price - prev[title]) / prev[title]) * 100
if pct_change < -drop_threshold_pct:
drops.append({
'product': title,
'old_price': prev[title],
'new_price': curr_price,
'drop_pct': round(abs(pct_change), 1)
})
return drops
drops = detect_drops('Sony WH-1000XM5')
for d in drops:
print(f'PRICE DROP: {d["product"]} ${d["old_price"]} -> ${d["new_price"]} (-{d["drop_pct"]}%)')Étape 5: Exécuter une surveillance quotidienne pour tous les produits
Créez un script principal qui surveille plusieurs produits et rapporte tous les résultats. Planifiez-le avec cron pour une exécution quotidienne.
def daily_monitor(products: list) -> None:
print(f'Price monitor: {len(products)} products, {date.today()}')
all_drops = []
for product in products:
result = compare_prices(product)
save_snapshot(product, result['all'])
cheapest = result.get('cheapest')
if cheapest:
print(f' {product}: ${cheapest["price"]} ({cheapest["platform"]})')
drops = detect_drops(product)
all_drops.extend(drops)
if all_drops:
print(f'\n{len(all_drops)} price drops detected:')
for d in all_drops:
print(f' {d["product"]}: ${d["old_price"]} -> ${d["new_price"]} (-{d["drop_pct"]}%)')
cost = len(products) * 2 * 0.005 # 2 API calls per product
print(f'\nAPI cost: ${cost:.2f} ({len(products) * 2} credits)')
if __name__ == '__main__':
daily_monitor(['Sony WH-1000XM5', 'Apple AirPods Pro', 'Samsung Galaxy Buds'])Exemple Python
import os, requests, re, json
from datetime import date
API_KEY = os.environ['SCAVIO_API_KEY']
EP = 'https://api.scavio.dev/api/v1/search'
def search(body):
return requests.post(EP, headers={'x-api-key': API_KEY, 'Content-Type': 'application/json'}, json=body).json()
def parse_price(p):
try: return float(re.sub(r'[^0-9.]', '', str(p or '0')))
except: return 0.0
def compare(query):
amazon = search({'platform': 'amazon', 'query': query, 'marketplace': 'US'}).get('products', [])
google = search({'query': query, 'country_code': 'us', 'type': 'shopping'}).get('shopping_results', [])
all_p = [{'title': p.get('title','')[:50], 'price': parse_price(p.get('price')), 'src': 'amazon'} for p in amazon]
all_p += [{'title': p.get('title','')[:50], 'price': parse_price(p.get('price')), 'src': 'google'} for p in google]
return sorted([p for p in all_p if p['price'] > 0], key=lambda x: x['price'])
for q in ['Sony WH-1000XM5', 'AirPods Pro']:
results = compare(q)
if results:
print(f'{q}: ${results[0]["price"]} ({results[0]["src"]})')Exemple JavaScript
const API_KEY = process.env.SCAVIO_API_KEY;
const EP = 'https://api.scavio.dev/api/v1/search';
async function search(body) {
const r = await fetch(EP, {
method: 'POST',
headers: { 'x-api-key': API_KEY, 'Content-Type': 'application/json' },
body: JSON.stringify(body)
});
return r.json();
}
function parsePrice(p) {
return parseFloat(String(p || '0').replace(/[^0-9.]/g, '')) || 0;
}
async function compare(query) {
const [amazon, google] = await Promise.all([
search({ platform: 'amazon', query, marketplace: 'US' }),
search({ query, country_code: 'us', type: 'shopping' })
]);
const all = [
...(amazon.products || []).map(p => ({ title: p.title, price: parsePrice(p.price), src: 'amazon' })),
...(google.shopping_results || []).map(p => ({ title: p.title, price: parsePrice(p.price), src: 'google' }))
].filter(p => p.price > 0).sort((a, b) => a.price - b.price);
return all;
}
compare('Sony WH-1000XM5').then(r => {
if (r.length) console.log(`Cheapest: $${r[0].price} (${r[0].src})`);
});Sortie attendue
Price monitor: 3 products, 2026-05-13
Sony WH-1000XM5: $278.00 (amazon)
Apple AirPods Pro: $189.99 (google_shopping)
Samsung Galaxy Buds: $99.99 (amazon)
1 price drops detected:
Apple AirPods Pro: $199.99 -> $189.99 (-5.0%)
API cost: $0.03 (6 credits)