部署向 AI 代理开放搜索和数据工具的 MCP 服务器时,必须做好安全防护。如果缺少 API 密钥验证、速率限制和审计日志,服务器很容易被滥用并导致成本失控。本教程为 MCP 数据服务器添加三层安全机制:支持轮换的 API 密钥身份验证、防止成本激增的按密钥速率限制,以及满足合规要求的结构化审计日志。这些模式适用于任何封装 Scavio API 或类似付费端点的 MCP 服务器。
前置条件
- 已安装 Python 3.9+
- 已安装 requests 库
- 来自 scavio.dev 的 Scavio API 密钥
- 对 MCP 服务器模式的基本了解
操作指南
步骤 1: 添加API密钥验证中间件
创建一个密钥验证层,根据已授权密钥列表检查传入的 MCP 请求,并支持为不同客户端配置多个密钥。
import os, requests, hashlib, time, json
from datetime import datetime
from collections import defaultdict
# Upstream Scavio credential, read from the environment; os.environ[...] raises
# KeyError as soon as this line runs if SCAVIO_API_KEY is not set.
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
# Headers passed to the Scavio search request: the upstream key plus a JSON content type.
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
# Key management
# Maps each client API key to its settings: a display name, the request limit
# handed to the rate limiter, and whether the key is still active.
API_KEYS = {
    'key_prod_abc123': {'name': 'production', 'rate_limit': 100, 'active': True},
    'key_dev_xyz789': {'name': 'development', 'rate_limit': 20, 'active': True},
    'key_old_retired': {'name': 'deprecated', 'rate_limit': 0, 'active': False},
}


def validate_key(api_key: str) -> dict:
    """Check a client-supplied API key against API_KEYS.

    Args:
        api_key: The key presented by the caller.

    Returns:
        ``{'valid': True, 'config': <entry from API_KEYS>}`` for a known,
        active key; otherwise ``{'valid': False, 'error': <message>}``.
    """
    # Keys come from untrusted requests. An unhashable value (e.g. a list)
    # would make the dict lookup raise TypeError, so any non-string is
    # treated as an unknown key instead of crashing the handler.
    key_config = API_KEYS.get(api_key) if isinstance(api_key, str) else None
    if not key_config:
        return {'valid': False, 'error': 'Unknown API key'}
    if not key_config['active']:
        return {'valid': False, 'error': 'API key deactivated'}
    return {'valid': True, 'config': key_config}
# Test
print(validate_key('key_prod_abc123')) # Valid
print(validate_key('key_old_retired')) # Deactivated
print(validate_key('unknown_key')) # Unknown

步骤 2: 实施按密钥速率限制
添加一个滑动窗口速率限制器,跟踪每个密钥的请求,并拦截超出配置限额的请求。
class RateLimiter:
    """In-memory sliding-window rate limiter that tracks request times per key."""

    def __init__(self, window_seconds: int = 3600):
        """Create a limiter whose window is ``window_seconds`` long."""
        self.window = window_seconds
        # key -> timestamps (time.time()) of the requests admitted for that key
        self.requests = defaultdict(list)

    def check(self, key: str, limit: int) -> dict:
        """Admit or reject one request for ``key`` under ``limit`` per window.

        Args:
            key: Identifier whose requests are counted.
            limit: Maximum number of requests allowed inside the window.

        Returns:
            When admitted: ``allowed=True`` with ``current``, ``limit`` and
            ``remaining``. When rejected: ``allowed=False`` with ``current``,
            ``limit``, ``remaining`` (always 0) and ``retry_after`` in seconds.
        """
        now = time.time()
        # Drop timestamps that have slid out of the window.
        recent = [t for t in self.requests[key] if now - t < self.window]
        self.requests[key] = recent
        current = len(recent)
        if current >= limit:
            if recent:
                # Timestamps are appended in order, so the first one is the
                # oldest; a slot frees up once it leaves the window.
                retry_after = int(recent[0] + self.window - now)
            else:
                # limit <= 0 with no history: there is no oldest request to
                # wait for, so fall back to the full window length.
                retry_after = int(self.window)
            return {'allowed': False, 'current': current, 'limit': limit,
                    'remaining': 0, 'retry_after': retry_after}
        recent.append(now)
        return {'allowed': True, 'current': current + 1, 'limit': limit,
                'remaining': limit - current - 1}


