ScavioScavio
产品定价文档
登录开始使用
  1. 首页
  2. 教程
  3. 如何通过轮换和范围访问来保护 MCP 服务器凭证
教程

如何通过轮换和范围访问来保护 MCP 服务器凭证

通过自动轮换、范围访问令牌和加密存储来保护您的 MCP 服务器凭据。 2026 年 MCP 部署的生产就绪模式。

获取免费API密钥API文档

MCP 服务器将 AI 代理与外部工具连接起来,但泄露的凭证暴露了该服务器背后的每个工具。本教程实现了凭证轮换、范围访问令牌和静态加密存储,因此即使单个密钥泄漏,您的 MCP 服务器也能保持安全。所有示例都使用 Scavio Search API 作为受保护资源。

前置条件

  • 安装了加密库的 Python 3.11+
  • 来自 https://scavio.dev 的 Scavio API 密钥
  • MCP协议的基本了解
  • 环境变量管理器(direnv 或 dotenv)

操作指南

步骤 1: 静态加密凭证

存储使用 Fernet 对称加密加密的 API 密钥。加密密钥位于环境变量中,而不位于代码或配置文件中。

Python
from cryptography.fernet import Fernet
import os
import json
from pathlib import Path

CRED_FILE = Path("mcp_credentials.enc")

def get_cipher():
    key = os.environ.get("MCP_ENCRYPTION_KEY")
    if not key:
        raise RuntimeError("MCP_ENCRYPTION_KEY not set")
    return Fernet(key.encode())

def store_credential(name: str, value: str):
    cipher = get_cipher()
    creds = load_all_credentials()
    creds[name] = cipher.encrypt(value.encode()).decode()
    CRED_FILE.write_text(json.dumps(creds, indent=2))

def load_credential(name: str) -> str:
    cipher = get_cipher()
    creds = load_all_credentials()
    encrypted = creds.get(name)
    if not encrypted:
        raise KeyError(f"Credential '{name}' not found")
    return cipher.decrypt(encrypted.encode()).decode()

def load_all_credentials() -> dict:
    if not CRED_FILE.exists():
        return {}
    return json.loads(CRED_FILE.read_text())

步骤 2: 实施范围访问令牌

生成短期的、有范围的令牌,限制客户端可以调用哪些 MCP 工具。每个令牌都对允许的工具名称和到期时间戳进行编码。

Python
import hashlib
import time
import secrets

ACTIVE_TOKENS: dict[str, dict] = {}

def create_scoped_token(
    allowed_tools: list[str],
    ttl_seconds: int = 3600
) -> str:
    token = secrets.token_urlsafe(32)
    ACTIVE_TOKENS[token] = {
        "tools": set(allowed_tools),
        "expires": time.time() + ttl_seconds,
        "created": time.time()
    }
    return token

def validate_token(token: str, tool_name: str) -> bool:
    entry = ACTIVE_TOKENS.get(token)
    if not entry:
        return False
    if time.time() > entry["expires"]:
        del ACTIVE_TOKENS[token]
        return False
    return tool_name in entry["tools"]

def revoke_token(token: str):
    ACTIVE_TOKENS.pop(token, None)

步骤 3: 添加自动凭证轮换

按计划轮换 Scavio API 密钥。轮换功能会重新加密新密钥、更新 MCP 服务器配置并记录轮换事件,而无需停机。

Python
import logging
from datetime import datetime

logger = logging.getLogger("mcp_security")

ROTATION_LOG = Path("rotation_log.json")

def rotate_credential(name: str, new_value: str):
    old_exists = name in load_all_credentials()
    store_credential(name, new_value)

    log_entry = {
        "credential": name,
        "rotated_at": datetime.now().isoformat(),
        "action": "rotated" if old_exists else "created"
    }

    log_data = []
    if ROTATION_LOG.exists():
        log_data = json.loads(ROTATION_LOG.read_text())
    log_data.append(log_entry)
    ROTATION_LOG.write_text(json.dumps(log_data, indent=2))
    logger.info(f"Credential '{name}' {log_entry['action']}")

