fastapi概述

 
### 1. 最简单的应用
```python
from fastapi import FastAPI

app = FastAPI()  # 创建应用实例

@app.get("/")  # 路由装饰器
async def root():
    return {"message": "Hello World"}
```

保存为 `main.py`,运行:

```bash
uv run uvicorn main:app --reload
```

访问 `http://127.0.0.1:8000`,你会看到:

```json
{"message": "Hello World"}
```

### 2. 路径参数

```python
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """
    item_id 会自动转换为 int 类型
    访问: /items/123
    """
    return {"item_id": item_id}
```

### 3. 查询参数

```python
from typing import Optional

@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    """
    查询参数: ?skip=0&limit=10
    """
    return {"skip": skip, "limit": limit}
```

---

**快速**: 与 NodeJS 和 Go 相当的高性能

| 特性 | FastAPI | Flask | Django |
|------|---------|-------|--------|
| 性能 | ⚡⚡⚡ | ⚡⚡ | ⚡ |
| 学习曲线 | 低 | 低 | 中 |
| 自动文档 | ✅ | ❌ | ❌ |
| 类型验证 | ✅ | ❌ | ⚠️ |
| 异步支持 | ✅ | ⚠️ | ✅ |


### 异步支持示例

FastAPI 原生支持异步编程,可以处理大量并发请求:

#### 1. 基础异步函数

```python
import asyncio
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    # 异步函数,不会阻塞其他请求
    await asyncio.sleep(1)  # 模拟IO操作
    return {"message": "异步响应"}
```

#### 2. 异步HTTP请求

```python
import httpx

@app.get("/weather/{city}")
async def get_weather(city: str):
    # 使用 httpx 异步客户端
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.weather.com/{city}"
        )
    return response.json()
```

#### 3. 并发处理多个请求

```python
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # 可以同时查询多个数据源
    user_data, user_posts = await asyncio.gather(
        fetch_user_from_db(user_id),
        fetch_user_posts(user_id)
    )
    return {"user": user_data, "posts": user_posts}

async def fetch_user_from_db(user_id: int):
    # 模拟数据库查询
    await asyncio.sleep(0.1)
    return {"id": user_id, "name": "张三"}

async def fetch_user_posts(user_id: int):
    # 模拟获取用户文章
    await asyncio.sleep(0.1)
    return [{"title": "文章1"}, {"title": "文章2"}]
```

**异步优势**:
- 在IO密集型操作中性能提升显著
- 可以同时处理数千个并发连接
- 不需要额外的线程或进程
- 适合数据库查询、API调用等场景

#### 4. 并发安全与资源竞争问题

**问题场景:多个请求修改同一资源**

```python
from fastapi import FastAPI, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

# ❌ 危险:没有并发控制
@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
    # 两个请求同时执行:
    # 请求A: 读到balance=100
    # 请求B: 读到balance=100
    # 请求A: 写入150 (+50)
    # 请求B: 写入130 (+30)  ← A的修改丢失了!
    balance = await get_balance(user_id)  # balance=100
    new_balance = balance + amount
    await update_balance(user_id, new_balance)
    return {"balance": new_balance}
```

**解决方案1:使用数据库事务(推荐)**

```python
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

@app.post("/wallet/add/{user_id}")
async def add_money(
    user_id: int,
    amount: int,
    db: AsyncSession
):
    # 使用数据库行锁(FOR UPDATE)
    async with db.begin():  # 自动事务
        # SELECT ... FOR UPDATE - 锁定该行
        stmt = select(User).where(
            User.id == user_id
        ).with_for_update()
        result = await db.execute(stmt)
        user = result.scalar_one()

        # 其他请求会等待锁释放
        user.balance += amount
        await db.flush()  # 提交后自动释放锁

    return {"balance": user.balance}
```

**解决方案2:使用分布式锁(Redis)**

```python
import aioredis
from contextlib import asynccontextmanager

redis = aioredis.from_url("redis://localhost")

@asynccontextmanager
async def redis_lock(lock_key: str, expire: int = 10):
    """分布式锁上下文管理器"""
    lock = None
    try:
        lock = await redis.set(
            lock_key, "locked",
            nx=True, ex=expire
        )
        if not lock:
            raise HTTPException(
                400, "操作正在进行,请稍后重试"
            )
        yield
    finally:
        if lock:
            await redis.delete(lock_key)

@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
    lock_key = f"lock:user:{user_id}"

    # 获取锁,同一用户的请求串行执行
    async with redis_lock(lock_key):
        balance = await get_balance(user_id)
        new_balance = balance + amount
        await update_balance(user_id, new_balance)

    return {"balance": new_balance}
```

**解决方案3:原子操作(最佳性能)**

```python
@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
    # 直接使用SQL原子操作
    await db.execute(
        update(User)
        .where(User.id == user_id)
        .values(balance=User.balance + amount)  # 原子递增
    )
    return {"message": "success"}
```

**并发控制机制对比**:

| 方案 | 适用场景 | 性能 | 一致性 |
|------|---------|------|--------|
| 数据库事务 | 单数据库应用 | ⚡⚡⚡ | 强一致 |
| 分布式锁 | 多服务/缓存 | ⚡⚡ | 强一致 |
| 原子操作 | 简单递增/递减 | ⚡⚡⚡ | 强一致 |
| 乐观锁 | 读多写少 | ⚡⚡⚡ | 最终一致 |

**关键要点**:
- `await` 本身不保证线程安全
- 多个请求可以同时执行到同一个 `await` 之前
- 必须使用锁、事务等机制保护共享资源
- 优先使用数据库级别的原子操作

## 自动文档生成 FastAPI 自动生成交互式 API 文档! ### Swagger UI 访问: `http://127.0.0.1:8000/docs` **功能**: - 📖 浏览所有 API 端点 - 🧪 在线测试 API - 📝 查看请求/响应模型 ### ReDoc 访问: `http://127.0.0.1:8000/redoc` **功能**: - 📄 更美观的文档展示 - 🔍 搜索功能 - 📱 移动端友好 ### 自定义文档 ```python app = FastAPI( title="Simple API", description="这是一个简单的 FastAPI 示例", version="0.1.0", docs_url="/docs", # Swagger UI 路径 redoc_url="/redoc", # ReDoc 路径 ) ``` ---

 


