13 MCP协议
在前面的文章中,我们学习了如何在LangChain中定义和使用工具,但这些工具都是写死在代码里的。在实际开发中,我们经常会遇到这样的场景:你有一堆现成的工具服务,比如数据库查询服务、文件处理服务、天气API服务等等,这些服务可能用不同的语言写的,部署在不同的机器上,你希望Agent能直接调用它们,而不是把代码全部重写一遍。
这就轮到MCP出场了。
一、什么是MCP
MCP全称是Model Context Protocol,翻译过来就是模型上下文协议。它是Anthropic推出的一个开放协议,目的是让AI应用能有一个统一的方式来连接外部工具和数据源。
打个比方,如果说LLM是大脑,工具是手和脚,那MCP就像是一个"万能插头",不管你是什么工具,只要按照MCP协议来实现,Agent就能直接用。
MCP的核心思想就是把工具服务独立出来,做成一个个MCP Server,然后Agent通过MCP Client去连接这些Server,获取工具列表并调用。这样做的好处是:
- 工具可以复用,多个Agent可以共享同一套工具服务
- 工具可以用任何语言实现,不局限于Python
- 工具可以独立部署和更新,不影响Agent代码
二、快速开始
2.1 安装依赖
在LangChain中使用MCP,需要安装langchain-mcp-adapters库:
uv add langchain-mcp-adapters2.2 创建MCP Server
首先我们来创建一个简单的MCP Server,这里使用FastMCP库来快速搭建:
uv add fastmcp创建一个数学计算服务,提供加法和乘法两个工具:
# math_server.py
from fastmcp import FastMCP
# 创建MCP Server,名称为Math
mcp = FastMCP("Math")
@mcp.tool()
def add(a: int, b: int) -> int:
"""两数相加"""
return a + b
@mcp.tool()
def multiply(a: int, b: int) -> int:
"""两数相乘"""
return a * b
if __name__ == "__main__":
# 使用stdio方式启动
mcp.run(transport="stdio")再创建一个天气查询服务:
# weather_server.py
from fastmcp import FastMCP
mcp = FastMCP("Weather")
@mcp.tool()
async def get_weather(city: str) -> str:
"""根据城市名查询天气"""
# 这里模拟返回天气数据
weather_data = {
"杭州": "晴天,气温28°C~35°C",
"北京": "多云,气温22°C~30°C",
"上海": "小雨,气温25°C~31°C"
}
return weather_data.get(city, f"{city}的天气数据暂未收录")
if __name__ == "__main__":
# 使用streamable-http方式启动,监听8001端口
mcp.run(transport="streamable-http")2.3 在Agent中调用MCP工具
有了MCP Server之后,就可以在Agent中通过MultiServerMCPClient来连接并调用这些工具了:
import asyncio
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
async def main():
# 1.配置MCP Server连接信息
client = MultiServerMCPClient(
{
"math": {
"transport": "stdio", # 本地子进程方式
"command": "python",
"args": ["math_server.py"],
},
"weather": {
"transport": "http", # HTTP远程方式
"url": "http://localhost:8001/mcp",
}
}
)
# 2.获取所有MCP工具
tools = await client.get_tools()
# 3.创建Agent并传入工具
agent = create_agent("deepseek-v4-flash", tools)
# 4.调用Agent
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": "杭州天气怎么样?3加5等于多少?"}]}
)
print(result["messages"][-1].content)
if __name__ == "__main__":
asyncio.run(main())注意:
MultiServerMCPClient默认是无状态的,每次工具调用都会创建一个新的会话,执行完就清理掉。
三、传输方式
MCP支持两种主要的传输方式,用来处理客户端和服务器之间的通信。
3.1 HTTP传输
HTTP传输(也叫streamable-http)是最常用的方式,适合远程调用场景。Server启动后会监听一个HTTP端口,Client通过HTTP请求来调用工具。
client = MultiServerMCPClient(
{
"weather": {
"transport": "http",
"url": "http://localhost:8001/mcp",
}
}
)如果MCP Server需要身份验证,可以在配置中添加请求头:
client = MultiServerMCPClient(
{
"weather": {
"transport": "http",
"url": "http://localhost:8001/mcp",
"headers": {
"Authorization": "Bearer your_token_here",
},
}
}
)3.2 stdio传输
stdio方式适合本地工具,Client会把Server当作一个子进程启动,通过标准输入输出来通信。这种方式不需要占用端口,适合开发和测试阶段使用。
client = MultiServerMCPClient(
{
"math": {
"transport": "stdio",
"command": "python",
"args": ["math_server.py"],
}
}
)注意: stdio方式的连接是有状态的,子进程会一直存活直到客户端断开。
四、核心功能
4.1 工具(Tools)
工具是MCP最核心的功能,MCP Server通过工具暴露可执行的函数,Agent可以像调用本地工具一样调用它们。
使用client.get_tools()获取所有工具:
client = MultiServerMCPClient({...})
tools = await client.get_tools()
agent = create_agent("deepseek-v4-flash", tools)4.2 资源(Resources)
资源允许MCP Server暴露数据,比如文件内容、数据库记录等。Client可以读取这些数据用于上下文或者RAG场景。
client = MultiServerMCPClient({...})
# 加载某个Server的所有资源
blobs = await client.get_resources("server_name")
# 或者按URI加载特定资源
blobs = await client.get_resources(
"server_name",
uris=["file:///path/to/data.txt"]
)
for blob in blobs:
print(f"URI: {blob.metadata['uri']}")
print(f"内容: {blob.as_string()}")4.3 提示词(Prompts)
MCP Server还可以提供预定义的提示词模板,Client可以直接获取这些模板来使用。
client = MultiServerMCPClient({...})
# 获取提示词模板
messages = await client.get_prompt("server_name", "summarize")
# 带参数获取
messages = await client.get_prompt(
"server_name",
"code_review",
arguments={"language": "python"}
)
for msg in messages:
print(f"{msg.type}: {msg.content}")五、高级用法
5.1 工具拦截器
MCP Server是独立运行的进程,它没法直接访问Agent的运行时信息,比如用户上下文、长期记忆这些。拦截器就是用来解决这个问题的——你可以在MCP工具调用前后插入自定义逻辑。
拦截器的写法很简单,就是一个异步函数,接收request和handler两个参数:
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.interceptors import MCPToolCallRequest
# 一个简单的日志拦截器
async def logging_interceptor(
request: MCPToolCallRequest,
handler,
):
"""记录工具调用的入参和返回结果"""
print(f"调用工具: {request.name}, 参数: {request.args}")
result = await handler(request)
print(f"工具 {request.name} 返回: {result}")
return result
# 创建Client时传入拦截器
client = MultiServerMCPClient(
{"math": {"transport": "stdio", "command": "python", "args": ["math_server.py"]}},
tool_interceptors=[logging_interceptor],
)修改请求参数
拦截器可以在工具执行前修改参数:
async def double_args_interceptor(
request: MCPToolCallRequest,
handler,
):
"""把所有数值参数翻倍"""
modified_args = {k: v * 2 for k, v in request.args.items()}
modified_request = request.override(args=modified_args)
return await handler(modified_request)
# 原本调用 add(a=2, b=3),实际会变成 add(a=4, b=6)注入用户上下文
结合create_agent的context_schema,拦截器可以把用户信息注入到MCP工具调用中:
from dataclasses import dataclass
from langchain.agents import create_agent
@dataclass
class UserContext:
user_id: str
user_name: str
async def inject_user_context(
request: MCPToolCallRequest,
handler,
):
"""把用户信息注入到工具参数中"""
runtime = request.runtime
user_id = runtime.context.user_id
user_name = runtime.context.user_name
modified_request = request.override(
args={**request.args, "user_id": user_id, "user_name": user_name}
)
return await handler(modified_request)
client = MultiServerMCPClient(
{...},
tool_interceptors=[inject_user_context],
)
tools = await client.get_tools()
agent = create_agent("deepseek-v4-flash", tools, context_schema=UserContext)
# 调用时传入用户上下文
result = await agent.ainvoke(
{"messages": [{"role": "user", "content": "查询我的订单"}]},
context=UserContext(user_id="u_001", user_name="张三")
)访问长期记忆
拦截器还可以访问Agent的长期记忆(Store),用来读取或保存用户偏好:
from langchain.agents import create_agent
from langgraph.store.memory import InMemoryStore
async def personalize_search(
request: MCPToolCallRequest,
handler,
):
"""根据用户偏好个性化搜索"""
runtime = request.runtime
user_id = runtime.context.user_id
store = runtime.store
# 从长期记忆读取用户偏好
prefs = store.get(("preferences",), user_id)
if prefs and request.name == "search":
modified_args = {
**request.args,
"language": prefs.value.get("language", "zh"),
"limit": prefs.value.get("result_limit", 10),
}
request = request.override(args=modified_args)
return await handler(request)
client = MultiServerMCPClient(
{...},
tool_interceptors=[personalize_search],
)
tools = await client.get_tools()
agent = create_agent(
"deepseek-v4-flash",
tools,
context_schema=UserContext,
store=InMemoryStore()
)组合多个拦截器
多个拦截器会按照"洋葱"模式执行,第一个拦截器是最外层:
async def outer_interceptor(request, handler):
print("外层:执行前")
result = await handler(request)
print("外层:执行后")
return result
async def inner_interceptor(request, handler):
print("内层:执行前")
result = await handler(request)
print("内层:执行后")
return result
client = MultiServerMCPClient(
{...},
tool_interceptors=[outer_interceptor, inner_interceptor],
)
# 执行顺序:
# 外层:执行前 -> 内层:执行前 -> 工具执行 -> 内层:执行后 -> 外层:执行后错误重试
拦截器还可以实现错误重试逻辑:
import asyncio
async def retry_interceptor(
request: MCPToolCallRequest,
handler,
max_retries: int = 3,
delay: float = 1.0,
):
"""失败时自动重试"""
last_error = None
for attempt in range(max_retries):
try:
return await handler(request)
except Exception as e:
last_error = e
if attempt < max_retries - 1:
wait_time = delay * (2 ** attempt)
print(f"工具 {request.name} 调用失败(第{attempt + 1}次),{wait_time}秒后重试...")
await asyncio.sleep(wait_time)
raise last_error5.2 进度通知
有些工具执行时间比较长,比如处理大文件、生成报告等,这时候可以通过回调函数来获取进度信息:
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
async def on_progress(
progress: float,
total: float | None,
message: str | None,
context: CallbackContext,
):
"""处理进度更新"""
percent = (progress / total * 100) if total else progress
tool_name = context.tool_name or ""
print(f"[{context.server_name}][{tool_name}] 进度: {percent:.1f}% - {message}")
client = MultiServerMCPClient(
{...},
callbacks=Callbacks(on_progress=on_progress),
)5.3 日志记录
MCP Server可以在运行时输出日志,Client通过回调来接收这些日志:
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
from mcp.types import LoggingMessageNotificationParams
async def on_log_message(
params: LoggingMessageNotificationParams,
context: CallbackContext,
):
"""接收Server的日志输出"""
print(f"[{context.server_name}] {params.level}: {params.data}")
client = MultiServerMCPClient(
{...},
callbacks=Callbacks(on_logging_message=on_log_message),
)5.4 有状态会话
默认情况下每次工具调用都是独立的会话,但有些场景需要保持会话状态(比如Server需要记住之前的对话上下文)。这时候可以用client.session()创建持久会话:
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_agent
client = MultiServerMCPClient({...})
# 创建持久会话
async with client.session("server_name") as session:
# 在这个会话中加载工具
tools = await load_mcp_tools(session)
agent = create_agent("deepseek-v4-flash", tools)
# 在同一个会话中多次调用,Server可以保持状态
result1 = await agent.ainvoke({"messages": [{"role": "user", "content": "记住我的名字叫张三"}]})
result2 = await agent.ainvoke({"messages": [{"role": "user", "content": "我叫什么名字?"}]})六、总结
MCP协议为LangChain提供了一种标准化的方式来连接外部工具服务,核心要点:
- MCP Server负责提供工具、资源、提示词等能力
- MCP Client(
MultiServerMCPClient)负责连接Server并获取这些能力 - 支持HTTP和stdio两种传输方式,分别适合远程和本地场景
- 通过拦截器可以在工具调用前后注入自定义逻辑,比如日志记录、参数修改、用户上下文注入等
在下一篇文章中,我们将学习如何在LangChain中使用多Agent协作来完成更复杂的任务。