Les serveurs MCP relient les agents IA aux outils externes, mais une fuite de justificatif expose chaque outil derrière ce serveur. Ce tutoriel implémente la rotation des justificatifs, des jetons d'accès à portée limitée et un stockage chiffré au repos pour que votre serveur MCP reste sécurisé même si une seule clé fuit. Tous les exemples utilisent l'API Scavio Search comme ressource protégée.
Prérequis
- Python 3.11+ avec la bibliothèque cryptography installée
- Une clé API Scavio provenant de https://scavio.dev
- Compréhension de base du protocole MCP
- Un gestionnaire de variables d'environnement (direnv ou dotenv)
Parcours
Étape 1: Chiffrer les justificatifs au repos
Stockez les clés API chiffrées à l'aide du chiffrement symétrique Fernet. La clé de chiffrement réside dans une variable d'environnement, jamais dans le code ou les fichiers de configuration.
from cryptography.fernet import Fernet
import os
import json
from pathlib import Path
CRED_FILE = Path("mcp_credentials.enc")
def get_cipher():
    """Return a Fernet cipher built from the MCP_ENCRYPTION_KEY variable.

    Raises:
        RuntimeError: If MCP_ENCRYPTION_KEY is unset or empty.
    """
    secret = os.environ.get("MCP_ENCRYPTION_KEY")
    if secret:
        # Fernet is handed the key as bytes.
        return Fernet(secret.encode())
    raise RuntimeError("MCP_ENCRYPTION_KEY not set")
def store_credential(name: str, value: str):
    """Encrypt ``value`` and persist it under ``name`` in CRED_FILE.

    The current contents of the credential file are loaded first, so other
    entries are kept and an existing entry with the same name is replaced.
    The whole mapping is then rewritten as indented JSON.
    """
    cipher = get_cipher()
    stored = load_all_credentials()
    ciphertext = cipher.encrypt(value.encode())
    # The encrypted token is decoded to text so it can be serialized as JSON.
    stored[name] = ciphertext.decode()
    serialized = json.dumps(stored, indent=2)
    CRED_FILE.write_text(serialized)
def load_credential(name: str) -> str:
    """Return the decrypted value stored under ``name``.

    Raises:
        KeyError: If no entry exists for ``name`` (or the stored entry is
            empty).
    """
    cipher = get_cipher()
    token = load_all_credentials().get(name)
    if token:
        plaintext = cipher.decrypt(token.encode())
        return plaintext.decode()
    raise KeyError(f"Credential '{name}' not found")
def load_all_credentials() -> dict:
if not CRED_FILE.exists():
return {}
return json.loads(CRED_FILE.read_text())
Étape 2: Implémenter des jetons d'accès à portée limitée
Générez des jetons à durée de vie et à portée limitées qui restreignent les outils MCP qu'un client peut invoquer. Chaque jeton est associé, côté serveur, aux noms d'outils autorisés et à un horodatage d'expiration.
import hashlib
import time
import secrets
ACTIVE_TOKENS: dict[str, dict] = {}
def create_scoped_token(
    allowed_tools: list[str],
    ttl_seconds: int = 3600
) -> str:
    """Create a random token and register its scope in ACTIVE_TOKENS.

    Args:
        allowed_tools: Tool names the token may be used with.
        ttl_seconds: Lifetime of the token, in seconds from now.

    Returns:
        The newly generated URL-safe token string.
    """
    token = secrets.token_urlsafe(32)
    grant = dict(
        tools=set(allowed_tools),
        expires=time.time() + ttl_seconds,
        created=time.time(),
    )
    ACTIVE_TOKENS[token] = grant
    return token
def validate_token(token: str, tool_name: str) -> bool:
    """Tell whether ``token`` is known, unexpired and scoped to ``tool_name``.

    An expired token is removed from ACTIVE_TOKENS as a side effect.
    """
    grant = ACTIVE_TOKENS.get(token)
    if not grant:
        return False

    expired = time.time() > grant["expires"]
    if expired:
        # Drop the stale entry so it is not kept around after expiry.
        del ACTIVE_TOKENS[token]
        return False

    return tool_name in grant["tools"]
def revoke_token(token: str):
ACTIVE_TOKENS.pop(token, None)
Étape 3: Ajouter une rotation automatique des justificatifs
Renouvelez la clé API Scavio à intervalles réguliers. La fonction de rotation chiffre la nouvelle clé, met à jour le fichier de justificatifs du serveur MCP et journalise l'événement de rotation, sans interruption de service.
import logging
from datetime import datetime
logger = logging.getLogger("mcp_security")
ROTATION_LOG = Path("rotation_log.json")
def rotate_credential(name: str, new_value: str):
    """Store ``new_value`` under ``name`` and record the event.

    The event is appended to the JSON list in ROTATION_LOG with the
    credential name, a local ISO-format timestamp, and an action of
    "rotated" (the name already existed) or "created" (it did not).
    """
    # Must be checked before storing, otherwise the name always exists.
    already_present = name in load_all_credentials()
    store_credential(name, new_value)

    record = {
        "credential": name,
        "rotated_at": datetime.now().isoformat(),
        "action": "rotated" if already_present else "created",
    }

    history = json.loads(ROTATION_LOG.read_text()) if ROTATION_LOG.exists() else []
    history.append(record)
    ROTATION_LOG.write_text(json.dumps(history, indent=2))

    logger.info(f"Credential '{name}' {record['action']}")
