第七章:多查询融合¶
什么是多查询融合¶
多查询融合(Multi-Query Fusion)是将原始查询扩展为多个相关查询,分别检索后合并结果的技术。这种方法可以:
- 提高召回率:覆盖更多可能的匹配
- 降低漏检:弥补单一查询的不足
- 提升鲁棒性:减少查询表述偏差的影响
多查询生成方法¶
1. LLM 生成多查询¶
from openai import OpenAI
client = OpenAI()
def generate_multi_queries(query: str, n: int = 3) -> list[str]:
    """Generate *n* semantically similar rephrasings of *query* via the LLM.

    The variants are meant to be used as additional retrieval queries.

    Args:
        query: The original user query.
        n: Number of variants to request from the model.

    Returns:
        The list parsed from the model's JSON-array reply.

    Raises:
        json.JSONDecodeError: if the model reply is not a valid JSON array.
    """
    import json  # fix: the file uses json.loads but never imported json

    prompt = f"""请为以下问题生成 {n} 个语义相近但表述不同的查询变体。
这些变体将用于检索相关文档。
原问题:{query}
请以 JSON 数组格式返回,例如:["变体1", "变体2", "变体3"]"""
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        # Moderate temperature so the variants actually differ from each other.
        temperature=0.7
    )
    return json.loads(response.choices[0].message.content)
# Example usage (performs a real LLM call; the output below is illustrative).
query = "Python 异步编程最佳实践"
variants = generate_multi_queries(query)
# e.g. ['Python asyncio 使用技巧', 'Python 异步编程指南', 'asyncio 最佳实践总结']
2. 模板化多查询¶
# Fixed expansion patterns; "{query}" is filled with the original query text.
QUERY_TEMPLATES = [
    "{query} 教程",
    "{query} 示例",
    "如何实现 {query}",
    "{query} 最佳实践",
    "{query} 解决方案"
]


def template_multi_queries(query: str) -> list[str]:
    """Expand *query* into one variant per entry of QUERY_TEMPLATES."""
    variants = []
    for template in QUERY_TEMPLATES:
        variants.append(template.format(query=query))
    return variants
3. 同义词替换多查询¶
def synonym_multi_queries(query: str, synonym_dict: dict) -> list[str]:
    """Generate query variants by substituting known synonyms.

    For each whitespace-separated word found in *synonym_dict*, up to two
    synonyms are substituted (one variant per synonym).

    Note: str.replace substitutes every occurrence of the word, including as
    a substring of longer words — acceptable for coarse query expansion.

    Args:
        query: The original query string.
        synonym_dict: Mapping of word -> list of synonyms.

    Returns:
        De-duplicated variants in generation order, the original query first.
    """
    variants = [query]
    for word in query.split():
        # Cap at two synonyms per word to bound the number of variants.
        for synonym in synonym_dict.get(word, [])[:2]:
            variants.append(query.replace(word, synonym))
    # dict.fromkeys de-duplicates deterministically; the original
    # list(set(...)) reordered randomly across runs (hash randomization)
    # and could push the original query out of first position.
    return list(dict.fromkeys(variants))
结果融合策略¶
1. Reciprocal Rank Fusion (RRF)¶
最常用的融合方法,基于排名的倒数:
def reciprocal_rank_fusion(
    results_list: list[list[str]],
    k: int = 60,
    top_k: int = 10
) -> list[str]:
    """Fuse several ranked result lists with Reciprocal Rank Fusion.

    Every document accumulates sum(1 / (k + rank)) across the lists it
    appears in, with rank counted from 1; the constant *k* dampens the
    dominance of top positions.

    Args:
        results_list: One ranked document list per query/source.
        k: RRF damping constant (60 is the value from the original paper).
        top_k: Maximum number of fused documents to return.

    Returns:
        Documents ordered by fused score, highest first.
    """
    scores: dict[str, float] = {}
    for ranked in results_list:
        for rank, doc in enumerate(ranked, start=1):
            scores[doc] = scores.get(doc, 0.0) + 1 / (k + rank)
    # Highest fused score first; ties keep first-seen order (stable sort).
    ordered = sorted(scores, key=scores.get, reverse=True)
    return ordered[:top_k]
