04 流式响应
前面我们写的Agent API,都是等Agent处理完再一次性返回结果。但你用ChatGPT、DeepSeek的时候,回答是一个字一个字蹦出来的,不用等全部生成完就能看到内容。
这就是流式响应——服务端一边生成数据,一边发给客户端,不用等全部准备好。
对Agent来说,流式响应几乎是必须的。因为LLM生成回答需要时间,如果等全部生成完再返回,用户可能要等5-10秒才能看到任何内容。流式模式下,第一个token出来就能显示,用户体验好得多。
打个比方:非流式就像你点了一份外卖,做好了才一起送来;流式就像回转寿司,厨师做好一盘你就拿到一盘,不用等全部做完。
一、基本原理
HTTP响应分两部分:头部和体部。正常情况下,服务器准备好完整的响应体,一次性发给客户端。
流式响应不一样——服务器先发头部,然后通过**生成器(generator)**一块一块地发送响应体。每yield一次,就有一段数据发给客户端。
@app.route("/stream")
def stream():
def generate():
for i in range(5):
yield f"第{i+1}条数据\n"
return generate(), {"Content-Type": "text/plain"}客户端会陆续收到:
第1条数据
第2条数据
第3条数据
第4条数据
第5条数据关键点是yield——每次yield的数据会立即发给客户端,不用等函数执行完。
二、Generator生成器
如果你不熟悉Python的生成器,这里简单解释一下。普通函数用return返回,调用一次就结束。生成器用yield返回,每次调用会从上次yield的地方继续执行:
def normal_func():
return 1
return 2 # 永远不会执行
def generator_func():
yield 1 # 第一次调用到这里暂停
yield 2 # 第二次调用从这里继续
yield 3 # 第三次调用从这里继续
gen = generator_func()
print(next(gen)) # 1
print(next(gen)) # 2
print(next(gen)) # 3Flask接收到生成器后,会不断调用next()获取数据,每拿到一块就发给客户端。
三、SSE格式
对于Agent API,我们通常用**SSE(Server-Sent Events)**格式来做流式响应。SSE是一种基于HTTP的单向推送协议,格式很简单:
data: 第一条消息\n\n
data: 第二条消息\n\n
data: 第三条消息\n\n每条消息以data: 开头,以\n\n(两个换行)结尾。
3.1 为什么用SSE
| 方式 | 特点 | 适合场景 |
|---|---|---|
| 普通流式 | 原始数据流 | 文件下载、CSV导出 |
| SSE | 结构化的事件格式 | Agent逐字输出、实时通知 |
| WebSocket | 双向通信 | 聊天室、游戏 |
SSE对Agent来说是最合适的:
- 基于HTTP,不需要额外的协议支持
- 浏览器原生支持(
EventSourceAPI) - 自动重连
- 格式简单,实现容易
3.2 SSE数据格式
SSE支持几种字段,最常用的是data:
data: 普通消息\n\n
event: token\ndata: 你好\n\n
event: done\ndata: [DONE]\n\n| 字段 | 说明 |
|---|---|
data | 消息内容,可以有多行(每行都以data: 开头) |
event | 事件类型,不指定则默认message |
id | 消息ID,客户端可以用Last-Event-ID重连 |
retry | 重连间隔(毫秒) |
四、Flask实现SSE
来看一个完整的SSE实现:
from flask import Flask, Response, request
app = Flask(__name__)
@app.route("/stream", methods=["GET"])
def stream():
def generate():
# 模拟逐字输出
text = "你好!我是AI助手,很高兴为你服务。"
for char in text:
yield f"data: {char}\n\n"
# 发送结束标记
yield "data: [DONE]\n\n"
return Response(
generate(),
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx不缓冲
},
)几个关键点:
| 要素 | 说明 |
|---|---|
content_type="text/event-stream" | 告诉客户端这是SSE流 |
Cache-Control: no-cache | 禁止缓存,保证实时性 |
X-Accel-Buffering: no | 告诉Nginx不要缓冲,否则数据会攒一堆再发 |
yield "data: ...\n\n" | 每条消息以data: 开头,\n\n结尾 |
4.1 客户端接收(JavaScript)
浏览器端用EventSource接收SSE:
const eventSource = new EventSource("/stream");
eventSource.onmessage = function(event) {
if (event.data === "[DONE]") {
eventSource.close();
return;
}
// 把收到的字符追加到页面上
document.getElementById("output").textContent += event.data;
};
eventSource.onerror = function() {
console.log("连接出错");
eventSource.close();
};4.2 客户端接收(fetch + ReadableStream)
如果需要POST请求(发送用户消息),EventSource不支持POST,需要用fetch:
async function chat(message) {
const response = await fetch("/chat", {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({message: message}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
const text = decoder.decode(value);
// 解析SSE格式
const lines = text.split("\n");
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") return;
document.getElementById("output").textContent += data;
}
}
}
}五、流式Agent API
把SSE和Agent结合起来,实现真正的流式对话:
from flask import Flask, Response, request, jsonify
import time
app = Flask(__name__)
@app.route("/chat", methods=["POST"])
def chat():
"""流式对话接口"""
data = request.get_json()
if not data or "message" not in data:
return jsonify({"error": "缺少message字段"}), 400
message = data["message"]
def generate():
# 这里后面会替换成真正的Agent流式调用
reply = f"你说的是「{message}」,这是一个模拟的流式回复。"
for char in reply:
yield f"data: {char}\n\n"
time.sleep(0.05) # 模拟LLM生成延迟
yield "data: [DONE]\n\n"
return Response(
generate(),
content_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)测试:
curl -N -X POST http://127.0.0.1:5000/chat \
-H "Content-Type: application/json" \
-d '{"message": "你好"}'-N参数禁用curl的缓冲,这样你能实时看到数据逐字输出。
六、stream_with_context
在生成器函数内部,不能直接访问request对象。因为Flask在开始流式响应后,请求上下文已经结束了。
# ❌ 错误:生成器内访问request会报错
@app.route("/stream")
def stream():
def generate():
name = request.args.get("name") # RuntimeError!
yield f"Hello {name}"
return generate()解决方案是用stream_with_context包装生成器,保持请求上下文在流式过程中一直有效:
from flask import stream_with_context
# ✅ 正确:用stream_with_context保持上下文
@app.route("/stream")
def stream():
name = request.args.get("name", "World")
def generate():
# 这里可以安全地使用在generate之前获取的变量
yield f"Hello {name}!\n\n"
for i in range(3):
yield f"data: 第{i+1}条消息\n\n"
return Response(
stream_with_context(generate()),
content_type="text/event-stream",
)最佳实践:在生成器外部获取请求数据,在生成器内部只使用已经获取的变量:
@app.route("/chat", methods=["POST"])
def chat():
# 在生成器外部获取请求数据
data = request.get_json()
message = data.get("message", "")
def generate():
# 在生成器内部使用已经获取的变量,不要访问request
for char in f"收到: {message}":
yield f"data: {char}\n\n"
yield "data: [DONE]\n\n"
return Response(
stream_with_context(generate()),
content_type="text/event-stream",
)七、JSON格式的SSE
实际项目中,你可能想在SSE消息中传递更多结构化信息,而不只是纯文本。可以把每条消息包装成JSON:
import json
@app.route("/chat", methods=["POST"])
def chat():
data = request.get_json()
message = data.get("message", "")
def generate():
# 每条消息是JSON格式
reply = f"收到: {message}"
for char in reply:
chunk = {"content": char, "type": "token"}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
# 结束标记也是JSON
done = {"type": "done"}
yield f"data: {json.dumps(done)}\n\n"
return Response(
stream_with_context(generate()),
content_type="text/event-stream",
)客户端收到的就是:
data: {"content": "收", "type": "token"}
data: {"content": "到", "type": "token"}
data: {"content": ":", "type": "token"}
...
data: {"type": "done"}这种方式的好处是可以同时传递多种信息——token内容、工具调用状态、错误信息等等。
八、事件类型
SSE支持自定义事件类型,用event:字段:
def generate():
# 普通token
yield "event: token\ndata: 你好\n\n"
# 工具调用开始
yield "event: tool_start\ndata: {\"tool\": \"search\"}\n\n"
# 工具调用结果
yield "event: tool_result\ndata: {\"result\": \"...\"}\n\n"
# 生成结束
yield "event: done\ndata: {}\n\n"客户端根据事件类型分别处理:
const eventSource = new EventSource("/chat");
eventSource.addEventListener("token", (e) => {
// 显示token
output.textContent += e.data;
});
eventSource.addEventListener("tool_start", (e) => {
// 显示"正在搜索..."
const tool = JSON.parse(e.data);
status.textContent = `正在调用 ${tool.tool}...`;
});
eventSource.addEventListener("done", (e) => {
eventSource.close();
});九、注意事项
9.1 顺序保证
SSE保证消息按发送顺序到达客户端。你不用担心消息乱序的问题。
9.2 连接超时
长时间没有数据发送,中间代理(Nginx、CDN等)可能会断开连接。解决方案是定期发送心跳:
import time
def generate():
last_send = time.time()
for token in agent_stream():
yield f"data: {token}\n\n"
last_send = time.time()
# 每15秒检查一次,如果没有数据就发心跳
if time.time() - last_send > 15:
yield ": keepalive\n\n" # 注释行,客户端会忽略9.3 Nginx配置
如果用了Nginx反向代理,需要关闭缓冲:
location /api/ {
proxy_pass http://127.0.0.1:5000;
proxy_buffering off; # 关闭缓冲
proxy_cache off; # 关闭缓存
proxy_read_timeout 300s; # 超时时间设长一点
chunked_transfer_encoding on;
}十、总结
流式响应是Agent API的核心特性:
- 生成器:用
yield一块一块发送数据 - SSE格式:
data: ...\n\n,简单可靠 - stream_with_context:在生成器中安全使用请求数据
- JSON消息:传递结构化的token和事件信息
- Nginx配置:关闭
proxy_buffering保证实时性
有了流式响应,你的Agent API就能像ChatGPT一样,让用户看到逐字输出的效果。
在下一篇文章中,我们将学习配置管理与App Factory模式——当你的Agent API从开发走向生产,需要管理不同环境的配置、API密钥等,这些都需要一套规范的方案。