🔮 高级精通教程
开发自定义 MCP Server、编写专属 Skill、构建复杂工作流、深度安全防护与性能优化,成为 AI Agent 架构师
课程概述
本教程面向有经验的用户,将带你深入 AI Agent 架构的核心,掌握自定义开发能力,构建生产级的多 Agent 协作系统。
自定义 MCP Server
TypeScript/Python 双语言开发、远程部署、测试调试、生产发布
Skill 开发
完整开发流程:设计、编码、测试、打包、发布到社区
高级工作流
DAG 编排、条件分支、并行执行、Saga 模式、错误恢复
安全与优化
沙箱隔离、RBAC、审计日志、性能调优、成本控制
MCP 协议深度
在开发自定义 MCP Server 之前,深入理解 MCP 协议的架构和通信机制是必要的。
MCP 协议架构
(Hermes/OpenClaw)
over stdio/SSE/HTTP
(你的自定义服务)
(API/DB/文件)
MCP 通信生命周期
1. 初始化 (Initialize)
Client → Server: initialize request (capabilities, clientInfo)
Server → Client: initialize response (capabilities, serverInfo)
Client → Server: notifications/initialized → 客户端确认初始化完成,此后才进入正常通信
2. 发现 (Discovery)
Client → Server: tools/list → 获取可用工具列表
Client → Server: resources/list → 获取可用资源列表
Client → Server: prompts/list → 获取可用提示模板
3. 调用 (Invocation)
Client → Server: tools/call → 调用工具
Client → Server: resources/read → 读取资源
Client → Server: prompts/get → 获取提示模板
4. 通知 (Notification)
Server → Client: notifications/tools/list_changed → 工具列表变更
Server → Client: notifications/resources/updated → 资源更新
5. 关闭 (Shutdown)
MCP 未定义专门的 shutdown 消息:由 Client 关闭底层传输来结束会话(stdio 下关闭子进程的标准输入,HTTP 下关闭对应连接)
JSON-RPC 2.0 消息格式
// 请求 (Request)
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "query_database",
"arguments": {
"sql": "SELECT * FROM users LIMIT 10"
}
}
}
// 成功响应 (Response)
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "[{\"id\": 1, \"name\": \"Alice\"}, ...]"
}
]
}
}
// 错误响应 (Error)
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid params: sql is required"
}
}
三大原语深度对比
| 原语 | 方向 | 是否可变 | 典型用途 | 注册方法 |
|---|---|---|---|---|
| Tools | Client → Server | 可动态变更 | 执行操作:查询DB、调用API、写文件 | server.tool() |
| Resources | Server → Client | 可动态更新 | 提供数据:文件内容、配置信息、Schema | server.resource() |
| Prompts | Client → Server | 通常静态定义(也支持 list_changed 通知) | 标准化交互:代码审查模板、分析报告模板 | server.prompt() |
TypeScript MCP Server 开发
使用官方 MCP TypeScript SDK 开发自定义服务器,这是最成熟和推荐的方案。
项目初始化
# 创建项目目录
mkdir my-mcp-server && cd my-mcp-server
# 初始化 Node.js 项目
npm init -y
# 安装依赖
npm install @modelcontextprotocol/sdk zod
npm install -D typescript @types/node tsx
# 创建 TypeScript 配置
cat > tsconfig.json << 'EOF'
{
"compilerOptions": {
"target": "ES2022",
"module": "Node16",
"moduleResolution": "Node16",
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"declaration": true
},
"include": ["src/**/*"]
}
EOF
# 创建目录结构
mkdir -p src tools resources prompts
完整 MCP Server 实现
// src/index.ts - 完整 MCP Server
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "my-enterprise-server",
version: "1.0.0",
});
// ==========================================
// Tools(工具)- Agent 可调用的函数
// ==========================================
// Tool 1: read-only query against an enterprise database.
// NOTE: the statement checks below are a best-effort guard only. The database
// account used by executeQuery should itself be limited to read-only access.
server.tool(
  "query_enterprise_db",
  "查询企业数据库,支持只读 SQL 查询",
  {
    sql: z.string().describe("SQL 查询语句(仅支持 SELECT)"),
    database: z.enum(["production", "staging", "analytics"]).describe("目标数据库"),
    limit: z.number().int().positive().optional().describe("结果限制数量,默认 100"),
  },
  async ({ sql, database, limit }) => {
    // Drop a single trailing ";" so that any remaining ";" indicates a stacked statement.
    const statement = sql.trim().replace(/;\s*$/, "");
    const normalizedSql = statement.toUpperCase();
    const startsWithSelect = normalizedSql.startsWith("SELECT");
    const startsWithCte = normalizedSql.startsWith("WITH");
    // Fails closed: a ";" inside a string literal is rejected as well.
    const hasStackedStatement = statement.includes(";");
    // A CTE can wrap a data-modifying statement (e.g. WITH x AS (DELETE ...)).
    const hasWritingCte =
      startsWithCte &&
      /\b(INSERT|UPDATE|DELETE|MERGE|DROP|ALTER|TRUNCATE|CREATE|GRANT|REVOKE)\b/.test(normalizedSql);

    if ((!startsWithSelect && !startsWithCte) || hasStackedStatement || hasWritingCte) {
      return {
        content: [{ type: "text", text: "❌ 安全限制:仅支持 SELECT/WITH 查询" }],
        isError: true,
      };
    }

    try {
      const results = await executeQuery(database, statement, limit ?? 100);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(results, null, 2),
        }],
      };
    } catch (error) {
      // Under "strict" the catch variable is `unknown`, so narrow before reading .message.
      const reason = error instanceof Error ? error.message : String(error);
      return {
        content: [{ type: "text", text: `❌ 查询失败: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool 2: proxy a request to the company's internal API.
// The bearer token is attached to every request, so the target origin must never
// be controllable by the (model-supplied) endpoint argument.
const INTERNAL_API_BASE = "https://internal-api.company.com";

server.tool(
  "call_internal_api",
  "调用企业内部 API 接口",
  {
    endpoint: z.string().describe("API 端点路径,如 /api/v1/users"),
    method: z.enum(["GET", "POST", "PUT", "DELETE"]).describe("HTTP 方法"),
    body: z.record(z.any()).optional().describe("请求体(JSON)"),
    headers: z.record(z.string()).optional().describe("额外请求头"),
  },
  async ({ endpoint, method, body, headers }) => {
    try {
      // Resolve against the base instead of concatenating strings: an endpoint such
      // as "@evil.com/x" or "//evil.com/x" would otherwise change the request host.
      const target = new URL(endpoint, INTERNAL_API_BASE);
      if (target.origin !== new URL(INTERNAL_API_BASE).origin) {
        return {
          content: [{ type: "text", text: "❌ 安全限制:endpoint 必须是内部 API 的路径" }],
          isError: true,
        };
      }

      // Headers.set is case-insensitive, so caller-supplied headers cannot
      // override or duplicate the credentials set here.
      const requestHeaders = new Headers(headers);
      requestHeaders.set("Content-Type", "application/json");
      requestHeaders.set("Authorization", `Bearer ${process.env.INTERNAL_API_TOKEN}`);

      const response = await fetch(target, {
        method,
        headers: requestHeaders,
        body: body ? JSON.stringify(body) : undefined,
      });

      // Not every response is JSON (empty 204, HTML error pages): fall back to raw text.
      const rawBody = await response.text();
      let data: unknown = rawBody;
      try {
        data = JSON.parse(rawBody);
      } catch {
        // Body is not JSON; keep the raw text assigned above.
      }

      return {
        content: [{
          type: "text",
          text: JSON.stringify({ status: response.status, data }, null, 2),
        }],
      };
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      return {
        content: [{ type: "text", text: `❌ API 调用失败: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool 3: generate a business report and return it as an embedded resource.
server.tool(
  "generate_report",
  "生成业务报告并保存为文件",
  {
    report_type: z.enum(["daily", "weekly", "monthly", "custom"]).describe("报告类型"),
    metrics: z.array(z.string()).describe("包含的指标列表"),
    date_range: z.object({
      start: z.string().describe("开始日期 YYYY-MM-DD"),
      end: z.string().describe("结束日期 YYYY-MM-DD"),
    }).describe("日期范围"),
    format: z.enum(["markdown", "html", "pdf"]).optional().describe("输出格式,默认 markdown"),
  },
  async ({ report_type, metrics, date_range, format }) => {
    const outputFormat = format ?? "markdown";
    // The embedded resource must advertise the format that was actually requested.
    const mimeTypes = {
      markdown: "text/markdown",
      html: "text/html",
      pdf: "application/pdf",
    };

    try {
      const report = await generateBusinessReport({
        type: report_type,
        metrics,
        dateRange: date_range,
        format: outputFormat,
      });
      return {
        content: [
          { type: "text", text: `📊 报告已生成\n类型: ${report_type}\n指标: ${metrics.join(", ")}\n日期: ${date_range.start} ~ ${date_range.end}` },
          // NOTE(review): binary output such as a real PDF would have to be sent as a
          // base64 `blob` instead of `text` — confirm once generateBusinessReport is implemented.
          { type: "resource", resource: { uri: `report://${report.id}`, mimeType: mimeTypes[outputFormat], text: report.content } },
        ],
      };
    } catch (error) {
      // Report failures the same way the other tools do instead of letting the handler throw.
      const reason = error instanceof Error ? error.message : String(error);
      return {
        content: [{ type: "text", text: `❌ 报告生成失败: ${reason}` }],
        isError: true,
      };
    }
  }
);
// ==========================================
// Resources(资源)- Agent 可读取的数据
// ==========================================
// Resource 1: database schema.
// McpServer.resource takes (name, uri, [metadata], readCallback): the first argument
// is a registration name and the second is the URI that clients read.
server.resource(
  "table-schemas",
  "schema://tables",
  { description: "数据库表结构信息", mimeType: "application/json" },
  async (uri) => ({
    contents: [{
      uri: uri.href,
      mimeType: "application/json",
      text: JSON.stringify(await getTableSchemas(), null, 2),
    }],
  })
);

// Resource 2: internal API documentation.
server.resource(
  "api-docs",
  "docs://api",
  { description: "内部 API 文档", mimeType: "text/markdown" },
  async (uri) => ({
    contents: [{
      uri: uri.href,
      mimeType: "text/markdown",
      text: await getApiDocumentation(),
    }],
  })
);
// Resource 3: dynamic resource template.
// A URI containing {placeholders} must be wrapped in ResourceTemplate; a plain string
// is registered as a fixed URI and the callback never receives `service`.
server.resource(
  "service-config",
  // list: undefined → the template is readable but contributes nothing to resources/list.
  new ResourceTemplate("config://{service}", { list: undefined }),
  { description: "服务配置信息", mimeType: "application/json" },
  async (uri, { service }) => {
    // Template variables may arrive as string | string[]; use the first value.
    const serviceName = Array.isArray(service) ? service[0] : service;
    return {
      contents: [{
        uri: uri.href,
        mimeType: "application/json",
        text: JSON.stringify(await getServiceConfig(serviceName), null, 2),
      }],
    };
  }
);
// ==========================================
// Prompts(提示模板)- 标准化交互
// ==========================================
// Prompt "analyze-data": returns a single user message that walks the agent through
// a structured analysis and points it at the query_enterprise_db tool.
// `dataset` is optional; when omitted, its line in the template is left empty.
server.prompt(
  "analyze-data",
  "数据分析提示模板,引导 Agent 进行结构化分析",
  {
    query: z.string().describe("分析查询"),
    dataset: z.string().optional().describe("数据集名称"),
  },
  async ({ query, dataset }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        // Template literal: the lines below are sent verbatim, so they stay unindented.
        text: `请对以下数据进行深入分析:
查询: ${query}
${dataset ? `数据集: ${dataset}` : ""}
分析步骤:
1. 理解数据结构和字段含义
2. 执行必要的查询获取数据
3. 进行统计分析和趋势识别
4. 识别异常值和关键发现
5. 生成可视化建议
6. 给出业务建议
请使用 query_enterprise_db 工具查询数据,结果以 Markdown 格式输出。`,
      },
    }],
  })
);
// Prompt "code-review": returns a single user message containing a review checklist
// for the given file. `focus` is optional and defaults to a full review.
server.prompt(
  "code-review",
  "代码审查提示模板",
  {
    file_path: z.string().describe("待审查的文件路径"),
    focus: z.enum(["security", "performance", "style", "all"]).optional().describe("审查重点"),
  },
  async ({ file_path, focus }) => {
    // Without this default an omitted `focus` would be rendered as the text "undefined".
    const focusLabel = focus === undefined || focus === "all" ? "全面" : focus;
    return {
      messages: [{
        role: "user",
        content: {
          type: "text",
          // Template literal: the lines below are sent verbatim, so they stay unindented.
          text: `请对文件 ${file_path} 进行${focusLabel}审查:
审查清单:
- [ ] 代码正确性和逻辑完整性
- [ ] 安全漏洞(SQL注入、XSS、敏感信息泄露)
- [ ] 性能问题(N+1查询、内存泄漏、不必要的循环)
- [ ] 代码风格和可维护性
- [ ] 错误处理和边界情况
- [ ] 测试覆盖
请给出具体的改进建议和优先级。`,
        },
      }],
    };
  }
);
// ==========================================
// 启动服务器
// ==========================================
/**
 * Connects the MCP server to a stdio transport.
 * The startup banner goes to stderr because stdout carries the protocol messages.
 */
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("🚀 MCP Server 已启动: my-enterprise-server v1.0.0");
}

main().catch((error) => {
  console.error(error);
  // Signal the failed startup to the parent process instead of exiting with code 0.
  process.exitCode = 1;
});
// ==========================================
// 辅助函数(示例实现)
// ==========================================
/**
 * Placeholder query executor: does not touch a database and returns an empty
 * result set that echoes `database` and `limit`. `sql` is currently unused.
 */
async function executeQuery(database: string, sql: string, limit: number) {
  // A real implementation would connect to the database and run the query here.
  return { rows: [], count: 0, database, limit };
}

/** Placeholder: returns an object with an empty `tables` list. */
async function getTableSchemas() {
  return { tables: [] };
}

/** Placeholder: returns a fixed Markdown stub. */
async function getApiDocumentation() {
  return "# API Documentation\n...";
}

/** Placeholder: returns the service name together with an empty config object. */
async function getServiceConfig(service: string) {
  return { service, config: {} };
}

/** Placeholder: ignores `options` and returns a fixed report id and Markdown body. */
async function generateBusinessReport(options: any) {
  return { id: "rpt-001", content: "# Report\n..." };
}
package.json 配置
{
"name": "my-mcp-server",
"version": "1.0.0",
"type": "module",
"main": "dist/index.js",
"bin": {
"my-mcp-server": "dist/index.js"
},
"scripts": {
"build": "tsc",
"dev": "tsx src/index.ts",
"start": "node dist/index.js",
"test": "vitest",
"lint": "eslint src/"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"zod": "^3.22.0"
},
"devDependencies": {
"typescript": "^5.3.0",
"@types/node": "^20.0.0",
"tsx": "^4.0.0",
"vitest": "^1.0.0"
}
}
Python MCP Server 开发
使用 Python MCP SDK 开发自定义服务器,适合数据科学和 AI 场景。
项目初始化
# 创建项目
mkdir my-py-mcp-server && cd my-py-mcp-server
# 创建虚拟环境
python -m venv .venv
source .venv/bin/activate
# 安装依赖
pip install mcp httpx pydantic
# 创建目录结构
mkdir -p src tests
完整 Python MCP Server
# src/server.py - 完整 Python MCP Server
import json
import os
from typing import Any
from mcp.server import Server
from mcp.types import (
Tool, TextContent, ImageContent, EmbeddedResource,
Resource, Prompt, PromptMessage, PromptArgument,
)
import mcp.server.stdio
server = Server("my-enterprise-server")
# ==========================================
# 工具注册
# ==========================================
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Return the tool definitions this server advertises.

    Each Tool carries a JSON Schema (`inputSchema`) describing its arguments.
    The names listed here are the ones call_tool dispatches on.
    """
    return [
        # Read-only SQL query tool; `sql` and `database` are mandatory.
        Tool(
            name="query_database",
            description="查询企业数据库,支持只读 SQL",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "SQL 查询语句(仅 SELECT)",
                    },
                    "database": {
                        "type": "string",
                        "enum": ["production", "staging", "analytics"],
                        "description": "目标数据库",
                    },
                    "limit": {
                        "type": "number",
                        "description": "结果限制,默认 100",
                    },
                },
                "required": ["sql", "database"],
            },
        ),
        # Notification tool; `priority` is optional.
        Tool(
            name="send_notification",
            description="发送企业通知(Slack/邮件/钉钉)",
            inputSchema={
                "type": "object",
                "properties": {
                    "channel": {
                        "type": "string",
                        "enum": ["slack", "email", "dingtalk"],
                        "description": "通知渠道",
                    },
                    "recipient": {
                        "type": "string",
                        "description": "接收者(邮箱/用户ID/群ID)",
                    },
                    "message": {
                        "type": "string",
                        "description": "通知内容",
                    },
                    "priority": {
                        "type": "string",
                        "enum": ["low", "normal", "high", "urgent"],
                        "description": "优先级",
                    },
                },
                "required": ["channel", "recipient", "message"],
            },
        ),
        # Data-analysis tool; only `code` is mandatory.
        Tool(
            name="analyze_data",
            description="使用 Python 分析数据,支持 pandas/numpy/scikit-learn",
            inputSchema={
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "Python 分析代码",
                    },
                    "data_source": {
                        "type": "string",
                        "description": "数据源路径或 SQL 查询",
                    },
                },
                "required": ["code"],
            },
        ),
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """Dispatch a tools/call request to the matching tool implementation.

    Failures are raised rather than returned as ordinary text: the SDK's call_tool
    wrapper converts a raised exception into a result flagged isError=True (with
    str(exc) as the text), whereas a returned TextContent is always reported as a
    success and the client cannot tell that the call failed.

    Args:
        name: Tool name as advertised by list_tools.
        arguments: Tool arguments supplied by the client.

    Returns:
        A list with one TextContent describing the successful result.

    Raises:
        ValueError: Unknown tool name, or a query that is not SELECT/WITH.
        RuntimeError: The underlying operation failed; the cause is chained.
    """
    if name == "query_database":
        normalized_sql = arguments["sql"].strip().upper()
        if not normalized_sql.startswith(("SELECT", "WITH")):
            raise ValueError("❌ 安全限制:仅支持 SELECT/WITH 查询")
        database = arguments["database"]
        limit = arguments.get("limit", 100)
        try:
            results = await execute_query(database, arguments["sql"], limit)
            return [TextContent(type="text", text=json.dumps(results, indent=2, ensure_ascii=False))]
        except Exception as e:
            raise RuntimeError(f"❌ 查询失败: {e}") from e

    if name == "send_notification":
        channel = arguments["channel"]
        recipient = arguments["recipient"]
        message = arguments["message"]
        priority = arguments.get("priority", "normal")
        try:
            await send_notification(channel, recipient, message, priority)
        except Exception as e:
            raise RuntimeError(f"❌ 发送失败: {e}") from e
        return [TextContent(type="text", text=f"✅ 通知已发送\n渠道: {channel}\n接收: {recipient}\n优先级: {priority}")]

    if name == "analyze_data":
        code = arguments["code"]
        data_source = arguments.get("data_source")
        # NOTE(review): `code` is model-supplied Python. execute_analysis must run it
        # in an isolated sandbox; never exec/eval it inside the server process.
        try:
            result = await execute_analysis(code, data_source)
        except Exception as e:
            raise RuntimeError(f"❌ 分析失败: {e}") from e
        return [TextContent(type="text", text=result)]

    raise ValueError(f"未知工具: {name}")
# ==========================================
# 资源注册
# ==========================================
@server.list_resources()
async def list_resources() -> list[Resource]:
    """Return the resources this server advertises; both are served as JSON."""
    return [
        Resource(
            uri="schema://tables",
            name="数据库表结构",
            mimeType="application/json",
        ),
        # NOTE(review): read_resource treats everything after "config://" as a service
        # name, so this URI is read as the service called "services" — confirm intent.
        Resource(
            uri="config://services",
            name="服务配置",
            mimeType="application/json",
        ),
    ]
@server.read_resource()
async def read_resource(uri: str) -> str:
    """Return the JSON text for a resource URI.

    Args:
        uri: Requested resource URI. The SDK hands the handler a URL object rather
            than a plain str, so it is normalised with str() before any comparison.

    Returns:
        The resource content serialised as JSON.

    Raises:
        ValueError: The URI does not match any known resource.
    """
    # Without str() the equality test is always False and .startswith raises
    # AttributeError when a URL object is passed in.
    uri_text = str(uri)
    if uri_text == "schema://tables":
        return json.dumps(await get_table_schemas(), indent=2, ensure_ascii=False)
    if uri_text.startswith("config://"):
        # removeprefix strips only the leading scheme, not later occurrences.
        service = uri_text.removeprefix("config://")
        return json.dumps(await get_service_config(service), indent=2, ensure_ascii=False)
    raise ValueError(f"未知资源: {uri_text}")
# ==========================================
# 提示模板注册
# ==========================================
@server.list_prompts()
async def list_prompts() -> list[Prompt]:
    """Return the prompt templates this server advertises.

    Only "analyze-data" is defined: `query` is required, `dataset` is optional.
    """
    return [
        Prompt(
            name="analyze-data",
            description="数据分析提示模板",
            arguments=[
                PromptArgument(name="query", description="分析查询", required=True),
                PromptArgument(name="dataset", description="数据集名称", required=False),
            ],
        ),
    ]
@server.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None) -> "GetPromptResult":
    """Render a prompt template by name.

    Args:
        name: Prompt name as advertised by list_prompts.
        arguments: Template arguments; may be None when the client sends none.

    Returns:
        A GetPromptResult wrapping the rendered messages. The SDK's get_prompt
        handler expects this result type rather than a bare list of messages.

    Raises:
        ValueError: The prompt name is unknown.
    """
    # Local import keeps this block self-contained; the module-level mcp.types
    # import list does not include GetPromptResult.
    from mcp.types import GetPromptResult

    args = arguments or {}
    if name == "analyze-data":
        query = args.get("query", "")
        dataset = args.get("dataset", "")
        return GetPromptResult(
            description="数据分析提示模板",
            messages=[
                PromptMessage(
                    role="user",
                    content=TextContent(
                        type="text",
                        text=f"请分析: {query}\n数据集: {dataset or '自动选择'}\n\n步骤: 1.获取数据 2.统计分析 3.可视化建议 4.业务建议",
                    ),
                )
            ],
        )
    raise ValueError(f"未知提示模板: {name}")
# ==========================================
# 启动
# ==========================================
async def main():
    """Serve MCP over the SDK's stdio transport using the server's default init options."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())


if __name__ == "__main__":
    # Imported here because asyncio is only needed when run as a script.
    import asyncio
    asyncio.run(main())
远程 MCP Server 开发
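当 MCP Server 需要部署在远程服务器上时,使用 SSE 或 Streamable HTTP 传输方式。注意:自 MCP 规范 2025-03-26 起,独立的 HTTP+SSE 传输已被 Streamable HTTP 取代,新项目建议优先使用 Streamable HTTP;下面的 SSE 示例主要用于兼容旧版客户端。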
SSE 远程 Server(TypeScript)
// src/remote-server.ts - SSE 远程 MCP Server
import express from "express";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
import cors from "cors";
const app = express();
app.use(cors());
app.use(express.json());
const server = new McpServer({
name: "remote-enterprise-server",
version: "1.0.0",
});
// Register tools. Mirrors the local server, including its read-only guard and
// error reporting, so the remote endpoint is not more permissive than the local one.
// NOTE(review): executeQuery is neither defined nor imported in this file — it has
// to be imported from the module that implements it before this compiles.
server.tool("query_database", "查询企业数据库", {
  sql: z.string(),
  database: z.enum(["production", "staging", "analytics"]),
}, async ({ sql, database }) => {
  const normalizedSql = sql.trim().toUpperCase();
  if (!normalizedSql.startsWith("SELECT") && !normalizedSql.startsWith("WITH")) {
    return {
      content: [{ type: "text", text: "❌ 安全限制:仅支持 SELECT/WITH 查询" }],
      isError: true,
    };
  }
  try {
    const results = await executeQuery(database, sql);
    return { content: [{ type: "text", text: JSON.stringify(results, null, 2) }] };
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    return {
      content: [{ type: "text", text: `❌ 查询失败: ${reason}` }],
      isError: true,
    };
  }
});
// SSE endpoints.
// One transport per connected client, keyed by the transport's session id. A single
// shared variable would be overwritten by every new connection and break earlier clients.
// NOTE(review): one McpServer instance is connected to every transport here; confirm
// the SDK version in use supports that, otherwise create a server per connection.
const transports = new Map<string, SSEServerTransport>();

app.get("/sse", async (req, res) => {
  const transport = new SSEServerTransport("/messages", res);
  transports.set(transport.sessionId, transport);
  // Forget the session once the client disconnects so the map does not grow unbounded.
  res.on("close", () => {
    transports.delete(transport.sessionId);
  });
  try {
    await server.connect(transport);
  } catch (error) {
    transports.delete(transport.sessionId);
    console.error("SSE connection failed:", error);
    if (!res.headersSent) {
      res.status(500).end();
    }
  }
});

app.post("/messages", async (req, res) => {
  // The client posts to the endpoint announced by the transport: /messages?sessionId=...
  const sessionId = String(req.query.sessionId ?? "");
  const transport = transports.get(sessionId);
  if (!transport) {
    // Always answer; leaving the request open would hang the client.
    res.status(400).json({ error: "Unknown or expired sessionId" });
    return;
  }
  try {
    // express.json() has already consumed the request stream, so pass the parsed body along.
    await transport.handlePostMessage(req, res, req.body);
  } catch (error) {
    console.error("Failed to handle MCP message:", error);
    if (!res.headersSent) {
      res.status(500).json({ error: "Failed to handle message" });
    }
  }
});
// Health check: static JSON payload, polled by the Docker HEALTHCHECK and the
// nginx /health location shown in the deployment section.
app.get("/health", (req, res) => {
  res.json({ status: "ok", version: "1.0.0" });
});

// Listen on PORT from the environment, falling back to 3000.
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`🚀 Remote MCP Server running on port ${PORT}`);
  console.log(`📡 SSE endpoint: http://localhost:${PORT}/sse`);
});
客户端连接远程 Server
# ~/.hermes/config.yaml - 连接远程 MCP Server
mcp_servers:
remote-enterprise:
url: https://mcp.your-company.com/sse
transport: sse
headers:
Authorization: Bearer ${MCP_AUTH_TOKEN}
X-Company-ID: your-company-id
远程部署(Docker + Nginx)
# Dockerfile
FROM node:20-slim
WORKDIR /app
COPY package*.json ./
# --omit=dev replaces the deprecated --only=production flag.
RUN npm ci --omit=dev
COPY dist/ ./dist/
# Run as the unprivileged "node" user that ships with the official image.
USER node
EXPOSE 3000
# node:20-slim does not include curl, so probe /health with Node's built-in fetch.
HEALTHCHECK --interval=30s --timeout=5s \
  CMD node -e "fetch('http://localhost:3000/health').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))"
CMD ["node", "dist/remote-server.js"]
# nginx.conf - TLS-terminating reverse proxy for the SSE-based MCP server
server {
    listen 443 ssl;
    server_name mcp.your-company.com;
    ssl_certificate /etc/ssl/certs/mcp.crt;
    ssl_certificate_key /etc/ssl/private/mcp.key;

    # Long-lived SSE stream (server → client).
    location /sse {
        proxy_pass http://mcp-server:3000/sse;
        proxy_http_version 1.1;
        # Clear the Connection header so the upstream connection is kept open.
        proxy_set_header Connection '';
        # Disable buffering and caching so events are forwarded as they are produced.
        proxy_buffering off;
        proxy_cache off;
        # 86400s = 24h, so nginx does not cut an idle stream.
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }

    # Client → server JSON-RPC messages (plain POST requests).
    location /messages {
        proxy_pass http://mcp-server:3000/messages;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
    }

    # Health check passthrough.
    location /health {
        proxy_pass http://mcp-server:3000/health;
    }
}
MCP 测试与部署
开发完成后,测试和部署是确保 MCP Server 质量的关键步骤。
MCP Inspector 调试工具
# 安装 MCP Inspector
npm install -g @modelcontextprotocol/inspector
# 启动 Inspector(本地 stdio 服务器)
npx @modelcontextprotocol/inspector node dist/index.js
# 启动 Inspector(远程 SSE 服务器)
npx @modelcontextprotocol/inspector --url https://mcp.example.com/sse
# Inspector 提供 Web UI:
# - 查看服务器信息
# - 列出所有 Tools/Resources/Prompts
# - 交互式调用工具
# - 查看请求/响应日志
单元测试
// tests/server.test.ts
import { describe, it, expect, beforeAll, afterAll } from "vitest";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
// Integration tests: spawn the built stdio server (dist/index.js) and talk to it
// through the SDK client. Tool names must match what src/index.ts registers,
// i.e. "query_enterprise_db" (not "query_database", which is the Python/remote name).
describe("MCP Server Tests", () => {
  let client: Client;

  beforeAll(async () => {
    const transport = new StdioClientTransport({
      command: "node",
      args: ["dist/index.js"],
    });
    client = new Client({ name: "test-client", version: "1.0.0" });
    await client.connect(transport);
  });

  afterAll(async () => {
    await client.close();
  });

  it("should list tools", async () => {
    const tools = await client.listTools();
    expect(tools.tools.length).toBeGreaterThan(0);
    expect(tools.tools.find(t => t.name === "query_enterprise_db")).toBeDefined();
  });

  it("should list resources", async () => {
    const resources = await client.listResources();
    expect(resources.resources.length).toBeGreaterThan(0);
  });

  it("should call query_enterprise_db tool", async () => {
    const result = await client.callTool({
      name: "query_enterprise_db",
      arguments: { sql: "SELECT 1", database: "staging" },
    });
    expect(result.content).toBeDefined();
  });

  it("should reject non-SELECT queries", async () => {
    const result = await client.callTool({
      name: "query_enterprise_db",
      arguments: { sql: "DROP TABLE users", database: "staging" },
    });
    // The server flags the rejection as an error in addition to the message text.
    expect(result.isError).toBe(true);
    expect(result.content[0].text).toContain("安全限制");
  });

  it("should read schema resource", async () => {
    const resource = await client.readResource({ uri: "schema://tables" });
    expect(resource.contents).toBeDefined();
  });
});
注册到 Hermes
# ~/.hermes/config.yaml
mcp_servers:
# 本地 TypeScript Server
my-enterprise:
command: node
args: ["./my-mcp-server/dist/index.js"]
env:
DB_PASSWORD: ${DB_PASSWORD}
INTERNAL_API_TOKEN: ${API_TOKEN}
# 本地 Python Server
my-py-server:
command: python
args: ["./my-py-mcp-server/src/server.py"]
env:
DB_PASSWORD: ${DB_PASSWORD}
# 远程 Server
remote-enterprise:
url: https://mcp.your-company.com/sse
transport: sse
headers:
Authorization: Bearer ${MCP_AUTH_TOKEN}
发布到 npm
# 构建
npm run build
# 运行测试
npm test
# 发布
npm publish
# 发布后,用户可以这样安装使用
npx -y @your-org/my-mcp-server
Skill 开发完整指南
开发自定义 Skill,为 Worker 添加独特的能力。本节从零开始,完整讲解 Skill 的设计、开发、测试和发布流程。
Skill 目录结构
my-skill/
├── manifest.json # 技能元数据(必需)
├── prompt.md # 提示词模板(必需)
├── tools/ # 工具定义
│ ├── main.py # 主工具实现
│ └── helpers.py # 辅助函数
├── resources/ # 资源文件
│ └── templates/ # 模板文件
├── tests/ # 测试
│ └── test_main.py
├── README.md # 文档
└── .skillignore # 忽略文件
manifest.json 完整配置
{
"name": "api-testing",
"version": "1.0.0",
"description": "API 测试技能,支持 REST/GraphQL 接口测试、性能测试、安全扫描",
"author": "your-name",
"license": "MIT",
"category": "testing",
"tags": ["api", "testing", "automation", "rest", "graphql"],
"homepage": "https://github.com/your-name/skill-api-testing",
"repository": "https://github.com/your-name/skill-api-testing",
"compatibility": {
"openclaw": ">=1.0.0",
"hermes": ">=1.0.0",
"python": ">=3.10",
"node": ">=18"
},
"dependencies": {
"python": ">=3.10",
"packages": ["httpx>=0.25.0", "pytest>=7.0.0", "jsonschema>=4.0.0"],
"npm": [],
"mcp_servers": []
},
"tools": [
{
"name": "send_request",
"description": "发送 HTTP 请求并返回响应",
"parameters": {
"method": { "type": "string", "required": true, "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"] },
"url": { "type": "string", "required": true },
"headers": { "type": "object", "required": false },
"body": { "type": "object", "required": false },
"timeout": { "type": "number", "required": false, "default": 30 }
}
},
{
"name": "assert_response",
"description": "断言 HTTP 响应结果",
"parameters": {
"response": { "type": "object", "required": true },
"status_code": { "type": "number", "required": false },
"body_contains": { "type": "string", "required": false },
"json_path": { "type": "string", "required": false },
"json_value": { "type": "any", "required": false },
"response_time_ms": { "type": "number", "required": false }
}
},
{
"name": "run_test_suite",
"description": "运行 API 测试套件",
"parameters": {
"test_file": { "type": "string", "required": true },
"environment": { "type": "string", "required": false, "default": "staging" },
"verbose": { "type": "boolean", "required": false, "default": false }
}
}
],
"prompts": {
"test-api": {
"description": "对 API 端点进行完整测试",
"template": "请对以下 API 端点进行完整测试:{{endpoint}}\n\n测试步骤:\n1. 发送正常请求验证功能\n2. 发送异常请求验证错误处理\n3. 验证响应格式和数据类型\n4. 测试边界条件\n5. 检查安全头和CORS配置\n6. 测试性能(响应时间)",
"parameters": ["endpoint"]
},
"compare-apis": {
"description": "比较两个 API 的响应差异",
"template": "比较以下两个 API 的响应差异:\n- API A: {{api_a}}\n- API B: {{api_b}}\n\n比较维度:状态码、响应时间、数据结构、字段差异",
"parameters": ["api_a", "api_b"]
}
},
"config": {
"default_timeout": 30,
"max_retries": 3,
"environments": {
"staging": { "base_url": "https://staging-api.example.com" },
"production": { "base_url": "https://api.example.com" }
}
}
}
prompt.md 提示词模板
# API Testing Skill
你是一个专业的 API 测试工程师。你的职责是确保 API 的功能正确性、安全性和性能。
## 测试原则
1. **全面性**: 测试正常流程和异常流程
2. **安全性**: 检查认证、授权、输入验证
3. **性能**: 关注响应时间和吞吐量
4. **可重复**: 测试结果应稳定可重复
## 测试流程
### 1. API 发现
- 获取 API 文档(OpenAPI/Swagger)
- 识别所有端点和方法
- 确认认证方式
### 2. 测试设计
- 正向测试:有效输入,预期成功
- 反向测试:无效输入,预期失败
- 边界测试:空值、最大值、特殊字符
- 安全测试:注入、越权、信息泄露
### 3. 执行测试
- 使用 send_request 工具发送请求
- 使用 assert_response 工具验证结果
- 记录所有测试结果
### 4. 报告生成
- 通过/失败统计
- 错误详情和复现步骤
- 性能指标
- 改进建议
## 输出格式
测试结果以 Markdown 表格形式输出:
| 测试用例 | 方法 | 端点 | 状态码 | 结果 | 耗时 |
|---------|------|------|--------|------|------|
| 正常登录 | POST | /auth/login | 200 | ✅ | 120ms |
工具实现
# tools/main.py - API 测试工具实现
import httpx
import time
import json
from typing import Any
async def send_request(
    method: str,
    url: str,
    headers: dict = None,
    body: dict = None,
    timeout: int = 30,
) -> dict[str, Any]:
    """Send an HTTP request and return a summary of the response.

    Args:
        method: HTTP method name.
        url: Target URL.
        headers: Optional request headers; None sends no extra headers.
        body: Optional payload, sent as JSON.
        timeout: Client timeout in seconds.

    Returns:
        A dict with "status_code", "headers", "body" (parsed JSON, or the first
        1000 characters of the text when the body is not JSON) and "elapsed_ms".
    """
    # perf_counter is monotonic, so the measurement is unaffected by clock adjustments.
    started = time.perf_counter()
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.request(
            method=method,
            url=url,
            headers=headers or {},
            json=body,
        )
    elapsed_ms = (time.perf_counter() - started) * 1000

    try:
        parsed_body = response.json()
    except ValueError:
        # Only a JSON decoding failure should trigger the text fallback; a bare
        # `except:` would also swallow cancellation and KeyboardInterrupt.
        parsed_body = response.text[:1000]

    return {
        "status_code": response.status_code,
        "headers": dict(response.headers),
        "body": parsed_body,
        "elapsed_ms": round(elapsed_ms, 2),
    }
def _resolve_json_path(document: Any, path: str) -> tuple[bool, Any]:
    """Walk a simple dotted path such as "$.data.items[0].id" or "data.items.0.id".

    Returns (found, value); found is False when any segment is missing.
    """
    segments = [
        part
        for part in path.replace("[", ".").replace("]", "").split(".")
        if part and part != "$"
    ]
    current = document
    for segment in segments:
        if isinstance(current, dict) and segment in current:
            current = current[segment]
        elif (
            isinstance(current, list)
            and segment.lstrip("-").isdigit()
            and -len(current) <= int(segment) < len(current)
        ):
            current = current[int(segment)]
        else:
            return False, None
    return True, current


async def assert_response(
    response: dict,
    status_code: int = None,
    body_contains: str = None,
    json_path: str = None,
    json_value: Any = None,
    response_time_ms: int = None,
) -> dict[str, Any]:
    """Run the requested checks against a response dict produced by send_request.

    Args:
        response: Dict with "status_code", "body" and "elapsed_ms" keys.
        status_code: Expected status code.
        body_contains: Substring expected in the body.
        json_path: Dotted path into the JSON body. When given without json_value
            the check passes if the path exists; otherwise the value must equal
            json_value.
        json_value: Expected value at json_path.
        response_time_ms: Upper bound for the elapsed time in milliseconds.

    Returns:
        A dict with "passed" (all checks passed), "checks" (per-check details)
        and "summary". With no checks requested, "passed" is True and the
        summary reads "0/0 通过".
    """
    results = []

    if status_code is not None:
        results.append({
            "check": f"状态码 == {status_code}",
            "passed": response["status_code"] == status_code,
            "actual": response["status_code"],
        })

    if body_contains is not None:
        body = response.get("body", "")
        # ensure_ascii=False keeps non-ASCII text searchable; the default would
        # escape it to \uXXXX sequences and the substring test would never match.
        body_str = body if isinstance(body, str) else json.dumps(body, ensure_ascii=False)
        results.append({
            "check": f"响应体包含 '{body_contains}'",
            "passed": body_contains in body_str,
        })

    if json_path is not None:
        # Previously json_path/json_value were accepted but never evaluated.
        found, actual = _resolve_json_path(response.get("body"), json_path)
        if json_value is None:
            check_label = f"{json_path} 存在"
            passed = found
        else:
            check_label = f"{json_path} == {json_value!r}"
            passed = bool(found and actual == json_value)
        results.append({
            "check": check_label,
            "passed": passed,
            "actual": actual if found else None,
        })

    if response_time_ms is not None:
        results.append({
            "check": f"响应时间 <= {response_time_ms}ms",
            "passed": response["elapsed_ms"] <= response_time_ms,
            "actual": f"{response['elapsed_ms']}ms",
        })

    return {
        "passed": all(r["passed"] for r in results),
        "checks": results,
        "summary": f"{sum(r['passed'] for r in results)}/{len(results)} 通过",
    }
测试 Skill
# 本地测试
openclaw skills test ./my-skill
# 在特定 Worker 上测试
openclaw skills test ./my-skill --worker coder
# 打包
openclaw skills pack ./my-skill
# 发布到社区
openclaw skills publish ./my-skill
# 安装发布的 Skill
openclaw skills install api-testing
高级工作流编排
超越简单的任务分发,构建支持 DAG 依赖、条件分支、并行执行、错误恢复的复杂工作流。
工作流编排模式
顺序执行(Sequential)
任务按顺序依次执行,前一步的输出是后一步的输入
并行执行(Parallel)
多个独立任务同时执行,提高效率
条件分支(Conditional)
根据执行结果动态选择下一步操作
Saga 模式
分布式事务补偿,失败时自动回滚已完成步骤
完整工作流配置:CI/CD 流水线
# ~/.hermes/workflows/ci-cd-pipeline.yaml
name: ci-cd-pipeline
description: 自动化 CI/CD 流水线
version: "1.0"
# 全局变量
variables:
project_dir: /workspace/my-project
test_env: staging
deploy_env: production
# 工作流步骤(DAG 结构)
steps:
# === 阶段1: 代码质量检查 ===
- id: lint
worker: coder
skill: code-review
input:
path: "{{project_dir}}/src"
checks: [style, complexity, security]
output: lint_result
- id: type-check
worker: coder
skill: code-generation
input:
action: type_check
path: "{{project_dir}}"
output: type_check_result
# === 阶段2: 条件门控 ===
- id: quality-gate
type: condition
depends_on: [lint, type-check]
condition: |
lint_result.passed == true and
type_check_result.errors == 0
on_true: run-tests
on_false: notify-failure
# === 阶段3: 并行测试 ===
- id: run-tests
type: parallel
depends_on: [quality-gate]
tasks:
- id: unit-tests
worker: coder
skill: testing
input: { type: "unit", coverage: true }
- id: integration-tests
worker: coder
skill: testing
input: { type: "integration", env: "{{test_env}}" }
- id: security-scan
worker: reviewer
skill: security-audit
input: { target: "{{project_dir}}/src", depth: "full" }
- id: performance-test
worker: operator
skill: api-testing
input: { endpoint: "http://staging:3000/api", duration: "60s" }
# === 阶段4: 测试结果评估 ===
- id: test-gate
type: condition
depends_on: [run-tests]
condition: |
unit-tests.passed == true and
integration-tests.passed == true and
security-scan.critical == 0 and
performance-test.p95_ms < 500
on_true: deploy-staging
on_false: notify-failure
# === 阶段5: 部署到 Staging ===
- id: deploy-staging
worker: operator
skill: deployment
depends_on: [test-gate]
input:
environment: staging
strategy: rolling
health_check: true
retry:
max_attempts: 3
backoff: exponential
compensate: rollback-staging
# === 阶段6: Staging 验证 ===
- id: staging-validation
worker: operator
skill: api-testing
depends_on: [deploy-staging]
input:
endpoint: "https://staging.example.com/api"
test_suite: "smoke"
# === 阶段7: 人工审批 ===
- id: approval
type: human-approval
depends_on: [staging-validation]
approvers: ["tech-lead", "devops-lead"]
timeout: 24h
on_timeout: notify-pending
# === 阶段8: 部署到生产 ===
- id: deploy-production
worker: operator
skill: deployment
depends_on: [approval]
input:
environment: production
strategy: canary
canary_percentage: 10
health_check: true
rollback_on_failure: true
retry:
max_attempts: 2
compensate: rollback-production
# === 补偿步骤(失败回滚)===
- id: rollback-staging
worker: operator
skill: deployment
input:
action: rollback
environment: staging
version: previous
- id: rollback-production
worker: operator
skill: deployment
input:
action: rollback
environment: production
version: previous
# === 通知步骤 ===
- id: notify-failure
worker: operator
skill: notification
input:
channel: slack
recipient: "#ci-cd-alerts"
message: "❌ CI/CD 流水线失败: {{failed_step}}"
priority: high
- id: notify-pending
worker: operator
skill: notification
input:
channel: slack
recipient: "#ci-cd-alerts"
message: "⏳ 生产部署等待审批超过24小时"
priority: normal
Saga 模式详解
Saga 模式用于处理分布式事务,当某个步骤失败时,自动执行补偿操作回滚已完成的步骤:
# Saga 模式示例:用户注册流程
name: user-registration-saga
steps:
- id: create-user
worker: coder
input: { action: "create_user", data: "{{user_data}}" }
compensate: { action: "delete_user", user_id: "{{create-user.user_id}}" }
- id: setup-profile
worker: coder
depends_on: [create-user]
input: { action: "create_profile", user_id: "{{create-user.user_id}}" }
compensate: { action: "delete_profile", user_id: "{{create-user.user_id}}" }
- id: assign-role
worker: operator
depends_on: [setup-profile]
input: { action: "assign_role", user_id: "{{create-user.user_id}}", role: "user" }
compensate: { action: "remove_role", user_id: "{{create-user.user_id}}" }
- id: send-welcome-email
worker: operator
depends_on: [assign-role]
input: { action: "send_email", to: "{{user_data.email}}", template: "welcome" }
compensate: { action: "send_email", to: "{{user_data.email}}", template: "registration_failed" }
# 如果 assign-role 失败,Saga 会自动执行:
# 1. compensate setup-profile → delete_profile
# 2. compensate create-user → delete_user
# 确保数据一致性
工作流触发方式
# 手动触发
hermes workflow run ci-cd-pipeline
# 带参数触发
hermes workflow run ci-cd-pipeline --var deploy_env=production
# 定时触发
hermes workflow schedule ci-cd-pipeline --cron "0 2 * * *"
# 事件触发(Git push 时自动运行)
hermes workflow trigger ci-cd-pipeline --on git.push --branch main
# 查看工作流状态
hermes workflow status ci-cd-pipeline
# 查看历史执行
hermes workflow history ci-cd-pipeline --last 10
# 取消正在执行的工作流
hermes workflow cancel ci-cd-pipeline --run-id run-abc123
安全深度防护
在生产环境中使用 AI Agent 协作架构,安全是首要考虑。本节深入讲解多层次安全防护策略。
安全架构层次
1. Docker 沙箱隔离
# ~/.hermes/config.yaml - 沙箱配置
terminal:
backend: docker
docker:
image: "hermes-worker:latest"
memory: "512m"
cpus: "1.0"
network: "none"
read_only: true
volumes:
- "./workspace:/workspace:rw"
security_opt:
- "no-new-privileges:true"
cap_drop:
- ALL
pids_limit: 100
tmpfs:
- "/tmp:size=100M"
healthcheck:
test: ["CMD", "echo", "ok"]
interval: 30s
timeout: 5s
2. RBAC 权限控制
# ~/.hermes/config.yaml - RBAC 配置
security:
rbac:
enabled: true
roles:
admin:
permissions: ["*"]
developer:
permissions:
- file:read
- file:write
- code:execute
- git:read
- git:write
- mcp:use:filesystem
- mcp:use:github
analyst:
permissions:
- file:read
- code:execute
- database:query
- mcp:use:postgres
- mcp:use:sqlite
viewer:
permissions:
- file:read
- git:read
- mcp:use:filesystem
worker_roles:
coder: developer
analyst: analyst
operator: developer
reviewer: viewer
# 资源级权限
resource_permissions:
"/workspace/production": # 生产目录
developer: deny
admin: allow
"/workspace/staging": # 测试目录
developer: read-only
admin: allow
3. 审计日志
# ~/.hermes/config.yaml - 审计配置
audit:
enabled: true
log_level: info
# 记录所有操作
events:
- tool_call # 工具调用
- file_access # 文件访问
- mcp_request # MCP 请求
- workflow_step # 工作流步骤
- permission_denied # 权限拒绝
- config_change # 配置变更
# 存储配置
storage:
type: file # file | elasticsearch | syslog
path: /var/log/hermes/audit/
rotation: daily
retention: 90d
format: json
# 敏感信息脱敏
redaction:
patterns:
- name: api_key
pattern: "sk-[a-zA-Z0-9]{20,}"
replacement: "sk-***REDACTED***"
- name: password
pattern: "password[\":\\s]+[\"']?[^\"',}\\s]+"
replacement: "password: ***REDACTED***"
- name: token
pattern: "Bearer [a-zA-Z0-9._-]+"
replacement: "Bearer ***REDACTED***"
4. 密钥管理
# ~/.hermes/config.yaml - 密钥管理
secrets:
provider: vault # env | vault | aws-secrets | gcp-secrets
vault:
address: "https://vault.company.com"
path: "secret/hermes"
auth_method: kubernetes # token | kubernetes | approle
rotation:
enabled: true
interval: 30d
notify_before: 7d
injection:
# 密钥注入方式:环境变量
method: env
prefix: "HERMES_SECRET_"
5. 网络安全
# ~/.hermes/config.yaml - 网络安全
network:
# 出站规则
outbound:
default: deny
allow:
- host: "api.xidao.online"
ports: [443]
- host: "github.com"
ports: [443]
- host: "*.internal.company.com"
ports: [443, 5432]
# 速率限制
rate_limit:
requests_per_minute: 60
tokens_per_minute: 100000
# TLS 配置
tls:
min_version: "1.2"
verify_certificates: true
性能深度优化
在多 Agent 协作系统中,性能优化直接影响用户体验和成本。本节深入讲解各个层面的优化策略。
1. 上下文压缩
# ~/.hermes/config.yaml - 上下文压缩
compression:
mode: safeguard # safeguard | aggressive | none
# safeguard 模式:保留关键信息,压缩冗余
safeguard:
keep_recent_messages: 10 # 保留最近10条消息
keep_tool_calls: true # 保留工具调用记录
keep_tool_results: true # 保留工具结果
compress_system_prompt: false # 不压缩系统提示
max_tokens: 128000
# aggressive 模式:激进压缩,节省 Token
aggressive:
keep_recent_messages: 5
keep_tool_calls: false
keep_tool_results: false
summarize_old_messages: true
summary_model: "google/gemini-2.5-flash" # 用便宜模型做摘要
# 触发条件
trigger:
threshold: 0.8 # 80% 上下文窗口时触发
check_interval: 5 # 每5轮检查一次
2. 结果缓存
# ~/.hermes/config.yaml - 结果缓存
cache:
enabled: true
# 缓存后端
backend: redis # memory | redis | memcached
redis:
host: localhost
port: 6379
db: 1
password: ${REDIS_PASSWORD}
# 缓存策略
strategies:
# 工具调用结果缓存
tool_results:
enabled: true
ttl: 3600 # 1小时
key_pattern: "hermes:cache:tool:{{tool_name}}:{{args_hash}}"
cacheable_tools:
- query_database # 数据库查询可缓存
- search_web # 搜索结果可缓存
non_cacheable_tools:
- write_file # 写操作不缓存
- send_notification # 通知不缓存
# 模型响应缓存
model_responses:
enabled: true
ttl: 1800 # 30分钟
key_pattern: "hermes:cache:model:{{model}}:{{prompt_hash}}"
semantic_cache: true # 语义缓存(相似问题复用)
similarity_threshold: 0.95
# 缓存统计
stats:
enabled: true
log_interval: 60s
3. 连接池与并发
# ~/.hermes/config.yaml - 连接与并发
connection:
pool_size: 10 # API 连接池大小
keep_alive: true # 保持连接
timeout: 30 # 连接超时(秒)
retry_on_failure: true
concurrency:
max_parallel_workers: 5 # 最大并行 Worker 数
max_parallel_tasks: 10 # 最大并行任务数
queue_size: 100 # 任务队列大小
overflow_strategy: queue # queue | reject | redirect
# 流控
rate_limit:
api_calls_per_minute: 60
tokens_per_minute: 200000
burst_allowance: 10
4. Token 优化策略
| 策略 | 原理 | 节省比例 | 配置 |
|---|---|---|---|
| 模型分级 | 简单任务用 Flash,复杂任务用 Opus | 60-80% | Worker 模型配置 |
| 上下文压缩 | 压缩旧消息,保留关键信息 | 30-50% | compression.mode: safeguard |
| 结果缓存 | 重复任务直接返回缓存 | 20-40% | cache.enabled: true |
| 摘要模型 | 用便宜模型做摘要和分类 | 40-60% | summary_model: gemini-2.5-flash |
| 语义缓存 | 相似问题复用已有回答 | 15-30% | semantic_cache: true |
| 批量处理 | 合并小任务为批量请求 | 10-20% | batch.enabled: true |
5. 性能监控仪表盘
# 启动监控仪表盘
hermes dashboard --port 8080
# 关键指标
hermes stats --live
# 输出示例:
# ┌─────────────────────────────────────────────┐
# │ Hermes 性能仪表盘 │
# ├─────────────────────────────────────────────┤
# │ 请求/分钟: 45.2 ▲ +12% │
# │ 平均延迟: 3.2s ▼ -8% │
# │ Token/分钟: 125K ▲ +5% │
# │ 缓存命中率: 34.2% ▲ +3% │
# │ Worker 健康: 4/4 ✅ │
# ├─────────────────────────────────────────────┤
# │ Worker 详情: │
# │ coder: 12 req/min 2.1s avg 98% ok │
# │ analyst: 8 req/min 3.5s avg 99% ok │
# │ operator: 20 req/min 1.2s avg 99% ok │
# │ reviewer: 5 req/min 4.8s avg 97% ok │
# └─────────────────────────────────────────────┘
架构模式与生产部署
根据不同规模和场景选择合适的架构模式,并掌握生产环境的部署和运维。
架构模式对比
| 模式 | 架构 | 适用规模 | 复杂度 | 可用性 |
|---|---|---|---|---|
| 单体模式 | 1 Hermes + 1 OpenClaw | 个人/小项目 | ⭐ | 单点故障 |
| 微服务模式 | 1 Hermes + N 专业化 Worker | 团队/中型项目 | ⭐⭐⭐ | Worker 可独立扩展 |
| 联邦模式 | N Hermes + N Worker 集群 | 跨团队/大型组织 | ⭐⭐⭐⭐⭐ | 高可用、跨区域 |
🏛️ 单体模式
适合个人开发者和小项目,最简单的部署方式:
# 一键启动
hermes chat # Hermes 自带 OpenClaw 能力
🏗️ 微服务模式(推荐)
一个 Hermes + 多个专业化 OpenClaw Worker,适合团队协作:
🌐 联邦模式(高级)
多个 Hermes 实例通过 MCP Hub 互联,适合跨团队/跨组织协作:
(前端团队)
(后端团队)
# ~/.hermes/config.yaml - 联邦模式
federation:
enabled: true
hub_url: "https://mcp-hub.company.com"
identity:
name: "frontend-team"
region: "asia-east"
peers:
- name: "backend-team"
endpoint: "https://hermes-backend.company.com/mcp"
- name: "data-team"
endpoint: "https://hermes-data.company.com/mcp"
routing:
# 跨团队任务路由
rules:
- pattern: "数据库.*"
route_to: data-team
- pattern: "API.*开发"
route_to: backend-team
Kubernetes 生产部署
# k8s/hermes-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: hermes
namespace: ai-agents
spec:
replicas: 2
selector:
matchLabels:
app: hermes
template:
metadata:
labels:
app: hermes
spec:
containers:
- name: hermes
image: hermes-agent:latest
ports:
- containerPort: 18788
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1000m"
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: hermes-secrets
key: api-key
- name: OPENAI_BASE_URL
value: "https://api.xidao.online/v1"
livenessProbe:
httpGet:
path: /health
port: 18788
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 18788
initialDelaySeconds: 5
periodSeconds: 10
---
# Worker Deployment (one per worker type)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker-coder
  namespace: ai-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: worker-coder
  template:
    metadata:
      # Pod labels must match spec.selector.matchLabels; without them the
      # API server rejects the Deployment.
      labels:
        app: worker-coder
    spec:
      containers:
        - name: openclaw
          image: openclaw:latest
          # A CPU-utilization HPA targeting this Deployment needs a CPU request to
          # compute a percentage. Example values — size them for the real workload.
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          env:
            - name: OPENCLAW_AGENT
              value: "coder"
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: hermes-secrets
                  key: api-key
---
# HPA: horizontal autoscaling for the worker-coder Deployment
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-coder-hpa
  # An HPA can only target a workload in its own namespace; the worker-coder
  # Deployment lives in ai-agents.
  namespace: ai-agents
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: worker-coder
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
可观测性
# ~/.hermes/config.yaml - 可观测性
observability:
# 指标导出
metrics:
enabled: true
provider: prometheus
port: 9090
path: /metrics
# 分布式追踪
tracing:
enabled: true
provider: jaeger
endpoint: "http://jaeger:14268/api/traces"
sample_rate: 0.1 # 采样10%
# 日志
logging:
level: info
format: json
output: stdout # stdout | file | elasticsearch
灾备与恢复
# 备份配置和数据
hermes backup create --output /backup/hermes-$(date +%Y%m%d).tar.gz
# 备份内容:
# - config.yaml(配置)
# - .env(密钥)
# - SOUL.md(身份)
# - memories/(记忆)
# - workflows/(工作流)
# 恢复
hermes backup restore /backup/hermes-20250109.tar.gz
# 定时备份(crontab)
0 2 * * * hermes backup create --output /backup/hermes-$(date +\%Y\%m\%d).tar.gz