FastAPI APP
``` application = FastAPI( **settings.FASTAPI_KWARGS, lifespan=lifespan, redirect_slashes=False ) ``` ## `redirect_slashes=False` 参数解释 ### **作用** 控制 FastAPI 是否自动处理 URL 末尾斜杠的重定向行为。 --- ### **默认行为 (`redirect_slashes=True`)** 当设置为 `True`(默认值)时,FastAPI 会自动: - 将 `/path` 重定向到 `/path/`(如果 `/path/` 有定义) - 或将 `/path/` 重定向到 `/path`(如果 `/path` 有定义) **示例:** ```python from fastapi import FastAPI app = FastAPI(redirect_slashes=True) # 默认值 @app.get("/items") def get_items(): return {"msg": "items list"} @app.get("/items/{item_id}") def get_item(item_id: int): return {"msg": f"item {item_id}"} ``` **访问效果:** - `/items` → 200 OK(直接匹配) - `/items/` → **307 重定向** → `/items`(自动添加斜杠后重定向) - `/items/123` → 200 OK - `/items/123/` → **307 重定向** → `/items/123` --- ### **设置 `redirect_slashes=False` 的行为** 禁用自动重定向后: - `/path` 和 `/path/` 被视为**完全不同的路径** - 如果某个路径没有精确匹配的路由,返回 **404 Not Found** **示例:** ```python app = FastAPI(redirect_slashes=False) @app.get("/items") def get_items(): return {"msg": "items list"} # 注意:没有定义 @app.get("/items/") ``` **访问效果:** - `/items` → 200 OK - `/items/` → **404 Not Found**(因为没有定义带斜杠的路由) --- ### **使用场景对比** | 场景 | `redirect_slashes=True` (默认) | `redirect_slashes=False` | |------|-------------------------------|--------------------------| | RESTful API | ✅ 推荐,用户体验好 | ❌ 可能让用户困惑 | | 需要精确区分路径 | ❌ 自动重定向会干扰 | ✅ 严格区分带/不带斜杠 | | 反向代理环境 | ⚠️ 可能产生多余重定向 | ✅ 避免重定向开销 | | 前后端分离(SPA) | ✅ 处理用户随意输入 | ⚠️ 需前端处理路径规范 | --- ### **实际影响示例** ```python from fastapi import FastAPI # 配置1:默认行为 app1 = FastAPI(redirect_slashes=True) @app1.get("/users") def get_users(): return {"users": ["alice", "bob"]} # 访问 /users/ → 307 重定向到 /users ✅ # 配置2:禁用重定向 app2 = FastAPI(redirect_slashes=False) @app2.get("/users") def get_users(): return {"users": ["alice", "bob"]} # 访问 /users/ → 404 Not Found ❌ # 需要额外定义: @app2.get("/users/") def get_users_trailing(): return {"users": ["alice", "bob"]} ``` --- ### **为什么你的代码设置为 `False`?** 常见原因: 1. **严格的 API 设计**:要求客户端必须使用精确的 URL 格式 2. **避免重定向性能开销**:高并发场景下减少 307 响应 3. **与现有系统兼容**:其他服务期望特定路径格式 4. **Kubernetes Ingress 等代理环境**:避免多层重定向问题 --- ### **总结** - `redirect_slashes=True`(默认):友好但可能产生多余重定向 - `redirect_slashes=False`:严格但需要客户端精确匹配路径 你的代码选择 `False` 意味着采用了**更严格、更精确的路径匹配策略**。

 


 


 


 


请求与响应

 

#### GET 请求

```python
@app.get("/users/")
async def get_users():
    return [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]
```

#### POST 请求

```python
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int

@app.post("/users/")
async def create_user(user: User):
    """
    请求体:
    {
        "name": "张三",
        "age": 25
    }
    """
    return {"message": f"用户 {user.name} 创建成功", "user": user}
```
    

 
#### PUT 请求
```python
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: User):
    return {"message": f"用户 {user_id} 更新成功"}
```

#### DELETE 请求
```python
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    return {"message": f"用户 {user_id} 删除成功"}
```

 

    
## 不使用 async 关键字 当去掉 `async` 关键字后,路由处理函数会变成**普通函数**(`def`): ```python from fastapi import FastAPI app = FastAPI() @app.get("/") # 使用普通函数 def root(): return {"message": "Hello World"} ``` ### 与 async def 的区别 | 特性 | async def (异步) | def (同步) | |------|------------------|------------| | **执行方式** | 在事件循环中执行 | 在线程池中执行 | | **并发模型** | 协程并发 | 线程并发 | | **适用场景** | I/O 密集型操作 | CPU 密集型或简单操作 | | **并发能力** | 更高(无阻塞) | 受线程池限制(默认约40个线程) | | **线程切换** | 轻量级 | 较重 | ### 关键点说明 1. **不会排队执行** - FastAPI 会自动在线程池中运行普通函数,多个请求仍然可以并发处理 2. **线程池限制** - 并发能力受限于线程池大小(默认约 40 个线程),超过限制的请求会等待 3. **适用场景** - 对于简单的、快速的响应操作,使用 `def` 完全可以 ### 推荐做法 - ✅ **推荐使用 `async def`**:适合大多数场景,特别是涉及数据库查询、API 调用等 I/O 操作 - ✅ **可以使用 `def`**:适合非常简单的响应或纯 CPU 计算 - ⚠️ **避免混用**:在一个项目中尽量保持一致性

 


 


 


 


 


 


异步支持
``` import asyncio counter = 0 lock = asyncio.Lock() async def safe_increment(): global counter async with lock: # 保护整个操作 temp = counter await asyncio.sleep(0.01) # 即使在await期间,锁也不会释放 counter = temp + 1 async def main(): global counter tasks = [safe_increment() for _ in range(100)] await asyncio.gather(*tasks) print(f"最终结果: {counter}") # 保证是100 # asyncio.run(main()) ```
- ``` from fastapi import FastAPI import threading app = FastAPI() counter = 0 lock = threading.Lock() @app.get("/increment/") def increment_counter(): global counter with lock: current_value = counter # 模拟一些耗时的操作 import time time.sleep(1) counter = current_value + 1 return {"counter": counter} ```

 
- await 本身不保证线程安全
- 多个请求可以同时执行到同一个 await 之前 
  - await只是让等待一个异步线程执行完成,却不阻止其他线程访问该方法 

- 必须使用锁、事务等机制保护共享资源
  - 优先使用数据库级别的原子操作
    

 

    

 
在 FastAPI 中,`await` 和非同步编程涉及到一些重要概念。让我详细解释一下:

## 1. **异步模型基础**

在 FastAPI(基于 asyncio)中:
- **没有真正的多线程并发执行**:asyncio 使用单线程事件循环
- `await` 不会阻塞线程,而是让出控制权给事件循环
- 看似“同时”执行,实际是协程间的协作式多任务

## 2. **代码示例演示**

```python
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

# 共享资源示例
counter = 0
shared_data = {}

@app.get("/async-endpoint1")
async def endpoint1():
    global counter
    # 模拟异步IO操作
    await asyncio.sleep(1)
    current = counter
    await asyncio.sleep(0.1)  # 模拟处理时间
    counter = current + 1
    return {"endpoint": 1, "counter": counter}

@app.get("/async-endpoint2")
async def endpoint2():
    global counter
    await asyncio.sleep(0.5)
    current = counter
    await asyncio.sleep(0.2)
    counter = current + 1
    return {"endpoint": 2, "counter": counter}
```

## 3. **并发访问机制**

### 场景分析:
```python
# 假设两个请求几乎同时到达:
# 请求1: endpoint1()
# 请求2: endpoint2()

# 执行时序可能如下:
# 时间点 0ms: endpoint1() 开始,读取 counter=0
# 时间点 0ms: endpoint2() 开始,读取 counter=0
# 时间点 100ms: endpoint1() 设置 counter=1
# 时间点 200ms: endpoint2() 设置 counter=1(覆盖了之前的修改)
# 结果:counter=1,但预期应该是2
```

## 4. **数据库并发问题**

对于数据库同一行数据的更新:

### **乐观锁机制**:
```python
# 使用版本号或时间戳
# SQL示例:
"""
UPDATE users 
SET balance = balance + 100, version = version + 1
WHERE id = 1 AND version = {current_version}
"""
# 检查 affected_rows,如果是0,说明版本已过期,需要重试
```

### **悲观锁机制**:
```python
# 在事务中使用 SELECT ... FOR UPDATE
from databases import Database

async def transfer_funds(db: Database, from_id: int, to_id: int, amount: float):
    async with db.transaction():
        # 锁定两行数据
        query1 = "SELECT balance FROM accounts WHERE id = :id FOR UPDATE"
        from_balance = await db.fetch_val(query1, values={"id": from_id})
        
        if from_balance >= amount:
            # 执行更新
            update1 = "UPDATE accounts SET balance = balance - :amount WHERE id = :id"
            await db.execute(update1, values={"id": from_id, "amount": amount})
            
            update2 = "UPDATE accounts SET balance = balance + :amount WHERE id = :id"
            await db.execute(update2, values={"id": to_id, "amount": amount})
```

