还在用WebSocket做LLM流式传输?FastAPI + SSE让你少踩一半坑

SSE 客户端 服务器 流式 WebSocket
发布于 2026-06-12
1

我们非常重视原创文章,为尊重知识产权并避免潜在的版权问题,我们在此提供文章的摘要供您初步了解。如果您想要查阅更为详尽的内容,访问作者的公众号页面获取完整文章。

扫码阅读
手机扫码阅读
你以为实时通信必须用WebSocket?其实SSE才是隐藏的王者

“这个LLM对话页面的流式输出怎么老断?”
“负载均衡又报WebSocket升级失败……”
“粘性会话没配好,用户被路由到不同节点,对话上下文丢了!”

如果你正在用WebSocket给LLM应用做token流式传输,上面这些坑你大概率踩过。WebSocket确实能干活,但它带来的麻烦也不少:连接升级、负载均衡配置、代理拦截、断线重连……尤其是在生产环境,每一样都可能让你半夜爬起来修bug。

但你可能不知道,绝大多数LLM流式传输场景,根本不需要WebSocket。

今天我要给你介绍一个更简单、更稳定的方案:Server-Sent Events(SSE)。它基于普通HTTP,浏览器原生支持,还能自动重连。配合FastAPI的异步能力,几行代码就能实现丝滑的token流式传输。更重要的是,它能让你的部署架构回归简单,告别那些烦人的WebSocket基础设施问题。

本文会带你从零实现一个生产可用的SSE流式接口,并深入探讨SSE在实时通知、性能优化等方面的最佳实践。读完你会发现,原来实时通信可以这么清爽。

SSE是什么?用一个外卖订单的比喻让你秒懂

想象你在外卖App上点了一份餐。WebSocket相当于你骑手的专属电话——你可以随时打电话问骑手到哪了,骑手也可以随时打电话告诉你“快到了”。双向沟通,非常灵活,但每个骑手都需要一个电话线路,管理起来麻烦。

SSE(Server-Sent Events) 更像是一个订单状态推送服务:你下单后,系统会不断给你发短信“商家已接单”“骑手已取餐”“距您还有1公里”……你只需要收短信,不需要回复。如果信号不好漏掉一条,系统会自动重发。

正式定义:SSE是HTML5标准的一部分,允许服务器通过HTTP连接向客户端推送数据。客户端通过JavaScript的EventSource接口监听事件,连接断开时会自动重连,服务器可以给每个事件指定ID,确保断点续传。

相比WebSocket,SSE最大的特点就是单向:只能服务器→客户端。但这恰恰完美匹配LLM token流式传输和实时通知的场景——客户端只需要接收服务器吐出来的文字。

对比一下

特性
长轮询
SSE
WebSocket
方向
双向(但低效)
单向(服务器→客户端)
双向
协议
HTTP
HTTP
独立协议(ws/wss)
浏览器支持
全支持
全支持(IE除外)
全支持
自动重连
需手动实现
原生支持
需手动实现
穿透性
最好
好(基于HTTP)
可能被代理拦截
适用场景
老旧系统
服务器推送(通知、流式输出)
实时互动(聊天、游戏)

FastAPI + SSE实战:流式输出LLM token

环境准备

确保你安装了Python 3.10+和FastAPI 0.115+(本文代码基于这些版本测试)。还需要一个ASGI服务器,比如Uvicorn:

pip install fastapi uvicorn

核心实现:StreamingResponse + 异步生成器

FastAPI的StreamingResponse可以直接返回一个异步生成器,这正是SSE需要的——持续不断地输出数据。我们按SSE协议格式组装消息:

  • 响应头:Content-Type: text/event-stream
  • 每个事件由若干行组成,以空行分隔
  • 常用字段:event: 事件类型,data: 数据(JSON格式),id: 事件ID
  • 注释行(以: 开头)可用来发送心跳,不会触发客户端事件

下面是一个完整的SSE端点,模拟LLM流式输出:

import asyncioimport jsonimport timefrom typing import AsyncGeneratorfrom fastapi import FastAPI, Requestfrom fastapi.responses import StreamingResponseapp = FastAPI()def format_sse(event: str, data: dict) -> str: """将Python对象格式化为SSE消息""" returnf"event: {event}\n" + f"data: {json.dumps(data, ensure_ascii=False)}\n\n"asyncdef fake_llm_stream(prompt: str) -> AsyncGenerator[str, None]: """模拟LLM流式生成token,实际可替换为OpenAI、智谱等API""" # 这里是一个模拟的token流 for token in ["好的", ",", "这是", "你", "的", "回答", "……"]: await asyncio.sleep(0.1) # 模拟生成延迟 yield token@app.get("/stream")asyncdef stream(prompt: str, request: Request): asyncdef event_generator(): # 可选:发送一个元事件,让客户端知道连接已建立 yield format_sse("meta", {"status": "started", "ts": time.time()})  last_ping = time.time()  try: asyncfor token in fake_llm_stream(prompt): # ⚠️ 关键检查:如果客户端断开连接,立即停止生成 ifawait request.is_disconnected(): break  yield format_sse("token", {"t": token})  # 心跳:每15秒发送一个注释行,防止代理超时 now = time.time() if now - last_ping > 15: yield": ping\n\n"# 注释行,不会触发事件 last_ping = now  # 流结束事件 yield format_sse("done", {"status": "completed", "ts": time.time()}) except Exception as e: # 错误处理:不要把敏感信息发出去 yield format_sse("error", {"message": "stream_failed", "detail": str(e)[:200]})  headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", # 关键:告诉nginx等代理不要缓冲 "X-Accel-Buffering": "no", } return StreamingResponse( event_generator(), media_type="text/event-stream", headers=headers )

