Déployer un serveur MCP qui expose des outils de recherche et de données aux agents IA nécessite une sécurité appropriée. Sans validation de clé API, limitation de débit et journalisation d'audit, votre serveur est vulnérable aux abus et aux dépassements de coûts. Ce tutoriel ajoute trois couches de sécurité à un serveur de données MCP : authentification par clé API avec prise en charge de la rotation, limitation de débit par clé pour éviter les pics de coûts, et journalisation d'audit structurée pour la conformité. Les modèles s'appliquent à tout serveur MCP qui encapsule l'API Scavio ou des points de terminaison payants similaires.
Prérequis
- Python 3.9+ installé
- bibliothèque requests installée
- Une clé API Scavio depuis scavio.dev
- Compréhension de base des modèles de serveur MCP
Parcours
Étape 1: Ajouter un middleware de validation de clé API
Créez une couche de validation de clé qui vérifie les requêtes MCP entrantes par rapport à une liste de clés autorisées. Prenez en charge plusieurs clés pour différents clients.
import os, requests, hashlib, time, json
from datetime import datetime
from collections import defaultdict
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
# Key management
API_KEYS = {
'key_prod_abc123': {'name': 'production', 'rate_limit': 100, 'active': True},
'key_dev_xyz789': {'name': 'development', 'rate_limit': 20, 'active': True},
'key_old_retired': {'name': 'deprecated', 'rate_limit': 0, 'active': False},
}
def validate_key(api_key: str) -> dict:
key_config = API_KEYS.get(api_key)
if not key_config:
return {'valid': False, 'error': 'Unknown API key'}
if not key_config['active']:
return {'valid': False, 'error': 'API key deactivated'}
return {'valid': True, 'config': key_config}
# Test
print(validate_key('key_prod_abc123')) # Valid
print(validate_key('key_old_retired')) # Deactivated
print(validate_key('unknown_key')) # UnknownÉtape 2: Implémenter la limitation de débit par clé
Ajoutez un limiteur de débit à fenêtre glissante qui suit les requêtes par clé. Bloquez les requêtes qui dépassent la limite configurée.
class RateLimiter:
def __init__(self, window_seconds: int = 3600):
self.window = window_seconds
self.requests = defaultdict(list)
def check(self, key: str, limit: int) -> dict:
now = time.time()
# Clean old entries
self.requests[key] = [t for t in self.requests[key] if now - t < self.window]
current = len(self.requests[key])
if current >= limit:
return {'allowed': False, 'current': current, 'limit': limit,
'retry_after': int(self.requests[key][0] + self.window - now)}
self.requests[key].append(now)
return {'allowed': True, 'current': current + 1, 'limit': limit, 'remaining': limit - current - 1}
rate_limiter = RateLimiter()
# Test rate limiting
for i in range(5):
result = rate_limiter.check('key_dev_xyz789', limit=3)
print(f'Request {i+1}: {"allowed" if result["allowed"] else "BLOCKED"} ({result["current"]}/{result["limit"]})')Étape 3: Ajouter la journalisation d'audit
Journalisez chaque appel d'outil MCP avec horodatage, identité de la clé, nom de l'outil et coût. Cela fournit une piste d'audit pour la conformité et le suivi des coûts.
AUDIT_LOG = 'mcp_audit.jsonl'
def log_request(api_key: str, tool_name: str, args: dict, result_size: int, cost: float):
key_config = API_KEYS.get(api_key, {})
entry = {
'timestamp': datetime.now().isoformat(),
'key_name': key_config.get('name', 'unknown'),
'key_hash': hashlib.sha256(api_key.encode()).hexdigest()[:12],
'tool': tool_name,
'args': {k: str(v)[:50] for k, v in args.items()},
'result_bytes': result_size,
'cost_usd': cost,
}
with open(AUDIT_LOG, 'a') as f:
f.write(json.dumps(entry) + '\n')
return entry
# Test
entry = log_request('key_prod_abc123', 'web_search', {'query': 'test'}, 1200, 0.005)
print(f'Logged: {entry["key_name"]} called {entry["tool"]} (${entry["cost_usd"]})')Étape 4: Intégrer la sécurité dans le gestionnaire de requêtes MCP
Combinez la validation de clé, la limitation de débit et la journalisation d'audit en un seul gestionnaire de requêtes qui encapsule tous les appels d'outils MCP.
def secure_mcp_handler(api_key: str, tool_name: str, args: dict) -> dict:
# Step 1: Validate key
key_check = validate_key(api_key)
if not key_check['valid']:
return {'error': key_check['error'], 'status': 401}
config = key_check['config']
# Step 2: Rate limit
rate_check = rate_limiter.check(api_key, config['rate_limit'])
if not rate_check['allowed']:
return {'error': 'Rate limit exceeded', 'retry_after': rate_check.get('retry_after'), 'status': 429}
# Step 3: Execute the tool
try:
if tool_name == 'web_search':
resp = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'query': args.get('query', ''), 'country_code': 'us', 'num_results': 5})
result = resp.json()
cost = 0.005
else:
return {'error': f'Unknown tool: {tool_name}', 'status': 400}
except Exception as e:
return {'error': str(e), 'status': 500}
# Step 4: Audit log
log_request(api_key, tool_name, args, len(json.dumps(result)), cost)
return {'result': result, 'status': 200, 'remaining_requests': rate_check['remaining']}
# Test the full flow
result = secure_mcp_handler('key_prod_abc123', 'web_search', {'query': 'test search'})
print(f'Status: {result["status"]}, Remaining: {result.get("remaining_requests")}')Exemple Python
import os, requests, time, json, hashlib
from collections import defaultdict
SCAVIO_KEY = os.environ['SCAVIO_API_KEY']
H = {'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json'}
KEYS = {'key_prod': {'limit': 100, 'active': True}, 'key_dev': {'limit': 10, 'active': True}}
requests_log = defaultdict(list)
def secure_search(api_key, query):
if api_key not in KEYS or not KEYS[api_key]['active']:
return {'error': 'Invalid key'}
now = time.time()
requests_log[api_key] = [t for t in requests_log[api_key] if now - t < 3600]
if len(requests_log[api_key]) >= KEYS[api_key]['limit']:
return {'error': 'Rate limited'}
requests_log[api_key].append(now)
resp = requests.post('https://api.scavio.dev/api/v1/search', headers=H,
json={'query': query, 'country_code': 'us', 'num_results': 5})
print(f'[{api_key}] {query}: {len(resp.json().get("organic_results", []))} results')
return resp.json()
secure_search('key_prod', 'test query')
secure_search('invalid', 'test') # RejectedExemple JavaScript
const SCAVIO_KEY = process.env.SCAVIO_API_KEY;
const KEYS = { key_prod: { limit: 100, active: true }, key_dev: { limit: 10, active: true } };
const requestLog = {};
async function secureSearch(apiKey, query) {
if (!KEYS[apiKey]?.active) return { error: 'Invalid key' };
const now = Date.now();
requestLog[apiKey] = (requestLog[apiKey] || []).filter(t => now - t < 3600000);
if (requestLog[apiKey].length >= KEYS[apiKey].limit) return { error: 'Rate limited' };
requestLog[apiKey].push(now);
const resp = await fetch('https://api.scavio.dev/api/v1/search', {
method: 'POST',
headers: { 'x-api-key': SCAVIO_KEY, 'Content-Type': 'application/json' },
body: JSON.stringify({ query, country_code: 'us', num_results: 5 })
});
const data = await resp.json();
console.log(`[${apiKey}] ${query}: ${(data.organic_results || []).length} results`);
return data;
}
secureSearch('key_prod', 'test').then(console.log);Sortie attendue
validate: {'valid': True, 'config': {'name': 'production', 'rate_limit': 100}}
validate: {'valid': False, 'error': 'API key deactivated'}
validate: {'valid': False, 'error': 'Unknown API key'}
Request 1: allowed (1/3)
Request 2: allowed (2/3)
Request 3: allowed (3/3)
Request 4: BLOCKED (3/3)
Logged: production called web_search ($0.005)
Status: 200, Remaining: 98