TikTok Dropshipping Data Pipeline
Build a dropshipping product research pipeline that tracks trending hashtags, viral product videos, creator profiles, and engagement patterns. Full Python code, running for about $1.35/day.
A TikTok dropshipping product research pipeline monitors trending hashtags, identifies viral product videos, extracts creator profiles, and tracks engagement patterns over time. Built with the Scavio TikTok API, the full pipeline costs approximately $1.35/day (about $40/month) to monitor 200 hashtags with daily automated runs. Here is the complete implementation.
Pipeline architecture
The pipeline runs daily in four stages: hashtag monitoring (find trending content), video filtering (identify product-focused videos with high engagement), creator profiling (assess creator reach and reliability), and trend tracking (store signals over time to detect rising products before they peak).
import requests, os, sqlite3, json
from datetime import date, datetime
# Base URL shared by every Scavio TikTok endpoint this pipeline calls.
API_BASE = "https://api.scavio.dev/api/v1/tiktok"
# Bearer-token auth header. Reading os.environ[...] raises KeyError at import
# time when SCAVIO_API_KEY is unset, so a misconfigured run fails immediately.
HEADERS = {"Authorization": "Bearer " + os.environ["SCAVIO_API_KEY"]}
# Pipeline cost breakdown (200 credits = $1.00, i.e. $0.005 per credit):
# Stage 1: 200 hashtag searches = 200 credits = $1.00
# Stage 2: 50 video detail lookups = 50 credits = $0.25
# Stage 3: 20 creator profiles = 20 credits = $0.10
# Total daily: ~270 credits = $1.35/day = ~$40/month
# Stage 1: Hashtag monitoring
# Search queries for stage 1, one API search each.
# Extend this list (up to 200 hashtags) for your niche.
PRODUCT_HASHTAGS = [
    "tiktokmademebuyit",
    "viralproduct",
    "amazonfinds",
    "homedecor",
    "kitchengadgets",
    "cleaningtiktok",
    "organizedhome",
    "techfinds",
    "fitnessgadgets",
    "skincareproduct",
    "dropshipping2026",
    "trendingproduct",
    "musthave",
    "budgetfinds",
    "giftideas",
]
def stage1_hashtag_scan(
    hashtags: list, *, min_likes: int = 20000, min_shares: int = 500
) -> list:
    """Scan hashtags for high-engagement product videos.

    Sends one video search per hashtag to the ``/search`` endpoint and keeps
    every returned video with more than ``min_likes`` likes AND more than
    ``min_shares`` shares.

    A hashtag whose request fails (network error, non-200 status, or a body
    that is not a JSON object) is skipped, so one bad request does not abort
    the entire scan.

    Args:
        hashtags: Hashtag strings used as search queries.
        min_likes: Strict lower bound on likes (default 20000).
        min_shares: Strict lower bound on shares (default 500).

    Returns:
        A list of candidate dicts with keys video_id, hashtag, likes, shares,
        comments, creator_id, description and created_at. A video that
        matches several hashtags appears once per hashtag.
    """
    candidates = []
    for hashtag in hashtags:
        try:
            resp = requests.post(
                f"{API_BASE}/search",
                headers=HEADERS,
                json={"query": hashtag, "type": "video"},
                timeout=10,
            )
        except requests.RequestException:
            # Transient network failure: skip this hashtag, keep scanning.
            continue
        if resp.status_code != 200:
            continue
        try:
            payload = resp.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page).
            continue
        if not isinstance(payload, dict):
            continue
        for video in payload.get("results") or []:
            video_id = video.get("id")
            if video_id is None:
                continue
            # `or 0` also covers fields that are present but null.
            likes = video.get("likes") or 0
            shares = video.get("shares") or 0
            # Engagement filter only; no recency filter is applied here.
            if likes > min_likes and shares > min_shares:
                candidates.append({
                    "video_id": video_id,
                    "hashtag": hashtag,
                    "likes": likes,
                    "shares": shares,
                    "comments": video.get("comments") or 0,
                    "creator_id": video.get("author_id", ""),
                    "description": video.get("description", ""),
                    "created_at": video.get("create_time", ""),
                })
    return candidates
