跳转至

第二章:SSE 实现

FastAPI 实现

基本实现

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

# Application instance; the @app.get route decorators in this snippet register on it.
app = FastAPI()

async def event_generator():
    """Yield ten SSE ``data:`` frames, pausing one second after each.

    Every frame is ``data: <json>`` followed by a blank line, where the JSON
    object holds ``count`` (0 through 9) and a ``message`` string.
    """
    count = 0
    while count < 10:
        payload = {"count": count, "message": f"消息 {count}"}
        # The trailing blank line ("\n\n") is what terminates one SSE event.
        yield "data: " + json.dumps(payload) + "\n\n"
        await asyncio.sleep(1)
        count += 1

@app.get("/events")
async def sse_events():
    """Serve ``event_generator()`` as a ``text/event-stream`` response.

    The response carries ``Cache-Control: no-cache`` and
    ``Connection: keep-alive`` headers.
    """
    sse_headers = {"Cache-Control": "no-cache", "Connection": "keep-alive"}
    stream = event_generator()
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )

带事件类型

async def event_generator():
    """Yield ten named ``update`` events, then a final ``done`` event.

    Each ``update`` frame carries ``{"count": n}`` as JSON and is followed by
    a one-second pause; the closing ``done`` frame carries an empty object.
    """
    for step in range(10):
        body = json.dumps({"count": step})
        # Two trailing empty fields produce the blank line that ends the event.
        frame = "\n".join(("event: update", f"data: {body}", "", ""))
        yield frame
        await asyncio.sleep(1)

    # Send the completion event.
    yield "event: done\ndata: {}\n\n"

带 ID 和重试

async def event_generator():
    """Yield ten SSE frames that each set ``id``, ``event``, ``data`` and ``retry``.

    The ``id`` is the loop counter, the event name is ``message``, the data is
    ``{"count": n}`` as JSON, and ``retry`` is always ``3000``. A one-second
    pause follows every frame.
    """
    for seq in range(10):
        body = json.dumps({"count": seq})
        fields = (
            f"id: {seq}",
            "event: message",
            f"data: {body}",
            "retry: 3000",
        )
        yield "\n".join(fields) + "\n\n"
        await asyncio.sleep(1)

客户端断开检测

from fastapi import Request

@app.get("/events")
async def sse_events(request: Request):
    """Stream up to 100 counter events, stopping once the client disconnects.

    Before each event the generator asks ``request.is_disconnected()``; when
    that returns true it logs a message and ends the stream.
    """
    async def event_generator():
        for count in range(100):
            # Check whether the client has gone away before producing more data.
            client_gone = await request.is_disconnected()
            if client_gone:
                print("客户端已断开")
                return

            payload = json.dumps({"count": count})
            yield f"data: {payload}\n\n"
            await asyncio.sleep(1)

    stream = event_generator()
    return StreamingResponse(stream, media_type="text/event-stream")

AI 流式响应

from openai import OpenAI

# Module-level OpenAI client used by the /chat handler below.
# NOTE(review): constructed with no arguments — presumably credentials come
# from the environment; confirm against the deployment setup.
client = OpenAI()

@app.get("/chat")
async def chat_stream(prompt: str):
    """Stream a chat completion for ``prompt`` as Server-Sent Events.

    Each text delta from the model is sent as ``data: {"content": "..."}``;
    after the model finishes, a final ``data: [DONE]`` frame is sent.

    Args:
        prompt: The user message forwarded to the model.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    def generate():
        # Deliberately a plain (non-async) generator: ``client`` is the
        # blocking OpenAI client, and StreamingResponse iterates synchronous
        # iterators in a worker thread, so waiting on the model does not
        # stall the event loop.
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )

        for chunk in response:
            # Skip chunks that carry no choices instead of raising IndexError.
            if not chunk.choices:
                continue
            content = chunk.choices[0].delta.content
            if not content:
                continue
            # json.dumps already escapes newlines, so the payload always fits
            # on one SSE ``data:`` line. Escaping by hand as well would make
            # the client see a literal backslash-n after JSON.parse.
            payload = json.dumps({"content": content})
            yield f"data: {payload}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )

前端完整示例

<!DOCTYPE html>
<html>
<head>
    <title>SSE 示例</title>
</head>
<body>
    <div id="messages"></div>
    <button onclick="start()">开始</button>
    <button onclick="stop()">停止</button>

    <script>
        let eventSource = null;

        /**
         * Open the SSE connection to /events and wire up its handlers.
         *
         * Any connection left over from an earlier click is closed first, so
         * repeated clicks never leave more than one stream open.
         */
        function start() {
            if (eventSource) {
                eventSource.close();
            }

            // Keep a local reference so handlers act on the stream they belong
            // to, even if the global has since been replaced.
            const source = new EventSource('/events');
            eventSource = source;

            // Events without an explicit name (or named "message").
            source.onmessage = (event) => {
                const data = JSON.parse(event.data);
                addMessage(data.message);
            };

            source.addEventListener('update', (event) => {
                const data = JSON.parse(event.data);
                addMessage(`更新: ${data.count}`);
            });

            // The server announces the end of the stream with a "done" event.
            // Close explicitly: otherwise EventSource reconnects as soon as the
            // server closes the connection and the stream starts over.
            source.addEventListener('done', () => {
                source.close();
                if (eventSource === source) {
                    eventSource = null;
                }
            });

            source.onerror = (error) => {
                console.error('错误:', error);
                addMessage('连接错误,正在重连...');
            };
        }

        /** Close the active SSE connection, if there is one, and forget it. */
        function stop() {
            if (!eventSource) {
                return;
            }
            eventSource.close();
            eventSource = null;
        }

        /**
         * Append one message as a new <p> inside the #messages container.
         *
         * @param {string} text - Message text; rendered as plain text.
         */
        function addMessage(text) {
            const container = document.getElementById('messages');
            const paragraph = document.createElement('p');
            // textContent (not innerHTML) so text from the server is never
            // parsed as markup, and existing messages are not re-parsed on
            // every append.
            paragraph.textContent = text;
            container.appendChild(paragraph);
        }
    </script>
</body>
</html>

使用 sse-starlette

pip install sse-starlette
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

# Application instance for the sse-starlette example; the /events route below registers on it.
app = FastAPI()

async def event_generator():
    """Yield ten event dicts with ``event``, ``id`` and ``data`` keys.

    The event name is always ``message``, ``id`` is the counter as a string,
    and ``data`` is a short text message. A one-second pause follows each
    dict.
    """
    for seq in range(10):
        event = dict(event="message", id=str(seq), data=f"消息 {seq}")
        yield event
        await asyncio.sleep(1)

@app.get("/events")
async def events():
    """Return the output of ``event_generator()`` wrapped in an ``EventSourceResponse``."""
    source = event_generator()
    return EventSourceResponse(source)

小结

本章学习了:

  • ✅ FastAPI 实现 SSE
  • ✅ 事件类型和 ID
  • ✅ 客户端断开检测
  • ✅ AI 流式响应
  • ✅ 前端完整示例
  • ✅ sse-starlette 库

恭喜完成 SSE 教程! 🎉