MCP-002:MCP TypeScript SDK教程

随着大型语言模型(LLMs)的普及,如何为 LLMs 提供结构化、安全且标准化的外部信息和能力变得越来越重要。 Model Context Protocol (MCP) 正是为了解决这个问题而设计的。 它允许应用程序以标准化的方式为 LLMs 提供上下文,将提供上下文的职责与实际的 LLM 交互分离开来。

想象一下,你需要让一个 LLM 能够访问用户数据、调用外部 API 或者执行某个计算。 传统的方法可能涉及复杂的提示工程或者为每个 LLM 应用编写特定的集成代码。 MCP 提供了一种更优雅的解决方案:你可以构建 MCP 服务器,这些服务器通过定义好的接口向 LLM 应用程序暴露数据和功能。

MCP TypeScript SDK 是 MCP 协议在 TypeScript/JavaScript 环境下的官方实现. 使用这个 SDK,你可以轻松地:

  • 构建 MCP 客户端,连接到任何 MCP 服务器。
  • 创建 MCP 服务器,暴露资源、提示和工具。
  • 使用 标准传输协议,如 stdio 和 Streamable HTTP。
  • 处理所有 MCP 协议消息和生命周期事件。

MCP 的核心概念

构建一个 MCP 服务器需要理解几个核心概念:

1. Server (服务器):

McpServer 是你与 MCP 协议交互的核心接口。它负责连接管理、协议合规性以及消息路由。创建服务器很简单:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
  name: "我的应用",
  version: "1.0.0"
});

2. Resources (资源):

Resources是向 LLMs 暴露数据的方式。它们类似于 REST API 中的 GET 端点,主要用于加载信息到 LLM 的上下文中,并且不应该执行大量的计算或产生副作用。你可以定义静态资源或带有参数的动态资源:

// 静态资源
server.resource(
  "config",
  "config://app",
  async (uri) => ({
    contents: [{
      uri: uri.href,
      text: "应用配置在这里"
    }]
  })
);
// 动态资源,例如获取用户配置
server.resource(
  "user-profile",
  new ResourceTemplate("users://{userId}/config", { list: undefined }),
  async (uri, { userId }) => ({
    contents: [{
      uri: uri.href,
      text: `用户 ${userId} 的配置数据`
    }]
  })
);

3. Tools (工具):

Tools允许 LLMs 通过你的服务器执行动作。与资源不同,工具期望执行计算并产生副作用。 它们类似于 REST API 中的 POST 端点。例如,一个计算 BMI 的工具或调用外部天气 API 的工具:

import { z } from "zod";
server.tool(
  "calculate-bmi",
  {
      weightKg: z.number(),
      heightM: z.number()
  },
  async ({ weightKg, heightM }) => ({
    content: [{
      type: "text",
      text: String(weightKg / (heightM * heightM))
    }]
  })
);
server.tool(
  "fetch-weather",
  { city: z.string() },
  async ({ city }) => {
    const response = await fetch(`https://api.weather.com/${city}`);
    const data = await response.text();
    return {
      content: [{ type: "text", text: data }]
    };
  }
);

4. Prompts (提示):

Prompts是可重用的模板,帮助 LLMs 与你的服务器有效互动。它们可以定义一系列消息,作为与服务器交互的起点。

server.prompt(
  "review-code",
  { code: z.string() },
  ({ code }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `请评审以下代码:\n\n${code}`
      }
    }]
  })
);

运行你的 MCP 服务器

MCP 服务器需要连接到传输层 (transport) 才能与客户端通信。SDK 支持多种传输方式:

stdio:

适用于命令行工具和直接集成。服务器通过标准输入输出流进行通信。

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new McpServer({ /* ... 配置 ... */ });
// ... 设置资源、工具、提示 ...
const transport = new StdioServerTransport();
await server.connect(transport); // 开始监听 stdin/stdout

Streamable HTTP:

适用于远程服务器。它通过 HTTP 处理客户端请求和服务器到客户端的通知。可以配置为有状态(带有会话管理)或无状态。

有状态 (With Session Management): 通过会话 ID 维护客户端和服务器之间的状态。适用于需要保持上下文的场景。

import express from "express";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js"



const app = express();
app.use(express.json());

// Map to store transports by session ID
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

