Workflow

Trading Agent Data Pipeline

Hourly pipeline that aggregates news, Reddit sentiment, and market commentary into a unified context for trading agent decisions.

Overview

This workflow runs hourly to assemble a complete market context for trading agents. It pulls breaking news from Google, sentiment from Reddit financial communities, and analysis from YouTube market commentary. The aggregated context feeds directly into trading decision logic, giving agents a multi-source view that would take a human analyst hours to compile manually.

Trigger

Cron schedule (every hour during market hours)

Schedule

Runs every hour during market hours (9:30 AM - 4 PM ET)

Workflow Steps

1

Define active tickers

Load the list of tickers the trading agent is currently monitoring.

2

Fetch breaking news

Search Google News for each ticker to get the latest headlines and developments.

3

Pull Reddit sentiment

Search Reddit financial subreddits for discussion threads on each ticker.

4

Get YouTube commentary

Search YouTube for recent market analysis videos mentioning the ticker.

5

Aggregate into context object

Merge news, sentiment, and commentary into a single structured context per ticker.

6

Feed to trading agent

Write the context to the agent's input queue or shared state for decision making.

Python Implementation

Python
import requests
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path

API_KEY = "your_scavio_api_key"
TICKERS = ["NVDA", "AAPL", "TSLA", "MSFT", "AMZN"]

def search(platform: str, query: str) -> list[dict]:
    res = requests.post(
        "https://api.scavio.dev/api/v1/search",
        headers={"x-api-key": API_KEY},
        json={"platform": platform, "query": query},
        timeout=10,
    )
    res.raise_for_status()
    return res.json().get("organic", [])[:5]

def fetch_ticker_context(ticker: str) -> dict:
    news_query = f"{ticker} stock news today"
    reddit_query = f"{ticker} stock discussion"
    youtube_query = f"{ticker} market analysis"

    with ThreadPoolExecutor(max_workers=3) as pool:
        news_future = pool.submit(search, "google", news_query)
        reddit_future = pool.submit(search, "reddit", reddit_query)
        youtube_future = pool.submit(search, "youtube", youtube_query)

    return {
        "ticker": ticker,
        "timestamp": datetime.utcnow().isoformat(),
        "news": [{"title": r.get("title", ""), "snippet": r.get("snippet", ""), "link": r.get("link", "")} for r in news_future.result()],
        "reddit": [{"title": r.get("title", ""), "score": r.get("score", 0), "subreddit": r.get("subreddit", "")} for r in reddit_future.result()],
        "youtube": [{"title": r.get("title", ""), "channel": r.get("channel", ""), "views": r.get("views", 0)} for r in youtube_future.result()],
    }

def run():
    contexts = []
    for ticker in TICKERS:
        ctx = fetch_ticker_context(ticker)
        contexts.append(ctx)

    output = {
        "run_time": datetime.utcnow().isoformat(),
        "tickers": contexts,
    }

    # Write to agent input queue
    output_path = Path("trading_context.json")
    output_path.write_text(json.dumps(output, indent=2))

    print(f"Context assembled for {len(TICKERS)} tickers at {output['run_time']}")
    for ctx in contexts:
        print(f"  {ctx['ticker']}: {len(ctx['news'])} news, {len(ctx['reddit'])} reddit, {len(ctx['youtube'])} youtube")

if __name__ == "__main__":
    run()

JavaScript Implementation

JavaScript
const API_KEY = "your_scavio_api_key";
const TICKERS = ["NVDA", "AAPL", "TSLA", "MSFT", "AMZN"];

async function search(platform, query) {
  const res = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": API_KEY, "content-type": "application/json" },
    body: JSON.stringify({ platform, query }),
  });
  if (!res.ok) throw new Error(`scavio ${res.status}`);
  return ((await res.json()).organic ?? []).slice(0, 5);
}

async function fetchTickerContext(ticker) {
  const [news, reddit, youtube] = await Promise.all([
    search("google", `${ticker} stock news today`),
    search("reddit", `${ticker} stock discussion`),
    search("youtube", `${ticker} market analysis`),
  ]);

  return {
    ticker,
    timestamp: new Date().toISOString(),
    news: news.map((r) => ({ title: r.title ?? "", snippet: r.snippet ?? "", link: r.link ?? "" })),
    reddit: reddit.map((r) => ({ title: r.title ?? "", score: r.score ?? 0, subreddit: r.subreddit ?? "" })),
    youtube: youtube.map((r) => ({ title: r.title ?? "", channel: r.channel ?? "", views: r.views ?? 0 })),
  };
}

async function run() {
  const fs = await import("fs/promises");
  const contexts = await Promise.all(TICKERS.map(fetchTickerContext));

  const output = { runTime: new Date().toISOString(), tickers: contexts };
  await fs.writeFile("trading_context.json", JSON.stringify(output, null, 2));

  console.log(`Context assembled for ${TICKERS.length} tickers at ${output.runTime}`);
  for (const ctx of contexts) {
    console.log(`  ${ctx.ticker}: ${ctx.news.length} news, ${ctx.reddit.length} reddit, ${ctx.youtube.length} youtube`);
  }
}

run();

Platforms Used

Google

Web search with knowledge graph, PAA, and AI overviews

YouTube

Video search with transcripts and metadata

Reddit

Community, posts & threaded comments from any subreddit

Frequently Asked Questions

This workflow runs hourly to assemble a complete market context for trading agents. It pulls breaking news from Google, sentiment from Reddit financial communities, and analysis from YouTube market commentary. The aggregated context feeds directly into trading decision logic, giving agents a multi-source view that would take a human analyst hours to compile manually.

This workflow uses a cron schedule (every hour during market hours). Runs every hour during market hours (9:30 AM - 4 PM ET).

This workflow uses the following Scavio platforms: google, youtube, reddit. Each platform is called via the same unified API endpoint.

Yes. Scavio's free tier includes 250 credits per month with no credit card required. That is enough to test and validate this workflow before scaling it.

Trading Agent Data Pipeline

Hourly pipeline that aggregates news, Reddit sentiment, and market commentary into a unified context for trading agent decisions.