## 5. **实际解决方案**

### **方案1:使用数据库事务**
```python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update

async def update_user(session: AsyncSession, user_id: int):
    async with session.begin():
        # 获取并锁定行
        stmt = select(User).where(User.id == user_id).with_for_update()
        result = await session.execute(stmt)
        user = result.scalar_one()
        
        # 修改数据
        user.balance += 100
        await session.commit()
```

### **方案2:使用分布式锁**
```python
import redis.asyncio as redis
import asyncio

class DistributedLock:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    async def lock(self, key: str, timeout=10):
        import uuid
        identifier = str(uuid.uuid4())
        end = asyncio.get_event_loop().time() + timeout
        
        while asyncio.get_event_loop().time() < end:
            if await self.redis.setnx(key, identifier):
                await self.redis.expire(key, timeout)
                return identifier
            await asyncio.sleep(0.01)
        return None
    
    async def unlock(self, key: str, identifier: str):
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        await self.redis.eval(script, 1, key, identifier)

# 使用
redis_client = redis.Redis()
lock = DistributedLock(redis_client)

async def safe_update(user_id: int):
    lock_key = f"lock:user:{user_id}"
    lock_id = await lock.lock(lock_key)
    if lock_id:
        try:
            # 执行更新操作
            pass
        finally:
            await lock.unlock(lock_key, lock_id)
```

### **方案3:使用消息队列串行化**
```python
import asyncio
from collections import defaultdict

class TaskQueue:
    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)
        self.locks = {}
    
    async def process_for_key(self, key: str, coro_func, *args):
        if key not in self.locks:
            self.locks[key] = asyncio.Lock()
        
        async with self.locks[key]:
            return await coro_func(*args)

queue = TaskQueue()

@app.post("/update/{user_id}")
async def update_user(user_id: int):
    async def _update():
        # 这个函数对同一个user_id会串行执行
        # 对不同user_id可以并行执行
        pass
    
    return await queue.process_for_key(f"user_{user_id}", _update)
```

## 6. **关键总结**

1. **asyncio 是单线程的**,真正的并发在 `await` 点切换
2. **竞态条件可能发生**:即使单线程,协程切换也可能导致数据不一致
3. **数据库层面需要额外保护**:
   - 使用事务和适当的隔离级别
   - 使用乐观锁或悲观锁
4. **应用层保护**:
   - 对共享资源使用锁(asyncio.Lock 用于单进程)
   - 使用分布式锁(跨进程/服务)
   - 使用消息队列串行化处理

## 7. **最佳实践建议**

```python
# 1. 尽量使用数据库的事务机制
# 2. 对关键操作使用行级锁或乐观锁
# 3. 避免在应用层维护共享状态
# 4. 如果必须,使用 asyncio.Lock(单进程)或分布式锁

import asyncio

# 应用层锁(仅限单进程)
lock = asyncio.Lock()

async def safe_operation():
    async with lock:  # 这会序列化所有访问
        # 操作共享资源
        pass
```

记住:**数据库的 ACID 特性是你的第一道防线**,应用层保护是补充。在设计系统时,优先考虑让数据库处理并发问题。

    

 

    

 

    

 
# **asyncio.Lock() 详细解析与多场景示例**

`asyncio.Lock()` 是 asyncio 中的互斥锁,用于在**单进程、单线程**的异步环境中保护共享资源。让我详细说明并提供多个例子。

## **1. 基本工作原理**

```python
import asyncio

lock = asyncio.Lock()  # 创建一个锁对象

async def critical_section():
    # 等待获取锁
    async with lock:  # 进入时自动获取锁
        # 临界区代码 - 同一时间只有一个协程能执行这里
        await asyncio.sleep(1)
        print("执行临界区代码")
    # 退出时自动释放锁

# 模拟并发
async def main():
    # 创建多个任务并发执行
    tasks = [critical_section() for _ in range(5)]
    await asyncio.gather(*tasks)

# 执行结果:虽然创建了5个任务,但它们会串行执行临界区代码
# 每个任务等待前一个释放锁后,才能进入临界区
```

## **2. 示例1:保护共享计数器**

### **问题场景(无锁时):**
```python
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    temp = counter  # 读取当前值
    await asyncio.sleep(0.01)  # 模拟IO操作,让出控制权
    counter = temp + 1  # 写回新值

async def main():
    global counter
    # 并发执行100次自增
    tasks = [unsafe_increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"最终结果: {counter}")  # 可能不是100!
    
# 运行结果可能小于100,因为存在竞态条件
```

### **解决方案(使用锁):**
```python
import asyncio

counter = 0
lock = asyncio.Lock()

async def safe_increment():
    global counter
    async with lock:  # 保护整个操作
        temp = counter
        await asyncio.sleep(0.01)  # 即使在await期间,锁也不会释放
        counter = temp + 1

async def main():
    global counter
    tasks = [safe_increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"最终结果: {counter}")  # 保证是100

# asyncio.run(main())
```

## **3. 示例2:数据库连接池管理**

```python
import asyncio
from typing import List

class DatabaseConnectionPool:
    def __init__(self, pool_size: int = 5):
        self.pool_size = pool_size
        self.available = [f"conn_{i}" for i in range(pool_size)]
        self.in_use = []
        self.lock = asyncio.Lock()  # 保护连接池状态
    
    async def get_connection(self):
        async with self.lock:
            # 等待直到有可用连接
            while not self.available:
                print("无可用连接,等待中...")
                # 临时释放锁,让其他协程可以归还连接
                await self.lock.release()
                await asyncio.sleep(0.1)  # 短暂等待
                await self.lock.acquire()  # 重新获取锁
            
            conn = self.available.pop()
            self.in_use.append(conn)
            print(f"获取连接: {conn}, 可用: {self.available}, 使用中: {self.in_use}")
            return conn
    
    async def release_connection(self, conn: str):
        async with self.lock:
            if conn in self.in_use:
                self.in_use.remove(conn)
                self.available.append(conn)
                print(f"释放连接: {conn}, 可用: {self.available}, 使用中: {self.in_use}")

# 使用示例
async def query_database(pool: DatabaseConnectionPool, task_id: int):
    conn = await pool.get_connection()
    try:
        # 模拟数据库查询
        await asyncio.sleep(0.5)
        print(f"任务 {task_id} 使用连接 {conn} 执行查询")
    finally:
        await pool.release_connection(conn)

async def main():
    pool = DatabaseConnectionPool(pool_size=3)
    tasks = [query_database(pool, i) for i in range(10)]
    await asyncio.gather(*tasks)
```

## **4. 示例3:缓存系统**

```python
import asyncio
from datetime import datetime, timedelta

class AsyncCache:
    def __init__(self):
        self.cache = {}
        self.lock = asyncio.Lock()
    
    async def get(self, key: str, default=None):
        async with self.lock:
            if key in self.cache:
                entry = self.cache[key]
                if entry['expires_at'] > datetime.now():
                    return entry['value']
                else:
                    # 已过期,删除
                    del self.cache[key]
            return default
    
    async def set(self, key: str, value, ttl_seconds: int = 60):
        async with self.lock:
            self.cache[key] = {
                'value': value,
                'expires_at': datetime.now() + timedelta(seconds=ttl_seconds)
            }
    
    async def clear_expired(self):
        async with self.lock:
            now = datetime.now()
            expired_keys = [
                key for key, entry in self.cache.items()
                if entry['expires_at'] <= now
            ]
            for key in expired_keys:
                del self.cache[key]
            return len(expired_keys)

# 使用示例
cache = AsyncCache()

async def get_user_data(user_id: str):
    # 先尝试从缓存获取
    data = await cache.get(f"user:{user_id}")
    if data is not None:
        print(f"缓存命中: {user_id}")
        return data
    
    # 缓存未命中,模拟从数据库获取
    print(f"缓存未命中,查询数据库: {user_id}")
    await asyncio.sleep(0.5)  # 模拟数据库查询
    data = f"用户 {user_id} 的数据"
    
    # 写入缓存
    await cache.set(f"user:{user_id}", data, ttl_seconds=10)
    return data

async def main():
    # 模拟并发请求
    tasks = []
    for i in range(5):
        tasks.append(get_user_data("user1"))  # 相同key,应该只有一个真正查询数据库
        tasks.append(get_user_data(f"user{i}"))  # 不同key,会并发查询
    
    await asyncio.gather(*tasks)
```

