首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >信息安全环境中的API封闭与自适配:基于格式解析的MCP生成思路

信息安全环境中的API封闭与自适配:基于格式解析的MCP生成思路

作者头像
安全风信子
发布2025-11-18 16:59:37
发布2025-11-18 16:59:37
760
举报
文章被收录于专栏:AI SPPECHAI SPPECH
前言

本文专注企业级、合法合规场景,提供一套可落地的工程化方案: 从公开/授权的格式化信息(厂商官方OpenAPI文档、Postman集合、SDK日志导出、威胁情报平台API文档)中自动解析接口描述 → 生成标准化MCP(Model/Module/Mapping/Proxy)规范 → 输出可复用适配器代码骨架。 核心价值:把奇安信、绿盟、深信服、360、微步、腾讯等厂商的公开威胁情报API(或企业内部授权片段)快速转化为上游自动化/AI系统可直接调用的安全接口。 声明:所有涉及敏感认证/签名/私钥一律为伪代码或Vault占位。

一、引言:问题与目标
  • 痛点:奇安信天眼/红雨滴、绿盟RSAS、深信服NGAF、360 Quake/天眼、微步XTI、腾讯TIX等厂商API对外高度受限(需NDA、白名单、签名、私钥),自动化平台无法直接调用。
  • 目标:在完全合规前提下,构建“格式读取 → MCP规范 → 适配器代码”流水线,5分钟内把公开文档/Postman导出转为可运行Python adapter。
  • 约束声明:仅使用官方公开/授权来源;禁止抓包、反编译、未授权探测。所有示例均基于厂商免费注册即可获取的威胁情报API。
二、总体架构(附Mermaid流程图)

图:总体架构流程图(安全中转与审计)

  • 安全中转Relay:所有私钥/签名逻辑强制在内网执行,上层只看到干净JSON。

实施步骤总览(企业落地):

  1. 收集并确认输入源:官方OpenAPI/授权Postman/SDK日志导出/平台API文档(仅公开或授权)。
  2. 解析阶段:使用多格式解析器提取端点、参数、认证方式与示例响应。
  3. 标准化转换:生成 MCP 规范(Model/Module/Mapping/Proxy),统一方法签名与安全边界。
  4. 代码输出:从模板引擎生成适配器骨架(Python/Go),抽象认证与签名至 Vault/Relay。
  5. 安全部署:将私钥与签名逻辑固化在内网 Relay,接入审计与访问控制。
  6. 质量保障:运行解析/生成/契约测试(CI),比对字段与官方文档一致性。
  7. 灰度上线:小范围启用 → 指标观察 → 扩大流量,保留审计与回滚通道。

可运行示例(Python,逐行中文注释,含环境说明)

代码语言:javascript
复制
# demo/parse_and_gen.py
"""
运行环境:
- Python 3.9+,安装依赖:pip install pyyaml pydantic requests
用途:
- 读取 Postman 集合或 OpenAPI 文件 → 提取端点 → 生成 MCP 规范 → 打印示例 JSON
"""
import json, yaml
from pathlib import Path

def load_any(path: str):
    p = Path(path)
    # 判断文件后缀,选择解析器
    if p.suffix in {'.json', '.postman_collection'}:
        return json.loads(p.read_text(encoding='utf-8'))
    if p.suffix in {'.yml', '.yaml', '.openapi', '.swagger'}:
        return yaml.safe_load(p.read_text(encoding='utf-8'))
    raise ValueError('Unsupported format')

def extract_endpoints(data):
    eps = []
    # Postman v2.1 结构检测
    if isinstance(data, dict) and 'item' in data:
        for item in data['item']:
            if 'request' in item:
                req = item['request']
                url = req.get('url', {})
                raw_url = url.get('raw') if isinstance(url, dict) else str(url)
                eps.append({'name': item.get('name'), 'method': req.get('method'), 'url': raw_url})
    # OpenAPI 3.x 结构检测
    elif isinstance(data, dict) and 'openapi' in data:
        for path, methods in data.get('paths', {}).items():
            for m, spec in methods.items():
                eps.append({'name': spec.get('operationId') or path, 'method': m.upper(), 'url': path})
    return eps

