
先讲一个笔者最近生产环境真实遇到的问题。
我们做了一个故障根因分析智能体——一有故障事件冒出来,它就自动去各个系统里查一圈,最后写出一份《故障根因分析报告》。干完这一摊事,快则几十秒,慢则两三分钟。
一开始我们图省事,用了最直白的接法:发一个请求过去,然后就干等着,等它把整份报告写完,再一次性拿回来。在自己电脑上测,一切正常;可一搬到线上,怪事来了——碰到复杂点的事件,报告就大面积超时、拿不回来。
查到最后才发现,问题不在智能体本身,而在它前面那道"门卫"——网关。这道门卫有条铁规矩:一个请求最多只放行 120 秒,超了就直接把连接掐掉。偏偏智能体一旦"想"得久一点、报告写得长一点,就容易超过两分钟。于是出现了很尴尬的一幕:后台明明还在埋头苦写,门卫却已经先一步把客户打发走了,前端只收到一个冷冰冰的超时报错。
解决办法说出来还有点反直觉:把调用方式从"一次性拿结果"换成"流式(streaming)边写边拿",问题就没了。 同样是两三分钟的活,流式就能稳稳地把报告拿回来。
凭什么换个姿势就行?这正是本文要聊的。顺着这个问题往下挖,会牵出大模型 / 智能体 API 流式输出的一整套门道——从"AI 的字为什么是一个一个往外蹦的",到"流式凭什么能绕过网关那 120 秒的限制",再到把它真正搬上生产环境会踩到的那些坑。
用过 ChatGPT 或 Claude 的人都见过这样的画面:你按下回车,AI 的回答像打字机一样,一个字一个字地浮现在屏幕上。
很多人以为这是个前端动画——为了显得"像人在打字",故意把已经生成好的答案放慢播放。事实恰恰相反:那些字真的是刚刚才被"算"出来的。大语言模型生成文本的方式是自回归的,它每次只产出一个 token(可以粗略理解为一个字或半个词),然后把这个 token 连同之前的内容一起作为输入,再产出下一个。模型在服务端"想"到哪,屏幕上就显示到哪。
这就引出了一个工程问题:一个完整回答可能要生成几百上千个 token,耗时几秒到几十秒。服务端是等全部生成完再一次性返回,还是生成一点就推送一点?
前者叫非流式调用(non-streaming),后者叫流式调用(streaming)。打个比方:非流式像中餐宴席,所有菜在后厨做齐了一起上桌,客人前半小时只能干等;流式像日料板前,师傅做好一贯就递一贯,你从第一分钟就开始吃了。
两种方式下,总耗时几乎一样——模型该算多久还是多久。流式改变的不是总时长,而是体验的分布:用户从"等 30 秒看到全部"变成"等 0.5 秒看到第一个字"。衡量这件事的核心指标叫 TTFT(Time To First Token,首 token 延迟),它和"总时长"是流式时代最重要的两个延迟指标。
但流式的价值不止于体验。回到前记里那个 120 秒的网关:非流式调用在长任务期间整条连接上一个字节都没有动静,对中间设备来说,这跟一条卡死的连接没有区别,超时就被掐了;而流式调用全程都在持续往回写数据块,连接始终"活着"。这个"长连接保活"的特性,后面第五节会展开——它正是我们绕过网关超时的钥匙。
要让服务端"生成一点推一点",普通的 HTTP 请求-响应模式(一问一答、答完即断)显然不够用。业界有几个候选方案:轮询、WebSocket、SSE。大模型 API 几乎清一色选择了 SSE(Server-Sent Events),这不是巧合。
SSE 本质上是一个不关闭的 HTTP 响应。客户端发起一次普通的 HTTP POST 请求,服务端返回 Content-Type: text/event-stream,然后保持连接不断开,持续向客户端写入一行行文本格式的事件:
event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "你好"}}每个事件由 event:(事件名)和 data:(JSON 载荷)组成,事件之间用空行分隔。客户端按行解析,收到一个事件处理一个。
为什么不用功能更强的 WebSocket?因为通信方向决定了选型:
WebSocket 并非没有用武之地——语音对话这类需要客户端持续上传音频、服务端持续下发回复的真双向场景(如各家的 Realtime API),用的就是 WebSocket。但对于"发一个请求、收一串回答"的主流场景,SSE 是更克制、更正确的选择。
知道了传输管道,再来看管道里流的是什么。以 Anthropic 的 Messages API 为例(其他厂商结构思路类似,事件名不同),发起请求时只需在请求体里加一个字段:
JSON
{
"model": "claude-sonnet-4-5",
"max_tokens": 1024,
"stream": true,
"messages": [{"role": "user", "content": "你好"}]
}服务端返回的事件流有清晰的"三层嵌套"结构,像一份有开头、正文、结尾的电报:
event: message_start ← 整条消息开始,带消息 ID、模型名等元数据
event: content_block_start ← 第一个内容块开始(类型:text)
event: content_block_delta ← 增量:"你"
event: content_block_delta ← 增量:"好"
event: content_block_delta ← 增量:"!"
event: content_block_stop ← 第一个内容块结束
event: message_delta ← 消息级元数据更新:停止原因、token 用量
event: message_stop ← 整条消息结束理解这个结构的关键是 content block(内容块) 这个概念:一条 AI 回复不是一整段平铺的文本,而是由若干个块组成的序列——可能是一段文本、一次思考过程、一次工具调用。每个块都有自己的 start → delta(若干次)→ stop 生命周期,块与块用 index 编号区分。
客户端的工作,本质上就是一台状态机:收到 message_start 初始化一个空消息对象;收到 content_block_start 在对应 index 上挂一个新块;收到 content_block_delta 把增量追加到对应的块上;收到各种 stop 关闭对应层级。中间还会穿插 ping 事件保活,出错时则会收到 error 事件。
实际工程里你很可能不止对接一家。好消息是:主流厂商的流式设计是同构的——分块推送、增量累加、末尾收尾——只是命名和封装不同。把它们摆在一张表里,迁移成本一目了然:
维度 | Anthropic Messages | OpenAI Chat Completions | Google Gemini |
|---|---|---|---|
开启方式 | 请求体 stream: true | 请求体 stream: true | 用 streamGenerateContent 端点 |
事件组织 | 命名事件 event: xxx + 三层生命周期 | 扁平 chunk,无事件名 | candidates 分块 |
文本增量字段 | delta.text(text_delta) | choices[0].delta.content | candidates[0].content.parts[].text |
工具调用增量 | input_json_delta.partial_json(JSON 片段) | delta.tool_calls[].function.arguments(JSON 片段) | functionCall(通常整块给出) |
结束信号 | message_stop 事件 | 末行 data: [DONE] | 流自然结束 + finishReason |
用量统计 | message_start / message_delta 里的 usage | 末块 usage(需开 stream_options) | usageMetadata |
记住一句话就够了:命名不同,设计同构,迁移理解成本很低。 你只要在自己的状态机里把"增量从哪个字段取、结束信号怎么判断"做成可替换的适配层,换厂商就只是换适配器。
这一点在我们前记里的内部网关上也成立——它的事件名既不是 Anthropic 的也不是 OpenAI 的,而是网关自己包的一层(后面会看到真实代码),但"逐块推、按字段累加"的内核完全一样。
到这里,纯聊天场景的流式就讲完了。但如果只讲到这里,这篇文章和官方文档没有区别。真正有意思的部分在后面:当主角从"聊天机器人"变成"智能体"时,流式会遇到什么新问题。
在往下挖之前,先把一条分界线画清楚——纯大模型调用和智能体 API 调用,根本不是一回事。
最典型的纯大模型调用,就是 OpenAI 式的 chat.completions:你发一段 messages 过去,模型回一段文本(流式就是把这段文本切片推回来),一问一答、一次请求一次响应,到此结束。模型只负责"说",不负责"做";调用方只管把字显示出来,不需要替它执行任何动作。哪怕开了流式,本质仍是"一条流 = 一次完整回答"。
智能体 API 调用则是另一套玩法:模型不光会说,还会要求你替它做事——发起工具调用、等你把结果喂回去、再接着说,如此往复,直到任务真正完成。所以一次智能体"任务"通常不是一条流,而是多条流的串联,中间夹着你的程序在执行工具。两者的差异可以摆成一张表:
维度 | 纯大模型调用(OpenAI 式 chat) | 智能体 API 调用(agent) |
|---|---|---|
交互形态 | 一问一答,单次请求-响应 | 多轮循环(agent loop),请求串联 |
模型能做什么 | 只输出文本/内容 | 输出文本 + 发起工具调用,能驱动外部动作 |
调用方角色 | 收下文本、渲染即可 | 要执行工具、把结果回传、维护整个循环 |
一次"任务"等于 | 一条流 | 多条流 + 中间的工具执行时间 |
怎么算结束 | 流收到结束信号即完成 | 看 stop_reason:tool_use 表示"还没完,该你干活" |
状态维护 | 基本无需跨轮维护 | 每轮完整输出都要回传,状态全在调用方手里 |
流的连续性 | 速率相对均匀 | 流内有停顿、流之间有工具执行的"静默期" |
请注意:这两者用的往往是同一套底层模型、同一套流式协议(第三节那套 content block 事件原样适用),区别不在传输层,而在交互模型——纯大模型是"一次性的口述",智能体是"带反馈回路的协作"。下面三小节,就沿着这张表里"智能体特有"的那几列往下挖:工具调用怎么流、循环怎么串、思考过程怎么流。
智能体能"做事",靠的就是上表里那一列工具调用(tool use / function calling):模型不直接回答,而是输出一个结构化的"调用请求",比如 {"name": "query_cmdb", "input": {"ip": "10.2.3.4"}},由你的程序代为执行后把结果交还给模型。
问题来了:这个调用请求的参数是一个 JSON 对象,而模型生成任何东西都是逐 token 的——所以这个 JSON 也是被切成碎片流出来的。在事件流里,它长这样:
event: content_block_start ← 类型:tool_use,工具名:query_cmdb
event: content_block_delta ← partial_json: '{"ip'
event: content_block_delta ← partial_json: '": "10.'
event: content_block_delta ← partial_json: '2.3.4"}'
event: content_block_stop注意 delta 的类型从文本场景的 text_delta 变成了 input_json_delta,载荷字段是 partial_json——一段段不完整的 JSON 字符串片段。切分位置完全不讲道理,可能正好切在键名中间、引号中间。
这给客户端带来了文本流没有的难题:
'{"ip' 不是合法 JSON,你必须把一个块内的所有 partial_json 片段按顺序拼接,等到 content_block_stop 之后才能完整解析、执行工具。content_block_start 时就显示"⚙ 正在调用 query_cmdb…"的占位状态,把碎片拼接的过程藏在后台。更大的不同在宏观层面。一次智能体任务往往不是"一问一答",而是一个循环:
用户提问
│
▼
┌─ 模型流式输出 ──────────────────────┐
│ "我先查一下这台机器的信息"(文本流) │
│ query_cmdb({"ip": "10.2.3.4"})(工具流)│
└──────────── stop_reason: tool_use ──┘
│
▼ ← 流暂停,你的程序执行工具(可能要几秒)
│
▼ 把工具结果作为新消息追加,再次调用 API
┌─ 模型再次流式输出 ───────────────────┐
│ "查到了,这台机器属于告警工作流平台…" │
└──────────── stop_reason: end_turn ──┘每一轮 API 调用都是一条独立的 SSE 流,以 stop_reason: tool_use 结束就意味着"该你干活了"。整个 agent loop 是多条流的串联,中间夹着不归模型管的工具执行时间。
这带来几个工程上必须想清楚的问题:
第一,用户感知的"流"和实际的"流"不是一回事。 用户看到的是一个连续的对话气泡,实际背后可能是三四条 SSE 流加两三次工具执行。前端需要把"模型在说话""智能体在干活""模型继续说话"这几种状态编排成连贯的体验——工具执行期间没有任何 token 流出,如果不给用户"正在执行…"的反馈,界面会出现尴尬的静止。
第二,流内部也会有停顿。 即使在一条流内部,事件到达的速率也不均匀。官方文档明确提示:使用工具时,事件之间可能出现可感知的延迟。所以客户端的超时设置不能按"每 100ms 必有数据"来设计,要区分"连接级超时"和"事件间隔超时"。
第三,状态在循环间必须完整传递。 API 是无状态的,第二轮调用必须带上第一轮的完整输出——包括模型说的话和它发起的工具调用块。这意味着客户端的状态机不只是为了渲染,还要精确重建出完整的消息对象用于下一轮请求。流式解析错一个字,下一轮就可能整体报错。
新一代模型还有"扩展思考"(extended thinking / reasoning)能力:在正式回答前先输出一段推理过程。在流式 API 里,这表现为又一种 delta 类型——thinking_delta,和正文的 text_delta 在不同的内容块里先后流出。
对智能体产品来说,这是一个体验设计的机会:思考过程往往比最终答案更长、更早开始流出,把它折叠展示("正在思考…"可展开),既填补了 TTFT 之后、正式回答之前的空窗,又给了用户审视推理链的入口。客户端要做的,就是在状态机里多识别一种块类型,分流到不同的 UI 区域。
(记住"思考过程是独立的流"这件事——下一节的真实代码里,你会看到我们正是把 reasoning_content 和 content 分开累加的。)
原理讲完,来看怎么落地。这一节一条龙走完:先从消费端代码写起,到工具流拼接、错误处理,再到把它放上生产链路要趟的那些坑。前半段是"在你自己的程序里怎么写",后半段是"出了你的程序、过了一堆中间设备之后会怎样"。
最省事的方式是用官方 SDK。以 Anthropic Python SDK 为例,纯文本场景下,SDK 把 SSE 解析和状态机维护全包了,"边收边渲染"简化成一个迭代器:
Python
withclient.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role":"user","content":"你好"}],
)asstream:
fortextinstream.text_stream:
print(text,end="",flush=True)但如果你要识别思考块、工具块,或者做精细的状态管理,就需要下沉到裸事件循环,按事件类型分发——这能让你真正看清 SDK 在底层替你做了什么:
Python
withclient.messages.stream(...)asstream:
foreventinstream:
ifevent.type=="content_block_start":
block_type=event.content_block.type# text / thinking / tool_use
# 在对应 index 上挂一个新块,UI 显示占位状态
elifevent.type=="content_block_delta":
d=event.delta
ifd.type=="text_delta":
ui.append_text(d.text)# 正文逐字渲染
elifd.type=="thinking_delta":
ui.append_thinking(d.thinking)# 思考流,折叠区
elifd.type=="input_json_delta":
buffer[event.index]+=d.partial_json# 工具参数,先攒着
elifevent.type=="content_block_stop":
# 工具块收尾:此时才整体解析参数
...浏览器端没有 SDK 兜底,思路完全一致——fetch 拿到 ReadableStream,逐块解码再按行切 SSE 事件:
JavaScript
constresp=awaitfetch(url, { method:"POST", body: JSON.stringify(payload) });
constreader=resp.body.getReader();
constdecoder=newTextDecoder();
letbuf="";
while (true) {
const { value, done } =awaitreader.read();
if (done) break;
buf+=decoder.decode(value, { stream:true });
constlines=buf.split("\n");
buf=lines.pop(); // 最后一行可能不完整,留到下一轮
for (constlineoflines) {
if (line.startsWith("data:")) handleEvent(JSON.parse(line.slice(5)));
}
}注意那个 buf = lines.pop() 的小细节:网络读到的一个 chunk 未必正好落在行边界上,最后半行必须缓存到下一轮再拼。这是手写 SSE 解析最容易翻车的地方。
如前所述,工具参数是碎片化的 JSON,规则很简单:按 index 把 partial_json 顺序拼接,等 content_block_stop 后再整体解析。
Python
tool_buffers={}# index -> 拼接中的 JSON 字符串
# delta 阶段:只累加,不解析
tool_buffers[index]+=delta.partial_json
# stop 阶段:整体解析才安全
args=json.loads(tool_buffers[index])
run_tool(name,args)想"边收边用"(比如参数没收完就先在 UI 上展示已成形的字段)才需要上 jiter 这类部分解析库——大多数场景下,"攒齐再解析"已经够用,别过度设计。
本地 demo 跑通很容易,但长连接跨网络,断流不是异常,是常态。三件事必须想清楚:
message_stop,OpenAI 是 [DONE])。没收到结束信号连接就断了,就要按"非正常结束"处理,而不是当成正常完成——否则你会把一份残缺的输出当成成品。流式场景下,你真正该卡的是事件间隔超时,而不是整请求超时。因为一个长任务的总耗时可能好几分钟,但只要数据块一直在零星地来,连接就是健康的。举个例子:一个总耗时三分钟、但每隔几秒就吐一块数据的任务,用"整请求 30 秒"去卡会被一刀切掉——哪怕连接全程健康;改用"事件间隔"去卡,只要相邻两块的间隔不超阈值,它就能正常跑完。
那这个间隔阈值定多少?它取决于流里最长的合理静默:纯文本流的块间隔常在毫秒级,但智能体执行工具或深度思考时,几十秒不吐数据也属正常。所以阈值要落在"比最长的正常停顿更宽、又比真正的断连更短"这个区间里——设太小会误杀正常的长停顿,设太大则发现不了真的卡死。这是个需要按业务实测的经验值,不是一个放之四海皆准的固定数字(我们的生产代码取了 110 秒,刚好压在网关 120 秒的硬超时之内)。
这一点,正是我们绕过前记里 120 秒网关超时的核心机制。 在 5.4 可以看真实代码。
代码写对了,流式也未必"流"得起来。把流式智能体部署到企业环境,SSE 这条"长时间不关闭的 HTTP 连接"要穿过 CDN、负载均衡、网关、Ingress 等一长串中间设备,每一层都可能动手脚。
缓冲是头号杀手。 反向代理默认会缓冲响应——攒一批数据再转发,这对普通页面是优化,对 SSE 是灾难:服务端明明在逐字推送,代理却攒到几 KB 才放行,用户看到的就是"卡半天、糊一脸"的块状输出,流式形同虚设。Nginx 需要 proxy_buffering off(或让上游返回 X-Accel-Buffering: no 头),其他代理和某些 CDN 也有各自的等价配置。排查流式"不流"的问题时,第一个嫌疑人永远是链路上某层的缓冲。
超时是第二杀手,也是我们前记里那个坑的真身。 很多网关、负载均衡器对请求有默认的超时上限。回到开头:我们那个内部网关卡的是整请求 120 秒,而智能体一次复杂分析要跑两三分钟。非流式调用期间连接上毫无数据流动,到点就被掐——这不是智能体的错,是"长任务 + 整请求超时"的必然冲突。
流式调用怎么破?看我们生产代码里这个函数(已脱敏):
Python
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1,max=10),
retry=retry_if_result(lambdaok:okisFalse),# 返回 False 就重试
reraise=False,
)
defcall_agent_summary_stream(event_id,chat_content,um_code):
"""流式调用智能体生成故障根因分析报告。成功返回 True,失败返回 False(触发重试)。
开启流式后服务端持续推送 SSE 数据块,只要块间隔不超过 read 超时(110s),
连接就一直存活,从而绕过非流式接口前的 120s 网关超时。
"""
headers={
'Content-Type':'application/json',
'appId':'<APP_ID>','token':'<TOKEN>',
'X-traceId':str(uuid.uuid4()),'X-channel':'api_call',
}
payload={
"agentAppId":AGENT_APP_ID,
"message":[{"content":chat_content,"content_type":"text"}],
"streaming":True,# ① 开启流式,绕过非流式 120s 网关超时
"umCode":um_code,
}
start=time.time()
try:
# ② 关键:连接超时 10s,读超时 110s(块间隔上限),而非整请求超时
resp=requests.post(AGENT_URL,headers=headers,json=payload,
timeout=(10,110),stream=True)
ifresp.status_code!=200:
returnFalse
content,reasoning_content="",""
forlineinresp.iter_lines():# ③ 逐行读 SSE
ifnotline:
continue
line_str=line.decode('utf-8')
ifline_str.startswith('data:{"event":"chunk_message"'):
try:
obj=json.loads(line_str[5:])['data']['data']# 去掉 "data:" 前缀
exceptException:
continue
# ④ 思考流与正文流分开累加(对应 4.3 节)
ifobj.get('reasoning_content'):
reasoning_content+=obj['reasoning_content']
ifobj.get('content'):
content+=obj['content']
full_output=content+reasoning_content
# ⑤ 内容级校验:缺报告标题 / 出现占位文案,都视为生成失败 -> 触发重试
if"故障根因分析报告"notinfull_output:
returnFalse
if"自主规划智能体输出为空"infull_output:
returnFalse
returnTrue
exceptrequests.exceptions.Timeout:
returnFalse# ⑥ 超时归为可重试
exceptrequests.exceptions.RequestException:
returnFalse
finally:
logger.info(f"事件{event_id}流式请求耗时: {time.time() - start:.1f} 秒")把这段代码和前面的原理对一下,每个标号都有出处:
streaming: True 后,服务端逐块往回写;timeout=(10, 110) 里的 110 是 requests 的读超时,含义是"两次数据块之间最多等 110 秒",不是整请求超时。所以哪怕智能体总共跑了三分钟,只要它每隔一会儿就吐一块数据,连接就一直活着,120 秒的网关整请求超时根本不会触发。这就是流式能绕过去、非流式绕不过去的全部秘密。iter_lines() 就是第二节说的"按行解析 SSE",requests 在底层处理了 chunk 与行边界。data:{"event":"chunk_message",...}(注意:事件名既不是 Anthropic 也不是 OpenAI 的,是网关自定义的——正好印证第三节"命名不同、设计同构"),但内核一样:reasoning_content 是思考过程,content 是正文,分开累加。return False,由外层 tenacity 的 retry_if_result 触发重试。这正是 5.3 说的——重试 = 从头重新生成,所以才值得为"内容质量"而重试,而不只是为网络抖动。断流必须按"必然发生"来设计。 长连接跨网络,断流是常态。上面代码把超时和网络异常统一收敛成 return False,交给重试机制兜底,就是这个思路的体现。还有一个工程编排细节值得一提:在我们的消费循环里,这个"流式 + 重试"的重调用被刻意挪到了整个处理逻辑的最后一步执行——因为它最慢(最坏情况重试三次、每次几分钟),放在前面会阻塞拓扑查询和数据入库。长耗时的流式调用,不要卡在主流程关键路径上。
可观测性要为流式重新建模。 传统 HTTP 监控记录"请求耗时"一个数字,对流式毫无意义——一条正常的流式响应耗时 40 秒甚至几分钟再正常不过。有意义的指标是:TTFT(首 token 延迟,反映排队和首包链路)、token 间隔(反映生成吞吐)、流完成率(多少比例的流正常收到结束信号)、每轮 agent loop 的工具执行耗时。在 OpenTelemetry 体系下,通常把一次完整的智能体任务建成一条 trace,每轮模型调用和每次工具执行各是一个 span,TTFT 作为 span 上的事件或属性记录。
前面几节是顺着"流式"这条线往下钻的。但跳出流式本身,还有一个更大的问题值得收个尾:当你把一个大模型/智能体 API 接进一套传统的业务代码里,到底要注意什么?
本文那段生产代码就是一个典型的融合现场——一边是确定性的 Kafka 消费、拓扑查询、数据库入库,另一边是又慢、又贵、又"会自己思考和动手"的智能体调用。最容易翻车的,就是用写传统 RPC 的思维去写这类调用。下面七条,是我们踩过坑之后总结的注意点。
1. 时延与执行模型——别在请求主路径上同步等。 LLM/智能体调用动辄几秒到几分钟,和毫秒级的 DB/RPC 不是一个量级。直接塞进 Web 请求的同步链路,会拖垮线程池、触发各级超时(就是前记那个 120s 网关的坑)。能异步就异步(消息队列 / 任务表 + 回调),长任务用流式保活;并且像本文代码那样把慢调用挪出关键路径——先把入库、拓扑查询这些确定性逻辑做完,再在循环最后触发智能体,别让它阻塞主流程。
2. 输出不确定——LLM 是概率组件,不是函数。 传统代码默认"输入定则输出定",LLM 不是:同样的 prompt 可能给出格式不一、字段缺失、甚至幻觉的结果。融合层必须把模型输出当作不可信的外部输入来对待——强制结构化输出(JSON schema / tool use)、解析后做校验、校验失败要有兜底。本文那个"输出里没有《故障根因分析报告》标题就判定失败重试",正是内容级校验的例子。永远不要让一段模型自由文本直接驱动下游的写操作。
3. 失败与重试语义不同——重试等于重新生成 + 重新计费。 传统幂等接口可以放心重发;LLM 重试是从头重算、重新花钱,还可能给出不一样的结果。所以要设好超时(区分连接级与事件间隔超时,见 5.3、5.4)、限制重试次数,对昂贵调用加缓存或去重,避免重试风暴把账单和限流一起打爆。
4. 成本与限流是一等公民。 传统接口调用近乎免费,LLM 按 token 计费、且有 RPS/TPM 限额。融合时要做预算控制、限流与降级(高峰期切小模型、裁剪上下文),并把 token 消耗纳入监控——否则一个写错的循环就是一张大额账单。
5. 安全边界——智能体会"自己动手"。 工具调用让模型能触发真实动作:查库、调内部 API、执行代码。绝不能把模型输出直接当可信指令执行。工具侧要做白名单 + 参数校验 + 权限最小化,高危操作加人工确认,并警惕 prompt 注入(用户输入里藏的"忽略以上指令")。这是传统代码里不存在、智能体特有的攻击面。
6. 可观测性要重建。 传统 APM 的"请求耗时"对流式毫无意义(一条正常的流就是 40 秒)。要按 5.4 那套重新建模:TTFT、token 间隔、流完成率、每轮工具耗时、token 用量与成本,把一次完整的智能体任务建成一条 trace。
7. 状态管理在你这边。 LLM API 是无状态的,多轮对话和 agent loop 的上下文必须由你的应用完整维护并回传(见 4.2 的"状态跨轮传递")。存哪、传哪、怎么裁剪上下文以控制长度和成本,都是融合层的责任,而不是 API 替你扛。
一句话收口:把大模型/智能体当成一个"慢、贵、不可信、有副作用"的外部依赖来集成,而不是一个普通的本地函数。 时延、不确定性、成本、安全这四条线,都得在融合层单独设防——流式只是其中"时延"那条线上最关键的一块拼图。
本文是笔者从一次生产环境的智能体 API 调用超时问题出发,做的一点总结和延伸思考。
走到智能体时代我越来越体会到:传统代码之间的 API 调用,和面向大模型、智能体的 API 调用,已经是很不一样的两件事——前者快、确定、一问一答;后者更慢、更不确定,还会自己思考、自己动手。也正因如此,我把流式调用底层的原理和实践认真梳理了一遍,写成这篇文章,希望对大家有帮助。