## **5. 示例4:限流器(Rate Limiter)**

```python
import asyncio
import time

class RateLimiter:
    def __init__(self, requests_per_second: int):
        self.requests_per_second = requests_per_second
        self.timestamps = []
        self.lock = asyncio.Lock()
    
    async def acquire(self):
        async with self.lock:
            now = time.time()
            
            # 移除1秒前的记录
            cutoff = now - 1.0
            self.timestamps = [ts for ts in self.timestamps if ts > cutoff]
            
            # 检查是否超过限制
            if len(self.timestamps) >= self.requests_per_second:
                # 计算需要等待的时间
                oldest = self.timestamps[0]
                wait_time = oldest + 1.0 - now
                
                # 释放锁,等待指定时间
                await self.lock.release()
                try:
                    await asyncio.sleep(wait_time)
                finally:
                    await self.lock.acquire()  # 重新获取锁
                
                # 重新计算(因为等待期间可能有变化)
                now = time.time()
                self.timestamps = [ts for ts in self.timestamps if ts > now - 1.0]
            
            # 添加当前时间戳
            self.timestamps.append(now)

# 使用示例
limiter = RateLimiter(requests_per_second=3)  # 每秒最多3个请求

async def make_request(request_id: int):
    await limiter.acquire()  # 等待直到允许执行
    print(f"请求 {request_id} 在 {time.time():.2f} 执行")
    await asyncio.sleep(0.1)  # 模拟请求处理

async def main():
    # 模拟10个并发请求
    tasks = [make_request(i) for i in range(10)]
    await asyncio.gather(*tasks)
```

## **6. 示例5:工作队列**

```python
import asyncio
from collections import deque

class AsyncWorkQueue:
    def __init__(self, max_workers: int = 3):
        self.max_workers = max_workers
        self.queue = deque()
        self.active_workers = 0
        self.lock = asyncio.Lock()
        self.not_empty = asyncio.Condition(lock=self.lock)
        self.not_full = asyncio.Condition(lock=self.lock)
    
    async def put(self, item):
        async with self.lock:
            # 等待队列不满(如果有最大长度限制)
            self.queue.append(item)
            self.not_empty.notify()  # 通知有任务可处理
    
    async def get(self):
        async with self.lock:
            # 等待队列不空
            while not self.queue:
                await self.not_empty.wait()
            return self.queue.popleft()
    
    async def worker(self, worker_id: int):
        while True:
            try:
                # 获取任务
                task = await self.get()
                
                async with self.lock:
                    self.active_workers += 1
                
                # 执行任务
                print(f"Worker {worker_id} 开始处理任务: {task}")
                await asyncio.sleep(1)  # 模拟任务处理
                print(f"Worker {worker_id} 完成任务: {task}")
                
                async with self.lock:
                    self.active_workers -= 1
                    if self.active_workers < self.max_workers:
                        self.not_full.notify()  # 通知可以添加新任务
                        
            except asyncio.CancelledError:
                break
    
    async def start(self, num_workers: int = None):
        if num_workers is None:
            num_workers = self.max_workers
        
        workers = []
        for i in range(num_workers):
            worker_task = asyncio.create_task(self.worker(i))
            workers.append(worker_task)
        
        return workers

# 使用示例
async def main():
    queue = AsyncWorkQueue(max_workers=2)
    
    # 启动工作者
    workers = await queue.start()
    
    # 添加任务
    for i in range(10):
        await queue.put(f"任务{i}")
        print(f"已添加任务{i}")
        await asyncio.sleep(0.2)  # 控制添加速度
    
    # 等待所有任务完成
    await asyncio.sleep(10)
    
    # 清理
    for worker in workers:
        worker.cancel()
```

## **7. 重要注意事项**

### **锁的嵌套使用:**
```python
async def nested_locks_example():
    lock1 = asyncio.Lock()
    lock2 = asyncio.Lock()
    
    async with lock1:
        print("获取 lock1")
        await asyncio.sleep(0.1)
        
        # 注意:这可能导致死锁!
        async with lock2:
            print("获取 lock2")
            await asyncio.sleep(0.1)
```

### **避免死锁的建议:**
1. **固定获取顺序**:总是按相同顺序获取多个锁
2. **使用超时**:
```python
try:
    await asyncio.wait_for(lock.acquire(), timeout=5.0)
    # 执行操作
except asyncio.TimeoutError:
    print("获取锁超时")
finally:
    if lock.locked():
        lock.release()
```

3. **使用 `asyncio.Lock()` 而不是 `threading.Lock()`**

### **性能考虑:**
```python
# 不好的做法:锁的范围太大
async def bad_example():
    async with lock:
        # IO操作不需要在锁内
        data = await fetch_from_network()  # 这会阻塞其他协程
        process(data)

# 好的做法:只保护必要的部分
async def good_example():
    # IO操作在锁外
    data = await fetch_from_network()
    
    # 只保护共享资源的访问
    async with lock:
        shared_resource.update(data)
```

## **8. 总结**

`asyncio.Lock()` 的关键点:
1. **协程级别互斥**:保护单进程内的异步协程访问共享资源
2. **非阻塞等待**:等待锁时不会阻塞整个线程
3. **可重入**:同一个协程可以多次获取同一个锁(计数机制)
4. **需要配合 `async with` 使用**:确保锁的正确释放

**适用场景**:
- 保护内存中的共享数据结构
- 管理有限资源(连接池、线程池)
- 实现异步的同步原语(信号量、条件变量等)
- 单进程内的并发控制

**不适用场景**:
- 多进程环境(需要使用 `multiprocessing.Lock()`)
- 分布式系统(需要使用分布式锁如 Redis 锁)
- 数据库并发(应该使用数据库的事务和锁机制)


 
asyncio.gather 的作用是并发执行多个协程(任务),并等待它们全部完成。

```
async def main():
    # 模拟10个并发请求
    tasks = [make_request(i) for i in range(10)]
    await asyncio.gather(*tasks)
```

- **并发执行**   | 10个 `make_request` 同时启动,不是顺序执行 
- **等待全部完成** | 会阻塞直到最后一个任务结束                  
- **收集结果**   | 返回所有任务的返回值列表(按传入顺序)            
- **异常传播**   | 默认第一个异常会取消其他任务                 


对比:不用 vs 用 gather

 
```
# ❌ 顺序执行(慢)
for i in range(10):
    await make_request(i)  # 等1个完成再启动下一个,总耗时 = 10 × 单个耗时

# ✅ 并发执行(快)
tasks = [make_request(i) for i in range(10)]
await asyncio.gather(*tasks)  # 10个同时启动,总耗时 ≈ 最慢的那个
```