rate_limiter = RateLimiter()
# Test rate limiting
for i in range(5):
    result = rate_limiter.check('key_dev_xyz789', limit=3)
    print(f'Request {i+1}: {"allowed" if result["allowed"] else "BLOCKED"} ({result["current"]}/{result["limit"]})')

步骤 3: 添加审核日志记录
记录每次 MCP 工具调用的时间戳、密钥身份、工具名称和成本,为合规和成本跟踪提供审计线索。
AUDIT_LOG = 'mcp_audit.jsonl'


def log_request(api_key: str, tool_name: str, args: dict, result_size: int, cost: float) -> dict:
    """Append one audit record for a tool call to AUDIT_LOG as a JSON line.

    Args:
        api_key: Client key that made the call; only a truncated SHA-256
            digest of it is written, never the key itself.
        tool_name: Name of the tool that was invoked.
        args: Tool arguments; each value is stringified and cut to 50 chars.
        result_size: Size of the result, stored as ``result_bytes``.
        cost: Cost of the call, stored as ``cost_usd``.

    Returns:
        The record that was written.
    """
    key_config = API_KEYS.get(api_key, {})
    entry = {
        # Timezone-aware local time: the UTC offset in the string keeps the
        # record unambiguous when logs from different hosts are compared.
        'timestamp': datetime.now().astimezone().isoformat(),
        'key_name': key_config.get('name', 'unknown'),
        'key_hash': hashlib.sha256(api_key.encode()).hexdigest()[:12],
        'tool': tool_name,
        # Tolerate args=None so a call without arguments is still logged.
        'args': {k: str(v)[:50] for k, v in (args or {}).items()},
        'result_bytes': result_size,
        'cost_usd': cost,
    }
    with open(AUDIT_LOG, 'a', encoding='utf-8') as f:
        f.write(json.dumps(entry) + '\n')
    return entry
# Test
entry = log_request('key_prod_abc123', 'web_search', {'query': 'test'}, 1200, 0.005)
print(f'Logged: {entry["key_name"]} called {entry["tool"]} (${entry["cost_usd"]})')

步骤 4: 将安全性连接到 MCP 请求处理程序中
将密钥验证、速率限制和审核日志记录合并到一个包装所有 MCP 工具调用的请求处理程序中。
def secure_mcp_handler(api_key: str, tool_name: str, args: dict) -> dict:
    """Run one tool call behind key validation, rate limiting and audit logging.

    Args:
        api_key: Client key presented with the request.
        tool_name: Tool to execute; only ``'web_search'`` is implemented.
        args: Tool arguments; ``web_search`` reads ``args['query']``.

    Returns:
        A dict that always carries ``status``: 200 with ``result`` and
        ``remaining_requests`` on success, 401 for a rejected key, 429 with
        ``retry_after`` when rate limited, 400 for an unknown tool, and 500
        with ``error`` when the upstream call fails.
    """
    # Step 1: Validate key
    key_check = validate_key(api_key)
    if not key_check['valid']:
        return {'error': key_check['error'], 'status': 401}
    config = key_check['config']

    # Step 2: Rate limit
    rate_check = rate_limiter.check(api_key, config['rate_limit'])
    if not rate_check['allowed']:
        return {'error': 'Rate limit exceeded',
                'retry_after': rate_check.get('retry_after'), 'status': 429}

    # Step 3: Execute the tool
    if tool_name != 'web_search':
        return {'error': f'Unknown tool: {tool_name}', 'status': 400}
    try:
        resp = requests.post(
            'https://api.scavio.dev/api/v1/search',
            headers=H,
            json={'query': args.get('query', ''), 'country_code': 'us', 'num_results': 5},
            # Without a timeout a stalled upstream would block this handler forever.
            timeout=30,
        )
        # Turn upstream 4xx/5xx into an error instead of returning it as a
        # successful result and recording a billed call in the audit log.
        resp.raise_for_status()
        result = resp.json()
    except Exception as e:
        # Handler boundary: every failure is reported to the caller as a 500.
        return {'error': str(e), 'status': 500}
    cost = 0.005

    # Step 4: Audit log
    log_request(api_key, tool_name, args, len(json.dumps(result)), cost)
    return {'result': result, 'status': 200, 'remaining_requests': rate_check['remaining']}
