跳转至

第五章:工具调用模式

ReAct 模式

ReAct(Reasoning + Acting)是一种让模型交替进行推理和行动的模式:

Thought: 用户想知道北京明天的天气,我需要先查询天气
Action: get_weather(city="北京", date="明天")
Observation: {"temperature": 18, "condition": "小雨", "humidity": 75}
Thought: 明天会下雨,我应该提醒用户带伞
Answer: 北京明天有小雨,气温18度,建议您出门带伞

实现 ReAct

SYSTEM_PROMPT = """你是一个智能助手,可以使用工具来帮助用户。

对于每个用户请求,请按以下格式思考:

Thought: 分析用户需求,决定下一步行动
Action: 要调用的工具和参数
Observation: 工具返回的结果(由系统提供)
... (重复 Thought/Action/Observation 直到获得答案)
Answer: 最终回答

可用工具:
- get_weather(city, date): 获取天气
- search_web(query): 搜索网络
- calculate(expression): 数学计算
"""


def react_conversation(user_message: str, max_iterations: int = 5) -> str:
    """ReAct 模式对话"""

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_message}
    ]

    for _ in range(max_iterations):
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools
        )

        message = response.choices[0].message
        messages.append(message)

        # 如果没有工具调用,返回最终答案
        if not message.tool_calls:
            return message.content

        # 执行工具调用
        for tool_call in message.tool_calls:
            result = executor.execute(tool_call)
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result, ensure_ascii=False)
            })

    return "处理超时"

规划-执行模式

将复杂任务分解为规划阶段和执行阶段:

PLANNING_PROMPT = """你是一个任务规划专家。

分析用户的请求,制定执行计划。输出 JSON 格式的计划:

{
    "goal": "任务目标",
    "steps": [
        {"tool": "工具名", "args": {"参数": "值"}, "purpose": "步骤目的"},
        ...
    ]
}
"""


def plan_and_execute(user_message: str) -> str:
    """规划-执行模式"""

    # 阶段1:规划
    plan_response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": PLANNING_PROMPT},
            {"role": "user", "content": user_message}
        ],
        tools=tools,
        response_format={"type": "json_object"}
    )

    plan = json.loads(plan_response.choices[0].message.content)
    print(f"执行计划: {json.dumps(plan, ensure_ascii=False, indent=2)}")

    # 阶段2:执行
    results = []
    for step in plan["steps"]:
        tool_name = step["tool"]
        tool_args = step["args"]

        # 执行工具
        result = executor.tools[tool_name](**tool_args)
        results.append({
            "step": step["purpose"],
            "result": result
        })

    # 阶段3:总结
    summary_response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": user_message},
            {"role": "system", "content": f"执行结果:\n{json.dumps(results, ensure_ascii=False, indent=2)}"}
        ]
    )

    return summary_response.choices[0].message.content

多 Agent 协作

不同 Agent 负责不同类型的工具:

class Agent:
    """工具 Agent"""

    def __init__(self, name: str, tools: list, system_prompt: str):
        self.name = name
        self.tools = tools
        self.system_prompt = system_prompt
        self.client = OpenAI()

    def process(self, message: str) -> str:
        """处理消息"""
        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": message}
            ],
            tools=self.tools
        )
        return response.choices[0].message


class Orchestrator:
    """协调器"""

    def __init__(self):
        self.agents = {}

    def register_agent(self, name: str, agent: Agent):
        self.agents[name] = agent

    def route(self, user_message: str) -> str:
        """路由到合适的 Agent"""
        # 分析用户意图
        intent = self.analyze_intent(user_message)

        if intent == "weather":
            return self.agents["weather"].process(user_message)
        elif intent == "search":
            return self.agents["search"].process(user_message)
        elif intent == "database":
            return self.agents["database"].process(user_message)
        else:
            return self.agents["general"].process(user_message)

    def analyze_intent(self, message: str) -> str:
        """分析用户意图"""
        keywords = {
            "weather": ["天气", "温度", "下雨", "晴天"],
            "search": ["搜索", "查找", "查询", "找"],
            "database": ["数据库", "表", "记录", "数据"]
        }

        for intent, words in keywords.items():
            if any(word in message for word in words):
                return intent

        return "general"


