Tutorial

How to Secure MCP Server Credentials with Rotation and Scoped Access

Secure your MCP server credentials with automatic rotation, scoped access tokens, and encrypted storage. Production-ready patterns for 2026 MCP deployments.

MCP servers bridge AI agents to external tools, but a leaked credential exposes every tool behind that server. This tutorial implements credential rotation, scoped access tokens, and encrypted-at-rest storage so your MCP server stays secure even if a single key leaks. All examples use the Scavio Search API as the protected resource.

Prerequisites

  • Python 3.11+ with cryptography library installed
  • A Scavio API key from https://scavio.dev
  • Basic understanding of the MCP protocol
  • An environment variable manager (direnv or dotenv)

Walkthrough

Step 1: Encrypt credentials at rest

Store API keys encrypted using Fernet symmetric encryption. The encryption key lives in an environment variable, never in code or config files.

Python
from cryptography.fernet import Fernet
import os
import json
from pathlib import Path

CRED_FILE = Path("mcp_credentials.enc")

def get_cipher():
    key = os.environ.get("MCP_ENCRYPTION_KEY")
    if not key:
        raise RuntimeError("MCP_ENCRYPTION_KEY not set")
    return Fernet(key.encode())

def store_credential(name: str, value: str):
    cipher = get_cipher()
    creds = load_all_credentials()
    creds[name] = cipher.encrypt(value.encode()).decode()
    CRED_FILE.write_text(json.dumps(creds, indent=2))

def load_credential(name: str) -> str:
    cipher = get_cipher()
    creds = load_all_credentials()
    encrypted = creds.get(name)
    if not encrypted:
        raise KeyError(f"Credential '{name}' not found")
    return cipher.decrypt(encrypted.encode()).decode()

def load_all_credentials() -> dict:
    if not CRED_FILE.exists():
        return {}
    return json.loads(CRED_FILE.read_text())

Step 2: Implement scoped access tokens

Generate short-lived, scoped tokens that limit which MCP tools a client can invoke. Each token encodes allowed tool names and an expiry timestamp.

Python
import hashlib
import time
import secrets

ACTIVE_TOKENS: dict[str, dict] = {}

def create_scoped_token(
    allowed_tools: list[str],
    ttl_seconds: int = 3600
) -> str:
    token = secrets.token_urlsafe(32)
    ACTIVE_TOKENS[token] = {
        "tools": set(allowed_tools),
        "expires": time.time() + ttl_seconds,
        "created": time.time()
    }
    return token

def validate_token(token: str, tool_name: str) -> bool:
    entry = ACTIVE_TOKENS.get(token)
    if not entry:
        return False
    if time.time() > entry["expires"]:
        del ACTIVE_TOKENS[token]
        return False
    return tool_name in entry["tools"]

def revoke_token(token: str):
    ACTIVE_TOKENS.pop(token, None)

Step 3: Add automatic credential rotation

Rotate the Scavio API key on a schedule. The rotation function re-encrypts the new key, updates the MCP server config, and logs the rotation event without downtime.

Python
import logging
from datetime import datetime

logger = logging.getLogger("mcp_security")

ROTATION_LOG = Path("rotation_log.json")

def rotate_credential(name: str, new_value: str):
    old_exists = name in load_all_credentials()
    store_credential(name, new_value)

    log_entry = {
        "credential": name,
        "rotated_at": datetime.now().isoformat(),
        "action": "rotated" if old_exists else "created"
    }

    log_data = []
    if ROTATION_LOG.exists():
        log_data = json.loads(ROTATION_LOG.read_text())
    log_data.append(log_entry)
    ROTATION_LOG.write_text(json.dumps(log_data, indent=2))
    logger.info(f"Credential '{name}' {log_entry['action']}")

def check_rotation_due(name: str, max_age_days: int = 30) -> bool:
    if not ROTATION_LOG.exists():
        return True
    log_data = json.loads(ROTATION_LOG.read_text())
    entries = [e for e in log_data if e["credential"] == name]
    if not entries:
        return True
    last = datetime.fromisoformat(entries[-1]["rotated_at"])
    age = (datetime.now() - last).days
    return age >= max_age_days

Step 4: Wire credentials into the MCP server handler

Load the encrypted credential at request time, validate the scoped token, and call the Scavio API. Credentials are never held in memory longer than a single request.

Python
import httpx

async def handle_mcp_tool_call(token: str, tool_name: str, params: dict) -> dict:
    # Step 1: Validate scoped token
    if not validate_token(token, tool_name):
        return {"error": "Unauthorized or expired token"}

    # Step 2: Load credential just-in-time
    try:
        api_key = load_credential("scavio_api_key")
    except KeyError:
        return {"error": "Credential not configured"}

    # Step 3: Execute the search
    if tool_name == "web_search":
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(
                "https://api.scavio.dev/api/v1/search",
                headers={"x-api-key": api_key},
                json={"query": params.get("query", ""), "num_results": 5}
            )
            resp.raise_for_status()
            return {"result": resp.json()}

    return {"error": f"Unknown tool: {tool_name}"}

