这篇文章不打算重复文档定义,而是从工程视角回答一个更实际的问题:

LangChain Streaming 在真实项目里应该怎么设计、怎么接入、怎么落地、会踩哪些坑?


一、为什么工程里一定要做 Streaming

很多文章会说 Streaming 能“提升用户体验”,但这句话太空。工程里真正的价值,通常体现在下面四件事上。

1)降低“首字可见时间”,不是只降低总耗时

很多 LLM 请求总耗时其实没有夸张到不能接受,问题在于用户在前 2~5 秒里什么都看不到。

对聊天系统来说,用户感知最强的指标往往不是“10 秒返回完”,而是“300ms 内先看到系统开始说话”。

messages 模式的价值就在这里。它会从所有发生 LLM 调用的节点里持续产出 (token, metadata),也就是你可以尽早把 token 推到前端,而不是等整个 Agent 完成后再统一返回。官方文档明确说明,messages 就是用来流式拿 LLM token 的。

2)Agent 调工具时,必须让用户知道“系统还活着”

普通聊天补全文本时,只要 token 一直在滚动,用户一般不会焦虑。

但 Agent 一旦开始调工具,事情就不一样了:模型可能先生成工具调用,再等待外部 API、数据库、检索服务、内部系统响应。这段时间如果没有反馈,用户很容易认为“卡住了”。

LangChain 的 updates 模式会在每个 agent step 后产出状态更新。对于一次典型的 tool 调用流程,官方文档给出的 step 顺序就是:

这其实非常适合映射到前端状态:“正在分析问题” → “正在调用工具” → “工具返回中” → “正在组织最终答案”。

3)长链路任务需要“过程反馈”,而不是只有最终结果

比如:

这里光有 messages 不够。因为 token 只能说明“模型在输出”,但不能说明“业务流程走到哪一步”。

这正是 custom 的作用:你可以在工具或图节点内部主动发业务进度,比如:

官方文档把这个能力描述为:使用 get_stream_writer() 从图节点内部流出任意自定义数据。

4)Streaming 不只是给用户看,也是给工程师看的

这一点经常被忽略。

真实项目里,复杂 Agent 的问题通常不是“最终答错了”这么简单,而是:

如果你只拿最终 answer,定位问题会非常痛苦。

updates + messages 的组合,天然就是一条轻量级执行观测链路:你能看到 step 级状态,也能看到 token 级输出,必要时还能把 custom 进度事件和 trace id 绑在一起做日志关联。LangGraph 文档也明确说明,v2 流式输出里每个 chunk 都是带 typeStreamPart,适合后端统一分发。


二、三种 Streaming 模式,工程里分别该怎么用

LangChain 官方把 Streaming 分成三类:

定义很简单,但工程里更重要的是:它们各自解决什么问题


1)updates:看“流程步骤”,不看“逐字输出”

官方定义是“每个 agent step 之后流出状态更新”。如果放到工程里理解,它更像是:

Agent 执行阶段的状态流

最适合的场景:

例如一次 create_agent 的典型执行中,你会看到:

这正好就是用户界面里“分析中 / 查数据中 / 回答中”的来源。

优点

缺点

一句话判断

你的需求如果是“我想知道 Agent 现在跑到哪一步了”,优先看 updates


2)messages:看“模型流式输出”,包括 token 和工具调用碎片

官方定义是“从发生 LLM 调用的图节点中流出 (token, metadata) 元组”。工程上可以理解成:

LLM 层的增量输出流

这不只是普通文本 token。文档明确展示了,在 Agent 场景下,messages 还会产出 tool_call_chunk,也就是工具调用参数生成过程中的增量片段。比如模型先吐出工具名,再逐步吐出参数 JSON 的各个片段。

最适合的场景:

优点

缺点

一句话判断

你的需求如果是“我要尽快把模型输出推给用户”,优先看 messages


3)custom:发业务进度,不依赖模型文本

官方给的能力非常直接:在工具或图节点里通过 get_stream_writer() 主动输出任意数据。文档还特别说明,加入 get_stream_writer() 后,这个工具就依赖 LangGraph 执行上下文,脱离该上下文单独调用会失败。

工程里可以把它理解成:

你自己定义的业务事件流

适合的场景:

LangGraph 文档还提到,custom 可以用来包装任意外部流式客户端,即便那个 LLM API 本身不实现 LangChain chat model 接口,也可以通过 writer 手动往外发 chunk。

优点

缺点