实际运行流程

 
```
import asyncio
import aiohttp

async def make_request(i):
    print(f"[{i}] 开始")
    await asyncio.sleep(1)  # 模拟网络IO
    print(f"[{i}] 完成")
    return f"结果{i}"

async def main():
    tasks = [make_request(i) for i in range(3)]
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

asyncio.run(main())
```
[0] 开始
[1] 开始
[2] 开始
[0] 完成  (约1秒后)
[1] 完成
[2] 完成
所有结果: ['结果0', '结果1', '结果2']

常用参数

 
```
results = await asyncio.gather(
    *tasks,
    return_exceptions=True  # 异常不抛出,作为结果返回
)
```

在 FastAPI 中的典型应用

 
```
@app.get("/batch-users")
async def get_batch_users(user_ids: list[int]):
    # 并发查询多个用户,而不是循环 await
    tasks = [fetch_user(uid) for uid in user_ids]
    users = await asyncio.gather(*tasks)
    return users
```
一句话总结:gather 是 asyncio 的"并发执行 + 汇总结果"工具,让多个 IO 操作并行处理,大幅提升效率。

 


 


 


 


依赖注入
## 4. 依赖注入系统 ([api/deps.py](D:\wks\aisql\chat2sql-develop\src\chat2sql\api\deps.py)) ### 4.1 数据库会话依赖 ```python # api/deps.py:7-12 def get_db_session() -> Generator[Session, None, None]: with Session(engine) as session: yield session SessionDep = Annotated[Session, Depends(get_db_session)] ``` #### Depends 详解 **作用:** - `Depends` 是 FastAPI 的依赖注入核心工具类 - 用于在路由处理函数执行前自动调用依赖函数,并将返回值注入到处理函数参数中 - 在请求处理过程中,FastAPI 会自动解析 `Depends`,调用 `get_db_session()`,并将返回的 `Session` 实例传递给需要它的路由函数 - 依赖函数可以是任何可调用对象(函数、类等) - 它会随同request自身的参数一起注入方法,在方法中不仅可以使用request参数,还可以使用依赖注入的参数 - 实现了控制反转(IoC)和依赖管理解耦 **核心用法:** 1. **基本用法** - 直接使用 Depends ```python from fastapi import Depends def get_db_session(): with Session(engine) as session: yield session @router.post("/endpoint") def endpoint(session: Session = Depends(get_db_session)): # 直接使用 user = session.get(UserModel, user_id) ``` 2. **推荐用法** - 使用 Annotated 类型别名(项目采用) ```python from typing import Annotated from fastapi import Depends SessionDep = Annotated[Session, Depends(get_db_session)] # 定义类型别名 @router.post("/endpoint") def endpoint(session: SessionDep): # 更简洁,类型提示更清晰 user = session.get(UserModel, user_id) ``` **Depends 的工作原理:** ``` ┌────────────────────────────────────────────────────────────────┐ │ 1. FastAPI 解析路由函数签名 │ │ def endpoint(session: SessionDep): │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 2. 识别 SessionDep 类型定义 │ │ SessionDep = Annotated[Session, Depends(get_db_session)] │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 3. 调用依赖函数 get_db_session() │ │ - 执行 with Session(engine) as session │ │ - yield 返回 session 对象 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 4. 将 session 注入到 endpoint 函数的 session 参数 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 5. 执行 endpoint 业务逻辑 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 6. endpoint 执行完毕,get_db_session 继续 │ │ - yield 后的清理代码执行(session.close()) │ └────────────────────────────────────────────────────────────────┘ ``` **Depends 的优势:** - **复用性**:一次定义,多处使用 - **测试友好**:可轻松替换依赖函数为 mock 实现 - **缓存控制**:通过 `use_cache=False` 控制是否缓存依赖结果 - **嵌套依赖**:依赖函数可以依赖其他依赖 - **自动文档**:自动生成 API 文档中的参数说明 **高级用法示例:** ```python # 带参数的依赖 def get_current_user(token: str = Header(...)): return decode_token(token) UserDep = Annotated[User, Depends(get_current_user)] # 嵌套依赖 def get_db_session(): with Session(engine) as session: yield session def get_repository(session: SessionDep = Depends()): return UserRepository(session) RepositoryDep = Annotated[UserRepository, Depends(get_repository)] # 禁用缓存(每次调用都重新执行) def get_time(): return datetime.now() TimeDep = Annotated[datetime, Depends(get_time, use_cache=False)] ``` **使用方式:** ```python @router.post("/endpoint") def endpoint(session: SessionDep): # 自动注入数据库会话 user = session.get(UserModel, user_id) ``` ### 4.2 依赖注入流程 ``` 请求到达 -> FastAPI 识别 SessionDep 依赖 -> 调用 get_db_session() -> 创建 Session(engine) -> yield session 到路由处理函数 -> 处理函数执行完毕 -> session 自动清理(上下文管理器) ```

 


 


 


 


env配置
- 上面的方式比下面的方式更可靠,因为它使用了 `pathlib` 来构建绝对路径,避免了工作目录变化导致的 `.env` 文件无法找到的问题。 ``` class Settings(BaseSettings): # 使用新的 model_config 配置方式 # 使用绝对路径加载 .env 文件(避免工作目录问题) model_config = SettingsConfigDict( env_file=pathlib.Path(__file__).parent.parent.parent.parent / '.env', # 绝对路径 env_file_encoding='utf-8', # 编码 extra='ignore', # 忽略未定义字段 env_ignore_empty=True, # 忽略空值 ) class Settings(BaseSettings): # 使用新的 model_config 配置方式 model_config = SettingsConfigDict( env_file=os.path.join(os.getcwd(), '.env'), # .env 文件路径 env_file_encoding='utf-8', # 编码 extra='ignore', # 忽略未定义字段 env_ignore_empty=True, # 忽略空值 ) ```

 


 


 


 


SQLModel
#### SQLModel.metadata - 在 SQLAlchemy/SQLModel 中,**元数据**是一个中央注册表,用于跟踪所有定义的表结构。 - Alchemy 英/ˈælkəmi/ 美/ˈælkəmi/ - class User(SQLModel, table=True)会自动注册到 SQLModel.metadata 中 - `SQLModel.metadata.create_all(bind=engine)` 创建所有已注册的表 ```python # SQLModel 内部使用 SQLAlchemy 的 MetaData from sqlalchemy import MetaData # 每个 SQLModel 类都有一个 metadata 属性 class User(SQLModel, table=True): __tablename__ = "users" id: Optional[int] = Field(primary_key=True) name: str # User 类会自动注册到 SQLModel.metadata 中 ``` #### 表模型的自动注册机制 **关键点**: 当定义一个继承自 `SQLModel` 且 `table=True` 的类时,该类会**自动注册**到 `SQLModel.metadata` 中。 **文件**: `elec\fastapi\src\fastapi_base\models\users.py` ```python from sqlmodel import SQLModel, Field as SQLField class UserBase(SQLModel): """用户基础字段""" username: str = SQLField(index=True, unique=True, max_length=50) # ... 其他字段 # 关键: table=True 触发自动注册 class User(UserBase, table=True): """用户表模型""" __tablename__ = "users" id: Optional[int] = SQLField(default=None, primary_key=True) created_at: datetime = SQLField(default_factory=datetime.now) ``` #### 导入触发的注册过程 **文件**: `elec\fastapi\src\fastapi_base\models\__init__.py` ```python """数据模型模块""" from sqlmodel import SQLModel # 关键: 这行导入会触发 users.py 的执行 from . import users ``` **执行流程**: 1. `init_db()` 执行 from fastapi_base.models import SQLModel 2. Python 解释器加载 fastapi_base.models.__init__.py 3. `__init__.py` 中的 from . import users 被执行 4. `users.py` 中的 class User(UserBase, table=True) 定义被执行 5. `User` 类定义时,由于 table=True,自动调用: ```python # 伪代码展示 SQLModel 内部机制 class User(UserBase, table=True): def __init_subclass__(cls, table=False, **kwargs): if table: # 将表信息注册到 SQLModel.metadata SQLModel.metadata.tables[cls.__tablename__] = cls.__table__ ``` 6. `User` 表被注册到 `SQLModel.metadata` 中 7. `init_db()` 调用 `SQLModel.metadata.create_all(bind=engine)` 创建所有已注册的表 ---

 

    

 

    

 


 


 


 


 


 