// Handle POST requests for client-to-server communication
app.post('/mcp', async (req, res) => {
  // Check for existing session ID
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  let transport: StreamableHTTPServerTransport;

  if (sessionId && transports[sessionId]) {
    // Reuse existing transport
    transport = transports[sessionId];
  } else if (!sessionId && isInitializeRequest(req.body)) {
    // New initialization request
    transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => randomUUID(),
      onsessioninitialized: (sessionId) => {
        // Store the transport by session ID
        transports[sessionId] = transport;
      }
    });

    // Clean up transport when closed
    transport.onclose = () => {
      if (transport.sessionId) {
        delete transports[transport.sessionId];
      }
    };
    const server = new McpServer({
      name: "example-server",
      version: "1.0.0"
    });

    // ... set up server resources, tools, and prompts ...

    // Connect to the MCP server
    await server.connect(transport);
  } else {
    // Invalid request
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: 'Bad Request: No valid session ID provided',
      },
      id: null,
    });
    return;
  }

  // Handle the request
  await transport.handleRequest(req, res, req.body);
});

// Reusable handler for GET and DELETE requests
const handleSessionRequest = async (req: express.Request, res: express.Response) => {
  const sessionId = req.headers['mcp-session-id'] as string | undefined;
  if (!sessionId || !transports[sessionId]) {
    res.status(400).send('Invalid or missing session ID');
    return;
  }
  
  const transport = transports[sessionId];
  await transport.handleRequest(req, res);
};

// Handle GET requests for server-to-client notifications via SSE
app.get('/mcp', handleSessionRequest);

// Handle DELETE requests for session termination
app.delete('/mcp', handleSessionRequest);

app.listen(3000);

无状态 (Without Session Management): 每个请求都是独立的。适用于简单的 API 包装器或水平扩展部署.

const app = express();
app.use(express.json());

app.post('/mcp', async (req: Request, res: Response) => {
  // In stateless mode, create a new instance of transport and server for each request
  // to ensure complete isolation. A single instance would cause request ID collisions
  // when multiple clients connect concurrently.
  
  try {
    const server = getServer(); 
    const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
      sessionIdGenerator: undefined,
    });
    res.on('close', () => {
      console.log('Request closed');
      transport.close();
      server.close();
    });
    await server.connect(transport);
    await transport.handleRequest(req, res, req.body);
  } catch (error) {
    console.error('Error handling MCP request:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal server error',
        },
        id: null,
      });
    }
  }
});

app.get('/mcp', async (req: Request, res: Response) => {
  console.log('Received GET MCP request');
  res.writeHead(405).end(JSON.stringify({
    jsonrpc: "2.0",
    error: {
      code: -32000,
      message: "Method not allowed."
    },
    id: null
  }));
});

app.delete('/mcp', async (req: Request, res: Response) => {
  console.log('Received DELETE MCP request');
  res.writeHead(405).end(JSON.stringify({
    jsonrpc: "2.0",
    error: {
      code: -32000,
      message: "Method not allowed."
    },
    id: null
  }));
});


// Start the server
const PORT = 3000;
app.listen(PORT, () => {
  console.log(`MCP Stateless Streamable HTTP Server listening on port ${PORT}`);
});

SDK 提供了 StreamableHTTPServerTransport 来实现这一点。实现细节会涉及设置 Express.js 等 HTTP 框架来路由请求。

测试与调试

你可以使用 MCP Inspector 工具来测试你的服务器。详细信息可以参考其 README。

示例一览

SDK 提供了多个示例帮助你快速上手。

Echo Server:

一个简单的服务器,演示了如何注册资源、工具和提示,并将输入原样返回。

import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

const server = new McpServer({
  name: "Echo",
  version: "1.0.0"
});

server.resource(
  "echo",
  new ResourceTemplate("echo://{message}", { list: undefined }),
  async (uri, { message }) => ({
    contents: [{
      uri: uri.href,
      text: `Resource echo: ${message}`
    }]
  })
);

server.tool(
  "echo",
  { message: z.string() },
  async ({ message }) => ({
    content: [{ type: "text", text: `Tool echo: ${message}` }]
  })
);

server.prompt(
  "echo",
  { message: z.string() },
  ({ message }) => ({
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: `Please process this message: ${message}`
      }
    }]
  })
);

SQLite Explorer:

一个更复杂的例子,展示了如何构建一个 MCP 服务器来查询 SQLite 数据库 schema 和执行 SQL 命令,将数据库能力暴露给 LLM 应用。

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import sqlite3 from "sqlite3";
import { promisify } from "util";
import { z } from "zod";

const server = new McpServer({
  name: "SQLite Explorer",
  version: "1.0.0"
});

// Helper to create DB connection
const getDb = () => {
  const db = new sqlite3.Database("database.db");
  return {
    all: promisify<string, any[]>(db.all.bind(db)),
    close: promisify(db.close.bind(db))
  };
};

server.resource(
  "schema",
  "schema://main",
  async (uri) => {
    const db = getDb();
    try {
      const tables = await db.all(
        "SELECT sql FROM sqlite_master WHERE type='table'"
      );
      return {
        contents: [{
          uri: uri.href,
          text: tables.map((t: {sql: string}) => t.sql).join("\n")
        }]
      };
    } finally {
      await db.close();
    }
  }
);

