首页
学习
活动
专区
圈层
工具
发布

加速AI集成:优化MCP服务器连接到Flink SQL Gateway以提升性能

在我最近的两篇博客文章中,我解释了什么是 MCP [1] 以及为什么 MCP 是下一代 AI 驱动架构——模型导向架构(MOA)[2] 的关键推动力。

在本文中,我们将深入探讨如何构建 MCP 服务器,并演示如何使用 MCP 将 Apache Flink 作为数据计算引擎“连接”到 LLM 生态系统中。

为什么我选择 Flink SQL Gateway 作为示例?我将在本文末尾揭示一个显而易见的原因。还有一个更大的愿景将在未来的文章中介绍。

这篇文章将专注于编写 MCP 服务器。如果您想了解 MCP 架构的概述,官方的 MCP 文档 [3] 已经很好地解释了 MCP 客户端、MCP 服务器、协议层、传输层等核心概念。如果您不熟悉这些内容,我建议您先查看一下。

我们将使用 Python 来实现 MCP 服务器。目标是使用户能够利用 AI 代理——例如 Claude Desktop——与 Flink SQL Gateway 无缝交互。

让我们开始吧。

MCP 服务器核心概念

MCP 服务器将交互上下文分解为几个定义的组件:Prompts、Resources、Tools、Images、Sampling、Root 和 Context。为了保持我们的重点,我们将覆盖其中三个最重要的组件:

Prompts

Prompts 是预先编写的、可重用的模板,定义了 LLM 应如何与用户交互。您可以将它们视为由预定义的令牌、用户输入令牌以及这些元素的组合方式组成的结构化指令。

Prompts 可以是:

• 带有变量的静态模板

• 指导模型通过一系列步骤进行多步工作的工作流程,包含多个动态消息

Prompts 通常是用户驱动的。

Resources

Resources 扩展了 prompts 的概念,通过向 LLM 暴露类似文件的数据。这些可以包括纯文本、PDF、音频文件、视频或其他二进制内容。Resources 不需要位于与 MCP 服务器或客户端相同的主机上——它们可以从远程位置检索。

一个重要的考虑因素:开发人员在准备资源时应避免进行重计算。

Resources 通常是应用驱动的。

Tools

Tools 提供了一种抽象,使 LLM 能够调用功能。在 MCP 的上下文中,工具告诉模型,您可以触发一个动作并使用结果,而不是从提示中生成令牌或从资源中读取。

这就是 MCP 特别强大的地方。

如果生成令牌不是目标怎么办?

如果目标是与外部系统交互并改变其状态呢?

这种能力将 MCP 从一个令牌获取协议转变为模型驱动架构的基础——模型不仅仅是读取,而是采取行动。

Tools 通常是模型控制的。

Flink SQL Gateway 概览

Flink SQL Gateway 是一个服务,充当提交和管理 Apache Flink 上的 SQL 查询的集中入口点,这是一个分布式流和批处理框架。它通过允许用户/客户端(通过 REST、JDBC 或其他 API)来简化与 Flink 的交互:

• 管理会话和连接。

• 提交用于流/批处理的 SQL 查询。

• 监控查询执行。

在这篇博客文章中,我们将仅使用本地运行的 Flink 集群来演示 MCP 服务器的工作原理。

要开始,请参考 Flink 官方文档 [4],了解如何在本地下载、解压和启动 Flink 的说明:

$ ./bin/start-cluster.sh

如果您看到以下输出,则 Flink 集群运行良好:

None

通过调用以下命令在本地启动 SQL Gateway:

./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

仔细检查 SQL Gateway 是否正在运行:

$ curl http://localhost:8083/v1/info

根据下载的 Flink 版本,您应该得到类似于以下内容的响应:

{"productName":"Apache Flink","version":"2.0.0"}为 Flink SQL Gateway 构建 MCP 服务器

我建议使用 Python UV 来管理您的项目。简而言之,Python UV 是一个快速、现代的 Python 包安装和解析器,旨在替代 pip 和 pip-tools 等工具。它旨在解决 Python 的依赖管理和安装瓶颈,重点关注速度、可靠性和简单性。其 Github 仓库中的官方文档 [5] 是一个好的起点。

Python 项目初始化

// initialize project

$ uv init flink-mcp-server

$ cd flink-mcp-server

// create virtual env

$ uv venv

$ source .venv/bin/activate

// add dependency

$ uv add "mcp[cli]"

$ uv add "httx"

$ uv add "python-dotenv"

请注意,默认情况下,uv 创建一个扁平的项目结构。对于较大或生产级项目,您可能希望切换到更易于维护的 src 布局 [6]。

构建 MCP 服务器

首先,创建一个空的 mcp_server.py 文件:

### mcp_server.py

import logging

from api_client import APIClient

from dotenv import load_dotenv

import os

import httpx

from mcp.server.fastmcp import FastMCP

### Configure logging to log to a file

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[

logging.FileHandler("tmp/mcp_server.log"),

logging.StreamHandler()

])

logger = logging.getLogger(__name__)

### Replace with your actual SQL Gateway base URL

SQL_GATEWAY_BASE_URL = "http://localhost:8083"

load_dotenv()

base_url = os.getenv('SQL_GATEWAY_API_BASE_URL', SQL_GATEWAY_BASE_URL)

logger.info(f"Flink MCP Server Connecting to: {base_url}")

api = APIClient(base_url)创建 MCP 服务器

mcp = FastMCP("Flink SQLGateway MCP 服务器")

## TODO: 添加 mcp 提示、资源、工具