def check_rotation_due(name: str, max_age_days: int = 30) -> bool:
    if not ROTATION_LOG.exists():
        return True
    log_data = json.loads(ROTATION_LOG.read_text())
    entries = [e for e in log_data if e["credential"] == name]
    if not entries:
        return True
    last = datetime.fromisoformat(entries[-1]["rotated_at"])
    age = (datetime.now() - last).days
    return age >= max_age_days

步骤 4: 将凭证连接到 MCP 服务器处理程序

在请求时加载加密凭证,验证作用域令牌,然后调用 Scavio API。凭证在内存中保存的时间绝不会超过单个请求。

Python
import httpx

async def handle_mcp_tool_call(token: str, tool_name: str, params: dict) -> dict:
    # Step 1: Validate scoped token
    if not validate_token(token, tool_name):
        return {"error": "Unauthorized or expired token"}

    # Step 2: Load credential just-in-time
    try:
        api_key = load_credential("scavio_api_key")
    except KeyError:
        return {"error": "Credential not configured"}

    # Step 3: Execute the search
    if tool_name == "web_search":
        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(
                "https://api.scavio.dev/api/v1/search",
                headers={"x-api-key": api_key},
                json={"query": params.get("query", ""), "num_results": 5}
            )
            resp.raise_for_status()
            return {"result": resp.json()}

    return {"error": f"Unknown tool: {tool_name}"}

Python 示例

Python
import os
import json
import secrets
import time
import asyncio
import httpx
from pathlib import Path
from cryptography.fernet import Fernet

# --- Setup ---
# Generate encryption key once: Fernet.generate_key().decode()
# export MCP_ENCRYPTION_KEY="your-fernet-key"

CRED_FILE = Path("mcp_credentials.enc")
ACTIVE_TOKENS: dict[str, dict] = {}

def get_cipher():
    return Fernet(os.environ["MCP_ENCRYPTION_KEY"].encode())

def store_credential(name: str, value: str):
    cipher = get_cipher()
    creds = json.loads(CRED_FILE.read_text()) if CRED_FILE.exists() else {}
    creds[name] = cipher.encrypt(value.encode()).decode()
    CRED_FILE.write_text(json.dumps(creds, indent=2))

def load_credential(name: str) -> str:
    cipher = get_cipher()
    creds = json.loads(CRED_FILE.read_text())
    return cipher.decrypt(creds[name].encode()).decode()

def create_scoped_token(tools: list[str], ttl: int = 3600) -> str:
    token = secrets.token_urlsafe(32)
    ACTIVE_TOKENS[token] = {"tools": set(tools), "expires": time.time() + ttl}
    return token

def validate_token(token: str, tool: str) -> bool:
    entry = ACTIVE_TOKENS.get(token)
    if not entry or time.time() > entry["expires"]:
        return False
    return tool in entry["tools"]

async def secure_search(token: str, query: str) -> dict:
    if not validate_token(token, "web_search"):
        return {"error": "Unauthorized"}
    api_key = load_credential("scavio_api_key")
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.post(
            "https://api.scavio.dev/api/v1/search",
            headers={"x-api-key": api_key},
            json={"query": query, "num_results": 5}
        )
        resp.raise_for_status()
        return resp.json()

# Usage
store_credential("scavio_api_key", "your-api-key")
token = create_scoped_token(["web_search"], ttl=1800)
result = asyncio.run(secure_search(token, "MCP server security best practices 2026"))
print(f"Results: {len(result.get('results', []))}")

JavaScript 示例

JavaScript
const crypto = require("crypto");

const ALGORITHM = "aes-256-gcm";
const activeTokens = new Map();

function encrypt(text, key) {
  const iv = crypto.randomBytes(16);
  const cipher = crypto.createCipheriv(ALGORITHM, Buffer.from(key, "hex"), iv);
  let encrypted = cipher.update(text, "utf8", "hex");
  encrypted += cipher.final("hex");
  const tag = cipher.getAuthTag().toString("hex");
  return iv.toString("hex") + ":" + tag + ":" + encrypted;
}