def check_rotation_due(name: str, max_age_days: int = 30) -> bool:
if not ROTATION_LOG.exists():
return True
log_data = json.loads(ROTATION_LOG.read_text())
entries = [e for e in log_data if e["credential"] == name]
if not entries:
return True
last = datetime.fromisoformat(entries[-1]["rotated_at"])
age = (datetime.now() - last).days
return age >= max_age_days
Étape 4: Intégrer les justificatifs dans le gestionnaire du serveur MCP
Chargez le justificatif chiffré au moment de la requête, validez le jeton à portée limitée et appelez l'API Scavio. Les justificatifs ne sont jamais conservés en mémoire plus longtemps qu'une seule requête.
import httpx
async def handle_mcp_tool_call(token: str, tool_name: str, params: dict) -> dict:
# Step 1: Validate scoped token
if not validate_token(token, tool_name):
return {"error": "Unauthorized or expired token"}
# Step 2: Load credential just-in-time
try:
api_key = load_credential("scavio_api_key")
except KeyError:
return {"error": "Credential not configured"}
# Step 3: Execute the search
if tool_name == "web_search":
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": api_key},
json={"query": params.get("query", ""), "num_results": 5}
)
resp.raise_for_status()
return {"result": resp.json()}
return {"error": f"Unknown tool: {tool_name}"}
Exemple Python
import os
import json
import secrets
import time
import asyncio
import httpx
from pathlib import Path
from cryptography.fernet import Fernet
# --- Setup ---
# Generate encryption key once: Fernet.generate_key().decode()
# export MCP_ENCRYPTION_KEY="your-fernet-key"
CRED_FILE = Path("mcp_credentials.enc")
ACTIVE_TOKENS: dict[str, dict] = {}
def get_cipher():
    """Return a Fernet cipher built from the MCP_ENCRYPTION_KEY variable.

    Raises:
        KeyError: If MCP_ENCRYPTION_KEY is not present in the environment.
    """
    raw_key = os.environ["MCP_ENCRYPTION_KEY"]
    return Fernet(raw_key.encode())
def store_credential(name: str, value: str):
    """Encrypt ``value`` and write it under ``name`` to CRED_FILE.

    Entries already in the file are preserved; the file is created when it
    does not exist yet.
    """
    cipher = get_cipher()
    if CRED_FILE.exists():
        vault = json.loads(CRED_FILE.read_text())
    else:
        vault = {}
    # Store the encrypted token as text so the mapping stays JSON-serializable.
    vault[name] = cipher.encrypt(value.encode()).decode()
    CRED_FILE.write_text(json.dumps(vault, indent=2))
def load_credential(name: str) -> str:
    """Read CRED_FILE and return the decrypted value stored under ``name``.

    Raises:
        KeyError: If ``name`` has no entry in the file.
    """
    cipher = get_cipher()
    vault = json.loads(CRED_FILE.read_text())
    token = vault[name].encode()
    return cipher.decrypt(token).decode()
def create_scoped_token(tools: list[str], ttl: int = 3600) -> str:
    """Create a random token limited to ``tools`` for ``ttl`` seconds.

    The scope and expiry time are recorded in ACTIVE_TOKENS, keyed by the
    returned token string.
    """
    token = secrets.token_urlsafe(32)
    allowed = set(tools)
    deadline = time.time() + ttl
    ACTIVE_TOKENS[token] = {"tools": allowed, "expires": deadline}
    return token
def validate_token(token: str, tool: str) -> bool:
    """Tell whether ``token`` is known, unexpired and scoped to ``tool``.

    Expired entries are rejected but left in ACTIVE_TOKENS.
    """
    grant = ACTIVE_TOKENS.get(token)
    if not grant:
        return False
    if time.time() > grant["expires"]:
        return False
    return tool in grant["tools"]
async def secure_search(token: str, query: str) -> dict:
    """Run a search through the Scavio API on behalf of a scoped token.

    Returns ``{"error": "Unauthorized"}`` when the token does not validate
    for the "web_search" tool; otherwise returns the decoded JSON response.
    A non-success HTTP status is raised by ``raise_for_status()``.
    """
    if not validate_token(token, "web_search"):
        return {"error": "Unauthorized"}

    # The API key is decrypted only once the token has been accepted.
    request_headers = {"x-api-key": load_credential("scavio_api_key")}
    payload = {"query": query, "num_results": 5}

    async with httpx.AsyncClient(timeout=15) as client:
        response = await client.post(
            "https://api.scavio.dev/api/v1/search",
            headers=request_headers,
            json=payload,
        )
        response.raise_for_status()
        return response.json()