# 使用示例
orchestrator = Orchestrator()

# 注册专业 Agent
orchestrator.register_agent("weather", Agent(
    name="weather",
    tools=weather_tools,
    system_prompt="你是天气专家,帮助用户查询和分析天气信息。"
))

orchestrator.register_agent("search", Agent(
    name="search",
    tools=search_tools,
    system_prompt="你是搜索专家,帮助用户查找互联网信息。"
))

# 处理用户请求
answer = orchestrator.route("北京明天天气怎么样?")

工具链模式

将多个工具串联执行:

class ToolChain:
    """工具链"""

    def __init__(self):
        self.steps = []

    def add_step(self, tool_name: str, args: dict, transform: callable = None):
        """
        添加步骤

        Args:
            tool_name: 工具名称
            args: 参数(可以是静态值或从上一步结果提取的函数)
            transform: 结果转换函数
        """
        self.steps.append({
            "tool": tool_name,
            "args": args,
            "transform": transform
        })
        return self

    def execute(self, initial_input: dict = None) -> dict:
        """执行工具链"""
        context = initial_input or {}

        for i, step in enumerate(self.steps):
            # 解析参数
            args = {}
            for key, value in step["args"].items():
                if callable(value):
                    args[key] = value(context)
                else:
                    args[key] = value

            # 执行工具
            result = executor.tools[step["tool"]](**args)

            # 转换结果
            if step["transform"]:
                result = step["transform"](result)

            # 更新上下文
            context[f"step_{i}_result"] = result

        return context


# 使用示例:查询天气 -> 判断是否下雨 -> 发送提醒
chain = ToolChain() \
    .add_step("get_weather", {"city": lambda ctx: ctx["city"]}) \
    .add_step(
        "send_email",
        {
            "to": "user@example.com",
            "subject": "天气提醒",
            "body": lambda ctx: f"明天天气: {ctx['step_0_result']['condition']}"
        },
        transform=lambda r: {"sent": True}
    )

result = chain.execute({"city": "北京"})

错误恢复模式

def resilient_tool_call(tool_call, max_retries: int = 3) -> dict:
    """带重试的工具调用"""

    for attempt in range(max_retries):
        try:
            result = executor.execute(tool_call)

            if result.get("success"):
                return result

            # 如果是参数错误,让模型修正
            if result.get("error") == "INVALID_PARAMETERS":
                correction = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": "修正工具调用参数"},
                        {"role": "user", "content": f"错误: {result}\n原始参数: {tool_call.function.arguments}"}
                    ]
                )
                # 使用修正后的参数重试
                tool_call.function.arguments = correction.choices[0].message.content
                continue

            return result

        except Exception as e:
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # 等待后重试
            time.sleep(2 ** attempt)

    return {"success": False, "error": "超过最大重试次数"}

缓存模式

from functools import lru_cache
import hashlib

class CachedToolExecutor(ToolExecutor):
    """带缓存的工具执行器"""

    def __init__(self, cache_size: int = 1000):
        super().__init__()
        self.cache = {}
        self.cache_size = cache_size

    def _cache_key(self, name: str, args: dict) -> str:
        """生成缓存键"""
        key_str = f"{name}:{json.dumps(args, sort_keys=True)}"
        return hashlib.md5(key_str.encode()).hexdigest()

    def execute(self, tool_call) -> Any:
        """执行时检查缓存"""
        name = tool_call.function.name
        args = json.loads(tool_call.function.arguments)

        # 检查缓存
        cache_key = self._cache_key(name, args)
        if cache_key in self.cache:
            return self.cache[cache_key]

        # 执行并缓存
        result = super().execute(tool_call)

        # 只缓存成功的结果
        if result.get("success"):
            if len(self.cache) >= self.cache_size:
                # 简单的 LRU:删除最早的缓存
                oldest_key = next(iter(self.cache))
                del self.cache[oldest_key]

            self.cache[cache_key] = result

        return result

小结

本章学习了:

  • ✅ ReAct 推理-行动模式
  • ✅ 规划-执行模式
  • ✅ 多 Agent 协作
  • ✅ 工具链模式
  • ✅ 错误恢复和缓存

下一章

第六章:实战项目 - 构建一个完整的智能助手。