第02章 多Agent系统架构:层级式、平行式与网状式

第02章 多Agent系统架构:层级式、平行式与网状式

“结构决定行为。选择错误的架构,比写出错误的代码更难修复。” —— 软件架构设计原则


多 Agent 系统不是把多个 Agent 随便放在一起就行了。架构决定了 Agent 如何分工、如何通信、如何处理失败。

这一章,我们深入剖析三种核心多 Agent 架构:层级式、平行式和网状式。每种都有适合的场景,选错了会让系统既复杂又低效。


2.1 架构一:层级式(Hierarchical)

# ====================================================
# 层级式架构:有明确的领导者和执行者
# ====================================================

"""
层级式架构结构:
    
    ┌─────────────────┐
    │   协调器 Agent   │  ← 接收任务、制定计划、分配工作
    └────────┬────────┘
             │
    ┌────────┼────────┐
    ▼        ▼        ▼
 ┌──────┐ ┌──────┐ ┌──────┐
 │Agent1│ │Agent2│ │Agent3│  ← 执行具体任务,向协调器汇报
 └──────┘ └──────┘ └──────┘

特点:
- 有明确的指挥链
- 协调器负责全局规划
- 工作 Agent 专注执行
- 结果汇总由协调器处理

适合场景:
- 任务有明确的主次关系
- 需要全局一致性的输出
- 工作流有固定的顺序
"""

import asyncio
import json
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from openai import AsyncOpenAI

client = AsyncOpenAI()


@dataclass
class AgentMessage:
    """Agent 间传递的消息"""
    sender: str
    receiver: str
    content: str
    message_type: str = "task"  # task, result, error, query
    metadata: dict = field(default_factory=dict)


@dataclass
class WorkerAgent:
    """层级架构中的工作 Agent"""
    name: str
    specialty: str  # 专业领域
    model: str = "gpt-4o-mini"
    
    async def execute(self, task: str) -> AgentMessage:
        """执行分配的任务并返回结果"""
        response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": f"你是一名{self.specialty}专家。专注、简洁地完成分配的任务。"
                },
                {
                    "role": "user",
                    "content": task
                }
            ],
            max_tokens=600,
        )
        
        result = response.choices[0].message.content
        print(f"    [{self.name}] 完成任务: {task[:40]}...")
        
        return AgentMessage(
            sender=self.name,
            receiver="coordinator",
            content=result,
            message_type="result",
        )


class HierarchicalCoordinator:
    """
    层级式协调器
    
    职责:
    1. 接收高层任务
    2. 分解为子任务
    3. 分配给专工 Agent
    4. 收集结果
    5. 整合输出
    """
    
    def __init__(self, workers: List[WorkerAgent]):
        self.workers = {w.name: w for w in workers}
        self.model = "gpt-4o"
    
    async def decompose_task(self, goal: str) -> List[dict]:
        """将高层目标分解为子任务,并分配给对应的工作 Agent"""
        
        worker_descriptions = "\n".join([
            f"- {name}: {w.specialty}"
            for name, w in self.workers.items()
        ])
        
        response = await client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"""
将以下任务分配给最合适的专家完成:

任务:{goal}

可用专家:
{worker_descriptions}

以JSON格式返回任务分配:
{{"assignments": [{{"agent": "专家名称", "subtask": "具体子任务描述"}}]}}
"""
            }],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        
        data = json.loads(response.choices[0].message.content)
        return data.get("assignments", [])
    
    async def synthesize_results(self, goal: str, results: List[AgentMessage]) -> str:
        """将所有工作 Agent 的结果整合成最终输出"""
        
        results_text = "\n\n".join([
            f"**{msg.sender} 的贡献:**\n{msg.content}"
            for msg in results
        ])
        
        response = await client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "user",
                "content": f"""
原始目标:{goal}

各专家的贡献:
{results_text}

请将以上内容整合成一个连贯、完整的最终报告。消除重复,保持统一风格。
"""
            }],
            max_tokens=1000,
        )
        
        return response.choices[0].message.content
    
    async def run(self, goal: str) -> str:
        """执行完整的层级式工作流"""
        print(f"\n协调器接收任务:{goal}")
        
        # 步骤1:分解任务
        print("\n步骤1:分解任务...")
        assignments = await self.decompose_task(goal)
        print(f"  分解为 {len(assignments)} 个子任务")
        
        # 步骤2:并行分配给工作 Agent
        print("\n步骤2:并行分配子任务...")
        tasks = []
        for assignment in assignments:
            agent_name = assignment.get("agent", "")
            subtask = assignment.get("subtask", "")
            
            if agent_name in self.workers:
                tasks.append(self.workers[agent_name].execute(subtask))
            else:
                # 没有匹配的 Agent,用第一个可用的
                first_worker = list(self.workers.values())[0]
                tasks.append(first_worker.execute(subtask))
        
        results = await asyncio.gather(*tasks)
        
        # 步骤3:整合结果
        print("\n步骤3:整合所有结果...")
        final_output = await self.synthesize_results(goal, list(results))
        
        return final_output