# Example
results1 = ['doc1', 'doc2', 'doc3', 'doc4']
results2 = ['doc2', 'doc1', 'doc5', 'doc6']
results3 = ['doc3', 'doc2', 'doc1', 'doc7']
fused = reciprocal_rank_fusion([results1, results2, results3])
# ['doc2', 'doc1', 'doc3', ...]  # doc2 ranks near the top in several lists, so it wins
2. 加权分数融合¶
def weighted_score_fusion(
results_with_scores: list[list[tuple[str, float]]],
weights: list[float] = None,
top_k: int = 10
) -> list[str]:
"""加权分数融合"""
if weights is None:
weights = [1.0 / len(results_with_scores)] * len(results_with_scores)
combined_scores = {}
for results, weight in zip(results_with_scores, weights):
for doc, score in results:
if doc not in combined_scores:
combined_scores[doc] = 0
combined_scores[doc] += score * weight
sorted_docs = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
return [doc for doc, _ in sorted_docs[:top_k]]
3. 投票融合¶
def voting_fusion(results_list: list[list[str]], top_k: int = 10) -> list[str]:
    """Fuse ranked lists by counting how many lists contain each document.

    Only the first *top_k* entries of each list cast a vote; ties keep
    first-seen order (stable sort over insertion order).
    """
    tally: dict[str, int] = {}
    candidates = (doc for ranked in results_list for doc in ranked[:top_k])
    for doc in candidates:
        tally[doc] = tally.get(doc, 0) + 1
    by_votes = sorted(tally, key=tally.get, reverse=True)
    return by_votes[:top_k]
完整的多查询检索系统¶
class MultiQueryRetriever:
    """Retriever that expands a query into variants and fuses the results."""

    def __init__(self, vector_store, llm_client):
        # NOTE(review): vector_store is assumed to expose
        # search(query, top_k) -> list of docs — confirm against the caller.
        self.vector_store = vector_store
        self.llm = llm_client

    def retrieve(self, query: str, top_k: int = 5) -> list[str]:
        """Retrieve *top_k* documents for *query* via multi-query fusion."""
        # 1. Expand the query into several variants.
        queries = self.generate_queries(query)
        # 2. Retrieve per variant (2x top_k so fusion has room to rerank).
        all_results = []
        for q in queries:
            results = self.vector_store.search(q, top_k * 2)
            all_results.append(results)
        # 3. Fuse the per-variant rankings.
        return self.fuse_results(all_results, top_k)

    def generate_queries(self, query: str) -> list[str]:
        """Build the variant set: original + LLM rephrasings + templates."""
        queries = [query]
        queries.extend(self.llm_generate_queries(query))
        queries.extend(self.template_queries(query))
        # dict.fromkeys: deterministic de-dup keeping the original query
        # first (list(set(...)) reordered randomly and could drop it from
        # the capped prefix); cap at 5 variants to bound retrieval latency.
        return list(dict.fromkeys(queries))[:5]

    def llm_generate_queries(self, query: str) -> list[str]:
        """LLM-based rephrasings, best-effort.

        Fix: this method was called by generate_queries but never defined,
        so retrieve() always raised AttributeError.
        """
        try:
            return generate_multi_queries(query)
        except Exception:
            # LLM expansion is an enhancement, not a requirement.
            return []

    def template_queries(self, query: str) -> list[str]:
        """Template-based expansions (fix: was called but never defined)."""
        return template_multi_queries(query)

    def fuse_results(self, results_list: list, top_k: int) -> list[str]:
        """Fuse the per-query rankings with RRF."""
        return reciprocal_rank_fusion(results_list, top_k=top_k)
高级融合技术¶
1. 自适应权重¶
根据查询质量动态调整权重:
def adaptive_weight_fusion(
    query: str,
    queries: list[str],
    results_list: list[list[str]],
    top_k: int = 10
) -> list[str]:
    """Fuse result lists with weights adapted to each variant's quality.

    Each generated query variant is weighted by the embedding similarity
    between it and the original query, then the lists are merged with
    weighted_score_fusion.

    NOTE(review): `model`, `np`, and `convert_to_scored` are not defined or
    imported anywhere in this file — presumably a sentence-embedding model,
    numpy, and a rank-to-score converter; confirm before using this function.
    """
    # Similarity of each variant to the original query becomes its weight.
    query_emb = model.encode([query])
    weights = []
    for q in queries:
        q_emb = model.encode([q])
        similarity = np.dot(query_emb, q_emb.T)[0][0]
        weights.append(similarity)
    # Normalize the weights so they sum to 1.
    weights = np.array(weights) / sum(weights)
    return weighted_score_fusion(
        [convert_to_scored(r) for r in results_list],
        weights=weights,
        top_k=top_k
    )
2. 混合检索融合(Hybrid Fusion)¶
结合向量检索和关键词检索:
def hybrid_multi_query(query: str, top_k: int = 5):
    """Multi-query retrieval mixing vector (semantic) and keyword search.

    Every generated variant is run through both retrievers; all resulting
    ranked lists are then merged with reciprocal rank fusion.
    """
    ranked_lists = []
    for variant in generate_multi_queries(query):
        ranked_lists.append(vector_search(variant, top_k))   # semantic
        ranked_lists.append(keyword_search(variant, top_k))  # lexical
    return reciprocal_rank_fusion(ranked_lists, top_k=top_k)
