第03章 Agent间通信协议:消息传递与事件总线
第03章 Agent间通信协议:消息传递与事件总线
“沟通不畅,是所有协作失败的根本原因。无论是人类团队还是 AI 团队。” —— 组织管理研究
多 Agent 系统的本质,是 Agent 之间的通信。如果通信设计得不好,即使是最精心设计的架构也会土崩瓦解。
这一章深入讲解 Agent 间的通信协议:消息格式、同步 vs 异步、发布订阅模式,以及如何保证通信可靠性。
3.1 消息格式:结构化通信的基础
# ====================================================
# 标准化的 Agent 消息格式
# ====================================================
import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from enum import Enum
from openai import AsyncOpenAI
client = AsyncOpenAI()
class MessageType(Enum):
"""消息类型枚举"""
TASK = "task" # 分配任务
RESULT = "result" # 返回结果
QUERY = "query" # 请求信息
RESPONSE = "response" # 回答查询
STATUS = "status" # 状态更新
ERROR = "error" # 错误报告
BROADCAST = "broadcast" # 广播通知(发给所有人)
ACK = "ack" # 确认收到
class MessagePriority(Enum):
"""消息优先级"""
URGENT = 0 # 紧急(立即处理)
HIGH = 1 # 高优先级
NORMAL = 2 # 普通
LOW = 3 # 低优先级(可延迟)
@dataclass
class AgentMessage:
"""
标准化 Agent 消息格式
好的消息格式应该:
1. 有唯一ID(用于追踪和确认)
2. 有明确的发送者和接收者
3. 有类型(区分任务、结果、查询等)
4. 有优先级(影响处理顺序)
5. 有时间戳(用于排序和超时检测)
6. 有关联ID(把请求和回复关联起来)
7. 有过期时间(避免处理过时消息)
"""
sender: str
receiver: str # 特定接收者,或 "broadcast" 表示广播
content: Any # 消息内容(字符串、字典等)
message_type: MessageType = MessageType.TASK
priority: MessagePriority = MessagePriority.NORMAL
# 自动生成的字段
message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
correlation_id: Optional[str] = None # 关联到哪条消息(用于回复)
expires_at: Optional[float] = None # 消息过期时间
# 元数据
metadata: Dict[str, Any] = field(default_factory=dict)
retry_count: int = 0
def is_expired(self) -> bool:
"""检查消息是否已过期"""
if self.expires_at is None:
return False
return time.time() > self.expires_at
def create_reply(self, content: Any, msg_type: MessageType = MessageType.RESPONSE) -> 'AgentMessage':
"""基于此消息创建回复消息"""
return AgentMessage(
sender=self.receiver, # 回复者变成发送者
receiver=self.sender, # 原发送者变成接收者
content=content,
message_type=msg_type,
correlation_id=self.message_id, # 关联到原消息
)
def __str__(self):
return (f"[{self.message_id}] {self.sender}→{self.receiver} "
f"({self.message_type.value}): {str(self.content)[:50]}")
# 演示消息创建和回复链
def demo_message_chain():
# 1. 协调器发布任务
task_msg = AgentMessage(
sender="coordinator",
receiver="researcher",
content={"task": "搜索最新的LLM基准测试结果", "deadline": "10分钟内"},
message_type=MessageType.TASK,
priority=MessagePriority.HIGH,
expires_at=time.time() + 600, # 10分钟过期
)
print(f"发送: {task_msg}")
# 2. 研究员回复结果
result_msg = task_msg.create_reply(
content={"findings": "GPT-4o在MMLU得分92.0%..."},
msg_type=MessageType.RESULT,
)
print(f"回复: {result_msg}")
# 3. 研究员中途发出一个查询
query_msg = AgentMessage(
sender="researcher",
receiver="coordinator",
content="需要搜索中文还是英文来源?",
message_type=MessageType.QUERY,
correlation_id=task_msg.message_id, # 关联到原任务
)
print(f"查询: {query_msg}")
demo_message_chain()
3.2 同步 vs 异步通信
# ====================================================
# 同步通信 vs 异步通信的实现对比
# ====================================================
"""
同步通信:发送方等待接收方的回复
发送 → 等待 → 收到回复 → 继续
异步通信:发送方不等待,继续做其他事
发送 → 继续做其他事 → 收到回复(稍后)→ 处理回复
何时用同步:
- 必须得到回复才能继续(如:需要另一个 Agent 的决策)
- 响应时间短(<2秒)
何时用异步:
- 可以继续做其他工作(不需要立刻等结果)
- 响应时间长(>5秒)
- 并发多个请求
"""
class SyncAgentChannel:
"""同步通信通道:等待回复"""
def __init__(self):
self.handlers: Dict[str, callable] = {}
def register_handler(self, agent_name: str, handler: callable):
self.handlers[agent_name] = handler
async def send_and_wait(
self,
message: AgentMessage,
timeout: float = 30.0
) -> Optional[AgentMessage]:
"""发送消息并等待回复(同步)"""
if message.receiver not in self.handlers:
raise ValueError(f"找不到 Agent: {message.receiver}")
try:
# 直接调用接收方的处理函数,等待结果
result = await asyncio.wait_for(
self.handlers[message.receiver](message),
timeout=timeout
)
return result
except asyncio.TimeoutError:
print(f"⏰ 超时:{message.receiver} 在 {timeout}秒内没有回复")
return None
class AsyncAgentChannel:
"""
异步通信通道:不等待回复
使用消息队列实现
"""
def __init__(self):
self.queues: Dict[str, asyncio.Queue] = {}
self.reply_futures: Dict[str, asyncio.Future] = {}
def create_mailbox(self, agent_name: str):
"""为 Agent 创建消息队列(邮箱)"""
self.queues[agent_name] = asyncio.Queue()
async def send(self, message: AgentMessage):
"""异步发送消息(立即返回,不等待处理)"""
if message.receiver not in self.queues:
raise ValueError(f"找不到邮箱: {message.receiver}")
await self.queues[message.receiver].put(message)
print(f"📬 投递: {message}")
async def receive(self, agent_name: str, timeout: float = None) -> Optional[AgentMessage]:
"""Agent 从自己的邮箱取消息"""
if agent_name not in self.queues:
return None
try:
if timeout:
msg = await asyncio.wait_for(
self.queues[agent_name].get(),
timeout=timeout
)
else:
msg = await self.queues[agent_name].get()
# 检查过期
if msg.is_expired():
print(f"🗑️ 丢弃过期消息: {msg}")
return None
return msg
except asyncio.TimeoutError:
return None
async def request_reply(
self,
message: AgentMessage,
timeout: float = 30.0
) -> Optional[AgentMessage]:
"""
发送消息并等待特定的回复(异步请求-回复模式)
通过 message_id 匹配请求和回复
"""
# 注册一个 Future,等待对应的回复
future: asyncio.Future = asyncio.get_event_loop().create_future()
self.reply_futures[message.message_id] = future
# 发送消息
await self.send(message)
try:
# 等待回复(有超时)
reply = await asyncio.wait_for(future, timeout=timeout)
return reply
except asyncio.TimeoutError:
del self.reply_futures[message.message_id]
return None
async def deliver_reply(self, reply: AgentMessage):
"""收到回复时,触发对应的 Future"""
if reply.correlation_id in self.reply_futures:
future = self.reply_futures.pop(reply.correlation_id)
if not future.done():
future.set_result(reply)
# 演示异步通信
async def demo_async_communication():
channel = AsyncAgentChannel()
channel.create_mailbox("analyst")
channel.create_mailbox("coordinator")
print("=== 异步通信演示 ===\n")
# 同时发送多个请求(不等待)
tasks_to_send = [
AgentMessage("coordinator", "analyst", f"分析数据集{i}", MessageType.TASK)
for i in range(3)
]
print("协调器同时发送3个任务(不等待)...")
for task in tasks_to_send:
await channel.send(task)
print("\n分析师处理收到的任务(模拟)...")
for _ in range(3):
msg = await channel.receive("analyst", timeout=1.0)
if msg:
print(f" 处理: {msg}")
asyncio.run(demo_async_communication())
3.3 事件总线:发布订阅模式
# ====================================================
# 事件总线:解耦 Agent 间的通信
# ====================================================
"""
事件总线解决的问题:
直接通信需要知道对方的名字(紧耦合)。
事件总线让 Agent 只关注"发生了什么事",不需要知道谁处理(松耦合)。
发布者 → 事件总线 → [订阅者1, 订阅者2, 订阅者3]
好处:
- 增加/删除 Agent 不影响其他 Agent
- 一个事件可以触发多个 Agent 的响应
- 事件历史可以记录和回放
"""
from typing import Callable, Set
import json
@dataclass
class Event:
"""系统事件"""
event_type: str # 事件类型(如 "task_completed", "error_occurred")
publisher: str # 发布者
data: Any # 事件数据
event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
class EventBus:
"""
多 Agent 系统的事件总线
支持:
- 主题订阅(订阅特定类型的事件)
- 通配符订阅(订阅所有事件)
- 同步和异步处理
- 事件历史记录
"""
def __init__(self, history_limit: int = 100):
self.subscribers: Dict[str, List[Callable]] = {} # 主题 → 处理函数列表
self.wildcard_subscribers: List[Callable] = [] # 订阅所有事件
self.history: List[Event] = [] # 事件历史
self.history_limit = history_limit
self._publish_count = 0
def subscribe(self, event_type: str, handler: Callable):
"""订阅特定类型的事件"""
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
print(f" 订阅: {handler.__qualname__} 订阅了 '{event_type}'")
def subscribe_all(self, handler: Callable):
"""订阅所有事件(监控用)"""
self.wildcard_subscribers.append(handler)
async def publish(self, event: Event):
"""发布事件,触发所有订阅者"""
# 记录历史
self.history.append(event)
if len(self.history) > self.history_limit:
self.history.pop(0)
self._publish_count += 1
print(f"\n📢 事件发布: [{event.event_type}] 来自 {event.publisher}")
# 通知特定订阅者
handlers = self.subscribers.get(event.event_type, [])
# 通知通配符订阅者
all_handlers = handlers + self.wildcard_subscribers
if not all_handlers:
print(f" (无人订阅 '{event.event_type}' 事件)")
return
# 并行通知所有订阅者
await asyncio.gather(*[
self._safe_handle(handler, event)
for handler in all_handlers
])
async def _safe_handle(self, handler: Callable, event: Event):
"""安全处理事件(捕获异常,不影响其他处理者)"""
try:
if asyncio.iscoroutinefunction(handler):
await handler(event)
else:
handler(event)
except Exception as e:
print(f" ⚠️ 处理器 {handler.__name__} 出错: {e}")
def get_history(self, event_type: str = None, limit: int = 10) -> List[Event]:
"""获取事件历史"""
events = self.history
if event_type:
events = [e for e in events if e.event_type == event_type]
return events[-limit:]
@property
def stats(self) -> dict:
return {
"总事件数": self._publish_count,
"订阅主题数": len(self.subscribers),
"通配符订阅者数": len(self.wildcard_subscribers),
"历史记录数": len(self.history),
}
# 演示事件总线
async def demo_event_bus():
bus = EventBus()
# 不同 Agent 订阅不同类型的事件
async def quality_checker(event: Event):
print(f" [质检Agent] 收到新报告,开始质量检查: {event.data.get('title', '')}")
async def notification_agent(event: Event):
print(f" [通知Agent] 任务完成,发送通知给用户")
async def logger_agent(event: Event):
print(f" [日志Agent] 记录事件: {event.event_type} at {event.timestamp:.0f}")
# 订阅
bus.subscribe("report_generated", quality_checker)
bus.subscribe("report_generated", notification_agent)
bus.subscribe_all(logger_agent) # 日志 Agent 订阅所有事件
print("=== 事件总线演示 ===\n")
# 某个 Agent 完成了一份报告,发布事件
await bus.publish(Event(
event_type="report_generated",
publisher="writer_agent",
data={"title": "2026年AI市场分析报告", "pages": 15},
))
print("\n事件总线统计:")
for k, v in bus.stats.items():
print(f" {k}: {v}")
asyncio.run(demo_event_bus())
本章小结
-
标准化消息格式是多 Agent 通信的地基:每条消息应该有唯一ID、时间戳、类型、优先级和过期时间。这些元数据是追踪、调试和可靠性的基础,不是可选的。
-
同步通信适合简单依赖,异步通信适合复杂协作:如果 A 必须等 B 的结果才能继续,用同步;如果 A 可以继续做其他事,用异步。混合使用能兼顾效率和一致性。
-
事件总线是松耦合的最佳实践:Agent 只需要知道"发生了什么事",不需要知道"谁在处理"。这让系统更容易扩展(增加新 Agent 不改变现有代码)。
-
消息过期机制防止系统被旧请求淹没:一条在 5 分钟前发出的任务消息,到达时可能已经没有意义。设置过期时间,让系统自动丢弃过时消息。
-
通信可靠性需要主动设计:确认机制(ACK)、重试策略、超时检测——这些不会自动出现,必须在设计阶段就考虑进去。
# 核心行动:为你的多 Agent 系统搭建基础通信层
async def setup_basic_communication():
"""5分钟搭建多 Agent 通信基础设施"""
# 1. 创建事件总线
bus = EventBus()
# 2. 创建异步通道
channel = AsyncAgentChannel()
# 3. 为每个 Agent 注册邮箱
for agent_name in ["coordinator", "researcher", "analyst", "writer"]:
channel.create_mailbox(agent_name)
# 4. 设置关键事件的订阅
async def on_task_complete(event: Event):
print(f"任务完成: {event.data}")
bus.subscribe("task_completed", on_task_complete)
print("✅ 通信基础设施已就绪")
return bus, channel
asyncio.run(setup_basic_communication())
本章提示词模板
【模板1:通信协议设计提示词】
请为以下多 Agent 系统设计通信协议:
Agent 列表:{agent_list}
主要工作流:{workflow_description}
性能要求:{latency_requirements}
请设计:
1. 消息类型列表(这个系统需要哪几种消息类型)
2. 关键消息的格式(最重要的3种消息的字段设计)
3. 通信模式选择(哪些交互应该同步,哪些应该异步)
4. 需要哪些事件总线主题(Agent 应该广播哪些系统事件)
5. 超时和重试策略(各类消息的合理超时时间)
【模板2:通信问题诊断提示词】
我的多 Agent 系统出现了以下通信问题:
症状:{symptom}(如:消息丢失、死锁、响应超慢)
系统架构:{architecture}
消息量:每秒约 {msg_per_second} 条消息
可能原因分析:
1. 这是同步/异步使用不当的问题吗?
2. 是否存在消息队列积压?
3. 是否有循环依赖导致死锁?
4. 消息格式是否有问题(如缺少过期时间)?
请给出具体的排查步骤和修复建议。