MCP servers bridge AI agents to external tools, but a leaked credential exposes every tool behind that server. This tutorial implements credential rotation, scoped access tokens, and encrypted-at-rest storage so your MCP server stays secure even if a single key leaks. All examples use the Scavio Search API as the protected resource.
Prerequisites
- Python 3.11+ with cryptography library installed
- A Scavio API key from https://scavio.dev
- Basic understanding of the MCP protocol
- An environment variable manager (direnv or dotenv)
Walkthrough
Step 1: Encrypt credentials at rest
Store API keys encrypted using Fernet symmetric encryption. The encryption key lives in an environment variable, never in code or config files.
from cryptography.fernet import Fernet
import os
import json
from pathlib import Path
CRED_FILE = Path("mcp_credentials.enc")
def get_cipher():
key = os.environ.get("MCP_ENCRYPTION_KEY")
if not key:
raise RuntimeError("MCP_ENCRYPTION_KEY not set")
return Fernet(key.encode())
def store_credential(name: str, value: str):
cipher = get_cipher()
creds = load_all_credentials()
creds[name] = cipher.encrypt(value.encode()).decode()
CRED_FILE.write_text(json.dumps(creds, indent=2))
def load_credential(name: str) -> str:
cipher = get_cipher()
creds = load_all_credentials()
encrypted = creds.get(name)
if not encrypted:
raise KeyError(f"Credential '{name}' not found")
return cipher.decrypt(encrypted.encode()).decode()
def load_all_credentials() -> dict:
if not CRED_FILE.exists():
return {}
return json.loads(CRED_FILE.read_text())Step 2: Implement scoped access tokens
Generate short-lived, scoped tokens that limit which MCP tools a client can invoke. Each token encodes allowed tool names and an expiry timestamp.
import hashlib
import time
import secrets
ACTIVE_TOKENS: dict[str, dict] = {}
def create_scoped_token(
allowed_tools: list[str],
ttl_seconds: int = 3600
) -> str:
token = secrets.token_urlsafe(32)
ACTIVE_TOKENS[token] = {
"tools": set(allowed_tools),
"expires": time.time() + ttl_seconds,
"created": time.time()
}
return token
def validate_token(token: str, tool_name: str) -> bool:
entry = ACTIVE_TOKENS.get(token)
if not entry:
return False
if time.time() > entry["expires"]:
del ACTIVE_TOKENS[token]
return False
return tool_name in entry["tools"]
def revoke_token(token: str):
ACTIVE_TOKENS.pop(token, None)Step 3: Add automatic credential rotation
Rotate the Scavio API key on a schedule. The rotation function re-encrypts the new key, updates the MCP server config, and logs the rotation event without downtime.
import logging
from datetime import datetime
logger = logging.getLogger("mcp_security")
ROTATION_LOG = Path("rotation_log.json")
def rotate_credential(name: str, new_value: str):
old_exists = name in load_all_credentials()
store_credential(name, new_value)
log_entry = {
"credential": name,
"rotated_at": datetime.now().isoformat(),
"action": "rotated" if old_exists else "created"
}
log_data = []
if ROTATION_LOG.exists():
log_data = json.loads(ROTATION_LOG.read_text())
log_data.append(log_entry)
ROTATION_LOG.write_text(json.dumps(log_data, indent=2))
logger.info(f"Credential '{name}' {log_entry['action']}")
def check_rotation_due(name: str, max_age_days: int = 30) -> bool:
if not ROTATION_LOG.exists():
return True
log_data = json.loads(ROTATION_LOG.read_text())
entries = [e for e in log_data if e["credential"] == name]
if not entries:
return True
last = datetime.fromisoformat(entries[-1]["rotated_at"])
age = (datetime.now() - last).days
return age >= max_age_daysStep 4: Wire credentials into the MCP server handler
Load the encrypted credential at request time, validate the scoped token, and call the Scavio API. Credentials are never held in memory longer than a single request.
import httpx
async def handle_mcp_tool_call(token: str, tool_name: str, params: dict) -> dict:
# Step 1: Validate scoped token
if not validate_token(token, tool_name):
return {"error": "Unauthorized or expired token"}
# Step 2: Load credential just-in-time
try:
api_key = load_credential("scavio_api_key")
except KeyError:
return {"error": "Credential not configured"}
# Step 3: Execute the search
if tool_name == "web_search":
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": api_key},
json={"query": params.get("query", ""), "num_results": 5}
)
resp.raise_for_status()
return {"result": resp.json()}
return {"error": f"Unknown tool: {tool_name}"}Python Example
import os
import json
import secrets
import time
import asyncio
import httpx
from pathlib import Path
from cryptography.fernet import Fernet
# --- Setup ---
# Generate encryption key once: Fernet.generate_key().decode()
# export MCP_ENCRYPTION_KEY="your-fernet-key"
CRED_FILE = Path("mcp_credentials.enc")
ACTIVE_TOKENS: dict[str, dict] = {}
def get_cipher():
return Fernet(os.environ["MCP_ENCRYPTION_KEY"].encode())
def store_credential(name: str, value: str):
cipher = get_cipher()
creds = json.loads(CRED_FILE.read_text()) if CRED_FILE.exists() else {}
creds[name] = cipher.encrypt(value.encode()).decode()
CRED_FILE.write_text(json.dumps(creds, indent=2))
def load_credential(name: str) -> str:
cipher = get_cipher()
creds = json.loads(CRED_FILE.read_text())
return cipher.decrypt(creds[name].encode()).decode()
def create_scoped_token(tools: list[str], ttl: int = 3600) -> str:
token = secrets.token_urlsafe(32)
ACTIVE_TOKENS[token] = {"tools": set(tools), "expires": time.time() + ttl}
return token
def validate_token(token: str, tool: str) -> bool:
entry = ACTIVE_TOKENS.get(token)
if not entry or time.time() > entry["expires"]:
return False
return tool in entry["tools"]
async def secure_search(token: str, query: str) -> dict:
if not validate_token(token, "web_search"):
return {"error": "Unauthorized"}
api_key = load_credential("scavio_api_key")
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(
"https://api.scavio.dev/api/v1/search",
headers={"x-api-key": api_key},
json={"query": query, "num_results": 5}
)
resp.raise_for_status()
return resp.json()
# Usage
store_credential("scavio_api_key", "your-api-key")
token = create_scoped_token(["web_search"], ttl=1800)
result = asyncio.run(secure_search(token, "MCP server security best practices 2026"))
print(f"Results: {len(result.get('results', []))}")JavaScript Example
const crypto = require("crypto");
const ALGORITHM = "aes-256-gcm";
const activeTokens = new Map();
function encrypt(text, key) {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv(ALGORITHM, Buffer.from(key, "hex"), iv);
let encrypted = cipher.update(text, "utf8", "hex");
encrypted += cipher.final("hex");
const tag = cipher.getAuthTag().toString("hex");
return iv.toString("hex") + ":" + tag + ":" + encrypted;
}
function decrypt(data, key) {
const [ivHex, tagHex, encrypted] = data.split(":");
const decipher = crypto.createDecipheriv(ALGORITHM, Buffer.from(key, "hex"), Buffer.from(ivHex, "hex"));
decipher.setAuthTag(Buffer.from(tagHex, "hex"));
let decrypted = decipher.update(encrypted, "hex", "utf8");
decrypted += decipher.final("utf8");
return decrypted;
}
function createScopedToken(tools, ttlSeconds = 3600) {
const token = crypto.randomBytes(32).toString("base64url");
activeTokens.set(token, { tools: new Set(tools), expires: Date.now() + ttlSeconds * 1000 });
return token;
}
function validateToken(token, tool) {
const entry = activeTokens.get(token);
if (!entry || Date.now() > entry.expires) return false;
return entry.tools.has(tool);
}
async function secureSearch(token, query) {
if (!validateToken(token, "web_search")) throw new Error("Unauthorized");
const encKey = process.env.MCP_ENCRYPTION_KEY;
const stored = JSON.parse(require("fs").readFileSync("mcp_credentials.enc", "utf8"));
const apiKey = decrypt(stored.scavio_api_key, encKey);
const resp = await fetch("https://api.scavio.dev/api/v1/search", {
method: "POST",
headers: { "x-api-key": apiKey, "Content-Type": "application/json" },
body: JSON.stringify({ query, num_results: 5 })
});
return resp.json();
}
const token = createScopedToken(["web_search"], 1800);
secureSearch(token, "MCP server security 2026").then(r => {
console.log("Results:", (r.results || []).length);
});Expected Output
Results: 5