Skip to content

04 流式响应

前面我们写的Agent API,都是等Agent处理完再一次性返回结果。但你用ChatGPT、DeepSeek的时候,回答是一个字一个字蹦出来的,不用等全部生成完就能看到内容。

这就是流式响应——服务端一边生成数据,一边发给客户端,不用等全部准备好。

对Agent来说,流式响应几乎是必须的。因为LLM生成回答需要时间,如果等全部生成完再返回,用户可能要等5-10秒才能看到任何内容。流式模式下,第一个token出来就能显示,用户体验好得多。

打个比方:非流式就像你点了一份外卖,做好了才一起送来;流式就像回转寿司,厨师做好一盘你就拿到一盘,不用等全部做完。

一、基本原理

HTTP响应分两部分:头部体部。正常情况下,服务器准备好完整的响应体,一次性发给客户端。

流式响应不一样——服务器先发头部,然后通过**生成器(generator)**一块一块地发送响应体。每yield一次,就有一段数据发给客户端。

python
@app.route("/stream")
def stream():
    def generate():
        for i in range(5):
            yield f"第{i+1}条数据\n"

    return generate(), {"Content-Type": "text/plain"}

客户端会陆续收到:

第1条数据
第2条数据
第3条数据
第4条数据
第5条数据

关键点是yield——每次yield的数据会立即发给客户端,不用等函数执行完。

二、Generator生成器

如果你不熟悉Python的生成器,这里简单解释一下。普通函数用return返回,调用一次就结束。生成器用yield返回,每次调用会从上次yield的地方继续执行:

python
def normal_func():
    return 1
    return 2  # 永远不会执行

def generator_func():
    yield 1   # 第一次调用到这里暂停
    yield 2   # 第二次调用从这里继续
    yield 3   # 第三次调用从这里继续

gen = generator_func()
print(next(gen))  # 1
print(next(gen))  # 2
print(next(gen))  # 3

Flask接收到生成器后,会不断调用next()获取数据,每拿到一块就发给客户端。

三、SSE格式

对于Agent API,我们通常用**SSE(Server-Sent Events)**格式来做流式响应。SSE是一种基于HTTP的单向推送协议,格式很简单:

data: 第一条消息\n\n
data: 第二条消息\n\n
data: 第三条消息\n\n

每条消息以data: 开头,以\n\n(两个换行)结尾。

3.1 为什么用SSE

方式特点适合场景
普通流式原始数据流文件下载、CSV导出
SSE结构化的事件格式Agent逐字输出、实时通知
WebSocket双向通信聊天室、游戏

SSE对Agent来说是最合适的:

  • 基于HTTP,不需要额外的协议支持
  • 浏览器原生支持(EventSource API)
  • 自动重连
  • 格式简单,实现容易

3.2 SSE数据格式

SSE支持几种字段,最常用的是data

data: 普通消息\n\n

event: token\ndata: 你好\n\n

event: done\ndata: [DONE]\n\n
字段说明
data消息内容,可以有多行(每行都以data: 开头)
event事件类型,不指定则默认message
id消息ID,客户端可以用Last-Event-ID重连
retry重连间隔(毫秒)

四、Flask实现SSE

来看一个完整的SSE实现:

python
from flask import Flask, Response, request

app = Flask(__name__)


@app.route("/stream", methods=["GET"])
def stream():
    def generate():
        # 模拟逐字输出
        text = "你好!我是AI助手,很高兴为你服务。"
        for char in text:
            yield f"data: {char}\n\n"

        # 发送结束标记
        yield "data: [DONE]\n\n"

    return Response(
        generate(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx不缓冲
        },
    )

几个关键点:

要素说明
content_type="text/event-stream"告诉客户端这是SSE流
Cache-Control: no-cache禁止缓存,保证实时性
X-Accel-Buffering: no告诉Nginx不要缓冲,否则数据会攒一堆再发
yield "data: ...\n\n"每条消息以data: 开头,\n\n结尾

4.1 客户端接收(JavaScript)

浏览器端用EventSource接收SSE:

javascript
const eventSource = new EventSource("/stream");

eventSource.onmessage = function(event) {
    if (event.data === "[DONE]") {
        eventSource.close();
        return;
    }
    // 把收到的字符追加到页面上
    document.getElementById("output").textContent += event.data;
};

eventSource.onerror = function() {
    console.log("连接出错");
    eventSource.close();
};

4.2 客户端接收(fetch + ReadableStream)

如果需要POST请求(发送用户消息),EventSource不支持POST,需要用fetch

