第十章:最佳实践¶
成本优化¶
选择合适的模型¶
| 场景 | 推荐模型 | 理由 |
|---|---|---|
| 简单问答 | gpt-5.4-nano / DeepSeek Chat | 最便宜 |
| 代码生成 | gpt-5.4-mini / DeepSeek Chat | 性价比高 |
| 复杂推理 | gpt-5.4 / Claude Opus | 能力最强 |
| 长文档 | Kimi / Claude | 支持长上下文 |
| 测试开发 | glm-4-flash | 免费 |
使用 Prompt Caching¶
# OpenAI / DeepSeek support prompt caching:
# a long, stable system prompt is cached automatically by the provider.
system_prompt = """
你是一个专业的代码审查助手。
请按照以下标准审查代码:
1. 代码风格
2. 性能优化
3. 安全漏洞
4. 可维护性
...(很长的提示词)
"""

# First request: the cache entry is written.
response = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": "审查这段代码..."}
    ]
)
# Subsequent requests: cache hit — cached input tokens cost ~90% less.
使用 Batch API¶
# 并发批量处理可提升吞吐;注意:50% 费用折扣来自 OpenAI 官方 Batch API(独立的异步批量端点),与此处的并发调用不同
import asyncio
from openai import AsyncOpenAI
async def batch_process(items: list) -> list:
    """Concurrently send one chat request per item and gather the responses.

    Note: this fans out *concurrent* requests for throughput; OpenAI's
    dedicated Batch API (the discounted asynchronous-job endpoint) is a
    separate API.

    Args:
        items: user-message strings, one request per item.

    Returns:
        List of completion responses, in the same order as ``items``.
    """
    import os  # local import keeps this tutorial snippet self-contained

    # Fix: read the key from the environment instead of hard-coding it,
    # consistent with the "API Key 管理" guidance later in this chapter.
    client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    tasks = [
        client.chat.completions.create(
            model="gpt-5.4-nano",
            messages=[{"role": "user", "content": item}]
        )
        for item in items
    ]
    return await asyncio.gather(*tasks)
控制输出长度¶
# Cap the number of tokens the model may generate.
response = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=[{"role": "user", "content": "简短介绍 Python"}],
    max_tokens=100  # hard limit on the completion length
)
费用监控¶
import time
from dataclasses import dataclass
@dataclass
class UsageStats:
    """Running totals of token usage and spend across all tracked requests."""
    total_input_tokens: int = 0    # sum of prompt tokens
    total_output_tokens: int = 0   # sum of completion tokens
    total_cost: float = 0.0        # accumulated cost in USD
    request_count: int = 0         # number of tracked requests
class CostTracker:
    """Accumulates token usage and estimated USD cost across API calls."""

    # Price table in USD per 1M tokens.
    PRICES = {
        "gpt-5.4": {"input": 2.50, "output": 15.00},
        "gpt-5.4-mini": {"input": 0.75, "output": 4.50},
        "gpt-5.4-nano": {"input": 0.20, "output": 1.25},
        "deepseek-chat": {"input": 0.14, "output": 0.28},  # CNY converted to USD
    }

    def __init__(self):
        self.stats = UsageStats()

    def track(self, response, model: str):
        """Fold one response's usage into the totals; return its cost in USD."""
        usage = response.usage
        # Unknown models fall back to the mid-tier price entry.
        rate = self.PRICES.get(model, self.PRICES["gpt-5.4-mini"])
        per_million = 1_000_000
        cost = (
            usage.prompt_tokens / per_million * rate["input"]
            + usage.completion_tokens / per_million * rate["output"]
        )
        self.stats.total_input_tokens += usage.prompt_tokens
        self.stats.total_output_tokens += usage.completion_tokens
        self.stats.total_cost += cost
        self.stats.request_count += 1
        return cost

    def report(self):
        """Render a human-readable usage summary string."""
        return f"""
使用统计:
- 请求数: {self.stats.request_count}
- 输入 Tokens: {self.stats.total_input_tokens:,}
- 输出 Tokens: {self.stats.total_output_tokens:,}
- 总费用: ${self.stats.total_cost:.4f}
"""
# Usage example
tracker = CostTracker()
response = client.chat.completions.create(...)  # placeholder call
cost = tracker.track(response, "gpt-5.4-mini")
print(f"本次费用: ${cost:.6f}")
print(tracker.report())
安全实践¶
API Key 管理¶
import os
from dotenv import load_dotenv

# Load variables from a .env file into the process environment.
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