server.tool(
  "query",
  { sql: z.string() },
  async ({ sql }) => {
    const db = getDb();
    try {
      const results = await db.all(sql);
      return {
        content: [{
          type: "text",
          text: JSON.stringify(results, null, 2)
        }]
      };
    } catch (err: unknown) {
      const error = err as Error;
      return {
        content: [{
          type: "text",
          text: `Error: ${error.message}`
        }],
        isError: true
      };
    } finally {
      await db.close();
    }
  }
);

进阶用法

SDK 还支持更高级的场景:

Dynamic Servers (动态服务器):

可以在服务器连接后动态地添加、更新或移除工具/提示/资源。例如,根据用户的权限升级启用更多工具。这会自动触发 listChanged 通知给客户端.

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";

const server = new McpServer({
  name: "Dynamic Example",
  version: "1.0.0"
});

const listMessageTool = server.tool(
  "listMessages",
  { channel: z.string() },
  async ({ channel }) => ({
    content: [{ type: "text", text: await listMessages(channel) }]
  })
);

const putMessageTool = server.tool(
  "putMessage",
  { channel: z.string(), message: z.string() },
  async ({ channel, message }) => ({
    content: [{ type: "text", text: await putMessage(channel, string) }]
  })
);
// Until we upgrade auth, `putMessage` is disabled (won't show up in listTools)
putMessageTool.disable()

const upgradeAuthTool = server.tool(
  "upgradeAuth",
  { permission: z.enum(["write', vadmin"])},
  // Any mutations here will automatically emit `listChanged` notifications
  async ({ permission }) => {
    const { ok, err, previous } = await upgradeAuthAndStoreToken(permission)
    if (!ok) return {content: [{ type: "text", text: `Error: ${err}` }]}

    // If we previously had read-only access, 'putMessage' is now available
    if (previous === "read") {
      putMessageTool.enable()
    }

    if (permission === 'write') {
      // If we've just upgraded to 'write' permissions, we can still call 'upgradeAuth' 
      // but can only upgrade to 'admin'. 
      upgradeAuthTool.update({
        paramSchema: { permission: z.enum(["admin"]) }, // change validation rules
      })
    } else {
      // If we're now an admin, we no longer have anywhere to upgrade to, so fully remove that tool
      upgradeAuthTool.remove()
    }
  }
)

// Connect as normal
const transport = new StdioServerTransport();
await server.connect(transport);

Low-Level Server (低层级服务器):

对于需要更精细控制协议消息的开发者,可以直接使用低层级的 Server 类,手动设置请求处理器。

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  ListPromptsRequestSchema,
  GetPromptRequestSchema
} from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  {
    name: "example-server",
    version: "1.0.0"
  },
  {
    capabilities: {
      prompts: {}
    }
  }
);

server.setRequestHandler(ListPromptsRequestSchema, async () => {
  return {
    prompts: [{
      name: "example-prompt",
      description: "An example prompt template",
      arguments: [{
        name: "arg1",
        description: "Example argument",
        required: true
      }]
    }]
  };
});

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  if (request.params.name !== "example-prompt") {
    throw new Error("Unknown prompt");
  }
  return {
    description: "Example prompt",
    messages: [{
      role: "user",
      content: {
        type: "text",
        text: "Example prompt text"
      }
    }]
  };
});

const transport = new StdioServerTransport();
await server.connect(transport);

Writing MCP Clients (编写 MCP 客户端):

除了构建服务器,SDK 也提供了高层级的 Client 接口,方便你编写 MCP 客户端来连接服务器,执行如列出提示、获取资源、调用工具等操作。

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

const transport = new StdioClientTransport({
  command: "node",
  args: ["server.js"]
});

const client = new Client(
  {
    name: "example-client",
    version: "1.0.0"
  }
);

await client.connect(transport);

// List prompts
const prompts = await client.listPrompts();

// Get a prompt
const prompt = await client.getPrompt({
  name: "example-prompt",
  arguments: {
    arg1: "value"
  }
});

// List resources
const resources = await client.listResources();

// Read a resource
const resource = await client.readResource({
  uri: "file:///example.txt"
});

// Call a tool
const result = await client.callTool({
  name: "example-tool",
  arguments: {
    arg1: "value"
  }
});

总结

MCP TypeScript SDK 提供了一套强大的工具,帮助开发者以标准化、安全的方式构建 LLMs 的外部能力接口。 通过定义资源、工具和提示,并选择合适的传输方式,你可以将任何应用程序的数据和功能暴露给 LLM 应用程序,开启智能应用开发的新可能。

想了解更多?请参考 MCP 协议官方文档和 TypeScript SDK 的 GitHub 仓库。