function decrypt(data, key) {
  const [ivHex, tagHex, encrypted] = data.split(":");
  const decipher = crypto.createDecipheriv(ALGORITHM, Buffer.from(key, "hex"), Buffer.from(ivHex, "hex"));
  decipher.setAuthTag(Buffer.from(tagHex, "hex"));
  let decrypted = decipher.update(encrypted, "hex", "utf8");
  decrypted += decipher.final("utf8");
  return decrypted;
}

function createScopedToken(tools, ttlSeconds = 3600) {
  const token = crypto.randomBytes(32).toString("base64url");
  activeTokens.set(token, { tools: new Set(tools), expires: Date.now() + ttlSeconds * 1000 });
  return token;
}

function validateToken(token, tool) {
  const entry = activeTokens.get(token);
  if (!entry || Date.now() > entry.expires) return false;
  return entry.tools.has(tool);
}

async function secureSearch(token, query) {
  if (!validateToken(token, "web_search")) throw new Error("Unauthorized");
  const encKey = process.env.MCP_ENCRYPTION_KEY;
  const stored = JSON.parse(require("fs").readFileSync("mcp_credentials.enc", "utf8"));
  const apiKey = decrypt(stored.scavio_api_key, encKey);
  const resp = await fetch("https://api.scavio.dev/api/v1/search", {
    method: "POST",
    headers: { "x-api-key": apiKey, "Content-Type": "application/json" },
    body: JSON.stringify({ query, num_results: 5 })
  });
  return resp.json();
}

const token = createScopedToken(["web_search"], 1800);
secureSearch(token, "MCP server security 2026").then(r => {
  console.log("Results:", (r.results || []).length);
});

预期输出

JSON
Results: 5

相关教程

  • 如何通过 MCP 将 Scavio 搜索添加到 Hermes Agent v0.14.0
  • 如何构建结合搜索和金融数据的交易数据MCP服务器
  • 如何使用 Scavio Search API 构建代理工具后备链

常见问题

大多数开发者在15到30分钟内完成本教程。您需要一个Scavio API密钥(免费套餐即可)和可用的Python或JavaScript环境。

安装了加密库的 Python 3.11+. 来自 https://scavio.dev 的 Scavio API 密钥. MCP协议的基本了解. 环境变量管理器(direnv 或 dotenv). Scavio API密钥注册即送50个免费积分。

可以。免费套餐注册即送50个积分,完全足够完成本教程并构建一个可运行的原型解决方案。

Scavio提供原生LangChain包(langchain-scavio)、MCP服务器以及适用于任何HTTP客户端的REST API。本教程使用 the raw REST API, 但您可以根据需要适配您选择的框架。

相关资源

Comparison

MCP Search Integration vs Direct API Integration

Read more
Best Of

2026 年 MCP 服务器最佳搜索 API

Read more
Use Case

MCP 自定义搜索服务器

Read more
Best Of

2026年MCP服务器构建者最佳API

Read more
Use Case

用于业务运营的 MCP 自定义 API 集成

Read more
Solution

用编码前搜索检查验证MCP工具输出

Read more

开始构建

通过自动轮换、范围访问令牌和加密存储来保护您的 MCP 服务器凭据。 2026 年 MCP 部署的生产就绪模式。

获取免费API密钥阅读文档
ScavioScavio

面向AI智能体的实时搜索API。搜索所有平台,不仅仅是Google。

产品

  • 功能
  • 定价
  • 控制台
  • 联盟计划

开发者

  • 文档
  • API参考
  • 快速开始
  • MCP集成
  • Python SDK

替代方案

  • Tavily替代方案
  • SerpAPI替代方案
  • Firecrawl替代方案
  • Exa替代方案

工具

  • JSON格式化
  • cURL转代码
  • Token计数器
  • 全部工具

© 2026 Scavio. 保留所有权利。

Featured on TAAFT
服务条款隐私政策