MCP 服务器将 AI 代理与外部工具连接起来,但凭证一旦泄露,该服务器背后的每个工具都会随之暴露。本教程实现了凭证轮换、限定范围的访问令牌和静态加密存储,因此即使单个密钥泄露,您的 MCP 服务器也能保持安全。所有示例都使用 Scavio Search API 作为受保护资源。
前置条件
- Python 3.11+,并已安装 cryptography 和 httpx 库
- 来自 https://scavio.dev 的 Scavio API 密钥
- 对 MCP 协议有基本了解
- 环境变量管理器(direnv 或 dotenv)
操作指南
步骤 1: 静态加密凭证
先用 Fernet 对称加密对 API 密钥进行加密,再将其存储。加密密钥保存在环境变量中,而不是代码或配置文件中。
from cryptography.fernet import Fernet
import os
import json
from pathlib import Path
# Encrypted credential store: a JSON file mapping credential name -> Fernet token.
CRED_FILE = Path("mcp_credentials.enc")
def get_cipher():
    """Build the Fernet cipher used to encrypt and decrypt stored credentials.

    The key is read from the ``MCP_ENCRYPTION_KEY`` environment variable on
    every call, so it never has to live in code or in a config file.

    Raises:
        RuntimeError: If ``MCP_ENCRYPTION_KEY`` is unset or empty.
    """
    raw_key = os.getenv("MCP_ENCRYPTION_KEY")
    if raw_key:
        return Fernet(raw_key.encode())
    raise RuntimeError("MCP_ENCRYPTION_KEY not set")
def store_credential(name: str, value: str):
    """Encrypt ``value`` and save it under ``name`` in the credential file.

    Existing entries are preserved; an entry with the same name is
    overwritten. The data is written to a temporary sibling file and then
    renamed over ``CRED_FILE``, so an interrupted write cannot truncate the
    existing store.

    Args:
        name: Key under which the credential is stored.
        value: Plaintext secret to encrypt.

    Raises:
        RuntimeError: If ``MCP_ENCRYPTION_KEY`` is not set (from ``get_cipher``).
    """
    cipher = get_cipher()
    creds = load_all_credentials()
    creds[name] = cipher.encrypt(value.encode()).decode()

    tmp_file = CRED_FILE.with_name(CRED_FILE.name + ".tmp")
    # Create the file and restrict it to the owner before any data lands in it.
    tmp_file.touch(mode=0o600)
    tmp_file.chmod(0o600)
    tmp_file.write_text(json.dumps(creds, indent=2))
    # Rename over the target: readers see either the old file or the new one.
    tmp_file.replace(CRED_FILE)
def load_credential(name: str) -> str:
    """Return the decrypted value stored under ``name``.

    Args:
        name: Key of the credential to look up.

    Raises:
        KeyError: If there is no (non-empty) entry for ``name``.
        RuntimeError: If ``MCP_ENCRYPTION_KEY`` is not set (from ``get_cipher``).
    """
    cipher = get_cipher()
    token = load_all_credentials().get(name)
    if token:
        return cipher.decrypt(token.encode()).decode()
    raise KeyError(f"Credential '{name}' not found")
def load_all_credentials() -> dict:
if not CRED_FILE.exists():
return {}
return json.loads(CRED_FILE.read_text())
步骤 2: 实施范围访问令牌
生成短期有效、限定范围的令牌,用于限制客户端可以调用哪些 MCP 工具。每个令牌在服务器端都关联了允许的工具名称和到期时间戳。
import hashlib
import time
import secrets
# In-memory token registry: token string -> {"tools", "expires", "created"}.
ACTIVE_TOKENS: dict[str, dict] = {}
def create_scoped_token(
    allowed_tools: list[str],
    ttl_seconds: int = 3600
) -> str:
    """Issue a random token that may call only ``allowed_tools`` until it expires.

    The token itself is opaque; its scope and expiry are recorded in
    ``ACTIVE_TOKENS`` under the token string.

    Args:
        allowed_tools: Names of the tools this token may invoke.
        ttl_seconds: Lifetime of the token in seconds.

    Returns:
        The newly generated URL-safe token string.
    """
    token = secrets.token_urlsafe(32)
    # Read the clock once so that expires - created is exactly ttl_seconds.
    now = time.time()
    ACTIVE_TOKENS[token] = {
        "tools": set(allowed_tools),
        "expires": now + ttl_seconds,
        "created": now,
    }
    return token