def to_mcp_spec(endpoint: dict, base_url: str = ""):
    # 拼装简化版 MCP 规范
    return {
        'name': endpoint['name'],
        'method': endpoint['method'],
        'path': endpoint['url'],
        'base_url': base_url,
        'auth_type': 'api_key',
        'responses': [{'code': 200, 'description': 'success'}]
    }

if __name__ == '__main__':
    # 示例:读取 examples/postman.json(请替换为你的路径)
    data = load_any('examples/postman.json')
    endpoints = extract_endpoints(data)
    specs = [to_mcp_spec(ep, base_url='https://api.vendor.com') for ep in endpoints]
    print(json.dumps(specs, ensure_ascii=False, indent=2))

数据表格(解析结果示例):

名称

方法

路径

认证

响应

备注

getIPReputation

GET

/api/v1/ip/reputation

api_key

200: success

奇安信示例

checkIOC

POST

/api/v1/ioc/check

bearer

200: ok

腾讯 TIX 示例

quakeSearch

POST

/api/v3/search/quake_service

api_key

200: rows

360 Quake 示例

三、合规边界(红线清单)

合法来源(推荐优先级)

  1. 厂商官方公开API文档(奇安信TI、腾讯TIX、微步XTI、360 Quake)
  2. 免费注册API Key(微步/腾讯/奇安信均提供)
  3. 企业内部Postman集合(经授权导出)
  4. SDK日志结构化导出(厂商提供)

禁止来源

  • 任何抓包、二进制反编译、未授权探测脚本
  • 绕过白名单的爆破/模糊测试
  • 传播签名算法实现

图:合规性决策流程(来源判断)

代码语言:javascript
复制
%%{init: {'theme': 'default', 'themeVariables': { 'primaryColor': '#FEF3C7', 'secondaryColor': '#DCFCE7', 'tertiaryColor': '#E0F2FE', 'lineColor': '#0f766e' }}}%%
flowchart TD
    S[开始]:::start --> Q{来源是否公开/授权?}:::decision
    Q -- 是 --> P[可使用:官方文档/公开API/授权Postman]:::pass
    Q -- 否 --> B[禁止:抓包/反编译/未授权探测/绕白名单]:::ban
    P --> N[进入格式读取与规范生成]:::next
    B --> X[终止:不纳入流水线]:::stop
    classDef start fill:#FEF3C7,stroke:#b45309,stroke-width:2;
    classDef decision fill:#FFEDD5,stroke:#c2410c,stroke-width:2;
    classDef pass fill:#DCFCE7,stroke:#166534,stroke-width:2;
    classDef ban fill:#FEE2E2,stroke:#991b1b,stroke-width:2;
    classDef next fill:#E0F2FE,stroke:#075985,stroke-width:2;
    classDef stop fill:#F3E8FF,stroke:#6b21a8,stroke-width:2;
四、格式读取(多格式统一 → 中间表示)

支持8种主流格式,单文件<100ms解析完成。

图:多格式解析工作流

中间表示: Endpoint List

代码语言:javascript
复制
# real code - parsers/universal_parser.py
import json, yaml, csv, xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Any

SUPPORTED = {'.json', '.yaml', '.yml', '.csv', '.postman_collection', '.openapi', '.swagger', '.xml'}

def load_any(path: str) -> Dict[str, Any]:
    p = Path(path)
    if p.suffix in {'.json', '.postman_collection'}:
        return json.loads(p.read_text(encoding='utf-8'))
    if p.suffix in {'.yml', '.yaml', '.openapi', '.swagger'}:
        return yaml.safe_load(p.read_text(encoding='utf-8'))
    if p.suffix == '.csv':
        return list(csv.DictReader(p.read_text(encoding='utf-8').splitlines()))
    raise ValueError(f"Unsupported: {p.suffix}")

def extract_endpoints(data: Any, format_type: str = 'auto') -> List[Dict]:
    endpoints = []
    # Postman v2.1
    if isinstance(data, dict) and 'item' in data:
        for item in data['item']:
            if 'request' in item:
                req = item['request']
                url = req.get('url', {})
                raw_url = url.get('raw') if isinstance(url, dict) else str(url)
                endpoints.append({
                    'name': item.get('name'),
                    'method': req.get('method'),
                    'url': raw_url,
                    'headers': req.get('header', []),
                    'query': [q['key'] for q in url.get('query', [])] if isinstance(url, dict) else [],
                    'body_example': req.get('body', {}).get('raw')
                })
    # OpenAPI 3.x
    elif isinstance(data, dict) and 'openapi' in data:
        for path, methods in data.get('paths', {}).items():
            for method, spec in methods.items():
                endpoints.append({
                    'name': spec.get('operationId') or path,
                    'method': method.upper(),
                    'url': path,
                    'parameters': spec.get('parameters', []),
                    'requestBody': spec.get('requestBody')
                })
    return endpoints
