第01章 从实验到生产:两个世界的鸿沟
第01章 从实验到生产:两个世界的鸿沟
“每个工程师在第一次把东西推上生产之前,都以为自己准备好了。” —— 生产事故复盘中最常见的开场白
本地环境和生产环境,表面上都是在跑同一段代码,但实际上是两个完全不同的世界。理解这条鸿沟,是做好生产部署的第一步。
1.1 两个世界的真实差距
# ====================================================
# 实验环境 vs 生产环境:差距清单
# ====================================================
"""
这不是哲学讨论,这是真实踩过的坑的总结。
┌────────────────┬──────────────────────┬──────────────────────┐
│ 维度 │ 实验环境 │ 生产环境 │
├────────────────┼──────────────────────┼──────────────────────┤
│ 用户 │ 开发者自己(1人) │ 真实用户(N万人) │
│ 并发 │ 顺序请求 │ 同时几百个请求 │
│ 错误处理 │ 打印 traceback 就行 │ 必须优雅降级 │
│ 密钥管理 │ .env 文件或硬编码 │ Secrets Manager │
│ 日志 │ print() 看终端 │ 结构化日志+收集系统 │
│ 监控 │ 没有 │ 24/7指标+告警 │
│ 部署 │ python app.py │ 容器+编排+CI/CD │
│ 扩容 │ 不需要 │ 根据流量自动伸缩 │
│ 回滚 │ git reset 一下 │ 需要零停机回滚方案 │
│ 安全 │ 基本不考虑 │ Prompt注入/数据泄露 │
└────────────────┴──────────────────────┴──────────────────────┘
Agent 特有的生产挑战(普通服务没有的):
1. LLM API 故障/超时(第三方依赖更脆)
2. Token 成本意外爆炸(循环调用bug,一夜花了$5000)
3. Prompt 注入攻击(用户输入干扰 Agent 行为)
4. 响应不稳定(同一个输入,不同时刻不同质量)
5. 上下文窗口溢出(长对话后 Agent 开始"失忆")
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from openai import AsyncOpenAI
client = AsyncOpenAI()
# 实验环境写法(别这样做)
async def bad_production_agent(user_input: str) -> str:
"""
这段代码在本地能跑,但不能上生产:
- 没有超时控制(LLM 可能挂起)
- 没有错误处理(异常会直接抛出给用户)
- 没有日志(出了问题无法排查)
- 没有成本控制(可能意外多次调用)
"""
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": user_input}],
)
return response.choices[0].message.content
# 生产环境写法(应该这样做)
@dataclass
class ProductionAgentConfig:
"""Agent 生产配置,不允许硬编码"""
model: str = "gpt-4o-mini"
timeout_seconds: float = 30.0
max_tokens: int = 500
max_retries: int = 2
fallback_message: str = "抱歉,服务暂时不可用,请稍后再试。"
class ProductionAgent:
"""
生产级 Agent:包含超时、重试、日志、错误处理
"""
def __init__(self, config: ProductionAgentConfig):
self.config = config
self.request_count = 0
self.error_count = 0
async def respond(
self,
user_input: str,
request_id: str = None, # 用于日志追踪
) -> Dict[str, Any]:
"""
生产级响应方法
返回结构化结果(不是裸字符串),方便调用方处理
"""
if not request_id:
request_id = f"req_{int(time.time() * 1000)}"
self.request_count += 1
start_time = time.time()
# 结构化日志(生产中会发送到日志系统)
print(f"[{request_id}] 开始处理 | 输入长度: {len(user_input)}")
for attempt in range(self.config.max_retries + 1):
try:
# 超时控制
response = await asyncio.wait_for(
client.chat.completions.create(
model=self.config.model,
messages=[{"role": "user", "content": user_input}],
max_tokens=self.config.max_tokens,
),
timeout=self.config.timeout_seconds,
)
output = response.choices[0].message.content
latency_ms = (time.time() - start_time) * 1000
# 记录成功指标
print(f"[{request_id}] 成功 | 延迟: {latency_ms:.0f}ms | "
f"Token: {response.usage.total_tokens}")
return {
"success": True,
"output": output,
"request_id": request_id,
"latency_ms": latency_ms,
"tokens_used": response.usage.total_tokens,
"model": self.config.model,
}
except asyncio.TimeoutError:
print(f"[{request_id}] 超时 (尝试 {attempt+1}/{self.config.max_retries+1})")
if attempt == self.config.max_retries:
self.error_count += 1
return {
"success": False,
"output": self.config.fallback_message,
"request_id": request_id,
"error": "timeout",
}
except Exception as e:
print(f"[{request_id}] 错误: {type(e).__name__}: {e}")
if attempt == self.config.max_retries:
self.error_count += 1
return {
"success": False,
"output": self.config.fallback_message,
"request_id": request_id,
"error": str(type(e).__name__),
}
# 不应该到达这里
return {"success": False, "output": self.config.fallback_message, "request_id": request_id}
def get_health_metrics(self) -> Dict:
"""供健康检查端点调用"""
error_rate = self.error_count / max(self.request_count, 1)
return {
"total_requests": self.request_count,
"error_count": self.error_count,
"error_rate": error_rate,
"status": "healthy" if error_rate < 0.05 else "degraded",
}
async def demo_production_vs_experiment():
print("=== 实验 vs 生产 Agent 对比 ===\n")
config = ProductionAgentConfig()
agent = ProductionAgent(config)
# 正常请求
result = await agent.respond("什么是 Kubernetes?", request_id="req_001")
print(f"\n结果: {result['output'][:100]}...\n")
# 查看健康指标
metrics = agent.get_health_metrics()
print(f"健康指标: {metrics}")
asyncio.run(demo_production_vs_experiment())
1.2 生产就绪检查清单
# ====================================================
# 生产就绪 Checklist
# ====================================================
"""
在把 Agent 推上生产之前,逐条检查这个清单。
每一条都是真实踩过的坑。
"""
PRODUCTION_READINESS_CHECKLIST = {
"安全": [
"[ ] API Key 不在代码仓库中(已用环境变量或 Secrets Manager)",
"[ ] 所有敏感配置通过环境变量注入,不在镜像里",
"[ ] 已测试 Prompt 注入攻击(第11章)",
"[ ] 用户输入有长度限制(防止 Token 炸弹)",
"[ ] 输出有敏感信息过滤",
],
"可靠性": [
"[ ] 所有 LLM 调用都有超时控制(建议30秒)",
"[ ] 有重试机制(建议2-3次,指数退避)",
"[ ] 有降级响应(LLM 失败时的 fallback 消息)",
"[ ] 服务有健康检查端点(/health)",
"[ ] 有优雅关机处理(接到 SIGTERM 时完成正在处理的请求)",
],
"可观测性": [
"[ ] 日志是结构化格式(JSON),包含 request_id",
"[ ] 有延迟、错误率、Token 用量的指标",
"[ ] 有分布式追踪(能看到一个请求的完整链路)",
"[ ] 关键指标有告警(错误率 > 5%、延迟 > 5s)",
],
"成本控制": [
"[ ] 设置了 max_tokens 限制",
"[ ] 有每日/每月的 Token 用量告警",
"[ ] 有 Token 消耗的指标和趋势图",
"[ ] 测试过并发情况下的 Token 用量是否符合预期",
],
"部署": [
"[ ] 有 Dockerfile,镜像可以稳定构建",
"[ ] 有 CI/CD 流水线(代码推送自动测试+部署)",
"[ ] 有回滚方案(新版本有问题时能5分钟内回滚)",
"[ ] 已在 staging 环境测试过,与生产环境配置相同",
"[ ] 有灰度发布计划(不要直接全量推送)",
],
"文档": [
"[ ] 有 API 文档(端点、参数、返回格式)",
"[ ] 有 Runbook(常见问题的处理步骤)",
"[ ] 有 Oncall 联系方式和升级路径",
],
}
def print_checklist():
print("=== 生产就绪检查清单 ===\n")
total_items = 0
for category, items in PRODUCTION_READINESS_CHECKLIST.items():
print(f"【{category}】")
for item in items:
print(f" {item}")
total_items += 1
print()
print(f"共 {total_items} 项检查")
print("建议:在第一次上线前,所有项都应该是 ✅")
print_checklist()
本章小结
-
生产环境和实验环境之间有结构性差距,不是"少几行代码"的问题:并发、错误处理、可观测性、安全——这些在实验中不需要,在生产中每一个都不能缺。提前了解这些差距,比事故后补救成本低 10 倍。
-
Agent 比普通服务有额外的生产挑战:第三方 LLM API 的不稳定性、Token 成本的可变性、Prompt 注入的安全风险、输出质量的不一致性——这些是 AI 系统特有的,需要额外设计。
-
生产就绪不是一次性的检查,而是持续的工程实践:Checklist 是起点,但保持系统生产就绪需要文化:每次改动都考虑安全影响,每次事故都更新 Runbook,每个新功能都配套监控。
-
最小化第一次上线的范围:不要把所有功能一次性推上生产。先部署最核心的功能,在小流量下验证,再逐步扩展。"小步快跑"比"大版本发布"的风险低一个量级。
-
超时和降级是第一优先级:在所有生产改进中,超时控制和降级消息是最先要做的。它们直接影响用户体验,也是防止级联故障的第一道防线。
# 核心行动:把你现有的 Agent 改造成生产就绪的版本
# 最小改造:添加超时 + 错误处理 + 结构化返回值
async def minimum_production_upgrade(your_agent_func, user_input):
"""最小化生产改造模板"""
try:
result = await asyncio.wait_for(
your_agent_func(user_input),
timeout=30.0, # 30秒超时
)
return {"success": True, "output": result}
except asyncio.TimeoutError:
return {"success": False, "output": "服务超时,请稍后重试", "error": "timeout"}
except Exception as e:
return {"success": False, "output": "服务暂时不可用", "error": str(type(e).__name__)}
本章提示词模板
【模板1:生产事故分析提示词】
我的 AI Agent 系统发生了以下生产事故:
事故时间:{incident_time}
影响范围:{impact_scope}
错误现象:{error_description}
日志摘要:{log_excerpt}
系统版本:{system_version}
近期变更:{recent_changes}
请帮我进行 5-Why 根因分析:
1. 直接原因是什么?(表面现象)
2. 为什么会发生这个直接原因?(第一层原因)
3. 为什么会有第一层原因?(第二层原因)
4. 继续追溯到根本原因
5. 这个根本原因意味着需要改变什么系统设计或工程实践?
6. 为了防止类似事故重现,需要增加哪些保护机制?
【模板2:生产就绪审查提示词】
以下是我准备上生产的 AI Agent 系统的架构描述:
{system_architecture_description}
请帮我做生产就绪审查:
1. 这个架构中有哪些明显的单点故障?
2. 如果 OpenAI API 宕机30分钟,用户会看到什么?
3. 如果 QPS 突然增加10倍,系统会怎样?
4. 如果有人尝试 Prompt 注入,最坏情况是什么?
5. 列出5个最有可能的生产事故场景和对应的预防措施
6. 给出按优先级排序的3个"必须在上线前解决"的问题