第02章 多Agent系统架构:层级式、平行式与网状式
第02章 多Agent系统架构:层级式、平行式与网状式
“结构决定行为。选择错误的架构,比写出错误的代码更难修复。” —— 软件架构设计原则
多 Agent 系统不是把多个 Agent 随便放在一起就行了。架构决定了 Agent 如何分工、如何通信、如何处理失败。
这一章,我们深入剖析三种核心多 Agent 架构:层级式、平行式和网状式。每种都有适合的场景,选错了会让系统既复杂又低效。
2.1 架构一:层级式(Hierarchical)
# ====================================================
# 层级式架构:有明确的领导者和执行者
# ====================================================
"""
层级式架构结构:
┌─────────────────┐
│ 协调器 Agent │ ← 接收任务、制定计划、分配工作
└────────┬────────┘
│
┌────────┼────────┐
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ ← 执行具体任务,向协调器汇报
└──────┘ └──────┘ └──────┘
特点:
- 有明确的指挥链
- 协调器负责全局规划
- 工作 Agent 专注执行
- 结果汇总由协调器处理
适合场景:
- 任务有明确的主次关系
- 需要全局一致性的输出
- 工作流有固定的顺序
"""
import asyncio
import json
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from openai import AsyncOpenAI
client = AsyncOpenAI()
@dataclass
class AgentMessage:
"""Agent 间传递的消息"""
sender: str
receiver: str
content: str
message_type: str = "task" # task, result, error, query
metadata: dict = field(default_factory=dict)
@dataclass
class WorkerAgent:
"""层级架构中的工作 Agent"""
name: str
specialty: str # 专业领域
model: str = "gpt-4o-mini"
async def execute(self, task: str) -> AgentMessage:
"""执行分配的任务并返回结果"""
response = await client.chat.completions.create(
model=self.model,
messages=[
{
"role": "system",
"content": f"你是一名{self.specialty}专家。专注、简洁地完成分配的任务。"
},
{
"role": "user",
"content": task
}
],
max_tokens=600,
)
result = response.choices[0].message.content
print(f" [{self.name}] 完成任务: {task[:40]}...")
return AgentMessage(
sender=self.name,
receiver="coordinator",
content=result,
message_type="result",
)
class HierarchicalCoordinator:
"""
层级式协调器
职责:
1. 接收高层任务
2. 分解为子任务
3. 分配给专工 Agent
4. 收集结果
5. 整合输出
"""
def __init__(self, workers: List[WorkerAgent]):
self.workers = {w.name: w for w in workers}
self.model = "gpt-4o"
async def decompose_task(self, goal: str) -> List[dict]:
"""将高层目标分解为子任务,并分配给对应的工作 Agent"""
worker_descriptions = "\n".join([
f"- {name}: {w.specialty}"
for name, w in self.workers.items()
])
response = await client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": f"""
将以下任务分配给最合适的专家完成:
任务:{goal}
可用专家:
{worker_descriptions}
以JSON格式返回任务分配:
{{"assignments": [{{"agent": "专家名称", "subtask": "具体子任务描述"}}]}}
"""
}],
response_format={"type": "json_object"},
temperature=0.1,
)
data = json.loads(response.choices[0].message.content)
return data.get("assignments", [])
async def synthesize_results(self, goal: str, results: List[AgentMessage]) -> str:
"""将所有工作 Agent 的结果整合成最终输出"""
results_text = "\n\n".join([
f"**{msg.sender} 的贡献:**\n{msg.content}"
for msg in results
])
response = await client.chat.completions.create(
model=self.model,
messages=[{
"role": "user",
"content": f"""
原始目标:{goal}
各专家的贡献:
{results_text}
请将以上内容整合成一个连贯、完整的最终报告。消除重复,保持统一风格。
"""
}],
max_tokens=1000,
)
return response.choices[0].message.content
async def run(self, goal: str) -> str:
"""执行完整的层级式工作流"""
print(f"\n协调器接收任务:{goal}")
# 步骤1:分解任务
print("\n步骤1:分解任务...")
assignments = await self.decompose_task(goal)
print(f" 分解为 {len(assignments)} 个子任务")
# 步骤2:并行分配给工作 Agent
print("\n步骤2:并行分配子任务...")
tasks = []
for assignment in assignments:
agent_name = assignment.get("agent", "")
subtask = assignment.get("subtask", "")
if agent_name in self.workers:
tasks.append(self.workers[agent_name].execute(subtask))
else:
# 没有匹配的 Agent,用第一个可用的
first_worker = list(self.workers.values())[0]
tasks.append(first_worker.execute(subtask))
results = await asyncio.gather(*tasks)
# 步骤3:整合结果
print("\n步骤3:整合所有结果...")
final_output = await self.synthesize_results(goal, list(results))
return final_output
# 创建层级式多 Agent 团队
coordinator = HierarchicalCoordinator(workers=[
WorkerAgent("研究员", "信息搜集与事实核查"),
WorkerAgent("分析师", "数据分析与规律发现"),
WorkerAgent("写手", "内容撰写与表达"),
WorkerAgent("评审", "质量检查与逻辑校验"),
])
result = asyncio.run(coordinator.run(
"为'生产力工具'赛道撰写一份500字的市场机会分析"
))
print(f"\n最终输出(前200字):\n{result[:200]}...")
2.2 架构二:平行式(Parallel)
# ====================================================
# 平行式架构:无领导者,Agent 平等协作
# ====================================================
"""
平行式架构结构:
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
│ │ │ │
└──────────┴──────────┴──────────┘
│
┌──────▼──────┐
│ 结果聚合器 │ ← 不是协调者,只负责合并结果
└─────────────┘
特点:
- 所有 Agent 平等,同时接收相同的输入
- 每个 Agent 独立产生输出(视角、答案、评估)
- 结果聚合器合并多个视角
- 没有主次之分
适合场景:
- 需要多角度分析同一问题
- 需要通过"多数表决"提高准确率
- 希望获得多样化的创意输出
- 容错性要求高(一个失败不影响整体)
"""
class ParallelAgentSystem:
"""
平行式多 Agent 系统
典型用例:
- 多个 AI 同时回答同一问题,再选最优
- 多个 AI 分别写创意方案,再汇总
- 多个 AI 对同一内容打分,取平均
"""
def __init__(self, agent_configs: List[dict], model: str = "gpt-4o-mini"):
"""
agent_configs: 每个 Agent 的配置,包含 name 和 system_prompt
"""
self.agents = agent_configs
self.model = model
async def _single_agent_response(
self,
agent_config: dict,
user_input: str
) -> dict:
"""单个 Agent 产生响应"""
response = await client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": agent_config["system_prompt"]},
{"role": "user", "content": user_input}
],
max_tokens=500,
)
return {
"agent": agent_config["name"],
"perspective": agent_config.get("perspective", "通用"),
"response": response.choices[0].message.content
}
async def run_parallel(self, query: str) -> List[dict]:
"""所有 Agent 同时处理同一个查询"""
print(f"\n{len(self.agents)} 个 Agent 同时分析:{query}\n")
# 真正的并行执行
responses = await asyncio.gather(*[
self._single_agent_response(agent, query)
for agent in self.agents
])
return list(responses)
async def aggregate_by_voting(self, responses: List[dict]) -> str:
"""聚合策略:让 LLM 对多个回答进行投票/综合"""
perspectives_text = "\n\n".join([
f"**{r['agent']}({r['perspective']}视角):**\n{r['response']}"
for r in responses
])
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": f"""
以下是 {len(responses)} 位专家从不同视角给出的分析:
{perspectives_text}
请综合所有视角,给出一个平衡、全面的最终结论。
指出各视角的共识点和分歧点。
"""
}],
max_tokens=800,
)
return response.choices[0].message.content
async def aggregate_best_of_n(self, query: str, responses: List[dict]) -> str:
"""聚合策略:选出最优回答(Best-of-N)"""
responses_text = "\n\n".join([
f"方案{i+1}({r['agent']}):\n{r['response']}"
for i, r in enumerate(responses)
])
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": f"""
问题:{query}
以下是 {len(responses)} 个不同的回答:
{responses_text}
请选出最准确、最有帮助的一个回答,并说明选择理由。
只输出选中的方案编号和理由。
"""
}],
max_tokens=300,
)
return response.choices[0].message.content
# 演示:用多角度视角分析一个商业决策
multi_perspective_system = ParallelAgentSystem(
agent_configs=[
{
"name": "乐观派",
"perspective": "机会与增长",
"system_prompt": "你是一位乐观的创业者,擅长发现机会和增长潜力。分析问题时关注正面因素和上升空间。"
},
{
"name": "风险专家",
"perspective": "风险与挑战",
"system_prompt": "你是一位风险管理专家,擅长识别潜在风险和挑战。分析问题时关注负面因素和缓解策略。"
},
{
"name": "用户代言人",
"perspective": "用户体验",
"system_prompt": "你是用户体验专家,代表最终用户的需求和感受。分析问题时关注用户价值和使用场景。"
},
{
"name": "财务分析师",
"perspective": "财务可行性",
"system_prompt": "你是财务分析师,关注成本、收益和可持续性。分析问题时关注数字和财务逻辑。"
},
]
)
async def demo_parallel():
query = "我们应该把产品的免费套餐彻底取消,全部转为付费吗?"
responses = await multi_perspective_system.run_parallel(query)
print("各角度观点收集完毕,开始综合分析...")
final = await multi_perspective_system.aggregate_by_voting(responses)
print("\n综合结论:")
print(final[:400] + "...")
asyncio.run(demo_parallel())
2.3 架构三:网状式(Mesh)
# ====================================================
# 网状式架构:Agent 互相通信,动态协作
# ====================================================
"""
网状式架构结构:
┌──────┐ ←→ ┌──────┐
│Agent1│ │Agent2│
└──┬───┘ └──┬───┘
│ ↖ ↗ │
│ ┌──────┐ │
└──→ │Agent3│ ←─┘
└──────┘
特点:
- 任意 Agent 可以互相通信
- 没有固定的流向,根据需要动态路由
- Agent 可以主动发起协作请求
- 更灵活,但更难控制
适合场景:
- 任务高度动态,难以预先规划流程
- Agent 需要基于其他 Agent 的中间结果调整自己的行为
- 需要真正的"涌现"行为(自组织)
"""
import asyncio
from typing import Dict, Callable
class MeshAgent:
"""网状架构中的 Agent,可以互相通信"""
def __init__(self, name: str, specialty: str):
self.name = name
self.specialty = specialty
self.peers: Dict[str, 'MeshAgent'] = {} # 其他 Agent 的引用
self.message_queue: asyncio.Queue = asyncio.Queue()
self.knowledge_base: Dict[str, str] = {} # 从其他 Agent 学到的知识
def register_peer(self, agent: 'MeshAgent'):
"""注册一个可以通信的同伴 Agent"""
self.peers[agent.name] = agent
async def send_message(self, target_name: str, content: str, msg_type: str = "query"):
"""向另一个 Agent 发送消息"""
if target_name in self.peers:
target = self.peers[target_name]
await target.message_queue.put({
"from": self.name,
"content": content,
"type": msg_type,
})
print(f" [{self.name}] → [{target_name}]: {content[:50]}...")
async def ask_peer(self, target_name: str, question: str) -> str:
"""向同伴提问并等待回答"""
if target_name not in self.peers:
return f"找不到 Agent: {target_name}"
target = self.peers[target_name]
# 发送问题
await target.message_queue.put({
"from": self.name,
"content": question,
"type": "query",
})
# 使用 LLM 模拟目标 Agent 回答
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": f"你是 {target_name},专长是{target.specialty}。简洁回答同事的问题。"
},
{"role": "user", "content": question}
],
max_tokens=300,
)
answer = response.choices[0].message.content
self.knowledge_base[f"来自{target_name}"] = answer
return answer
async def work(self, task: str) -> str:
"""执行任务,必要时主动向同伴请教"""
# 分析任务,判断是否需要请教同伴
analysis_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": f"你是 {self.name},专长是{self.specialty}。"
},
{
"role": "user",
"content": f"""
你的任务:{task}
你的同伴列表:{list(self.peers.keys())}
判断:
1. 你能独立完成这个任务吗?
2. 如果需要帮助,应该问哪个同伴什么问题?
回答格式:
独立完成: 是/否
需要咨询: 同伴名字(如不需要则为"无")
咨询问题: 具体问题(如不需要则为"无")
"""
}
],
max_tokens=200,
)
analysis = analysis_response.choices[0].message.content
# 解析是否需要请教同伴(简化版)
if "是" not in analysis.split("独立完成:")[1][:3] if "独立完成:" in analysis else True:
# 需要请教同伴,找第一个可用的同伴
if self.peers:
peer_name = list(self.peers.keys())[0]
sub_question = f"关于'{task}',你有什么专业见解?"
peer_insight = await self.ask_peer(peer_name, sub_question)
# 整合同伴的输入
final_response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"你是 {self.name},专长是{self.specialty}。"},
{"role": "user", "content": f"任务:{task}\n\n{peer_name}的补充意见:{peer_insight}\n\n请给出你的最终回答。"}
],
max_tokens=400,
)
return final_response.choices[0].message.content
# 独立完成
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"你是 {self.name},专长是{self.specialty}。"},
{"role": "user", "content": task}
],
max_tokens=400,
)
return response.choices[0].message.content
# 网状协作演示
async def mesh_demo():
# 创建 Agent
tech = MeshAgent("技术专家", "软件架构与技术选型")
biz = MeshAgent("商业专家", "商业模式与市场策略")
ux = MeshAgent("UX专家", "用户体验与产品设计")
# 建立双向连接(网状)
for a in [tech, biz, ux]:
for b in [tech, biz, ux]:
if a.name != b.name:
a.register_peer(b)
# 给每个 Agent 分配任务,它们会自主决定是否需要协作
task = "评估是否应该把我们的产品从Web迁移到移动端优先"
print(f"网状协作任务:{task}\n")
results = await asyncio.gather(
tech.work(task),
biz.work(task),
ux.work(task),
)
print("\n各 Agent 的独立结论:")
for agent, result in zip([tech, biz, ux], results):
print(f"\n[{agent.name}]:")
print(result[:200] + "...")
asyncio.run(mesh_demo())
2.4 架构选型指南
# ====================================================
# 架构选型决策树
# ====================================================
def choose_architecture(requirements: dict) -> str:
"""
根据需求特征推荐最合适的多 Agent 架构
requirements 字段:
- has_clear_hierarchy: 任务有明确的主次关系
- needs_diverse_perspectives: 需要多角度观点
- tasks_are_independent: 子任务相互独立
- workflow_is_dynamic: 工作流程高度动态
- needs_consensus: 需要达成共识
- priority: latency/quality/flexibility/cost
"""
h = requirements.get("has_clear_hierarchy", False)
d = requirements.get("needs_diverse_perspectives", False)
i = requirements.get("tasks_are_independent", False)
dyn = requirements.get("workflow_is_dynamic", False)
c = requirements.get("needs_consensus", False)
reasoning = []
if h and not dyn:
reasoning.append("有明确主次关系 → 倾向层级式")
if d and i:
reasoning.append("需要多视角且子任务独立 → 倾向平行式")
if dyn and not h:
reasoning.append("工作流动态变化 → 倾向网状式")
if c:
reasoning.append("需要共识 → 平行式(投票)或网状式(讨论)")
# 决策
if h and not dyn:
architecture = "层级式"
reason = "任务有清晰的上下游关系,协调器可以有效管控全局"
elif d or (i and not h):
architecture = "平行式"
reason = "子任务独立,多视角可以提升输出质量"
elif dyn:
architecture = "网状式"
reason = "任务边界模糊,Agent 需要动态协商"
else:
architecture = "层级式"
reason = "默认推荐,最易于管理和调试"
return f"""
推荐架构:{architecture}
选择原因:{reason}
推理过程:
{chr(10).join(f' - {r}' for r in reasoning) if reasoning else ' - 使用默认推荐'}
实施建议:
{
'从一个协调器 Agent 开始,逐步添加工作 Agent'
if architecture == '层级式' else
'设计标准的任务接口,让所有并行 Agent 接受相同输入格式'
if architecture == '平行式' else
'从3个 Agent 的小网络开始,验证通信协议后再扩展'
}
"""
# 测试几个场景
print("场景1:内容生产流水线")
print(choose_architecture({
"has_clear_hierarchy": True,
"tasks_are_independent": False,
"workflow_is_dynamic": False,
}))
print("\n场景2:创意头脑风暴")
print(choose_architecture({
"needs_diverse_perspectives": True,
"tasks_are_independent": True,
"needs_consensus": True,
}))
print("\n场景3:智能客服系统")
print(choose_architecture({
"workflow_is_dynamic": True,
"has_clear_hierarchy": False,
"needs_diverse_perspectives": False,
}))
本章小结
-
三种架构各有适用场景:层级式适合有明确流程的工作流;平行式适合需要多视角或高容错的场景;网状式适合高度动态的复杂协作。不存在"最好的架构",只有"最合适的架构"。
-
层级式是最容易上手的架构:有明确的协调器角色,工作流可预测,易于调试。90% 的业务场景从层级式开始就足够了。
-
平行式是提升质量的利器:让多个 Agent 独立处理同一问题再合并,就像"多数表决"一样能有效降低单点错误,在需要高准确率的场景非常有价值。
-
网状式是最灵活但最复杂的:Agent 之间的动态通信能产生"涌现"行为,但也难以预测和调试。建议只有在其他架构明显不够用时才采用。
-
架构可以混合:真实系统往往是混合架构——顶层用层级式,某些子系统用平行式,特定动态场景用网状式。先用简单架构,遇到瓶颈再局部演进。
# 核心行动:用选型框架评估你的下一个多 Agent 项目
your_project = {
"has_clear_hierarchy": True, # 根据你的实际情况修改
"needs_diverse_perspectives": False,
"tasks_are_independent": True,
"workflow_is_dynamic": False,
"needs_consensus": False,
}
print(choose_architecture(your_project))
本章提示词模板
【模板1:架构设计需求分析提示词】
我正在设计一个多 Agent 系统,请帮我分析最合适的架构:
任务描述:{task_description}
团队规模预期:约 {n} 个 Agent
关键约束:{constraints}(如:延迟要求、成本预算、可靠性要求)
请分析:
1. 这个任务的信息流向是单向(上→下)还是双向(互相影响)?
2. 子任务之间有没有严格的先后依赖关系?
3. 任务在执行过程中可能如何变化?
4. 最终需要一个统一的输出还是多个并列输出?
推荐架构(层级式/平行式/网状式),并解释理由。
【模板2:架构审查提示词】
请审查以下多 Agent 系统架构设计:
当前架构类型:{architecture_type}
Agent 列表:{agent_list}
信息流向:{information_flow}
关键业务场景:{scenarios}
检查问题:
1. 是否有单点故障?(如协调器崩溃会怎样)
2. Agent 之间的通信是否会产生循环依赖?
3. 是否有更简单的架构能达到同样效果?
4. 当系统负载增加 10 倍时,哪个环节最可能成为瓶颈?
给出具体的风险点和改进建议。