数据库
# MySQL 连接丢失问题排查与解决方案 > 错误信息: `(pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query')` > 生成日期: 2026-04-10 --- ## 问题分析 ### 错误特征 ``` pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query') [SQL: SELECT t_user.id, t_user.created_at, t_user.updated_at, t_user.username, t_user.realname, t_user.email, t_user.password_hash, t_user.`role`, t_user.is_active, t_user.last_login FROM t_user WHERE t_user.username = %(username_1)s] [parameters: {'username_1': 'admin'}] ``` ### 关键线索 1. **环境**: "另外一台新电脑上登录" 2. **连接池配置**: `poolclass=NullPool` (每次都创建新连接) 3. **日志显示**: `[cached since 2547s ago]` - 说明使用了缓存的连接 4. **错误时机**: 在查询执行过程中连接丢失 --- ## 可能原因 ### 1. MySQL 服务器超时配置 ⭐ 最可能 MySQL 服务器有多种超时设置,长时间空闲的连接会被服务器主动断开: | 参数 | 默认值 | 说明 | |------|--------|------| | `wait_timeout` | 28800秒 (8小时) | 非交互式连接超时 | | `interactive_timeout` | 28800秒 (8小时) | 交互式连接超时 | | `net_read_timeout` | 30秒 | 读取超时 | | `net_write_timeout` | 60秒 | 写入超时 | **问题场景**: - 应用启动时建立了数据库连接 - 连接空闲超过 `wait_timeout` 时间 - MySQL 服务器断开连接 - 应用尝试使用已断开的连接 → **连接丢失错误** ### 2. 网络问题 新电脑的网络环境可能导致: - 防火墙中断长时间空闲的连接 - NAT 路由器超时 - VPN/代理连接不稳定 ### 3. MySQL 连接数限制 ```sql -- 检查最大连接数 SHOW VARIABLES LIKE 'max_connections'; -- 检查当前连接数 SHOW STATUS LIKE 'Threads_connected'; ``` 如果连接数达到上限,新请求会被拒绝。 ### 4. MySQL 服务重启 MySQL 服务重启会导致所有现有连接失效。 --- ## 解决方案 ### 方案 1: 配置连接池(推荐)⭐ 使用 SQLAlchemy 的连接池自动管理连接的生命周期: ```python # core/db.py from sqlmodel import create_engine from sqlalchemy.pool import QueuePool engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), poolclass=QueuePool, # 使用连接池 pool_size=5, # 连接池大小 max_overflow=10, # 最大溢出连接数 pool_recycle=3600, # 连接回收时间(秒)⭐ 关键配置 pool_pre_ping=True, # 连接前先测试⭐ 推荐开启 echo=False, future=True, ) ``` **关键参数说明**: | 参数 | 说明 | 推荐值 | |------|------|--------| | `pool_recycle` | 连接在使用超过该时间后会被回收,避免使用过期连接 | 3600 (1小时) < `wait_timeout` | | `pool_pre_ping` | 每次从连接池获取连接时先测试连接是否有效 | True (强烈推荐) | | `pool_size` | 常驻连接池大小 | 5-10 | | `max_overflow` | 额外溢出连接数 | 10-20 | **为什么 `pool_pre_ping=True` 很重要**: - 每次获取连接时执行 `SELECT 1` 测试 - 自动检测并丢弃失效连接 - 对性能影响很小(约 1-2ms) ### 方案 2: 使用连接池 + 连接保活 ```python from sqlmodel import create_engine from sqlalchemy.pool import QueuePool engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), poolclass=QueuePool, pool_size=5, max_overflow=10, pool_recycle=1800, # 30分钟回收 pool_pre_ping=True, connect_args={ "connect_timeout": 10, # 连接超时 "read_timeout": 30, # 读取超时 "write_timeout": 30, # 写入超时 }, echo=False, future=True, ) ``` ### 方案 3: 调整 MySQL 服务器配置 如果无法修改应用代码,可以在 MySQL 服务器端调整: ```sql -- 查看当前配置 SHOW VARIABLES LIKE '%timeout%'; -- 临时调整(重启后失效) SET GLOBAL wait_timeout = 86400; -- 24小时 SET GLOBAL interactive_timeout = 86400; -- 永久调整(需修改配置文件并重启) -- 在 my.cnf 或 my.ini 中添加: [mysqld] wait_timeout = 86400 interactive_timeout = 86400 ``` ### 方案 4: 保持 NullPool 但添加重试机制 如果必须使用 `NullPool`(例如某些特殊场景),可以添加重试逻辑: ```python from sqlalchemy import exc from time import sleep def execute_with_retry(session, statement, max_retries=3): """执行数据库查询,带重试机制""" for attempt in range(max_retries): try: return session.exec(statement).first() except exc.OperationalError as e: if attempt == max_retries - 1: raise if "Lost connection" in str(e) or "MySQL server has gone away" in str(e): sleep(1) # 等待1秒后重试 continue raise ``` ### 方案 5: 使用依赖注入 + 自动重连 结合 FastAPI 的依赖注入系统: ```python # api/deps.py from sqlmodel import Session from typing import Annotated, Generator from fastapi import Depends def get_db_session() -> Generator[Session, None, None]: """获取数据库会话,带连接检查""" from sqlalchemy import exc max_retries = 3 for attempt in range(max_retries): try: with Session(engine) as session: # 测试连接 session.exec(text("SELECT 1")) yield session break # 成功则退出 except exc.OperationalError as e: if attempt == max_retries - 1: raise if "Lost connection" in str(e): continue # 重试 raise SessionDep = Annotated[Session, Depends(get_db_session)] ``` --- ## 排查步骤 ### 1. 检查 MySQL 超时配置 ```sql -- 登录 MySQL mysql -u root -p -- 查看超时配置 SHOW VARIABLES LIKE '%timeout%'; ``` **预期输出**: ``` +--------------------------+-------+ | Variable_name | Value | +--------------------------+-------+ | connect_timeout | 10 | | wait_timeout | 28800 | ⭐ 检查这个 | interactive_timeout | 28800 | ⭐ 检查这个 | net_read_timeout | 30 | | net_write_timeout | 60 | +--------------------------+-------+ ``` ### 2. 检查网络连接 ```bash # 从新电脑测试 MySQL 连接 ping telnet 3306 # 检查是否有防火墙规则 sudo iptables -L -n | grep 3306 ``` ### 3. 检查应用日志 从错误日志看: ``` [cached since 2547s ago] ⭐ 约42分钟前缓存的连接 ``` 这说明连接已经存在约42分钟,可能已经超过 MySQL 的 `wait_timeout` 或遇到网络问题。 ### 4. 临时验证 在 MySQL 命令行中: ```sql -- 查看当前连接 SHOW PROCESSLIST; -- 手动 kill 一个长时间空闲的连接,看是否能复现问题 KILL ; ``` --- ## 推荐配置(生产环境) ```python # core/db.py from sqlmodel import create_engine from sqlalchemy.pool import QueuePool engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), # 连接池配置 poolclass=QueuePool, pool_size=10, # 根据并发量调整 max_overflow=20, pool_recycle=3600, # 1小时回收(小于 MySQL wait_timeout) pool_pre_ping=True, # ⭐ 必须开启 # 连接参数 connect_args={ "connect_timeout": 10, "read_timeout": 30, "write_timeout": 30, "charset": "utf8mb4", }, # 其他配置 echo=False, # 生产环境关闭 SQL 日志 future=True, pool_use_lifo=True, # 后进先出,减少连接创建 ) ``` --- ## 监控与日志 ### 添加连接监控 ```python # core/db.py import logging logger = logging.getLogger(__name__) # 记录连接池状态 def log_pool_status(): pool = engine.pool logger.info(f"Pool status: size={pool.size()}, " f"checked_in={pool.checkedin()}, " f"checked_out={pool.checkedout()}") # 在中间件中调用 # core/middleware.py @app.middleware("http") async def log_requests(request: Request, call_next): log_pool_status() # 记录每次请求的连接池状态 response = await call_next(request) return response ``` --- ## 总结 ### 问题根源 当前配置使用 `NullPool`,连接可能被缓存很长时间,超过 MySQL 的 `wait_timeout` 导致连接被服务器断开,应用尝试使用已断开的连接时报错。 ### 最佳解决方案 ✅ **使用 QueuePool + pool_pre_ping=True** ```python engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), poolclass=QueuePool, pool_size=5, max_overflow=10, pool_recycle=3600, # ⭐ 关键:小于 MySQL wait_timeout pool_pre_ping=True, # ⭐ 关键:自动检测失效连接 echo=False, future=True, ) ``` ### 效果 - ✅ 自动管理连接生命周期 - ✅ 自动检测并丢弃失效连接 - ✅ 避免连接泄漏 - ✅ 提升性能(连接复用) - ✅ 无需修改业务代码 --- ## 快速修复 修改 [db.py](/home/xt/wks/zhinengjiancha/n2sql/src/inspectai_api/modules/chat2sql/core/db.py) 第12-16行: ```python # 修改前 engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), poolclass=NullPool, # ❌ 问题根源 echo=False, future=True, ) # 修改后 from sqlalchemy.pool import QueuePool engine = create_engine( str(settings.SQLALCHEMY_DATABASE_URI), poolclass=QueuePool, # ✅ 使用连接池 pool_size=5, max_overflow=10, pool_recycle=3600, # ✅ 1小时回收 pool_pre_ping=True, # ✅ 连接前测试 echo=False, future=True, ) ``` 重启应用即可生效。

 


 


 


 