# 创建层级式多 Agent 团队
coordinator = HierarchicalCoordinator(workers=[
    WorkerAgent("研究员", "信息搜集与事实核查"),
    WorkerAgent("分析师", "数据分析与规律发现"),
    WorkerAgent("写手", "内容撰写与表达"),
    WorkerAgent("评审", "质量检查与逻辑校验"),
])

result = asyncio.run(coordinator.run(
    "为'生产力工具'赛道撰写一份500字的市场机会分析"
))
print(f"\n最终输出(前200字):\n{result[:200]}...")

2.2 架构二:平行式(Parallel)

# ====================================================
# 平行式架构:无领导者,Agent 平等协作
# ====================================================

"""
平行式架构结构:
    
 ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐
 │Agent1│  │Agent2│  │Agent3│  │Agent4│
 └──┬───┘  └──┬───┘  └──┬───┘  └──┬───┘
    │          │          │          │
    └──────────┴──────────┴──────────┘
                     │
              ┌──────▼──────┐
              │  结果聚合器  │  ← 不是协调者,只负责合并结果
              └─────────────┘

特点:
- 所有 Agent 平等,同时接收相同的输入
- 每个 Agent 独立产生输出(视角、答案、评估)
- 结果聚合器合并多个视角
- 没有主次之分

适合场景:
- 需要多角度分析同一问题
- 需要通过"多数表决"提高准确率
- 希望获得多样化的创意输出
- 容错性要求高(一个失败不影响整体)
"""


class ParallelAgentSystem:
    """
    平行式多 Agent 系统
    
    典型用例:
    - 多个 AI 同时回答同一问题,再选最优
    - 多个 AI 分别写创意方案,再汇总
    - 多个 AI 对同一内容打分,取平均
    """
    
    def __init__(self, agent_configs: List[dict], model: str = "gpt-4o-mini"):
        """
        agent_configs: 每个 Agent 的配置,包含 name 和 system_prompt
        """
        self.agents = agent_configs
        self.model = model
    
    async def _single_agent_response(
        self, 
        agent_config: dict, 
        user_input: str
    ) -> dict:
        """单个 Agent 产生响应"""
        response = await client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": agent_config["system_prompt"]},
                {"role": "user", "content": user_input}
            ],
            max_tokens=500,
        )
        
        return {
            "agent": agent_config["name"],
            "perspective": agent_config.get("perspective", "通用"),
            "response": response.choices[0].message.content
        }
    
    async def run_parallel(self, query: str) -> List[dict]:
        """所有 Agent 同时处理同一个查询"""
        print(f"\n{len(self.agents)} 个 Agent 同时分析:{query}\n")
        
        # 真正的并行执行
        responses = await asyncio.gather(*[
            self._single_agent_response(agent, query)
            for agent in self.agents
        ])
        
        return list(responses)
    
    async def aggregate_by_voting(self, responses: List[dict]) -> str:
        """聚合策略:让 LLM 对多个回答进行投票/综合"""
        
        perspectives_text = "\n\n".join([
            f"**{r['agent']}({r['perspective']}视角):**\n{r['response']}"
            for r in responses
        ])
        
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""
以下是 {len(responses)} 位专家从不同视角给出的分析:

{perspectives_text}