# Never hard-code an API key!
# ❌ api_key = "sk-xxx"  # dangerous!
输入验证¶
def validate_input(text: str, max_length: int = 10000) -> str:
    """Validate user input before sending it to the model.

    Raises:
        ValueError: when the text exceeds ``max_length`` characters or
            contains a blocked keyword.

    Returns:
        The input with surrounding whitespace stripped.
    """
    # Length guard (checked before stripping, on the raw input).
    if len(text) > max_length:
        raise ValueError(f"输入过长,最大 {max_length} 字符")
    # Keyword filter. NOTE: substring match — "tokenizer" would also
    # trip the "token" rule.
    lowered = text.lower()
    for word in ("password", "secret", "token"):
        if word in lowered:
            raise ValueError(f"输入包含敏感词: {word}")
    return text.strip()
输出过滤¶
import re
def sanitize_output(text: str) -> str:
    """Mask likely secrets / PII in model output before it reaches users."""
    # (pattern, replacement) pairs, applied in order — API keys first so a
    # key is fully redacted before the later rules run over the text.
    rules = (
        (r'sk-[a-zA-Z0-9]{20,}', '[REDACTED]'),   # probable API keys
        (r'[\w\.-]+@[\w\.-]+\.\w+', '[EMAIL]'),   # e-mail addresses
        (r'1[3-9]\d{9}', '[PHONE]'),              # mainland-China mobile numbers
    )
    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text)
    return text
速率限制¶
import time
from collections import deque
class RateLimiter:
    """Sliding-window rate limiter: at most ``max_requests`` requests per
    ``window_seconds`` seconds (blocking, single-threaded use)."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests   # requests allowed per window
        self.window = window_seconds       # window length in seconds
        self.requests = deque()            # timestamps of recent requests

    def wait_if_needed(self):
        """Block until a request may be sent, then record its timestamp."""
        now = time.time()
        # Drop timestamps that have aged out of the window.
        while self.requests and self.requests[0] < now - self.window:
            self.requests.popleft()
        # If the window is full, sleep until the oldest entry expires.
        if len(self.requests) >= self.max_requests:
            wait_time = self.requests[0] + self.window - now
            if wait_time > 0:
                time.sleep(wait_time)
            # Fix: re-read the clock after sleeping so the recorded
            # timestamp matches when the request actually goes out, and
            # purge entries that expired while we slept (the original
            # appended the stale pre-sleep ``now``).
            now = time.time()
            while self.requests and self.requests[0] < now - self.window:
                self.requests.popleft()
        self.requests.append(now)
# Usage: at most 60 requests per minute.
limiter = RateLimiter(max_requests=60, window_seconds=60)
for prompt in prompts:
    limiter.wait_if_needed()  # blocks while the window is full
    response = client.chat.completions.create(...)
监控告警¶
日志记录¶
import logging
import json
from datetime import datetime

# Configure logging: timestamped lines appended to llm_api.log.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='llm_api.log'
)
def log_request(model: str, prompt: str, response, latency: float):
    """Emit one JSON-formatted INFO log line for a successful API call.

    Logs the prompt *length* (not its content) plus token usage and latency.
    """
    logging.info(json.dumps(dict(
        timestamp=datetime.now().isoformat(),
        model=model,
        prompt_length=len(prompt),
        input_tokens=response.usage.prompt_tokens,
        output_tokens=response.usage.completion_tokens,
        latency_ms=round(latency * 1000, 2),
        success=True,
    )))
def log_error(model: str, prompt: str, error: Exception):
    """Emit one JSON-formatted ERROR log line for a failed API call."""
    logging.error(json.dumps(dict(
        timestamp=datetime.now().isoformat(),
        model=model,
        prompt_length=len(prompt),
        error=str(error),
        success=False,
    )))
性能监控¶
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class Metrics:
    """One API call's performance record."""
    latency: float                 # wall-clock seconds (recorded as 0 for failed calls)
    input_tokens: int              # prompt tokens consumed
    output_tokens: int             # completion tokens produced
    model: str                     # model name used for the call
    success: bool                  # whether the call completed successfully
    error: Optional[str] = None    # error message when success is False
class PerformanceMonitor:
    """Collects per-request Metrics and summarizes a recent time window."""

    def __init__(self):
        self.metrics = []        # recorded Metrics objects, in arrival order
        self._recorded_at = []   # wall-clock timestamp for each record

    def record(self, metrics: "Metrics"):
        """Record one request's metrics (success or failure)."""
        self.metrics.append(metrics)
        self._recorded_at.append(time.time())

    def get_stats(self, minutes: int = 5):
        """Summarize the last ``minutes`` of traffic.

        Returns:
            None when there is no data in the window; otherwise a dict with
            avg/max/min latency (over successful calls only), success_rate
            (over ALL calls in the window, failures included) and
            request_count.
        """
        cutoff = time.time() - minutes * 60
        # Fix: the original computed ``cutoff`` but filtered on
        # ``m.latency > 0`` instead, which (a) ignored the time window and
        # (b) dropped every failed record (failures have latency == 0),
        # so success_rate could never fall below 1.0.
        recent = [
            m for m, ts in zip(self.metrics, self._recorded_at)
            if ts >= cutoff
        ]
        if not recent:
            return None
        latencies = [m.latency for m in recent if m.latency > 0]
        success_rate = sum(1 for m in recent if m.success) / len(recent)
        return {
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0.0,
            "max_latency": max(latencies) if latencies else 0.0,
            "min_latency": min(latencies) if latencies else 0.0,
            "success_rate": success_rate,
            "request_count": len(recent)
        }