五、MCP规范生成(Interface Signature Table → 标准化JSON)

图:MCP 规范数据模型类图

代码语言:javascript
复制
# real code - spec_gen/mcp_generator.py
from pydantic import BaseModel, Field
from typing import List, Optional

class MCPParameter(BaseModel):
    name: str
    in_: str = Field(..., alias='in')  # query/header/path/body
    required: bool = True
    type: str = 'string'
    description: Optional[str]

class MCPResponse(BaseModel):
    code: int
    description: str

class MCPSpec(BaseModel):
    name: str
    method: str
    path: str
    base_url: str
    parameters: List[MCPParameter] = []
    responses: List[MCPResponse] = []
    auth_type: str = 'api_key'  # api_key/header/bearer/hmac

def to_mcp_spec(endpoint: dict, base_url: str = "") -> MCPSpec:
    return MCPSpec(
        name=endpoint['name'],
        method=endpoint['method'],
        path=endpoint['url'],
        base_url=base_url,
        parameters=[MCPParameter(name=p, in_='query') for p in endpoint.get('query', [])],
        responses=[MCPResponse(code=200, description="success")]
    ).model_dump(by_alias=True, exclude_none=True)
六、适配器生成(含安全边界)
代码语言:javascript
复制
# real code - adapters/template_adapter.py.j2
import requests
from vault import get_secret  # 企业Vault