请综合所有视角,给出一个平衡、全面的最终结论。
指出各视角的共识点和分歧点。
"""
            }],
            max_tokens=800,
        )
        
        return response.choices[0].message.content
    
    async def aggregate_best_of_n(self, query: str, responses: List[dict]) -> str:
        """聚合策略:选出最优回答(Best-of-N)"""
        
        responses_text = "\n\n".join([
            f"方案{i+1}({r['agent']}):\n{r['response']}"
            for i, r in enumerate(responses)
        ])
        
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "user",
                "content": f"""
问题:{query}

以下是 {len(responses)} 个不同的回答:

{responses_text}

请选出最准确、最有帮助的一个回答,并说明选择理由。
只输出选中的方案编号和理由。
"""
            }],
            max_tokens=300,
        )
        
        return response.choices[0].message.content


# 演示:用多角度视角分析一个商业决策
multi_perspective_system = ParallelAgentSystem(
    agent_configs=[
        {
            "name": "乐观派",
            "perspective": "机会与增长",
            "system_prompt": "你是一位乐观的创业者,擅长发现机会和增长潜力。分析问题时关注正面因素和上升空间。"
        },
        {
            "name": "风险专家",
            "perspective": "风险与挑战",
            "system_prompt": "你是一位风险管理专家,擅长识别潜在风险和挑战。分析问题时关注负面因素和缓解策略。"
        },
        {
            "name": "用户代言人",
            "perspective": "用户体验",
            "system_prompt": "你是用户体验专家,代表最终用户的需求和感受。分析问题时关注用户价值和使用场景。"
        },
        {
            "name": "财务分析师",
            "perspective": "财务可行性",
            "system_prompt": "你是财务分析师,关注成本、收益和可持续性。分析问题时关注数字和财务逻辑。"
        },
    ]
)

async def demo_parallel():
    query = "我们应该把产品的免费套餐彻底取消,全部转为付费吗?"
    responses = await multi_perspective_system.run_parallel(query)
    
    print("各角度观点收集完毕,开始综合分析...")
    final = await multi_perspective_system.aggregate_by_voting(responses)
    print("\n综合结论:")
    print(final[:400] + "...")

asyncio.run(demo_parallel())

2.3 架构三:网状式(Mesh)

# ====================================================
# 网状式架构:Agent 互相通信,动态协作
# ====================================================

"""
网状式架构结构:
    
 ┌──────┐  ←→  ┌──────┐
 │Agent1│      │Agent2│
 └──┬───┘      └──┬───┘
    │  ↖        ↗  │
    │    ┌──────┐   │
    └──→ │Agent3│ ←─┘
         └──────┘

特点:
- 任意 Agent 可以互相通信
- 没有固定的流向,根据需要动态路由
- Agent 可以主动发起协作请求
- 更灵活,但更难控制

适合场景:
- 任务高度动态,难以预先规划流程
- Agent 需要基于其他 Agent 的中间结果调整自己的行为
- 需要真正的"涌现"行为(自组织)
"""

import asyncio
from typing import Dict, Callable


class MeshAgent:
    """网状架构中的 Agent,可以互相通信"""
    
    def __init__(self, name: str, specialty: str):
        self.name = name
        self.specialty = specialty
        self.peers: Dict[str, 'MeshAgent'] = {}  # 其他 Agent 的引用
        self.message_queue: asyncio.Queue = asyncio.Queue()
        self.knowledge_base: Dict[str, str] = {}  # 从其他 Agent 学到的知识
    
    def register_peer(self, agent: 'MeshAgent'):
        """注册一个可以通信的同伴 Agent"""
        self.peers[agent.name] = agent
    
    async def send_message(self, target_name: str, content: str, msg_type: str = "query"):
        """向另一个 Agent 发送消息"""
        if target_name in self.peers:
            target = self.peers[target_name]
            await target.message_queue.put({
                "from": self.name,
                "content": content,
                "type": msg_type,
            })
            print(f"  [{self.name}] → [{target_name}]: {content[:50]}...")
    
    async def ask_peer(self, target_name: str, question: str) -> str:
        """向同伴提问并等待回答"""
        if target_name not in self.peers:
            return f"找不到 Agent: {target_name}"
        
        target = self.peers[target_name]
        
        # 发送问题
        await target.message_queue.put({
            "from": self.name,
            "content": question,
            "type": "query",
        })
        
        # 使用 LLM 模拟目标 Agent 回答
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"你是 {target_name},专长是{target.specialty}。简洁回答同事的问题。"
                },
                {"role": "user", "content": question}
            ],
            max_tokens=300,
        )
        
        answer = response.choices[0].message.content
        self.knowledge_base[f"来自{target_name}"] = answer
        
        return answer
    
    async def work(self, task: str) -> str:
        """执行任务,必要时主动向同伴请教"""
        
        # 分析任务,判断是否需要请教同伴
        analysis_response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"你是 {self.name},专长是{self.specialty}。"
                },
                {
                    "role": "user",
                    "content": f"""