# Test the full flow
result = secure_mcp_handler('key_prod_abc123', 'web_search', {'query': 'test search'})
print(f'Status: {result["status"]}, Remaining: {result.get("remaining_requests")}')

Python 示例
import os, requests, time, json, hashlib
from collections import defaultdict
# Upstream Scavio credential, read from the environment; os.environ[...] raises
# KeyError as soon as this line runs if SCAVIO_API_KEY is not set.
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
# Headers passed to the Scavio search request: the upstream key plus a JSON content type.
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
# Client keys with their hourly request limit and an active flag.
KEYS = {'key_prod': {'limit': 100, 'active': True}, 'key_dev': {'limit': 10, 'active': True}}
# key -> timestamps (time.time()) of the requests admitted in the last hour
requests_log = defaultdict(list)


def secure_search(api_key, query):
    """Run a Scavio search for ``query`` if ``api_key`` is valid and under its limit.

    Args:
        api_key: Client key; must be present in KEYS and active.
        query: Search text sent to the Scavio search endpoint.

    Returns:
        The decoded JSON response, or ``{'error': 'Invalid key'}`` /
        ``{'error': 'Rate limited'}`` when the request is refused.
    """
    if api_key not in KEYS or not KEYS[api_key]['active']:
        return {'error': 'Invalid key'}

    now = time.time()
    # Keep only the requests made within the last hour.
    requests_log[api_key] = [t for t in requests_log[api_key] if now - t < 3600]
    if len(requests_log[api_key]) >= KEYS[api_key]['limit']:
        return {'error': 'Rate limited'}
    requests_log[api_key].append(now)

    resp = requests.post(
        'https://api.scavio.dev/api/v1/search',
        headers=H,
        json={'query': query, 'country_code': 'us', 'num_results': 5},
        # Without a timeout a stalled upstream would block the caller forever.
        timeout=30,
    )
    # Decode the body once and reuse it for both the log line and the result.
    data = resp.json()
    print(f'[{api_key}] {query}: {len(data.get("organic_results", []))} results')
    return data
secure_search('key_prod', 'test query')
secure_search('invalid', 'test') # Rejected

JavaScript 示例
// Upstream Scavio credential, read from the environment.
const SCAVIO_KEY = process.env.SCAVIO_API_KEY;
// Client keys with their hourly request limit and an active flag.
const KEYS = { key_prod: { limit: 100, active: true }, key_dev: { limit: 10, active: true } };
// apiKey -> timestamps (ms) of the requests admitted in the last hour
const requestLog = {};

/**
 * Run a Scavio search if the client key is valid and under its hourly limit.
 *
 * @param {string} apiKey - Client key; must exist in KEYS and be active.
 * @param {string} query - Search text sent to the Scavio search endpoint.
 * @returns {Promise<object>} The decoded JSON response, or
 *   `{ error: 'Invalid key' }` / `{ error: 'Rate limited' }` when refused.
 */
async function secureSearch(apiKey, query) {
  if (!KEYS[apiKey]?.active) return { error: 'Invalid key' };

  const now = Date.now();
  // Keep only the requests made within the last hour.
  requestLog[apiKey] = (requestLog[apiKey] || []).filter(t => now - t < 3600000);
  if (requestLog[apiKey].length >= KEYS[apiKey].limit) return { error: 'Rate limited' };
  requestLog[apiKey].push(now);

  const resp = await fetch('https://api.scavio.dev/api/v1/search', {
    method: 'POST',
    headers: { 'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json' },
    body: JSON.stringify({ query, country_code: 'us', num_results: 5 }),
    // Without a timeout a stalled upstream would leave this promise pending forever.
    signal: AbortSignal.timeout(30000)
  });
  const data = await resp.json();
  console.log(`[${apiKey}] ${query}: ${(data.organic_results || []).length} results`);
  return data;
}
secureSearch('key_prod', 'test').then(console.log);

预期输出
{'valid': True, 'config': {'name': 'production', 'rate_limit': 100, 'active': True}}
{'valid': False, 'error': 'API key deactivated'}
{'valid': False, 'error': 'Unknown API key'}
Request 1: allowed (1/3)
Request 2: allowed (2/3)
Request 3: allowed (3/3)
Request 4: BLOCKED (3/3)
Request 5: BLOCKED (3/3)
Logged: production called web_search ($0.005)
Status: 200, Remaining: 99