第二章:SSE 实现¶
FastAPI 实现¶
基本实现¶
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def event_generator():
for i in range(10):
data = json.dumps({"count": i, "message": f"消息 {i}"})
yield f"data: {data}\n\n"
await asyncio.sleep(1)
@app.get("/events")
async def sse_events():
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
带事件类型¶
async def event_generator():
for i in range(10):
data = json.dumps({"count": i})
yield f"event: update\ndata: {data}\n\n"
await asyncio.sleep(1)
# 发送完成事件
yield "event: done\ndata: {}\n\n"
带 ID 和重试¶
async def event_generator():
for i in range(10):
data = json.dumps({"count": i})
yield f"id: {i}\nevent: message\ndata: {data}\nretry: 3000\n\n"
await asyncio.sleep(1)
客户端断开检测¶
from fastapi import Request
@app.get("/events")
async def sse_events(request: Request):
async def event_generator():
for i in range(100):
# 检测客户端是否断开
if await request.is_disconnected():
print("客户端已断开")
break
data = json.dumps({"count": i})
yield f"data: {data}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
AI 流式响应¶
from openai import OpenAI
client = OpenAI()
@app.get("/chat")
async def chat_stream(prompt: str):
async def generate():
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
# 转义换行符
content = content.replace("\n", "\\n")
yield f"data: {json.dumps({'content': content})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
前端完整示例¶
<!DOCTYPE html>
<html>
<head>
<title>SSE 示例</title>
</head>
<body>
<div id="messages"></div>
<button onclick="start()">开始</button>
<button onclick="stop()">停止</button>
<script>
let eventSource = null;
function start() {
eventSource = new EventSource('/events');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
addMessage(data.message);
};
eventSource.addEventListener('update', (event) => {
const data = JSON.parse(event.data);
addMessage(`更新: ${data.count}`);
});
eventSource.onerror = (error) => {
console.error('错误:', error);
addMessage('连接错误,正在重连...');
};
}
function stop() {
if (eventSource) {
eventSource.close();
eventSource = null;
}
}
function addMessage(text) {
const div = document.getElementById('messages');
div.innerHTML += `<p>${text}</p>`;
}
</script>
</body>
</html>
使用 sse-starlette¶
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
async def event_generator():
for i in range(10):
yield {
"event": "message",
"id": str(i),
"data": f"消息 {i}"
}
await asyncio.sleep(1)
@app.get("/events")
async def events():
return EventSourceResponse(event_generator())
小结¶
本章学习了:
- ✅ FastAPI 实现 SSE
- ✅ 事件类型和 ID
- ✅ 客户端断开检测
- ✅ AI 流式响应
- ✅ 前端完整示例
- ✅ sse-starlette 库
恭喜完成 SSE 教程! 🎉