Overview
This workflow runs weekly to audit all connected MCP servers, inventory their exposed tools, categorize each by risk level, and flag any changes since the last audit. New tools are highlighted, high-risk capabilities are flagged for review, and an audit report is generated for the security team. The workflow ensures that agent permission surfaces do not silently expand over time.
Trigger
Cron schedule (every Monday at 9:00 AM UTC)
Schedule
Runs every Monday at 9:00 AM UTC
Workflow Steps
Enumerate connected MCP servers
Read the list of MCP server endpoints from configuration and connect to each to fetch their tool manifests.
Inventory all exposed tools
Collect the complete list of tools each server exposes, including tool names, descriptions, and parameter schemas.
Categorize by risk level
Classify each tool as low, medium, or high risk based on keyword patterns in the tool name and description.
Diff against previous audit
Compare current tool inventory against last week's audit to detect new tools, removed tools, or changed descriptions.
Generate audit report
Compile findings into a structured report with new tools flagged, risk summary, and recommended actions.
Python Implementation
import json
from pathlib import Path
from datetime import datetime
# Keywords whose presence anywhere in a tool's name or description marks the
# tool as high risk.
HIGH_RISK = [
    "file",
    "database",
    "email",
    "exec",
    "shell",
    "write",
    "delete",
    "admin",
]

# Keywords that mark a tool as medium risk when no high-risk keyword matched.
MEDIUM_RISK = ["create", "update", "modify", "send", "post"]


def categorize_tool(tool_name: str, description: str = "") -> str:
    """Classify a tool as ``"high"``, ``"medium"``, or ``"low"`` risk.

    The name and description are joined, lower-cased, and scanned for the
    keywords in ``HIGH_RISK`` and then ``MEDIUM_RISK``. Matching is by plain
    substring, so a keyword embedded in a longer word also counts (for
    example ``"profile"`` contains ``"file"``).

    Args:
        tool_name: Name of the tool as exposed by the server.
        description: Optional free-text description of the tool.

    Returns:
        The first risk level whose keyword list matches, checked from most
        to least severe; ``"low"`` when nothing matches.
    """
    haystack = "{} {}".format(tool_name, description).lower()
    # Order matters: a tool matching both lists must be reported as high.
    for level, keywords in (("high", HIGH_RISK), ("medium", MEDIUM_RISK)):
        for keyword in keywords:
            if keyword in haystack:
                return level
    return "low"
def audit_servers(servers: dict) -> list:
    """Build a flat, risk-annotated inventory of every tool on every server.

    Args:
        servers: Mapping of server name to an iterable of tools. Each tool is
            either a bare tool-name string or a dict with optional ``name``
            and ``description`` keys. A falsy value (``None`` or an empty
            list) is treated as a server that exposes no tools.

    Returns:
        A list of ``{"server", "tool", "risk"}`` dicts, one per tool, in the
        iteration order of ``servers`` and of each server's tool list.
    """
    inventory = []
    for server_name, tools in servers.items():
        # ``or ()`` keeps a server with a missing/None manifest from aborting
        # the whole audit with a TypeError.
        for tool in tools or ():
            if isinstance(tool, str):
                name, desc = tool, ""
            else:
                name = tool.get("name", "")
                desc = tool.get("description", "")
            inventory.append(
                {
                    "server": server_name,
                    "tool": name,
                    "risk": categorize_tool(name, desc),
                }
            )
    return inventory
def diff_audits(current: list, previous: list) -> dict:
    """Report which tools appeared or disappeared between two audits.

    A tool is identified by its ``(server, tool)`` pair; other keys in the
    inventory entries are ignored.

    Args:
        current: Inventory entries from this audit, each a dict with
            ``"server"`` and ``"tool"`` keys.
        previous: Inventory entries from the prior audit, same shape.

    Returns:
        A dict with two lists of ``{"server", "tool"}`` dicts:
        ``"new_tools"`` (in ``current`` only) and ``"removed_tools"``
        (in ``previous`` only). Both lists are sorted by server, then tool.
    """
    current_set = {(t["server"], t["tool"]) for t in current}
    previous_set = {(t["server"], t["tool"]) for t in previous}

    def ordered(pairs):
        # Set iteration order depends on string hashing, which varies between
        # interpreter runs; sort so reports are reproducible and diffable.
        # Keys go through str() so an odd (e.g. null) name cannot break the sort.
        ranked = sorted(pairs, key=lambda pair: (str(pair[0]), str(pair[1])))
        return [{"server": server, "tool": tool} for server, tool in ranked]

    return {
        "new_tools": ordered(current_set - previous_set),
        "removed_tools": ordered(previous_set - current_set),
    }
