Chapter 7: Retrieval Evaluation¶
Evaluation Metrics¶
Recall¶
Recall measures how well the retrieval system finds the relevant documents: of all documents labeled relevant, what fraction appears in the top-K results.
```python
def recall_at_k(retrieved_docs, relevant_docs, k):
    """Compute Recall@K."""
    retrieved_set = set(retrieved_docs[:k])
    relevant_set = set(relevant_docs)
    if len(relevant_set) == 0:
        return 0.0
    return len(retrieved_set & relevant_set) / len(relevant_set)
```
Precision¶
Precision measures the proportion of the top-K retrieved documents that are actually relevant.
```python
def precision_at_k(retrieved_docs, relevant_docs, k):
    """Compute Precision@K."""
    retrieved_set = set(retrieved_docs[:k])
    relevant_set = set(relevant_docs)
    return len(retrieved_set & relevant_set) / k
```
MRR (Mean Reciprocal Rank)¶
MRR measures how early the first relevant document appears in the ranking: for each query it is the reciprocal of that document's rank, and the final score is the mean over all queries.
```python
def mrr(retrieved_docs, relevant_docs):
    """Compute the reciprocal rank for a single query (averaged into MRR over many queries)."""
    relevant_set = set(relevant_docs)
    for rank, doc in enumerate(retrieved_docs, 1):
        if doc in relevant_set:
            return 1.0 / rank
    return 0.0
```
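A quick sanity check of the three functions above, using made-up document IDs rather than real corpus data:

```python
# Hypothetical retrieval result and relevance labels, for illustration only
retrieved = ["doc2", "doc7", "doc1", "doc9", "doc5"]
relevant = ["doc1", "doc2", "doc5"]

print(recall_at_k(retrieved, relevant, k=3))     # 2 of 3 relevant docs in the top 3 -> 0.667
print(precision_at_k(retrieved, relevant, k=3))  # 2 of the top 3 are relevant -> 0.667
print(mrr(retrieved, relevant))                  # first relevant doc is at rank 1 -> 1.0
```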
NDCG (Normalized Discounted Cumulative Gain)¶
NDCG takes both graded relevance and ranking position into account: highly relevant documents contribute more, and contributions are discounted the further down the list they appear.
```python
import numpy as np

def dcg_at_k(relevances, k):
    """Compute DCG@K."""
    relevances = relevances[:k]
    gains = (2 ** np.array(relevances) - 1) / np.log2(np.arange(2, len(relevances) + 2))
    return np.sum(gains)

def ndcg_at_k(retrieved_docs, relevance_scores, k):
    """Compute NDCG@K."""
    # Look up the relevance score of each retrieved document
    relevances = [relevance_scores.get(doc, 0) for doc in retrieved_docs[:k]]

    # DCG of the actual ranking
    dcg = dcg_at_k(relevances, k)

    # IDCG: DCG of the ideal ranking (documents sorted by relevance)
    ideal_relevances = sorted(relevance_scores.values(), reverse=True)[:k]
    idcg = dcg_at_k(ideal_relevances, k)

    if idcg == 0:
        return 0.0
    return dcg / idcg
```
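A small worked example using the functions above, with hypothetical graded relevance labels (0 means not relevant):

```python
# Hypothetical graded relevance labels: higher means more relevant
relevance_scores = {"doc1": 3, "doc2": 2, "doc5": 1}
retrieved = ["doc2", "doc7", "doc1", "doc9", "doc5"]

print(ndcg_at_k(retrieved, relevance_scores, k=5))  # ≈ 0.73
```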
MAP (Mean Average Precision)¶
MAP averages the per-query Average Precision, rewarding systems that consistently rank relevant documents near the top across many queries.

```python
def average_precision(retrieved_docs, relevant_docs):
    """Compute Average Precision for a single query."""
    relevant_set = set(relevant_docs)
    if not relevant_set:
        return 0.0

    precisions = []
    for i, doc in enumerate(retrieved_docs, 1):
        if doc in relevant_set:
            precisions.append(len(relevant_set & set(retrieved_docs[:i])) / i)

    if not precisions:
        return 0.0

    # AP = sum of precision at each relevant hit, divided by the total number of relevant docs
    return sum(precisions) / len(relevant_set)

def mean_average_precision(all_retrieved, all_relevant):
    """Compute MAP over a set of queries."""
    aps = [
        average_precision(retrieved, relevant)
        for retrieved, relevant in zip(all_retrieved, all_relevant)
    ]
    return np.mean(aps)
```
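A usage sketch over two hypothetical queries:

```python
# Retrieval results and relevance labels for two example queries
all_retrieved = [
    ["doc1", "doc3", "doc2"],  # query 1
    ["doc4", "doc6", "doc3"],  # query 2
]
all_relevant = [
    ["doc1", "doc2"],          # relevant docs for query 1
    ["doc3", "doc4"],          # relevant docs for query 2
]

# Each query has AP = (1 + 2/3) / 2, so MAP ≈ 0.833
print(mean_average_precision(all_retrieved, all_relevant))
```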
Evaluation Datasets¶
Building an Evaluation Dataset¶
```python
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class EvaluationQuery:
    query_id: str
    query: str
    relevant_docs: List[str]          # IDs of the relevant documents
    relevance_scores: Dict[str, int]  # document ID -> relevance grade (0-3)

# Sample evaluation data
eval_dataset = [
    EvaluationQuery(
        query_id="q1",
        query="Introduction to Python programming",
        relevant_docs=["doc1", "doc2", "doc5"],
        relevance_scores={"doc1": 3, "doc2": 2, "doc5": 1}
    ),
    EvaluationQuery(
        query_id="q2",
        query="Machine learning algorithms",
        relevant_docs=["doc3", "doc4"],
        relevance_scores={"doc3": 3, "doc4": 2}
    ),
]
```
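Evaluation sets are usually maintained and reused over time, so it helps to persist them. A minimal sketch that saves the EvaluationQuery list above to JSON and loads it back (the file name eval_dataset.json is just an example):

```python
import json
from dataclasses import asdict

def save_eval_dataset(dataset, path):
    """Serialize a list of EvaluationQuery objects to a JSON file."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump([asdict(q) for q in dataset], f, ensure_ascii=False, indent=2)

def load_eval_dataset(path):
    """Load a list of EvaluationQuery objects from a JSON file."""
    with open(path, 'r', encoding='utf-8') as f:
        return [EvaluationQuery(**item) for item in json.load(f)]

save_eval_dataset(eval_dataset, "eval_dataset.json")
eval_dataset = load_eval_dataset("eval_dataset.json")
```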
Evaluating with RAGAS¶
```python
from ragas import evaluate
from ragas.metrics import context_recall, context_precision
from datasets import Dataset

# Prepare the evaluation data
eval_data = {
    'question': ['What is Python?', 'What is machine learning?'],
    'contexts': [
        ['Python is a programming language', 'Python is widely used in data science'],
        ['Machine learning is a branch of AI', 'Deep learning uses neural networks']
    ],
    'ground_truth': ['Python is a programming language', 'Machine learning is a branch of artificial intelligence']
}

dataset = Dataset.from_dict(eval_data)

# Run the evaluation
results = evaluate(
    dataset,
    metrics=[context_recall, context_precision]
)
print(results)
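```

Note that context_recall and context_precision call an LLM under the hood, so the relevant model credentials (for example OPENAI_API_KEY for the default OpenAI backend) must be configured before running. The result object can typically be expanded into a DataFrame for per-sample analysis; a sketch assuming the default backend is available:

```python
# Assumes the evaluation LLM credentials (e.g. OPENAI_API_KEY) are already configured
df = results.to_pandas()  # one row per question, with a score column per metric
print(df)
```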
Comparing Retrievers¶
A Complete Evaluation Pipeline¶
```python
from typing import List, Dict, Callable
import numpy as np

class RetrievalEvaluator:
    def __init__(self, eval_dataset: List[EvaluationQuery]):
        self.eval_dataset = eval_dataset

    def evaluate_retriever(
        self,
        retriever: Callable,
        k_values: List[int] = [1, 3, 5, 10]
    ) -> Dict:
        """Evaluate a single retriever."""
        results = {f'recall@{k}': [] for k in k_values}
        results.update({f'precision@{k}': [] for k in k_values})
        results.update({f'ndcg@{k}': [] for k in k_values})
        results['mrr'] = []

        for query_data in self.eval_dataset:
            # Run retrieval
            retrieved = retriever(query_data.query)
            retrieved_ids = [doc.metadata.get('id') for doc in retrieved]

            # Compute the metrics
            for k in k_values:
                results[f'recall@{k}'].append(
                    recall_at_k(retrieved_ids, query_data.relevant_docs, k)
                )
                results[f'precision@{k}'].append(
                    precision_at_k(retrieved_ids, query_data.relevant_docs, k)
                )
                results[f'ndcg@{k}'].append(
                    ndcg_at_k(retrieved_ids, query_data.relevance_scores, k)
                )
            results['mrr'].append(mrr(retrieved_ids, query_data.relevant_docs))

        # Average over queries
        return {k: np.mean(v) for k, v in results.items()}

    def compare_retrievers(
        self,
        retrievers: Dict[str, Callable],
        k_values: List[int] = [1, 3, 5, 10]
    ) -> Dict:
        """Compare multiple retrievers."""
        comparison = {}
        for name, retriever in retrievers.items():
            comparison[name] = self.evaluate_retriever(retriever, k_values)
        return comparison
```
```python
# Usage example
evaluator = RetrievalEvaluator(eval_dataset)

# Compare different retrievers
retrievers = {
    'bm25': bm25_retriever.invoke,
    'vector': vector_retriever.invoke,
    'hybrid': hybrid_retriever.invoke,
    'reranked': reranked_retriever.invoke,
}

results = evaluator.compare_retrievers(retrievers)

# Print the results
for name, metrics in results.items():
    print(f"\n{name}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")
```
A/B Testing¶
Online A/B Testing¶
```python
import random
from datetime import datetime
from typing import Dict, List

class ABTestFramework:
    def __init__(self):
        self.results = {'A': [], 'B': []}

    def assign_group(self, user_id: str) -> str:
        """Assign a user to a test group (deterministic per user ID)."""
        random.seed(user_id)
        return 'A' if random.random() < 0.5 else 'B'

    def log_interaction(
        self,
        group: str,
        query: str,
        retrieved_docs: List,
        user_feedback: int  # 1 = satisfied, 0 = not satisfied
    ):
        """Log a user interaction."""
        self.results[group].append({
            'timestamp': datetime.now(),
            'query': query,
            'retrieved_count': len(retrieved_docs),
            'feedback': user_feedback
        })

    def analyze(self) -> Dict:
        """Analyze the A/B test results."""
        analysis = {}
        for group in ['A', 'B']:
            data = self.results[group]
            if not data:
                continue
            total = len(data)
            satisfied = sum(1 for d in data if d['feedback'] == 1)
            analysis[group] = {
                'total_queries': total,
                'satisfaction_rate': satisfied / total if total > 0 else 0,
                'avg_retrieved': sum(d['retrieved_count'] for d in data) / total
            }
        return analysis

# Usage example
ab_test = ABTestFramework()

# A user issues a query
user_id = "user123"
query = "Introduction to Python programming"  # example query
group = ab_test.assign_group(user_id)

# Pick the retriever based on the assigned group
retriever = retriever_a if group == 'A' else retriever_b
results = retriever.invoke(query)

# Log the user's feedback
ab_test.log_interaction(group, query, results, user_feedback=1)

# Analyze the results
print(ab_test.analyze())
```
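When comparing satisfaction rates it is worth adding a significance test, so that random fluctuation is not mistaken for a real improvement. A minimal sketch of a two-proportion z-test using only the standard library (the 0.05 threshold is the usual convention, not something prescribed by this chapter):

```python
from statistics import NormalDist
from typing import Dict

def significance_test(analysis: Dict, alpha: float = 0.05) -> Dict:
    """Simplified two-proportion z-test on the satisfaction rates of groups A and B."""
    a, b = analysis['A'], analysis['B']
    n_a, n_b = a['total_queries'], b['total_queries']
    p_a, p_b = a['satisfaction_rate'], b['satisfaction_rate']

    # Pooled proportion and standard error
    p_pool = (p_a * n_a + p_b * n_b) / (n_a + n_b)
    se = (p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b)) ** 0.5
    if se == 0:
        return {'z': 0.0, 'p_value': 1.0, 'significant': False}

    z = (p_a - p_b) / se
    p_value = 2 * (1 - NormalDist().cdf(abs(z)))  # two-tailed p-value
    return {'z': z, 'p_value': p_value, 'significant': p_value < alpha}

analysis = ab_test.analyze()
if 'A' in analysis and 'B' in analysis:
    print(significance_test(analysis))
```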
Evaluation Reports¶
Generating an Evaluation Report¶
```python
import pandas as pd
from typing import Dict

def generate_evaluation_report(
    comparison_results: Dict[str, Dict],
    output_path: str = None
) -> str:
    """Generate a Markdown evaluation report."""
    # Build a DataFrame: one row per retriever, one column per metric
    df = pd.DataFrame(comparison_results).T

    # Format the report
    report = "# Retrieval System Evaluation Report\n\n"
    report += "## Metric Comparison\n\n"
    report += df.to_markdown(floatfmt=".4f")

    # Best retriever per metric
    report += "\n\n## Best Retriever per Metric\n\n"
    for metric in df.columns:
        best = df[metric].idxmax()
        report += f"- **{metric}**: {best} ({df.loc[best, metric]:.4f})\n"

    # Recommendation
    report += "\n\n## Recommendation\n\n"
    best_overall = df.mean(axis=1).idxmax()
    report += f"The best overall retriever is **{best_overall}**.\n"

    if output_path:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(report)

    return report

# Usage example
report = generate_evaluation_report(results, "evaluation_report.md")
print(report)
```
Continuous Evaluation¶
Automated Evaluation Pipeline¶
```python
import time
from datetime import datetime

import schedule

class ContinuousEvaluation:
    def __init__(self, retriever, eval_dataset, alert_threshold=0.7):
        self.retriever = retriever
        self.eval_dataset = eval_dataset
        self.alert_threshold = alert_threshold
        self.history = []

    def run_evaluation(self):
        """Run one evaluation pass."""
        evaluator = RetrievalEvaluator(self.eval_dataset)
        results = evaluator.evaluate_retriever(self.retriever.invoke)

        # Record the result in the history
        self.history.append({
            'timestamp': datetime.now(),
            'results': results
        })

        # Alert if quality drops below the threshold
        if results['recall@5'] < self.alert_threshold:
            self.send_alert(results)

        return results

    def send_alert(self, results):
        """Send an alert."""
        print(f"⚠️ Alert: Recall@5 below threshold: {results['recall@5']:.4f}")

    def start_scheduled_evaluation(self, interval_hours=24):
        """Start the scheduled evaluation loop."""
        schedule.every(interval_hours).hours.do(self.run_evaluation)
        while True:
            schedule.run_pending()
            time.sleep(3600)

# Usage example
continuous_eval = ContinuousEvaluation(hybrid_retriever, eval_dataset)
results = continuous_eval.run_evaluation()
```
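Besides a fixed threshold, you can also compare the latest run against the historical average to catch relative regressions. A simple sketch built on ContinuousEvaluation.history (the 5% tolerance is an arbitrary example value):

```python
def detect_regression(history, metric='recall@5', tolerance=0.05):
    """Flag a regression when the latest run falls more than `tolerance` below the historical mean."""
    if len(history) < 2:
        return False
    latest = history[-1]['results'][metric]
    baseline = np.mean([h['results'][metric] for h in history[:-1]])
    return latest < baseline * (1 - tolerance)

if detect_regression(continuous_eval.history):
    print("⚠️ Retrieval quality has regressed relative to its historical level")
```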
Summary¶
In this chapter we covered:
- ✅ Retrieval evaluation metrics (Recall, Precision, MRR, NDCG, MAP)
- ✅ Building evaluation datasets
- ✅ Comparing retrievers
- ✅ An A/B testing framework
- ✅ Generating evaluation reports
- ✅ Continuous evaluation
Next Chapter¶
Chapter 8: Production Practices - best practices for deploying retrieval systems to production.