你的任务:{task}

你的同伴列表:{list(self.peers.keys())}

判断:
1. 你能独立完成这个任务吗?
2. 如果需要帮助,应该问哪个同伴什么问题?

回答格式:
独立完成: 是/否
需要咨询: 同伴名字(如不需要则为"无")
咨询问题: 具体问题(如不需要则为"无")
"""
                }
            ],
            max_tokens=200,
        )
        
        analysis = analysis_response.choices[0].message.content
        
        # 解析是否需要请教同伴(简化版)
        if "是" not in analysis.split("独立完成:")[1][:3] if "独立完成:" in analysis else True:
            # 需要请教同伴,找第一个可用的同伴
            if self.peers:
                peer_name = list(self.peers.keys())[0]
                sub_question = f"关于'{task}',你有什么专业见解?"
                peer_insight = await self.ask_peer(peer_name, sub_question)
                
                # 整合同伴的输入
                final_response = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": f"你是 {self.name},专长是{self.specialty}。"},
                        {"role": "user", "content": f"任务:{task}\n\n{peer_name}的补充意见:{peer_insight}\n\n请给出你的最终回答。"}
                    ],
                    max_tokens=400,
                )
                return final_response.choices[0].message.content
        
        # 独立完成
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": f"你是 {self.name},专长是{self.specialty}。"},
                {"role": "user", "content": task}
            ],
            max_tokens=400,
        )
        return response.choices[0].message.content


# 网状协作演示
async def mesh_demo():
    # 创建 Agent
    tech = MeshAgent("技术专家", "软件架构与技术选型")
    biz = MeshAgent("商业专家", "商业模式与市场策略")
    ux = MeshAgent("UX专家", "用户体验与产品设计")
    
    # 建立双向连接(网状)
    for a in [tech, biz, ux]:
        for b in [tech, biz, ux]:
            if a.name != b.name:
                a.register_peer(b)
    
    # 给每个 Agent 分配任务,它们会自主决定是否需要协作
    task = "评估是否应该把我们的产品从Web迁移到移动端优先"
    
    print(f"网状协作任务:{task}\n")
    
    results = await asyncio.gather(
        tech.work(task),
        biz.work(task),
        ux.work(task),
    )
    
    print("\n各 Agent 的独立结论:")
    for agent, result in zip([tech, biz, ux], results):
        print(f"\n[{agent.name}]:")
        print(result[:200] + "...")

asyncio.run(mesh_demo())

2.4 架构选型指南

# ====================================================
# 架构选型决策树
# ====================================================

def choose_architecture(requirements: dict) -> str:
    """
    根据需求特征推荐最合适的多 Agent 架构
    
    requirements 字段:
    - has_clear_hierarchy: 任务有明确的主次关系
    - needs_diverse_perspectives: 需要多角度观点
    - tasks_are_independent: 子任务相互独立
    - workflow_is_dynamic: 工作流程高度动态
    - needs_consensus: 需要达成共识
    - priority: latency/quality/flexibility/cost
    """
    
    h = requirements.get("has_clear_hierarchy", False)
    d = requirements.get("needs_diverse_perspectives", False)
    i = requirements.get("tasks_are_independent", False)
    dyn = requirements.get("workflow_is_dynamic", False)
    c = requirements.get("needs_consensus", False)
    
    reasoning = []
    
    if h and not dyn:
        reasoning.append("有明确主次关系 → 倾向层级式")
    
    if d and i:
        reasoning.append("需要多视角且子任务独立 → 倾向平行式")
    
    if dyn and not h:
        reasoning.append("工作流动态变化 → 倾向网状式")
    
    if c:
        reasoning.append("需要共识 → 平行式(投票)或网状式(讨论)")
    
    # 决策
    if h and not dyn:
        architecture = "层级式"
        reason = "任务有清晰的上下游关系,协调器可以有效管控全局"
    elif d or (i and not h):
        architecture = "平行式"
        reason = "子任务独立,多视角可以提升输出质量"
    elif dyn:
        architecture = "网状式"
        reason = "任务边界模糊,Agent 需要动态协商"
    else:
        architecture = "层级式"
        reason = "默认推荐,最易于管理和调试"
    
    return f"""
