第三章:查询分解
什么是查询分解
查询分解将复杂查询拆分为多个简单的子查询,分别检索后合并结果。
适用场景
复杂查询特征:
1. 包含多个问题 - "Python 和 Java 的区别,哪个更适合初学者"
2. 需要多步推理 - "2024年奥运会金牌最多的国家人口是多少"
3. 涉及多个实体 - "比较特斯拉和比亚迪的电动车性能"
4. 条件组合 - "价格在5000以下且评分4星以上的手机"
分解方法
1. 基于 LLM 的分解
from typing import List

from openai import OpenAI

client = OpenAI()
def decompose_query(query: str) -> List[str]:
    """Decompose a complex query into independent sub-queries via an LLM.

    Args:
        query: The original (possibly multi-intent) user query.

    Returns:
        A list of sub-queries; falls back to ``[query]`` when the model
        response cannot be parsed or contains no sub-queries.
    """
    # NOTE: response_format={"type": "json_object"} forces the model to emit
    # a JSON *object*. The original prompt asked for a JSON *array*, which
    # would make json.loads return a list and the dict .get() below raise
    # AttributeError — so the prompt must request the {"sub_queries": [...]}
    # object shape that the parsing code expects.
    prompt = f"""请将以下复杂查询分解为多个简单的子查询:
原始查询:{query}
要求:
1. 每个子查询应该是独立可回答的
2. 子查询之间不应有依赖关系
3. 保持原始查询的完整意图
请返回 JSON 对象,格式为 {{"sub_queries": ["子查询1", "子查询2"]}}:"""
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    import json
    try:
        result = json.loads(response.choices[0].message.content)
    except json.JSONDecodeError:
        # Best-effort: keep the pipeline running on malformed model output.
        return [query]
    sub_queries = result.get("sub_queries") if isinstance(result, dict) else None
    return sub_queries or [query]
# Example usage (requires a configured OpenAI client):
query = "比较 Python 和 Java 的性能,并推荐适合初学者的语言"
sub_queries = decompose_query(query)
print(sub_queries)
# Illustrative output (actual wording is model-dependent):
# [
#     "Python 编程语言的性能特点",
#     "Java 编程语言的性能特点",
#     "Python 和 Java 性能对比",
#     "适合初学者的编程语言推荐"
# ]
2. 基于规则的分解
import re
def rule_based_decompose(query: str) -> List[str]:
    """Decompose a query with hand-written rules (no LLM call).

    Two rules are applied:
      1. Comparison queries ("比较 A 和 B 的 X") expand into one sub-query
         per entity plus an explicit comparison sub-query.
      2. Coordinated queries joined by ","/","/"并且"/"同时" are split
         into their clauses.

    Args:
        query: The user query.

    Returns:
        The accumulated sub-queries, or ``[query]`` when no rule matched.
    """
    sub_queries = []
    # Use \s* rather than \s+: Chinese text normally has no spaces around
    # "比较"/"和", so \s+ could never match queries like
    # "比较特斯拉和比亚迪的电动车续航能力".
    compare_pattern = r"比较\s*(.+?)\s*和\s*(.+?)(?:的|之)(.+)"
    match = re.search(compare_pattern, query)
    if match:
        entity1, entity2, aspect = match.groups()
        sub_queries.extend([
            f"{entity1}的{aspect}",
            f"{entity2}的{aspect}",
            f"{entity1}和{entity2}的{aspect}对比"
        ])
    # Split coordinated clauses. Use alternation, NOT a character class:
    # [,并且同时] would split on every individual character (e.g. the "并"
    # inside "并行"), not on the connector words themselves.
    if any(sep in query for sep in (",", ",", "并且", "同时")):
        parts = re.split(r",|,|并且|同时", query)
        sub_queries.extend([p.strip() for p in parts if p.strip()])
    # No rule fired: return the original query unchanged.
    return sub_queries if sub_queries else [query]
# Example: comparison-query expansion.
query = "比较特斯拉和比亚迪的电动车续航能力"
print(rule_based_decompose(query))
# ['特斯拉的电动车续航能力', '比亚迪的电动车续航能力', '特斯拉和比亚迪的电动车续航能力对比']
3. 依赖感知分解
class QueryDecomposer:
    """Query decomposition that models dependencies between sub-queries."""

    def decompose_with_deps(self, query: str) -> dict:
        """Ask the LLM to decompose *query* and annotate dependencies.

        Returns:
            A dict of the form
            ``{"sub_queries": [{"id": ..., "query": ..., "depends_on": [...]}]}``.
        """
        # The JSON exemplar must be valid JSON: the original embedded a
        # "//" comment inside it, which strict-JSON output mode could copy
        # verbatim and break json.loads below. The explanation is moved
        # into prose after the exemplar instead.
        prompt = f"""分析以下查询,分解为子查询并标注依赖关系:
查询:{query}
请返回 JSON 格式:
{{
  "sub_queries": [
    {{
      "id": "q1",
      "query": "子查询内容",
      "depends_on": []
    }}
  ]
}}
其中 depends_on 是该子查询依赖的其他子查询 id 列表。"""
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"}
        )
        import json
        return json.loads(response.choices[0].message.content)

    def execute_decomposed(self, query: str, retriever) -> dict:
        """Decompose *query* and run retrieval in dependency order.

        Args:
            query: The complex query to decompose.
            retriever: Object exposing ``search(query) -> results``.

        Returns:
            Mapping of sub-query id to its retrieval results.

        Raises:
            ValueError: If the dependency graph contains a cycle or
                references an unknown sub-query id.
        """
        decomposition = self.decompose_with_deps(query)
        results = {}
        executed = set()
        sub_queries = decomposition.get("sub_queries", [])
        # Repeated passes: each pass runs every sub-query whose deps are
        # all satisfied (a simple topological execution).
        while len(executed) < len(sub_queries):
            progressed = False
            for sq in sub_queries:
                if sq["id"] in executed:
                    continue
                deps = sq.get("depends_on", [])
                if all(d in executed for d in deps):
                    results[sq["id"]] = retriever.search(sq["query"])
                    executed.add(sq["id"])
                    progressed = True
            if not progressed:
                # Without this guard a cycle or an unknown dependency id
                # (both possible in LLM output) would spin forever.
                raise ValueError("unresolvable dependencies in sub-queries")
        return results