# Stage 2: Video deep-dive
def stage2_video_analysis(candidates: list, top_n: int = 50) -> list:
    """Fetch detail data for the highest-scoring distinct candidate videos.

    Candidates are ranked by the weighted engagement score
    ``likes + 3 * shares + 2 * comments``. They are de-duplicated by
    video_id, because stage 1 can return the same video under several
    hashtags and each lookup is billed. The first ``top_n`` distinct videos
    are then looked up via the ``/video`` endpoint.

    Videos whose lookup fails (network error, non-200 status, or a body that
    is not a JSON object) are dropped from the result.

    Args:
        candidates: Dicts with at least video_id, likes, shares and comments.
        top_n: Number of distinct videos to look up (slice semantics).

    Returns:
        New dicts (the input dicts are not mutated). Each carries the
        candidate fields plus full_description, music, hashtags_used,
        duration, view_count and engagement_rate. engagement_rate is
        ``(likes + shares) / view_count``, or 0.0 when the view count is
        missing or not positive.
    """
    def _score(video):
        # Weighted engagement: shares and comments count more than likes.
        return video["likes"] + video["shares"] * 3 + video["comments"] * 2

    # Keep the best-scoring entry per video_id. Ranking the whole list first
    # and slicing afterwards preserves the original slice semantics.
    distinct = []
    seen_ids = set()
    for video in sorted(candidates, key=_score, reverse=True):
        if video["video_id"] not in seen_ids:
            seen_ids.add(video["video_id"])
            distinct.append(video)
    selected = distinct[:top_n]

    detailed = []
    for video in selected:
        try:
            resp = requests.post(
                f"{API_BASE}/video",
                headers=HEADERS,
                json={"video_id": video["video_id"]},
                timeout=10,
            )
        except requests.RequestException:
            continue
        if resp.status_code != 200:
            continue
        try:
            detail = resp.json()
        except ValueError:
            continue
        if not isinstance(detail, dict):
            continue
        music = detail.get("music")
        views = detail.get("views") or 0
        enriched = {
            **video,
            "full_description": detail.get("description", ""),
            "music": music.get("title", "") if isinstance(music, dict) else "",
            "hashtags_used": detail.get("hashtags", []),
            "duration": detail.get("duration", 0),
            "view_count": views,
        }
        # Without a positive view count the ratio is undefined. Dividing by a
        # fallback of 1 would report likes + shares as a "rate".
        enriched["engagement_rate"] = (
            (video["likes"] + video["shares"]) / views if views > 0 else 0.0
        )
        detailed.append(enriched)
    return detailed
# Stage 3: Creator profiling
def stage3_creator_profiles(videos: list, top_n: int = 20) -> dict:
    """Profile the creators behind the given videos to assess reach.

    Creator ids are taken in the order the videos are given: the first
    occurrence wins, and empty or missing ids are skipped. The first
    ``top_n`` distinct ids are looked up via the ``/user`` endpoint. When
    ``videos`` comes from stage 2 it is sorted by engagement score, so these
    are the creators of the strongest videos.

    Creators whose request fails (network error, non-200 status, or a body
    that is not a JSON object) are left out of the result.

    Args:
        videos: Video dicts carrying a ``creator_id`` key.
        top_n: Maximum number of distinct creators to fetch (slice semantics).

    Returns:
        Mapping of creator_id to a summary dict with username, followers,
        following, total_likes, video_count, verified and avg_engagement,
        where avg_engagement is ``total_likes / max(video_count, 1)``.
    """
    # dict.fromkeys de-duplicates while keeping order. A set() here would
    # pick an arbitrary subset, because str hashing is randomized per process.
    creator_ids = list(
        dict.fromkeys(v["creator_id"] for v in videos if v.get("creator_id"))
    )[:top_n]
    profiles = {}
    for creator_id in creator_ids:
        try:
            resp = requests.post(
                f"{API_BASE}/user",
                headers=HEADERS,
                json={"user_id": creator_id},
                timeout=10,
            )
        except requests.RequestException:
            continue
        if resp.status_code != 200:
            continue
        try:
            profile = resp.json()
        except ValueError:
            continue
        if not isinstance(profile, dict):
            continue
        # `or 0` guards the division against counts that are present but null.
        total_likes = profile.get("total_likes") or 0
        video_count = profile.get("video_count") or 0
        profiles[creator_id] = {
            "username": profile.get("username", ""),
            "followers": profile.get("followers", 0),
            "following": profile.get("following", 0),
            "total_likes": profile.get("total_likes", 0),
            "video_count": profile.get("video_count", 0),
            "verified": profile.get("verified", False),
            "avg_engagement": total_likes / max(video_count, 1),
        }
    return profiles
