MCP has been getting a lot of attention lately. After spending some time learning about it, it's time to write up a summary, so let's get started.
MCP (Model Context Protocol) is an open standard released by Anthropic at the end of November 2024. It aims to standardize how large language models (LLMs) communicate with external data sources and tools. Official introduction: https://modelcontextprotocol.io/introduction
MCP includes several core capabilities: Tools, Resources, Prompts, and Sampling, each of which is covered below.
Suppose we want to offer web search. How do we connect it to an LLM through MCP? By building an MCP Server. Based on duckduckgo, I wrapped text, image, and video search APIs as follows:
class DuckDuckGoSearch:
    """Wrapper around the DuckDuckGo search APIs."""

    def __init__(self):
        self.ddgs = DDGS()

    def search(self, keywords: str, max_results: int = 10, safesearch: str = 'Off',
               timelimit: str = 'y') -> Dict[str, List[Dict[str, Any]]]:
        """General text search.

        Args:
            keywords: search keywords
            max_results: maximum number of results
            safesearch: safe-search option ('On' or 'Off')
            timelimit: time limit ('d', 'w', 'm', 'y')
        Returns:
            A dict containing the search results.
        """
        try:
            results = []
            ddgs_gen = self.ddgs.text(
                keywords,
                safesearch=safesearch,
                timelimit=timelimit,
                backend="lite"
            )
            for r in islice(ddgs_gen, max_results):
                results.append(r)
            return {'results': results}
        except Exception as e:
            return {'results': [], 'error': str(e)}

    def search_answers(self, keywords: str, max_results: int = 5) -> Dict[str, List[Dict[str, Any]]]:
        """Question-and-answer search.

        Args:
            keywords: search keywords
            max_results: maximum number of results
        Returns:
            A dict containing the answers.
        """
        try:
            results = []
            # Use the text method instead of the answers method
            ddgs_gen = self.ddgs.text(
                keywords,
                safesearch='Off',
                timelimit='y',
                backend="lite",
                region='wt-wt'  # use the worldwide region
            )
            for r in islice(ddgs_gen, max_results):
                results.append(r)
            return {'results': results}
        except Exception as e:
            return {'results': [], 'error': str(e)}

    def search_images(self, keywords: str, max_results: int = 10,
                      safesearch: str = 'Off') -> Dict[str, List[Dict[str, Any]]]:
        """Image search.

        Args:
            keywords: search keywords
            max_results: maximum number of results
            safesearch: safe-search option ('On' or 'Off')
        Returns:
            A dict containing image information.
        """
        try:
            results = []
            ddgs_gen = self.ddgs.images(
                keywords,
                safesearch=safesearch,
                timelimit=None
            )
            for r in islice(ddgs_gen, max_results):
                results.append(r)
            return {'results': results}
        except Exception as e:
            return {'results': [], 'error': str(e)}

    def search_videos(self, keywords: str, max_results: int = 10,
                      safesearch: str = 'Off', resolution: str = "high") -> Dict[str, List[Dict[str, Any]]]:
        """Video search.

        Args:
            keywords: search keywords
            max_results: maximum number of results
            safesearch: safe-search option ('On' or 'Off')
            resolution: video resolution ("high" or "standard")
        Returns:
            A dict containing video information.
        """
        try:
            results = []
            ddgs_gen = self.ddgs.videos(
                keywords,
                safesearch=safesearch,
                timelimit=None,
                resolution=resolution
            )
            for r in islice(ddgs_gen, max_results):
                results.append(r)
            return {'results': results}
        except Exception as e:
            return {'results': [], 'error': str(e)}
The above is the duckduckgo wrapper. Besides providing the search itself, we need to build an MCP Server that follows the spec. The code is as follows:
# Initialize the FastMCP server
app = FastMCP('web-search')


@app.tool()
async def web_search(query: str) -> dict:
    """
    Search the Internet for content.

    Args:
        query: the content to search for
    Returns:
        A dict containing the search results.
    """
    ddg = DuckDuckGoSearch()
    return ddg.search(query)


if __name__ == "__main__":
    app.run(transport='stdio')
The required dependencies and imports are as follows:
# !pip install duckduckgo-search
# !pip install mcp
from itertools import islice
from typing import List, Dict, Any, Optional
from mcp.server import FastMCP
from duckduckgo_search import DDGS
After developing the MCP Server above, we usually need to debug it. Use the official Inspector visualization tool (you first need to install Node.js and make sure the npx command is available). The command is:
npx -y @modelcontextprotocol/inspector <command> <arg1> <arg2>
With the server saved as mcp_server.py as above, start it with: npx -y @modelcontextprotocol/inspector python3.11 mcp_server.py. The console output looks like this:
Then open http://127.0.0.1:6274 in a local browser to enter the debugging UI:
Now that we have an MCP Server, how do we let the LLM call it? The steps are: launch the server over stdio from an MCP client, fetch the server's tool list and convert it into function-call descriptions, pass those to the model via the tools parameter, execute whichever tool the model requests through the MCP session, and feed the tool result back to the model to produce the final answer.
The code is as follows (note that OPENAI_API_KEY and OPENAI_API_BASE must be set as environment variables):
import json
import asyncio
import os
from typing import Optional
from contextlib import AsyncExitStack

from openai import OpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_API_BASE"),
        )
        self.model_name = "gpt-4o-mini"

    async def connect_to_server(self):
        server_params = StdioServerParameters(
            # command used to launch the server
            command='python3.11',
            # arguments passed to the command
            args=['mcp_server.py'],
            # environment variables; None means inherit the current environment
            # env=None
        )
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params))
        stdio, write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(stdio, write))
        await self.session.initialize()

    async def process_query(self, query: str) -> str:
        system_prompt = (
            "You are a helpful assistant."
            "You have the function of online search. "
            "Please MUST call web_search tool to search the Internet content before answering."
            "Please do not lose the user's question information when searching,"
            "and try to maintain the completeness of the question content as much as possible."
            "When there is a date related question in the user's question,"
            "please use the search function directly to search and PROHIBIT inserting specific time."
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]

        # Fetch the tool list from the MCP server
        response = await self.session.list_tools()
        # Build the function-call descriptions
        available_tools = [{
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description,
                "parameters": tool.inputSchema
            }
        } for tool in response.tools]
        print(f"\n\n ========> Available tools:\n{response}\n")

        # Pass the function-call descriptions to the model via the tools parameter
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=messages,
            tools=available_tools,
        )

        # Handle the returned content
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # If a tool is needed, parse the tool call
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)

            # Execute the tool through the MCP session
            result = await self.session.call_tool(tool_name, tool_args)
            print(f"\n\nCalling tool [{tool_name}] with args [{tool_args}]\nCalling tool response: [{result}]\n\n")

            # Append both the tool-call message and the tool result to messages
            messages.append(content.message.model_dump())
            messages.append({
                "role": "tool",
                "content": result.content[0].text,
                "tool_call_id": tool_call.id,
            })

            # Send the updated messages back to the model to generate the final answer
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=messages,
            )
            return response.choices[0].message.content
        return content.message.content

    async def chat(self):
        while True:
            try:
                query = input("\nQuery: ").strip()
                if query.lower() == 'quit':
                    break
                response = await self.process_query(query)
                print("\n" + response)
            except Exception as e:
                import traceback
                traceback.print_exc()

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    client = MCPClient()
    try:
        await client.connect_to_server()
        await client.chat()
    finally:
        await client.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
Sampling lets the server request LLM completions through the client, enabling complex agentic behavior while preserving security and privacy. In plain terms, it lets you confirm whether a given step should be allowed to proceed. The execution flow is as follows:
The code is as follows:
from mcp.types import SamplingMessage, TextContent


@app.tool()
async def shell(cmd: str) -> str:
    """
    Execute a shell command.

    Args:
        cmd: the shell command to execute
    Returns:
        The command output.
    """
    # Create a SamplingMessage to trigger the client's sampling callback
    result = await app.get_context().session.create_message(
        messages=[
            SamplingMessage(
                role='user', content=TextContent(
                    type='text', text=f'May this command be executed: {cmd} (Y/N)')
            )
        ],
        max_tokens=1024
    )
    print(f"result.content: {result.content}")
    # Act on the value returned by the sampling callback
    if result.content.text == 'Y':
        print(f'Executing command: {cmd}')
        import subprocess
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        return result.stdout
    else:
        print(f'Refused to execute command: {cmd}')
        return f'Command execution rejected, content: {result.content}'
In the debugging UI you can confirm whether execution should continue:
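When the server is driven from your own client rather than the Inspector, the confirmation has to come from a sampling callback registered on the client side. The following is only a minimal sketch, assuming the mcp Python SDK's sampling_callback hook on ClientSession; the auto-approve logic and the placeholder model name are made up for illustration.

from mcp import ClientSession
from mcp.types import CreateMessageRequestParams, CreateMessageResult, TextContent
from mcp.shared.context import RequestContext


async def sampling_callback(context: RequestContext,
                            params: CreateMessageRequestParams) -> CreateMessageResult:
    # Inspect the sampling request sent by the server (the "May this command be executed ... (Y/N)" text)
    request_text = params.messages[0].content.text
    print(f"Sampling request from server: {request_text}")
    # A real client would ask a human here; for illustration we simply approve with 'Y'
    return CreateMessageResult(
        role='assistant',
        content=TextContent(type='text', text='Y'),
        model='human-approval',   # placeholder model name
        stopReason='endTurn',
    )

# Register the callback when creating the session, e.g.:
# session = ClientSession(stdio, write, sampling_callback=sampling_callback)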
MCP also provides Prompts: prompt templates that take parameters, so they can be generated dynamically later or used to steer the LLM based on the input. Sample code:
@app.prompt("代码专家")
def ask_review(code_snippet: str) -> str:
    return f"Please review the following code snippet for potential bugs and style issues:\n```python\n{code_snippet}\n```"


if __name__ == "__main__":
    app.run(transport='stdio')
It can be used directly in the debugging tool:
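A client can also fetch and render the prompt itself. Below is a minimal sketch, assuming a ClientSession initialized as in the earlier client code; the sample argument value is arbitrary.

# List the prompts exposed by the server
prompts = await session.list_prompts()
print([p.name for p in prompts.prompts])

# Render the "代码专家" prompt with a concrete argument
prompt = await session.get_prompt("代码专家", arguments={'code_snippet': 'print("hello")'})
print(prompt.messages[0].content.text)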
MCP also provides Resources: the server exposes data and content that the client can read and use as context for LLM interactions. Resource URIs follow the format [protocol]://[host]/[path]; for example, they can expose files, databases, and so on.
Sample code:
@app.resource("db://users/{user_id}/email")
async def get_user_email(user_id: str) -> str:
    """Retrieves the email address for a given user ID."""
    # Replace with actual database lookup
    emails = {"123": "alice@example.com", "456": "bob@example.com"}
    return emails.get(user_id, "not_found@example.com")
It can be used directly in the debugging tool:
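A client reads the resource in much the same way. A minimal sketch, again assuming an initialized ClientSession; the user id "123" is just the sample value from the code above.

from pydantic import AnyUrl

# List the resource templates exposed by the server
templates = await session.list_resource_templates()
print([t.uriTemplate for t in templates.resourceTemplates])

# Read a concrete resource URI
result = await session.read_resource(AnyUrl('db://users/123/email'))
print(result.contents[0].text)  # -> alice@example.com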
An MCP Server itself has no notion of a lifecycle, but FastMCP provides lifecycle control so you can hook in your own business logic: initialization, the interaction phase, and server shutdown. How do we control this in code?
from dataclasses import dataclass
from contextlib import asynccontextmanager


@dataclass
class AppContext:
    histories: dict

    def __init__(self, histories: dict):
        self.histories = histories
        print(f"Initialized AppContext: {self.histories}")


@asynccontextmanager
async def app_lifespan(server):
    # Runs when the MCP server is initialized
    histories = {}
    try:
        yield AppContext(histories=histories)
    finally:
        print(f"Shutting down the server: {histories}")


# Initialize the FastMCP server
app = FastMCP(
    'mcp-server',
    lifespan=app_lifespan,
)
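With the lifespan in place, a tool can reach the shared state through the request context that FastMCP injects. The sketch below assumes FastMCP's Context injection; the remember tool and its key/value parameters are hypothetical, for illustration only.

from mcp.server.fastmcp import Context


@app.tool()
async def remember(key: str, value: str, ctx: Context) -> str:
    """Store a value in the lifespan-scoped histories dict (illustrative tool)."""
    histories = ctx.request_context.lifespan_context.histories
    histories[key] = value
    return f"stored {key} ({len(histories)} entries in total)"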
In LLM application development, basically every tool ends up integrated with LangChain, and MCP is no exception. Here is how to use it from LangChain:
# !pip install langchain_mcp_adapters
# !pip install langgraph
# !pip install langchain_openai
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
import os
import asyncio

model = ChatOpenAI(
    openai_api_base=os.getenv("OPENAI_API_BASE"),
    openai_api_key=os.getenv("OPENAI_API_KEY"),
    model="gpt-4o",
)

server_params = StdioServerParameters(
    # command used to launch the server
    command='python3.11',
    # arguments passed to the command
    args=['mcp_server.py'],
    # environment variables; None means inherit the current environment
    # env=None
)


async def main():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Load the MCP tools as LangChain tools
            tools = await load_mcp_tools(session)
            # Create and run a ReAct agent
            agent = create_react_agent(model, tools)
            agent_response = await agent.ainvoke({'messages': 'How is the weather in Shenzhen?'})
            print(f"agent_response: {agent_response}")


if __name__ == "__main__":
    asyncio.run(main())
You can also configure the server manually by opening mcp.json:
{
  "mcpServers": {
    "mcp-server": {
      "command": "python3.11",
      "args": ["/Volumes/my/mpserver/blog/机器学习/code/mcp/mcp-server.py"]
    }
  }
}
You can also follow the official docs to configure the SSE transport:
{
  "mcpServers": {
    "server-name": {
      "url": "http://localhost:3000/sse",
      "env": {
        "API_KEY": "value"
      }
    }
  }
}
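For the SSE configuration to work, the server has to be exposed over SSE rather than stdio. A minimal sketch, assuming FastMCP's SSE transport and its host/port settings; the port 3000 is chosen only to match the config above.

# mcp_server_sse.py (illustrative variant of the earlier server)
app = FastMCP('web-search', host='0.0.0.0', port=3000)

if __name__ == "__main__":
    # Serves the SSE endpoint at http://localhost:3000/sse
    app.run(transport='sse')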
There are already many ready-made MCP servers; see https://mcp.so/. There are also some open-source projects worth a look: https://github.com/yzfly/Awesome-MCP-ZH?tab=readme-ov-file.
(1) https://modelcontextprotocol.io/tutorials/building-mcp-with-llms
(2) https://github.com/yzfly/Awesome-MCP-ZH?tab=readme-ov-file