### 1. 最简单的应用
```python
from fastapi import FastAPI
app = FastAPI() # 创建应用实例
@app.get("/") # 路由装饰器
async def root():
return {"message": "Hello World"}
```
保存为 `main.py`,运行:
```bash
uv run uvicorn main:app --reload
```
访问 `http://127.0.0.1:8000`,你会看到:
```json
{"message": "Hello World"}
```
### 2. 路径参数
```python
@app.get("/items/{item_id}")
async def read_item(item_id: int):
"""
item_id 会自动转换为 int 类型
访问: /items/123
"""
return {"item_id": item_id}
```
### 3. 查询参数
```python
from typing import Optional
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
"""
查询参数: ?skip=0&limit=10
"""
return {"skip": skip, "limit": limit}
```
---
|
**快速**: 与 NodeJS 和 Go 相当的高性能 | 特性 | FastAPI | Flask | Django | |------|---------|-------|--------| | 性能 | ⚡⚡⚡ | ⚡⚡ | ⚡ | | 学习曲线 | 低 | 低 | 中 | | 自动文档 | ✅ | ❌ | ❌ | | 类型验证 | ✅ | ❌ | ⚠️ | | 异步支持 | ✅ | ⚠️ | ✅ | |
### 异步支持示例
FastAPI 原生支持异步编程,可以处理大量并发请求:
#### 1. 基础异步函数
```python
import asyncio
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
# 异步函数,不会阻塞其他请求
await asyncio.sleep(1) # 模拟IO操作
return {"message": "异步响应"}
```
#### 2. 异步HTTP请求
```python
import httpx
@app.get("/weather/{city}")
async def get_weather(city: str):
# 使用 httpx 异步客户端
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.weather.com/{city}"
)
return response.json()
```
#### 3. 并发处理多个请求
```python
@app.get("/users/{user_id}")
async def get_user(user_id: int):
# 可以同时查询多个数据源
user_data, user_posts = await asyncio.gather(
fetch_user_from_db(user_id),
fetch_user_posts(user_id)
)
return {"user": user_data, "posts": user_posts}
async def fetch_user_from_db(user_id: int):
# 模拟数据库查询
await asyncio.sleep(0.1)
return {"id": user_id, "name": "张三"}
async def fetch_user_posts(user_id: int):
# 模拟获取用户文章
await asyncio.sleep(0.1)
return [{"title": "文章1"}, {"title": "文章2"}]
```
**异步优势**:
- 在IO密集型操作中性能提升显著
- 可以同时处理数千个并发连接
- 不需要额外的线程或进程
- 适合数据库查询、API调用等场景
#### 4. 并发安全与资源竞争问题
**问题场景:多个请求修改同一资源**
```python
from fastapi import FastAPI, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
# ❌ 危险:没有并发控制
@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
# 两个请求同时执行:
# 请求A: 读到balance=100
# 请求B: 读到balance=100
# 请求A: 写入150 (+50)
# 请求B: 写入130 (+30) ← A的修改丢失了!
balance = await get_balance(user_id) # balance=100
new_balance = balance + amount
await update_balance(user_id, new_balance)
return {"balance": new_balance}
```
**解决方案1:使用数据库事务(推荐)**
```python
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
@app.post("/wallet/add/{user_id}")
async def add_money(
user_id: int,
amount: int,
db: AsyncSession
):
# 使用数据库行锁(FOR UPDATE)
async with db.begin(): # 自动事务
# SELECT ... FOR UPDATE - 锁定该行
stmt = select(User).where(
User.id == user_id
).with_for_update()
result = await db.execute(stmt)
user = result.scalar_one()
# 其他请求会等待锁释放
user.balance += amount
await db.flush() # 提交后自动释放锁
return {"balance": user.balance}
```
**解决方案2:使用分布式锁(Redis)**
```python
import aioredis
from contextlib import asynccontextmanager
redis = aioredis.from_url("redis://localhost")
@asynccontextmanager
async def redis_lock(lock_key: str, expire: int = 10):
"""分布式锁上下文管理器"""
lock = None
try:
lock = await redis.set(
lock_key, "locked",
nx=True, ex=expire
)
if not lock:
raise HTTPException(
400, "操作正在进行,请稍后重试"
)
yield
finally:
if lock:
await redis.delete(lock_key)
@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
lock_key = f"lock:user:{user_id}"
# 获取锁,同一用户的请求串行执行
async with redis_lock(lock_key):
balance = await get_balance(user_id)
new_balance = balance + amount
await update_balance(user_id, new_balance)
return {"balance": new_balance}
```
**解决方案3:原子操作(最佳性能)**
```python
@app.post("/wallet/add/{user_id}")
async def add_money(user_id: int, amount: int):
# 直接使用SQL原子操作
await db.execute(
update(User)
.where(User.id == user_id)
.values(balance=User.balance + amount) # 原子递增
)
return {"message": "success"}
```
**并发控制机制对比**:
| 方案 | 适用场景 | 性能 | 一致性 |
|------|---------|------|--------|
| 数据库事务 | 单数据库应用 | ⚡⚡⚡ | 强一致 |
| 分布式锁 | 多服务/缓存 | ⚡⚡ | 强一致 |
| 原子操作 | 简单递增/递减 | ⚡⚡⚡ | 强一致 |
| 乐观锁 | 读多写少 | ⚡⚡⚡ | 最终一致 |
**关键要点**:
- `await` 本身不保证线程安全
- 多个请求可以同时执行到同一个 `await` 之前
- 必须使用锁、事务等机制保护共享资源
- 优先使用数据库级别的原子操作
|
|
## 自动文档生成
FastAPI 自动生成交互式 API 文档!
### Swagger UI
访问: `http://127.0.0.1:8000/docs`
**功能**:
- 📖 浏览所有 API 端点
- 🧪 在线测试 API
- 📝 查看请求/响应模型
### ReDoc
访问: `http://127.0.0.1:8000/redoc`
**功能**:
- 📄 更美观的文档展示
- 🔍 搜索功能
- 📱 移动端友好
### 自定义文档
```python
app = FastAPI(
title="Simple API",
description="这是一个简单的 FastAPI 示例",
version="0.1.0",
docs_url="/docs", # Swagger UI 路径
redoc_url="/redoc", # ReDoc 路径
)
```
---
|
|
|
|
```
application = FastAPI(
**settings.FASTAPI_KWARGS,
lifespan=lifespan,
redirect_slashes=False
)
```
## `redirect_slashes=False` 参数解释
### **作用**
控制 FastAPI 是否自动处理 URL 末尾斜杠的重定向行为。
---
### **默认行为 (`redirect_slashes=True`)**
当设置为 `True`(默认值)时,FastAPI 会自动:
- 将 `/path` 重定向到 `/path/`(如果 `/path/` 有定义)
- 或将 `/path/` 重定向到 `/path`(如果 `/path` 有定义)
**示例:**
```python
from fastapi import FastAPI
app = FastAPI(redirect_slashes=True) # 默认值
@app.get("/items")
def get_items():
return {"msg": "items list"}
@app.get("/items/{item_id}")
def get_item(item_id: int):
return {"msg": f"item {item_id}"}
```
**访问效果:**
- `/items` → 200 OK(直接匹配)
- `/items/` → **307 重定向** → `/items`(自动添加斜杠后重定向)
- `/items/123` → 200 OK
- `/items/123/` → **307 重定向** → `/items/123`
---
### **设置 `redirect_slashes=False` 的行为**
禁用自动重定向后:
- `/path` 和 `/path/` 被视为**完全不同的路径**
- 如果某个路径没有精确匹配的路由,返回 **404 Not Found**
**示例:**
```python
app = FastAPI(redirect_slashes=False)
@app.get("/items")
def get_items():
return {"msg": "items list"}
# 注意:没有定义 @app.get("/items/")
```
**访问效果:**
- `/items` → 200 OK
- `/items/` → **404 Not Found**(因为没有定义带斜杠的路由)
---
### **使用场景对比**
| 场景 | `redirect_slashes=True` (默认) | `redirect_slashes=False` |
|------|-------------------------------|--------------------------|
| RESTful API | ✅ 推荐,用户体验好 | ❌ 可能让用户困惑 |
| 需要精确区分路径 | ❌ 自动重定向会干扰 | ✅ 严格区分带/不带斜杠 |
| 反向代理环境 | ⚠️ 可能产生多余重定向 | ✅ 避免重定向开销 |
| 前后端分离(SPA) | ✅ 处理用户随意输入 | ⚠️ 需前端处理路径规范 |
---
### **实际影响示例**
```python
from fastapi import FastAPI
# 配置1:默认行为
app1 = FastAPI(redirect_slashes=True)
@app1.get("/users")
def get_users():
return {"users": ["alice", "bob"]}
# 访问 /users/ → 307 重定向到 /users ✅
# 配置2:禁用重定向
app2 = FastAPI(redirect_slashes=False)
@app2.get("/users")
def get_users():
return {"users": ["alice", "bob"]}
# 访问 /users/ → 404 Not Found ❌
# 需要额外定义:
@app2.get("/users/")
def get_users_trailing():
return {"users": ["alice", "bob"]}
```
---
### **为什么你的代码设置为 `False`?**
常见原因:
1. **严格的 API 设计**:要求客户端必须使用精确的 URL 格式
2. **避免重定向性能开销**:高并发场景下减少 307 响应
3. **与现有系统兼容**:其他服务期望特定路径格式
4. **Kubernetes Ingress 等代理环境**:避免多层重定向问题
---
### **总结**
- `redirect_slashes=True`(默认):友好但可能产生多余重定向
- `redirect_slashes=False`:严格但需要客户端精确匹配路径
你的代码选择 `False` 意味着采用了**更严格、更精确的路径匹配策略**。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### GET 请求
```python
@app.get("/users/")
async def get_users():
return [{"id": 1, "name": "张三"}, {"id": 2, "name": "李四"}]
```
#### POST 请求
```python
from pydantic import BaseModel
class User(BaseModel):
name: str
age: int
@app.post("/users/")
async def create_user(user: User):
"""
请求体:
{
"name": "张三",
"age": 25
}
"""
return {"message": f"用户 {user.name} 创建成功", "user": user}
```
#### PUT 请求
```python
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: User):
return {"message": f"用户 {user_id} 更新成功"}
```
#### DELETE 请求
```python
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
return {"message": f"用户 {user_id} 删除成功"}
```
|
|
## 不使用 async 关键字
当去掉 `async` 关键字后,路由处理函数会变成**普通函数**(`def`):
```python
from fastapi import FastAPI
app = FastAPI()
@app.get("/") # 使用普通函数
def root():
return {"message": "Hello World"}
```
### 与 async def 的区别
| 特性 | async def (异步) | def (同步) |
|------|------------------|------------|
| **执行方式** | 在事件循环中执行 | 在线程池中执行 |
| **并发模型** | 协程并发 | 线程并发 |
| **适用场景** | I/O 密集型操作 | CPU 密集型或简单操作 |
| **并发能力** | 更高(无阻塞) | 受线程池限制(默认约40个线程) |
| **线程切换** | 轻量级 | 较重 |
### 关键点说明
1. **不会排队执行** - FastAPI 会自动在线程池中运行普通函数,多个请求仍然可以并发处理
2. **线程池限制** - 并发能力受限于线程池大小(默认约 40 个线程),超过限制的请求会等待
3. **适用场景** - 对于简单的、快速的响应操作,使用 `def` 完全可以
### 推荐做法
- ✅ **推荐使用 `async def`**:适合大多数场景,特别是涉及数据库查询、API 调用等 I/O 操作
- ✅ **可以使用 `def`**:适合非常简单的响应或纯 CPU 计算
- ⚠️ **避免混用**:在一个项目中尽量保持一致性
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```
import asyncio
counter = 0
lock = asyncio.Lock()
async def safe_increment():
global counter
async with lock: # 保护整个操作
temp = counter
await asyncio.sleep(0.01) # 即使在await期间,锁也不会释放
counter = temp + 1
async def main():
global counter
tasks = [safe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"最终结果: {counter}") # 保证是100
# asyncio.run(main())
```
-
```
from fastapi import FastAPI
import threading
app = FastAPI()
counter = 0
lock = threading.Lock()
@app.get("/increment/")
def increment_counter():
global counter
with lock:
current_value = counter
# 模拟一些耗时的操作
import time
time.sleep(1)
counter = current_value + 1
return {"counter": counter}
```
- await 本身不保证线程安全
- 多个请求可以同时执行到同一个 await 之前
- await只是让等待一个异步线程执行完成,却不阻止其他线程访问该方法
- 必须使用锁、事务等机制保护共享资源
- 优先使用数据库级别的原子操作
|
在 FastAPI 中,`await` 和非同步编程涉及到一些重要概念。让我详细解释一下:
## 1. **异步模型基础**
在 FastAPI(基于 asyncio)中:
- **没有真正的多线程并发执行**:asyncio 使用单线程事件循环
- `await` 不会阻塞线程,而是让出控制权给事件循环
- 看似“同时”执行,实际是协程间的协作式多任务
## 2. **代码示例演示**
```python
from fastapi import FastAPI
import asyncio
import time
app = FastAPI()
# 共享资源示例
counter = 0
shared_data = {}
@app.get("/async-endpoint1")
async def endpoint1():
global counter
# 模拟异步IO操作
await asyncio.sleep(1)
current = counter
await asyncio.sleep(0.1) # 模拟处理时间
counter = current + 1
return {"endpoint": 1, "counter": counter}
@app.get("/async-endpoint2")
async def endpoint2():
global counter
await asyncio.sleep(0.5)
current = counter
await asyncio.sleep(0.2)
counter = current + 1
return {"endpoint": 2, "counter": counter}
```
## 3. **并发访问机制**
### 场景分析:
```python
# 假设两个请求几乎同时到达:
# 请求1: endpoint1()
# 请求2: endpoint2()
# 执行时序可能如下:
# 时间点 0ms: endpoint1() 开始,读取 counter=0
# 时间点 0ms: endpoint2() 开始,读取 counter=0
# 时间点 100ms: endpoint1() 设置 counter=1
# 时间点 200ms: endpoint2() 设置 counter=1(覆盖了之前的修改)
# 结果:counter=1,但预期应该是2
```
## 4. **数据库并发问题**
对于数据库同一行数据的更新:
### **乐观锁机制**:
```python
# 使用版本号或时间戳
# SQL示例:
"""
UPDATE users
SET balance = balance + 100, version = version + 1
WHERE id = 1 AND version = {current_version}
"""
# 检查 affected_rows,如果是0,说明版本已过期,需要重试
```
### **悲观锁机制**:
```python
# 在事务中使用 SELECT ... FOR UPDATE
from databases import Database
async def transfer_funds(db: Database, from_id: int, to_id: int, amount: float):
async with db.transaction():
# 锁定两行数据
query1 = "SELECT balance FROM accounts WHERE id = :id FOR UPDATE"
from_balance = await db.fetch_val(query1, values={"id": from_id})
if from_balance >= amount:
# 执行更新
update1 = "UPDATE accounts SET balance = balance - :amount WHERE id = :id"
await db.execute(update1, values={"id": from_id, "amount": amount})
update2 = "UPDATE accounts SET balance = balance + :amount WHERE id = :id"
await db.execute(update2, values={"id": to_id, "amount": amount})
```
## 5. **实际解决方案**
### **方案1:使用数据库事务**
```python
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
async def update_user(session: AsyncSession, user_id: int):
async with session.begin():
# 获取并锁定行
stmt = select(User).where(User.id == user_id).with_for_update()
result = await session.execute(stmt)
user = result.scalar_one()
# 修改数据
user.balance += 100
await session.commit()
```
### **方案2:使用分布式锁**
```python
import redis.asyncio as redis
import asyncio
class DistributedLock:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
async def lock(self, key: str, timeout=10):
import uuid
identifier = str(uuid.uuid4())
end = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < end:
if await self.redis.setnx(key, identifier):
await self.redis.expire(key, timeout)
return identifier
await asyncio.sleep(0.01)
return None
async def unlock(self, key: str, identifier: str):
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
await self.redis.eval(script, 1, key, identifier)
# 使用
redis_client = redis.Redis()
lock = DistributedLock(redis_client)
async def safe_update(user_id: int):
lock_key = f"lock:user:{user_id}"
lock_id = await lock.lock(lock_key)
if lock_id:
try:
# 执行更新操作
pass
finally:
await lock.unlock(lock_key, lock_id)
```
### **方案3:使用消息队列串行化**
```python
import asyncio
from collections import defaultdict
class TaskQueue:
def __init__(self):
self.queues = defaultdict(asyncio.Queue)
self.locks = {}
async def process_for_key(self, key: str, coro_func, *args):
if key not in self.locks:
self.locks[key] = asyncio.Lock()
async with self.locks[key]:
return await coro_func(*args)
queue = TaskQueue()
@app.post("/update/{user_id}")
async def update_user(user_id: int):
async def _update():
# 这个函数对同一个user_id会串行执行
# 对不同user_id可以并行执行
pass
return await queue.process_for_key(f"user_{user_id}", _update)
```
## 6. **关键总结**
1. **asyncio 是单线程的**,真正的并发在 `await` 点切换
2. **竞态条件可能发生**:即使单线程,协程切换也可能导致数据不一致
3. **数据库层面需要额外保护**:
- 使用事务和适当的隔离级别
- 使用乐观锁或悲观锁
4. **应用层保护**:
- 对共享资源使用锁(asyncio.Lock 用于单进程)
- 使用分布式锁(跨进程/服务)
- 使用消息队列串行化处理
## 7. **最佳实践建议**
```python
# 1. 尽量使用数据库的事务机制
# 2. 对关键操作使用行级锁或乐观锁
# 3. 避免在应用层维护共享状态
# 4. 如果必须,使用 asyncio.Lock(单进程)或分布式锁
import asyncio
# 应用层锁(仅限单进程)
lock = asyncio.Lock()
async def safe_operation():
async with lock: # 这会序列化所有访问
# 操作共享资源
pass
```
记住:**数据库的 ACID 特性是你的第一道防线**,应用层保护是补充。在设计系统时,优先考虑让数据库处理并发问题。
|
# **asyncio.Lock() 详细解析与多场景示例**
`asyncio.Lock()` 是 asyncio 中的互斥锁,用于在**单进程、单线程**的异步环境中保护共享资源。让我详细说明并提供多个例子。
## **1. 基本工作原理**
```python
import asyncio
lock = asyncio.Lock() # 创建一个锁对象
async def critical_section():
# 等待获取锁
async with lock: # 进入时自动获取锁
# 临界区代码 - 同一时间只有一个协程能执行这里
await asyncio.sleep(1)
print("执行临界区代码")
# 退出时自动释放锁
# 模拟并发
async def main():
# 创建多个任务并发执行
tasks = [critical_section() for _ in range(5)]
await asyncio.gather(*tasks)
# 执行结果:虽然创建了5个任务,但它们会串行执行临界区代码
# 每个任务等待前一个释放锁后,才能进入临界区
```
## **2. 示例1:保护共享计数器**
### **问题场景(无锁时):**
```python
import asyncio
counter = 0
async def unsafe_increment():
global counter
temp = counter # 读取当前值
await asyncio.sleep(0.01) # 模拟IO操作,让出控制权
counter = temp + 1 # 写回新值
async def main():
global counter
# 并发执行100次自增
tasks = [unsafe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"最终结果: {counter}") # 可能不是100!
# 运行结果可能小于100,因为存在竞态条件
```
### **解决方案(使用锁):**
```python
import asyncio
counter = 0
lock = asyncio.Lock()
async def safe_increment():
global counter
async with lock: # 保护整个操作
temp = counter
await asyncio.sleep(0.01) # 即使在await期间,锁也不会释放
counter = temp + 1
async def main():
global counter
tasks = [safe_increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"最终结果: {counter}") # 保证是100
# asyncio.run(main())
```
## **3. 示例2:数据库连接池管理**
```python
import asyncio
from typing import List
class DatabaseConnectionPool:
def __init__(self, pool_size: int = 5):
self.pool_size = pool_size
self.available = [f"conn_{i}" for i in range(pool_size)]
self.in_use = []
self.lock = asyncio.Lock() # 保护连接池状态
async def get_connection(self):
async with self.lock:
# 等待直到有可用连接
while not self.available:
print("无可用连接,等待中...")
# 临时释放锁,让其他协程可以归还连接
await self.lock.release()
await asyncio.sleep(0.1) # 短暂等待
await self.lock.acquire() # 重新获取锁
conn = self.available.pop()
self.in_use.append(conn)
print(f"获取连接: {conn}, 可用: {self.available}, 使用中: {self.in_use}")
return conn
async def release_connection(self, conn: str):
async with self.lock:
if conn in self.in_use:
self.in_use.remove(conn)
self.available.append(conn)
print(f"释放连接: {conn}, 可用: {self.available}, 使用中: {self.in_use}")
# 使用示例
async def query_database(pool: DatabaseConnectionPool, task_id: int):
conn = await pool.get_connection()
try:
# 模拟数据库查询
await asyncio.sleep(0.5)
print(f"任务 {task_id} 使用连接 {conn} 执行查询")
finally:
await pool.release_connection(conn)
async def main():
pool = DatabaseConnectionPool(pool_size=3)
tasks = [query_database(pool, i) for i in range(10)]
await asyncio.gather(*tasks)
```
## **4. 示例3:缓存系统**
```python
import asyncio
from datetime import datetime, timedelta
class AsyncCache:
def __init__(self):
self.cache = {}
self.lock = asyncio.Lock()
async def get(self, key: str, default=None):
async with self.lock:
if key in self.cache:
entry = self.cache[key]
if entry['expires_at'] > datetime.now():
return entry['value']
else:
# 已过期,删除
del self.cache[key]
return default
async def set(self, key: str, value, ttl_seconds: int = 60):
async with self.lock:
self.cache[key] = {
'value': value,
'expires_at': datetime.now() + timedelta(seconds=ttl_seconds)
}
async def clear_expired(self):
async with self.lock:
now = datetime.now()
expired_keys = [
key for key, entry in self.cache.items()
if entry['expires_at'] <= now
]
for key in expired_keys:
del self.cache[key]
return len(expired_keys)
# 使用示例
cache = AsyncCache()
async def get_user_data(user_id: str):
# 先尝试从缓存获取
data = await cache.get(f"user:{user_id}")
if data is not None:
print(f"缓存命中: {user_id}")
return data
# 缓存未命中,模拟从数据库获取
print(f"缓存未命中,查询数据库: {user_id}")
await asyncio.sleep(0.5) # 模拟数据库查询
data = f"用户 {user_id} 的数据"
# 写入缓存
await cache.set(f"user:{user_id}", data, ttl_seconds=10)
return data
async def main():
# 模拟并发请求
tasks = []
for i in range(5):
tasks.append(get_user_data("user1")) # 相同key,应该只有一个真正查询数据库
tasks.append(get_user_data(f"user{i}")) # 不同key,会并发查询
await asyncio.gather(*tasks)
```
## **5. 示例4:限流器(Rate Limiter)**
```python
import asyncio
import time
class RateLimiter:
def __init__(self, requests_per_second: int):
self.requests_per_second = requests_per_second
self.timestamps = []
self.lock = asyncio.Lock()
async def acquire(self):
async with self.lock:
now = time.time()
# 移除1秒前的记录
cutoff = now - 1.0
self.timestamps = [ts for ts in self.timestamps if ts > cutoff]
# 检查是否超过限制
if len(self.timestamps) >= self.requests_per_second:
# 计算需要等待的时间
oldest = self.timestamps[0]
wait_time = oldest + 1.0 - now
# 释放锁,等待指定时间
await self.lock.release()
try:
await asyncio.sleep(wait_time)
finally:
await self.lock.acquire() # 重新获取锁
# 重新计算(因为等待期间可能有变化)
now = time.time()
self.timestamps = [ts for ts in self.timestamps if ts > now - 1.0]
# 添加当前时间戳
self.timestamps.append(now)
# 使用示例
limiter = RateLimiter(requests_per_second=3) # 每秒最多3个请求
async def make_request(request_id: int):
await limiter.acquire() # 等待直到允许执行
print(f"请求 {request_id} 在 {time.time():.2f} 执行")
await asyncio.sleep(0.1) # 模拟请求处理
async def main():
# 模拟10个并发请求
tasks = [make_request(i) for i in range(10)]
await asyncio.gather(*tasks)
```
## **6. 示例5:工作队列**
```python
import asyncio
from collections import deque
class AsyncWorkQueue:
def __init__(self, max_workers: int = 3):
self.max_workers = max_workers
self.queue = deque()
self.active_workers = 0
self.lock = asyncio.Lock()
self.not_empty = asyncio.Condition(lock=self.lock)
self.not_full = asyncio.Condition(lock=self.lock)
async def put(self, item):
async with self.lock:
# 等待队列不满(如果有最大长度限制)
self.queue.append(item)
self.not_empty.notify() # 通知有任务可处理
async def get(self):
async with self.lock:
# 等待队列不空
while not self.queue:
await self.not_empty.wait()
return self.queue.popleft()
async def worker(self, worker_id: int):
while True:
try:
# 获取任务
task = await self.get()
async with self.lock:
self.active_workers += 1
# 执行任务
print(f"Worker {worker_id} 开始处理任务: {task}")
await asyncio.sleep(1) # 模拟任务处理
print(f"Worker {worker_id} 完成任务: {task}")
async with self.lock:
self.active_workers -= 1
if self.active_workers < self.max_workers:
self.not_full.notify() # 通知可以添加新任务
except asyncio.CancelledError:
break
async def start(self, num_workers: int = None):
if num_workers is None:
num_workers = self.max_workers
workers = []
for i in range(num_workers):
worker_task = asyncio.create_task(self.worker(i))
workers.append(worker_task)
return workers
# 使用示例
async def main():
queue = AsyncWorkQueue(max_workers=2)
# 启动工作者
workers = await queue.start()
# 添加任务
for i in range(10):
await queue.put(f"任务{i}")
print(f"已添加任务{i}")
await asyncio.sleep(0.2) # 控制添加速度
# 等待所有任务完成
await asyncio.sleep(10)
# 清理
for worker in workers:
worker.cancel()
```
## **7. 重要注意事项**
### **锁的嵌套使用:**
```python
async def nested_locks_example():
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async with lock1:
print("获取 lock1")
await asyncio.sleep(0.1)
# 注意:这可能导致死锁!
async with lock2:
print("获取 lock2")
await asyncio.sleep(0.1)
```
### **避免死锁的建议:**
1. **固定获取顺序**:总是按相同顺序获取多个锁
2. **使用超时**:
```python
try:
await asyncio.wait_for(lock.acquire(), timeout=5.0)
# 执行操作
except asyncio.TimeoutError:
print("获取锁超时")
finally:
if lock.locked():
lock.release()
```
3. **使用 `asyncio.Lock()` 而不是 `threading.Lock()`**
### **性能考虑:**
```python
# 不好的做法:锁的范围太大
async def bad_example():
async with lock:
# IO操作不需要在锁内
data = await fetch_from_network() # 这会阻塞其他协程
process(data)
# 好的做法:只保护必要的部分
async def good_example():
# IO操作在锁外
data = await fetch_from_network()
# 只保护共享资源的访问
async with lock:
shared_resource.update(data)
```
## **8. 总结**
`asyncio.Lock()` 的关键点:
1. **协程级别互斥**:保护单进程内的异步协程访问共享资源
2. **非阻塞等待**:等待锁时不会阻塞整个线程
3. **可重入**:同一个协程可以多次获取同一个锁(计数机制)
4. **需要配合 `async with` 使用**:确保锁的正确释放
**适用场景**:
- 保护内存中的共享数据结构
- 管理有限资源(连接池、线程池)
- 实现异步的同步原语(信号量、条件变量等)
- 单进程内的并发控制
**不适用场景**:
- 多进程环境(需要使用 `multiprocessing.Lock()`)
- 分布式系统(需要使用分布式锁如 Redis 锁)
- 数据库并发(应该使用数据库的事务和锁机制)
|
asyncio.gather 的作用是并发执行多个协程(任务),并等待它们全部完成。
```
async def main():
# 模拟10个并发请求
tasks = [make_request(i) for i in range(10)]
await asyncio.gather(*tasks)
```
- **并发执行** | 10个 `make_request` 同时启动,不是顺序执行
- **等待全部完成** | 会阻塞直到最后一个任务结束
- **收集结果** | 返回所有任务的返回值列表(按传入顺序)
- **异常传播** | 默认第一个异常会取消其他任务
对比:不用 vs 用 gather
```
# ❌ 顺序执行(慢)
for i in range(10):
await make_request(i) # 等1个完成再启动下一个,总耗时 = 10 × 单个耗时
# ✅ 并发执行(快)
tasks = [make_request(i) for i in range(10)]
await asyncio.gather(*tasks) # 10个同时启动,总耗时 ≈ 最慢的那个
```
实际运行流程
```
import asyncio
import aiohttp
async def make_request(i):
print(f"[{i}] 开始")
await asyncio.sleep(1) # 模拟网络IO
print(f"[{i}] 完成")
return f"结果{i}"
async def main():
tasks = [make_request(i) for i in range(3)]
results = await asyncio.gather(*tasks)
print(f"所有结果: {results}")
asyncio.run(main())
```
[0] 开始
[1] 开始
[2] 开始
[0] 完成 (约1秒后)
[1] 完成
[2] 完成
所有结果: ['结果0', '结果1', '结果2']
常用参数
```
results = await asyncio.gather(
*tasks,
return_exceptions=True # 异常不抛出,作为结果返回
)
```
在 FastAPI 中的典型应用
```
@app.get("/batch-users")
async def get_batch_users(user_ids: list[int]):
# 并发查询多个用户,而不是循环 await
tasks = [fetch_user(uid) for uid in user_ids]
users = await asyncio.gather(*tasks)
return users
```
一句话总结:gather 是 asyncio 的"并发执行 + 汇总结果"工具,让多个 IO 操作并行处理,大幅提升效率。
|
|
|
|
|
|
|
|
|
|
## 4. 依赖注入系统 ([api/deps.py](D:\wks\aisql\chat2sql-develop\src\chat2sql\api\deps.py))
### 4.1 数据库会话依赖
```python
# api/deps.py:7-12
def get_db_session() -> Generator[Session, None, None]:
with Session(engine) as session:
yield session
SessionDep = Annotated[Session, Depends(get_db_session)]
```
#### Depends 详解
**作用:**
- `Depends` 是 FastAPI 的依赖注入核心工具类
- 用于在路由处理函数执行前自动调用依赖函数,并将返回值注入到处理函数参数中
- 在请求处理过程中,FastAPI 会自动解析 `Depends`,调用 `get_db_session()`,并将返回的 `Session` 实例传递给需要它的路由函数
- 依赖函数可以是任何可调用对象(函数、类等)
- 它会随同request自身的参数一起注入方法,在方法中不仅可以使用request参数,还可以使用依赖注入的参数
- 实现了控制反转(IoC)和依赖管理解耦
**核心用法:**
1. **基本用法** - 直接使用 Depends
```python
from fastapi import Depends
def get_db_session():
with Session(engine) as session:
yield session
@router.post("/endpoint")
def endpoint(session: Session = Depends(get_db_session)): # 直接使用
user = session.get(UserModel, user_id)
```
2. **推荐用法** - 使用 Annotated 类型别名(项目采用)
```python
from typing import Annotated
from fastapi import Depends
SessionDep = Annotated[Session, Depends(get_db_session)] # 定义类型别名
@router.post("/endpoint")
def endpoint(session: SessionDep): # 更简洁,类型提示更清晰
user = session.get(UserModel, user_id)
```
**Depends 的工作原理:**
```
┌────────────────────────────────────────────────────────────────┐
│ 1. FastAPI 解析路由函数签名 │
│ def endpoint(session: SessionDep): │
└────────────────────────────┬───────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 2. 识别 SessionDep 类型定义 │
│ SessionDep = Annotated[Session, Depends(get_db_session)] │
└────────────────────────────┬───────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 3. 调用依赖函数 get_db_session() │
│ - 执行 with Session(engine) as session │
│ - yield 返回 session 对象 │
└────────────────────────────┬───────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 4. 将 session 注入到 endpoint 函数的 session 参数 │
└────────────────────────────┬───────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 5. 执行 endpoint 业务逻辑 │
└────────────────────────────┬───────────────────────────────────┘
│
▼
┌────────────────────────────────────────────────────────────────┐
│ 6. endpoint 执行完毕,get_db_session 继续 │
│ - yield 后的清理代码执行(session.close()) │
└────────────────────────────────────────────────────────────────┘
```
**Depends 的优势:**
- **复用性**:一次定义,多处使用
- **测试友好**:可轻松替换依赖函数为 mock 实现
- **缓存控制**:通过 `use_cache=False` 控制是否缓存依赖结果
- **嵌套依赖**:依赖函数可以依赖其他依赖
- **自动文档**:自动生成 API 文档中的参数说明
**高级用法示例:**
```python
# 带参数的依赖
def get_current_user(token: str = Header(...)):
return decode_token(token)
UserDep = Annotated[User, Depends(get_current_user)]
# 嵌套依赖
def get_db_session():
with Session(engine) as session:
yield session
def get_repository(session: SessionDep = Depends()):
return UserRepository(session)
RepositoryDep = Annotated[UserRepository, Depends(get_repository)]
# 禁用缓存(每次调用都重新执行)
def get_time():
return datetime.now()
TimeDep = Annotated[datetime, Depends(get_time, use_cache=False)]
```
**使用方式:**
```python
@router.post("/endpoint")
def endpoint(session: SessionDep): # 自动注入数据库会话
user = session.get(UserModel, user_id)
```
### 4.2 依赖注入流程
```
请求到达 -> FastAPI 识别 SessionDep 依赖
-> 调用 get_db_session()
-> 创建 Session(engine)
-> yield session 到路由处理函数
-> 处理函数执行完毕
-> session 自动清理(上下文管理器)
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 上面的方式比下面的方式更可靠,因为它使用了 `pathlib` 来构建绝对路径,避免了工作目录变化导致的 `.env` 文件无法找到的问题。
```
class Settings(BaseSettings):
# 使用新的 model_config 配置方式
# 使用绝对路径加载 .env 文件(避免工作目录问题)
model_config = SettingsConfigDict(
env_file=pathlib.Path(__file__).parent.parent.parent.parent / '.env', # 绝对路径
env_file_encoding='utf-8', # 编码
extra='ignore', # 忽略未定义字段
env_ignore_empty=True, # 忽略空值
)
class Settings(BaseSettings):
# 使用新的 model_config 配置方式
model_config = SettingsConfigDict(
env_file=os.path.join(os.getcwd(), '.env'), # .env 文件路径
env_file_encoding='utf-8', # 编码
extra='ignore', # 忽略未定义字段
env_ignore_empty=True, # 忽略空值
)
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#### SQLModel.metadata
- 在 SQLAlchemy/SQLModel 中,**元数据**是一个中央注册表,用于跟踪所有定义的表结构。
- Alchemy 英/ˈælkəmi/ 美/ˈælkəmi/
- class User(SQLModel, table=True)会自动注册到 SQLModel.metadata 中
- `SQLModel.metadata.create_all(bind=engine)` 创建所有已注册的表
```python
# SQLModel 内部使用 SQLAlchemy 的 MetaData
from sqlalchemy import MetaData
# 每个 SQLModel 类都有一个 metadata 属性
class User(SQLModel, table=True):
__tablename__ = "users"
id: Optional[int] = Field(primary_key=True)
name: str
# User 类会自动注册到 SQLModel.metadata 中
```
#### 表模型的自动注册机制
**关键点**: 当定义一个继承自 `SQLModel` 且 `table=True` 的类时,该类会**自动注册**到 `SQLModel.metadata` 中。
**文件**: `elec\fastapi\src\fastapi_base\models\users.py`
```python
from sqlmodel import SQLModel, Field as SQLField
class UserBase(SQLModel):
"""用户基础字段"""
username: str = SQLField(index=True, unique=True, max_length=50)
# ... 其他字段
# 关键: table=True 触发自动注册
class User(UserBase, table=True):
"""用户表模型"""
__tablename__ = "users"
id: Optional[int] = SQLField(default=None, primary_key=True)
created_at: datetime = SQLField(default_factory=datetime.now)
```
#### 导入触发的注册过程
**文件**: `elec\fastapi\src\fastapi_base\models\__init__.py`
```python
"""数据模型模块"""
from sqlmodel import SQLModel
# 关键: 这行导入会触发 users.py 的执行
from . import users
```
**执行流程**:
1. `init_db()` 执行 from fastapi_base.models import SQLModel
2. Python 解释器加载 fastapi_base.models.__init__.py
3. `__init__.py` 中的 from . import users 被执行
4. `users.py` 中的 class User(UserBase, table=True) 定义被执行
5. `User` 类定义时,由于 table=True,自动调用:
```python
# 伪代码展示 SQLModel 内部机制
class User(UserBase, table=True):
def __init_subclass__(cls, table=False, **kwargs):
if table:
# 将表信息注册到 SQLModel.metadata
SQLModel.metadata.tables[cls.__tablename__] = cls.__table__
```
6. `User` 表被注册到 `SQLModel.metadata` 中
7. `init_db()` 调用 `SQLModel.metadata.create_all(bind=engine)` 创建所有已注册的表
---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# MySQL 连接丢失问题排查与解决方案
> 错误信息: `(pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query')`
> 生成日期: 2026-04-10
---
## 问题分析
### 错误特征
```
pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT t_user.id, t_user.created_at, t_user.updated_at, t_user.username, t_user.realname, t_user.email, t_user.password_hash, t_user.`role`, t_user.is_active, t_user.last_login
FROM t_user
WHERE t_user.username = %(username_1)s]
[parameters: {'username_1': 'admin'}]
```
### 关键线索
1. **环境**: "另外一台新电脑上登录"
2. **连接池配置**: `poolclass=NullPool` (每次都创建新连接)
3. **日志显示**: `[cached since 2547s ago]` - 说明使用了缓存的连接
4. **错误时机**: 在查询执行过程中连接丢失
---
## 可能原因
### 1. MySQL 服务器超时配置 ⭐ 最可能
MySQL 服务器有多种超时设置,长时间空闲的连接会被服务器主动断开:
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `wait_timeout` | 28800秒 (8小时) | 非交互式连接超时 |
| `interactive_timeout` | 28800秒 (8小时) | 交互式连接超时 |
| `net_read_timeout` | 30秒 | 读取超时 |
| `net_write_timeout` | 60秒 | 写入超时 |
**问题场景**:
- 应用启动时建立了数据库连接
- 连接空闲超过 `wait_timeout` 时间
- MySQL 服务器断开连接
- 应用尝试使用已断开的连接 → **连接丢失错误**
### 2. 网络问题
新电脑的网络环境可能导致:
- 防火墙中断长时间空闲的连接
- NAT 路由器超时
- VPN/代理连接不稳定
### 3. MySQL 连接数限制
```sql
-- 检查最大连接数
SHOW VARIABLES LIKE 'max_connections';
-- 检查当前连接数
SHOW STATUS LIKE 'Threads_connected';
```
如果连接数达到上限,新请求会被拒绝。
### 4. MySQL 服务重启
MySQL 服务重启会导致所有现有连接失效。
---
## 解决方案
### 方案 1: 配置连接池(推荐)⭐
使用 SQLAlchemy 的连接池自动管理连接的生命周期:
```python
# core/db.py
from sqlmodel import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
str(settings.SQLALCHEMY_DATABASE_URI),
poolclass=QueuePool, # 使用连接池
pool_size=5, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_recycle=3600, # 连接回收时间(秒)⭐ 关键配置
pool_pre_ping=True, # 连接前先测试⭐ 推荐开启
echo=False,
future=True,
)
```
**关键参数说明**:
| 参数 | 说明 | 推荐值 |
|------|------|--------|
| `pool_recycle` | 连接在使用超过该时间后会被回收,避免使用过期连接 | 3600 (1小时) < `wait_timeout` |
| `pool_pre_ping` | 每次从连接池获取连接时先测试连接是否有效 | True (强烈推荐) |
| `pool_size` | 常驻连接池大小 | 5-10 |
| `max_overflow` | 额外溢出连接数 | 10-20 |
**为什么 `pool_pre_ping=True` 很重要**:
- 每次获取连接时执行 `SELECT 1` 测试
- 自动检测并丢弃失效连接
- 对性能影响很小(约 1-2ms)
### 方案 2: 使用连接池 + 连接保活
```python
from sqlmodel import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
str(settings.SQLALCHEMY_DATABASE_URI),
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_recycle=1800, # 30分钟回收
pool_pre_ping=True,
connect_args={
"connect_timeout": 10, # 连接超时
"read_timeout": 30, # 读取超时
"write_timeout": 30, # 写入超时
},
echo=False,
future=True,
)
```
### 方案 3: 调整 MySQL 服务器配置
如果无法修改应用代码,可以在 MySQL 服务器端调整:
```sql
-- 查看当前配置
SHOW VARIABLES LIKE '%timeout%';
-- 临时调整(重启后失效)
SET GLOBAL wait_timeout = 86400; -- 24小时
SET GLOBAL interactive_timeout = 86400;
-- 永久调整(需修改配置文件并重启)
-- 在 my.cnf 或 my.ini 中添加:
[mysqld]
wait_timeout = 86400
interactive_timeout = 86400
```
### 方案 4: 保持 NullPool 但添加重试机制
如果必须使用 `NullPool`(例如某些特殊场景),可以添加重试逻辑:
```python
from sqlalchemy import exc
from time import sleep
def execute_with_retry(session, statement, max_retries=3):
"""执行数据库查询,带重试机制"""
for attempt in range(max_retries):
try:
return session.exec(statement).first()
except exc.OperationalError as e:
if attempt == max_retries - 1:
raise
if "Lost connection" in str(e) or "MySQL server has gone away" in str(e):
sleep(1) # 等待1秒后重试
continue
raise
```
### 方案 5: 使用依赖注入 + 自动重连
结合 FastAPI 的依赖注入系统:
```python
# api/deps.py
from sqlmodel import Session
from typing import Annotated, Generator
from fastapi import Depends
def get_db_session() -> Generator[Session, None, None]:
"""获取数据库会话,带连接检查"""
from sqlalchemy import exc
max_retries = 3
for attempt in range(max_retries):
try:
with Session(engine) as session:
# 测试连接
session.exec(text("SELECT 1"))
yield session
break # 成功则退出
except exc.OperationalError as e:
if attempt == max_retries - 1:
raise
if "Lost connection" in str(e):
continue # 重试
raise
SessionDep = Annotated[Session, Depends(get_db_session)]
```
---
## 排查步骤
### 1. 检查 MySQL 超时配置
```sql
-- 登录 MySQL
mysql -u root -p
-- 查看超时配置
SHOW VARIABLES LIKE '%timeout%';
```
**预期输出**:
```
+--------------------------+-------+
| Variable_name | Value |
+--------------------------+-------+
| connect_timeout | 10 |
| wait_timeout | 28800 | ⭐ 检查这个
| interactive_timeout | 28800 | ⭐ 检查这个
| net_read_timeout | 30 |
| net_write_timeout | 60 |
+--------------------------+-------+
```
### 2. 检查网络连接
```bash
# 从新电脑测试 MySQL 连接
ping
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### Pydantic 模型
```python
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
class UserModel(BaseModel):
"""用户数据模型"""
name: str = Field(..., min_length=1, max_length=50)
age: int = Field(..., ge=0, le=150)
email: Optional[EmailStr] = None
class Config:
"""配置类:用于设置 Pydantic 模型的行为和元数据"""
# json_schema_extra: 为 OpenAPI 文档(Swagger UI)提供示例数据
# 这样在访问 /docs 时会显示这个示例,方便前端开发人员参考
json_schema_extra = {
"example": {
"name": "张三",
"age": 25,
"email": "zhangsan@example.com"
}
}
# 其他常用配置选项:
#
# 1. by_alias = True
# 序列化时使用字段的别名(alias)
#
# 2. validate_assignment = True
# 在赋值时进行验证,而不仅仅是在初始化时
#
# 3. arbitrary_types_allowed = True
# 允许使用任意 Python 类型
#
# 4. orm_mode = True (Pydantic v2: from_attributes = True)
# 支持从 ORM 对象(如 SQLAlchemy 模型)创建 Pydantic 模型
#
# 5. schema_extra (Pydantic v2 已弃用,使用 json_schema_extra)
# 为 JSON Schema 添加额外信息
```
**验证规则**:
- `name`: 必填,1-50 字符
- `age`: 必填,0-150 之间
- `email`: 可选,必须是有效邮箱
### 使用模型
```python
@app.post("/users/")
async def create_user(user: UserModel):
# FastAPI 自动验证请求体
# 如果验证失败,返回 422 错误
return {"user": user.dict()}
```
### 验证错误示例
**请求**:
```json
{
"name": "",
"age": 200
}
```
**响应** (422 Unprocessable Entity):
```json
{
"detail": [
{
"loc": ["body", "name"],
"msg": "ensure this value has at least 1 characters",
"type": "value_error.any_str.min_length"
},
{
"loc": ["body", "age"],
"msg": "ensure this value is less than or equal to 150",
"type": "value_error.number.not_le"
}
]
}
```
---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
是的,FastAPI 可以使用中间件封装数据库事务。不过需要注意,中间件方式适合**每个请求独立事务**的场景,而不是 Java 那种声明式事务(`@Transactional`)。下面我给你一个完整的示例:
## 方案1:中间件方式(每个请求一个事务)
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from contextvars import ContextVar
from typing import Optional
# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 使用 ContextVar 存储当前请求的 session
db_session: ContextVar[Optional[Session]] = ContextVar("db_session", default=None)
# 定义模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
email = Column(String, unique=True, index=True)
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer)
amount = Column(Integer)
# 创建表
Base.metadata.create_all(bind=engine)
# 中间件:事务管理
@app.middleware("http")
async def transaction_middleware(request, call_next):
# 创建新的 session
session = SessionLocal()
token = db_session.set(session)
try:
# 处理请求
response = await call_next(request)
# 如果响应成功,提交事务
if response.status_code < 400:
session.commit()
else:
session.rollback()
return response
except Exception as e:
# 发生异常,回滚事务
session.rollback()
raise e
finally:
# 关闭 session 并清理 context
session.close()
db_session.reset(token)
# 获取当前请求的 session 的依赖
def get_db():
db = db_session.get()
if db is None:
raise HTTPException(status_code=500, detail="Database session not available")
return db
# FastAPI 应用
app = FastAPI()
# API 路由:可以在单个方法中写多个 SQL,共享同一个事务
@app.post("/users/")
async def create_user_with_order(name: str, email: str, amount: int, db: Session = Depends(get_db)):
# 创建用户
user = User(name=name, email=email)
db.add(user)
db.flush() # 获取 user.id
# 创建订单(使用同一个事务)
order = Order(user_id=user.id, amount=amount)
db.add(order)
# 不需要 commit,中间件会自动处理
return {"user_id": user.id, "order_id": order.id}
@app.post("/transfer/")
async def transfer_money(from_user_id: int, to_user_id: int, amount: int, db: Session = Depends(get_db)):
# 多个 SQL 操作在同一个事务中
from_user = db.query(User).filter(User.id == from_user_id).first()
to_user = db.query(User).filter(User.id == to_user_id).first()
if not from_user or not to_user:
raise HTTPException(status_code=404, detail="User not found")
# 模拟转账操作
# 这里可以执行多个 SQL 更新
# ...
return {"message": "Transfer completed"}
```
## 方案2:使用依赖注入(更灵活,类似 @Transactional)
```python
from contextlib import contextmanager
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
app = FastAPI()
# 事务装饰器/上下文管理器
class TransactionManager:
def __init__(self):
self.db_session = None
@contextmanager
def transaction(self, db: Session):
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
transaction_manager = TransactionManager()
# 依赖:获取事务性的 session
def get_transactional_db():
db = SessionLocal()
try:
with transaction_manager.transaction(db):
yield db
finally:
db.close()
# 使用依赖注入控制事务范围
@app.post("/batch-create/")
async def batch_create(db: Session = Depends(get_transactional_db)):
# 这些操作都在同一个事务中
user1 = User(name="User1", email="user1@example.com")
user2 = User(name="User2", email="user2@example.com")
db.add(user1)
db.add(user2)
# 如果这里出错,前面的操作都会回滚
order = Order(user_id=user1.id, amount=100)
db.add(order)
return {"message": "Batch created"}
```
## 方案3:手动控制事务(最灵活)
```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
app = FastAPI()
# 获取数据库 session 的依赖
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/complex-operation/")
async def complex_operation(db: Session = Depends(get_db)):
try:
# 开始事务
# 操作1
user = User(name="Test", email="test@example.com")
db.add(user)
db.flush()
# 操作2
order = Order(user_id=user.id, amount=500)
db.add(order)
# 操作3:可能失败的操作
# 如果这里出错,前面的 flush 也会回滚
if some_condition:
raise HTTPException(status_code=400, detail="Something went wrong")
# 手动提交
db.commit()
return {"message": "Success"}
except Exception:
# 手动回滚
db.rollback()
raise
```
## 方案4:使用装饰器实现类似 @Transactional
```python
from functools import wraps
from typing import Callable
from fastapi import FastAPI, Depends
def transactional(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# 获取 db session
db = kwargs.get('db')
if not db:
# 尝试从 args 中获取
for arg in args:
if isinstance(arg, Session):
db = arg
break
if not db:
raise ValueError("No database session found")
try:
result = await func(*args, **kwargs)
db.commit()
return result
except Exception:
db.rollback()
raise
return wrapper
@app.post("/decorator-example/")
@transactional
async def decorated_operation(db: Session = Depends(get_db)):
user = User(name="Decorator", email="decorator@example.com")
db.add(user)
order = Order(user_id=user.id, amount=300)
db.add(order)
return {"message": "Transactional operation completed"}
```
## 总结
1. **中间件方案**:适合简单的 CRUD,每个请求自动管理事务
2. **依赖注入方案**:比中间件更灵活,可以控制事务范围
3. **手动控制**:最灵活,适合复杂业务逻辑
4. **装饰器方案**:类似 Java 的 `@Transactional` 注解
推荐使用**依赖注入方案**,因为它既保持了代码简洁,又提供了足够的事务控制能力。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
```
src/simple_api/
├── __init__.py
├── main.py # 应用入口,创建 FastAPI 实例
└── api/
├── __init__.py
└── routes/
├── __init__.py
├── basic.py # 基础功能路由
├── calculator.py # 计算器路由
└── user.py # 用户管理路由
```
|
|
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from simple_api.api.routes import basic, calculator, user
# 创建应用
app = FastAPI(
title="Simple API",
description="FastAPI 学习示例",
version="0.1.0"
)
# CORS 中间件(允许前端访问)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 注册路由
app.include_router(basic.router, tags=["基础功能"])
app.include_router(calculator.router, prefix="/calculator", tags=["计算器"])
app.include_router(user.router, prefix="/user", tags=["用户管理"])
# 根路径
@app.get("/")
async def root():
return {"message": "欢迎使用 Simple API!"}
```
---
|
|
- 按对象划分路由,将一个对象的路由操作整合到一个python文件中
- fastapi_base\api\routes\users.py
- 定义router = APIRouter(),
- 每个对象都有一个router
```
"""
用户路由示例,演示如何创建 CRUD 路由。
"""
from typing import List
from fastapi import APIRouter, HTTPException, status
from fastapi_base.api.deps import SessionDep
from fastapi_base.core.schemas import PageRes, Res
from fastapi_base.crud.users import crud_user
from fastapi_base.models.users import User, UserCreate, UserUpdate
router = APIRouter()
@router.get("/", summary="获取用户列表")
def get_users(
session: SessionDep,
offset: int = 0,
limit: int = 10,
) -> PageRes[User]:
```
- 路由汇兑
- fastapi_base\api\routes\__init__.py
- 新建主路由api_router = APIRouter(),注册所有对象的路由
```
"""
路由模块
注册所有 API 路由。
"""
from fastapi import APIRouter
from fastapi_base.api.routes import health, users
# 创建主路由
api_router = APIRouter()
# 注册子路由
api_router.include_router(health.router, tags=["系统"])
api_router.include_router(users.router, prefix="/users", tags=["用户管理"])
```
- main.py中注册主路由
- main.py中同样可以有自己的路由
```
from fastapi import FastAPI
from fastapi_base.api.routes import api_router
from fastapi_base.core.settings import settings
# 创建应用
application = FastAPI(**settings.FASTAPI_KWARGS, lifespan=lifespan)
# 注册路由
application.include_router(api_router, prefix=settings.API_PREFIX)
# 健康检查端点
@application.get("/health", tags=["系统"], summary="健康检查")
def health_check():
"""API 测活探针"""
return Res[dict].ok([{"message": "The API is LIVE"}])
```
|
|
|
|
|
|
|
|
|
|
|
|
- .\.venv\Scripts\activate
- python .\dev.py
- 后端通常需要连接数据库,
- 在启动时会检查数据库中是否已经初始化表结构
- 如果没有,则会自动初始化表结构
```
(.venv) PS D:\wks\elec\fastapi> .\.venv\Scripts\activate
(fastapi-base) PS D:\wks\elec\fastapi> python .\dev.py
INFO: Will watch for changes in these directories: ['D:\\wks\\elec\\fastapi']
INFO: Uvicorn running on http://127.0.0.1:8801 (Press CTRL+C to quit)
...
...
2026-03-10 11:28:38 | INFO | fastapi_base.core.migrations:auto_upgrade:46 - [MIGRATION] Database migration completed successfully.
2026-03-10 11:28:38 | INFO | fastapi_base.main:lifespan:72 - Application startup complete.
```
- dev.py
```
"""
开发启动脚本
用于在开发环境中启动应用,支持热重载。
"""
import sys
import os
# 添加 src 目录到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src"))
import uvicorn
from fastapi_base.core.settings import settings
if __name__ == "__main__":
uvicorn.run(
"fastapi_base.main:app",
host=settings.API_HOST,
port=settings.API_PORT,
reload=True, # 开发模式启用热重载
log_level="info",
)
```
|
|
### 开发模式
```bash
cd /ai/wks/work2/uv_demo
uv sync # 安装依赖
cd src
uv run python -m fastapi_base.main
```
或使用 uvicorn:
```bash
uv run uvicorn fastapi_base.main:app --reload --host 127.0.0.1 --port 8000
```
### 生产模式
```bash
uvicorn fastapi_base.main:app --host 0.0.0.0 --port 8000 --workers 4
```
---
|
|
### uv run uvicorn
- uv run uvicorn main:app --host 0.0.0.0 --port 8000
```
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
app = FastAPI(title="Simple API", version="1.0.0")
class Item(BaseModel):
name: str
price: float
description: str | None = None
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
return {"item_id": item_id, "q": q}
@app.post("/items")
async def create_item(item: Item):
return {"message": "Item created", "item": item.model_dump()}
# if __name__ == "__main__":
# uvicorn.run(app, host="0.0.0.0", port=8000)
```
### python启动
- 使用python必须打开main方法的注释,没有main入口,程序是空执行
```
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
```
```
(py312) xt@fine-bump-3:~/wks/elec/tmp$ python main.py
INFO: Started server process [1733852]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
```
|
|
|
|
|
|
|
|
|
|
|