告警通知¶
def check_and_alert(monitor: PerformanceMonitor):
    """Inspect the last 5 minutes of metrics and alert on threshold breaches."""
    stats = monitor.get_stats(minutes=5)
    # Nothing recorded recently — nothing to check.
    if not stats:
        return
    problems = []
    # Success-rate threshold: below 95% is alert-worthy.
    if stats["success_rate"] < 0.95:
        problems.append(f"⚠️ 成功率低于 95%: {stats['success_rate']:.1%}")
    # Latency threshold: average above 5 seconds is alert-worthy.
    if stats["avg_latency"] > 5:
        problems.append(f"⚠️ 平均延迟过高: {stats['avg_latency']:.2f}s")
    if problems:
        send_alert("\n".join(problems))
def send_alert(message: str):
    """Deliver an alert message.

    Stub implementation: prints to stdout. A real deployment would POST to
    a webhook (DingTalk, WeCom, Slack, ...).
    """
    formatted = f"[ALERT] {message}"
    print(formatted)
完整示例¶
import os
import time
import logging
from openai import OpenAI, RateLimitError, APIError
from dotenv import load_dotenv
load_dotenv()
class LLMClient:
    """Production-grade LLM client.

    Combines input validation, rate limiting, retries with backoff, cost
    tracking, performance metrics and output sanitization behind one
    ``chat`` method.
    """

    def __init__(
        self,
        model: str = "gpt-5.4-mini",
        max_retries: int = 3,
        rate_limit: tuple = (60, 60)  # (max_requests, window_seconds)
    ):
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.model = model
        self.max_retries = max_retries
        self.rate_limiter = RateLimiter(*rate_limit)
        self.cost_tracker = CostTracker()
        self.monitor = PerformanceMonitor()
        logging.basicConfig(level=logging.INFO)

    def _record_failure(self, error: Exception) -> None:
        """Record one failed attempt in the performance monitor."""
        self.monitor.record(Metrics(
            latency=0,
            input_tokens=0,
            output_tokens=0,
            model=self.model,
            success=False,
            error=str(error)
        ))

    def chat(self, prompt: str, **kwargs) -> str:
        """Send a single-user-message chat request with retries.

        Args:
            prompt: user message; validated before sending.
            **kwargs: forwarded to ``chat.completions.create``.

        Returns:
            The sanitized assistant reply.

        Raises:
            ValueError: if the prompt fails validation.
            RateLimitError / APIError: when all retries are exhausted.
        """
        prompt = validate_input(prompt)
        self.rate_limiter.wait_if_needed()
        for attempt in range(self.max_retries):
            try:
                start_time = time.time()
                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    **kwargs
                )
                latency = time.time() - start_time
                # Record spend and success metrics for this call.
                self.cost_tracker.track(response, self.model)
                self.monitor.record(Metrics(
                    latency=latency,
                    input_tokens=response.usage.prompt_tokens,
                    output_tokens=response.usage.completion_tokens,
                    model=self.model,
                    success=True
                ))
                return sanitize_output(response.choices[0].message.content)
            except RateLimitError as e:
                # Fix: rate-limited attempts are failures too — record them
                # so success_rate reflects reality (the original only
                # recorded metrics in the APIError branch).
                self._record_failure(e)
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    raise
            except APIError as e:
                self._record_failure(e)
                if attempt < self.max_retries - 1:
                    time.sleep(1)  # short fixed delay for generic API errors
                else:
                    raise
        # Defensive: the loop above always returns or re-raises.
        return None
# Usage example
client = LLMClient(model="gpt-5.4-mini")
response = client.chat("介绍一下 Python")
print(response)
print(client.cost_tracker.report())
小结¶
本章学习了:
- ✅ 成本优化策略
- ✅ 安全实践(API Key 管理、输入输出验证)
- ✅ 速率限制
- ✅ 监控告警
- ✅ 生产级客户端实现
教程总结¶
恭喜你完成了大模型集成教程!你已掌握:
- 主流模型:OpenAI、DeepSeek、Claude、通义千问、智谱、Kimi
- API 调用:基本对话、流式输出、函数调用
- 多模态:图片理解、视频分析
- 进阶技巧:异步调用、错误处理、Token 管理
- 生产实践:成本优化、安全、监控
继续学习:
- Prompt Engineering - 提示词工程
- RAG 系统 - 检索增强生成
- vLLM 部署 - 本地模型部署