🔮 高级精通教程

开发自定义 MCP Server、编写专属 Skill、构建复杂工作流、深度安全防护与性能优化,成为 AI Agent 架构师

课程概述

本教程面向有经验的用户,将带你深入 AI Agent 架构的核心,掌握自定义开发能力,构建生产级的多 Agent 协作系统。

🔨

自定义 MCP Server

TypeScript/Python 双语言开发、远程部署、测试调试、生产发布

✍️

Skill 开发

完整开发流程:设计、编码、测试、打包、发布到社区

🔄

高级工作流

DAG 编排、条件分支、并行执行、Saga 模式、错误恢复

🛡️

安全与优化

沙箱隔离、RBAC、审计日志、性能调优、成本控制

💡 前置要求: 请先完成 初级教程中级教程,确保你已经掌握了多 Worker 编排和 MCP 配置。本教程需要 TypeScript 或 Python 开发经验。

MCP 协议深度

在开发自定义 MCP Server 之前,深入理解 MCP 协议的架构和通信机制是必要的。

MCP 协议架构

MCP Client
(Hermes/OpenClaw)
JSON-RPC 2.0
over stdio/SSE/HTTP
MCP Server
(你的自定义服务)
外部资源
(API/DB/文件)

MCP 通信生命周期

通信流程
1. 初始化 (Initialize)
   Client → Server: initialize request (capabilities, clientInfo)
   Server → Client: initialize response (capabilities, serverInfo)
   
2. 发现 (Discovery)
   Client → Server: tools/list        → 获取可用工具列表
   Client → Server: resources/list    → 获取可用资源列表
   Client → Server: prompts/list      → 获取可用提示模板
   
3. 调用 (Invocation)
   Client → Server: tools/call        → 调用工具
   Client → Server: resources/read    → 读取资源
   Client → Server: prompts/get       → 获取提示模板
   
4. 通知 (Notification)
   Server → Client: notifications/tools/list_changed  → 工具列表变更
   Server → Client: notifications/resources/updated   → 资源更新
   
5. 关闭 (Shutdown)
   Client → Server: shutdown notification

JSON-RPC 2.0 消息格式

json
// 请求 (Request)
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {
      "sql": "SELECT * FROM users LIMIT 10"
    }
  }
}

// 成功响应 (Response)
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "[{\"id\": 1, \"name\": \"Alice\"}, ...]"
      }
    ]
  }
}

// 错误响应 (Error)
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Invalid params: sql is required"
  }
}

三大原语深度对比

原语方向是否可变典型用途注册方法
Tools Client → Server 可动态变更 执行操作:查询DB、调用API、写文件 server.tool()
Resources Server → Client 可动态更新 提供数据:文件内容、配置信息、Schema server.resource()
Prompts Client → Server 静态定义 标准化交互:代码审查模板、分析报告模板 server.prompt()

TypeScript MCP Server 开发

使用官方 MCP TypeScript SDK 开发自定义服务器,这是最成熟和推荐的方案。

项目初始化

bash
# 创建项目目录
mkdir my-mcp-server && cd my-mcp-server

# 初始化 Node.js 项目
npm init -y

# 安装依赖
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node tsx

# 创建 TypeScript 配置
cat > tsconfig.json << 'EOF'
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "declaration": true
  },
  "include": ["src/**/*"]
}
EOF

# 创建目录结构
mkdir -p src tools resources prompts

完整 MCP Server 实现

typescript
// src/index.ts - 完整 MCP Server
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
  name: "my-enterprise-server",
  version: "1.0.0",
});

// ==========================================
// Tools(工具)- Agent 可调用的函数
// ==========================================

// 工具1: 查询企业数据库
server.tool(
  "query_enterprise_db",
  "查询企业数据库,支持只读 SQL 查询",
  {
    sql: z.string().describe("SQL 查询语句(仅支持 SELECT)"),
    database: z.enum(["production", "staging", "analytics"]).describe("目标数据库"),
    limit: z.number().optional().describe("结果限制数量,默认 100"),
  },
  async ({ sql, database, limit }) => {
    // 安全校验:只允许 SELECT 语句
    const normalizedSql = sql.trim().toUpperCase();
    if (!normalizedSql.startsWith("SELECT") && !normalizedSql.startsWith("WITH")) {
      return {
        content: [{ type: "text", text: "❌ 安全限制:仅支持 SELECT/WITH 查询" }],
        isError: true,
      };
    }

    try {
      const results = await executeQuery(database, sql, limit || 100);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(results, null, 2),
        }],
      };
    } catch (error) {
      return {
        content: [{ type: "text", text: `❌ 查询失败: ${error.message}` }],
        isError: true,
      };
    }
  }
);

// 工具2: 调用内部 API
server.tool(
  "call_internal_api",
  "调用企业内部 API 接口",
  {
    endpoint: z.string().describe("API 端点路径,如 /api/v1/users"),
    method: z.enum(["GET", "POST", "PUT", "DELETE"]).describe("HTTP 方法"),
    body: z.record(z.any()).optional().describe("请求体(JSON)"),
    headers: z.record(z.string()).optional().describe("额外请求头"),
  },
  async ({ endpoint, method, body, headers }) => {
    try {
      const response = await fetch(`https://internal-api.company.com${endpoint}`, {
        method,
        headers: {
          "Content-Type": "application/json",
          "Authorization": `Bearer ${process.env.INTERNAL_API_TOKEN}`,
          ...headers,
        },
        body: body ? JSON.stringify(body) : undefined,
      });

      const data = await response.json();
      return {
        content: [{
          type: "text",
          text: JSON.stringify({ status: response.status, data }, null, 2),
        }],
      };
    } catch (error) {
      return {
        content: [{ type: "text", text: `❌ API 调用失败: ${error.message}` }],
        isError: true,
      };
    }
  }
);