# Example: multi-hop query whose second step depends on the first.
query = "2024年奥运会金牌最多的国家,该国首都的人口是多少"
decomposer = QueryDecomposer()
result = decomposer.decompose_with_deps(query)
# Illustrative response:
# {
#   "sub_queries": [
#     {"id": "q1", "query": "2024年奥运会金牌最多的国家", "depends_on": []},
#     {"id": "q2", "query": "该国首都的人口", "depends_on": ["q1"]}
#   ]
# }
结果合并策略
1. 简单合并
def merge_results(sub_results: List[List[dict]]) -> List[dict]:
    """Concatenate sub-query result lists, dropping duplicate document ids.

    Order is preserved: the first occurrence of each id wins.
    """
    merged: List[dict] = []
    seen_ids = set()
    for result_set in sub_results:
        for document in result_set:
            doc_id = document["id"]
            if doc_id in seen_ids:
                continue
            seen_ids.add(doc_id)
            merged.append(document)
    return merged
2. 加权合并
def weighted_merge(sub_results: List[List[dict]], weights: List[float]) -> List[dict]:
    """Merge ranked result lists, weighting each list's contribution.

    A document at 0-based rank r in a list with weight w contributes
    w * 1/(r + 1); contributions are summed per document id, and documents
    are returned in descending total-score order.
    """
    accumulated = {}
    for result_set, weight in zip(sub_results, weights):
        for position, document in enumerate(result_set):
            entry = accumulated.setdefault(document["id"], {"doc": document, "score": 0})
            entry["score"] += weight * (1.0 / (position + 1))
    # Stable sort: equal-score documents keep first-seen order.
    ranked = sorted(accumulated.values(), key=lambda e: e["score"], reverse=True)
    return [e["doc"] for e in ranked]
3. 互惠排名融合(RRF)
def reciprocal_rank_fusion(sub_results: List[List[dict]], k: int = 60) -> List[dict]:
    """Fuse ranked result lists with Reciprocal Rank Fusion (RRF).

    A document at 0-based rank r in any list contributes 1/(k + r + 1);
    contributions are summed per id and documents are returned in
    descending fused-score order. The constant k (default 60) dampens
    the dominance of top-ranked items.
    """
    fused = {}
    for result_set in sub_results:
        for position, document in enumerate(result_set):
            entry = fused.setdefault(document["id"], {"doc": document, "score": 0})
            entry["score"] += 1.0 / (k + position + 1)
    ordered = sorted(fused.values(), key=lambda e: e["score"], reverse=True)
    return [e["doc"] for e in ordered]
完整示例
class DecomposedRetriever:
    """Retriever that decomposes complex queries before searching.

    The base retriever must expose ``search(query, top_k=...) -> List[dict]``
    where each result dict carries an "id" key (required by RRF fusion).
    """

    def __init__(self, base_retriever, decompose_fn=None):
        """
        Args:
            base_retriever: Underlying retriever used for each sub-query.
            decompose_fn: Optional callable ``str -> List[str]`` producing
                sub-queries. Defaults to the LLM-based ``decompose_query``.
        """
        self.retriever = base_retriever
        # NOTE(review): the original constructed a QueryDecomposer() here
        # that was never used (retrieve called the module-level
        # decompose_query instead); the strategy is now injectable.
        self.decompose_fn = decompose_fn

    def retrieve(self, query: str, top_k: int = 10) -> List[dict]:
        """Decompose, retrieve per sub-query, and fuse the results.

        Returns:
            The single sub-query's results directly when decomposition
            yields one query; otherwise the RRF-fused ranking.
        """
        # 1. Split the query into sub-queries.
        decompose = self.decompose_fn if self.decompose_fn is not None else decompose_query
        sub_queries = decompose(query)
        # 2. Retrieve each sub-query independently.
        all_results = []
        for sq in sub_queries:
            all_results.append(self.retriever.search(sq, top_k=top_k))
        # 3. Fuse — skipped when there is nothing to fuse.
        if len(all_results) == 1:
            return all_results[0]
        return reciprocal_rank_fusion(all_results)
# Usage (base_retriever is any retriever exposing .search(query, top_k=...)):
retriever = DecomposedRetriever(base_retriever)
results = retriever.retrieve("比较 Python 和 Java 的性能差异")
小结
查询分解能处理复杂的多意图查询。下一章学习意图识别技术。