|
```
use std::sync::{Arc, Mutex};
use std::thread;
pub fn test(){
let counter = Arc::new(Mutex::new(0));
let mut sync_funcs = vec![];
for _ in 0..100 {
let counter = Arc::clone(&counter);
// 创建即开始执行
let sync_func = thread::spawn(move || {
for _ in 0..10 {
let mut num = counter.lock().unwrap();
*num += 1;
}
});
sync_funcs.push(sync_func);
}
//依次等待每个线程完成
for func in sync_funcs {
func.join().unwrap();
}
let final_count = *counter.lock().unwrap();
println!("Final count: {}", final_count);
}
```
|
|
# Rust 并发多线程示例详解
> 文档版本:v1.0
> 创建日期:2026-03-22
> 对应源文件:[src/bfa/main.rs](../src/bfa/main.rs)
---
## 一、代码概述
本文档详细讲解一个完整的 Rust 并发多线程示例,演示了如何使用多个线程同时操作共享数据,并通过同步机制保证线程安全。
### 代码功能
- 创建 10 个并发线程
- 每个线程对共享计数器增加 100 次
- 最终计数器值为 1000(10 × 100)
- 演示了 Rust 中线程安全和共享数据的处理方式
---
## 二、逐行代码分析
### 1. 导入依赖(第 1-2 行)
```rust
use std::sync::{Arc, Mutex};
use std::thread;
```
**解析:**
- **`std::sync`**:Rust 标准库的同步模块,提供并发原语
- **`Arc`**(Atomic Reference Counting):原子引用计数智能指针
- 允许多个所有者共享同一份数据
- 线程安全的引用计数,保证原子性操作
- 类似于 Rc(Reference Counted),但可以安全地用于多线程环境
- **`Mutex`**:互斥锁(Mutual Exclusion)
- 提供内部可变性(Interior Mutability)
- 同一时刻只允许一个线程访问数据
- 通过 `lock()` 方法获取锁,通过离开作用区自动释放锁
- **`std::thread`**:线程模块
- 提供 `spawn()` 函数用于创建新线程
- 提供 `JoinHandle` 用于管理线程生命周期
---
### 2. 注释说明(第 3 行)
```rust
//cd /home/xt/wks/rust/xuetu && cargo run --bin bfa
```
**解析:**
这是一个注释,记录了运行该程序的命令:
- `cd /home/xt/wks/rust/xuetu`:切换到项目目录
- `cargo run --bin bfa`:编译并运行名为 `bfa` 的二进制文件
---
### 3. main 函数入口(第 4 行)
```rust
fn main() {
```
**解析:**
- Rust 程序的入口点
- 不接受参数,不返回值
- 执行流程从这里开始
---
### 4. 创建共享计数器(第 6 行)
```rust
let counter = Arc::new(Mutex::new(0));
```
**解析:**
这是一个嵌套的数据结构,从内到外理解:
1. **`Mutex::new(0)`**:
- 创建一个互斥锁,保护的初始值是 0(整数)
- 数据结构:`Mutex<i32>`
- 此时 `counter` 的类型是 `Mutex<i32>`
2. **`Arc::new(...)`**:
- 将 `Mutex` 包装在 `Arc` 中
- 最终类型:`Arc<Mutex<i32>>`
- 这样可以在多个线程间共享所有权
**为什么需要 Arc + Mutex?**
| 组件 | 作用 |
|------|------|
| `Arc` | 允许多个线程共享同一份数据的所有权 |
| `Mutex` | 保证同一时刻只有一个线程能访问数据 |
> **深入分析:为什么 Mutex 已经互斥了,还需要 Arc?**
**1. Mutex 保护的是"访问行为",不是"访问资格"**
`Mutex` 的核心职责是 **互斥(Mutual Exclusion)**——它保证同一时刻只有一个线程能*进入*临界区操作数据。但它有一个前提:**这些线程必须都能找得到这个 Mutex**。
在 Rust 的所有权模型中,一个值同一时刻只能有一个所有者。如果你创建了一个 `Mutex<T>`,然后直接把它 `move` 进一个线程的闭包里:
```rust
let data = Mutex::new(0);
thread::spawn(move || {
let mut guard = data.lock().unwrap();
*guard += 1;
});
// 这里 main 线程已经失去了 data 的所有权
// 其他线程再也无法访问这个 Mutex
```
此时 `data` 被**唯一地**转移给了新线程。`Mutex` 确实保证了"同一时刻只有一个线程能访问数据",因为从根本上说,**只有那一个线程知道数据在哪**。其他线程连门都找不到,自然不需要锁。
**2. 所有权与生命周期的硬约束**
Rust 的线程闭包 `thread::spawn` 要求捕获的变量满足 `'static` 生命周期。这意味着你不能直接把局部变量的引用 `&T` 抛给多个线程——编译器无法保证这些引用在线程执行期间始终有效。
所以你面临一个两难:
- 不能 `move` 给多个线程(所有权唯一)
- 不能借 `&` 给多个线程(生命周期不够)
`Arc` 就是来解决这个困境的:它在堆上分配数据,并维护一个**原子引用计数**。当多个线程各自持有一个 `Arc<T>` 的克隆时,它们共享的是对同一块堆内存的**所有权**,而不是借用。每个克隆都是独立的值(`Arc` 本身实现了 `Send`),可以被 `move` 进不同的线程。只有当最后一个 `Arc` 被销毁时,堆上的数据才会被释放。
**3. MutexGuard 只是"临时借用",不是所有权转移**
当你调用 `mutex.lock()` 时,你得到的是一个 `MutexGuard<T>`。这个 Guard 是一个 RAII 对象,它代表"我现在持有锁",并在析构时自动释放锁。但请注意:
- `MutexGuard` **借用**的是 Mutex 内部的数据,它并没有拿走数据的所有权。
- 数据始终归 `Mutex` 所有,而 `Mutex` 始终归 `Arc` 所有(或某个单一所有者)。
- 锁的获取和释放是**动态运行时的行为**,而所有权的传递是**静态编译期**的约束。Rust 编译器在编译时就需要知道"哪些线程可能访问这个数据",`Arc` 提供了这个静态结构。
**4. 一个形象的类比**
想象一个公共洗手间(数据):
- `Mutex` 是洗手间的门锁机制:一次只能进一个人,防止多人同时进去搞破坏。
- `Arc` 是洗手间的地址信息和准入资格:它告诉所有人"洗手间在 3 楼 301,你们都有权去"。
如果没有 `Arc`,只有盖房子的人(创建者)知道洗手间在哪,但他把唯一的钥匙吞进了肚子里(`move` 进一个线程),其他人既不知道地址,也没有钥匙。Mutex 锁得再好,也只锁住了一个人能用的洗手间。
**5. 那为什么不用 `Rc`?**
有些读者可能会想:共享所有权用引用计数不就行了,为什么非得是 `Arc`(Atomic Reference Counted)?
因为 `Rc` 的引用计数操作(增加/减少)**不是原子的**。如果多个线程同时克隆或丢弃 `Rc`,计数器会发生数据竞争,导致 Use-After-Free 或内存泄漏。`Arc` 使用原子指令(如 `AtomicUsize`)来维护计数,保证了线程安全,代价是有一点点性能开销。
**6. 总结:两者是正交的**
| 特性 | `Arc<T>` | `Mutex<T>` |
|------|----------|------------|
| 解决的核心问题 | **多个线程如何共享同一个数据的所有权** | **同一时刻只有一个线程能修改数据** |
| 作用时机 | 编译期:构建静态的共享所有权结构 | 运行期:动态控制访问时序 |
| 没有它会怎样 | 数据只能属于一个线程,其他线程无法访问 | 多个线程能同时读写,产生数据竞争 |
因此,`Arc` 和 `Mutex` 解决的是完全不同层面的问题。`Arc` 回答的是**"谁有资格访问"**,而 `Mutex` 回答的是**"访问时如何排队"**。在多线程共享可变数据的场景中,二者缺一不可:`Arc` 让你能把同一份数据递给多个线程,而 `Mutex` 让这些线程在真正操作时不会互相踩脚。
---
### 5. 创建句柄向量(第 7 行)
```rust
let mut handles = vec![];
```
**解析:**
- **`vec![]`**:创建一个空的动态数组(Vector)
- **`mut`**:声明为可变,后续可以向其中添加元素
- **`handles`**:用于存储线程句柄 `JoinHandle` 的向量
- 类型:`Vec<JoinHandle<()>>`
**为什么需要句柄?**
- 线程创建后会立即运行,但主线程需要等待子线程完成
- `JoinHandle` 提供了 `join()` 方法来阻塞等待线程结束
---
### 6. 创建线程循环(第 9-21 行)
```rust
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..100 {
let mut num = counter.lock().unwrap();
*num += 1;
}
});
handles.push(handle);
}
```
#### 6.1 循环结构(第 9 行)
```rust
for _ in 0..10 {
```
**解析:**
- 创建 10 次迭代
- **`_`**:表示不关心循环变量的值(用下划线忽略)
- 等价于 `for i in 0..10`,但不使用 `i`
#### 6.2 克隆 Arc(第 10 行)
```rust
let counter = Arc::clone(&counter);
```
**解析:**
- **`Arc::clone(&counter)`**:
- 克隆 `Arc` 智能指针(不是克隆数据本身)
- 增加引用计数(从 1 增加到 2、3、4...)
- 多个 `Arc` 指向同一块内存数据
- **Shadowing(遮蔽)**:
- 新的 `counter` 变量遮蔽了外部的同名变量
- 外部 `counter` 仍然存在,但在循环作用域中不可见
**为什么需要克隆?**
```
原始 counter (Arc) ──→ [Mutex<i32>]
↑
克隆 counter 1 (Arc) ─────┘
克隆 counter 2 (Arc) ─────┘
...
克隆 counter 10 (Arc) ────┘
```
每个线程需要拥有自己的 `Arc` 副本,才能在 `move` 闭包中转移所有权。
#### 6.3 创建线程(第 11 行)
```rust
let handle = thread::spawn(move || {
```
**解析:**
- **`thread::spawn`**:创建一个新线程
- 参数:一个闭包(closure)
- 返回:`JoinHandle<()>`
- 线程创建后立即开始执行
- **`move` 关键字**:
- 强制闭包获取其捕获变量的所有权
- 将 `counter` 的所有权从主线程转移到新线程
- 必须使用,因为新线程可能比主线程活得更久
- **`||`**:闭包语法
- 参数列表为空(不接受参数)
- 可以捕获环境变量(如 `counter`)
**闭包语法详解:**
Rust 闭包的完整语法格式为:
```rust
|参数1, 参数2, ...| {
// 闭包体
}
```
不同形式的闭包对比:
```rust
// 1. 无参数的闭包
let no_params = || {
println!("No parameters");
42 // 返回值
};
// 2. 有参数的闭包
let with_params = |x: i32, y: i32| -> i32 {
x + y // 返回x + y
};
// 3. 简洁形式(单表达式,省略大括号)
let simple = |x| x * 2;
// 4. 带类型注解的简洁形式
let typed: fn(i32) -> i32 = |x| x * 2;
```
**thread::spawn 中的闭包:**
```rust
// 示例中的闭包(无参数)
thread::spawn(move || {
// 线程代码
});
// 如果需要传递参数可以这样写:
let data = vec![1, 2, 3];
thread::spawn(move || {
// data 通过捕获传入,不是作为参数
println!("{:?}", data);
});
// thread::spawn 的闭包不接受参数,只能通过捕获环境变量获取数据
// 以下是错误的示例:
// thread::spawn(|data| { // ? 编译错误!spawn 闭包不接受参数
// println!("{:?}", data);
// });
```
#### 6.4 线程内部循环(第 12 行)
```rust
for _ in 0..100 {
```
**解析:**
- 每个线程内部执行 100 次循环
- 10 个线程 × 100 次 = 1000 次总操作
#### 6.5 获取锁(第 13 行)
```rust
let mut num = counter.lock().unwrap();
```
**解析:**
- **`counter.lock()`**:
- 尝试获取 `Mutex` 的锁
- 返回 `LockResult<MutexGuard<i32>>`
- 如果锁已被其他线程持有,当前线程会阻塞等待
- 成功后返回 `MutexGuard` 智能指针
- **`.unwrap()`**:
- 解包 `Result` 类型
- 如果成功,获取 `MutexGuard`
- 如果失败(锁被污染),程序会 panic(这种情况极少发生)
- **`let mut num`**:
- 将 `MutexGuard` 绑定到可变变量 `num`
- `MutexGuard` 实现了 `DerefMut`,可以像引用一样操作
**MutexGuard 是什么?**
这是一个非常重要的概念!让我们详细理解 `MutexGuard` 的工作原理。
#### 关键问题:MutexGuard vs 简单类型复制
**简单类型的复制(值复制):**
```rust
let x = 42;
let y = x; // 复制 x 的值给 y
y += 1; // y 变成 43
println!("x = {}, y = {}", x, y); // 输出: x = 42, y = 43
// x 和 y 是独立的,修改 y 不影响 x
```
**MutexGuard 的情况(智能指针):**
```rust
let counter = Arc::new(Mutex::new(42));
{
let mut num = counter.lock().unwrap(); // num 不是简单的值复制
*num += 1; // 修改的是 Mutex 内部的数据
println!("num = {}", *num); // 输出: num = 43
}
// num 离开作用域,锁自动释放
println!("counter = {}", *counter.lock().unwrap()); // 输出: counter = 43
// 修改确实影响到了 counter!
```
#### MutexGuard 的本质
`MutexGuard` 是一个**智能指针**(Smart Pointer),它的作用是:
1. **持有锁**:Guard 存在期间,锁一直被持有
2. **提供访问**:通过解引用操作符 `*` 访问底层的数据
3. **自动释放**:Guard 被销毁时,自动释放锁
**数据结构示意:**
```rust
// 简化的 MutexGuard 结构(伪代码)
struct MutexGuard<'a, T> {
lock: &'a Mutex<T>, // 持有 Mutex 的引用
data: &'a mut T, // 持有数据的可变引用
}
```
**内存关系图:**
```
┌─────────────────────────────────────────────────────────┐
│ Arc<Mutex<i32>> │
│ ┌────────────┐ │
│ │ 引用计数 │ │
│ │ = 2 │ │
│ └─────┬──────┘ │
│ │ │
│ ┌─────▼─────────────┐ │
│ │ Mutex<i32> │ │
│ │ ┌─────────────┐ │ │
│ │ │ 锁状态 │ │ │
│ │ │ (已锁定) │ │ │
│ │ └──────┬──────┘ │ │
│ │ │ │ │
│ │ ┌──────▼──────┐ │ │
│ │ │ data │ │ │
│ │ │ = 42 │ │?──┐ │
│ │ └─────────────┘ │ │ 引用 │
│ └───────────────────┘ │ │
└──────────────────────────┼─────────────────────────────┘
│
│
┌──────────────────────────┼─────────────────────────────┐
│ MutexGuard │ │
│ ┌──────────────────────┐│ │
│ │ lock: &Mutex │├─────────────────────────────┘
│ │ data: &mut i32 ──────┘┘ 指向上面的 data
│ └──────────────────────┘
└──────────────────────────┘
```
#### 与引用的区别
**普通可变引用:**
```rust
let mut value = 42;
let ref_value = &mut value; // ref_value 是可变引用
*ref_value += 1; // 通过引用修改 value
// value 现在是 43
```
**MutexGuard(智能指针):**
```rust
let mutex = Mutex::new(42);
let guard = mutex.lock().unwrap(); // guard 是 MutexGuard
*guard += 1; // 通过 guard 访问和修改数据
// guard 离开作用域时,自动释放锁
```
#### Deref 和 DerefMut trait
`MutexGuard` 实现了 `Deref` 和 `DerefMut` trait,这使得它可以像引用一样使用:
```rust
// Deref trait(不可变访问)
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
// 返回内部数据的不可变引用
}
}
// DerefMut trait(可变访问)
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
// 返回内部数据的可变引用
}
}
```
**实际效果:**
```rust
let mut num = counter.lock().unwrap();
// 这些操作是等价的:
*num += 1; // 使用解引用操作符
MutexGuard::deref_mut(&mut num); // 显式调用方法
```
#### 为什么需要 MutexGuard?
**问题 1:如何保证锁一定被释放?**
如果使用手动锁管理:
```rust
// ? 假设的 API(不安全)
let mut num = counter.lock(); // 获取锁
*num += 1;
counter.unlock(); // 手动释放锁 - 如果这里 panic 了怎么办?
```
如果 `*num += 1` 发生 panic,`unlock()` 永远不会被调用,锁永远不会被释放!
**解决方案:RAII(资源获取即初始化)**
```rust
// ? 实际的 API(安全)
{
let mut num = counter.lock().unwrap(); // 获取锁,创建 Guard
*num += 1; // 即使这里 panic,Guard 的 drop 仍会执行
} // Guard 离开作用域,自动释放锁
```
`MutexGuard` 实现了 `Drop` trait:
```rust
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// 自动释放锁
self.lock.unlock();
}
}
```
**问题 2:如何防止忘记释放锁?**
使用 `MutexGuard`,编译器会保证:
1. Guard 离开作用域时,`drop()` 一定会被调用
2. `drop()` 中会释放锁
3. 即使发生 panic,也能保证锁被释放
#### 完整示例对比
**示例 1:简单类型复制**
```rust
fn main() {
let x = 42;
let y = x; // 复制值
y += 1;
println!("x = {}", x); // x = 42(未改变)
println!("y = {}", y); // y = 43(独立变化)
}
```
**示例 2:MutexGuard 智能指针**
```rust
use std::sync::Mutex;
fn main() {
let counter = Mutex::new(42);
{
let mut num = counter.lock().unwrap(); // num 是 MutexGuard
println!("Before: num = {}", *num); // num = 42
*num += 1; // 修改 Mutex 内部的数据
println!("After: num = {}", *num); // num = 43
} // Guard 离开作用域,自动释放锁
let value = *counter.lock().unwrap();
println!("counter = {}", value); // counter = 43(已被修改)
}
```
**示例 3:多个 Guard 共享同一份数据**
```rust
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let counter1 = Arc::clone(&counter);
let handle1 = thread::spawn(move || {
let mut guard1 = counter1.lock().unwrap(); // guard1 持有锁
*guard1 += 10;
println!("Thread 1: guard1 = {}", *guard1); // guard1 = 10
}); // guard1 离开作用域,释放锁
let counter2 = Arc::clone(&counter);
let handle2 = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(10)); // 等待线程1
let mut guard2 = counter2.lock().unwrap(); // guard2 获取锁
*guard2 += 20;
println!("Thread 2: guard2 = {}", *guard2); // guard2 = 30
}); // guard2 离开作用域,释放锁
handle1.join().unwrap();
handle2.join().unwrap();
let value = *counter.lock().unwrap();
println!("Final: counter = {}", value); // counter = 30
}
```
#### 总结:MutexGuard 的核心特性
| 特性 | 说明 |
|------|------|
| **智能指针** | 持有对数据的可变引用,不是简单的值复制 |
| **持有锁** | Guard 存在期间,锁一直被持有 |
| **自动释放** | 离开作用域时自动释放锁(RAII) |
| **解引用** | 可以用 `*` 操作符访问底层的数据 |
| **唯一性** | 同一时刻只能有一个 Guard 存在 |
**关键理解:**
```
简单类型复制: x = 42, y = x → y 是独立的副本
普通引用: y = &x → y 引用 x,没有生命周期管理
智能指针: y = MutexGuard → y 管理 x 的访问,带有额外功能(锁管理)
```
#### 6.6 修改数据(第 14 行)
```rust
*num += 1;
```
**解析:**
- **`*num`**:解引用 `MutexGuard`,获取底层的 `i32` 值
- **`+= 1`**:增加 1
- `MutexGuard` 离开作用域时自动释放锁(第 15 行)
**锁的生命周期:**
```rust
{ // 锁在 lock() 时获取
let mut num = counter.lock().unwrap(); // 创建 MutexGuard
*num += 1; // 通过 Guard 访问和修改数据
} // num 离开作用域,MutexGuard 被 drop,锁自动释放
```
#### 6.7 保存句柄(第 17 行)
```rust
handles.push(handle);
```
**解析:**
- 将新创建的线程句柄添加到向量中
- 用于后续等待所有线程完成
---
### 7. 等待所有线程完成(第 20-22 行)
```rust
for handle in handles {
handle.join().unwrap();
}
```
**解析:**
- **`for handle in handles`**:
- 遍历所有线程句柄
- 获取每个 `JoinHandle` 的所有权
- **`handle.join()`**:
- 阻塞当前(主)线程,等待子线程结束
- 返回 `Result<T>`,其中 `T` 是闭包的返回值(这里是 `()`)
- 如果线程 panic,`join()` 返回 `Err`
- **`.unwrap()`**:
- 如果线程成功完成,不做任何事
- 如果线程 panic,主线程也会 panic
**重要说明:关于 join() 的行为和并发性**
#### 关键问题:join() 是阻塞等待,那程序还是并发的吗?
这是一个非常重要的概念!让我们详细理解:
1. **join() 是阻塞等待**:
- 第1次循环:`handle.join()` 会阻塞主线程,等待第1个线程执行完毕
- 第2次循环:`handle.join()` 会阻塞主线程,等待第2个线程执行完毕
- ...依此类推
2. **但这并不意味着线程是顺序执行的!**
关键理解:**所有10个线程已经在前面被创建并开始并行执行了!**
```rust
// 第一个循环:创建并启动所有10个线程
for _ in 0..10 {
let handle = thread::spawn(move || {
// 线程立即开始执行,与其他线程并行运行
});
handles.push(handle);
}
// 此时:10个线程可能同时在不同的CPU核心上运行!
// 第二个循环:等待所有线程完成
for handle in handles {
handle.join().unwrap(); // 阻塞等待
}
```
#### 时间线示意
```
时刻 0: 主线程创建线程1 → 线程1开始运行
时刻 1: 主线程创建线程2 → 线程2开始运行
...
时刻 9: 主线程创建线程10 → 线程10开始运行
↓ 所有10个线程都在并行执行 ↓
时刻 10: 主线程调用 join(线程1) → 等待线程1完成
(此时线程2-10可能还在运行)
时刻 11: 线程1完成
时刻 12: 主线程调用 join(线程2) → 等待线程2完成
(此时线程3-10可能还在运行)
...
```
#### 详细执行时序图
```
主线程: 创建线程1 ─┐
主线程: 创建线程2 ──┼─→ 所有10个线程已经在并行运行!
主线程: ... │ (可能同时在不同CPU核心上执行)
主线程: 创建线程10─┘
┌─────────────────────────────────────────────────────────┐
│ 同时运行的 10 个线程 │
├─────────────────────────────────────────────────────────┤
│ │
│ 线程 1: ████████ 完成 │
│ 线程 2: ████████████ 完成 │
│ 线程 3: ██████████ 完成 │
│ ... │
│ 线程 10: ████████████ 完成 │
│ │
└─────────────────────────────────────────────────────────┘
主线程: join(线程1) ← 等待线程1完成(此时其他线程可能还在运行)
主线程: join(线程2) ← 等待线程2完成(此时线程3-10可能还在运行)
主线程: ...
主线程: join(线程10)
```
#### 为什么仍然需要加锁?
虽然线程并行执行,但它们都要访问同一个 `counter`:
- **没有锁的情况**:
```
时刻 0: 线程A读取 counter = 10
时刻 1: 线程B读取 counter = 10
时刻 2: 线程A写入 11
时刻 3: 线程B写入 11 ← 线程A的操作被覆盖了!
```
- **有锁的情况**:
```
时刻 0: 线程A获取锁
时刻 1: 线程A读取 10,写入 11,释放锁
时刻 2: 线程B获取锁(等待A完成)
时刻 3: 线程B读取 11,写入 12,释放锁
```
#### join() 的真正作用
- ? **不是**控制线程的执行顺序(线程已经在并行执行了)
- ? **而是**确保主线程在所有子线程完成之前不会退出
#### 如果不调用 join() 会怎样?
```rust
// 危险示例
for _ in 0..10 {
thread::spawn(|| {
// 线程代码
});
}
// main 函数返回,程序终止(即使子线程还在运行)
```
主线程可能立即退出,程序终止,即使子线程还没执行完!
#### 完整示例对比
**示例 1:不使用 join()**
```rust
use std::thread;
use std::time::Duration;
fn main() {
for i in 0..10 {
thread::spawn(move || {
println!("Thread {} working...", i);
thread::sleep(Duration::from_secs(1));
println!("Thread {} done!", i);
});
}
// 主线程立即退出,子线程可能还没完成
// 输出可能不完整或没有输出
}
```
**示例 2:使用 join()**
```rust
use std::thread;
use std::time::Duration;
fn main() {
let mut handles = vec![];
for i in 0..10 {
let handle = thread::spawn(move || {
println!("Thread {} working...", i);
thread::sleep(Duration::from_secs(1));
println!("Thread {} done!", i);
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
println!("All threads completed!");
}
```
#### 总结:join() 的核心要点
| 特性 | 说明 |
|------|------|
| **阻塞等待** | `join()` 会阻塞主线程,等待指定的线程完成 |
| **顺序等待** | 按顺序等待每个线程,但不影响线程的并行执行 |
| **不是顺序执行** | 线程已经在并行运行了,`join()` 只是等待它们完成 |
| **保证完成** | 确保主线程在所有子线程完成之前不会退出 |
| **收集结果** | 可以获取线程的返回值 |
**关键理解:**
```
创建阶段: 线程1、线程2、...、线程10 同时开始运行
↓
执行阶段: 所有线程并行执行(在不同CPU核心上)
↓
等待阶段: join(线程1) → join(线程2) → ... → join(线程10)
主线程顺序等待,但子线程早已在并行执行
```
**执行时序:**
```
主线程: 创建线程1 ─┐
主线程: 创建线程2 ─┼─→ 主线程: 等待(join)
主线程: ... │ 等待所有子线程完成后继续
主线程: 创建线程10─┘
```
---
### 8. 输出最终结果(第 24 行)
```rust
println!("Final counter: {}", *counter.lock().unwrap());
```
**解析:**
- **`counter.lock().unwrap()`**:
- 获取锁(此时所有其他线程已完成)
- 获取 `MutexGuard<i32>`
- **`*`**:解引用,获取 `i32` 值
- **`println!`**:格式化输出
- 输出示例:`Final counter: 1000`
---
## 三、核心概念详解
### 3.1 Arc(原子引用计数)
**定义:**
```rust
pub struct Arc<T> {
// 内部实现细节
}
```
**特点:**
| 特性 | 说明 |
|------|------|
| 线程安全 | 引用计数操作是原子的 |
| 多所有者 | 允许多个所有者共享数据 |
| 不可变 | 只提供共享引用,不提供可变性 |
**与 Rc 的区别:**
```rust
use std::rc::Rc; // 单线程使用
use std::sync::Arc; // 多线程使用
```
**Arc 的所有权模型:**
```
引用计数 = 1 → 引用计数 = 2 → 引用计数 = 1
Arc1 Arc1, Arc2 Arc1 (Arc2 被销毁)
```
### 3.2 Mutex(互斥锁)
**定义:**
```rust
pub struct Mutex<T> {
// 内部实现细节
}
```
**作用:**
1. **内部可变性**:通过不可变引用修改数据
2. **互斥访问**:同时只有一个线程能访问数据
**锁的操作:**
```rust
let mutex = Mutex::new(5);
// 获取锁
{
let mut data = mutex.lock().unwrap();
*data += 1; // 修改数据
} // 锁自动释放
// 锁已释放,可以再次获取
let data = mutex.lock().unwrap();
```
### 3.3 线程创建与管理
**spawn 函数签名:**
```rust
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
```
**约束解释:**
| 约束 | 含义 |
|------|------|
| `FnOnce()` | 闭包只能调用一次 |
| `Send` | 闭包和返回值可以在线程间传递 |
| `'static` | 闭包捕获的数据拥有静态生命周期(不依赖栈) |
**JoinHandle 方法:**
```rust
impl<T> JoinHandle<T> {
pub fn join(self) -> Result<T>;
pub fn thread(&self) -> &Thread;
}
```
### 3.4 move 关键字
**作用:**
强制闭包获取捕获变量的所有权
**核心原理:**
`move` 关键字告诉编译器:"闭包捕获的所有变量,将其所有权从外部作用域转移到闭包内部"。
```
外部作用域 闭包内部
data ──move──→ [ data ] 所有权转移
↑
原变量不再有效
```
**示例对比:**
```rust
// 示例 1: 没有 move - 借用
let data = vec![1, 2, 3];
let closure = || {
println!("Length: {}", data.len()); // 借用 data
};
println!("Still accessible: {}", data.len()); // ? data 仍然可用
closure();
```
```rust
// 示例 2: 使用 move - 所有权转移
let data = vec![1, 2, 3];
let closure = move || {
println!("Length: {}", data.len()); // data 的所有权转移到闭包
};
// println!("{}", data.len()); // ? 编译错误!data 已经被移动
closure();
// closure(); // ? 再次调用也会错误,因为闭包只能调用一次(FnOnce)
```
```rust
// 示例 3: Copy 类型 - move 会复制而非转移
let num = 42;
let closure = move || {
println!("Number: {}", num); // i32 实现 Copy,num 被复制
};
println!("Number: {}", num); // ? num 仍然可用(因为是 Copy 类型)
```
**在多线程中的必要性:**
```rust
use std::thread;
let data = vec![1, 2, 3];
// 错误:没有 move
let handle = thread::spawn(|| {
println!("{:?}", data); // ? 编译错误
});
// 原因:新线程可能比主线程活得久,data 可能被先释放
drop(data); // data 在这里被释放
handle.join().unwrap(); // 但子线程还在使用 data!
```
```rust
use std::thread;
let data = vec![1, 2, 3];
// 正确:使用 move
let handle = thread::spawn(move || {
println!("{:?}", data); // ? data 的所有权转移到子线程
});
// data 不再在这里可用
handle.join().unwrap(); // 等待子线程完成
```
**闭包与所有权转移总结:**
| 特性 | 不使用 `move` | 使用 `move` |
|------|-------------|-----------|
| 捕获方式 | 借用(引用) | 转移所有权 |
| 原变量仍可用 | ? | ?(除非是 Copy 类型)|
| 闭包类型 | `Fn` 或 `FnMut` | `FnOnce` |
| 线程安全 | ?(可能悬垂引用)| ? |
| 适用场景 | 短生命周期的闭包 | 线程、async 等 |
**特殊说明:**
`Arc::clone()` 不转移所有权,而是增加引用计数,因此可以配合 `move` 使用:
```rust
let counter = Arc::new(Mutex::new(0));
let counter_clone = Arc::clone(&counter); // 增加引用计数
let handle = thread::spawn(move || {
// counter_clone 的所有权转移到闭包
// 但底层数据仍然被 counter 引用
});
// counter 仍然可用
```
---
## 四、执行流程图
```
┌─────────────────────────────────────────────────────────────┐
│ 主线程开始 │
└─────────────────┬───────────────────────────────────────────┘
│
▼
┌─────────────────────┐
│ 创建 Arc<Mutex<0>> │
└─────────┬───────────┘
│
▼
┌─────────────────────┐
│ 创建空 handles 向量 │
└─────────┬───────────┘
│
▼
┌─────────────────────┐
│ for i in 0..10 │?─────────┐
└─────────┬───────────┘ │
│ │
▼ │
┌─────────────────────┐ │
│ Arc::clone │ │
└─────────┬───────────┘ │
│ │
▼ │
┌─────────────────────┐ │
│ thread::spawn │──────────┤ 创建新线程
└─────────┬───────────┘ │
│ │
▼ │
┌─────────────────────┐ │
│ push handle │ │
└─────────┬───────────┘ │
│ │
└──────────────────────┘(重复10次)
┌─────────────────────────────────────────────────────────────┐
│ 同时运行的 10 个线程 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 线程 1: lock() → +1 → unlock() → lock() → +1... │
│ 线程 2: lock() → +1 → unlock() → lock() → +1... │
│ ... │
│ 线程 10: lock() → +1 → unlock() → lock() → +1... │
│ │
│ 每个线程执行 100 次 += 1 操作 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────┐
│ for handle in... │?─────────┐
└─────────┬───────────┘ │
│ │
▼ │
┌─────────────────────┐ │
│ handle.join() │──────────┤ 等待线程结束
└─────────┬───────────┘ │
│ │
└──────────────────────┘(重复10次)
│
▼
┌─────────────────────┐
│ 输出最终结果 │
│ Final counter: 1000│
└─────────────────────┘
```
---
## 五、为什么结果是 1000?
### 5.1 数学计算
```
线程数量 × 每个线程的操作次数 = 最终计数器值
10 × 100 = 1000
```
### 5.2 并发安全性保证
**问题:** 如果没有锁会怎样?
```rust
// 错误示例(假设没有 Mutex)
let counter = Arc::new(0); // ? i32 是 Copy,不能用 Arc
// 或者使用 unsafe 的可变引用
// 可能的结果:0-1000 之间的任意值(数据竞争)
```
**数据竞争(Data Race):**
```
时刻 0: counter = 10
时刻 1: 线程A读取 counter = 10
时刻 2: 线程B读取 counter = 10
时刻 3: 线程A写入 11
时刻 4: 线程B写入 11 ← 线程A的操作被覆盖了!
```
**Mutex 的保证:**
```
时刻 0: counter = 10
时刻 1: 线程A获取锁
时刻 2: 线程A读取 10,写入 11,释放锁
时刻 3: 线程B获取锁(等待A完成)
时刻 4: 线程B读取 11,写入 12,释放锁
```
### 5.3 内存顺序
Rust 的 `Mutex` 保证:
1. **原子性**:每次操作要么全部完成,要么都不完成
2. **可见性**:一个线程的修改对其他线程立即可见
3. **有序性**:操作按代码顺序执行
---
## 六、常见问题
### Q1: 为什么需要 Arc?
**A:** 因为 `thread::spawn` 的闭包必须拥有 `Send + 'static` 的数据。
```rust
// 没有 Arc:所有权只能属于一个线程
let counter = Mutex::new(0);
let handle1 = thread::spawn(move || {
// counter 的所有权转移到线程1
*counter.lock().unwrap() += 1;
});
// 编译错误:counter 已经被移动
let handle2 = thread::spawn(move || {
*counter.lock().unwrap() += 1; // ?
});
```
使用 `Arc` 解决:
```rust
// 使用 Arc:多个线程共享所有权
let counter = Arc::new(Mutex::new(0));
let counter1 = Arc::clone(&counter);
let handle1 = thread::spawn(move || {
*counter1.lock().unwrap() += 1; // ?
});
let counter2 = Arc::clone(&counter);
let handle2 = thread::spawn(move || {
*counter2.lock().unwrap() += 1; // ?
});
```
### Q2: 为什么需要 Mutex?
**A:** 因为 Rust 的所有权规则不允许通过共享引用修改数据。
```rust
// 错误示例
let counter = Arc::new(0);
*counter += 1; // ? Arc 只提供不可变引用
```
使用 `Mutex` 提供内部可变性:
```rust
let counter = Arc::new(Mutex::new(0));
*counter.lock().unwrap() += 1; // ?
```
### Q3: 什么是 unwrap()?
**A:** `unwrap()` 是 `Result` 和 `Option` 的方法:
```rust
// 对于 Result<T, E>
match result {
Ok(value) => value,
Err(err) => panic!("..."), // unwrap 在 Err 时 panic
}
```
**为什么这里可以安全使用?**
- `Mutex::lock()` 只会在锁被污染时返回 `Err`(极罕见)
- `JoinHandle::join()` 的 `Err` 只在子线程 panic 时发生
- 在示例代码中,这些情况不会发生
### Q4: 线程执行的顺序是确定的吗?
**A:** 不确定。线程调度的顺序由操作系统决定。
**可能的执行顺序:**
```
顺序 1: 线程1 → 线程2 → ... → 线程10
顺序 2: 线程10 → 线程9 → ... → 线程1
顺序 3: 线程1、3、5 并行,然后 2、4、6...
```
**但结果总是 1000**,因为 `Mutex` 保证了互斥访问。
### Q5: 如果不调用 join() 会怎样?
**A:** 主线程可能提前退出,导致程序终止。
```rust
// 危险示例
for _ in 0..10 {
thread::spawn(|| {
// 如果主线程在这里退出,子线程可能未完成
});
}
// main 函数返回,程序终止(即使子线程还在运行)
```
### Q6: 能否不用 Arc,只用 Mutex?
**A:** 可以,但只能在单个线程内使用。
```rust
// 单线程使用 Mutex
let counter = Mutex::new(0);
{
let mut data = counter.lock().unwrap();
*data += 1;
}
{
let mut data = counter.lock().unwrap();
*data += 1;
}
```
多线程必须使用 `Arc` 来共享所有权。
---
## 七、扩展示例
### 7.1 多个共享变量
```rust
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let sum = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..5 {
let counter = Arc::clone(&counter);
let sum = Arc::clone(&sum);
let handle = thread::spawn(move || {
for _ in 0..100 {
let mut num = counter.lock().unwrap();
*num += 1;
}
let mut s = sum.lock().unwrap();
*s += i * 100;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter: {}", *counter.lock().unwrap());
println!("Sum: {}", *sum.lock().unwrap());
}
```
### 7.2 带返回值的线程
```rust
use std::thread;
fn main() {
let handle = thread::spawn(|| {
let result = 1 + 1;
result // 返回值
});
let result = handle.join().unwrap();
println!("Result: {}", result);
}
```
### 7.3 使用 RwLock(读写锁)
```rust
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(0));
let mut handles = vec![];
// 写线程
for _ in 0..5 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut num = data.write().unwrap();
*num += 1;
}));
}
// 读线程
for _ in 0..5 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let num = data.read().unwrap();
println!("Read: {}", *num);
}));
}
for handle in handles {
handle.join().unwrap();
}
}
```
**RwLock vs Mutex:**
| 特性 | Mutex | RwLock |
|------|-------|--------|
| 同时读 | ? | ? |
| 同时写 | ? | ? |
| 读写同时 | ? | ? |
---
## 八、性能考虑
### 8.1 锁的竞争
**问题:** 锁竞争会导致性能下降
```rust
// 高竞争示例
for _ in 0..1000 {
let counter = Arc::clone(&counter);
thread::spawn(move || {
for _ in 0..10000 {
*counter.lock().unwrap() += 1; // 频繁获取锁
}
});
}
```
**优化:减少锁的持有时间**
```rust
for _ in 0..1000 {
let counter = Arc::clone(&counter);
thread::spawn(move || {
let mut local_sum = 0;
for _ in 0..10000 {
local_sum += 1; // 不使用锁
}
*counter.lock().unwrap() += local_sum; // 最后加锁一次
});
}
```
### 8.2 线程数量
**经验法则:**
- CPU 密集型:线程数 ≈ CPU 核心数
- I/O 密集型:线程数可以更多
```rust
use std::thread;
fn main() {
let num_threads = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
println!("Recommended threads: {}", num_threads);
}
```
---
## 九、调试技巧
### 9.1 打印线程ID
```rust
use std::thread;
fn main() {
let handle = thread::spawn(|| {
println!("Thread ID: {:?}", thread::current().id());
});
handle.join().unwrap();
println!("Main Thread ID: {:?}", thread::current().id());
}
```
### 9.2 使用日志
```rust
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let handle = thread::spawn({
let counter = Arc::clone(&counter);
move || {
println!("Thread: acquiring lock...");
let mut num = counter.lock().unwrap();
println!("Thread: lock acquired");
*num += 1;
println!("Thread: releasing lock");
}
});
println!("Main: acquiring lock...");
let mut num = counter.lock().unwrap();
println!("Main: lock acquired");
*num += 1;
println!("Main: releasing lock");
handle.join().unwrap();
}
```
---
## 十、总结
### 关键要点
1. **Arc**:多线程间共享数据
2. **Mutex**:保证数据修改的互斥性
3. **thread::spawn**:创建新线程
4. **move**:转移所有权到新线程
5. **join**:等待线程完成
### Rust 并发的优势
- **编译时保证**:编译器会阻止数据竞争
- **零成本抽象**:不需要运行时垃圾回收
- **类型安全**:通过类型系统保证线程安全
### 学习路径
```
基础 → 所有权系统 → 闭包 → 线程 → Arc/Mutex → 高级并发
```
---
## 十一、参考资源
### 官方文档
- [Rust Book - 并发编程](https://doc.rust-lang.org/book/ch16-00-concurrency.html)
- [std::thread 模块](https://doc.rust-lang.org/std/thread/index.html)
- [std::sync 模块](https://doc.rust-lang.org/std/sync/index.html)
### 推荐阅读
- 《Rust 程序设计语言》第 16 章
- 《Rust By Example》 - 并发章节
- 《Rust Atomics and Locks》 by Mara Bos
---
|
|
|
|
|