Python Example

Python
import os
import json
import secrets
import time
import asyncio
import httpx
from pathlib import Path
from cryptography.fernet import Fernet

# --- Setup ---
# Generate encryption key once: Fernet.generate_key().decode()
# export MCP_ENCRYPTION_KEY="your-fernet-key"

CRED_FILE = Path("mcp_credentials.enc")
ACTIVE_TOKENS: dict[str, dict] = {}

def get_cipher():
    return Fernet(os.environ["MCP_ENCRYPTION_KEY"].encode())

def store_credential(name: str, value: str):
    cipher = get_cipher()
    creds = json.loads(CRED_FILE.read_text()) if CRED_FILE.exists() else {}
    creds[name] = cipher.encrypt(value.encode()).decode()
    CRED_FILE.write_text(json.dumps(creds, indent=2))

def load_credential(name: str) -> str:
    cipher = get_cipher()
    creds = json.loads(CRED_FILE.read_text())
    return cipher.decrypt(creds[name].encode()).decode()

def create_scoped_token(tools: list[str], ttl: int = 3600) -> str:
    token = secrets.token_urlsafe(32)
    ACTIVE_TOKENS[token] = {"tools": set(tools), "expires": time.time() + ttl}
    return token

def validate_token(token: str, tool: str) -> bool:
    entry = ACTIVE_TOKENS.get(token)
    if not entry or time.time() > entry["expires"]:
        return False
    return tool in entry["tools"]

async def secure_search(token: str, query: str) -> dict:
    if not validate_token(token, "web_search"):
        return {"error": "Unauthorized"}
    api_key = load_credential("scavio_api_key")
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            "https://api.scavio.dev/api/v1/search",
            headers={"x-api-key": api_key},
            json={"query": query, "num_results": 5}
        )
        resp.raise_for_status()
        return resp.json()

# Usage
store_credential("scavio_api_key", "your-api-key")
token = create_scoped_token(["web_search"], ttl=1800)
result = asyncio.run(secure_search(token, "MCP server security best practices 2026"))
print(f"Results: {len(result.get('results', []))}")

JavaScript Example

JavaScript
const crypto = require("crypto");

const ALGORITHM = "aes-256-gcm";
const activeTokens = new Map();

function encrypt(text, key) {
  const iv = crypto.randomBytes(16);
  const cipher = crypto.createCipheriv(ALGORITHM, Buffer.from(key, "hex"), iv);
  let encrypted = cipher.update(text, "utf8", "hex");
  encrypted += cipher.final("hex");
  const tag = cipher.getAuthTag().toString("hex");
  return iv.toString("hex") + ":" + tag + ":" + encrypted;
}

function decrypt(data, key) {
  const [ivHex, tagHex, encrypted] = data.split(":");
  const decipher = crypto.createDecipheriv(ALGORITHM, Buffer.from(key, "hex"), Buffer.from(ivHex, "hex"));
  decipher.setAuthTag(Buffer.from(tagHex, "hex"));
  let decrypted = decipher.update(encrypted, "hex", "utf8");
  decrypted += decipher.final("utf8");
  return decrypted;
}

function createScopedToken(tools, ttlSeconds = 3600) {
  const token = crypto.randomBytes(32).toString("base64url");
  activeTokens.set(token, { tools: new Set(tools), expires: Date.now() + ttlSeconds * 1000 });
  return token;
}

function validateToken(token, tool) {
  const entry = activeTokens.get(token);
  if (!entry || Date.now() > entry.expires) return false;
  return entry.tools.has(tool);
}

async function secureSearch(token, query) {
  if (!validateToken(token, "web_search")) throw new Error("Unauthorized");
  const encKey = process.env.MCP_ENCRYPTION_KEY;
  const stored = JSON.parse(require("fs").readFileSync("mcp_credentials.enc", "utf8"));
  const apiKey = decrypt(stored.scavio_api_key, encKey);
  const resp = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": apiKey, "Content-Type": "application/json" },
    body: JSON.stringify({ query, num_results: 5 })
  });
  return resp.json();
}

const token = createScopedToken(["web_search"], 1800);
secureSearch(token, "MCP server security 2026").then(r => {
  console.log("Results:", (r.results || []).length);
});

Expected Output

JSON
Results: 5

Related Tutorials

Frequently Asked Questions

Most developers complete this tutorial in 15 to 30 minutes. You will need a Scavio API key (free tier works) and a working Python or JavaScript environment.

Python 3.11+ with cryptography library installed. A Scavio API key from https://scavio.dev. Basic understanding of the MCP protocol. An environment variable manager (direnv or dotenv). A Scavio API key gives you 250 free credits per month.

Yes. The free tier includes 250 credits per month, which is more than enough to complete this tutorial and prototype a working solution.

Scavio has a native LangChain package (langchain-scavio), an MCP server, and a plain REST API that works with any HTTP client. This tutorial uses the raw REST API, but you can adapt to your framework of choice.

Start Building

Secure your MCP server credentials with automatic rotation, scoped access tokens, and encrypted storage. Production-ready patterns for 2026 MCP deployments.