一句话判断

你的需求如果是“我要告诉用户系统具体在做什么业务动作”,用 custom


4)一个项目里怎么组合这三种模式

这是最关键的。

我自己的建议是:

原因很简单:

三者不是互斥关系,而是三层不同视角。官方文档也明确支持把多个 stream_mode 作为列表同时传入,而且在 version="v2" 下,每个 chunk 都会带 type 字段,天然适合服务端做统一分发。


三、一个最小可运行示例:先把 Streaming 跑起来

下面先给一个尽量短、但工程上有意义的例子:

说明:下面代码是基于 LangChain 官方 Streaming/Agents 文档能力写的最小整合版。要运行,需要你自己配置模型 provider 的 API Key。官方文档中的 Agent 示例使用了 create_agent(...),Streaming 通过 stream() / astream() 消费。

python
import json
import time
from typing import Any
 
from langchain.agents import create_agent
from langchain.tools import tool
from langchain.messages import AIMessageChunk, AIMessage, ToolMessage
from langgraph.config import get_stream_writer
 
@tool
def query_order_status(order_id: str) -> str:
    """查询订单状态"""
    writer = get_stream_writer()
 
    writer({
        "event": "custom_progress",
        "stage": "tool_query_order",
        "message": f"开始查询订单 {order_id}"
    })
    time.sleep(0.5)
 
    writer({
        "event": "custom_progress",
        "stage": "tool_query_order",
        "message": f"订单 {order_id} 查询成功"
    })
 
    return json.dumps({
        "order_id": order_id,
        "status": "shipped",
        "carrier": "SF Express",
        "eta": "2026-04-02"
    }, ensure_ascii=False)
 
agent = create_agent(
    model="openai:gpt-5",   # 换成你自己可用的模型
    tools=[query_order_status],
)
 
input_data = {
    "messages": [
        {
            "role": "user",
            "content": "帮我查询订单 A10086 的状态,并用中文告诉我结果"
        }
    ]
}
 
for chunk in agent.stream(
    input_data,
    stream_mode=["messages", "updates", "custom"],
    version="v2",
):
    chunk_type = chunk["type"]
 
    if chunk_type == "messages":
        token, metadata = chunk["data"]
 
        # token 可能是普通文本 chunk,也可能是工具调用 chunk
        if isinstance(token, AIMessageChunk):
            if token.text:
                print("[TOKEN]", token.text, flush=True)
 
            if token.tool_call_chunks:
                print("[TOOL_CALL_CHUNK]", token.tool_call_chunks, flush=True)
 
    elif chunk_type == "updates":
        # updates 是“步骤完成后的状态”
        for step, data in chunk["data"].items():
            last_msg = data["messages"][-1]
            if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
                print("[STEP]", step, "tool_calls =", last_msg.tool_calls, flush=True)
            elif isinstance(last_msg, ToolMessage):
                print("[STEP]", step, "tool_result =", last_msg.content, flush=True)
            else:
                print("[STEP]", step, "final =", getattr(last_msg, "content", None), flush=True)
 
    elif chunk_type == "custom":
        print("[CUSTOM]", chunk["data"], flush=True)

这段代码在工程里分别起什么作用

@tool + get_stream_writer()

这部分不是为了“让工具能工作”,而是为了让工具执行过程可以被外部观察。

例如查订单、查物流、跑 SQL、调用内部服务时,工具本身可能要 1~5 秒,这时只有最终返回值远远不够,最好中间也有阶段信息。官方文档就是把 custom 定位为“从工具执行中流出任意更新”。

stream_mode=["messages", "updates", "custom"]

这行代码是整个工程接入的核心。

它等于同时订阅三种视角:

这样后端可以只写一套消费循环,再根据 chunk["type"] 路由到不同处理器。LangGraph v2 文档明确推荐这种按 type 分发的方式。

token.texttoken.tool_call_chunks

这里是最容易踩坑的地方。

很多人第一次看到 messages,以为拿到的一定是文本 token。其实文档已经明确展示了,模型在准备调用工具时,产出的可能是一串 tool_call_chunk,包括工具名、参数 JSON 的部分片段。

所以工程上不要这样做:

python
# 错误示意:把所有 messages 都当文本拼接
full_text+=token.text

而应该区分:

updates 里的 completed state

如果你想拿到完成后的工具调用,只看 messages 不够,因为它给的是增量片段。

官方文档明确建议,在消息被 agent state 跟踪的场景下,用 stream_mode=["messages", "updates"] 来同时拿:

这在工程里非常重要。因为前端显示可以基于 token,但后端决定“现在是否已正式进入工具执行阶段”,更可靠的依据往往是 updates 里的完成态。


四、更接近真实项目的示例:FastAPI + SSE 把 LangChain 流转成前端可消费事件

真实项目里,前端通常不会直接理解 LangChain 的原始 chunk。

更稳妥的做法是:

  1. 后端消费 LangChain 的流式事件
  2. 转换成你自己的统一事件协议
  3. 通过 SSE 或 WebSocket 推给前端

这里我用 FastAPI + SSE 举例。原因很简单:


五、先设计事件协议,再写代码

我推荐的前后端协议最少包含下面几类事件:

为什么这样拆

token

给前端做打字机效果,只放最终要显示给用户的文本 token。

step

告诉前端 Agent 当前处于哪个阶段,例如:

tool_start

模型已经确定要调用哪个工具,参数是什么。

tool_end

工具执行完成,拿到了什么结果。

custom_progress

工具内部更细的进度,例如“已拉取第 3 页数据”。

done

本轮对话流结束。

error

链路中的任何异常,都统一映射成这类事件,前端好处理。


六、FastAPI + SSE 后端示例

python
import asyncio
import json
import logging
from typing import AsyncIterator
 
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from langchain.agents import create_agent
from langchain.tools import tool
from langchain.messages import AIMessageChunk, AIMessage, ToolMessage
from langgraph.config import get_stream_writer
 
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
 
app = FastAPI()
 
@tool
def search_kb(query: str) -> str:
    """搜索知识库"""
    writer = get_stream_writer()
    writer({
        "event": "custom_progress",
        "stage": "kb_search",
        "message": f"开始搜索知识库: {query}"
    })
 
    # 模拟耗时
    import time
    time.sleep(0.8)
 
    writer({
        "event": "custom_progress",
        "stage": "kb_search",
        "message": "知识库检索完成,命中 3 条候选结果"
    })
 
    return (
        "退款规则:商品签收后 7 天内支持无理由退款;"
        "如已发货,退款到账时效 1~3 个工作日。"
    )
 
agent = create_agent(
    model="openai:gpt-5",   # 替换为你自己的模型
    tools=[search_kb],
)
 
def sse_pack(event: str, data: dict) -> str:
    return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
 
@app.get("/chat/stream")
async def chat_stream(request: Request, q: str):
    async def event_generator() -> AsyncIterator[str]:
        final_text_parts = []
 
        try:
            async for chunk in agent.astream(
                {
                    "messages": [
                        {"role": "user", "content": q}
                    ]
                },
                stream_mode=["messages", "updates", "custom"],
                version="v2",
            ):
                # 客户端断开连接时尽快停止
                if await request.is_disconnected():
                    logger.info("client disconnected")
                    break
 
                chunk_type = chunk["type"]
 
                if chunk_type == "messages":
                    token, metadata = chunk["data"]
 
                    if isinstance(token, AIMessageChunk):
                        # 1) 文本 token:直接推给前端展示
                        if token.text:
                            final_text_parts.append(token.text)
                            yield sse_pack("token", {
                                "text": token.text,
                                "node": metadata.get("langgraph_node"),
                            })
 
                        # 2) 工具调用 chunk:不要当成文本展示
                        if token.tool_call_chunks:
                            yield sse_pack("tool_call_delta", {
                                "chunks": token.tool_call_chunks,
                                "node": metadata.get("langgraph_node"),
                            })
 
                elif chunk_type == "updates":
                    for step, data in chunk["data"].items():
                        last_msg = data["messages"][-1]
 
                        yield sse_pack("step", {
                            "step": step,
                            "message_type": last_msg.__class__.__name__,
                        })
 
                        if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
                            yield sse_pack("tool_start", {
                                "step": step,
                                "tool_calls": last_msg.tool_calls,
                            })
 
                        elif isinstance(last_msg, ToolMessage):
                            yield sse_pack("tool_end", {
                                "step": step,
                                "tool_output": last_msg.content,
                            })
 
                elif chunk_type == "custom":
                    data = chunk["data"]
                    yield sse_pack("custom_progress", data if isinstance(data, dict) else {
                        "message": str(data)
                    })
 
            yield sse_pack("done", {
                "text": "".join(final_text_parts)
            })
 
        except asyncio.CancelledError:
            logger.warning("request cancelled")
            raise
 
        except Exception as e:
            logger.exception("stream failed")
            yield sse_pack("error", {
                "error": type(e).__name__,
                "message": str(e),
            })
 
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