# Stage 4: Trend tracking and storage
def stage4_store_and_track(
    videos: list, profiles: dict, db_path: str = "dropship_pipeline.db"
) -> list:
    """Store today's video signals in SQLite and return rising hashtags.

    Creates the ``daily_signals`` table if it does not exist. It then
    inserts one row per video, dated with today's local date
    (``date.today()``). A (date, video_id) pair that is already stored is
    skipped, so re-running on the same day does not duplicate rows.
    Finally it commits and returns ``detect_rising_products(conn)``. The
    connection is always closed, even if an error occurs.

    Args:
        videos: Video dicts with video_id, hashtag, likes, shares and
            creator_id; optionally engagement_rate and description.
        profiles: Mapping of creator_id to profile dicts; only ``followers``
            is read.
        db_path: SQLite database file (default ``dropship_pipeline.db``).

    Returns:
        The rows returned by ``detect_rising_products``.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS daily_signals (
                id INTEGER PRIMARY KEY,
                date TEXT,
                video_id TEXT,
                hashtag TEXT,
                likes INTEGER,
                shares INTEGER,
                engagement_rate REAL,
                creator_id TEXT,
                creator_followers INTEGER,
                description TEXT,
                UNIQUE(date, video_id)
            )
        """)
        today = date.today().isoformat()
        rows = [
            (
                today,
                video["video_id"],
                video["hashtag"],
                video["likes"],
                video["shares"],
                video.get("engagement_rate", 0),
                video["creator_id"],
                profiles.get(video["creator_id"], {}).get("followers", 0),
                # `or ""` tolerates a None description before truncating.
                (video.get("description") or "")[:500],
            )
            for video in videos
        ]
        # OR IGNORE skips rows that would violate UNIQUE(date, video_id).
        conn.executemany(
            "INSERT OR IGNORE INTO daily_signals (date, video_id, hashtag, "
            "likes, shares, engagement_rate, creator_id, creator_followers, "
            "description) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            rows,
        )
        conn.commit()
        return detect_rising_products(conn)
    finally:
        conn.close()
def detect_rising_products(
    conn, *, window_days: int = 7, min_days: int = 3, limit: int = 10
) -> list:
    """Rank hashtags that kept producing qualifying videos in a recent window.

    Considers ``daily_signals`` rows dated on or after today's local date
    minus ``window_days``. That is ``window_days + 1`` calendar dates,
    inclusive. Rows are grouped by hashtag, and only hashtags seen on at
    least ``min_days`` distinct dates are kept. The result is ordered by
    average likes (descending) and truncated to ``limit`` rows.

    Note: this measures persistence and average likes. It does not check
    that engagement is growing from day to day.

    Args:
        conn: Open sqlite3 connection containing ``daily_signals``.
        window_days: Look-back window in days (default 7).
        min_days: Minimum number of distinct dates per hashtag (default 3).
        limit: Maximum number of rows returned (default 10).

    Returns:
        List of (hashtag, days_seen, avg_likes, total_shares) tuples.
    """
    # 'localtime' matters: rows are stored with Python's local date.today(),
    # but SQLite's 'now' is UTC, so without it the window edge drifts.
    query = """
        SELECT hashtag, COUNT(DISTINCT date) AS days_seen,
               AVG(likes) AS avg_likes, SUM(shares) AS total_shares
        FROM daily_signals
        WHERE date >= date('now', 'localtime', ?)
        GROUP BY hashtag
        HAVING days_seen >= ?
        ORDER BY avg_likes DESC
        LIMIT ?
    """
    params = (f"-{int(window_days)} days", min_days, limit)
    return conn.execute(query, params).fetchall()
# Running the full pipeline daily
def run_daily_pipeline():
    """Run all four pipeline stages once and return a short summary.

    Prints a timestamped header and one progress line per stage.

    Returns:
        dict with "candidates" (number of stage 1 candidates) and
        "rising_trends" (the rows returned by stage 4).
    """
    started_at = datetime.now().isoformat()
    print(f"Pipeline run: {started_at}")

    # One search request per hashtag in PRODUCT_HASHTAGS (~200 API calls).
    found = stage1_hashtag_scan(PRODUCT_HASHTAGS)
    print(f"Stage 1: {len(found)} candidate videos found")

    # Detail lookups for the best-scoring candidates (~50 API calls).
    analyzed = stage2_video_analysis(found, top_n=50)
    print(f"Stage 2: {len(analyzed)} videos analyzed in detail")

    # Profiles of the creators behind those videos (~20 API calls).
    creators = stage3_creator_profiles(analyzed, top_n=20)
    print(f"Stage 3: {len(creators)} creator profiles fetched")

    # Local SQLite storage plus trend query; no API calls.
    trends = stage4_store_and_track(analyzed, creators)
    print(f"Stage 4: {len(trends)} rising product trends detected")

    summary = {"candidates": len(found), "rising_trends": trends}
    return summary
# Script entry point: run one full pipeline pass when executed directly.
if __name__ == "__main__":
    run_daily_pipeline()
# Optimizing for cost
- Run Stage 1 with all 200 hashtags but only deep-dive the top 50 videos
- Cache creator profiles -- only re-fetch weekly, not daily
- Skip hashtags that consistently return zero high-engagement videos
- Use engagement thresholds to filter early: saves Stage 2 and 3 calls
At $1.35/day ($40/month), this pipeline gives you automated product research that runs while you sleep. Compare that with browsing TikTok manually for hours each day, or with paying $79/month for a dashboard tool. The pipeline's data feeds directly into your supplier research, ad creative planning, and inventory decisions.