第03章:AI写的异步代码,并发一高就挂?

第03章:AI写的异步代码,并发一高就挂?

“async def 写上去了,但如果你在里面调了一个同步的数据库驱动,整个事件循环就堵死了。”


ℹ️ 版本说明:本章基于 Python 3.14.5。Python 3.13 引入了实验性的免费线程(free-threaded)模式,Python 3.14 进一步完善了 asyncio 的性能和 API。

3.1 AI默认会生成什么

你让 AI 写一个"高并发的 FastAPI 接口":

from fastapi import FastAPI
import asyncio
import httpx

app = FastAPI()

@app.get("/fetch-data")
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
    return response.json()

@app.get("/process")
async def process_item(item_id: int):
    # 从数据库查询
    result = db.query(Item).filter(Item.id == item_id).first()
    return result

第一个接口写得还不错,用了 httpx.AsyncClient,是真正的异步 HTTP 请求。

但第二个接口有个大问题:db.query() 是同步操作。你在 async def 里调用了同步的数据库查询,这会在并发请求时导致整个事件循环阻塞。

一个被阻塞的事件循环意味着: 在那个同步操作完成之前,服务器无法处理其他任何请求。你的"高并发接口"在真正的并发压力下会表现得比单线程还糟糕。


3.2 AI通常遗漏的5个坑

⚠️ 坑1:在 async def 里调用同步阻塞操作

这是最常见、最隐蔽的问题。同步操作包括:

  • 同步数据库驱动psycopg2(同步)而不是 asyncpg(异步);SQLAlchemy 的同步引擎而不是 create_async_engine
  • 同步文件 I/Oopen(), os.path.exists(), shutil 等标准库文件操作
  • 同步 HTTP 请求requests 库(同步),而不是 httpxaiohttp(异步)
  • time.sleep():应该用 asyncio.sleep()

检测方法:在代码里搜索 import requestsimport psycopg2time.sleep——如果出现在 async def 里,就是问题所在。


⚠️ 坑2:忘记 await,导致协程未执行

这类 bug 特别诡异:

async def send_notification(user_id: int):
    # 错误:忘记 await,这行代码创建了协程对象但没有执行它
    notify_user(user_id)
    
    # 正确:
    await notify_user(user_id)

如果 notify_userasync def,不加 await 的调用会返回一个协程对象而不执行任何操作。Python 3.14 会产生 RuntimeWarning: coroutine 'notify_user' was never awaited,但这只是警告,不会让你的代码崩溃——只是静默地什么都没做。


⚠️ 坑3:CPU 密集型任务放在事件循环里

asyncio 的异步是I/O 并发,不是 CPU 并发。如果你在 async def 里做大量计算(图像处理、数据分析、机器学习推理),会阻塞事件循环。

@app.get("/process-image")
async def process_image(image_id: int):
    # 错误:CPU密集型操作阻塞事件循环
    result = heavy_image_processing(image_id)
    
    # 正确:放到线程池执行
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, heavy_image_processing, image_id)
    return result

AI 通常不会主动建议你用 run_in_executor,除非你明确要求。


⚠️ 坑4:并发任务缺少限流,导致资源耗尽

AI 很容易写出这样的代码来并发处理多个任务:

async def process_all(items: list):
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    return results

如果 items 有 10000 个元素,这段代码会同时发起 10000 个并发操作——数据库连接耗尽、外部 API 速率限制被触发、内存溢出。

正确做法:用 asyncio.Semaphore 限制并发数。


⚠️ 坑5:异步上下文管理器使用错误

数据库连接、HTTP 客户端等都应该作为上下文管理器使用,但 AI 经常会写出:

# 错误:每次请求都创建新的连接,没有连接池
@app.get("/users")
async def get_users():
    async with AsyncSession(engine) as session:
        users = await session.execute(select(User))
    return users.scalars().all()

每次请求都新建会话没问题,但如果你把 engine 创建放在函数里,每次都新建连接池就是大问题了。AI 需要你明确要求才会帮你设计连接池的生命周期。


3.3 更好的提示词

提示词 P01:让 AI 写真正异步的数据库查询

使用时机:需要用 SQLAlchemy 2.x 做异步数据库操作

比默认多了什么

  • 明确使用 create_async_engineAsyncSession
  • 连接池配置
  • 会话生命周期管理
用 Python 3.14 + SQLAlchemy 2.x + asyncpg 写一个完全异步的数据库访问层。