// 工具3: 生成报告
server.tool(
  "generate_report",
  "生成业务报告并保存为文件",
  {
    report_type: z.enum(["daily", "weekly", "monthly", "custom"]).describe("报告类型"),
    metrics: z.array(z.string()).describe("包含的指标列表"),
    date_range: z.object({
      start: z.string().describe("开始日期 YYYY-MM-DD"),
      end: z.string().describe("结束日期 YYYY-MM-DD"),
    }).describe("日期范围"),
    format: z.enum(["markdown", "html", "pdf"]).optional().describe("输出格式,默认 markdown"),
  },
  async ({ report_type, metrics, date_range, format }) => {
    const report = await generateBusinessReport({
      type: report_type,
      metrics,
      dateRange: date_range,
      format: format || "markdown",
    });

    return {
      content: [
        { type: "text", text: `📊 报告已生成\n类型: ${report_type}\n指标: ${metrics.join(", ")}\n日期: ${date_range.start} ~ ${date_range.end}` },
        { type: "resource", resource: { uri: `report://${report.id}`, mimeType: "text/markdown", text: report.content } },
      ],
    };
  }
);

// ==========================================
// Resources(资源)- Agent 可读取的数据
// ==========================================

// 资源1: 数据库 Schema
server.resource(
  "schema://tables",
  "数据库表结构信息",
  async (uri) => ({
    contents: [{
      uri: uri.href,
      mimeType: "application/json",
      text: JSON.stringify(await getTableSchemas(), null, 2),
    }],
  })
);

// 资源2: API 文档
server.resource(
  "docs://api",
  "内部 API 文档",
  async (uri) => ({
    contents: [{
      uri: uri.href,
      mimeType: "text/markdown",
      text: await getApiDocumentation(),
    }],
  })
);

// 资源3: 动态资源模板
server.resource(
  "config://{service}",
  "服务配置信息",
  async (uri, { service }) => ({
    contents: [{
      uri: uri.href,
      mimeType: "application/json",
      text: JSON.stringify(await getServiceConfig(service), null, 2),
    }],
  })
);

// ==========================================
// Prompts(提示模板)- 标准化交互
// ==========================================

server.prompt(
  "analyze-data",
  "数据分析提示模板,引导 Agent 进行结构化分析",
  {
    query: z.string().describe("分析查询"),
    dataset: z.string().optional().describe("数据集名称"),
  },
  async ({ query, dataset }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `请对以下数据进行深入分析:

查询: ${query}
${dataset ? `数据集: ${dataset}` : ""}

分析步骤:
1. 理解数据结构和字段含义
2. 执行必要的查询获取数据
3. 进行统计分析和趋势识别
4. 识别异常值和关键发现
5. 生成可视化建议
6. 给出业务建议

请使用 query_enterprise_db 工具查询数据,结果以 Markdown 格式输出。`,
      },
    }],
  })
);

server.prompt(
  "code-review",
  "代码审查提示模板",
  {
    file_path: z.string().describe("待审查的文件路径"),
    focus: z.enum(["security", "performance", "style", "all"]).optional().describe("审查重点"),
  },
  async ({ file_path, focus }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `请对文件 ${file_path} 进行${focus === "all" ? "全面" : focus}审查:

审查清单:
- [ ] 代码正确性和逻辑完整性
- [ ] 安全漏洞(SQL注入、XSS、敏感信息泄露)
- [ ] 性能问题(N+1查询、内存泄漏、不必要的循环)
- [ ] 代码风格和可维护性
- [ ] 错误处理和边界情况
- [ ] 测试覆盖

请给出具体的改进建议和优先级。`,
      },
    }],
  })
);

// ==========================================
// 启动服务器
// ==========================================

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("🚀 MCP Server 已启动: my-enterprise-server v1.0.0");
}

main().catch(console.error);

// ==========================================
// 辅助函数(示例实现)
// ==========================================

async function executeQuery(database: string, sql: string, limit: number) {
  // 实际实现中连接数据库执行查询
  return { rows: [], count: 0, database, limit };
}

async function getTableSchemas() {
  return { tables: [] };
}

async function getApiDocumentation() {
  return "# API Documentation\n...";
}

async function getServiceConfig(service: string) {
  return { service, config: {} };
}

async function generateBusinessReport(options: any) {
  return { id: "rpt-001", content: "# Report\n..." };
}

package.json 配置

json
{
  "name": "my-mcp-server",
  "version": "1.0.0",
  "type": "module",
  "main": "dist/index.js",
  "bin": {
    "my-mcp-server": "dist/index.js"
  },
  "scripts": {
    "build": "tsc",
    "dev": "tsx src/index.ts",
    "start": "node dist/index.js",
    "test": "vitest",
    "lint": "eslint src/"
  },
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.0.0",
    "zod": "^3.22.0"
  },
  "devDependencies": {
    "typescript": "^5.3.0",
    "@types/node": "^20.0.0",
    "tsx": "^4.0.0",
    "vitest": "^1.0.0"
  }
}

Python MCP Server 开发

使用 Python MCP SDK 开发自定义服务器,适合数据科学和 AI 场景。

项目初始化

bash
# 创建项目
mkdir my-py-mcp-server && cd my-py-mcp-server

# 创建虚拟环境
python -m venv .venv
source .venv/bin/activate

# 安装依赖
pip install mcp httpx pydantic

# 创建目录结构
mkdir -p src tests

完整 Python MCP Server

python
# src/server.py - 完整 Python MCP Server
import json
import os
from typing import Any
from mcp.server import Server
from mcp.types import (
    Tool, TextContent, ImageContent, EmbeddedResource,
    Resource, Prompt, PromptMessage, PromptArgument,
)
import mcp.server.stdio

server = Server("my-enterprise-server")