javascript
async function chat(message) {
    const response = await fetch("/chat", {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({message: message}),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;

        const text = decoder.decode(value);
        // 解析SSE格式
        const lines = text.split("\n");
        for (const line of lines) {
            if (line.startsWith("data: ")) {
                const data = line.slice(6);
                if (data === "[DONE]") return;
                document.getElementById("output").textContent += data;
            }
        }
    }
}

五、流式Agent API

把SSE和Agent结合起来,实现真正的流式对话:

python
from flask import Flask, Response, request, jsonify
import time

app = Flask(__name__)


@app.route("/chat", methods=["POST"])
def chat():
    """流式对话接口"""
    data = request.get_json()
    if not data or "message" not in data:
        return jsonify({"error": "缺少message字段"}), 400

    message = data["message"]

    def generate():
        # 这里后面会替换成真正的Agent流式调用
        reply = f"你说的是「{message}」,这是一个模拟的流式回复。"
        for char in reply:
            yield f"data: {char}\n\n"
            time.sleep(0.05)  # 模拟LLM生成延迟

        yield "data: [DONE]\n\n"

    return Response(
        generate(),
        content_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )

测试:

bash
curl -N -X POST http://127.0.0.1:5000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "你好"}'

-N参数禁用curl的缓冲,这样你能实时看到数据逐字输出。

六、stream_with_context

在生成器函数内部,不能直接访问request对象。因为Flask在开始流式响应后,请求上下文已经结束了。

python
# ❌ 错误:生成器内访问request会报错
@app.route("/stream")
def stream():
    def generate():
        name = request.args.get("name")  # RuntimeError!
        yield f"Hello {name}"
    return generate()

解决方案是用stream_with_context包装生成器,保持请求上下文在流式过程中一直有效:

python
from flask import stream_with_context

# ✅ 正确:用stream_with_context保持上下文
@app.route("/stream")
def stream():
    name = request.args.get("name", "World")

    def generate():
        # 这里可以安全地使用在generate之前获取的变量
        yield f"Hello {name}\n\n"
        for i in range(3):
            yield f"data: 第{i+1}条消息\n\n"

    return Response(
        stream_with_context(generate()),
        content_type="text/event-stream",
    )

最佳实践:在生成器外部获取请求数据,在生成器内部只使用已经获取的变量

python
@app.route("/chat", methods=["POST"])
def chat():
    # 在生成器外部获取请求数据
    data = request.get_json()
    message = data.get("message", "")

    def generate():
        # 在生成器内部使用已经获取的变量,不要访问request
        for char in f"收到: {message}":
            yield f"data: {char}\n\n"
        yield "data: [DONE]\n\n"

    return Response(
        stream_with_context(generate()),
        content_type="text/event-stream",
    )

七、JSON格式的SSE

实际项目中,你可能想在SSE消息中传递更多结构化信息,而不只是纯文本。可以把每条消息包装成JSON:

python
import json

@app.route("/chat", methods=["POST"])
def chat():
    data = request.get_json()
    message = data.get("message", "")

    def generate():
        # 每条消息是JSON格式
        reply = f"收到: {message}"
        for char in reply:
            chunk = {"content": char, "type": "token"}
            yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

        # 结束标记也是JSON
        done = {"type": "done"}
        yield f"data: {json.dumps(done)}\n\n"

    return Response(
        stream_with_context(generate()),
        content_type="text/event-stream",
    )

客户端收到的就是:

data: {"content": "收", "type": "token"}

data: {"content": "到", "type": "token"}

data: {"content": ":", "type": "token"}

...

data: {"type": "done"}

这种方式的好处是可以同时传递多种信息——token内容、工具调用状态、错误信息等等。

八、事件类型

SSE支持自定义事件类型,用event:字段:

python
def generate():
    # 普通token
    yield "event: token\ndata: 你好\n\n"

    # 工具调用开始
    yield "event: tool_start\ndata: {\"tool\": \"search\"}\n\n"

    # 工具调用结果
    yield "event: tool_result\ndata: {\"result\": \"...\"}\n\n"

    # 生成结束
    yield "event: done\ndata: {}\n\n"

客户端根据事件类型分别处理:

javascript
const eventSource = new EventSource("/chat");

eventSource.addEventListener("token", (e) => {
    // 显示token
    output.textContent += e.data;
});

eventSource.addEventListener("tool_start", (e) => {
    // 显示"正在搜索..."
    const tool = JSON.parse(e.data);
    status.textContent = `正在调用 ${tool.tool}...`;
});

eventSource.addEventListener("done", (e) => {
    eventSource.close();
});

九、注意事项

9.1 顺序保证

SSE保证消息按发送顺序到达客户端。你不用担心消息乱序的问题。

9.2 连接超时

长时间没有数据发送,中间代理(Nginx、CDN等)可能会断开连接。解决方案是定期发送心跳:

python
import time

def generate():
    last_send = time.time()
    for token in agent_stream():
        yield f"data: {token}\n\n"
        last_send = time.time()

        # 每15秒检查一次,如果没有数据就发心跳
        if time.time() - last_send > 15:
            yield ": keepalive\n\n"  # 注释行,客户端会忽略

9.3 Nginx配置

如果用了Nginx反向代理,需要关闭缓冲:

nginx
location /api/ {
    proxy_pass http://127.0.0.1:5000;
    proxy_buffering off;          # 关闭缓冲
    proxy_cache off;              # 关闭缓存
    proxy_read_timeout 300s;      # 超时时间设长一点
    chunked_transfer_encoding on;
}

十、总结

流式响应是Agent API的核心特性:

  • 生成器:用yield一块一块发送数据
  • SSE格式data: ...\n\n,简单可靠
  • stream_with_context:在生成器中安全使用请求数据
  • JSON消息:传递结构化的token和事件信息
  • Nginx配置:关闭proxy_buffering保证实时性

有了流式响应,你的Agent API就能像ChatGPT一样,让用户看到逐字输出的效果。

在下一篇文章中,我们将学习配置管理与App Factory模式——当你的Agent API从开发走向生产,需要管理不同环境的配置、API密钥等,这些都需要一套规范的方案。