第03章 API服务化:FastAPI生产级设计

第03章 API服务化:FastAPI生产级设计

“API 是你的系统给世界开的门。门的设计决定了谁能进、出了什么事你能知道、系统出问题时损失有多大。” —— API 设计原则


FastAPI 写出一个基础 Web 服务很容易,但"能跑"和"生产就绪"之间,有健康检查、优雅关机、请求追踪、超时控制这几道坎。本章把它们都写进你的 FastAPI 应用。


3.1 完整的生产级 FastAPI 结构

# ====================================================
# 生产级 FastAPI Agent 服务
# ====================================================

"""
生产级 FastAPI 服务的必要组件:

1. /health 端点       — 健康检查,K8s 用来判断是否重启容器
2. /ready 端点        — 就绪检查,K8s 用来判断是否接受流量
3. request_id 追踪    — 每个请求唯一ID,贯穿整个调用链
4. 优雅关机          — 收到停止信号时,等待正在处理的请求完成
5. 超时控制          — 防止某个请求永远挂起
6. 结构化日志        — JSON 格式,包含 request_id
7. 错误统一处理      — 不把 stacktrace 暴露给用户
"""

import asyncio
import uuid
import time
import logging
import json
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any

from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from openai import AsyncOpenAI


# ─────────────────────────────────────────────
# 结构化日志配置
# ─────────────────────────────────────────────

class StructuredLogger:
    """生产日志:始终输出 JSON,便于日志系统(Loki/CloudWatch)解析"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
    
    def _emit(self, level: str, message: str, **kwargs):
        record = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "level": level,
            "service": self.service_name,
            "message": message,
            **kwargs,  # 额外的上下文字段(request_id, latency_ms, 等)
        }
        print(json.dumps(record, ensure_ascii=False))
    
    def info(self, message: str, **kwargs):
        self._emit("INFO", message, **kwargs)
    
    def warning(self, message: str, **kwargs):
        self._emit("WARNING", message, **kwargs)
    
    def error(self, message: str, **kwargs):
        self._emit("ERROR", message, **kwargs)


logger = StructuredLogger("agent-api")
client = AsyncOpenAI()


# ─────────────────────────────────────────────
# 应用状态管理
# ─────────────────────────────────────────────

class AppState:
    """应用全局状态(单例)"""
    def __init__(self):
        self.ready = False          # 是否就绪接受流量
        self.start_time = time.time()
        self.request_count = 0
        self.error_count = 0
    
    @property
    def uptime_seconds(self) -> float:
        return time.time() - self.start_time
    
    @property
    def error_rate(self) -> float:
        return self.error_count / max(self.request_count, 1)

app_state = AppState()


# ─────────────────────────────────────────────
# 生命周期管理(startup / shutdown)
# ─────────────────────────────────────────────

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用启动和关闭的生命周期管理
    
    启动时:初始化资源(数据库连接池、模型预热等)
    关闭时:优雅释放资源,等待进行中的请求完成
    """
    # ── 启动阶段 ──
    logger.info("Agent 服务启动中...")
    
    # 预热:发送一个测试请求,确保 LLM 连接正常
    try:
        await asyncio.wait_for(
            client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": "ping"}],
                max_tokens=5,
            ),
            timeout=15.0,
        )
        logger.info("LLM 连接预热成功")
        app_state.ready = True
    except Exception as e:
        logger.warning("LLM 预热失败,服务仍将启动但可能降级", error=str(e))
        app_state.ready = True  # 依然标记就绪,不阻塞部署
    
    yield  # 这里是应用正常运行阶段
    
    # ── 关闭阶段(优雅关机)──
    logger.info("Agent 服务正在关闭...")
    app_state.ready = False
    
    # 给进行中的请求最多10秒完成
    await asyncio.sleep(2)
    logger.info("Agent 服务已停止")


# ─────────────────────────────────────────────
# FastAPI 应用
# ─────────────────────────────────────────────

app = FastAPI(
    title="Agent API",
    version="1.0.0",
    lifespan=lifespan,
    # 生产环境关闭文档(避免暴露 API 结构)
    # docs_url=None,
    # redoc_url=None,
)

# CORS(按需配置,不要用 allow_origins=["*"])
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-frontend.com"],  # 改为实际前端域名
    allow_methods=["GET", "POST"],
    allow_headers=["Content-Type", "X-Request-ID"],
)


# ─────────────────────────────────────────────
# 请求追踪中间件
# ─────────────────────────────────────────────

@app.middleware("http")
async def request_tracking_middleware(request: Request, call_next):
    """
    为每个请求:
    1. 生成或传递 request_id(分布式追踪的基础)
    2. 记录请求日志
    3. 计算响应时间
    4. 在响应头中返回 request_id(方便客户端上报问题时对应日志)
    """
    # 支持外部传入 request_id(如 API 网关生成的)
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
    start_time = time.time()
    
    logger.info(
        "请求开始",
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )
    
    try:
        response = await call_next(request)
    except Exception as e:
        logger.error("未捕获的异常", request_id=request_id, error=str(e))
        response = JSONResponse(
            status_code=500,
            content={"error": "internal_server_error", "request_id": request_id},
        )
    
    latency_ms = (time.time() - start_time) * 1000
    
    logger.info(
        "请求完成",
        request_id=request_id,
        status_code=response.status_code,
        latency_ms=round(latency_ms, 1),
    )
    
    # 在响应头中返回 request_id,方便前端上报问题时对应日志
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{latency_ms:.1f}ms"
    
    return response