要求:
1. 使用 `create_async_engine` 而不是 `create_engine`
2. 连接池配置:pool_size=10, max_overflow=20, pool_timeout=30
3. 使用 FastAPI 的依赖注入提供 AsyncSession
4. 演示一个完整的 CRUD 示例:
   - 创建(INSERT)
   - 查询单个(SELECT WHERE id=?)
   - 查询列表(SELECT with pagination)
   - 更新(UPDATE)
   - 删除(DELETE)
5. 所有操作都要有 try/except,数据库异常要转为 HTTP 异常
6. 连接池在应用启动时初始化,关闭时优雅关闭(使用 FastAPI lifespan)

基于 Python 3.14 + FastAPI 0.115.x + SQLAlchemy 2.x + asyncpg。

提示词 P02:处理并发任务时的限流和错误恢复

使用时机:需要并发处理大批量任务(批量请求 API、批量数据处理等)

比默认多了什么

  • Semaphore 限制并发
  • 部分失败时的错误收集(不影响成功的任务)
  • 超时处理
我需要并发处理一批用户 ID(可能有几千个),对每个用户调用外部 API 获取数据。

用 Python 3.14 + asyncio 实现,要求:
1. 使用 asyncio.Semaphore 限制最大并发数为 50
2. 使用 asyncio.gather(return_exceptions=True) 捕获部分失败,不因为个别失败中断整批处理
3. 每个请求有 10 秒超时(asyncio.timeout 或 asyncio.wait_for)
4. 失败的任务记录到日志(包含 user_id 和错误信息),成功的任务返回结果
5. 实现指数退避重试(最多3次,间隔:1秒、2秒、4秒)
6. 返回结构:{"success": [...], "failed": [...], "total": N, "success_rate": "XX%"}

给我完整的实现代码,不要有任何占位符。

提示词 P03:将 CPU 密集型任务从事件循环分离

使用时机:有大量计算任务(图像处理、数据分析、PDF 生成等)需要集成到异步应用

比默认多了什么

  • 用 ProcessPoolExecutor 处理真正的 CPU 密集型任务
  • 用 ThreadPoolExecutor 处理同步 I/O(文件读写、调用同步库)
  • 明确区分两种场景
我的 FastAPI 应用需要处理以下两类后台任务:
A)CPU密集型:PDF 转图片(使用 pdf2image 库,纯 CPU 计算)
B)同步 I/O 型:调用第三方 SDK(只有同步接口,没有异步版本)

用 Python 3.14 + asyncio 实现这两类任务,要求:
1. CPU密集型任务使用 ProcessPoolExecutor(真正的多进程并行)
2. 同步 I/O 任务使用 ThreadPoolExecutor(释放 GIL 的线程池)
3. 两种 executor 在 FastAPI lifespan 里初始化和关闭
4. 给出正确的调用方式:`await loop.run_in_executor(executor, func, *args)`
5. 说明 Python 3.14 的 free-threaded 模式(--disable-gil)对 CPU 密集型任务的影响
6. 超时控制:CPU任务30秒超时,IO任务5秒超时

演示端点:POST /convert-pdf(A类任务)和 POST /call-sdk(B类任务)

3.4 验收清单

检查项 验证方法 AI辅助
无同步阻塞操作在 async def 中 grep -rn "import requests|time.sleep|open(" --include="*.py" 检查 async 函数体 让 AI 审查代码找出所有同步调用
所有 await 都有对应 async def 运行 Python 时无 RuntimeWarning: coroutine was never awaited 让 AI 检查所有 async 函数调用
数据库使用异步驱动 确认使用 asyncpg / motor / aiosqlite 而非同步驱动 让 AI 检查 requirements 和 import
并发批处理有限流 批量任务代码中存在 asyncio.Semaphore 让 AI 重构批量任务代码
CPU 密集型任务在 executor 中 计算密集型函数通过 run_in_executor 调用 让 AI 识别并重构
有连接池配置 数据库引擎有 pool_size 设置 让 AI 添加连接池参数

3.5 本章小结

如果你只记一件事async def 不会自动让你的代码变快。在 async def 里调用同步操作,比不用 async 还要糟糕——它阻塞的不只是当前请求,而是整个服务器。

Python 3.14 异步编程的关键认知

  1. asyncio 解决的是 I/O 等待,不是 CPU 计算。10 个并发数据库查询,asyncio 效果显著;10 个并发矩阵计算,你需要多进程
  2. 同步 → 异步的替换清单requestshttpxpsycopg2asyncpgtime.sleepasyncio.sleep;同步 SQLAlchemy → create_async_engine
  3. Python 3.14 的 free-threaded 模式 允许多线程真正并行运行 Python 代码(移除 GIL),这是历史性突破——但目前仍是实验性特性,需要额外编译选项启用

→ 第4章:AI帮我写了异常处理,但用户看到的还是500?