# ==========================================
# 工具注册
# ==========================================

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="query_database",
            description="查询企业数据库,支持只读 SQL",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL 查询语句(仅 SELECT)",
                    },
                    "database": {
                        "type": "string",
                        "enum": ["production", "staging", "analytics"],
                        "description": "目标数据库",
                    },
                    "limit": {
                        "type": "number",
                        "description": "结果限制,默认 100",
                    },
                },
                "required": ["sql", "database"],
            },
        ),
        Tool(
            name="send_notification",
            description="发送企业通知(Slack/邮件/钉钉)",
            inputSchema={
                "type": "object",
                "properties": {
                    "channel": {
                        "type": "string",
                        "enum": ["slack", "email", "dingtalk"],
                        "description": "通知渠道",
                    },
                    "recipient": {
                        "type": "string",
                        "description": "接收者(邮箱/用户ID/群ID)",
                    },
                    "message": {
                        "type": "string",
                        "description": "通知内容",
                    },
                    "priority": {
                        "type": "string",
                        "enum": ["low", "normal", "high", "urgent"],
                        "description": "优先级",
                    },
                },
                "required": ["channel", "recipient", "message"],
            },
        ),
        Tool(
            name="analyze_data",
            description="使用 Python 分析数据,支持 pandas/numpy/scikit-learn",
            inputSchema={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python 分析代码",
                    },
                    "data_source": {
                        "type": "string",
                        "description": "数据源路径或 SQL 查询",
                    },
                },
                "required": ["code"],
            },
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    if name == "query_database":
        sql = arguments["sql"].strip().upper()
        if not (sql.startswith("SELECT") or sql.startswith("WITH")):
            return [TextContent(type="text", text="❌ 安全限制:仅支持 SELECT/WITH 查询")]
        
        database = arguments["database"]
        limit = arguments.get("limit", 100)
        
        try:
            results = await execute_query(database, arguments["sql"], limit)
            return [TextContent(type="text", text=json.dumps(results, indent=2, ensure_ascii=False))]
        except Exception as e:
            return [TextContent(type="text", text=f"❌ 查询失败: {str(e)}")]

    elif name == "send_notification":
        channel = arguments["channel"]
        recipient = arguments["recipient"]
        message = arguments["message"]
        priority = arguments.get("priority", "normal")
        
        try:
            result = await send_notification(channel, recipient, message, priority)
            return [TextContent(type="text", text=f"✅ 通知已发送\n渠道: {channel}\n接收: {recipient}\n优先级: {priority}")]
        except Exception as e:
            return [TextContent(type="text", text=f"❌ 发送失败: {str(e)}")]

    elif name == "analyze_data":
        code = arguments["code"]
        data_source = arguments.get("data_source")
        
        try:
            result = await execute_analysis(code, data_source)
            return [TextContent(type="text", text=result)]
        except Exception as e:
            return [TextContent(type="text", text=f"❌ 分析失败: {str(e)}")]

    return [TextContent(type="text", text=f"未知工具: {name}")]


# ==========================================
# 资源注册
# ==========================================

@server.list_resources()
async def list_resources() -> list[Resource]:
    return [
        Resource(
            uri="schema://tables",
            name="数据库表结构",
            mimeType="application/json",
        ),
        Resource(
            uri="config://services",
            name="服务配置",
            mimeType="application/json",
        ),
    ]


@server.read_resource()
async def read_resource(uri: str) -> str:
    if uri == "schema://tables":
        return json.dumps(await get_table_schemas(), indent=2, ensure_ascii=False)
    elif uri.startswith("config://"):
        service = uri.replace("config://", "")
        return json.dumps(await get_service_config(service), indent=2, ensure_ascii=False)
    raise ValueError(f"未知资源: {uri}")


# ==========================================
# 提示模板注册
# ==========================================

@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    return [
        Prompt(
            name="analyze-data",
            description="数据分析提示模板",
            arguments=[
                PromptArgument(name="query", description="分析查询", required=True),
                PromptArgument(name="dataset", description="数据集名称", required=False),
            ],
        ),
    ]


@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str]) -> list[PromptMessage]:
    if name == "analyze-data":
        query = arguments.get("query", "")
        dataset = arguments.get("dataset", "")
        return [
            PromptMessage(
                role="user",
                content=TextContent(
                    type="text",
                    text=f"请分析: {query}\n数据集: {dataset or '自动选择'}\n\n步骤: 1.获取数据 2.统计分析 3.可视化建议 4.业务建议",
                ),
            )
        ]
    raise ValueError(f"未知提示模板: {name}")


# ==========================================
# 启动
# ==========================================

async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

远程 MCP Server 开发

当 MCP Server 需要部署在远程服务器上时,使用 SSE 或 Streamable HTTP 传输方式。

SSE 远程 Server(TypeScript)

typescript
// src/remote-server.ts - SSE 远程 MCP Server
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import cors from "cors";

const app = express();
app.use(cors());
app.use(express.json());

const server = new McpServer({
  name: "remote-enterprise-server",
  version: "1.0.0",
});

// 注册工具(同本地版本)
server.tool("query_database", "查询企业数据库", {
  sql: z.string(),
  database: z.enum(["production", "staging", "analytics"]),
}, async ({ sql, database }) => {
  const results = await executeQuery(database, sql);
  return { content: [{ type: "text", text: JSON.stringify(results, null, 2) }] };
});

// SSE 端点
let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

// 健康检查
app.get("/health", (req, res) => {
  res.json({ status: "ok", version: "1.0.0" });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`🚀 Remote MCP Server running on port ${PORT}`);
  console.log(`📡 SSE endpoint: http://localhost:${PORT}/sse`);
});

客户端连接远程 Server

yaml
# ~/.hermes/config.yaml - 连接远程 MCP Server
mcp_servers:
  remote-enterprise:
    url: https://mcp.your-company.com/sse
    transport: sse
    headers:
      Authorization: Bearer ${MCP_AUTH_TOKEN}
      X-Company-ID: your-company-id

远程部署(Docker + Nginx)

dockerfile
# Dockerfile
FROM node:20-slim

WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY dist/ ./dist/

EXPOSE 3000

HEALTHCHECK --interval=30s --timeout=5s \
  CMD curl -f http://localhost:3000/health || exit 1

CMD ["node", "dist/remote-server.js"]
nginx
# nginx.conf - SSE 代理配置
server {
    listen 443 ssl;
    server_name mcp.your-company.com;

    ssl_certificate /etc/ssl/certs/mcp.crt;
    ssl_certificate_key /etc/ssl/private/mcp.key;

    location /sse {
        proxy_pass http://mcp-server:3000/sse;
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }

    location /messages {
        proxy_pass http://mcp-server:3000/messages;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    location /health {
        proxy_pass http://mcp-server:3000/health;
    }
}

MCP 测试与部署

开发完成后,测试和部署是确保 MCP Server 质量的关键步骤。

MCP Inspector 调试工具

bash
# 安装 MCP Inspector
npm install -g @modelcontextprotocol/inspector

# 启动 Inspector(本地 stdio 服务器)
npx @modelcontextprotocol/inspector node dist/index.js

# 启动 Inspector(远程 SSE 服务器)
npx @modelcontextprotocol/inspector --url https://mcp.example.com/sse

# Inspector 提供 Web UI:
# - 查看服务器信息
# - 列出所有 Tools/Resources/Prompts
# - 交互式调用工具
# - 查看请求/响应日志

单元测试

typescript
// tests/server.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

describe("MCP Server Tests", () => {
  let client: Client;

  beforeAll(async () => {
    const transport = new StdioClientTransport({
      command: "node",
      args: ["dist/index.js"],
    });
    client = new Client({ name: "test-client", version: "1.0.0" });
    await client.connect(transport);
  });

  afterAll(async () => {
    await client.close();
  });

  it("should list tools", async () => {
    const tools = await client.listTools();
    expect(tools.tools.length).toBeGreaterThan(0);
    expect(tools.tools.find(t => t.name === "query_database")).toBeDefined();
  });

  it("should list resources", async () => {
    const resources = await client.listResources();
    expect(resources.resources.length).toBeGreaterThan(0);
  });

  it("should call query_database tool", async () => {
    const result = await client.callTool({
      name: "query_database",
      arguments: { sql: "SELECT 1", database: "staging" },
    });
    expect(result.content).toBeDefined();
  });

  it("should reject non-SELECT queries", async () => {
    const result = await client.callTool({
      name: "query_database",
      arguments: { sql: "DROP TABLE users", database: "staging" },
    });
    expect(result.content[0].text).toContain("安全限制");
  });

  it("should read schema resource", async () => {
    const resource = await client.readResource({ uri: "schema://tables" });
    expect(resource.contents).toBeDefined();
  });
});

注册到 Hermes

yaml
# ~/.hermes/config.yaml
mcp_servers:
  # 本地 TypeScript Server
  my-enterprise:
    command: node
    args: ["./my-mcp-server/dist/index.js"]
    env:
      DB_PASSWORD: ${DB_PASSWORD}
      INTERNAL_API_TOKEN: ${API_TOKEN}
      
  # 本地 Python Server
  my-py-server:
    command: python
    args: ["./my-py-mcp-server/src/server.py"]
    env:
      DB_PASSWORD: ${DB_PASSWORD}
      
  # 远程 Server
  remote-enterprise:
    url: https://mcp.your-company.com/sse
    transport: sse
    headers:
      Authorization: Bearer ${MCP_AUTH_TOKEN}

发布到 npm

bash
# 构建
npm run build

# 运行测试
npm test

# 发布
npm publish

# 发布后,用户可以这样安装使用
npx -y @your-org/my-mcp-server

Skill 开发完整指南

开发自定义 Skill,为 Worker 添加独特的能力。本节从零开始,完整讲解 Skill 的设计、开发、测试和发布流程。

Skill 目录结构

目录结构
my-skill/
├── manifest.json       # 技能元数据(必需)
├── prompt.md           # 提示词模板(必需)
├── tools/              # 工具定义
│   ├── main.py         # 主工具实现
│   └── helpers.py      # 辅助函数
├── resources/          # 资源文件
│   └── templates/      # 模板文件
├── tests/              # 测试
│   └── test_main.py
├── README.md           # 文档
└── .skillignore        # 忽略文件

manifest.json 完整配置

json
{
  "name": "api-testing",
  "version": "1.0.0",
  "description": "API 测试技能,支持 REST/GraphQL 接口测试、性能测试、安全扫描",
  "author": "your-name",
  "license": "MIT",
  "category": "testing",
  "tags": ["api", "testing", "automation", "rest", "graphql"],
  "homepage": "https://github.com/your-name/skill-api-testing",
  "repository": "https://github.com/your-name/skill-api-testing",
  
  "compatibility": {
    "openclaw": ">=1.0.0",
    "hermes": ">=1.0.0",
    "python": ">=3.10",
    "node": ">=18"
  },
  
  "dependencies": {
    "python": ">=3.10",
    "packages": ["httpx>=0.25.0", "pytest>=7.0.0", "jsonschema>=4.0.0"],
    "npm": [],
    "mcp_servers": []
  },
  
  "tools": [
    {
      "name": "send_request",
      "description": "发送 HTTP 请求并返回响应",
      "parameters": {
        "method": { "type": "string", "required": true, "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"] },
        "url": { "type": "string", "required": true },
        "headers": { "type": "object", "required": false },
        "body": { "type": "object", "required": false },
        "timeout": { "type": "number", "required": false, "default": 30 }
      }
    },
    {
      "name": "assert_response",
      "description": "断言 HTTP 响应结果",
      "parameters": {
        "response": { "type": "object", "required": true },
        "status_code": { "type": "number", "required": false },
        "body_contains": { "type": "string", "required": false },
        "json_path": { "type": "string", "required": false },
        "json_value": { "type": "any", "required": false },
        "response_time_ms": { "type": "number", "required": false }
      }
    },
    {
      "name": "run_test_suite",
      "description": "运行 API 测试套件",
      "parameters": {
        "test_file": { "type": "string", "required": true },
        "environment": { "type": "string", "required": false, "default": "staging" },
        "verbose": { "type": "boolean", "required": false, "default": false }
      }
    }
  ],
  
  "prompts": {
    "test-api": {
      "description": "对 API 端点进行完整测试",
      "template": "请对以下 API 端点进行完整测试:{{endpoint}}\n\n测试步骤:\n1. 发送正常请求验证功能\n2. 发送异常请求验证错误处理\n3. 验证响应格式和数据类型\n4. 测试边界条件\n5. 检查安全头和CORS配置\n6. 测试性能(响应时间)",
      "parameters": ["endpoint"]
    },
    "compare-apis": {
      "description": "比较两个 API 的响应差异",
      "template": "比较以下两个 API 的响应差异:\n- API A: {{api_a}}\n- API B: {{api_b}}\n\n比较维度:状态码、响应时间、数据结构、字段差异",
      "parameters": ["api_a", "api_b"]
    }
  },
  
  "config": {
    "default_timeout": 30,
    "max_retries": 3,
    "environments": {
      "staging": { "base_url": "https://staging-api.example.com" },
      "production": { "base_url": "https://api.example.com" }
    }
  }
}