# ─────────────────────────────────────────────
# 健康检查端点
# ─────────────────────────────────────────────

@app.get("/health")
async def health_check():
    """
    存活检查(Liveness Probe)
    返回 200 = 进程正常
    返回 500 = 需要重启容器
    """
    return {
        "status": "alive",
        "uptime_seconds": round(app_state.uptime_seconds),
        "version": "1.0.0",
    }


@app.get("/ready")
async def readiness_check():
    """
    就绪检查(Readiness Probe)
    返回 200 = 可以接受流量
    返回 503 = 暂时不接受流量(正在启动或负载过高)
    """
    if not app_state.ready:
        raise HTTPException(
            status_code=503,
            detail={"status": "not_ready", "reason": "服务正在初始化"},
        )
    
    # 高错误率时也拒绝流量,让 K8s 重启(或者至少不再给它分配新请求)
    if app_state.error_rate > 0.5 and app_state.request_count > 10:
        raise HTTPException(
            status_code=503,
            detail={"status": "degraded", "error_rate": app_state.error_rate},
        )
    
    return {
        "status": "ready",
        "error_rate": round(app_state.error_rate, 3),
        "request_count": app_state.request_count,
    }


# ─────────────────────────────────────────────
# 业务端点
# ─────────────────────────────────────────────

class AgentRequest(BaseModel):
    message: str = Field(..., min_length=1, max_length=2000)
    session_id: Optional[str] = None


class AgentResponse(BaseModel):
    output: str
    request_id: str
    latency_ms: float
    tokens_used: int


@app.post("/v1/agent/chat", response_model=AgentResponse)
async def agent_chat(
    body: AgentRequest,
    request: Request,
):
    """
    核心业务端点:Agent 对话
    """
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
    start_time = time.time()
    
    app_state.request_count += 1
    
    try:
        # 超时控制:30 秒
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": body.message}],
                max_tokens=500,
            ),
            timeout=30.0,
        )
        
        output = response.choices[0].message.content
        latency_ms = (time.time() - start_time) * 1000
        
        return AgentResponse(
            output=output,
            request_id=request_id,
            latency_ms=round(latency_ms, 1),
            tokens_used=response.usage.total_tokens,
        )
        
    except asyncio.TimeoutError:
        app_state.error_count += 1
        raise HTTPException(
            status_code=504,
            detail={"error": "timeout", "message": "请求超时,请稍后重试"},
        )
    except Exception as e:
        app_state.error_count += 1
        logger.error("Agent 调用失败", request_id=request_id, error=str(e))
        raise HTTPException(
            status_code=500,
            detail={"error": "internal_error", "message": "服务暂时不可用"},
        )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

本章小结

  1. /health/ready 是两个不同的端点,作用不同/health 判断进程是否存活(失败=重启容器),/ready 判断是否能接受流量(失败=从负载均衡中摘除,但不重启)。两个都要有。

  2. request_id 是分布式系统中的"案发现场":每个请求一个唯一 ID,贯穿日志、响应头、错误消息。当用户报告问题时,一个 request_id 就能让你在海量日志中找到对应的完整链路。

  3. 优雅关机(Graceful Shutdown)防止用户看到中断的请求:在容器收到 SIGTERM 时,应该先停止接受新请求,等现有请求处理完,再退出。FastAPI + uvicorn 的 lifespan 机制是实现这一点的标准方式。

  4. 不要把内部错误信息(stacktrace)暴露给用户:统一错误处理在返回给用户之前,把 Exception 转换成友好的错误消息。完整的错误信息只出现在日志中。

  5. Pydantic 的 min_length / max_length 是第一道防线:在数据进入业务逻辑之前,先用 Pydantic 模型验证。这既是安全保护(防止超长输入),也是 API 契约的文档。

# 核心行动:给你的 FastAPI Agent 加上这三件事
# 最小化改造(优先级排序):

# 1. 加健康检查端点(10分钟)
@app.get("/health")
async def health(): return {"status": "ok"}

# 2. 加请求超时(5分钟)
response = await asyncio.wait_for(your_llm_call(), timeout=30.0)

# 3. 加 request_id 中间件(30分钟)
# (复制本章的 request_tracking_middleware)

本章提示词模板

【模板1:FastAPI 生产改造清单提示词】
以下是我当前的 FastAPI Agent 服务代码(关键部分):

{code_snippet}

请帮我识别:
1. 缺少哪些生产就绪的组件?(按优先级排序)
2. 有哪些潜在的性能瓶颈?
3. 有哪些安全隐患?(特别是 AI 服务特有的)
4. 给出最小化改造方案(最少改动,最大收益)
5. 有没有不需要修改业务逻辑就能添加的横切关注点(日志、追踪等)?
【模板2:API 错误处理设计提示词】
我的 AI Agent API 可能出现以下类型的错误:
1. OpenAI API 超时
2. OpenAI API 速率限制(429 Too Many Requests)
3. 用户输入包含无效字符
4. 上下文窗口超出限制
5. 服务器内存不足

请为每种错误设计:
1. HTTP 状态码(哪个最合适?)
2. 返回给用户的错误消息(友好但不泄露内部细节)
3. 是否需要重试,重试策略是什么?
4. 对应的监控告警阈值建议
5. Runbook:出现这个错误时,Oncall 工程师应该做什么?

→ 第04章:配置管理与安全密钥:不要把API Key写进代码