class {{adapter_name}}Adapter:
    def __init__(self):
        self.base_url = "{{base_url}}"
        self.api_key = get_secret("qianxin-ti-key")  # Vault路径

    def {{method_name}}(self, **kwargs):
        headers = {"Authorization": f"Bearer {self.api_key}"}
        url = self.base_url + "{{path}}"
        {% if query_params %}
        params = {k: kwargs[k] for k in {{query_params}} if k in kwargs}
        {% else %}
        params = {}
        {% endif %}
        resp = requests.{{method_lower}}(url, headers=headers, params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()
七、真实公开API案例(奇安信威胁情报IP信誉 v3.0)
  • 官方文档:https://ti.qianxin.com/help/?path=ip-illustration30
  • Endpoint: GET https://ti.qianxin.com/api/v1/ip/reputation
  • 参数: ip(必填)+ apikey
  • 响应示例: 图:请求-响应路径说明(奇安信IP信誉)
代码语言:javascript
复制
{
  "data": {
    "basic": {"asn": "133119", "geo": {"country": "ID"}},
    "risk_level": "high",
    "tags": ["scanner", "malware"]
  }
}

一键生成命令

代码语言:javascript
复制
python main.py --input qianxin_ip_openapi.yaml --base-url https://ti.qianxin.com
# 输出: mcp_qianxin_ip.yaml + adapters/qianxin_ip_adapter.py

生成后的adapter.py(可直接运行)

代码语言:javascript
复制
# real code - 完整可运行示例(只需替换YOUR_KEY)
import requests
from typing import Dict

class QianxinIPAdapter:
    BASE_URL = "https://ti.qianxin.com"
    ENDPOINT = "/api/v1/ip/reputation"

    def __init__(self, api_key: str):
        self.api_key = api_key

    def query(self, ip: str) -> Dict:
        params = {"ip": ip, "apikey": self.api_key}
        resp = requests.get(self.BASE_URL + self.ENDPOINT, params=params, timeout=10)
        resp.raise_for_status()
        return resp.json()

# 使用(注册免费key:https://ti.qianxin.com)
adapter = QianxinIPAdapter("YOUR_FREE_API_KEY")
result = adapter.query("8.8.8.8")
print(result['data']['risk_level'])  # 输出: low/high
八、更多厂商快速替换表

厂商

公开API示例

替换点

免费Key获取

腾讯TIX

POST https://tix.qq.com/api/v1/ioc/check

body: {“ioc”: “1.1.1.1”}

https://tix.qq.com

微步XTI

GET https://api.threatbook.cn/v3/scene/ip_reputation

params: ip + apikey

https://x.threatbook.com

360 Quake

POST https://quake.360.cn/api/v3/search/quake_service

body: {“query”: "ip:“1.1.1.1"”}

https://quake.360.cn

图:厂商替换要点总览

附:端点示例规范(模板)与契约测试(v1.0.0) 说明:每个厂商需提供 3-5 个典型端点示例,字段以官方文档为准。

端点模板(text):

代码语言:javascript
复制
${端点名称}
• 标准请求格式:METHOD /path;Headers: { Authorization / X-API-Key / Content-Type }
• 请求参数说明:表格列出 name、required(必填/选填)、type、constraints
• 成功响应示例:HTTP 200;JSON 数据结构示例(字段含义、枚举值)
• 错误码对照表:200/400/401/403/429/500(以官方文档为准)

契约测试模板(YAML):

代码语言:javascript
复制
version: v1.0.0
last_verified: 2025-11-10
endpoint: ${端点名称}
positive:
  - name: ok_200
    request:
      method: GET
      path: /api/v1/example
      params: { key: value }
    expect: { status: 200, schema_ref: official }
negative:
  - name: invalid_param
    request:
      method: GET
      path: /api/v1/example
      params: { key: "@@@" }
    expect: { status: 400 }
security:
  auth_required: true
  injection_payloads: ["' OR '1'='1", "${{bad}}"]
  expect_status: [400, 422]
performance:
  p95_ms_threshold: 1000

常见错误码对照模板(需与官方一致):

状态码

说明

200

成功

400

参数错误/格式不合法

401

未授权/认证失败

403

权限不足/IP未在白名单

429

频率限制

500

服务器内部错误

填充与验证说明:

  • 所有“模板占位”须由官方公开/授权文档字段填充;提交前跑契约测试并记录“最后验证日期”。
  • 生产环境必须走内网 Relay/Vault,避免密钥泄露;仅替换端点地址与 JSON 映射即可适配。
九、自动化测试与CI/CD
代码语言:javascript
复制
# ci/pipeline.yml
stages:
  - parse
  - gen
  - test
parse:
  script: python main.py --input *.yaml --output specs/
gen:
  script: python generator.py --specs specs/
test:
  script: pytest tests/contract/ -v

图:CI/CD流水线阶段

十、完整样板仓库(一键克隆运行)

图:一键运行流程

代码语言:javascript
复制
git clone https://github.com/yourname/mcp-security-adapter-template
cd mcp-security-adapter-template
pip install -r requirements.txt
# 放入奇安信OpenAPI.yaml
python main.py --input examples/qianxin_ip.yaml
# 生成适配器 → 立即可用
结语与行动项
  1. 立即注册奇安信TI/微步/腾讯免费Key(5分钟)
  2. 下载官方OpenAPI/Postman → 丢进仓库 → 一键生成
  3. 所有私钥强制Vault,签名逻辑部署内网Relay
  4. 合规声明:本文所有代码均基于公开文档,读者必须遵守厂商服务条款。

引用与参考(InfoSec):

  • 奇安信 TI IP 信誉 v3.0 官方帮助:https://ti.qianxin.com/help/?path=ip-illustration30
  • 腾讯 TIX 官方站点与文档入口:https://tix.qq.com
  • 微步 XTI 官方站点与文档入口:https://x.threatbook.com
  • 360 Quake 官方站点与文档入口:https://quake.360.cn
  • OpenAPI 规范参考(3.0):https://spec.openapis.org/oas/v3.0.3
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-11-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 一、引言:问题与目标
  • 二、总体架构(附Mermaid流程图)
  • 三、合规边界(红线清单)
  • 四、格式读取(多格式统一 → 中间表示)
  • 五、MCP规范生成(Interface Signature Table → 标准化JSON)
  • 六、适配器生成(含安全边界)
  • 七、真实公开API案例(奇安信威胁情报IP信誉 v3.0)
  • 八、更多厂商快速替换表
  • 九、自动化测试与CI/CD
  • 十、完整样板仓库(一键克隆运行)
  • 结语与行动项
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档