def validate_token(token: str, tool_name: str) -> bool:
    """Check whether ``token`` is live and scoped to ``tool_name``.

    An expired token is removed from ``ACTIVE_TOKENS`` as a side effect.

    Args:
        token: Token string previously issued by ``create_scoped_token``.
        tool_name: Name of the tool the caller wants to invoke.

    Returns:
        True only if the token is known, unexpired, and allows ``tool_name``.
    """
    record = ACTIVE_TOKENS.get(token)
    if record:
        expired = record["expires"] < time.time()
        if not expired:
            return tool_name in record["tools"]
        del ACTIVE_TOKENS[token]
    return False
def revoke_token(token: str):
ACTIVE_TOKENS.pop(token, None)
步骤 3: 添加自动凭证轮换
按计划轮换 Scavio API 密钥。轮换功能会重新加密新密钥、更新 MCP 服务器配置并记录轮换事件,而无需停机。
import logging
from datetime import datetime
# Logger used to report credential rotation events.
logger = logging.getLogger("mcp_security")
# JSON file holding the list of rotation log entries.
ROTATION_LOG = Path("rotation_log.json")
def rotate_credential(name: str, new_value: str):
    """Store ``new_value`` as credential ``name`` and record the event.

    After the credential is saved, an entry with the credential name, the
    current local timestamp, and the action ("created" for a new name,
    "rotated" for an existing one) is appended to ``ROTATION_LOG``.

    Args:
        name: Key of the credential to create or replace.
        new_value: Plaintext secret that replaces the stored one.
    """
    # Decide the action before storing; afterwards the name always exists.
    action = "rotated" if name in load_all_credentials() else "created"
    store_credential(name, new_value)

    record = {
        "credential": name,
        "rotated_at": datetime.now().isoformat(),
        "action": action,
    }
    history = json.loads(ROTATION_LOG.read_text()) if ROTATION_LOG.exists() else []
    history.append(record)
    ROTATION_LOG.write_text(json.dumps(history, indent=2))
    logger.info(f"Credential '{name}' {record['action']}")
def check_rotation_due(name: str, max_age_days: int = 30) -> bool:
if not ROTATION_LOG.exists():
return True
log_data = json.loads(ROTATION_LOG.read_text())
entries = [e for e in log_data if e["credential"] == name]
if not entries:
return True
last = datetime.fromisoformat(entries[-1]["rotated_at"])
age = (datetime.now() - last).days
return age >= max_age_days
步骤 4: 将凭证连接到 MCP 服务器处理程序
在请求时加载加密凭证,验证作用域令牌,然后调用 Scavio API。凭证在内存中保存的时间绝不会超过单个请求。
import httpx
async def handle_mcp_tool_call(token: str, tool_name: str, params: dict) -> dict:
# Step 1: Validate scoped token
if not validate_token(token, tool_name):
return {"error": "Unauthorized or expired token"}
# Step 2: Load credential just-in-time
try:
api_key = load_credential("scavio_api_key")
except KeyError:
return {"error": "Credential not configured"}
# Step 3: Execute the search
if tool_name == "web_search":
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": api_key},
json={"query": params.get("query", ""), "num_results": 5}
)
resp.raise_for_status()
return {"result": resp.json()}
return {"error": f"Unknown tool: {tool_name}"}
Python 示例
import os
import json
import secrets
import time
import asyncio
import httpx
from pathlib import Path
from cryptography.fernet import Fernet
# --- Setup ---
# Generate encryption key once: Fernet.generate_key().decode()
# export MCP_ENCRYPTION_KEY="your-fernet-key"
# Encrypted credential store: a JSON file mapping name -> Fernet token.
CRED_FILE = Path("mcp_credentials.enc")
# In-memory registry of issued tokens: token -> {"tools", "expires"}.
ACTIVE_TOKENS: dict[str, dict] = {}
def get_cipher():
    """Return a Fernet cipher keyed from the ``MCP_ENCRYPTION_KEY`` env var.

    Raises:
        KeyError: If ``MCP_ENCRYPTION_KEY`` is not set.
    """
    key_text = os.environ["MCP_ENCRYPTION_KEY"]
    return Fernet(key_text.encode())