prompt.md 提示词模板

markdown
# API Testing Skill

你是一个专业的 API 测试工程师。你的职责是确保 API 的功能正确性、安全性和性能。

## 测试原则

1. **全面性**: 测试正常流程和异常流程
2. **安全性**: 检查认证、授权、输入验证
3. **性能**: 关注响应时间和吞吐量
4. **可重复**: 测试结果应稳定可重复

## 测试流程

### 1. API 发现
- 获取 API 文档(OpenAPI/Swagger)
- 识别所有端点和方法
- 确认认证方式

### 2. 测试设计
- 正向测试:有效输入,预期成功
- 反向测试:无效输入,预期失败
- 边界测试:空值、最大值、特殊字符
- 安全测试:注入、越权、信息泄露

### 3. 执行测试
- 使用 send_request 工具发送请求
- 使用 assert_response 工具验证结果
- 记录所有测试结果

### 4. 报告生成
- 通过/失败统计
- 错误详情和复现步骤
- 性能指标
- 改进建议

## 输出格式

测试结果以 Markdown 表格形式输出:

| 测试用例 | 方法 | 端点 | 状态码 | 结果 | 耗时 |
|---------|------|------|--------|------|------|
| 正常登录 | POST | /auth/login | 200 | ✅ | 120ms |

工具实现

python
# tools/main.py - API 测试工具实现
import httpx
import time
import json
from typing import Any

