第二章 技术栈选型:为AI而生的后端
第二章 技术栈选型:为AI而生的后端
“选择技术栈不是选最流行的,而是选最适合你的问题形状的。AI SaaS的问题形状是:高IO等待、变长处理时间、结构化与非结构化数据并存。”
一、为什么异步是核心需求
传统的Web应用,大多数请求的处理时间在几毫秒到几十毫秒之间:数据库查询、缓存读取、业务逻辑计算。这个时间尺度下,同步和异步的差距可以忽略不计。
AI应用的时间尺度完全不同:
- 调用视觉模型分析一张图片:1-5秒
- 调用LLM生成一段内容:3-15秒
- 调用视频生成API:30秒-5分钟
- 批量处理50张图片:数分钟
在这个时间尺度下,同步处理意味着一个请求会占用一个线程/进程几秒甚至几分钟。对于传统的WSGI服务器(如gunicorn多进程模型),每个进程同时只能处理一个请求——等AI调用的时候,这个进程完全阻塞,无法处理其他请求。
解决方案有两种:
方案A(水平扩展):开很多个进程,用进程数量换并发。100个并发需要100个进程,每个进程100MB内存,这意味着10GB内存只用于"等待AI响应"。成本极高。
方案B(异步IO):使用异步框架,单个进程可以同时处理数百个请求——不是真正的"同时",而是利用IO等待时间切换上下文。在等待AI API响应的那几秒里,同一个进程可以处理其他用户的请求。
FastAPI + asyncio就是方案B的核心实现。
二、FastAPI的核心优势
优势一:原生async/await支持
# 同步版本(每次调用都阻塞线程)
@app.post("/analyze")
def analyze_image(payload: AnalyzeRequest) -> AnalyzeResponse:
result = openai_client.analyze(payload.image_url) # 阻塞5秒
return AnalyzeResponse(result=result)
# 异步版本(等待IO时释放线程)
@app.post("/analyze")
async def analyze_image(payload: AnalyzeRequest) -> AnalyzeResponse:
result = await ai_client.analyze(payload.image_url) # 非阻塞等待
return AnalyzeResponse(result=result)
第二个版本在等待AI响应的5秒里,同一个事件循环可以处理其他用户的请求。这是AI SaaS高并发的基础。
优势二:Pydantic v2的数据验证
AI功能的数据边界特别重要:输入验证(确保传入的是合法的图片URL而不是SQL注入字符串)、输出解析(确保AI返回的JSON符合预期的schema)都可以用Pydantic优雅地处理:
from pydantic import BaseModel, UUID4, HttpUrl, field_validator
from typing import Literal
class OverlayItem(BaseModel):
role: Literal["headline", "offer", "cta", "body"] # 严格枚举
text: str
@field_validator("role", mode="before")
@classmethod
def normalize_role(cls, v: str) -> str:
"""模型可能返回非标准role值,归一化处理"""
VALID_ROLES = {"headline", "offer", "cta", "body"}
if v.lower() in VALID_ROLES:
return v.lower()
# 非标准值归类为body
return "body"
class ImageAnalysisResult(BaseModel):
overlays: list[OverlayItem]
has_face: bool
has_product: bool
description: str | None = None # 允许为None
class AnalyzeRequest(BaseModel):
asset_id: UUID4
# 不接受image_url作为直接入参,必须通过asset_id查询
# 这防止用户传入任意URL让服务器发起SSRF请求
注意最后的注释:不接受image_url作为直接入参——这是一个安全设计决策。在第十章安全与合规中会详细讨论。
优势三:依赖注入系统
FastAPI的Depends系统让横切关注点(身份验证、数据库会话、权限检查)可以优雅地复用:
# 一次定义,到处复用
async def get_current_user_with_quota_check(
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> User:
"""检查用户是否有足够的图片分析额度"""
quota = await quota_service.get_image_analysis_quota(db, user)
if quota["remaining"] is not None and quota["remaining"] < 1:
raise HTTPException(
status_code=402,
detail={"code": "quota_exceeded", "remaining": 0}
)
return user
# 在路由中使用
@router.post("/analyze")
async def analyze_image(
asset_id: UUID4,
db: AsyncSession = Depends(get_db),
user: User = Depends(get_current_user_with_quota_check), # 复用
):
...
三、SQLAlchemy 2异步ORM
SQLAlchemy 2的异步支持让数据库操作也完全融入异步体系:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# 异步引擎配置
engine = create_async_engine(
settings.database_url,
pool_size=20, # 连接池大小
max_overflow=30, # 超过pool_size后最多再开多少个连接
pool_timeout=30, # 获取连接的超时时间
pool_recycle=3600, # 连接复用最大时间(避免长连接超时)
)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # 提交后不过期,避免额外查询
)
关键原则:同一个AsyncSession不能并发查询
这是SQLAlchemy异步的核心限制,初学者最容易踩的坑:
# ❌ 错误:同一个session并发执行两个查询
async def get_user_and_assets(db: AsyncSession, user_id: UUID):
results = await asyncio.gather(
db.execute(select(User).where(User.id == user_id)),
db.execute(select(Asset).where(Asset.owner_id == user_id)),
)
# 这会抛出 InvalidRequestError!
# ✅ 正确:顺序执行
async def get_user_and_assets(db: AsyncSession, user_id: UUID):
user_result = await db.execute(select(User).where(User.id == user_id))
asset_result = await db.execute(select(Asset).where(Asset.owner_id == user_id))
return user_result.scalar_one_or_none(), asset_result.scalars().all()
如果业务逻辑确实需要并发数据库查询,解决方案是使用多个session(如asyncio.gather配合多个独立session),而不是共用一个session。
四、PostgreSQL的JSONB:结构化与非结构化并存
AI SaaS的数据有一个特殊之处:核心业务数据是高度结构化的(用户、订阅、文件元数据),但AI处理的结果往往是半结构化的(分析JSON、生成的内容)。
关系型数据库处理半结构化数据的传统做法是序列化成文本字段存储,但这失去了查询能力。PostgreSQL的JSONB类型提供了最佳平衡:
from sqlalchemy.dialects.postgresql import JSONB
class Asset(Base):
__tablename__ = "assets"
id: Mapped[UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
asset_fingerprint: Mapped[str] = mapped_column(Text, unique=True)
# AI分析结果存为JSONB,既保留结构,又支持索引查询
image_analysis: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
image_analysis_version: Mapped[str | None] = mapped_column(Text, nullable=True)
JSONB的强大之处:可以为JSON中的特定字段创建索引,支持精确的JSON路径查询:
-- 查询所有有人脸的图片分析结果
SELECT * FROM assets
WHERE image_analysis->>'has_face' = 'true';
-- 为高频查询字段创建GIN索引
CREATE INDEX idx_assets_image_analysis
ON assets USING gin(image_analysis);
五、Redis的多重角色
在AI SaaS中,Redis通常扮演三个不同的角色,需要区别对待:
角色一:缓存(Cache)
缓存高频读取的、计算成本高的数据:
- 用户配额信息(避免每次请求都查数据库)
- 热门内容的AI分析结果
- 用户会话数据
# 配额缓存示例
QUOTA_CACHE_KEY = "quota:{user_id}:{quota_type}"
QUOTA_CACHE_TTL = 60 # 秒
async def get_cached_quota(user_id: UUID, quota_type: str) -> dict | None:
key = QUOTA_CACHE_KEY.format(user_id=user_id, quota_type=quota_type)
cached = await redis.get(key)
return json.loads(cached) if cached else None
async def set_quota_cache(user_id: UUID, quota_type: str, data: dict):
key = QUOTA_CACHE_KEY.format(user_id=user_id, quota_type=quota_type)
await redis.setex(key, QUOTA_CACHE_TTL, json.dumps(data))
async def invalidate_quota_cache(user_id: UUID, quota_type: str):
key = QUOTA_CACHE_KEY.format(user_id=user_id, quota_type=quota_type)
await redis.delete(key)
角色二:消息队列(Message Broker)
作为Celery的消息中间件,存储待处理的任务:
# celery配置
CELERY_BROKER_URL = "redis://localhost:6379/1" # 用独立的DB编号
CELERY_RESULT_BACKEND = "redis://localhost:6379/2"
# 生产建议:Broker和Cache用不同的Redis DB,
# 甚至用不同的Redis实例,避免互相影响
角色三:分布式锁(Distributed Lock)
防止同一用户并发触发多次相同的AI分析(可能导致重复扣费):
async def acquire_analysis_lock(
asset_id: UUID,
user_id: UUID,
ttl: int = 30
) -> bool:
"""尝试获取分析锁,返回是否成功"""
key = f"lock:analysis:{asset_id}:{user_id}"
# SET NX(不存在时才设置)是原子操作,天然防竞争
result = await redis.set(key, "1", nx=True, ex=ttl)
return result is not None
async def release_analysis_lock(asset_id: UUID, user_id: UUID):
key = f"lock:analysis:{asset_id}:{user_id}"
await redis.delete(key)
六、技术栈的"不用"清单
选型不只是选什么,也是不选什么。以下是AI SaaS常见的技术选型误区:
误区一:用Django ORM
Django的ORM是同步的。即使用ASGI服务器部署,数据库操作仍然阻塞事件循环(或者需要sync_to_async包装,有额外开销)。在AI SaaS的高并发场景下,这个限制是实质性的。推荐SQLAlchemy 2异步ORM或Tortoise ORM。
误区二:直接用requests库调用AI API
requests是同步库。在异步FastAPI应用中调用requests.get()会阻塞整个事件循环,所有并发请求都要等这一个请求完成。必须用httpx.AsyncClient或aiohttp。
误区三:把大JSON存在Text字段里
AI输出的JSON存成Text字段然后在Python里json.loads(),失去了数据库层面的查询能力。用PostgreSQL的JSONB,成本几乎相同,但获得了灵活的查询能力。
误区四:不做连接池
AI调用期间,数据库连接被占用。如果连接池配置不合理(pool_size太小),高并发时会出现连接等待。根据预期并发量和AI调用时长,合理配置pool_size和max_overflow。
核心洞见
-
异步IO是AI SaaS的核心技术需求——AI调用的时间尺度(秒级)在同步框架下会导致严重的资源浪费。FastAPI + asyncio + asyncpg是这个问题的标准解法。
-
Pydantic v2是AI输出的天然校验层——严格的类型注解和自定义validator,让AI的概率性输出在进入系统前就被归一化和验证。
-
同一AsyncSession不能并发查询——这是SQLAlchemy异步最重要的限制,必须了解并遵守。
-
PostgreSQL JSONB让结构化与半结构化数据和谐共存——AI分析结果用JSONB存储,既灵活,又支持高效查询。
-
Redis扮演三个不同角色,需分开管理:缓存(TTL驱动)、消息队列(Celery Broker)、分布式锁(防并发重复操作)。
下一章,我们深入AI API生态——市面上有那么多AI API提供商,如何选择、如何集成,特别是如何利用WaveSpeed这类多模型平台降低集成复杂度。