def store_credential(name: str, value: str):
    """Encrypt ``value`` and save it under ``name`` in the credential file.

    Existing entries are kept; an entry with the same name is overwritten.
    The data is written to a temporary sibling file and then renamed over
    ``CRED_FILE``, so an interrupted write cannot truncate the existing store.

    Args:
        name: Key under which the credential is stored.
        value: Plaintext secret to encrypt.

    Raises:
        KeyError: If ``MCP_ENCRYPTION_KEY`` is not set (from ``get_cipher``).
    """
    cipher = get_cipher()
    creds = json.loads(CRED_FILE.read_text()) if CRED_FILE.exists() else {}
    creds[name] = cipher.encrypt(value.encode()).decode()

    tmp_file = CRED_FILE.with_name(CRED_FILE.name + ".tmp")
    # Create the file and restrict it to the owner before any data lands in it.
    tmp_file.touch(mode=0o600)
    tmp_file.chmod(0o600)
    tmp_file.write_text(json.dumps(creds, indent=2))
    # Rename over the target: readers see either the old file or the new one.
    tmp_file.replace(CRED_FILE)
def load_credential(name: str) -> str:
    """Decrypt and return the credential stored under ``name``.

    Args:
        name: Key of the credential to look up.

    Raises:
        FileNotFoundError: If the credential file does not exist.
        KeyError: If ``name`` is not present in the file.
    """
    cipher = get_cipher()
    stored = json.loads(CRED_FILE.read_text())
    token = stored[name]
    return cipher.decrypt(token.encode()).decode()
def create_scoped_token(tools: list[str], ttl: int = 3600) -> str:
    """Issue a random token limited to ``tools`` for ``ttl`` seconds.

    The scope and expiry are recorded in ``ACTIVE_TOKENS`` under the token.

    Args:
        tools: Names of the tools this token may invoke.
        ttl: Lifetime of the token in seconds.

    Returns:
        The newly generated URL-safe token string.
    """
    token = secrets.token_urlsafe(32)
    scope = set(tools)
    expires_at = time.time() + ttl
    ACTIVE_TOKENS[token] = {"tools": scope, "expires": expires_at}
    return token
def validate_token(token: str, tool: str) -> bool:
    """Check whether ``token`` is live and scoped to ``tool``.

    Args:
        token: Token string previously issued by ``create_scoped_token``.
        tool: Name of the tool the caller wants to invoke.

    Returns:
        True only if the token is known, unexpired, and allows ``tool``.
    """
    entry = ACTIVE_TOKENS.get(token)
    if not entry:
        return False
    if time.time() > entry["expires"]:
        # Evict on first use after expiry so dead tokens do not pile up.
        ACTIVE_TOKENS.pop(token, None)
        return False
    return tool in entry["tools"]
async def secure_search(token: str, query: str) -> dict:
    """Run a Scavio web search on behalf of a token scoped to ``web_search``.

    The API key is decrypted only after the token has been validated.

    Args:
        token: Scoped token issued by ``create_scoped_token``.
        query: Search query text.

    Returns:
        ``{"error": "Unauthorized"}`` if the token is rejected, otherwise the
        decoded JSON body of the search response.
    """
    authorized = validate_token(token, "web_search")
    if not authorized:
        return {"error": "Unauthorized"}

    request_headers = {"x-api-key": load_credential("scavio_api_key")}
    payload = {"query": query, "num_results": 5}
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            "https://api.scavio.dev/api/v1/search",
            headers=request_headers,
            json=payload,
        )
        # Surface HTTP error statuses as exceptions rather than parsing them.
        resp.raise_for_status()
        return resp.json()
# Usage
# Encrypt and persist the API key (replace the placeholder with a real key).
store_credential("scavio_api_key", "your-api-key")
# Issue a 30-minute token that may only call the web_search tool.
token = create_scoped_token(["web_search"], ttl=1800)
# Run one authorized search to completion.
result = asyncio.run(secure_search(token, "MCP server security best practices 2026"))
print(f"Results: {len(result.get('results', []))}")
JavaScript 示例
const crypto = require("crypto");
// Authenticated encryption mode used by encrypt() and decrypt().
const ALGORITHM = "aes-256-gcm";
// In-memory registry of issued tokens: token -> { tools, expires }.
const activeTokens = new Map();
/**
 * Encrypt a UTF-8 string with AES-256-GCM.
 *
 * @param {string} text - Plaintext to encrypt.
 * @param {string} key - Encryption key encoded as a hex string.
 * @returns {string} `iv:tag:ciphertext`, each part hex-encoded.
 */
