第03章 Agent间通信协议:消息传递与事件总线

第03章 Agent间通信协议:消息传递与事件总线

“沟通不畅,是所有协作失败的根本原因。无论是人类团队还是 AI 团队。” —— 组织管理研究


多 Agent 系统的本质,是 Agent 之间的通信。如果通信设计得不好,即使是最精心设计的架构也会土崩瓦解。

这一章深入讲解 Agent 间的通信协议:消息格式、同步 vs 异步、发布订阅模式,以及如何保证通信可靠性。


3.1 消息格式:结构化通信的基础

# ====================================================
# 标准化的 Agent 消息格式
# ====================================================

import asyncio
import uuid
import time
from dataclasses import dataclass, field
from typing import Any, Optional, List, Dict
from enum import Enum
from openai import AsyncOpenAI

client = AsyncOpenAI()


class MessageType(Enum):
    """消息类型枚举"""
    TASK = "task"           # 分配任务
    RESULT = "result"       # 返回结果
    QUERY = "query"         # 请求信息
    RESPONSE = "response"   # 回答查询
    STATUS = "status"       # 状态更新
    ERROR = "error"         # 错误报告
    BROADCAST = "broadcast" # 广播通知(发给所有人)
    ACK = "ack"             # 确认收到


class MessagePriority(Enum):
    """消息优先级"""
    URGENT = 0      # 紧急(立即处理)
    HIGH = 1        # 高优先级
    NORMAL = 2      # 普通
    LOW = 3         # 低优先级(可延迟)


@dataclass
class AgentMessage:
    """
    标准化 Agent 消息格式
    
    好的消息格式应该:
    1. 有唯一ID(用于追踪和确认)
    2. 有明确的发送者和接收者
    3. 有类型(区分任务、结果、查询等)
    4. 有优先级(影响处理顺序)
    5. 有时间戳(用于排序和超时检测)
    6. 有关联ID(把请求和回复关联起来)
    7. 有过期时间(避免处理过时消息)
    """
    sender: str
    receiver: str              # 特定接收者,或 "broadcast" 表示广播
    content: Any               # 消息内容(字符串、字典等)
    message_type: MessageType = MessageType.TASK
    priority: MessagePriority = MessagePriority.NORMAL
    
    # 自动生成的字段
    message_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)
    correlation_id: Optional[str] = None  # 关联到哪条消息(用于回复)
    expires_at: Optional[float] = None    # 消息过期时间
    
    # 元数据
    metadata: Dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0
    
    def is_expired(self) -> bool:
        """检查消息是否已过期"""
        if self.expires_at is None:
            return False
        return time.time() > self.expires_at
    
    def create_reply(self, content: Any, msg_type: MessageType = MessageType.RESPONSE) -> 'AgentMessage':
        """基于此消息创建回复消息"""
        return AgentMessage(
            sender=self.receiver,      # 回复者变成发送者
            receiver=self.sender,      # 原发送者变成接收者
            content=content,
            message_type=msg_type,
            correlation_id=self.message_id,  # 关联到原消息
        )
    
    def __str__(self):
        return (f"[{self.message_id}] {self.sender}→{self.receiver} "
                f"({self.message_type.value}): {str(self.content)[:50]}")


# 演示消息创建和回复链
def demo_message_chain():
    # 1. 协调器发布任务
    task_msg = AgentMessage(
        sender="coordinator",
        receiver="researcher",
        content={"task": "搜索最新的LLM基准测试结果", "deadline": "10分钟内"},
        message_type=MessageType.TASK,
        priority=MessagePriority.HIGH,
        expires_at=time.time() + 600,  # 10分钟过期
    )
    print(f"发送: {task_msg}")
    
    # 2. 研究员回复结果
    result_msg = task_msg.create_reply(
        content={"findings": "GPT-4o在MMLU得分92.0%..."},
        msg_type=MessageType.RESULT,
    )
    print(f"回复: {result_msg}")
    
    # 3. 研究员中途发出一个查询
    query_msg = AgentMessage(
        sender="researcher",
        receiver="coordinator",
        content="需要搜索中文还是英文来源?",
        message_type=MessageType.QUERY,
        correlation_id=task_msg.message_id,  # 关联到原任务
    )
    print(f"查询: {query_msg}")