3. 分层融合¶
先融合同类结果,再跨类融合:
def hierarchical_fusion(
    query: str,
    semantic_results: list[list[str]],
    keyword_results: list[list[str]],
    top_k: int = 10
) -> list[str]:
    """Two-stage fusion: merge within each retrieval family, then across.

    NOTE: *query* is currently unused; kept for interface compatibility.
    """
    # Stage 1: fuse within each family, keeping 2x top_k as candidates.
    per_family = [
        reciprocal_rank_fusion(semantic_results, top_k=top_k * 2),
        reciprocal_rank_fusion(keyword_results, top_k=top_k * 2),
    ]
    # Stage 2: fuse the two family-level rankings into the final list.
    return reciprocal_rank_fusion(per_family, top_k=top_k)
性能优化¶
1. 并行检索¶
import asyncio
async def parallel_multi_query(queries: list[str], top_k: int = 5):
    """Run the per-variant vector searches concurrently, then fuse with RRF."""
    # asyncio.gather accepts the coroutines directly; no wrapper needed.
    pending = [async_vector_search(q, top_k) for q in queries]
    ranked_lists = await asyncio.gather(*pending)
    return reciprocal_rank_fusion(ranked_lists, top_k=top_k)
2. 查询缓存¶
from functools import lru_cache
class CachedMultiQueryRetriever:
    """Multi-query retriever that memoizes query expansion per instance."""

    def __init__(self):
        # query -> tuple of generated variants. A per-instance dict replaces
        # the original @lru_cache on a bound method, which keys on `self`
        # and keeps every instance alive for the cache's lifetime (ruff B019);
        # the original also created self.cache but never used it.
        self.cache: dict[str, tuple] = {}

    def generate_queries_cached(self, query: str) -> tuple:
        """Return cached variants for *query*, generating them on first use.

        NOTE: the cache is unbounded; clear self.cache periodically if the
        query space is large.
        """
        if query not in self.cache:
            self.cache[query] = tuple(generate_multi_queries(query))
        return self.cache[query]

    def retrieve(self, query: str, top_k: int = 5):
        queries = self.generate_queries_cached(query)
        # ... retrieval logic (left unimplemented, as in the original)
3. 早期停止¶
def early_stop_multi_query(
    query: str,
    top_k: int = 5,
    min_overlap: float = 0.8
):
    """Multi-query retrieval that stops expanding once results stabilize.

    After each variant is retrieved, the fused ranking is compared with the
    previous one; when the overlap reaches *min_overlap*, the remaining
    variants are skipped.

    Bug fixed: the original only assigned `fused` inside the
    `len(all_results) >= 2` branch, so with fewer than two variants the
    final `return fused` raised UnboundLocalError; the fused ranking is now
    recomputed on every iteration (and [] is returned for zero variants).
    """
    queries = generate_multi_queries(query)
    all_results = []
    fused: list[str] = []
    for q in queries:
        all_results.append(vector_search(q, top_k))
        # Reuse the previous iteration's fusion instead of refusing
        # all_results[:-1] from scratch — same value, half the work.
        prev_fused = fused
        fused = reciprocal_rank_fusion(all_results, top_k)
        if len(all_results) >= 2:  # need two rankings to compare stability
            overlap = len(set(fused) & set(prev_fused)) / top_k
            if overlap >= min_overlap:
                break  # results are stable — stop issuing more queries
    return fused
实战案例¶
RAG 系统中的多查询¶
class MultiQueryRAG:
    """RAG pipeline that answers questions via multi-query retrieval."""

    def __init__(self):
        # NOTE(review): MultiQueryRetriever.__init__ requires
        # (vector_store, llm_client); this no-arg call raises TypeError as
        # written — confirm how the retriever is meant to be constructed.
        self.retriever = MultiQueryRetriever()
        self.llm = OpenAI()

    def answer(self, question: str) -> str:
        # 1. Multi-query retrieval of supporting contexts.
        contexts = self.retriever.retrieve(question, top_k=5)
        # 2. Build the prompt (Chinese template is runtime text, kept verbatim).
        prompt = f"""基于以下参考内容回答问题:
参考内容:
{chr(10).join(f'- {c}' for c in contexts)}
问题:{question}
请给出详细、准确的答案:"""
        # 3. Generate the final answer with the LLM.
        response = self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
小结¶
多查询融合是提升检索效果的重要技术:
- 核心思想:多角度查询 + 结果融合
- 融合方法:RRF、加权融合、投票融合
- 优化方向:并行执行、自适应权重、早期停止
下一章我们将通过实战案例综合应用所学技术。