async def send_request(
    method: str,
    url: str,
    headers: dict = None,
    body: dict = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """发送 HTTP 请求并返回响应"""
    start_time = time.time()
    
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.request(
            method=method,
            url=url,
            headers=headers or {},
            json=body,
        )
    
    elapsed_ms = (time.time() - start_time) * 1000
    
    result = {
        "status_code": response.status_code,
        "headers": dict(response.headers),
        "body": None,
        "elapsed_ms": round(elapsed_ms, 2),
    }
    
    try:
        result["body"] = response.json()
    except:
        result["body"] = response.text[:1000]
    
    return result


async def assert_response(
    response: dict,
    status_code: int = None,
    body_contains: str = None,
    json_path: str = None,
    json_value: Any = None,
    response_time_ms: int = None,
) -> dict[str, Any]:
    """断言 HTTP 响应结果"""
    results = []
    
    if status_code is not None:
        passed = response["status_code"] == status_code
        results.append({
            "check": f"状态码 == {status_code}",
            "passed": passed,
            "actual": response["status_code"],
        })
    
    if body_contains is not None:
        body_str = json.dumps(response.get("body", ""))
        passed = body_contains in body_str
        results.append({
            "check": f"响应体包含 '{body_contains}'",
            "passed": passed,
        })
    
    if response_time_ms is not None:
        passed = response["elapsed_ms"] <= response_time_ms
        results.append({
            "check": f"响应时间 <= {response_time_ms}ms",
            "passed": passed,
            "actual": f"{response['elapsed_ms']}ms",
        })
    
    all_passed = all(r["passed"] for r in results)
    return {
        "passed": all_passed,
        "checks": results,
        "summary": f"{sum(r['passed'] for r in results)}/{len(results)} 通过",
    }

测试 Skill

bash
# 本地测试
openclaw skills test ./my-skill

# 在特定 Worker 上测试
openclaw skills test ./my-skill --worker coder

# 打包
openclaw skills pack ./my-skill

# 发布到社区
openclaw skills publish ./my-skill

# 安装发布的 Skill
openclaw skills install api-testing

高级工作流编排

超越简单的任务分发,构建支持 DAG 依赖、条件分支、并行执行、错误恢复的复杂工作流。

工作流编排模式

🔗

顺序执行(Sequential)

任务按顺序依次执行,前一步的输出是后一步的输入

并行执行(Parallel)

多个独立任务同时执行,提高效率

🔀

条件分支(Conditional)

根据执行结果动态选择下一步操作

🔄

Saga 模式

分布式事务补偿,失败时自动回滚已完成步骤

完整工作流配置:CI/CD 流水线

yaml
# ~/.hermes/workflows/ci-cd-pipeline.yaml
name: ci-cd-pipeline
description: 自动化 CI/CD 流水线
version: "1.0"

# 全局变量
variables:
  project_dir: /workspace/my-project
  test_env: staging
  deploy_env: production

# 工作流步骤(DAG 结构)
steps:
  # === 阶段1: 代码质量检查 ===
  - id: lint
    worker: coder
    skill: code-review
    input:
      path: "{{project_dir}}/src"
      checks: [style, complexity, security]
    output: lint_result
    
  - id: type-check
    worker: coder
    skill: code-generation
    input:
      action: type_check
      path: "{{project_dir}}"
    output: type_check_result

  # === 阶段2: 条件门控 ===
  - id: quality-gate
    type: condition
    depends_on: [lint, type-check]
    condition: |
      lint_result.passed == true and 
      type_check_result.errors == 0
    on_true: run-tests
    on_false: notify-failure
    
  # === 阶段3: 并行测试 ===
  - id: run-tests
    type: parallel
    depends_on: [quality-gate]
    tasks:
      - id: unit-tests
        worker: coder
        skill: testing
        input: { type: "unit", coverage: true }
        
      - id: integration-tests
        worker: coder
        skill: testing
        input: { type: "integration", env: "{{test_env}}" }
        
      - id: security-scan
        worker: reviewer
        skill: security-audit
        input: { target: "{{project_dir}}/src", depth: "full" }
        
      - id: performance-test
        worker: operator
        skill: api-testing
        input: { endpoint: "http://staging:3000/api", duration: "60s" }

  # === 阶段4: 测试结果评估 ===
  - id: test-gate
    type: condition
    depends_on: [run-tests]
    condition: |
      unit-tests.passed == true and
      integration-tests.passed == true and
      security-scan.critical == 0 and
      performance-test.p95_ms < 500
    on_true: deploy-staging
    on_false: notify-failure

  # === 阶段5: 部署到 Staging ===
  - id: deploy-staging
    worker: operator
    skill: deployment
    depends_on: [test-gate]
    input:
      environment: staging
      strategy: rolling
      health_check: true
    retry:
      max_attempts: 3
      backoff: exponential
    compensate: rollback-staging
    
  # === 阶段6: Staging 验证 ===
  - id: staging-validation
    worker: operator
    skill: api-testing
    depends_on: [deploy-staging]
    input:
      endpoint: "https://staging.example.com/api"
      test_suite: "smoke"
      
  # === 阶段7: 人工审批 ===
  - id: approval
    type: human-approval
    depends_on: [staging-validation]
    approvers: ["tech-lead", "devops-lead"]
    timeout: 24h
    on_timeout: notify-pending
    
  # === 阶段8: 部署到生产 ===
  - id: deploy-production
    worker: operator
    skill: deployment
    depends_on: [approval]
    input:
      environment: production
      strategy: canary
      canary_percentage: 10
      health_check: true
      rollback_on_failure: true
    retry:
      max_attempts: 2
    compensate: rollback-production

  # === 补偿步骤(失败回滚)===
  - id: rollback-staging
    worker: operator
    skill: deployment
    input:
      action: rollback
      environment: staging
      version: previous
      
  - id: rollback-production
    worker: operator
    skill: deployment
    input:
      action: rollback
      environment: production
      version: previous

  # === 通知步骤 ===
  - id: notify-failure
    worker: operator
    skill: notification
    input:
      channel: slack
      recipient: "#ci-cd-alerts"
      message: "❌ CI/CD 流水线失败: {{failed_step}}"
      priority: high
      
  - id: notify-pending
    worker: operator
    skill: notification
    input:
      channel: slack
      recipient: "#ci-cd-alerts"
      message: "⏳ 生产部署等待审批超过24小时"
      priority: normal

Saga 模式详解

Saga 模式用于处理分布式事务,当某个步骤失败时,自动执行补偿操作回滚已完成的步骤:

yaml
# Saga 模式示例:用户注册流程
name: user-registration-saga

steps:
  - id: create-user
    worker: coder
    input: { action: "create_user", data: "{{user_data}}" }
    compensate: { action: "delete_user", user_id: "{{create-user.user_id}}" }
    
  - id: setup-profile
    worker: coder
    depends_on: [create-user]
    input: { action: "create_profile", user_id: "{{create-user.user_id}}" }
    compensate: { action: "delete_profile", user_id: "{{create-user.user_id}}" }
    
  - id: assign-role
    worker: operator
    depends_on: [setup-profile]
    input: { action: "assign_role", user_id: "{{create-user.user_id}}", role: "user" }
    compensate: { action: "remove_role", user_id: "{{create-user.user_id}}" }
    
  - id: send-welcome-email
    worker: operator
    depends_on: [assign-role]
    input: { action: "send_email", to: "{{user_data.email}}", template: "welcome" }
    compensate: { action: "send_email", to: "{{user_data.email}}", template: "registration_failed" }

# 如果 assign-role 失败,Saga 会自动执行:
# 1. compensate setup-profile → delete_profile
# 2. compensate create-user → delete_user
# 确保数据一致性

工作流触发方式

bash
# 手动触发
hermes workflow run ci-cd-pipeline

# 带参数触发
hermes workflow run ci-cd-pipeline --var deploy_env=production

# 定时触发
hermes workflow schedule ci-cd-pipeline --cron "0 2 * * *"

# 事件触发(Git push 时自动运行)
hermes workflow trigger ci-cd-pipeline --on git.push --branch main

# 查看工作流状态
hermes workflow status ci-cd-pipeline

# 查看历史执行
hermes workflow history ci-cd-pipeline --last 10

# 取消正在执行的工作流
hermes workflow cancel ci-cd-pipeline --run-id run-abc123

安全深度防护

在生产环境中使用 AI Agent 协作架构,安全是首要考虑。本节深入讲解多层次安全防护策略。

安全架构层次

🌐 网络层
🔐 认证层
🛡️ 授权层
📦 沙箱层
📋 审计层

1. Docker 沙箱隔离

yaml
# ~/.hermes/config.yaml - 沙箱配置
terminal:
  backend: docker
  docker:
    image: "hermes-worker:latest"
    memory: "512m"
    cpus: "1.0"
    network: "none"
    read_only: true
    volumes:
      - "./workspace:/workspace:rw"
    security_opt:
      - "no-new-privileges:true"
    cap_drop:
      - ALL
    pids_limit: 100
    tmpfs:
      - "/tmp:size=100M"
    healthcheck:
      test: ["CMD", "echo", "ok"]
      interval: 30s
      timeout: 5s

2. RBAC 权限控制

yaml
# ~/.hermes/config.yaml - RBAC 配置
security:
  rbac:
    enabled: true
    
    roles:
      admin:
        permissions: ["*"]
        
      developer:
        permissions:
          - file:read
          - file:write
          - code:execute
          - git:read
          - git:write
          - mcp:use:filesystem
          - mcp:use:github
          
      analyst:
        permissions:
          - file:read
          - code:execute
          - database:query
          - mcp:use:postgres
          - mcp:use:sqlite
          
      viewer:
        permissions:
          - file:read
          - git:read
          - mcp:use:filesystem
    
    worker_roles:
      coder: developer
      analyst: analyst
      operator: developer
      reviewer: viewer
    
    # 资源级权限
    resource_permissions:
      "/workspace/production":  # 生产目录
        developer: deny
        admin: allow
      "/workspace/staging":     # 测试目录
        developer: read-only
        admin: allow

3. 审计日志

yaml
# ~/.hermes/config.yaml - 审计配置
audit:
  enabled: true
  log_level: info
  
  # 记录所有操作
  events:
    - tool_call          # 工具调用
    - file_access        # 文件访问
    - mcp_request        # MCP 请求
    - workflow_step      # 工作流步骤
    - permission_denied  # 权限拒绝
    - config_change      # 配置变更
    
  # 存储配置
  storage:
    type: file            # file | elasticsearch | syslog
    path: /var/log/hermes/audit/
    rotation: daily
    retention: 90d
    format: json
    
  # 敏感信息脱敏
  redaction:
    patterns:
      - name: api_key
        pattern: "sk-[a-zA-Z0-9]{20,}"
        replacement: "sk-***REDACTED***"
      - name: password
        pattern: "password[\":\\s]+[\"']?[^\"',}\\s]+"
        replacement: "password: ***REDACTED***"
      - name: token
        pattern: "Bearer [a-zA-Z0-9._-]+"
        replacement: "Bearer ***REDACTED***"

4. 密钥管理

yaml
# ~/.hermes/config.yaml - 密钥管理
secrets:
  provider: vault          # env | vault | aws-secrets | gcp-secrets
  
  vault:
    address: "https://vault.company.com"
    path: "secret/hermes"
    auth_method: kubernetes  # token | kubernetes | approle
    
  rotation:
    enabled: true
    interval: 30d
    notify_before: 7d
    
  injection:
    # 密钥注入方式:环境变量
    method: env
    prefix: "HERMES_SECRET_"

5. 网络安全

yaml
# ~/.hermes/config.yaml - 网络安全
network:
  # 出站规则
  outbound:
    default: deny
    allow:
      - host: "api.xidao.online"
        ports: [443]
      - host: "github.com"
        ports: [443]
      - host: "*.internal.company.com"
        ports: [443, 5432]
        
  # 速率限制
  rate_limit:
    requests_per_minute: 60
    tokens_per_minute: 100000
    
  # TLS 配置
  tls:
    min_version: "1.2"
    verify_certificates: true

性能深度优化

在多 Agent 协作系统中,性能优化直接影响用户体验和成本。本节深入讲解各个层面的优化策略。

1. 上下文压缩

yaml
# ~/.hermes/config.yaml - 上下文压缩
compression:
  mode: safeguard          # safeguard | aggressive | none
  
  # safeguard 模式:保留关键信息,压缩冗余
  safeguard:
    keep_recent_messages: 10     # 保留最近10条消息
    keep_tool_calls: true        # 保留工具调用记录
    keep_tool_results: true      # 保留工具结果
    compress_system_prompt: false # 不压缩系统提示
    max_tokens: 128000
    
  # aggressive 模式:激进压缩,节省 Token
  aggressive:
    keep_recent_messages: 5
    keep_tool_calls: false
    keep_tool_results: false
    summarize_old_messages: true
    summary_model: "google/gemini-2.5-flash"  # 用便宜模型做摘要
    
  # 触发条件
  trigger:
    threshold: 0.8             # 80% 上下文窗口时触发
    check_interval: 5          # 每5轮检查一次

2. 结果缓存

yaml
# ~/.hermes/config.yaml - 结果缓存
cache:
  enabled: true
  
  # 缓存后端
  backend: redis              # memory | redis | memcached
  redis:
    host: localhost
    port: 6379
    db: 1
    password: ${REDIS_PASSWORD}
    
  # 缓存策略
  strategies:
    # 工具调用结果缓存
    tool_results:
      enabled: true
      ttl: 3600               # 1小时
      key_pattern: "hermes:cache:tool:{{tool_name}}:{{args_hash}}"
      cacheable_tools:
        - query_database       # 数据库查询可缓存
        - search_web           # 搜索结果可缓存
      non_cacheable_tools:
        - write_file           # 写操作不缓存
        - send_notification    # 通知不缓存
        
    # 模型响应缓存
    model_responses:
      enabled: true
      ttl: 1800               # 30分钟
      key_pattern: "hermes:cache:model:{{model}}:{{prompt_hash}}"
      semantic_cache: true     # 语义缓存(相似问题复用)
      similarity_threshold: 0.95
      
  # 缓存统计
  stats:
    enabled: true
    log_interval: 60s

3. 连接池与并发

yaml
# ~/.hermes/config.yaml - 连接与并发
connection:
  pool_size: 10               # API 连接池大小
  keep_alive: true            # 保持连接
  timeout: 30                 # 连接超时(秒)
  retry_on_failure: true
  
concurrency:
  max_parallel_workers: 5     # 最大并行 Worker 数
  max_parallel_tasks: 10      # 最大并行任务数
  queue_size: 100             # 任务队列大小
  overflow_strategy: queue    # queue | reject | redirect
  
  # 流控
  rate_limit:
    api_calls_per_minute: 60
    tokens_per_minute: 200000
    burst_allowance: 10

4. Token 优化策略

策略原理节省比例配置
模型分级 简单任务用 Flash,复杂任务用 Opus 60-80% Worker 模型配置
上下文压缩 压缩旧消息,保留关键信息 30-50% compression.mode: safeguard
结果缓存 重复任务直接返回缓存 20-40% cache.enabled: true
摘要模型 用便宜模型做摘要和分类 40-60% summary_model: gemini-2.5-flash
语义缓存 相似问题复用已有回答 15-30% semantic_cache: true
批量处理 合并小任务为批量请求 10-20% batch.enabled: true

5. 性能监控仪表盘

bash
# 启动监控仪表盘
hermes dashboard --port 8080

# 关键指标
hermes stats --live

# 输出示例:
# ┌─────────────────────────────────────────────┐
# │  Hermes 性能仪表盘                          │
# ├─────────────────────────────────────────────┤
# │  请求/分钟:    45.2  ▲ +12%                 │
# │  平均延迟:     3.2s  ▼ -8%                  │
# │  Token/分钟:   125K  ▲ +5%                  │
# │  缓存命中率:   34.2% ▲ +3%                  │
# │  Worker 健康:  4/4   ✅                      │
# ├─────────────────────────────────────────────┤
# │  Worker 详情:                               │
# │  coder:     12 req/min  2.1s avg  98% ok   │
# │  analyst:   8 req/min   3.5s avg  99% ok   │
# │  operator:  20 req/min  1.2s avg  99% ok   │
# │  reviewer:  5 req/min   4.8s avg  97% ok   │
# └─────────────────────────────────────────────┘
💰 成本优化: XiDao API 统一定价,一个令牌访问所有模型。用 Gemini Flash 处理简单任务(成本最低),Claude Opus 处理复杂任务(效果最好),综合成本最优。配合缓存和压缩策略,可降低 60%+ 的 API 调用成本。

架构模式与生产部署

根据不同规模和场景选择合适的架构模式,并掌握生产环境的部署和运维。

架构模式对比

模式架构适用规模复杂度可用性
单体模式 1 Hermes + 1 OpenClaw 个人/小项目 单点故障
微服务模式 1 Hermes + N 专业化 Worker 团队/中型项目 ⭐⭐⭐ Worker 可独立扩展
联邦模式 N Hermes + N Worker 集群 跨团队/大型组织 ⭐⭐⭐⭐⭐ 高可用、跨区域

🏛️ 单体模式

适合个人开发者和小项目,最简单的部署方式:

Hermes
OpenClaw
bash
# 一键启动
hermes chat  # Hermes 自带 OpenClaw 能力

🏗️ 微服务模式(推荐)

一个 Hermes + 多个专业化 OpenClaw Worker,适合团队协作:

Hermes
MCP
Coder
Analyst
Operator
Reviewer

🌐 联邦模式(高级)

多个 Hermes 实例通过 MCP Hub 互联,适合跨团队/跨组织协作:

Hermes A
(前端团队)
MCP Hub
Hermes B
(后端团队)
yaml
# ~/.hermes/config.yaml - 联邦模式
federation:
  enabled: true
  hub_url: "https://mcp-hub.company.com"
  
  identity:
    name: "frontend-team"
    region: "asia-east"
    
  peers:
    - name: "backend-team"
      endpoint: "https://hermes-backend.company.com/mcp"
    - name: "data-team"
      endpoint: "https://hermes-data.company.com/mcp"
      
  routing:
    # 跨团队任务路由
    rules:
      - pattern: "数据库.*"
        route_to: data-team
      - pattern: "API.*开发"
        route_to: backend-team

Kubernetes 生产部署

yaml
# k8s/hermes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hermes
  namespace: ai-agents
spec:
  replicas: 2
  selector:
    matchLabels:
      app: hermes
  template:
    metadata:
      labels:
        app: hermes
    spec:
      containers:
        - name: hermes
          image: hermes-agent:latest
          ports:
            - containerPort: 18788
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: hermes-secrets
                  key: api-key
            - name: OPENAI_BASE_URL
              value: "https://api.xidao.online/v1"
          livenessProbe:
            httpGet:
              path: /health
              port: 18788
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 18788
            initialDelaySeconds: 5
            periodSeconds: 10
---
# Worker Deployment(每个 Worker 类型一个)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker-coder
  namespace: ai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: worker-coder
  template:
    spec:
      containers:
        - name: openclaw
          image: openclaw:latest
          env:
            - name: OPENCLAW_AGENT
              value: "coder"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: hermes-secrets
                  key: api-key
---
# HPA 自动扩缩容
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-coder-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: worker-coder
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

可观测性

yaml
# ~/.hermes/config.yaml - 可观测性
observability:
  # 指标导出
  metrics:
    enabled: true
    provider: prometheus
    port: 9090
    path: /metrics
    
  # 分布式追踪
  tracing:
    enabled: true
    provider: jaeger
    endpoint: "http://jaeger:14268/api/traces"
    sample_rate: 0.1  # 采样10%
    
  # 日志
  logging:
    level: info
    format: json
    output: stdout    # stdout | file | elasticsearch

灾备与恢复

bash
# 备份配置和数据
hermes backup create --output /backup/hermes-$(date +%Y%m%d).tar.gz

# 备份内容:
# - config.yaml(配置)
# - .env(密钥)
# - SOUL.md(身份)
# - memories/(记忆)
# - workflows/(工作流)

# 恢复
hermes backup restore /backup/hermes-20250109.tar.gz

# 定时备份(crontab)
0 2 * * * hermes backup create --output /backup/hermes-$(date +\%Y\%m\%d).tar.gz