demo_message_chain()

3.2 同步 vs 异步通信

# ====================================================
# 同步通信 vs 异步通信的实现对比
# ====================================================

"""
同步通信:发送方等待接收方的回复
  发送 → 等待 → 收到回复 → 继续

异步通信:发送方不等待,继续做其他事
  发送 → 继续做其他事 → 收到回复(稍后)→ 处理回复

何时用同步:
- 必须得到回复才能继续(如:需要另一个 Agent 的决策)
- 响应时间短(<2秒)

何时用异步:
- 可以继续做其他工作(不需要立刻等结果)
- 响应时间长(>5秒)
- 并发多个请求
"""


class SyncAgentChannel:
    """同步通信通道:等待回复"""
    
    def __init__(self):
        self.handlers: Dict[str, callable] = {}
    
    def register_handler(self, agent_name: str, handler: callable):
        self.handlers[agent_name] = handler
    
    async def send_and_wait(
        self, 
        message: AgentMessage, 
        timeout: float = 30.0
    ) -> Optional[AgentMessage]:
        """发送消息并等待回复(同步)"""
        
        if message.receiver not in self.handlers:
            raise ValueError(f"找不到 Agent: {message.receiver}")
        
        try:
            # 直接调用接收方的处理函数,等待结果
            result = await asyncio.wait_for(
                self.handlers[message.receiver](message),
                timeout=timeout
            )
            return result
        except asyncio.TimeoutError:
            print(f"⏰ 超时:{message.receiver} 在 {timeout}秒内没有回复")
            return None


class AsyncAgentChannel:
    """
    异步通信通道:不等待回复
    使用消息队列实现
    """
    
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.reply_futures: Dict[str, asyncio.Future] = {}
    
    def create_mailbox(self, agent_name: str):
        """为 Agent 创建消息队列(邮箱)"""
        self.queues[agent_name] = asyncio.Queue()
    
    async def send(self, message: AgentMessage):
        """异步发送消息(立即返回,不等待处理)"""
        if message.receiver not in self.queues:
            raise ValueError(f"找不到邮箱: {message.receiver}")
        
        await self.queues[message.receiver].put(message)
        print(f"📬 投递: {message}")
    
    async def receive(self, agent_name: str, timeout: float = None) -> Optional[AgentMessage]:
        """Agent 从自己的邮箱取消息"""
        if agent_name not in self.queues:
            return None
        
        try:
            if timeout:
                msg = await asyncio.wait_for(
                    self.queues[agent_name].get(),
                    timeout=timeout
                )
            else:
                msg = await self.queues[agent_name].get()
            
            # 检查过期
            if msg.is_expired():
                print(f"🗑️ 丢弃过期消息: {msg}")
                return None
            
            return msg
        except asyncio.TimeoutError:
            return None
    
    async def request_reply(
        self, 
        message: AgentMessage, 
        timeout: float = 30.0
    ) -> Optional[AgentMessage]:
        """
        发送消息并等待特定的回复(异步请求-回复模式)
        
        通过 message_id 匹配请求和回复
        """
        # 注册一个 Future,等待对应的回复
        future: asyncio.Future = asyncio.get_event_loop().create_future()
        self.reply_futures[message.message_id] = future
        
        # 发送消息
        await self.send(message)
        
        try:
            # 等待回复(有超时)
            reply = await asyncio.wait_for(future, timeout=timeout)
            return reply
        except asyncio.TimeoutError:
            del self.reply_futures[message.message_id]
            return None
    
    async def deliver_reply(self, reply: AgentMessage):
        """收到回复时,触发对应的 Future"""
        if reply.correlation_id in self.reply_futures:
            future = self.reply_futures.pop(reply.correlation_id)
            if not future.done():
                future.set_result(reply)


