还在用WebSocket做LLM流式传输?FastAPI + SSE让你少踩一半坑
版权声明
我们非常重视原创文章,为尊重知识产权并避免潜在的版权问题,我们在此提供文章的摘要供您初步了解。如果您想要查阅更为详尽的内容,访问作者的公众号页面获取完整文章。
你以为实时通信必须用WebSocket?其实SSE才是隐藏的王者
“这个LLM对话页面的流式输出怎么老断?”
“负载均衡又报WebSocket升级失败……”
“粘性会话没配好,用户被路由到不同节点,对话上下文丢了!”
如果你正在用WebSocket给LLM应用做token流式传输,上面这些坑你大概率踩过。WebSocket确实能干活,但它带来的麻烦也不少:连接升级、负载均衡配置、代理拦截、断线重连……尤其是在生产环境,每一样都可能让你半夜爬起来修bug。
但你可能不知道,绝大多数LLM流式传输场景,根本不需要WebSocket。
今天我要给你介绍一个更简单、更稳定的方案:Server-Sent Events(SSE)。它基于普通HTTP,浏览器原生支持,还能自动重连。配合FastAPI的异步能力,几行代码就能实现丝滑的token流式传输。更重要的是,它能让你的部署架构回归简单,告别那些烦人的WebSocket基础设施问题。
本文会带你从零实现一个生产可用的SSE流式接口,并深入探讨SSE在实时通知、性能优化等方面的最佳实践。读完你会发现,原来实时通信可以这么清爽。
SSE是什么?用一个外卖订单的比喻让你秒懂
想象你在外卖App上点了一份餐。WebSocket相当于你骑手的专属电话——你可以随时打电话问骑手到哪了,骑手也可以随时打电话告诉你“快到了”。双向沟通,非常灵活,但每个骑手都需要一个电话线路,管理起来麻烦。
而SSE(Server-Sent Events) 更像是一个订单状态推送服务:你下单后,系统会不断给你发短信“商家已接单”“骑手已取餐”“距您还有1公里”……你只需要收短信,不需要回复。如果信号不好漏掉一条,系统会自动重发。
正式定义:SSE是HTML5标准的一部分,允许服务器通过HTTP连接向客户端推送数据。客户端通过JavaScript的EventSource接口监听事件,连接断开时会自动重连,服务器可以给每个事件指定ID,确保断点续传。
相比WebSocket,SSE最大的特点就是单向:只能服务器→客户端。但这恰恰完美匹配LLM token流式传输和实时通知的场景——客户端只需要接收服务器吐出来的文字。
对比一下:
FastAPI + SSE实战:流式输出LLM token
环境准备
确保你安装了Python 3.10+和FastAPI 0.115+(本文代码基于这些版本测试)。还需要一个ASGI服务器,比如Uvicorn:
pip install fastapi uvicorn
核心实现:StreamingResponse + 异步生成器
FastAPI的StreamingResponse可以直接返回一个异步生成器,这正是SSE需要的——持续不断地输出数据。我们按SSE协议格式组装消息:
响应头: Content-Type: text/event-stream每个事件由若干行组成,以空行分隔 常用字段: event:事件类型,data:数据(JSON格式),id:事件ID注释行(以 :开头)可用来发送心跳,不会触发客户端事件
下面是一个完整的SSE端点,模拟LLM流式输出:
import asyncioimport jsonimport timefrom typing import AsyncGeneratorfrom fastapi import FastAPI, Requestfrom fastapi.responses import StreamingResponseapp = FastAPI()def format_sse(event: str, data: dict) -> str: """将Python对象格式化为SSE消息""" returnf"event: {event}\n" + f"data: {json.dumps(data, ensure_ascii=False)}\n\n"asyncdef fake_llm_stream(prompt: str) -> AsyncGenerator[str, None]: """模拟LLM流式生成token,实际可替换为OpenAI、智谱等API""" # 这里是一个模拟的token流 for token in ["好的", ",", "这是", "你", "的", "回答", "……"]: await asyncio.sleep(0.1) # 模拟生成延迟 yield token@app.get("/stream")asyncdef stream(prompt: str, request: Request): asyncdef event_generator(): # 可选:发送一个元事件,让客户端知道连接已建立 yield format_sse("meta", {"status": "started", "ts": time.time()}) last_ping = time.time() try: asyncfor token in fake_llm_stream(prompt): # ⚠️ 关键检查:如果客户端断开连接,立即停止生成 ifawait request.is_disconnected(): break yield format_sse("token", {"t": token}) # 心跳:每15秒发送一个注释行,防止代理超时 now = time.time() if now - last_ping > 15: yield": ping\n\n"# 注释行,不会触发事件 last_ping = now # 流结束事件 yield format_sse("done", {"status": "completed", "ts": time.time()}) except Exception as e: # 错误处理:不要把敏感信息发出去 yield format_sse("error", {"message": "stream_failed", "detail": str(e)[:200]}) headers = { "Cache-Control": "no-cache", "Connection": "keep-alive", # 关键:告诉nginx等代理不要缓冲 "X-Accel-Buffering": "no", } return StreamingResponse( event_generator(), media_type="text/event-stream", headers=headers )
代码解释:
format_sse辅助函数确保格式正确,注意最后的两个换行符不可省略。event_generator内部使用异步生成器,每次yield都会立刻通过网络发送出去。request.is_disconnected()是FastAPI提供的方法,用于检测客户端是否已关闭连接。如果不检查,用户关闭页面后服务器还在拼命生成token,白白浪费CPU。心跳注释行 : ping\n\n是SSE协议允许的,它可以保持连接活跃,而不会触发客户端的onmessage事件。X-Accel-Buffering: no是给nginx的指令,禁止它对响应进行缓冲。如果不加,nginx可能会攒一堆数据再一起发,流式效果就没了。
浏览器客户端:简单到难以置信
SSE的客户端API是浏览器内置的,无需任何第三方库:
<pre id="output">pre> <script> const prompt = encodeURIComponent("写一首关于代码的诗"); const es = new EventSource(`/stream?prompt=${prompt}`); const output = document.getElementById('output'); let text = ''; es.addEventListener('token', (e) => { const { t } = JSON.parse(e.data); text += t; output.textContent = text; }); es.addEventListener('done', () => { es.close(); // 关闭连接,释放资源 console.log('流式输出完成'); }); es.addEventListener('error', (e) => { console.error('连接出错,自动重连中...', e); // 如果不想自动重连,可以调用 es.close() }); script>
对比WebSocket:不需要管理重连逻辑,不需要处理二进制帧,不需要担心握手失败。EventSource在连接断开后默认会等待几秒后自动重连,而且如果服务器发送了id字段,重连时会自动带上Last-Event-ID头,让服务器可以从断点继续发送,避免消息丢失。
扩展思考:不止LLM,实时通知也能轻松搞定
SSE的应用场景远不止LLM流式传输。任何需要服务器主动推送的场景,比如实时通知、仪表盘数据更新、股票行情,都可以用SSE优雅实现。
实战:基于Redis Pub/Sub的广播通知
假设我们有一个需要向所有在线用户推送通知的系统。当后台任务完成、新消息到达时,我们希望所有订阅了通知的用户实时收到。架构如下:
[后台任务] → Redis发布消息 → [所有FastAPI进程订阅] → [通过SSE推送给各自连接的客户端]
使用FastAPI+Redis实现:
asyncdef event_stream(user_id: str): pubsub = redis.pubsub() await pubsub.subscribe("global_notifications") # 订阅全局频道 try: asyncfor message in pubsub.listen(): if message['type'] == 'message': data = json.loads(message['data']) # 可以按user_id过滤,也可以广播给所有人 yield format_sse("notification", data) finally: await pubsub.unsubscribe("global_notifications") @app.get("/notifications/{user_id}") asyncdef notifications(user_id: str): return StreamingResponse(event_stream(user_id), media_type="text/event-stream")
任何地方需要推送通知时:
await redis.publish("global_notifications", json.dumps({"text": "新订单!", "user_id": 123}))
所有连接到/notifications/的客户端都会立即收到这条消息。你可以用类似的方式实现更精细的按用户隔离(每个用户订阅自己的频道)。
⚠️ 注意:多进程部署时,每个进程都有自己的Redis订阅,因此所有进程都能收到消息并推送给自己的客户端。这要求你的Redis是共享的,且进程数不能太多(否则Redis连接数暴涨)。大规模时可考虑用Kafka或NATS替代。
生产环境避坑指南
SSE虽然简单,但在生产环境中仍有几个容易踩的坑,我帮你提前列出来:
1. 代理缓冲会毁掉流式效果
Nginx默认会缓冲后端响应,直到攒够一定大小才发给客户端。这对于SSE是灾难性的——用户会看到文字“一顿一顿”地出现。解决方案:
**在FastAPI返回头中添加 X-Accel-Buffering: no**(仅对nginx有效)或者在nginx配置中针对SSE路径关闭缓冲:
location /stream { proxy_buffering off; proxy_cache off; proxy_set_header X-Accel-Buffering no; # 其他代理设置...}
对于其他代理(如ALB),也需要查阅文档禁用缓冲。
2. 超时问题:心跳+适当配置
许多负载均衡器有默认的60秒空闲超时。如果LLM思考时间超过60秒(比如复杂推理),连接就会被切断。解决:
发送心跳注释行,如上文代码中的
数据STUDIO
点击领取《Python学习手册》,后台回复「福利」获取。『数据STUDIO』专注于数据科学原创文章分享,内容以 Python 为核心语言,涵盖机器学习、数据分析、可视化、MySQL等领域干货知识总结及实战项目。
还在用多套工具管项目?
一个平台搞定产品、项目、质量与效能,告别整合之苦,实现全流程闭环。
白皮书上线