Skip to content

07 事件流

一、什么是事件流

以前在LangChain中,使用 stream() 时,不同类型的数据消息、状态更新、自定义事件会混在一起返回,开发者需要自己判断数据类型并解析。在LangChain v1.0中,推荐使用 stream_events(version="v3")。它返回一个 Run 对象,里面已经按类型拆分成了多个流:这样前端或业务代码可以直接消费自己关心的数据,不用再从整个Strem事件流中手动筛选数据。

在LangChain中,使用stream_events方法来获取事件流示例如下:

python
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])

# 使用stream_events获取事件流,version="v3"是固定写法
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

# 流式输出模型回复
for message in stream.messages:
    for delta in message.text:
        print(delta, end="", flush=True)

# 获取最终状态
final_state = stream.output

二、流对象的类型

stream_events返回的流对象提供了多种"投影",每种投影对应不同类型的内容:

投影用途
stream.messages模型消息流,每次LLM调用产生一个
message.text文本增量和最终文本
message.reasoning推理内容(部分模型支持,比如DeepSeek的深度思考)
message.tool_calls工具调用的参数块和最终调用结果
stream.tool_calls工具执行的完整生命周期,包括输入、输出、错误等
stream.valuesAgent状态快照
stream.output最终Agent状态
stream.subgraphs嵌套图运行(子Agent场景)
stream.extensions自定义投影

简单来说,stream.messages用于获取消息列表,stream.tool_calls获取工具调用的过程,stream.values获取Agent的状态数据。

三、流式输出模型消息

最常见的场景就是流式输出模型的回复,让用户看到"打字机"效果:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

for message in stream.messages:
    # 输出当前是哪个节点产生的消息
    print(f"[{message.node}] ", end="")

    # 流式输出文本增量
    for delta in message.text:
        print(delta, end="", flush=True)

    # 获取完整消息和token用量
    full_message = message.output
    usage = full_message.usage_metadata
    if usage:
        print(f"\ntoken用量: {usage}")

message.text是逐字输出的增量,如果你想一次性拿到完整文本,可以用str(message.text)

四、流式输出推理过程

有些模型支持"深度思考",比如DeepSeek的思考模式。这时候模型会先输出一段推理过程,再输出最终答案。通过message.reasoning可以拿到推理部分:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

for message in stream.messages:
    # 输出推理过程
    for delta in message.reasoning:
        print(f"[思考] {delta}", end="", flush=True)

    # 输出最终回答
    for delta in message.text:
        print(delta, end="", flush=True)

输出效果类似:

[思考] 用户问的是杭州天气,我需要调用get_weather工具...
杭州今天晴天,气温28°C~35°C,适合出门。

五、流式输出工具调用

工具调用的流式输出分为两个阶段:

阶段一:模型生成工具调用参数

当LLM决定要调用工具时,它会生成工具名和参数。这个过程可以通过message.tool_calls来流式获取:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

for message in stream.messages:
    # 流式获取工具调用参数的生成过程
    for chunk in message.tool_calls:
        print(f"工具调用参数块: {chunk}")

    # 获取最终确定的工具调用
    finalized = message.tool_calls.get()
    if finalized:
        print(f"最终工具调用: {finalized}")

阶段二:工具执行过程

工具开始执行后,可以通过stream.tool_calls获取执行的完整生命周期:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

for call in stream.tool_calls:
    print(f"调用工具: {call.tool_name}")
    print(f"输入参数: {call.input}")

    # 流式输出工具执行结果
    for delta in call.output_deltas:
        print(delta, end="", flush=True)

    # 获取最终输出和错误信息
    print(f"输出: {call.output}")
    print(f"错误: {call.error}")

六、流式输出状态

有时候你需要实时监控Agent的状态变化,比如短期记忆中的消息列表变化:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

# 每次状态变化都会产生一个快照
for snapshot in stream.values:
    print(snapshot)

# 获取最终状态
final_state = stream.output
print(final_state["messages"][-1].content)

七、子Agent的流式输出

在多Agent协作的场景中,一个Agent可能会调用另一个Agent。这时候子Agent的事件流会通过stream.subgraphs暴露出来:

python
from langchain.agents import create_agent


# 定义天气Agent
def get_weather(city: str) -> str:
    """查询城市天气"""
    return f"{city}今天晴天,气温28°C~35°C"


weather_agent = create_agent(
    model="deepseek-v4-flash",
    tools=[get_weather],
    name="weather_agent",
)


# 定义一个工具,内部调用天气Agent
def call_weather(query: str) -> str:
    """调用天气Agent查询天气"""
    result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
    return result["messages"][-1].text


# 定义主Agent
supervisor = create_agent(
    model="deepseek-v4-flash",
    tools=[call_weather],
    name="supervisor",
)

stream = supervisor.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3",
)

# 遍历子Agent的流
for subagent in stream.subgraphs:
    if subagent.graph_name != "weather_agent":
        continue
    print(f"[{subagent.graph_name}] ", end="")
    for message in subagent.messages:
        for token in message.text:
            print(token, end="", flush=True)
    print()

通过subagent.graph_name可以区分不同的子Agent,只处理你关心的那一个。

八、同时消费多种流

在实际应用中,你可能同时需要模型消息和工具调用的流。在异步代码中可以用asyncio.gather并发消费:

python
import asyncio

stream = await agent.astream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)


async def consume_messages():
    async for message in stream.messages:
        print(await message.text)


async def consume_tool_calls():
    async for call in stream.tool_calls:
        print(call.tool_name, call.input)


await asyncio.gather(consume_messages(), consume_tool_calls())

在同步代码中,可以用stream.interleave交替消费:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3"
)

for name, item in stream.interleave("messages", "tool_calls", "values"):
    if name == "messages":
        print(item.text)
    elif name == "tool_calls":
        print(item.tool_name, item.input)
    elif name == "values":
        print(item)

九、自定义流式投影

如果内置的投影不能满足需求,比如你想输出检索进度、自定义事件等,可以通过自定义transformer来扩展:

python
stream = agent.stream_events(
    {"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
    version="v3",
    transformers=[ToolActivityTransformer],
)

for activity in stream.extensions["tool_activity"]:
    print(activity)

也可以在中间件中注册transformer:

python
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware


class ToolActivityMiddleware(AgentMiddleware):
    transformers = (ToolActivityTransformer,)


agent = create_agent(
    model="deepseek-v4-flash",
    tools=[get_weather],
    middleware=[ToolActivityMiddleware()],
)

十、总结

事件流是LangChain中实现流式输出的核心机制,通过stream_events(version="v3")获取事件流后,可以根据需要选择不同的投影来消费:

  • 想要模型回复的"打字机"效果?用stream.messages配合message.text
  • 想看工具调用过程?用message.tool_callsstream.tool_calls
  • 想监控Agent状态?用stream.values
  • 多Agent场景?用stream.subgraphs
  • 需要自定义输出?用stream.extensions

在下一篇文章中,我们将学习如何在LangChain中实现更细粒度的流式输出控制。