def run():
    """Run one audit: inventory tools, diff against history, write a report.

    Reads the previous inventory from ``mcp_audit_history.json`` (if present),
    writes a dated ``mcp_audit_<YYYY-MM-DD>.json`` report, updates the history
    file with the current inventory, and prints a summary to stdout.
    """
    # Local import so this function does not depend on the file-level import
    # list; needed for a timezone-aware UTC timestamp.
    from datetime import timezone

    # Example: MCP server tool manifests
    servers = {
        "scavio_mcp": [
            "search_google",
            "search_youtube",
            "search_amazon",
            "search_walmart",
            "search_reddit",
            "search_tiktok",
        ],
        "filesystem_mcp": ["read_file", "write_file", "list_directory", "delete_file"],
        "database_mcp": ["query_database", "create_record", "update_record"],
    }
    current = audit_servers(servers)

    history_path = Path("mcp_audit_history.json")
    if history_path.exists():
        previous = json.loads(history_path.read_text(encoding="utf-8"))
    else:
        # First run: no baseline, so every tool is reported as new.
        previous = []
    changes = diff_audits(current, previous)

    # Generate report
    risk_summary = {"high": 0, "medium": 0, "low": 0}
    for tool in current:
        risk_summary[tool["risk"]] += 1

    # datetime.utcnow() is deprecated and returns a naive value; ask for an
    # aware UTC time explicitly instead.
    date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    report = {
        "date": date,
        "total_tools": len(current),
        "risk_summary": risk_summary,
        "new_tools": changes["new_tools"],
        "removed_tools": changes["removed_tools"],
        "high_risk_tools": [t for t in current if t["risk"] == "high"],
    }
    Path(f"mcp_audit_{date}.json").write_text(
        json.dumps(report, indent=2), encoding="utf-8"
    )

    # Save current audit. Done only after the report is on disk: if report
    # writing fails, the old baseline survives and the next run still sees
    # this week's changes instead of silently absorbing them.
    history_path.write_text(json.dumps(current, indent=2), encoding="utf-8")

    print(f"MCP Audit: {len(current)} tools ({risk_summary['high']} high, {risk_summary['medium']} medium, {risk_summary['low']} low)")
    if changes["new_tools"]:
        print(f" NEW: {len(changes['new_tools'])} tools added")
        for t in changes["new_tools"]:
            print(f" {t['server']}/{t['tool']}")
    print(f" HIGH RISK: {len(report['high_risk_tools'])} tools")
    for t in report["high_risk_tools"]:
        print(f" {t['server']}/{t['tool']}")
if __name__ == "__main__":
    run()
JavaScript Implementation
/**
 * Classify a tool as "high", "medium", or "low" risk.
 *
 * The name and optional description are joined, lower-cased, and scanned for
 * risk keywords by plain substring match. The keyword lists mirror the Python
 * implementation's HIGH_RISK / MEDIUM_RISK lists so both classifiers agree.
 *
 * @param {string} toolName - Name of the tool as exposed by the server.
 * @param {string} [description=""] - Optional free-text tool description.
 * @returns {"high"|"medium"|"low"} The most severe risk level that matches.
 */
function categorize(toolName, description = "") {
  const combined = `${toolName} ${description}`.toLowerCase();
  const high = ["file", "database", "email", "exec", "shell", "write", "delete", "admin"];
  const med = ["create", "update", "modify", "send", "post"];
  // High is checked first so a tool matching both lists is reported as high.
  if (high.some((k) => combined.includes(k))) return "high";
  if (med.some((k) => combined.includes(k))) return "medium";
  return "low";
}
// Example: MCP server tool manifests (same set as the Python implementation).
const servers = {
  scavio_mcp: [
    "search_google",
    "search_youtube",
    "search_amazon",
    "search_walmart",
    "search_reddit",
    "search_tiktok",
  ],
  filesystem_mcp: ["read_file", "write_file", "list_directory", "delete_file"],
  database_mcp: ["query_database", "create_record", "update_record"],
};

// Flatten every server's tools into one list of { server, tool, risk } entries.
const inventory = [];
for (const [server, tools] of Object.entries(servers)) {
  for (const tool of tools) {
    inventory.push({ server, tool, risk: categorize(tool) });
  }
}

// Count tools per risk level for the one-line summary.
const summary = { high: 0, medium: 0, low: 0 };
for (const t of inventory) summary[t.risk]++;
console.log(`MCP Audit: ${inventory.length} tools (${summary.high} high, ${summary.medium} medium, ${summary.low} low)`);
for (const t of inventory.filter((t) => t.risk === "high")) console.log(` HIGH: ${t.server}/${t.tool}`);
Platforms Used
Google
Web search with knowledge graph, PAA, and AI overviews
YouTube
Video search with transcripts and metadata
Amazon
Product search with prices, ratings, and reviews
Walmart
Product search with pricing and fulfillment data
Reddit
Community, posts & threaded comments from any subreddit
TikTok
Trending video, creator, and product discovery