function encrypt(text, key) {
  // 96 bits is the IV size recommended for GCM. decrypt() reads the IV from
  // the payload, so payloads written with a 16-byte IV still decrypt.
  const iv = crypto.randomBytes(12);
  const cipher = crypto.createCipheriv(ALGORITHM, Buffer.from(key, "hex"), iv);
  const ciphertext = Buffer.concat([cipher.update(text, "utf8"), cipher.final()]);
  // The auth tag is only available after final() has been called.
  const tag = cipher.getAuthTag();
  return [iv, tag, ciphertext].map((part) => part.toString("hex")).join(":");
}
/**
 * Decrypt a payload produced by encrypt().
 *
 * @param {string} data - `iv:tag:ciphertext`, each part hex-encoded.
 * @param {string} key - Encryption key encoded as a hex string.
 * @returns {string} The decrypted UTF-8 plaintext.
 * @throws {Error} If the payload is malformed or fails authentication.
 */
function decrypt(data, key) {
  const parts = typeof data === "string" ? data.split(":") : [];
  if (parts.length !== 3) {
    throw new Error("Malformed encrypted payload: expected iv:tag:ciphertext");
  }
  const [ivHex, tagHex, encrypted] = parts;
  const decipher = crypto.createDecipheriv(
    ALGORITHM,
    Buffer.from(key, "hex"),
    Buffer.from(ivHex, "hex"),
    // Require the full 16-byte tag so a truncated tag is rejected.
    { authTagLength: 16 }
  );
  decipher.setAuthTag(Buffer.from(tagHex, "hex"));
  let decrypted = decipher.update(encrypted, "hex", "utf8");
  // final() throws if the tag does not authenticate the ciphertext.
  decrypted += decipher.final("utf8");
  return decrypted;
}
/**
 * Issue a random token limited to the given tools for a limited time.
 *
 * @param {string[]} tools - Names of the tools this token may invoke.
 * @param {number} [ttlSeconds=3600] - Lifetime of the token in seconds.
 * @returns {string} The newly generated base64url token.
 */
function createScopedToken(tools, ttlSeconds = 3600) {
  const token = crypto.randomBytes(32).toString("base64url");
  const scope = new Set(tools);
  // Date.now() is in milliseconds, so the TTL is converted before adding.
  const expires = Date.now() + ttlSeconds * 1000;
  activeTokens.set(token, { tools: scope, expires });
  return token;
}
/**
 * Check whether a token is live and scoped to the given tool.
 *
 * @param {string} token - Token previously issued by createScopedToken().
 * @param {string} tool - Name of the tool the caller wants to invoke.
 * @returns {boolean} True only if the token is known, unexpired, and allows `tool`.
 */
function validateToken(token, tool) {
  const entry = activeTokens.get(token);
  if (!entry) return false;
  if (Date.now() > entry.expires) {
    // Evict on first use after expiry so dead tokens do not pile up.
    activeTokens.delete(token);
    return false;
  }
  return entry.tools.has(tool);
}
/**
 * Run a Scavio web search on behalf of a token scoped to `web_search`.
 *
 * @param {string} token - Scoped token issued by createScopedToken().
 * @param {string} query - Search query text.
 * @returns {Promise<object>} The decoded JSON body of the search response.
 * @throws {Error} If the token is rejected, the encryption key is missing,
 *   or the API responds with a non-2xx status.
 */
async function secureSearch(token, query) {
  if (!validateToken(token, "web_search")) throw new Error("Unauthorized");
  const encKey = process.env.MCP_ENCRYPTION_KEY;
  if (!encKey) throw new Error("MCP_ENCRYPTION_KEY not set");
  // NOTE(review): decrypt() expects a hex key and a hex `iv:tag:ciphertext`
  // value, while the Python example in this document writes Fernet tokens to
  // the same file name. Confirm this file was produced by encrypt().
  const stored = JSON.parse(require("fs").readFileSync("mcp_credentials.enc", "utf8"));
  const apiKey = decrypt(stored.scavio_api_key, encKey);
  const resp = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": apiKey, "Content-Type": "application/json" },
    body: JSON.stringify({ query, num_results: 5 })
  });
  // fetch() resolves on HTTP error statuses, so check before parsing.
  if (!resp.ok) throw new Error(`Search request failed with HTTP ${resp.status}`);
  return resp.json();
}
const token = createScopedToken(["web_search"], 1800);
secureSearch(token, "MCP server security 2026").then(r => {
console.log("Results:", (r.results || []).length);
});
预期输出
Results: 5