# Usage
store_credential("scavio_api_key", "your-api-key")
token = create_scoped_token(["web_search"], ttl=1800)
result = asyncio.run(secure_search(token, "MCP server security best practices 2026"))
print(f"Results: {len(result.get('results', []))}")
Exemple JavaScript
const crypto = require("crypto");
const ALGORITHM = "aes-256-gcm";
const activeTokens = new Map();
/**
 * Encrypt a UTF-8 string with the module-level ALGORITHM.
 *
 * @param {string} text - Plaintext to encrypt.
 * @param {string} key - Encryption key as a hex string.
 * @returns {string} "iv:tag:ciphertext", each part hex-encoded.
 */
function encrypt(text, key) {
  // A fresh random 16-byte IV is generated for every call.
  const iv = crypto.randomBytes(16);
  const cipher = crypto.createCipheriv(ALGORITHM, Buffer.from(key, "hex"), iv);
  const ciphertext = cipher.update(text, "utf8", "hex") + cipher.final("hex");
  // The auth tag is only available after final() has been called.
  const tag = cipher.getAuthTag();
  return `${iv.toString("hex")}:${tag.toString("hex")}:${ciphertext}`;
}
/**
 * Decrypt a payload in the "iv:tag:ciphertext" hex format.
 *
 * @param {string} data - Payload with three colon-separated hex parts.
 * @param {string} key - Encryption key as a hex string.
 * @returns {string} The decrypted UTF-8 plaintext.
 * @throws {Error} If the payload does not have exactly three parts, if the
 *   tag is not 16 bytes long, or if authentication fails.
 */
function decrypt(data, key) {
  const parts = data.split(":");
  if (parts.length !== 3) {
    throw new Error("Malformed encrypted payload: expected iv:tag:ciphertext");
  }
  const [ivHex, tagHex, encrypted] = parts;
  // Pin the tag length: without authTagLength, shorter (truncated) GCM tags
  // are also accepted, which weakens the integrity check.
  const decipher = crypto.createDecipheriv(
    ALGORITHM,
    Buffer.from(key, "hex"),
    Buffer.from(ivHex, "hex"),
    { authTagLength: 16 }
  );
  decipher.setAuthTag(Buffer.from(tagHex, "hex"));
  let decrypted = decipher.update(encrypted, "hex", "utf8");
  // final() throws if the tag does not authenticate the ciphertext.
  decrypted += decipher.final("utf8");
  return decrypted;
}
/**
 * Create a random token limited to the given tools and register it.
 *
 * @param {Iterable<string>} tools - Tool names the token may be used with.
 * @param {number} [ttlSeconds=3600] - Lifetime in seconds from now.
 * @returns {string} The generated base64url token.
 */
function createScopedToken(tools, ttlSeconds = 3600) {
  const token = crypto.randomBytes(32).toString("base64url");
  const allowed = new Set(tools);
  // Date.now() is in milliseconds, so the TTL is converted from seconds.
  const expires = Date.now() + ttlSeconds * 1000;
  activeTokens.set(token, { tools: allowed, expires });
  return token;
}
/**
 * Tell whether a token is known, unexpired and scoped to the given tool.
 * Expired entries are rejected but left in activeTokens.
 *
 * @param {string} token - Token to check.
 * @param {string} tool - Tool name being requested.
 * @returns {boolean} True only when the token may be used with the tool.
 */
function validateToken(token, tool) {
  const grant = activeTokens.get(token);
  if (!grant) {
    return false;
  }
  if (Date.now() > grant.expires) {
    return false;
  }
  return grant.tools.has(tool);
}
/**
 * Run a search through the Scavio API on behalf of a scoped token.
 *
 * @param {string} token - Scoped token; must validate for "web_search".
 * @param {string} query - Search query sent to the API.
 * @returns {Promise<object>} The decoded JSON response body.
 * @throws {Error} If the token is rejected, MCP_ENCRYPTION_KEY is not set,
 *   or the API answers with a non-success HTTP status.
 */
async function secureSearch(token, query) {
  if (!validateToken(token, "web_search")) throw new Error("Unauthorized");
  const encKey = process.env.MCP_ENCRYPTION_KEY;
  // Fail with a clear message instead of an opaque TypeError from Buffer.from.
  if (!encKey) throw new Error("MCP_ENCRYPTION_KEY not set");
  const stored = JSON.parse(require("fs").readFileSync("mcp_credentials.enc", "utf8"));
  // NOTE(review): decrypt() expects the "iv:tag:ciphertext" hex format that
  // encrypt() produces — confirm the file's values were written that way.
  const apiKey = decrypt(stored.scavio_api_key, encKey);
  const resp = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": apiKey, "Content-Type": "application/json" },
    body: JSON.stringify({ query, num_results: 5 })
  });
  // fetch() resolves on HTTP error statuses too; surface them as errors
  // rather than returning an error body as if it were a result.
  if (!resp.ok) {
    throw new Error(`Search request failed with HTTP ${resp.status}`);
  }
  return resp.json();
}
const token = createScopedToken(["web_search"], 1800);
secureSearch(token, "MCP server security 2026").then(r => {
console.log("Results:", (r.results || []).length);
});
Sortie attendue
Results: 5