# 演示异步通信
async def demo_async_communication():
    channel = AsyncAgentChannel()
    channel.create_mailbox("analyst")
    channel.create_mailbox("coordinator")
    
    print("=== 异步通信演示 ===\n")
    
    # 同时发送多个请求(不等待)
    tasks_to_send = [
        AgentMessage("coordinator", "analyst", f"分析数据集{i}", MessageType.TASK)
        for i in range(3)
    ]
    
    print("协调器同时发送3个任务(不等待)...")
    for task in tasks_to_send:
        await channel.send(task)
    
    print("\n分析师处理收到的任务(模拟)...")
    for _ in range(3):
        msg = await channel.receive("analyst", timeout=1.0)
        if msg:
            print(f"  处理: {msg}")


asyncio.run(demo_async_communication())

3.3 事件总线:发布订阅模式

# ====================================================
# 事件总线:解耦 Agent 间的通信
# ====================================================

"""
事件总线解决的问题:
直接通信需要知道对方的名字(紧耦合)。
事件总线让 Agent 只关注"发生了什么事",不需要知道谁处理(松耦合)。

发布者 → 事件总线 → [订阅者1, 订阅者2, 订阅者3]

好处:
- 增加/删除 Agent 不影响其他 Agent
- 一个事件可以触发多个 Agent 的响应
- 事件历史可以记录和回放
"""

from typing import Callable, Set
import json


@dataclass
class Event:
    """系统事件"""
    event_type: str          # 事件类型(如 "task_completed", "error_occurred")
    publisher: str           # 发布者
    data: Any               # 事件数据
    event_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    timestamp: float = field(default_factory=time.time)


