Chapter 7: Retrieval Evaluation

Evaluation Metrics

Recall

Recall measures a retrieval system's ability to find the relevant documents.

Recall@K = (relevant documents among the top K retrieved) / (total relevant documents)
def recall_at_k(retrieved_docs, relevant_docs, k):
    """计算 Recall@K"""
    retrieved_set = set(retrieved_docs[:k])
    relevant_set = set(relevant_docs)

    if len(relevant_set) == 0:
        return 0.0

    return len(retrieved_set & relevant_set) / len(relevant_set)

Precision

Precision measures the fraction of retrieved documents that are relevant.

Precision@K = (relevant documents among the top K retrieved) / K
def precision_at_k(retrieved_docs, relevant_docs, k):
    """计算 Precision@K"""
    retrieved_set = set(retrieved_docs[:k])
    relevant_set = set(relevant_docs)

    return len(retrieved_set & relevant_set) / k

MRR (Mean Reciprocal Rank)

Measures how early the first relevant document appears in the ranking. For a single query, the reciprocal rank is 1 / rank(first relevant document); MRR is the mean of this value over all queries.

RR = 1 / rank(first relevant document),  MRR = mean(RR) over all queries
def mrr(retrieved_docs, relevant_docs):
    """计算 MRR"""
    relevant_set = set(relevant_docs)

    for rank, doc in enumerate(retrieved_docs, 1):
        if doc in relevant_set:
            return 1.0 / rank

    return 0.0
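
A quick sanity check ties these functions together. The document IDs below are made up purely for illustration:

retrieved = ["doc2", "doc7", "doc1", "doc9", "doc5"]    # ranked retrieval results
relevant = ["doc1", "doc2", "doc5"]                     # ground-truth relevant documents

print(recall_at_k(retrieved, relevant, k=3))     # 2/3: doc2 and doc1 appear in the top 3
print(precision_at_k(retrieved, relevant, k=3))  # 2/3 of the top 3 results are relevant
print(mrr(retrieved, relevant))                  # 1.0: the first result is already relevant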

NDCG (Normalized Discounted Cumulative Gain)

A metric that accounts for both graded document relevance and ranking position.

import numpy as np

def dcg_at_k(relevances, k):
    """计算 DCG@K"""
    relevances = relevances[:k]
    gains = (2 ** np.array(relevances) - 1) / np.log2(np.arange(2, len(relevances) + 2))
    return np.sum(gains)

def ndcg_at_k(retrieved_docs, relevance_scores, k):
    """计算 NDCG@K"""
    # 获取检索结果的相关性分数
    relevances = [relevance_scores.get(doc, 0) for doc in retrieved_docs[:k]]

    # Compute DCG
    dcg = dcg_at_k(relevances, k)

    # Compute IDCG (DCG of the ideal ordering)
    ideal_relevances = sorted(relevance_scores.values(), reverse=True)[:k]
    idcg = dcg_at_k(ideal_relevances, k)

    if idcg == 0:
        return 0.0

    return dcg / idcg
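
As a small illustration (the IDs and graded scores are invented), a ranking that puts the least relevant document first scores below 1.0 even though it retrieves every judged document:

relevance_scores = {"doc1": 3, "doc2": 2, "doc5": 1}

print(ndcg_at_k(["doc5", "doc1", "doc2", "doc9"], relevance_scores, k=3))  # ≈ 0.74
print(ndcg_at_k(["doc1", "doc2", "doc5"], relevance_scores, k=3))          # 1.0: the ideal ordering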

MAP (Mean Average Precision)

Average Precision (AP) averages the precision at each rank where a relevant document appears; MAP is the mean of AP over all queries.

def average_precision(retrieved_docs, relevant_docs):
    """计算 Average Precision"""
    relevant_set = set(relevant_docs)
    precisions = []

    for i, doc in enumerate(retrieved_docs, 1):
        if doc in relevant_set:
            precisions.append(len(relevant_set & set(retrieved_docs[:i])) / i)

    if not precisions:
        return 0.0

    return np.mean(precisions)

def mean_average_precision(all_retrieved, all_relevant):
    """计算 MAP"""
    aps = [
        average_precision(retrieved, relevant)
        for retrieved, relevant in zip(all_retrieved, all_relevant)
    ]
    return np.mean(aps)
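
Average Precision rewards rankings that place relevant documents early. A small made-up example (hypothetical IDs):

relevant = ["doc1", "doc2"]

print(average_precision(["doc1", "doc2", "doc9"], relevant))           # (1/1 + 2/2) / 2 = 1.0
print(average_precision(["doc9", "doc1", "doc8", "doc2"], relevant))   # (1/2 + 2/4) / 2 = 0.5

# MAP averages AP across queries: (1.0 + 0.5) / 2 = 0.75
print(mean_average_precision(
    [["doc1", "doc2", "doc9"], ["doc9", "doc1", "doc8", "doc2"]],
    [relevant, relevant]
))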

Evaluation Datasets

Building an Evaluation Dataset

from dataclasses import dataclass
from typing import List, Dict

@dataclass
class EvaluationQuery:
    query_id: str
    query: str
    relevant_docs: List[str]  # list of relevant document IDs
    relevance_scores: Dict[str, int]  # document ID -> graded relevance score (0-3)

# Example evaluation data
eval_dataset = [
    EvaluationQuery(
        query_id="q1",
        query="Python 编程入门",
        relevant_docs=["doc1", "doc2", "doc5"],
        relevance_scores={"doc1": 3, "doc2": 2, "doc5": 1}
    ),
    EvaluationQuery(
        query_id="q2",
        query="机器学习算法",
        relevant_docs=["doc3", "doc4"],
        relevance_scores={"doc3": 3, "doc4": 2}
    ),
]
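
In practice, queries and judgments usually live in a file rather than in code. A minimal loader sketch, assuming a hypothetical eval_queries.jsonl file with one JSON object per line containing query_id, query, and a relevance_scores map:

import json

def load_eval_dataset(path: str) -> List[EvaluationQuery]:
    """Load evaluation queries from a JSON Lines file (one JSON object per line)."""
    dataset = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            scores = {doc_id: int(score) for doc_id, score in record["relevance_scores"].items()}
            dataset.append(EvaluationQuery(
                query_id=record["query_id"],
                query=record["query"],
                relevant_docs=[doc_id for doc_id, score in scores.items() if score > 0],
                relevance_scores=scores,
            ))
    return dataset

# eval_dataset = load_eval_dataset("eval_queries.jsonl")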

Evaluating with RAGAS

from ragas import evaluate
from ragas.metrics import context_recall, context_precision
from datasets import Dataset

# Prepare the evaluation data
eval_data = {
    'question': ['What is Python?', 'What is machine learning?'],
    'contexts': [
        ['Python is a programming language', 'Python is widely used in data science'],
        ['Machine learning is a branch of AI', 'Deep learning uses neural networks']
    ],
    'ground_truth': ['Python is a programming language', 'Machine learning is a branch of artificial intelligence']
}

dataset = Dataset.from_dict(eval_data)

# Run the evaluation
results = evaluate(
    dataset,
    metrics=[context_recall, context_precision]
)

print(results)

Retriever Comparison

Complete Evaluation Workflow

from typing import List, Dict, Callable
import numpy as np

class RetrievalEvaluator:
    def __init__(self, eval_dataset: List[EvaluationQuery]):
        self.eval_dataset = eval_dataset

    def evaluate_retriever(
        self,
        retriever: Callable,
        k_values: List[int] = [1, 3, 5, 10]
    ) -> Dict:
        """评估检索器"""
        results = {f'recall@{k}': [] for k in k_values}
        results.update({f'precision@{k}': [] for k in k_values})
        results.update({f'ndcg@{k}': [] for k in k_values})
        results['mrr'] = []

        for query_data in self.eval_dataset:
            # Run retrieval and map results to document IDs (assumes IDs are stored in metadata)
            retrieved = retriever(query_data.query)
            retrieved_ids = [doc.metadata.get('id') for doc in retrieved]

            # Compute metrics at each cutoff
            for k in k_values:
                results[f'recall@{k}'].append(
                    recall_at_k(retrieved_ids, query_data.relevant_docs, k)
                )
                results[f'precision@{k}'].append(
                    precision_at_k(retrieved_ids, query_data.relevant_docs, k)
                )
                results[f'ndcg@{k}'].append(
                    ndcg_at_k(retrieved_ids, query_data.relevance_scores, k)
                )

            results['mrr'].append(mrr(retrieved_ids, query_data.relevant_docs))

        # Average each metric over all queries
        return {k: np.mean(v) for k, v in results.items()}

    def compare_retrievers(
        self,
        retrievers: Dict[str, Callable],
        k_values: List[int] = [1, 3, 5, 10]
    ) -> Dict:
        """对比多个检索器"""
        comparison = {}

        for name, retriever in retrievers.items():
            comparison[name] = self.evaluate_retriever(retriever, k_values)

        return comparison

# Usage example
evaluator = RetrievalEvaluator(eval_dataset)

# Compare different retrievers (assumed to have been built in earlier chapters)
retrievers = {
    'bm25': bm25_retriever.invoke,
    'vector': vector_retriever.invoke,
    'hybrid': hybrid_retriever.invoke,
    'reranked': reranked_retriever.invoke,
}

results = evaluator.compare_retrievers(retrievers)

# Print the results
for name, metrics in results.items():
    print(f"\n{name}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")

A/B Testing

Online A/B Testing

import random
from datetime import datetime

class ABTestFramework:
    def __init__(self):
        self.results = {'A': [], 'B': []}

    def assign_group(self, user_id: str) -> str:
        """Deterministically assign a user to a test group."""
        # A local RNG seeded with the user ID keeps the assignment stable
        # without disturbing the global random state
        return 'A' if random.Random(user_id).random() < 0.5 else 'B'

    def log_interaction(
        self,
        group: str,
        query: str,
        retrieved_docs: List,
        user_feedback: int  # 1 = satisfied, 0 = not satisfied
    ):
        """记录用户交互"""
        self.results[group].append({
            'timestamp': datetime.now(),
            'query': query,
            'retrieved_count': len(retrieved_docs),
            'feedback': user_feedback
        })

    def analyze(self) -> Dict:
        """分析 A/B 测试结果"""
        analysis = {}

        for group in ['A', 'B']:
            data = self.results[group]
            if not data:
                continue

            total = len(data)
            satisfied = sum(1 for d in data if d['feedback'] == 1)

            analysis[group] = {
                'total_queries': total,
                'satisfaction_rate': satisfied / total if total > 0 else 0,
                'avg_retrieved': sum(d['retrieved_count'] for d in data) / total
            }

        return analysis

# Usage example
ab_test = ABTestFramework()

# A user issues a query
user_id = "user123"
query = "python tutorial"
group = ab_test.assign_group(user_id)

# Pick the retriever variant for the assigned group
retriever = retriever_a if group == 'A' else retriever_b
results = retriever.invoke(query)

# Log the user's feedback
ab_test.log_interaction(group, query, results, user_feedback=1)

# Analyze the results
print(ab_test.analyze())
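
The raw satisfaction rates alone do not tell you whether the difference between the two groups is real or noise. A sketch of a two-proportion z-test (this helper is not part of the framework above and uses only the standard library):

from math import sqrt
from statistics import NormalDist

def two_proportion_z_test(success_a: int, total_a: int, success_b: int, total_b: int) -> float:
    """Two-sided p-value for the difference between two satisfaction rates."""
    p_a, p_b = success_a / total_a, success_b / total_b
    p_pool = (success_a + success_b) / (total_a + total_b)
    se = sqrt(p_pool * (1 - p_pool) * (1 / total_a + 1 / total_b))
    if se == 0:
        return 1.0
    z = (p_a - p_b) / se
    return 2 * (1 - NormalDist().cdf(abs(z)))

# Hypothetical counts: 180/300 satisfied in group A vs. 210/300 in group B
print(two_proportion_z_test(180, 300, 210, 300))  # ≈ 0.01, so the difference is unlikely to be noise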

Evaluation Reports

Generating an Evaluation Report

import pandas as pd
from typing import Dict

def generate_evaluation_report(
    comparison_results: Dict[str, Dict],
    output_path: str = None
) -> str:
    """生成评估报告"""

    # 创建 DataFrame
    df = pd.DataFrame(comparison_results).T

    # 格式化
    report = "# 检索系统评估报告\n\n"
    report += "## 评估指标对比\n\n"
    report += df.to_markdown(floatfmt=".4f")

    # Best retriever for each metric
    report += "\n\n## Best Retriever per Metric\n\n"
    for metric in df.columns:
        best = df[metric].idxmax()
        report += f"- **{metric}**: {best} ({df.loc[best, metric]:.4f})\n"

    # Recommendation
    report += "\n\n## Recommendation\n\n"
    best_overall = df.mean(axis=1).idxmax()
    report += f"Overall, the **{best_overall}** retriever performs best.\n"

    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(report)

    return report

# Usage example
report = generate_evaluation_report(results, "evaluation_report.md")
print(report)

Continuous Evaluation

Automated Evaluation Workflow

import schedule
import time
from datetime import datetime

class ContinuousEvaluation:
    def __init__(self, retriever, eval_dataset, alert_threshold=0.7):
        self.retriever = retriever
        self.eval_dataset = eval_dataset
        self.alert_threshold = alert_threshold
        self.history = []

    def run_evaluation(self):
        """运行评估"""
        evaluator = RetrievalEvaluator(self.eval_dataset)
        results = evaluator.evaluate_retriever(self.retriever.invoke)

        # Record the run in the history
        self.history.append({
            'timestamp': datetime.now(),
            'results': results
        })

        # Alert if recall drops below the threshold
        if results['recall@5'] < self.alert_threshold:
            self.send_alert(results)

        return results

    def send_alert(self, results):
        """发送告警"""
        print(f"⚠️ 告警: Recall@5 低于阈值: {results['recall@5']:.4f}")

    def start_scheduled_evaluation(self, interval_hours=24):
        """启动定时评估"""
        schedule.every(interval_hours).hours.do(self.run_evaluation)

        while True:
            schedule.run_pending()
            time.sleep(3600)

# Usage example
continuous_eval = ContinuousEvaluation(hybrid_retriever, eval_dataset)
results = continuous_eval.run_evaluation()
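
Once several scheduled runs have accumulated, the history can be flattened into a table to watch for regressions over time. A small sketch (pandas is assumed available, as in the report section above):

import pandas as pd

def history_to_dataframe(history) -> pd.DataFrame:
    """Flatten the evaluation history into one row per run, indexed by timestamp."""
    rows = [{'timestamp': entry['timestamp'], **entry['results']} for entry in history]
    return pd.DataFrame(rows).set_index('timestamp')

# trend = history_to_dataframe(continuous_eval.history)
# print(trend[['recall@5', 'mrr']])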

Summary

In this chapter we covered:

  • ✅ Retrieval evaluation metrics (Recall, Precision, MRR, NDCG, MAP)
  • ✅ Building evaluation datasets
  • ✅ Retriever comparison
  • ✅ An A/B testing framework
  • ✅ Evaluation report generation
  • ✅ Continuous evaluation

Next Chapter

Chapter 8: Production Practice - best practices for deploying retrieval systems to production.