Deploying an MCP server that exposes search and data tools to AI agents requires proper security. Without API key validation, rate limiting, and audit logging, your server is vulnerable to abuse and cost overruns. This tutorial adds three security layers to an MCP data server: API key authentication with rotation support, per-key rate limiting to prevent cost spikes, and structured audit logging for compliance. The patterns apply to any MCP server that wraps the Scavio API or similar paid endpoints.
Prerequisites
- Python 3.9+ installed
- requests library installed
- A Scavio API key from scavio.dev
- Basic understanding of MCP server patterns
Walkthrough
Step 1: Add API key validation middleware
Create a key validation layer that checks incoming MCP requests against a list of authorized keys. Support multiple keys for different clients.
import os, requests, hashlib, time, json
from datetime import datetime
from collections import defaultdict
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
# Key management
API_KEYS = {
'key_prod_abc123': {'name': 'production', 'rate_limit': 100, 'active': True},
'key_dev_xyz789': {'name': 'development', 'rate_limit': 20, 'active': True},
'key_old_retired': {'name': 'deprecated', 'rate_limit': 0, 'active': False},
}
def validate_key(api_key: str) -> dict:
key_config = API_KEYS.get(api_key)
if not key_config:
return {'valid': False, 'error': 'Unknown API key'}
if not key_config['active']:
return {'valid': False, 'error': 'API key deactivated'}
return {'valid': True, 'config': key_config}
# Test
print(validate_key('key_prod_abc123')) # Valid
print(validate_key('key_old_retired')) # Deactivated
print(validate_key('unknown_key')) # UnknownStep 2: Implement per-key rate limiting
Add a sliding window rate limiter that tracks requests per key. Block requests that exceed the configured limit.
class RateLimiter:
def __init__(self, window_seconds: int = 3600):
self.window = window_seconds
self.requests = defaultdict(list)
def check(self, key: str, limit: int) -> dict:
now = time.time()
# Clean old entries
self.requests[key] = [t for t in self.requests[key] if now - t < self.window]
current = len(self.requests[key])
if current >= limit:
return {'allowed': False, 'current': current, 'limit': limit,
'retry_after': int(self.requests[key][0] + self.window - now)}
self.requests[key].append(now)
return {'allowed': True, 'current': current + 1, 'limit': limit, 'remaining': limit - current - 1}
rate_limiter = RateLimiter()
# Test rate limiting
for i in range(5):
result = rate_limiter.check('key_dev_xyz789', limit=3)
print(f'Request {i+1}: {"allowed" if result["allowed"] else "BLOCKED"} ({result["current"]}/{result["limit"]})')Step 3: Add audit logging
Log every MCP tool call with timestamp, key identity, tool name, and cost. This provides an audit trail for compliance and cost tracking.
AUDIT_LOG = 'mcp_audit.jsonl'
def log_request(api_key: str, tool_name: str, args: dict, result_size: int, cost: float):
key_config = API_KEYS.get(api_key, {})
entry = {
'timestamp': datetime.now().isoformat(),
'key_name': key_config.get('name', 'unknown'),
'key_hash': hashlib.sha256(api_key.encode()).hexdigest()[:12],
'tool': tool_name,
'args': {k: str(v)[:50] for k, v in args.items()},
'result_bytes': result_size,
'cost_usd': cost,
}
with open(AUDIT_LOG, 'a') as f:
f.write(json.dumps(entry) + '\n')
return entry
# Test
entry = log_request('key_prod_abc123', 'web_search', {'query': 'test'}, 1200, 0.005)
print(f'Logged: {entry["key_name"]} called {entry["tool"]} (${entry["cost_usd"]})')Step 4: Wire security into the MCP request handler
Combine key validation, rate limiting, and audit logging into a single request handler that wraps all MCP tool calls.
def secure_mcp_handler(api_key: str, tool_name: str, args: dict) -> dict:
# Step 1: Validate key
key_check = validate_key(api_key)
if not key_check['valid']:
return {'error': key_check['error'], 'status': 401}
config = key_check['config']
# Step 2: Rate limit
rate_check = rate_limiter.check(api_key, config['rate_limit'])
if not rate_check['allowed']:
return {'error': 'Rate limit exceeded', 'retry_after': rate_check.get('retry_after'), 'status': 429}
# Step 3: Execute the tool
try:
if tool_name == 'web_search':
resp = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'query': args.get('query', ''), 'country_code': 'us', 'num_results': 5})
result = resp.json()
cost = 0.005
else:
return {'error': f'Unknown tool: {tool_name}', 'status': 400}
except Exception as e:
return {'error': str(e), 'status': 500}
# Step 4: Audit log
log_request(api_key, tool_name, args, len(json.dumps(result)), cost)
return {'result': result, 'status': 200, 'remaining_requests': rate_check['remaining']}
# Test the full flow
result = secure_mcp_handler('key_prod_abc123', 'web_search', {'query': 'test search'})
print(f'Status: {result["status"]}, Remaining: {result.get("remaining_requests")}')Python Example
import os, requests, time, json, hashlib
from collections import defaultdict
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
KEYS = {'key_prod': {'limit': 100, 'active': True}, 'key_dev': {'limit': 10, 'active': True}}
requests_log = defaultdict(list)
def secure_search(api_key, query):
if api_key not in KEYS or not KEYS[api_key]['active']:
return {'error': 'Invalid key'}
now = time.time()
requests_log[api_key] = [t for t in requests_log[api_key] if now - t < 3600]
if len(requests_log[api_key]) >= KEYS[api_key]['limit']:
return {'error': 'Rate limited'}
requests_log[api_key].append(now)
resp = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'query': query, 'country_code': 'us', 'num_results': 5})
print(f'[{api_key}] {query}: {len(resp.json().get("organic_results", []))} results')
return resp.json()
secure_search('key_prod', 'test query')
secure_search('invalid', 'test') # RejectedJavaScript Example
const SCAVIO_KEY = process.env.SCAVIO_API_KEY;
const KEYS = { key_prod: { limit: 100, active: true }, key_dev: { limit: 10, active: true } };
const requestLog = {};
async function secureSearch(apiKey, query) {
if (!KEYS[apiKey]?.active) return { error: 'Invalid key' };
const now = Date.now();
requestLog[apiKey] = (requestLog[apiKey] || []).filter(t => now - t < 3600000);
if (requestLog[apiKey].length >= KEYS[apiKey].limit) return { error: 'Rate limited' };
requestLog[apiKey].push(now);
const resp = await fetch('https://api.scavio.dev/api/v1/search', {
method: 'POST',
headers: { 'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json' },
body: JSON.stringify({ query, country_code: 'us', num_results: 5 })
});
const data = await resp.json();
console.log(`[${apiKey}] ${query}: ${(data.organic_results || []).length} results`);
return data;
}
secureSearch('key_prod', 'test').then(console.log);Expected Output
validate: {'valid': True, 'config': {'name': 'production', 'rate_limit': 100}}
validate: {'valid': False, 'error': 'API key deactivated'}
validate: {'valid': False, 'error': 'Unknown API key'}
Request 1: allowed (1/3)
Request 2: allowed (2/3)
Request 3: allowed (3/3)
Request 4: BLOCKED (3/3)
Logged: production called web_search ($0.005)
Status: 200, Remaining: 98