class EventBus:
    """
    多 Agent 系统的事件总线
    
    支持:
    - 主题订阅(订阅特定类型的事件)
    - 通配符订阅(订阅所有事件)
    - 同步和异步处理
    - 事件历史记录
    """
    
    def __init__(self, history_limit: int = 100):
        self.subscribers: Dict[str, List[Callable]] = {}  # 主题 → 处理函数列表
        self.wildcard_subscribers: List[Callable] = []    # 订阅所有事件
        self.history: List[Event] = []                    # 事件历史
        self.history_limit = history_limit
        self._publish_count = 0
    
    def subscribe(self, event_type: str, handler: Callable):
        """订阅特定类型的事件"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
        print(f"  订阅: {handler.__qualname__} 订阅了 '{event_type}'")
    
    def subscribe_all(self, handler: Callable):
        """订阅所有事件(监控用)"""
        self.wildcard_subscribers.append(handler)
    
    async def publish(self, event: Event):
        """发布事件,触发所有订阅者"""
        
        # 记录历史
        self.history.append(event)
        if len(self.history) > self.history_limit:
            self.history.pop(0)
        
        self._publish_count += 1
        print(f"\n📢 事件发布: [{event.event_type}] 来自 {event.publisher}")
        
        # 通知特定订阅者
        handlers = self.subscribers.get(event.event_type, [])
        
        # 通知通配符订阅者
        all_handlers = handlers + self.wildcard_subscribers
        
        if not all_handlers:
            print(f"  (无人订阅 '{event.event_type}' 事件)")
            return
        
        # 并行通知所有订阅者
        await asyncio.gather(*[
            self._safe_handle(handler, event)
            for handler in all_handlers
        ])
    
    async def _safe_handle(self, handler: Callable, event: Event):
        """安全处理事件(捕获异常,不影响其他处理者)"""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
        except Exception as e:
            print(f"  ⚠️ 处理器 {handler.__name__} 出错: {e}")
    
    def get_history(self, event_type: str = None, limit: int = 10) -> List[Event]:
        """获取事件历史"""
        events = self.history
        if event_type:
            events = [e for e in events if e.event_type == event_type]
        return events[-limit:]
    
    @property
    def stats(self) -> dict:
        return {
            "总事件数": self._publish_count,
            "订阅主题数": len(self.subscribers),
            "通配符订阅者数": len(self.wildcard_subscribers),
            "历史记录数": len(self.history),
        }


# 演示事件总线
async def demo_event_bus():
    bus = EventBus()
    
    # 不同 Agent 订阅不同类型的事件
    async def quality_checker(event: Event):
        print(f"  [质检Agent] 收到新报告,开始质量检查: {event.data.get('title', '')}")
    
    async def notification_agent(event: Event):
        print(f"  [通知Agent] 任务完成,发送通知给用户")
    
    async def logger_agent(event: Event):
        print(f"  [日志Agent] 记录事件: {event.event_type} at {event.timestamp:.0f}")
    
    # 订阅
    bus.subscribe("report_generated", quality_checker)
    bus.subscribe("report_generated", notification_agent)
    bus.subscribe_all(logger_agent)  # 日志 Agent 订阅所有事件
    
    print("=== 事件总线演示 ===\n")
    
    # 某个 Agent 完成了一份报告,发布事件
    await bus.publish(Event(
        event_type="report_generated",
        publisher="writer_agent",
        data={"title": "2026年AI市场分析报告", "pages": 15},
    ))
    
    print("\n事件总线统计:")
    for k, v in bus.stats.items():
        print(f"  {k}: {v}")


asyncio.run(demo_event_bus())

本章小结

  1. 标准化消息格式是多 Agent 通信的地基:每条消息应该有唯一ID、时间戳、类型、优先级和过期时间。这些元数据是追踪、调试和可靠性的基础,不是可选的。

  2. 同步通信适合简单依赖,异步通信适合复杂协作:如果 A 必须等 B 的结果才能继续,用同步;如果 A 可以继续做其他事,用异步。混合使用能兼顾效率和一致性。

  3. 事件总线是松耦合的最佳实践:Agent 只需要知道"发生了什么事",不需要知道"谁在处理"。这让系统更容易扩展(增加新 Agent 不改变现有代码)。

  4. 消息过期机制防止系统被旧请求淹没:一条在 5 分钟前发出的任务消息,到达时可能已经没有意义。设置过期时间,让系统自动丢弃过时消息。

  5. 通信可靠性需要主动设计:确认机制(ACK)、重试策略、超时检测——这些不会自动出现,必须在设计阶段就考虑进去。

# 核心行动:为你的多 Agent 系统搭建基础通信层
async def setup_basic_communication():
    """5分钟搭建多 Agent 通信基础设施"""
    
    # 1. 创建事件总线
    bus = EventBus()
    
    # 2. 创建异步通道
    channel = AsyncAgentChannel()
    
    # 3. 为每个 Agent 注册邮箱
    for agent_name in ["coordinator", "researcher", "analyst", "writer"]:
        channel.create_mailbox(agent_name)
    
    # 4. 设置关键事件的订阅
    async def on_task_complete(event: Event):
        print(f"任务完成: {event.data}")
    
    bus.subscribe("task_completed", on_task_complete)
    
    print("✅ 通信基础设施已就绪")
    return bus, channel

asyncio.run(setup_basic_communication())

本章提示词模板

【模板1:通信协议设计提示词】
请为以下多 Agent 系统设计通信协议:

Agent 列表:{agent_list}
主要工作流:{workflow_description}
性能要求:{latency_requirements}

请设计:
1. 消息类型列表(这个系统需要哪几种消息类型)
2. 关键消息的格式(最重要的3种消息的字段设计)
3. 通信模式选择(哪些交互应该同步,哪些应该异步)
4. 需要哪些事件总线主题(Agent 应该广播哪些系统事件)
5. 超时和重试策略(各类消息的合理超时时间)
【模板2:通信问题诊断提示词】
我的多 Agent 系统出现了以下通信问题:

症状:{symptom}(如:消息丢失、死锁、响应超慢)
系统架构:{architecture}
消息量:每秒约 {msg_per_second} 条消息

可能原因分析:
1. 这是同步/异步使用不当的问题吗?
2. 是否存在消息队列积压?
3. 是否有循环依赖导致死锁?
4. 消息格式是否有问题(如缺少过期时间)?

请给出具体的排查步骤和修复建议。

→ 第04章:角色设计与任务分配:谁来做什么