数据验证
### Pydantic 模型 ```python from pydantic import BaseModel, Field, EmailStr from typing import Optional class UserModel(BaseModel): """用户数据模型""" name: str = Field(..., min_length=1, max_length=50) age: int = Field(..., ge=0, le=150) email: Optional[EmailStr] = None class Config: """配置类:用于设置 Pydantic 模型的行为和元数据""" # json_schema_extra: 为 OpenAPI 文档(Swagger UI)提供示例数据 # 这样在访问 /docs 时会显示这个示例,方便前端开发人员参考 json_schema_extra = { "example": { "name": "张三", "age": 25, "email": "zhangsan@example.com" } } # 其他常用配置选项: # # 1. by_alias = True # 序列化时使用字段的别名(alias) # # 2. validate_assignment = True # 在赋值时进行验证,而不仅仅是在初始化时 # # 3. arbitrary_types_allowed = True # 允许使用任意 Python 类型 # # 4. orm_mode = True (Pydantic v2: from_attributes = True) # 支持从 ORM 对象(如 SQLAlchemy 模型)创建 Pydantic 模型 # # 5. schema_extra (Pydantic v2 已弃用,使用 json_schema_extra) # 为 JSON Schema 添加额外信息 ``` **验证规则**: - `name`: 必填,1-50 字符 - `age`: 必填,0-150 之间 - `email`: 可选,必须是有效邮箱 ### 使用模型 ```python @app.post("/users/") async def create_user(user: UserModel): # FastAPI 自动验证请求体 # 如果验证失败,返回 422 错误 return {"user": user.dict()} ``` ### 验证错误示例 **请求**: ```json { "name": "", "age": 200 } ``` **响应** (422 Unprocessable Entity): ```json { "detail": [ { "loc": ["body", "name"], "msg": "ensure this value has at least 1 characters", "type": "value_error.any_str.min_length" }, { "loc": ["body", "age"], "msg": "ensure this value is less than or equal to 150", "type": "value_error.number.not_le" } ] } ``` ---

 


 


 


 


事务控制
是的,FastAPI 可以使用中间件封装数据库事务。不过需要注意,中间件方式适合**每个请求独立事务**的场景,而不是 Java 那种声明式事务(`@Transactional`)。下面我给你一个完整的示例: ## 方案1:中间件方式(每个请求一个事务) ```python from fastapi import FastAPI, Depends, HTTPException from sqlalchemy import create_engine, Column, Integer, String from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, Session from contextvars import ContextVar from typing import Optional # 数据库配置 SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db" engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() # 使用 ContextVar 存储当前请求的 session db_session: ContextVar[Optional[Session]] = ContextVar("db_session", default=None) # 定义模型 class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) name = Column(String, index=True) email = Column(String, unique=True, index=True) class Order(Base): __tablename__ = "orders" id = Column(Integer, primary_key=True, index=True) user_id = Column(Integer) amount = Column(Integer) # 创建表 Base.metadata.create_all(bind=engine) # 中间件:事务管理 @app.middleware("http") async def transaction_middleware(request, call_next): # 创建新的 session session = SessionLocal() token = db_session.set(session) try: # 处理请求 response = await call_next(request) # 如果响应成功,提交事务 if response.status_code < 400: session.commit() else: session.rollback() return response except Exception as e: # 发生异常,回滚事务 session.rollback() raise e finally: # 关闭 session 并清理 context session.close() db_session.reset(token) # 获取当前请求的 session 的依赖 def get_db(): db = db_session.get() if db is None: raise HTTPException(status_code=500, detail="Database session not available") return db # FastAPI 应用 app = FastAPI() # API 路由:可以在单个方法中写多个 SQL,共享同一个事务 @app.post("/users/") async def create_user_with_order(name: str, email: str, amount: int, db: Session = Depends(get_db)): # 创建用户 user = User(name=name, email=email) db.add(user) db.flush() # 获取 user.id # 创建订单(使用同一个事务) order = Order(user_id=user.id, amount=amount) db.add(order) # 不需要 commit,中间件会自动处理 return {"user_id": user.id, "order_id": order.id} @app.post("/transfer/") async def transfer_money(from_user_id: int, to_user_id: int, amount: int, db: Session = Depends(get_db)): # 多个 SQL 操作在同一个事务中 from_user = db.query(User).filter(User.id == from_user_id).first() to_user = db.query(User).filter(User.id == to_user_id).first() if not from_user or not to_user: raise HTTPException(status_code=404, detail="User not found") # 模拟转账操作 # 这里可以执行多个 SQL 更新 # ... return {"message": "Transfer completed"} ``` ## 方案2:使用依赖注入(更灵活,类似 @Transactional) ```python from contextlib import contextmanager from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.orm import Session app = FastAPI() # 事务装饰器/上下文管理器 class TransactionManager: def __init__(self): self.db_session = None @contextmanager def transaction(self, db: Session): try: yield db db.commit() except Exception: db.rollback() raise transaction_manager = TransactionManager() # 依赖:获取事务性的 session def get_transactional_db(): db = SessionLocal() try: with transaction_manager.transaction(db): yield db finally: db.close() # 使用依赖注入控制事务范围 @app.post("/batch-create/") async def batch_create(db: Session = Depends(get_transactional_db)): # 这些操作都在同一个事务中 user1 = User(name="User1", email="user1@example.com") user2 = User(name="User2", email="user2@example.com") db.add(user1) db.add(user2) # 如果这里出错,前面的操作都会回滚 order = Order(user_id=user1.id, amount=100) db.add(order) return {"message": "Batch created"} ``` ## 方案3:手动控制事务(最灵活) ```python from fastapi import FastAPI, Depends, HTTPException from sqlalchemy.orm import Session app = FastAPI() # 获取数据库 session 的依赖 def get_db(): db = SessionLocal() try: yield db finally: db.close() @app.post("/complex-operation/") async def complex_operation(db: Session = Depends(get_db)): try: # 开始事务 # 操作1 user = User(name="Test", email="test@example.com") db.add(user) db.flush() # 操作2 order = Order(user_id=user.id, amount=500) db.add(order) # 操作3:可能失败的操作 # 如果这里出错,前面的 flush 也会回滚 if some_condition: raise HTTPException(status_code=400, detail="Something went wrong") # 手动提交 db.commit() return {"message": "Success"} except Exception: # 手动回滚 db.rollback() raise ``` ## 方案4:使用装饰器实现类似 @Transactional ```python from functools import wraps from typing import Callable from fastapi import FastAPI, Depends def transactional(func: Callable): @wraps(func) async def wrapper(*args, **kwargs): # 获取 db session db = kwargs.get('db') if not db: # 尝试从 args 中获取 for arg in args: if isinstance(arg, Session): db = arg break if not db: raise ValueError("No database session found") try: result = await func(*args, **kwargs) db.commit() return result except Exception: db.rollback() raise return wrapper @app.post("/decorator-example/") @transactional async def decorated_operation(db: Session = Depends(get_db)): user = User(name="Decorator", email="decorator@example.com") db.add(user) order = Order(user_id=user.id, amount=300) db.add(order) return {"message": "Transactional operation completed"} ``` ## 总结 1. **中间件方案**:适合简单的 CRUD,每个请求自动管理事务 2. **依赖注入方案**:比中间件更灵活,可以控制事务范围 3. **手动控制**:最灵活,适合复杂业务逻辑 4. **装饰器方案**:类似 Java 的 `@Transactional` 注解 推荐使用**依赖注入方案**,因为它既保持了代码简洁,又提供了足够的事务控制能力。

 


 


 


 


