
Chapter 3: Query Decomposition

What Is Query Decomposition

Query decomposition splits a complex query into several simpler sub-queries, retrieves results for each one independently, and then merges the results.

When to Use It

Characteristics of complex queries (a heuristic check for these patterns is sketched after the list):
1. Multiple questions in one - "What are the differences between Python and Java, and which is better for beginners?"
2. Multi-step reasoning - "What is the population of the country that won the most gold medals at the 2024 Olympics?"
3. Multiple entities - "Compare the EV performance of Tesla and BYD"
4. Combined conditions - "Phones under 5000 with a rating of 4 stars or higher"
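
Whether a query is complex enough to warrant decomposition can be checked cheaply before involving an LLM. A minimal heuristic sketch (the connective list and clause threshold are illustrative assumptions, not taken from any library):

import re

def is_complex_query(query: str) -> bool:
    """Rough check for the patterns listed above: multiple questions,
    comparison phrasing, or several comma-separated conditions."""
    if query.count("?") > 1:
        return True
    # Hypothetical connective list; tune it against your own query logs.
    if re.search(r"\b(and|or|compare|versus|vs)\b", query, re.IGNORECASE):
        return True
    # Three or more clauses suggest combined conditions.
    return len(re.split(r"[,;]", query)) >= 3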

Decomposition Methods

1. LLM-Based Decomposition

import json
from typing import List

from openai import OpenAI

client = OpenAI()

def decompose_query(query: str) -> List[str]:
    """Decompose a query with an LLM."""
    prompt = f"""Decompose the following complex query into several simple sub-queries:

Original query: {query}

Requirements:
1. Each sub-query must be answerable on its own
2. Sub-queries must not depend on one another
3. Together, the sub-queries must preserve the full intent of the original query

Return a JSON object of the form {{"sub_queries": ["...", "..."]}}:"""

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )

    try:
        result = json.loads(response.choices[0].message.content)
        return result.get("sub_queries", [query])
    except json.JSONDecodeError:
        # Fall back to the original query if the model returns malformed JSON.
        return [query]

# Example
query = "Compare the performance of Python and Java, and recommend a language for beginners"
sub_queries = decompose_query(query)
print(sub_queries)
# [
#   "Performance characteristics of Python",
#   "Performance characteristics of Java",
#   "Python vs Java performance comparison",
#   "Recommended programming languages for beginners"
# ]

2. Rule-Based Decomposition

import re

def rule_based_decompose(query: str) -> List[str]:
    """Rule-based query decomposition."""
    sub_queries = []

    # Detect comparison phrasing, e.g. "compare X and Y on Z".
    # Patterns like this are inherently language- and phrasing-specific.
    compare_pattern = r"compare\s+(.+?)\s+and\s+(.+?)\s+on\s+(.+)"
    match = re.search(compare_pattern, query, re.IGNORECASE)
    if match:
        entity1, entity2, aspect = match.groups()
        sub_queries.extend([
            f"{entity1} {aspect}",
            f"{entity2} {aspect}",
            f"{entity1} vs {entity2} {aspect} comparison"
        ])

    # Detect coordinated clauses. Split on whole connectives; a character
    # class like [,and] would wrongly split on every single character.
    connectives = r",|\band also\b|\bas well as\b"
    if re.search(connectives, query):
        parts = re.split(connectives, query)
        sub_queries.extend([p.strip() for p in parts if p.strip()])

    # If nothing matched, fall back to the original query.
    return sub_queries if sub_queries else [query]

# Example
query = "compare Tesla and BYD on EV range"
print(rule_based_decompose(query))
# ['Tesla EV range', 'BYD EV range', 'Tesla vs BYD EV range comparison']
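
Rules are cheap but brittle, while the LLM is robust but slow and costly. One practical pattern (a sketch, not taken from any specific framework) is to try the rules first and fall back to the LLM only when they fail to split the query:

def decompose(query: str) -> List[str]:
    """Try cheap rule-based decomposition first; fall back to the LLM."""
    sub_queries = rule_based_decompose(query)
    if len(sub_queries) > 1:
        return sub_queries
    return decompose_query(query)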

3. Dependency-Aware Decomposition

class QueryDecomposer:
    """Query decomposition with dependency tracking."""

    def decompose_with_deps(self, query: str) -> dict:
        """Decompose a query and annotate dependencies between sub-queries."""
        prompt = f"""Analyze the following query, decompose it into sub-queries, and annotate their dependencies:

Query: {query}

Return JSON in this format:
{{
    "sub_queries": [
        {{
            "id": "q1",
            "query": "sub-query text",
            "depends_on": []  // ids of the sub-queries this one depends on
        }}
    ]
}}"""

        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    def execute_decomposed(self, query: str, retriever) -> dict:
        """Execute the decomposed sub-queries in dependency order."""
        decomposition = self.decompose_with_deps(query)
        results = {}

        # Execute in topological order: repeatedly run every sub-query
        # whose dependencies have all completed.
        executed = set()
        sub_queries = decomposition.get("sub_queries", [])

        while len(executed) < len(sub_queries):
            progress = False
            for sq in sub_queries:
                if sq["id"] in executed:
                    continue

                # Run a sub-query only once all of its dependencies are done.
                deps = sq.get("depends_on", [])
                if all(d in executed for d in deps):
                    results[sq["id"]] = retriever.search(sq["query"])
                    executed.add(sq["id"])
                    progress = True

            if not progress:
                # Circular or missing dependencies: stop instead of looping forever.
                break

        return results

# Example
query = "Which country won the most gold medals at the 2024 Olympics, and what is the population of its capital?"
decomposer = QueryDecomposer()
result = decomposer.decompose_with_deps(query)
# {
#     "sub_queries": [
#         {"id": "q1", "query": "country with the most gold medals at the 2024 Olympics", "depends_on": []},
#         {"id": "q2", "query": "population of that country's capital", "depends_on": ["q1"]}
#     ]
# }
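
Note that q2 still contains the placeholder "that country's capital": before it can be retrieved, it must be rewritten using an answer derived from q1's results. `execute_decomposed` above skips this step. A minimal sketch of the rewrite, assuming the dependency answers have already been extracted as short strings (the prompt wording here is an illustrative assumption):

def resolve_sub_query(sub_query: str, dep_answers: List[str]) -> str:
    """Rewrite a dependent sub-query using the answers to its dependencies."""
    facts = "\n".join(dep_answers)
    prompt = f"""Rewrite the query so that it is self-contained, using the facts below.

Facts:
{facts}

Query: {sub_query}

Return only the rewritten query."""
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content.strip()

# e.g. resolve_sub_query("population of that country's capital",
#                        ["Country X won the most gold medals"])
# would yield something like "population of the capital of Country X"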

Result Merging Strategies

1. Simple Merging

def merge_results(sub_results: List[List[dict]]) -> List[dict]:
    """Merge all results, deduplicating by document id."""
    all_docs = []
    seen = set()

    for docs in sub_results:
        for doc in docs:
            if doc["id"] not in seen:
                all_docs.append(doc)
                seen.add(doc["id"])

    return all_docs

2. Weighted Merging

def weighted_merge(sub_results: List[List[dict]], weights: List[float]) -> List[dict]:
    """加权合并结果"""
    doc_scores = {}

    for docs, weight in zip(sub_results, weights):
        for rank, doc in enumerate(docs):
            doc_id = doc["id"]
            score = weight * (1.0 / (rank + 1))

            if doc_id not in doc_scores:
                doc_scores[doc_id] = {"doc": doc, "score": 0}
            doc_scores[doc_id]["score"] += score

    # Sort by accumulated score.
    sorted_docs = sorted(doc_scores.values(), key=lambda x: x["score"], reverse=True)
    return [item["doc"] for item in sorted_docs]

3. Reciprocal Rank Fusion (RRF)
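
RRF scores each document by summing 1 / (k + rank) over every result list in which it appears, so documents that several sub-queries agree on outrank documents that place highly in only one list. The constant k (conventionally 60) damps the advantage of the very top ranks.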

def reciprocal_rank_fusion(sub_results: List[List[dict]], k: int = 60) -> List[dict]:
    """互惠排名融合"""
    doc_scores = {}

    for docs in sub_results:
        for rank, doc in enumerate(docs):
            doc_id = doc["id"]
            rrf_score = 1.0 / (k + rank + 1)

            if doc_id not in doc_scores:
                doc_scores[doc_id] = {"doc": doc, "score": 0}
            doc_scores[doc_id]["score"] += rrf_score

    sorted_docs = sorted(doc_scores.values(), key=lambda x: x["score"], reverse=True)
    return [item["doc"] for item in sorted_docs]
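
A toy run of the three strategies on the same two result lists (the document ids and rankings are made up) shows how they differ:

list_a = [{"id": "d1"}, {"id": "d2"}, {"id": "d3"}]
list_b = [{"id": "d3"}, {"id": "d4"}]

print([d["id"] for d in merge_results([list_a, list_b])])
# ['d1', 'd2', 'd3', 'd4']  -- first occurrence wins, input order preserved

print([d["id"] for d in weighted_merge([list_a, list_b], weights=[0.7, 0.3])])
# ['d1', 'd3', 'd2', 'd4']  -- d3 accumulates 0.7/3 + 0.3 ≈ 0.53 and passes d2

print([d["id"] for d in reciprocal_rank_fusion([list_a, list_b])])
# ['d3', 'd1', 'd2', 'd4']  -- d3 appears in both lists, so it ranks first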

Complete Example

class DecomposedRetriever:
    """A retriever that decomposes complex queries."""

    def __init__(self, base_retriever):
        self.retriever = base_retriever
        self.decomposer = QueryDecomposer()

    def retrieve(self, query: str, top_k: int = 10) -> List[dict]:
        """Retrieval entry point."""
        # 1. Decompose the query
        sub_queries = decompose_query(query)

        # 2. Retrieve for each sub-query
        all_results = []
        for sq in sub_queries:
            results = self.retriever.search(sq, top_k=top_k)
            all_results.append(results)

        # 3. Merge the results, truncating back to top_k
        if len(all_results) == 1:
            return all_results[0]

        return reciprocal_rank_fusion(all_results)[:top_k]

# Usage
retriever = DecomposedRetriever(base_retriever)
results = retriever.retrieve("compare Python and Java on performance")
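
Decomposition multiplies retrieval calls and adds an LLM round trip, so in practice it is worth gating behind a complexity check such as the `is_complex_query` sketch from earlier in this chapter:

def retrieve_gated(retriever: DecomposedRetriever, query: str, top_k: int = 10) -> List[dict]:
    """Only pay the decomposition cost for queries that look complex."""
    if is_complex_query(query):
        return retriever.retrieve(query, top_k=top_k)
    # Simple queries go straight to the underlying base retriever.
    return retriever.retriever.search(query, top_k=top_k)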

Summary

Query decomposition makes complex, multi-intent queries tractable by retrieving each sub-query independently and merging the results. The next chapter covers intent recognition.