跳转至

第七章:多查询融合

什么是多查询融合

多查询融合(Multi-Query Fusion)是将原始查询扩展为多个相关查询,分别检索后合并结果的技术。这种方法可以:

  1. 提高召回率:覆盖更多可能的匹配
  2. 降低漏检:弥补单一查询的不足
  3. 提升鲁棒性:减少查询表述偏差的影响

多查询生成方法

1. LLM 生成多查询

from openai import OpenAI

client = OpenAI()

def generate_multi_queries(query: str, n: int = 3) -> list[str]:
    """使用 LLM 生成多个相关查询"""
    prompt = f"""请为以下问题生成 {n} 个语义相近但表述不同的查询变体。
这些变体将用于检索相关文档。

原问题:{query}

请以 JSON 数组格式返回,例如:["变体1", "变体2", "变体3"]"""

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.7
    )

    return json.loads(response.choices[0].message.content)

# 示例
query = "Python 异步编程最佳实践"
variants = generate_multi_queries(query)
# ['Python asyncio 使用技巧', 'Python 异步编程指南', 'asyncio 最佳实践总结']

2. 模板化多查询

QUERY_TEMPLATES = [
    "{query} 教程",
    "{query} 示例",
    "如何实现 {query}",
    "{query} 最佳实践",
    "{query} 解决方案"
]

def template_multi_queries(query: str) -> list[str]:
    """使用模板生成多查询"""
    return [t.format(query=query) for t in QUERY_TEMPLATES]

3. 同义词替换多查询

def synonym_multi_queries(query: str, synonym_dict: dict) -> list[str]:
    """基于同义词生成多查询"""
    variants = [query]

    for word in query.split():
        if word in synonym_dict:
            for syn in synonym_dict[word][:2]:
                variants.append(query.replace(word, syn))

    return list(set(variants))  # 去重

结果融合策略

1. Reciprocal Rank Fusion (RRF)

最常用的融合方法,基于排名的倒数:

def reciprocal_rank_fusion(
    results_list: list[list[str]], 
    k: int = 60,
    top_k: int = 10
) -> list[str]:
    """RRF 融合算法"""
    scores = {}

    for results in results_list:
        for rank, doc in enumerate(results):
            if doc not in scores:
                scores[doc] = 0
            # RRF 分数 = 1 / (k + rank)
            scores[doc] += 1 / (k + rank + 1)

    # 按分数排序
    sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [doc for doc, score in sorted_docs[:top_k]]

# 示例
results1 = ['doc1', 'doc2', 'doc3', 'doc4']
results2 = ['doc2', 'doc1', 'doc5', 'doc6']
results3 = ['doc3', 'doc2', 'doc1', 'doc7']

fused = reciprocal_rank_fusion([results1, results2, results3])
# ['doc2', 'doc1', 'doc3', ...]  # doc2 在多个列表中排名靠前

2. 加权分数融合

def weighted_score_fusion(
    results_with_scores: list[list[tuple[str, float]]],
    weights: list[float] = None,
    top_k: int = 10
) -> list[str]:
    """加权分数融合"""
    if weights is None:
        weights = [1.0 / len(results_with_scores)] * len(results_with_scores)

    combined_scores = {}

    for results, weight in zip(results_with_scores, weights):
        for doc, score in results:
            if doc not in combined_scores:
                combined_scores[doc] = 0
            combined_scores[doc] += score * weight

    sorted_docs = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in sorted_docs[:top_k]]

3. 投票融合

def voting_fusion(results_list: list[list[str]], top_k: int = 10) -> list[str]:
    """投票融合"""
    votes = {}

    for results in results_list:
        for doc in results[:top_k]:  # 只考虑前 top_k
            votes[doc] = votes.get(doc, 0) + 1

    sorted_docs = sorted(votes.items(), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in sorted_docs[:top_k]]

完整的多查询检索系统

class MultiQueryRetriever:
    def __init__(self, vector_store, llm_client):
        self.vector_store = vector_store
        self.llm = llm_client

    def retrieve(self, query: str, top_k: int = 5) -> list[str]:
        """多查询检索"""
        # 1. 生成查询变体
        queries = self.generate_queries(query)

        # 2. 并行检索
        all_results = []
        for q in queries:
            results = self.vector_store.search(q, top_k * 2)
            all_results.append(results)

        # 3. 融合结果
        return self.fuse_results(all_results, top_k)

    def generate_queries(self, query: str) -> list[str]:
        """生成查询变体"""
        # 原查询 + LLM 生成 + 模板扩展
        queries = [query]
        queries.extend(self.llm_generate_queries(query))
        queries.extend(self.template_queries(query))

        return list(set(queries))[:5]  # 去重,最多5个

    def fuse_results(self, results_list: list, top_k: int) -> list[str]:
        """融合检索结果"""
        return reciprocal_rank_fusion(results_list, top_k=top_k)

