# 第六章:错误处理
## 服务端错误
### 连接中断
import asyncio
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
# FastAPI application instance; the @app.get route below is registered on it.
app = FastAPI()
async def robust_generator(interval: float = 1.0):
    """Yield an endless stream of SSE ``data:`` frames carrying a running count.

    Args:
        interval: Seconds to wait after each frame. Defaults to 1.0.

    Yields:
        SSE-formatted strings: ``data: {"count": N}`` frames, or a single
        ``event: error`` frame if an unexpected exception occurs.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the consuming
            task is cancelled (e.g. the client disconnected).
    """
    count = 0
    try:
        while True:
            count += 1
            yield f"data: {json.dumps({'count': count})}\n\n"
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        print("Client disconnected")
        # Cancellation must propagate; swallowing it hides the cancel from
        # the task that is driving this generator.
        raise
    except Exception as e:
        print(f"Error: {e}")
        # Tell the client what went wrong before the stream ends.
        yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
    finally:
        # Clean up resources here: this runs on cancellation, on errors, and
        # when the generator is closed early via aclose().
        pass
@app.get("/robust-events")
async def robust_sse():
    """Stream the frames produced by ``robust_generator`` as Server-Sent Events."""
    event_stream = robust_generator()
    return StreamingResponse(event_stream, media_type="text/event-stream")
## 客户端错误处理
/**
 * EventSource wrapper that reconnects with capped exponential backoff.
 *
 * While readyState is CONNECTING the browser is retrying on its own; this
 * class only steps in once the EventSource has given up (readyState CLOSED),
 * and stops after `maxReconnectAttempts` consecutive failures.
 */
class RobustSSEClient {
  /**
   * @param {string} url - SSE endpoint to connect to.
   */
  constructor(url) {
    this.url = url
    this.eventSource = null
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = 5
    // Handle of the pending reconnect timer, so close() can cancel it.
    this.reconnectTimer = null
    this.connect()
  }

  /** Open a fresh EventSource and attach the open/error handlers. */
  connect() {
    // Drop any pending retry and any previous connection so that at most one
    // EventSource is ever alive per client.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.eventSource) {
      this.eventSource.close()
    }

    const source = new EventSource(this.url)
    this.eventSource = source

    source.onopen = () => {
      console.log('Connected')
      this.reconnectAttempts = 0
    }

    // Listen for error events. Transport failures and server-sent
    // `event: error` frames are both dispatched as 'error', so a single
    // handler tells them apart: only server-sent frames carry `data`.
    source.onerror = (event) => {
      if (typeof event.data !== 'undefined') {
        console.error('Server error:', event.data)
        return
      }

      console.error('Connection error:', event)
      if (source.readyState === EventSource.CLOSED) {
        console.log('Connection closed')
        this.handleDisconnect()
      } else if (source.readyState === EventSource.CONNECTING) {
        console.log('Reconnecting...')
      }
    }
  }

  /** Schedule the next reconnect attempt, or give up once the limit is hit. */
  handleDisconnect() {
    this.reconnectAttempts++
    if (this.reconnectAttempts <= this.maxReconnectAttempts) {
      // Exponential backoff, capped at 30 seconds.
      const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)
      console.log(`Reconnecting in ${delay}ms...`)
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null
        this.connect()
      }, delay)
    } else {
      console.error('Max reconnect attempts reached')
    }
  }

  /** Close the connection and cancel any scheduled reconnect. */
  close() {
    // Without this, a retry scheduled before close() would reopen the stream.
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.eventSource) {
      this.eventSource.close()
    }
  }
}
## 小结
错误处理要点:
- 服务端错误:try/except、资源清理
- 客户端错误:onerror、重连策略
- 指数退避:重连延迟逐次翻倍(上限 30 秒),超过最大重试次数后停止重连
下一章我们将学习生产部署。