if __name__ == "__main__":

  # 运行 MCP 服务器

  mcp.run(transport="stdio")

APIClient 是一个封装 httpx 调用并处理异常的类。

现在我们来创建以下 mcp 工具:

• get_flink_cluster_info(): 获取 Flink 集群的元数据

• open_a_new_session(): 为 Flink 会话集群打开一个新会话

• get_session_handle_config(…): 获取会话句柄配置

• execute_a_query(…): 使用给定的 session_handle 执行 SQL 查询语句

• fetch_result(…): 获取操作的结果

请注意,为您的 @mcp.tool() 方法编写高质量的文档字符串非常重要。清晰而准确的文档有助于 LLM 理解何时以及如何使用工具。文档越准确,LLM 在对话中与工具的互动就会越好。以下代码片段展示了 execute_a_query(…) 的示例:

@mcp.tool()

async def execute_a_query(session_handle: str, query: str) -> str:

  """使用给定的 session_handle 执行 SQL 查询语句

  参数:

      session_handle (str): 由 open_a_new_session() 返回并记住以供将来使用的 session_handle。

      query (str): 要执行的 SQL 查询。

  返回:

      一条消息,指示操作句柄以供将来使用。

  """

  try:

      url = f'/v3/sessions/{session_handle}/statements'

      logger.info(f"执行查询: {url},语句: {query}")  # 调试行以验证 URL

      response = api.post(url, {"statement": query})

      operation_handle = response.get('operationHandle')

      return f"operation_handle: {operation_handle}"

  except httpx.RequestError as e:

      return f"请求错误: {e}"

为了帮助 LLM 理解如何与 Flink SQL Gateway 互动,您还可以定义自定义 MCP 提示。这些提示引导 LLM 通过特定的工作流程。例如,您可以实现 manage_session_handle() 来教 LLM 如何有效管理会话句柄:

@mcp.prompt()

def manage_session_handle() -> str:

  return (f"请检查 session_handle 是否已存在。如果没有,请通过调用 open_a_new_session() 创建一个新的 session_handle。"

          f"保存 session_handle 以供将来使用。"

          f"如果没有 session_handle 或者它已过期,您需要通过调用 open_a_new_session() 创建一个新的 session_handle。请记得保存以供将来使用。")测试您的 MCP 服务器

官方 MCP 文档描述了如何安装和使用 MCP Inspector:

$ npx @modelcontextprotocol/inspector uv <path_to_the_file>/mcp_server.py

还有另一种方法可以进行检查:

$ uv pip install mcp

$ mcp dev mcp_server.py在 Claude Desktop 中安装 MPC 服务器

打开 Claude Desktop,从菜单中打开文件,然后选择设置。选择开发者,然后点击“编辑配置”,这将指向 claude_desktop_config.json。打开 json 文件并添加:

{

"mcpServers":{

  "Flink SQLGateway MCP 服务器":{

    "command":"/Users/<user_name>/.local/bin/uv",

    "args":[

      "run",

      "--with",

      "mcp",

      "--with",

      "httpx",

      "<path_to_project>/src/sql_gateway/mcp_server.py"

    ]

  }

}

}

关闭 Claude Desktop,重新打开它,您将看到 mcp-flink-server 已安装。

通过 Claude Desktop 作为 AI 代理与 Flink SQL Gateway 互动

现在让我们与 Claude 交谈。首先,我们询问有关 Flink 集群的信息。

None

在适当的 mcp 工具和 mcp 提示开发完成后,正如您所看到的,Claude 能够检索并推断出许多有关运行中 Flink 集群的有用信息:

• 它识别出 Flink 版本为 2.0.0

• 检测到有一个 TaskManager

• 注意到配置了一个任务槽

• 确定并行度设置为 1

更令人印象深刻的是 Claude 的推断能力:它得出结论认为该集群似乎是为开发或测试目的设置的,适合本地工作流程。更有趣的是,Claude 知道 Flink SQL Gateway,测试了连接并确认其正常工作——尽管我在最初的问题中从未提到 SQL Gateway。

多亏了我构建的 MCP 服务器,Claude 能够检索相关元数据并利用其推理能力提供超出我预期的见解。

为了说明 LLM 如何帮助我们的日常工作并提高整体效率,我故意停止了对 Claude 的进一步指示。

几分钟后……

让我们尝试执行一个 SQL 查询。由于这只是一个示例 MCP 服务器,用于演示概念,我尚未配置完整的上游/下游数据源和连接器。我们来尝试一个非常简单的 "SELECT 1" 语句:

None

接连不断的惊喜——Claude 知道会话已过期,自动创建了一个新会话,执行了查询并获取了结果。所有这些都是自动发生的,这要归功于 MCP 服务器中提供的良好定义的工具和提示。

所有这些操作,如会话管理、查询执行和结果检索,都是由 LLM 推断并处理的。作为最终用户,我唯一提供的输入是查询语句:“SELECT 1”。

一切都将因AI而改变

从这个演示中,我们可以清楚地看到:

• 当与MCP Server集成以处理特定领域任务时,LLMs的有效性

• LLMs在协助用户和减少手动工作方面的智能程度

• 如何将低级技术细节抽象化,使尖端技术对非专家可访问,从而使他们能够专注于业务逻辑

我们能进一步做吗?绝对可以!

一个自然的下一步是集成一个文本到SQL的MCP Server。这将允许用户使用自然语言与Flink集群进行交互。问题可以被转换为可执行的SQL语句,使系统变得更加直观和强大。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OmnwDit3X22EpZpvJ4LtcwfQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。
领券