代码解释

  • format_sse 辅助函数确保格式正确,注意最后的两个换行符不可省略。
  • event_generator 内部使用异步生成器,每次yield都会立刻通过网络发送出去。
  • request.is_disconnected() 是FastAPI提供的方法,用于检测客户端是否已关闭连接。如果不检查,用户关闭页面后服务器还在拼命生成token,白白浪费CPU
  • 心跳注释行 : ping\n\n 是SSE协议允许的,它可以保持连接活跃,而不会触发客户端的onmessage事件。
  • X-Accel-Buffering: no 是给nginx的指令,禁止它对响应进行缓冲。如果不加,nginx可能会攒一堆数据再一起发,流式效果就没了。

浏览器客户端:简单到难以置信

SSE的客户端API是浏览器内置的,无需任何第三方库:

<pre id="output">pre> <script>  const prompt = encodeURIComponent("写一首关于代码的诗"); const es = new EventSource(`/stream?prompt=${prompt}`); const output = document.getElementById('output'); let text = '';  es.addEventListener('token', (e) => {  const { t } = JSON.parse(e.data);  text += t;  output.textContent = text;  });  es.addEventListener('done', () => {  es.close(); // 关闭连接,释放资源  console.log('流式输出完成');  });  es.addEventListener('error', (e) => {  console.error('连接出错,自动重连中...', e);  // 如果不想自动重连,可以调用 es.close()  }); script> 

对比WebSocket:不需要管理重连逻辑,不需要处理二进制帧,不需要担心握手失败。EventSource在连接断开后默认会等待几秒后自动重连,而且如果服务器发送了id字段,重连时会自动带上Last-Event-ID头,让服务器可以从断点继续发送,避免消息丢失。

扩展思考:不止LLM,实时通知也能轻松搞定

SSE的应用场景远不止LLM流式传输。任何需要服务器主动推送的场景,比如实时通知、仪表盘数据更新、股票行情,都可以用SSE优雅实现。

实战:基于Redis Pub/Sub的广播通知

假设我们有一个需要向所有在线用户推送通知的系统。当后台任务完成、新消息到达时,我们希望所有订阅了通知的用户实时收到。架构如下:

[后台任务] → Redis发布消息 → [所有FastAPI进程订阅] → [通过SSE推送给各自连接的客户端]

使用FastAPI+Redis实现:

asyncdef event_stream(user_id: str):  pubsub = redis.pubsub()  await pubsub.subscribe("global_notifications") # 订阅全局频道  try:  asyncfor message in pubsub.listen():  if message['type'] == 'message':  data = json.loads(message['data'])  # 可以按user_id过滤,也可以广播给所有人  yield format_sse("notification", data)  finally:  await pubsub.unsubscribe("global_notifications") @app.get("/notifications/{user_id}") asyncdef notifications(user_id: str):  return StreamingResponse(event_stream(user_id), media_type="text/event-stream") 

任何地方需要推送通知时:

await redis.publish("global_notifications", json.dumps({"text": "新订单!", "user_id": 123}))

所有连接到/notifications/的客户端都会立即收到这条消息。你可以用类似的方式实现更精细的按用户隔离(每个用户订阅自己的频道)。

⚠️ 注意:多进程部署时,每个进程都有自己的Redis订阅,因此所有进程都能收到消息并推送给自己的客户端。这要求你的Redis是共享的,且进程数不能太多(否则Redis连接数暴涨)。大规模时可考虑用Kafka或NATS替代。

生产环境避坑指南

SSE虽然简单,但在生产环境中仍有几个容易踩的坑,我帮你提前列出来:

1. 代理缓冲会毁掉流式效果

Nginx默认会缓冲后端响应,直到攒够一定大小才发给客户端。这对于SSE是灾难性的——用户会看到文字“一顿一顿”地出现。解决方案:

  • **在FastAPI返回头中添加 X-Accel-Buffering: no**(仅对nginx有效)
  • 或者在nginx配置中针对SSE路径关闭缓冲:
location /stream { proxy_buffering off; proxy_cache off; proxy_set_header X-Accel-Buffering no; # 其他代理设置...}

对于其他代理(如ALB),也需要查阅文档禁用缓冲。

2. 超时问题:心跳+适当配置

许多负载均衡器有默认的60秒空闲超时。如果LLM思考时间超过60秒(比如复杂推理),连接就会被切断。解决:

  • 发送心跳注释行,如上文代码中的

数据STUDIO

点击领取《Python学习手册》,后台回复「福利」获取。『数据STUDIO』专注于数据科学原创文章分享,内容以 Python 为核心语言,涵盖机器学习、数据分析、可视化、MySQL等领域干货知识总结及实战项目。

158 篇文章
浏览 204K

还在用多套工具管项目?

一个平台搞定产品、项目、质量与效能,告别整合之苦,实现全流程闭环。

加入社区微信群
与行业大咖零距离交流学习
PMO实践白皮书
白皮书上线