这段后端代码的工程意义

1)不要把 LangChain 原始 chunk 直接透传给前端

原始 chunk 是框架内部语义,前端不应该依赖太深。

比如今天是 AIMessageChunk.tool_call_chunks,未来框架结构变了,你前端就会被迫改。更稳妥的做法是后端做一层协议转换,前端只认你定义的事件名。

2)tokentool_call_delta 一定要分开

这是 Streaming 接入里最常见的 bug。

因为 messages 里既可能有文本 token,也可能有 tool_call_chunk。官方文档展示得非常清楚:调用工具时,模型会一段一段流出工具名和参数 JSON。

如果你把这些 chunk 当普通文本显示,前端就会出现:

所以正确做法是:

3)工具真正“开始执行”的判断,尽量基于 updates

messages 里的 tool_call_chunk 只是“模型正在生成调用意图”,还不代表工具已经真正执行。

updates 里拿到完成态的 AIMessage.tool_calls,以及后续的 ToolMessage,更适合映射成 tool_start / tool_end。官方文档也建议在需要拿到完成后的 parsed tool call 时,把 messagesupdates 一起开。

4)request.is_disconnected() 很重要

很多线上 Streaming 接口的问题不是“不会流”,而是“用户关了页面后后端还在傻跑”。

SSE / WebSocket 场景都要考虑客户端中断:

不及时取消,模型调用和工具调用会继续消耗资源。


七、前端应该如何理解这些事件

一个稳定的聊天前端,通常应该维护一个明确的状态机,而不是“收到什么就 render 什么”。

我建议至少区分四种可见状态:

  1. 模型正在思考 触发时机:收到 step(model),但还没有用户可见文本 token
  2. 正在调用工具 触发时机:收到 tool_start
  3. 工具执行完成,正在组织答案 触发时机:收到 tool_end,但最终 answer token 还没开始
  4. 最终回答输出中 触发时机:收到文本 token

这样用户对系统行为的预期是稳定的,不会因为工具链路中间断流就以为系统死掉。


八、工程里最容易踩的坑

下面这部分最重要。


坑 1:为什么 messages 模式下会出现 tool_call_chunk,而不是直接文本?

因为 messages 流的是LLM 的增量消息块,不是“面向用户的最终文本”。

官方文档明确写到,在 Streaming tool calls 场景里,你可能既想拿到“工具调用生成中的部分 JSON”,也想拿到“最终被执行的完整 tool calls”。messages 给的是前者的增量片段,所以会出现 tool_call_chunk。完成态则需要结合 updates 或你自己聚合。

工程结论:


坑 2:为什么 custom 里的 get_stream_writer() 不能脱离 LangGraph 执行上下文单独调用?

官方文档直接说明了这一点:如果你在工具里加了 get_stream_writer(),那这个工具就不能脱离 LangGraph 执行上下文独立调用。

原因用工程语言解释就是:

get_stream_writer() 不是普通全局对象,它依赖当前 graph run 的 runtime context。

没有这层上下文,就没有地方把事件写出去。

这会带来一个实际问题:

很多人会把 tool 单独写单元测试,然后直接调用函数,结果一跑就炸。

更稳妥的写法有两个:

方案 A:给工具核心逻辑和流式包装分层

python
def _query_order_core(order_id: str) -> dict:
    return {"order_id": order_id, "status": "shipped"}
 
@tool
def query_order(order_id: str) -> str:
    writer = get_stream_writer()
    writer({"event": "progress", "message": "start"})
    result = _query_order_core(order_id)
    writer({"event": "progress", "message": "done"})
    return json.dumps(result, ensure_ascii=False)

这样 _query_order_core() 可以单独测试,query_order() 负责 LangChain 包装。

方案 B:让 writer 变成可注入依赖

这样普通环境传 None,流式环境传真实 writer。

LangGraph 文档还提到,在 Python < 3.11 的 async 环境里,get_stream_writer() 不可用,需要改成显式传 writer。这个思路本身也说明:writer 最好在工程上被视为一种依赖,而不是业务函数的唯一入口。


坑 3:前端如何区分“模型正在思考”“正在调用工具”“工具执行完成”“最终回答输出中”?

不要试图仅靠 token 猜。

更稳的做法是组合事件:

也就是说,前端状态主要看 updates,文本展示主要看 messages