项目结构
``` src/simple_api/ ├── __init__.py ├── main.py # 应用入口,创建 FastAPI 实例 └── api/ ├── __init__.py └── routes/ ├── __init__.py ├── basic.py # 基础功能路由 ├── calculator.py # 计算器路由 └── user.py # 用户管理路由 ```
```python from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from simple_api.api.routes import basic, calculator, user # 创建应用 app = FastAPI( title="Simple API", description="FastAPI 学习示例", version="0.1.0" ) # CORS 中间件(允许前端访问) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 注册路由 app.include_router(basic.router, tags=["基础功能"]) app.include_router(calculator.router, prefix="/calculator", tags=["计算器"]) app.include_router(user.router, prefix="/user", tags=["用户管理"]) # 根路径 @app.get("/") async def root(): return {"message": "欢迎使用 Simple API!"} ``` ---
- 按对象划分路由,将一个对象的路由操作整合到一个python文件中 - fastapi_base\api\routes\users.py - 定义router = APIRouter(), - 每个对象都有一个router ``` """ 用户路由示例,演示如何创建 CRUD 路由。 """ from typing import List from fastapi import APIRouter, HTTPException, status from fastapi_base.api.deps import SessionDep from fastapi_base.core.schemas import PageRes, Res from fastapi_base.crud.users import crud_user from fastapi_base.models.users import User, UserCreate, UserUpdate router = APIRouter() @router.get("/", summary="获取用户列表") def get_users( session: SessionDep, offset: int = 0, limit: int = 10, ) -> PageRes[User]: ``` - 路由汇兑 - fastapi_base\api\routes\__init__.py - 新建主路由api_router = APIRouter(),注册所有对象的路由 ``` """ 路由模块 注册所有 API 路由。 """ from fastapi import APIRouter from fastapi_base.api.routes import health, users # 创建主路由 api_router = APIRouter() # 注册子路由 api_router.include_router(health.router, tags=["系统"]) api_router.include_router(users.router, prefix="/users", tags=["用户管理"]) ``` - main.py中注册主路由 - main.py中同样可以有自己的路由 ``` from fastapi import FastAPI from fastapi_base.api.routes import api_router from fastapi_base.core.settings import settings # 创建应用 application = FastAPI(**settings.FASTAPI_KWARGS, lifespan=lifespan) # 注册路由 application.include_router(api_router, prefix=settings.API_PREFIX) # 健康检查端点 @application.get("/health", tags=["系统"], summary="健康检查") def health_check(): """API 测活探针""" return Res[dict].ok([{"message": "The API is LIVE"}]) ```

 


 


 


 


项目运行
- .\.venv\Scripts\activate - python .\dev.py - 后端通常需要连接数据库, - 在启动时会检查数据库中是否已经初始化表结构 - 如果没有,则会自动初始化表结构 ``` (.venv) PS D:\wks\elec\fastapi> .\.venv\Scripts\activate (fastapi-base) PS D:\wks\elec\fastapi> python .\dev.py INFO: Will watch for changes in these directories: ['D:\\wks\\elec\\fastapi'] INFO: Uvicorn running on http://127.0.0.1:8801 (Press CTRL+C to quit) ... ... 2026-03-10 11:28:38 | INFO | fastapi_base.core.migrations:auto_upgrade:46 - [MIGRATION] Database migration completed successfully. 2026-03-10 11:28:38 | INFO | fastapi_base.main:lifespan:72 - Application startup complete. ``` - dev.py ``` """ 开发启动脚本 用于在开发环境中启动应用,支持热重载。 """ import sys import os # 添加 src 目录到路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) import uvicorn from fastapi_base.core.settings import settings if __name__ == "__main__": uvicorn.run( "fastapi_base.main:app", host=settings.API_HOST, port=settings.API_PORT, reload=True, # 开发模式启用热重载 log_level="info", ) ```
### 开发模式 ```bash cd /ai/wks/work2/uv_demo uv sync # 安装依赖 cd src uv run python -m fastapi_base.main ``` 或使用 uvicorn: ```bash uv run uvicorn fastapi_base.main:app --reload --host 127.0.0.1 --port 8000 ``` ### 生产模式 ```bash uvicorn fastapi_base.main:app --host 0.0.0.0 --port 8000 --workers 4 ``` ---
### uv run uvicorn - uv run uvicorn main:app --host 0.0.0.0 --port 8000 ``` from fastapi import FastAPI from pydantic import BaseModel import uvicorn app = FastAPI(title="Simple API", version="1.0.0") class Item(BaseModel): name: str price: float description: str | None = None @app.get("/") async def root(): return {"message": "Hello World"} @app.get("/items/{item_id}") async def read_item(item_id: int, q: str | None = None): return {"item_id": item_id, "q": q} @app.post("/items") async def create_item(item: Item): return {"message": "Item created", "item": item.model_dump()} # if __name__ == "__main__": # uvicorn.run(app, host="0.0.0.0", port=8000) ``` ### python启动 - 使用python必须打开main方法的注释,没有main入口,程序是空执行 ``` if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) ``` ``` (py312) xt@fine-bump-3:~/wks/elec/tmp$ python main.py INFO: Started server process [1733852] INFO: Waiting for application startup. INFO: Application startup complete. INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit) ```

 


 


 


 


参考