
Chapter 10: Best Practices

Cost Optimization

Choosing the Right Model

Scenario             Recommended model              Rationale
Simple Q&A           gpt-5.4-nano / DeepSeek Chat   Cheapest
Code generation      gpt-5.4-mini / DeepSeek Chat   Best price/performance
Complex reasoning    gpt-5.4 / Claude Opus          Strongest capabilities
Long documents       Kimi / Claude                  Long-context support
Testing/development  glm-4-flash                    Free
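
To make the table actionable in code, a small routing helper works well. A minimal sketch; the scenario keys and the "kimi-latest" model ID below are illustrative assumptions, not official identifiers:

# Sketch: route requests to a cost-appropriate model per scenario.
# The mapping mirrors the table above; "kimi-latest" is a placeholder ID.
MODEL_BY_SCENARIO = {
    "simple_qa": "gpt-5.4-nano",
    "codegen": "gpt-5.4-mini",
    "reasoning": "gpt-5.4",
    "long_doc": "kimi-latest",
    "testing": "glm-4-flash",
}

def pick_model(scenario: str, default: str = "gpt-5.4-mini") -> str:
    """Return the recommended model for a scenario, or a safe default."""
    return MODEL_BY_SCENARIO.get(scenario, default)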

Using Prompt Caching

# OpenAI and DeepSeek support prompt caching:
# a long, stable prompt prefix (e.g. the system prompt) is cached automatically

system_prompt = """
You are a professional code-review assistant.
Review code against the following criteria:
1. Code style
2. Performance
3. Security vulnerabilities
4. Maintainability
... (a long prompt)
"""

# First request: the prompt prefix is written to the cache
response = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": "审查这段代码..."}
    ]
)

# Subsequent requests: cache hit; cached input tokens are billed at a
# steep discount (roughly 90% off on DeepSeek, about 50% off on OpenAI)
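
To confirm caching is working, inspect the usage object on the response. A sketch; field names differ by provider (OpenAI reports prompt_tokens_details.cached_tokens, DeepSeek reports prompt_cache_hit_tokens), so both are guarded:

# Sketch: check how many prompt tokens were served from cache.
details = getattr(response.usage, "prompt_tokens_details", None)
cached = getattr(details, "cached_tokens", 0) if details else 0
cached = cached or getattr(response.usage, "prompt_cache_hit_tokens", 0)
print(f"Cached prompt tokens: {cached} / {response.usage.prompt_tokens}")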

Using the Batch API

# Note: this async fan-out speeds up bulk work but is billed at normal
# per-token rates; the ~50% discount comes from the file-based Batch API,
# sketched after this block.
import asyncio
from openai import AsyncOpenAI

async def batch_process(items: list):
    """Process a list of prompts concurrently."""
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    tasks = [
        client.chat.completions.create(
            model="gpt-5.4-nano",
            messages=[{"role": "user", "content": item}]
        )
        for item in items
    ]

    return await asyncio.gather(*tasks)
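
For offline bulk jobs, the discount comes from the file-based Batch API: write one request per line to a JSONL file, upload it, and create a batch. A minimal sketch assuming OpenAI's standard batch endpoints (results typically arrive within a 24-hour window):

# Sketch: the file-based Batch API (this is what earns the ~50% discount).
import json
from openai import OpenAI

client = OpenAI()

# One chat-completion request per JSONL line
with open("batch_input.jsonl", "w", encoding="utf-8") as f:
    for i, item in enumerate(items):
        f.write(json.dumps({
            "custom_id": f"req-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-5.4-nano",
                "messages": [{"role": "user", "content": item}],
            },
        }) + "\n")

batch_file = client.files.create(file=open("batch_input.jsonl", "rb"), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
# Poll client.batches.retrieve(batch.id) until status == "completed",
# then download results via client.files.content(...)
print(batch.id, batch.status)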

Controlling Output Length

# Cap the number of tokens the model may generate
response = client.chat.completions.create(
    model="gpt-5.4-mini",
    messages=[{"role": "user", "content": "Briefly introduce Python"}],
    max_tokens=100  # hard cap on output tokens
)
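
When the cap cuts a response short, the API signals it through finish_reason; checking for it avoids silently shipping truncated text:

# finish_reason is "length" (instead of "stop") when max_tokens was hit
if response.choices[0].finish_reason == "length":
    print("Warning: output was truncated by max_tokens")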

Cost Monitoring

import time
from dataclasses import dataclass

@dataclass
class UsageStats:
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cost: float = 0.0
    request_count: int = 0

class CostTracker:
    """费用追踪器"""

    PRICES = {
        "gpt-5.4": {"input": 2.50, "output": 15.00},
        "gpt-5.4-mini": {"input": 0.75, "output": 4.50},
        "gpt-5.4-nano": {"input": 0.20, "output": 1.25},
        "deepseek-chat": {"input": 0.14, "output": 0.28},  # 人民币转美元
    }

    def __init__(self):
        self.stats = UsageStats()

    def track(self, response, model: str):
        """追踪使用量"""
        usage = response.usage
        price = self.PRICES.get(model, self.PRICES["gpt-5.4-mini"])

        input_cost = (usage.prompt_tokens / 1_000_000) * price["input"]
        output_cost = (usage.completion_tokens / 1_000_000) * price["output"]
        cost = input_cost + output_cost

        self.stats.total_input_tokens += usage.prompt_tokens
        self.stats.total_output_tokens += usage.completion_tokens
        self.stats.total_cost += cost
        self.stats.request_count += 1

        return cost

    def report(self):
        """生成报告"""
        return f"""
使用统计:
- 请求数: {self.stats.request_count}
- 输入 Tokens: {self.stats.total_input_tokens:,}
- 输出 Tokens: {self.stats.total_output_tokens:,}
- 总费用: ${self.stats.total_cost:.4f}
"""

# Usage
tracker = CostTracker()

response = client.chat.completions.create(...)
cost = tracker.track(response, "gpt-5.4-mini")
print(f"本次费用: ${cost:.6f}")

print(tracker.report())

Security Practices

API Key Management

import os
from dotenv import load_dotenv

# Load the key from environment variables
load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")

# Never hard-code API keys!
# ❌ api_key = "sk-xxx"  # dangerous!
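
For reference, the .env file that load_dotenv() reads is a plain key=value list; keep it out of version control:

# .env (add this file to .gitignore)
OPENAI_API_KEY=sk-xxx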

Input Validation

def validate_input(text: str, max_length: int = 10000) -> str:
    """Validate user input before sending it to the model."""
    # Enforce a length limit
    if len(text) > max_length:
        raise ValueError(f"Input too long; maximum is {max_length} characters")

    # Simple sensitive-word filter
    sensitive_words = ["password", "secret", "token"]
    for word in sensitive_words:
        if word in text.lower():
            raise ValueError(f"Input contains a sensitive word: {word}")

    return text.strip()

Output Filtering

import re

def sanitize_output(text: str) -> str:
    """Redact sensitive information from model output."""
    # Redact anything that looks like an API key
    text = re.sub(r'sk-[a-zA-Z0-9]{20,}', '[REDACTED]', text)

    # Redact email addresses
    text = re.sub(r'[\w\.-]+@[\w\.-]+\.\w+', '[EMAIL]', text)

    # Redact phone numbers (Chinese mobile format)
    text = re.sub(r'1[3-9]\d{9}', '[PHONE]', text)

    return text

Rate Limiting

import time
from collections import deque

class RateLimiter:
    """速率限制器"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = deque()

    def wait_if_needed(self):
        """Block until a request is allowed under the limit."""
        now = time.time()

        # Drop timestamps that have fallen outside the window
        while self.requests and self.requests[0] < now - self.window:
            self.requests.popleft()

        # If at capacity, sleep until the oldest request leaves the window
        if len(self.requests) >= self.max_requests:
            wait_time = self.requests[0] + self.window - now
            if wait_time > 0:
                time.sleep(wait_time)

        self.requests.append(time.time())  # record the actual send time, post-sleep

# Usage: at most 60 requests per minute
limiter = RateLimiter(max_requests=60, window_seconds=60)

for prompt in prompts:
    limiter.wait_if_needed()
    response = client.chat.completions.create(...)

Monitoring and Alerting

Logging

import logging
import json
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='llm_api.log'
)

def log_request(model: str, prompt: str, response, latency: float):
    """记录请求日志"""
    log_data = {
        "timestamp": datetime.now().isoformat(),
        "model": model,
        "prompt_length": len(prompt),
        "input_tokens": response.usage.prompt_tokens,
        "output_tokens": response.usage.completion_tokens,
        "latency_ms": round(latency * 1000, 2),
        "success": True
    }
    logging.info(json.dumps(log_data))

def log_error(model: str, prompt: str, error: Exception):
    """记录错误日志"""
    log_data = {
        "timestamp": datetime.now().isoformat(),
        "model": model,
        "prompt_length": len(prompt),
        "error": str(error),
        "success": False
    }
    logging.error(json.dumps(log_data))
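
A quick sketch of wiring these helpers into a call (the model name and prompt are placeholders):

# Sketch: time a request and log the outcome.
import time

prompt = "Briefly introduce Python"
start = time.time()
try:
    response = client.chat.completions.create(
        model="gpt-5.4-mini",
        messages=[{"role": "user", "content": prompt}],
    )
    log_request("gpt-5.4-mini", prompt, response, time.time() - start)
except Exception as e:
    log_error("gpt-5.4-mini", prompt, e)
    raise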

Performance Monitoring

import time
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Metrics:
    latency: float
    input_tokens: int
    output_tokens: int
    model: str
    success: bool
    error: Optional[str] = None
    timestamp: float = field(default_factory=time.time)  # when the request was recorded

class PerformanceMonitor:
    """性能监控器"""

    def __init__(self):
        self.metrics = []

    def record(self, metrics: Metrics):
        """记录指标"""
        self.metrics.append(metrics)

    def get_stats(self, minutes: int = 5):
        """Summarize requests from the last `minutes` minutes."""
        # Filter by the timestamp field; the original latency > 0 filter
        # ignored the cutoff and silently excluded failed requests.
        cutoff = time.time() - minutes * 60
        recent = [m for m in self.metrics if m.timestamp >= cutoff]

        if not recent:
            return None

        latencies = [m.latency for m in recent]
        success_rate = sum(1 for m in recent if m.success) / len(recent)

        return {
            "avg_latency": sum(latencies) / len(latencies),
            "max_latency": max(latencies),
            "min_latency": min(latencies),
            "success_rate": success_rate,
            "request_count": len(recent)
        }

Alert Notifications

def check_and_alert(monitor: PerformanceMonitor):
    """检查并发送告警"""
    stats = monitor.get_stats(minutes=5)

    if not stats:
        return

    alerts = []

    # Success-rate alert
    if stats["success_rate"] < 0.95:
        alerts.append(f"⚠️ Success rate below 95%: {stats['success_rate']:.1%}")

    # Latency alert
    if stats["avg_latency"] > 5:
        alerts.append(f"⚠️ Average latency too high: {stats['avg_latency']:.2f}s")

    if alerts:
        send_alert("\n".join(alerts))

def send_alert(message: str):
    """Send an alert (can be wired to DingTalk, WeCom, Slack, etc.)."""
    print(f"[ALERT] {message}")
    # Real implementation: post the message to a webhook, as sketched below
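
A hedged sketch of the webhook version, assuming a generic incoming-webhook endpoint; ALERT_WEBHOOK_URL is a hypothetical environment variable, and the JSON payload shape varies by service (DingTalk, WeCom, and Slack each differ):

# Sketch: post alerts to an incoming webhook instead of printing.
import os
import requests

def send_alert(message: str):
    url = os.getenv("ALERT_WEBHOOK_URL")  # hypothetical configuration
    if not url:
        print(f"[ALERT] {message}")  # fall back to console output
        return
    requests.post(url, json={"text": message}, timeout=5)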

Complete Example

import os
import time
import logging
from openai import OpenAI, RateLimitError, APIError
from dotenv import load_dotenv

load_dotenv()

class LLMClient:
    """生产级 LLM 客户端"""

    def __init__(
        self,
        model: str = "gpt-5.4-mini",
        max_retries: int = 3,
        rate_limit: tuple = (60, 60)  # 60 requests per 60 seconds
    ):
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.model = model
        self.max_retries = max_retries
        self.rate_limiter = RateLimiter(*rate_limit)
        self.cost_tracker = CostTracker()
        self.monitor = PerformanceMonitor()

        logging.basicConfig(level=logging.INFO)

    def chat(self, prompt: str, **kwargs) -> str:
        """One chat turn with validation, rate limiting, retries, and tracking."""
        # Validate the input
        prompt = validate_input(prompt)

        # Respect the rate limit
        self.rate_limiter.wait_if_needed()

        # Retry loop with exponential backoff
        for attempt in range(self.max_retries):
            try:
                start_time = time.time()

                response = self.client.chat.completions.create(
                    model=self.model,
                    messages=[{"role": "user", "content": prompt}],
                    **kwargs
                )

                latency = time.time() - start_time

                # Record cost and performance metrics
                self.cost_tracker.track(response, self.model)
                self.monitor.record(Metrics(
                    latency=latency,
                    input_tokens=response.usage.prompt_tokens,
                    output_tokens=response.usage.completion_tokens,
                    model=self.model,
                    success=True
                ))

                return sanitize_output(response.choices[0].message.content)

            except RateLimitError:
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
                else:
                    raise

            except APIError as e:
                self.monitor.record(Metrics(
                    latency=0,
                    input_tokens=0,
                    output_tokens=0,
                    model=self.model,
                    success=False,
                    error=str(e)
                ))
                if attempt < self.max_retries - 1:
                    time.sleep(1)
                else:
                    raise

# Usage
client = LLMClient(model="gpt-5.4-mini")

response = client.chat("Give me a brief introduction to Python")
print(response)

print(client.cost_tracker.report())

Summary

In this chapter you learned:

  • ✅ Cost-optimization strategies
  • ✅ Security practices (API key management, input/output validation)
  • ✅ Rate limiting
  • ✅ Monitoring and alerting
  • ✅ Building a production-grade client

Tutorial Wrap-Up

Congratulations on finishing the LLM integration tutorial! You now know:

  1. Major models: OpenAI, DeepSeek, Claude, Qwen (Tongyi Qianwen), Zhipu (GLM), Kimi
  2. API calls: basic chat, streaming output, function calling
  3. Multimodal: image understanding, video analysis
  4. Advanced techniques: async calls, error handling, token management
  5. Production practice: cost optimization, security, monitoring

Where to go next:
  - Prompt Engineering: crafting effective prompts
  - RAG systems: retrieval-augmented generation
  - vLLM deployment: serving models locally