坑 4:如何避免前端一边展示 token,一边被 tool call 打乱?

这是最常见的前端渲染问题。

原则只有一条:不要把所有 messages 里的 chunk 都拼进同一个文本缓冲区。

推荐做法:

如果你真的想展示工具调用过程,也要单独渲染成“系统状态”,不要混进回答正文。


坑 5:多种 stream mode 同时开启时,后端事件分发怎么写?

不要写一大坨 if else 然后直接业务逻辑塞进去。

建议抽成标准分发器:

python
handlers = {
    "messages": handle_messages,
    "updates": handle_updates,
    "custom": handle_custom,
}
 
async for chunk in agent.astream(...):
    handler = handlers.get(chunk["type"])
    if handler:
        async for event in handler(chunk):
            yield event

因为 LangGraph v2 的核心约定就是:每个 chunk 都是带 typeStreamPart。既然框架已经把事件类型分好了,后端最好也按事件总线的方式处理。


坑 6:异常处理、超时处理、取消请求处理怎么做?

这块是 Streaming 落地里最容易被忽略的地方。

异常处理

无论是模型异常、工具异常、序列化异常,最终都要变成统一的 error 事件返回给前端。

不要让连接直接断掉,否则前端体验会非常糟。

超时处理

至少要分两层:

Agent 文档里就展示了模型实例可以设置 timeout

工程上建议:

取消请求

SSE 场景用 request.is_disconnected();WebSocket 场景则监听断连。

取消之后应尽快:


坑 7:日志怎么记,才能排查线上问题?

推荐至少记录三层日志:

第一层:请求级

第二层:step 级

第三层:流事件级

最关键的一点是:

日志要和流式事件协议保持同构。

例如你已经定义了:

那日志里最好也对应这三个阶段。这样线上复盘时,你看日志就像在“回放事件流”。


九、工程建议:不同项目该怎么选 Streaming 方案

1)什么场景只用 messages 就够了

适合:

例如一个纯问答助手,大部分时候只输出文本,没有明显的业务步骤。

判断标准:

如果你的前端 UI 不打算展示“正在查库 / 正在检索 / 正在调用工具”,那只用 messages 完全可以。


2)什么场景必须配合 updates

适合:

官方文档对工具调用流式的建议,本质上就是:

只看 messages 你拿到的是增量碎片;想拿完成态,要结合 updates

我的建议:

只要是 Agent + Tools,就默认把 updates 打开。


3)什么场景应该补充 custom

适合:

比如:

这些场景如果只靠 messagesupdates,信息还是不够细。

这时候 custom 最有价值。


4)中小项目怎么选

中小项目目标一般是“尽快上线,不要把协议搞太复杂”。

我建议:

这样已经能覆盖 80% 的需求。


5)复杂 Agent 项目怎么选

复杂项目往往有这些特征:

我建议:


十、我推荐的默认实践方案

这部分给一套能直接落地的大多数业务系统的做法。

默认后端方案

默认事件协议

前端只认这几类:

默认消费规则

默认前端规则

默认日志规则

每次请求至少记录:

默认代码组织建议

不要把 writer、业务逻辑、工具包装混在一起。

推荐分层:

这样后面你要替换模型、替换推送协议、替换前端协议,都不会牵一发动全身。


总结

LangChain Streaming 真正有价值的地方,不是“能流式”,而是它把 Agent 执行过程拆成了三层可观测信号:

如果只把 Streaming 理解成“让回答一个字一个字往外蹦”,那它的价值只用了 30%。

真正适合工程落地的做法,是把它当成一条事件流总线:后端消费 LangChain 流,转成你自己的前端协议和日志协议,再把用户体验、调试体验、线上可观测性统一起来。官方文档已经把基础能力铺好了,关键在于你不要停留在 API 层,而要把这些 chunk 设计成系统行为。


推荐落地方案

如果你现在要给一个大多数业务系统上 Streaming,我推荐直接用下面这套默认架构:

方案选型

事件协议

默认策略

最重要的三条工程原则

  1. 不要把 messages 里的所有 chunk 都当成文本。
  2. 不要让前端直接依赖 LangChain 原始 chunk 结构。
  3. 不要只做用户可见 Streaming,不做日志和状态流。

按这套方案做,基本能覆盖绝大多数客服、搜索、知识库问答、数据查询类 LLM 应用,而且后面要扩展可观测性、取消请求、问题排查,也不会推翻重来。