高级融合技术

1. 自适应权重

根据查询质量动态调整权重:

def adaptive_weight_fusion(
    query: str,
    queries: list[str],
    results_list: list[list[str]],
    top_k: int = 10
) -> list[str]:
    """自适应权重融合"""
    # 计算每个查询与原查询的相似度作为权重
    query_emb = model.encode([query])
    weights = []

    for q in queries:
        q_emb = model.encode([q])
        similarity = np.dot(query_emb, q_emb.T)[0][0]
        weights.append(similarity)

    # 归一化权重
    weights = np.array(weights) / sum(weights)

    return weighted_score_fusion(
        [convert_to_scored(r) for r in results_list],
        weights=weights,
        top_k=top_k
    )

2. 多模态融合

结合向量检索和关键词检索:

def hybrid_multi_query(query: str, top_k: int = 5):
    """混合检索多查询"""
    queries = generate_multi_queries(query)

    all_results = []
    for q in queries:
        # 向量检索
        vec_results = vector_search(q, top_k)
        # 关键词检索
        kw_results = keyword_search(q, top_k)

        all_results.extend([vec_results, kw_results])

    return reciprocal_rank_fusion(all_results, top_k=top_k)

3. 分层融合

先融合同类结果,再跨类融合:

def hierarchical_fusion(
    query: str,
    semantic_results: list[list[str]],
    keyword_results: list[list[str]],
    top_k: int = 10
) -> list[str]:
    """分层融合"""
    # 第一层:语义结果融合
    semantic_fused = reciprocal_rank_fusion(semantic_results, top_k=top_k*2)

    # 第一层:关键词结果融合
    keyword_fused = reciprocal_rank_fusion(keyword_results, top_k=top_k*2)

    # 第二层:跨类型融合
    final_fused = reciprocal_rank_fusion(
        [semantic_fused, keyword_fused], 
        top_k=top_k
    )

    return final_fused

性能优化

1. 并行检索

import asyncio

async def parallel_multi_query(queries: list[str], top_k: int = 5):
    """并行执行多查询检索"""
    async def search_one(q):
        return await async_vector_search(q, top_k)

    tasks = [search_one(q) for q in queries]
    results = await asyncio.gather(*tasks)

    return reciprocal_rank_fusion(results, top_k=top_k)

2. 查询缓存

from functools import lru_cache

class CachedMultiQueryRetriever:
    def __init__(self):
        self.cache = {}

    @lru_cache(maxsize=1000)
    def generate_queries_cached(self, query: str) -> tuple:
        """缓存查询生成结果"""
        return tuple(generate_multi_queries(query))

    def retrieve(self, query: str, top_k: int = 5):
        queries = self.generate_queries_cached(query)
        # ... 检索逻辑

3. 早期停止

def early_stop_multi_query(
    query: str, 
    top_k: int = 5,
    min_overlap: float = 0.8
):
    """早期停止的多查询检索"""
    queries = generate_multi_queries(query)
    all_results = []

    for q in queries:
        results = vector_search(q, top_k)
        all_results.append(results)

        # 检查结果稳定性
        if len(all_results) >= 2:
            fused = reciprocal_rank_fusion(all_results, top_k)
            prev_fused = reciprocal_rank_fusion(all_results[:-1], top_k)

            overlap = len(set(fused) & set(prev_fused)) / top_k
            if overlap >= min_overlap:
                break  # 结果已稳定,停止生成更多查询

    return fused

实战案例

RAG 系统中的多查询

class MultiQueryRAG:
    def __init__(self):
        self.retriever = MultiQueryRetriever()
        self.llm = OpenAI()

    def answer(self, question: str) -> str:
        # 1. 多查询检索
        contexts = self.retriever.retrieve(question, top_k=5)

        # 2. 构建提示词
        prompt = f"""基于以下参考内容回答问题:

参考内容:
{chr(10).join(f'- {c}' for c in contexts)}

问题:{question}

请给出详细、准确的答案:"""

        # 3. 生成答案
        response = self.llm.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}]
        )

        return response.choices[0].message.content

小结

多查询融合是提升检索效果的重要技术:

  • 核心思想:多角度查询 + 结果融合
  • 融合方法:RRF、加权融合、投票融合
  • 优化方向:并行执行、自适应权重、早期停止

下一章我们将通过实战案例综合应用所学技术。