推荐架构:{architecture}
选择原因:{reason}

推理过程:
{chr(10).join(f'  - {r}' for r in reasoning) if reasoning else '  - 使用默认推荐'}

实施建议:
{
    '从一个协调器 Agent 开始,逐步添加工作 Agent' 
    if architecture == '层级式' else
    '设计标准的任务接口,让所有并行 Agent 接受相同输入格式'
    if architecture == '平行式' else
    '从3个 Agent 的小网络开始,验证通信协议后再扩展'
}
"""


# 测试几个场景
print("场景1:内容生产流水线")
print(choose_architecture({
    "has_clear_hierarchy": True,
    "tasks_are_independent": False,
    "workflow_is_dynamic": False,
}))

print("\n场景2:创意头脑风暴")
print(choose_architecture({
    "needs_diverse_perspectives": True,
    "tasks_are_independent": True,
    "needs_consensus": True,
}))

print("\n场景3:智能客服系统")
print(choose_architecture({
    "workflow_is_dynamic": True,
    "has_clear_hierarchy": False,
    "needs_diverse_perspectives": False,
}))

本章小结

  1. 三种架构各有适用场景:层级式适合有明确流程的工作流;平行式适合需要多视角或高容错的场景;网状式适合高度动态的复杂协作。不存在"最好的架构",只有"最合适的架构"。

  2. 层级式是最容易上手的架构:有明确的协调器角色,工作流可预测,易于调试。90% 的业务场景从层级式开始就足够了。

  3. 平行式是提升质量的利器:让多个 Agent 独立处理同一问题再合并,就像"多数表决"一样能有效降低单点错误,在需要高准确率的场景非常有价值。

  4. 网状式是最灵活但最复杂的:Agent 之间的动态通信能产生"涌现"行为,但也难以预测和调试。建议只有在其他架构明显不够用时才采用。

  5. 架构可以混合:真实系统往往是混合架构——顶层用层级式,某些子系统用平行式,特定动态场景用网状式。先用简单架构,遇到瓶颈再局部演进。

# 核心行动:用选型框架评估你的下一个多 Agent 项目
your_project = {
    "has_clear_hierarchy": True,    # 根据你的实际情况修改
    "needs_diverse_perspectives": False,
    "tasks_are_independent": True,
    "workflow_is_dynamic": False,
    "needs_consensus": False,
}

print(choose_architecture(your_project))

本章提示词模板

【模板1:架构设计需求分析提示词】
我正在设计一个多 Agent 系统,请帮我分析最合适的架构:

任务描述:{task_description}
团队规模预期:约 {n} 个 Agent
关键约束:{constraints}(如:延迟要求、成本预算、可靠性要求)

请分析:
1. 这个任务的信息流向是单向(上→下)还是双向(互相影响)?
2. 子任务之间有没有严格的先后依赖关系?
3. 任务在执行过程中可能如何变化?
4. 最终需要一个统一的输出还是多个并列输出?

推荐架构(层级式/平行式/网状式),并解释理由。
【模板2:架构审查提示词】
请审查以下多 Agent 系统架构设计:

当前架构类型:{architecture_type}
Agent 列表:{agent_list}
信息流向:{information_flow}
关键业务场景:{scenarios}

检查问题:
1. 是否有单点故障?(如协调器崩溃会怎样)
2. Agent 之间的通信是否会产生循环依赖?
3. 是否有更简单的架构能达到同样效果?
4. 当系统负载增加 10 倍时,哪个环节最可能成为瓶颈?

给出具体的风险点和改进建议。

→ 第03章:Agent间通信协议:消息传递与事件总线