Chapter 7: Multi-Agent Collaboration

Collaboration Patterns

Master-Worker Pattern

Master-worker pattern:
┌─────────────┐
│ Master Agent│
└─────────────┘
   ┌───┴───┬───────┐
   ▼       ▼       ▼
┌─────┐ ┌─────┐ ┌─────┐
│Agent│ │Agent│ │Agent│
│  A  │ │  B  │ │  C  │
└─────┘ └─────┘ └─────┘
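
The diagram above can be read as one coordinating agent that hands a subtask to each worker and collects the replies. The following is a minimal in-process sketch of that idea, not the chapter's A2A implementation: `Master`, `Worker`, and `make_reply` are hypothetical names introduced only for illustration, and real workers would be remote agents rather than local coroutines.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical worker type: an async function that takes a subtask and returns text.
Worker = Callable[[str], Awaitable[str]]


async def make_reply(name: str, subtask: str) -> str:
    """Stand-in for real agent work; yields control once, then answers."""
    await asyncio.sleep(0)
    return f"{name} handled: {subtask}"


class Master:
    """Coordinates the workers; the workers never talk to each other."""

    def __init__(self, workers: Dict[str, Worker]):
        self.workers = workers

    async def run(self, subtasks: Dict[str, str]) -> Dict[str, str]:
        names = list(subtasks)
        # Start every worker's subtask concurrently and wait for all replies.
        replies = await asyncio.gather(
            *(self.workers[name](subtasks[name]) for name in names)
        )
        return dict(zip(names, replies))
```

Only the master knows about every worker, which keeps the control flow easy to follow but also makes the master a single point of failure.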

Peer-to-Peer Pattern

Peer-to-peer pattern:
┌─────┐     ┌─────┐
│Agent│ ←→ │Agent│
│  A  │     │  B  │
└─────┘     └─────┘
   ↑           ↑
   └─────┬─────┘
      ┌─────┐
      │Agent│
      │  C  │
      └─────┘
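
In the peer-to-peer layout there is no coordinator: any agent may message any other agent directly. The sketch below models that with in-memory inboxes. `Peer`, `connect`, `send`, `receive`, and `demo` are hypothetical names used for illustration; real agents would exchange messages over the network instead of through `asyncio.Queue`.

```python
import asyncio
from typing import Dict, List, Tuple


class Peer:
    """Hypothetical in-process peer with a private inbox."""

    def __init__(self, name: str):
        self.name = name
        self.peers: Dict[str, "Peer"] = {}
        self.inbox: asyncio.Queue = asyncio.Queue()

    def connect(self, other: "Peer") -> None:
        # Links are symmetric: either side can start a conversation.
        self.peers[other.name] = other
        other.peers[self.name] = self

    async def send(self, to: str, text: str) -> None:
        # Deliver (sender, text) straight into the recipient's inbox.
        await self.peers[to].inbox.put((self.name, text))

    async def receive(self) -> Tuple[str, str]:
        return await self.inbox.get()


async def demo() -> List[Tuple[str, str]]:
    a, b, c = Peer("A"), Peer("B"), Peer("C")
    a.connect(b)
    a.connect(c)
    b.connect(c)
    await a.send("B", "hello from A")
    await c.send("B", "hello from C")
    # B reads its inbox in arrival order.
    return [await b.receive(), await b.receive()]
```

Compared with the master-worker layout, no single agent is a bottleneck, but every peer has to know how to reach the others.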

Implementing Collaboration

Agent Orchestrator

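import asyncio
from typing import Dict, List

# A2AClient is not defined in this chapter; import it from wherever
# your client class lives before running this code.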

class AgentOrchestrator:
    def __init__(self):
        self.agents: Dict[str, A2AClient] = {}

    def register_agent(self, name: str, client: A2AClient):
        self.agents[name] = client

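    async def broadcast(self, message: str) -> Dict[str, dict]:
        """Broadcast a message to every registered agent concurrently."""
        names = list(self.agents)
        # Build one coroutine per agent, then run them together with gather;
        # awaiting them one by one would send the requests sequentially.
        coros = [
            self.agents[name].send_task(f"task-{name}", message)
            for name in names
        ]
        responses = await asyncio.gather(*coros)
        return dict(zip(names, responses))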

    async def chain(self, message: str, agents: List[str]) -> dict:
        """Call the agents in order, feeding each one's output to the next."""
        current_message = message

        for agent_name in agents:
            client = self.agents[agent_name]
            task_id = f"task-{agent_name}"

            result = await client.send_task(task_id, current_message)

            # Use the first artifact's text as the next agent's input;
            # if the response has no artifacts, the message passes through unchanged.
            artifacts = result.get("result", {}).get("artifacts")
            if artifacts:
                current_message = artifacts[0]["text"]

        return {"final_result": current_message}

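# Usage
async def main():
    orchestrator = AgentOrchestrator()
    orchestrator.register_agent("analyzer", A2AClient("http://analyzer:8000"))
    orchestrator.register_agent("summarizer", A2AClient("http://summarizer:8000"))

    # Chained call: the analyzer's output becomes the summarizer's input
    result = await orchestrator.chain(
        "Analyze this data",
        ["analyzer", "summarizer"]
    )
    print(result["final_result"])

# `await` is only valid inside a coroutine, so run everything through asyncio.run
asyncio.run(main())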
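
In the broadcast shown above, an exception raised by one agent propagates to the caller and the replies from the other agents are lost. One possible way to tolerate partial failure is `asyncio.gather(..., return_exceptions=True)`. The sketch below is an assumption-laden illustration: `StubClient` is a hypothetical offline stand-in for `A2AClient`, `broadcast_tolerant` is a hypothetical helper, and the `{"error": ...}` shape is an invented convention, not part of any protocol.

```python
import asyncio
from typing import Dict


class StubClient:
    """Hypothetical stand-in for A2AClient so the example runs offline."""

    def __init__(self, reply: str = "", fail: bool = False):
        self.reply = reply
        self.fail = fail

    async def send_task(self, task_id: str, message: str) -> dict:
        if self.fail:
            raise ConnectionError(f"{task_id} unreachable")
        # Response shape mirrors the one read by chain() in this chapter.
        return {"result": {"artifacts": [{"text": f"{self.reply}: {message}"}]}}


async def broadcast_tolerant(
    agents: Dict[str, StubClient], message: str
) -> Dict[str, dict]:
    names = list(agents)
    # return_exceptions=True hands back raised exceptions as values
    # instead of aborting the whole gather call.
    outcomes = await asyncio.gather(
        *(agents[name].send_task(f"task-{name}", message) for name in names),
        return_exceptions=True,
    )
    results: Dict[str, dict] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            # Assumed convention: report the failure under an "error" key.
            results[name] = {"error": str(outcome)}
        else:
            results[name] = outcome
    return results
```

The caller can then decide per agent whether to retry, skip, or surface the error.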

Task Dispatch

class TaskDispatcher:
    def __init__(self, agents: Dict[str, A2AClient]):
        self.agents = agents

    async def dispatch_by_capability(self, task_type: str, message: str):
        """Dispatch a task to the agent that has the matching capability."""
        capability_map = {
            "analysis": "analyzer",
            "summary": "summarizer",
            "translation": "translator"
        }

        agent_name = capability_map.get(task_type)
        if not agent_name:
            raise ValueError(f"Unknown task type: {task_type}")

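        # The capability map may name an agent that was never registered.
        client = self.agents.get(agent_name)
        if client is None:
            raise ValueError(f"No agent registered for task type: {task_type}")
        return await client.send_task(f"task-{task_type}", message)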
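
The dispatcher above hard-codes the mapping from task type to agent name. A possible variation is to let each agent declare its capabilities at registration time, so the mapping grows with the set of agents. This is only a sketch: `CapabilityRegistry` and `EchoClient` are hypothetical names, and `EchoClient` stands in for `A2AClient` so the example runs without a network.

```python
import asyncio
from typing import Dict, List


class EchoClient:
    """Hypothetical stand-in for A2AClient so the example runs offline."""

    def __init__(self, name: str):
        self.name = name

    async def send_task(self, task_id: str, message: str) -> dict:
        return {"agent": self.name, "task_id": task_id, "message": message}


class CapabilityRegistry:
    def __init__(self) -> None:
        self._by_capability: Dict[str, EchoClient] = {}

    def register(self, client: EchoClient, capabilities: List[str]) -> None:
        # A later registration for the same capability replaces the earlier one.
        for capability in capabilities:
            self._by_capability[capability] = client

    async def dispatch(self, task_type: str, message: str) -> dict:
        client = self._by_capability.get(task_type)
        if client is None:
            raise ValueError(f"Unknown task type: {task_type}")
        return await client.send_task(f"task-{task_type}", message)
```

With this layout, adding a translator agent is a single `register` call rather than an edit to the dispatcher itself.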

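Summary

Key points of multi-agent collaboration:

  • Collaboration patterns: master-worker and peer-to-peer
  • Agent orchestrator: registration, broadcast, and chained calls
  • Task dispatch: routing by capability

The next chapter covers best practices.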