计数
``` use std::sync::{Arc, Mutex}; use std::thread; pub fn test(){ let counter = Arc::new(Mutex::new(0)); let mut sync_funcs = vec![]; for _ in 0..100 { let counter = Arc::clone(&counter); // 创建即开始执行 let sync_func = thread::spawn(move || { for _ in 0..10 { let mut num = counter.lock().unwrap(); *num += 1; } }); sync_funcs.push(sync_func); } //依次等待每个线程完成 for func in sync_funcs { func.join().unwrap(); } let final_count = *counter.lock().unwrap(); println!("Final count: {}", final_count); } ```
# Rust 并发多线程示例详解 > 文档版本:v1.0 > 创建日期:2026-03-22 > 对应源文件:[src/bfa/main.rs](../src/bfa/main.rs) --- ## 一、代码概述 本文档详细讲解一个完整的 Rust 并发多线程示例,演示了如何使用多个线程同时操作共享数据,并通过同步机制保证线程安全。 ### 代码功能 - 创建 10 个并发线程 - 每个线程对共享计数器增加 100 次 - 最终计数器值为 1000(10 × 100) - 演示了 Rust 中线程安全和共享数据的处理方式 --- ## 二、逐行代码分析 ### 1. 导入依赖(第 1-2 行) ```rust use std::sync::{Arc, Mutex}; use std::thread; ``` **解析:** - **`std::sync`**:Rust 标准库的同步模块,提供并发原语 - **`Arc`**(Atomic Reference Counting):原子引用计数智能指针 - 允许多个所有者共享同一份数据 - 线程安全的引用计数,保证原子性操作 - 类似于 Rc(Reference Counted),但可以安全地用于多线程环境 - **`Mutex`**:互斥锁(Mutual Exclusion) - 提供内部可变性(Interior Mutability) - 同一时刻只允许一个线程访问数据 - 通过 `lock()` 方法获取锁,通过离开作用区自动释放锁 - **`std::thread`**:线程模块 - 提供 `spawn()` 函数用于创建新线程 - 提供 `JoinHandle` 用于管理线程生命周期 --- ### 2. 注释说明(第 3 行) ```rust //cd /home/xt/wks/rust/xuetu && cargo run --bin bfa ``` **解析:** 这是一个注释,记录了运行该程序的命令: - `cd /home/xt/wks/rust/xuetu`:切换到项目目录 - `cargo run --bin bfa`:编译并运行名为 `bfa` 的二进制文件 --- ### 3. main 函数入口(第 4 行) ```rust fn main() { ``` **解析:** - Rust 程序的入口点 - 不接受参数,不返回值 - 执行流程从这里开始 --- ### 4. 创建共享计数器(第 6 行) ```rust let counter = Arc::new(Mutex::new(0)); ``` **解析:** 这是一个嵌套的数据结构,从内到外理解: 1. **`Mutex::new(0)`**: - 创建一个互斥锁,保护的初始值是 0(整数) - 数据结构:`Mutex<i32>` - 此时 `counter` 的类型是 `Mutex<i32>` 2. **`Arc::new(...)`**: - 将 `Mutex` 包装在 `Arc` 中 - 最终类型:`Arc<Mutex<i32>>` - 这样可以在多个线程间共享所有权 **为什么需要 Arc + Mutex?** | 组件 | 作用 | |------|------| | `Arc` | 允许多个线程共享同一份数据的所有权 | | `Mutex` | 保证同一时刻只有一个线程能访问数据 | > **深入分析:为什么 Mutex 已经互斥了,还需要 Arc?** **1. Mutex 保护的是"访问行为",不是"访问资格"** `Mutex` 的核心职责是 **互斥(Mutual Exclusion)**——它保证同一时刻只有一个线程能*进入*临界区操作数据。但它有一个前提:**这些线程必须都能找得到这个 Mutex**。 在 Rust 的所有权模型中,一个值同一时刻只能有一个所有者。如果你创建了一个 `Mutex<T>`,然后直接把它 `move` 进一个线程的闭包里: ```rust let data = Mutex::new(0); thread::spawn(move || { let mut guard = data.lock().unwrap(); *guard += 1; }); // 这里 main 线程已经失去了 data 的所有权 // 其他线程再也无法访问这个 Mutex ``` 此时 `data` 被**唯一地**转移给了新线程。`Mutex` 确实保证了"同一时刻只有一个线程能访问数据",因为从根本上说,**只有那一个线程知道数据在哪**。其他线程连门都找不到,自然不需要锁。 **2. 所有权与生命周期的硬约束** Rust 的线程闭包 `thread::spawn` 要求捕获的变量满足 `'static` 生命周期。这意味着你不能直接把局部变量的引用 `&T` 抛给多个线程——编译器无法保证这些引用在线程执行期间始终有效。 所以你面临一个两难: - 不能 `move` 给多个线程(所有权唯一) - 不能借 `&` 给多个线程(生命周期不够) `Arc` 就是来解决这个困境的:它在堆上分配数据,并维护一个**原子引用计数**。当多个线程各自持有一个 `Arc<T>` 的克隆时,它们共享的是对同一块堆内存的**所有权**,而不是借用。每个克隆都是独立的值(`Arc` 本身实现了 `Send`),可以被 `move` 进不同的线程。只有当最后一个 `Arc` 被销毁时,堆上的数据才会被释放。 **3. MutexGuard 只是"临时借用",不是所有权转移** 当你调用 `mutex.lock()` 时,你得到的是一个 `MutexGuard<T>`。这个 Guard 是一个 RAII 对象,它代表"我现在持有锁",并在析构时自动释放锁。但请注意: - `MutexGuard` **借用**的是 Mutex 内部的数据,它并没有拿走数据的所有权。 - 数据始终归 `Mutex` 所有,而 `Mutex` 始终归 `Arc` 所有(或某个单一所有者)。 - 锁的获取和释放是**动态运行时的行为**,而所有权的传递是**静态编译期**的约束。Rust 编译器在编译时就需要知道"哪些线程可能访问这个数据",`Arc` 提供了这个静态结构。 **4. 一个形象的类比** 想象一个公共洗手间(数据): - `Mutex` 是洗手间的门锁机制:一次只能进一个人,防止多人同时进去搞破坏。 - `Arc` 是洗手间的地址信息和准入资格:它告诉所有人"洗手间在 3 楼 301,你们都有权去"。 如果没有 `Arc`,只有盖房子的人(创建者)知道洗手间在哪,但他把唯一的钥匙吞进了肚子里(`move` 进一个线程),其他人既不知道地址,也没有钥匙。Mutex 锁得再好,也只锁住了一个人能用的洗手间。 **5. 那为什么不用 `Rc`?** 有些读者可能会想:共享所有权用引用计数不就行了,为什么非得是 `Arc`(Atomic Reference Counted)? 因为 `Rc` 的引用计数操作(增加/减少)**不是原子的**。如果多个线程同时克隆或丢弃 `Rc`,计数器会发生数据竞争,导致 Use-After-Free 或内存泄漏。`Arc` 使用原子指令(如 `AtomicUsize`)来维护计数,保证了线程安全,代价是有一点点性能开销。 **6. 总结:两者是正交的** | 特性 | `Arc<T>` | `Mutex<T>` | |------|----------|------------| | 解决的核心问题 | **多个线程如何共享同一个数据的所有权** | **同一时刻只有一个线程能修改数据** | | 作用时机 | 编译期:构建静态的共享所有权结构 | 运行期:动态控制访问时序 | | 没有它会怎样 | 数据只能属于一个线程,其他线程无法访问 | 多个线程能同时读写,产生数据竞争 | 因此,`Arc` 和 `Mutex` 解决的是完全不同层面的问题。`Arc` 回答的是**"谁有资格访问"**,而 `Mutex` 回答的是**"访问时如何排队"**。在多线程共享可变数据的场景中,二者缺一不可:`Arc` 让你能把同一份数据递给多个线程,而 `Mutex` 让这些线程在真正操作时不会互相踩脚。 --- ### 5. 创建句柄向量(第 7 行) ```rust let mut handles = vec![]; ``` **解析:** - **`vec![]`**:创建一个空的动态数组(Vector) - **`mut`**:声明为可变,后续可以向其中添加元素 - **`handles`**:用于存储线程句柄 `JoinHandle` 的向量 - 类型:`Vec<JoinHandle<()>>` **为什么需要句柄?** - 线程创建后会立即运行,但主线程需要等待子线程完成 - `JoinHandle` 提供了 `join()` 方法来阻塞等待线程结束 --- ### 6. 创建线程循环(第 9-21 行) ```rust for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { for _ in 0..100 { let mut num = counter.lock().unwrap(); *num += 1; } }); handles.push(handle); } ``` #### 6.1 循环结构(第 9 行) ```rust for _ in 0..10 { ``` **解析:** - 创建 10 次迭代 - **`_`**:表示不关心循环变量的值(用下划线忽略) - 等价于 `for i in 0..10`,但不使用 `i` #### 6.2 克隆 Arc(第 10 行) ```rust let counter = Arc::clone(&counter); ``` **解析:** - **`Arc::clone(&counter)`**: - 克隆 `Arc` 智能指针(不是克隆数据本身) - 增加引用计数(从 1 增加到 2、3、4...) - 多个 `Arc` 指向同一块内存数据 - **Shadowing(遮蔽)**: - 新的 `counter` 变量遮蔽了外部的同名变量 - 外部 `counter` 仍然存在,但在循环作用域中不可见 **为什么需要克隆?** ``` 原始 counter (Arc) ──→ [Mutex<i32>] ↑ 克隆 counter 1 (Arc) ─────┘ 克隆 counter 2 (Arc) ─────┘ ... 克隆 counter 10 (Arc) ────┘ ``` 每个线程需要拥有自己的 `Arc` 副本,才能在 `move` 闭包中转移所有权。 #### 6.3 创建线程(第 11 行) ```rust let handle = thread::spawn(move || { ``` **解析:** - **`thread::spawn`**:创建一个新线程 - 参数:一个闭包(closure) - 返回:`JoinHandle<()>` - 线程创建后立即开始执行 - **`move` 关键字**: - 强制闭包获取其捕获变量的所有权 - 将 `counter` 的所有权从主线程转移到新线程 - 必须使用,因为新线程可能比主线程活得更久 - **`||`**:闭包语法 - 参数列表为空(不接受参数) - 可以捕获环境变量(如 `counter`) **闭包语法详解:** Rust 闭包的完整语法格式为: ```rust |参数1, 参数2, ...| { // 闭包体 } ``` 不同形式的闭包对比: ```rust // 1. 无参数的闭包 let no_params = || { println!("No parameters"); 42 // 返回值 }; // 2. 有参数的闭包 let with_params = |x: i32, y: i32| -> i32 { x + y // 返回x + y }; // 3. 简洁形式(单表达式,省略大括号) let simple = |x| x * 2; // 4. 带类型注解的简洁形式 let typed: fn(i32) -> i32 = |x| x * 2; ``` **thread::spawn 中的闭包:** ```rust // 示例中的闭包(无参数) thread::spawn(move || { // 线程代码 }); // 如果需要传递参数可以这样写: let data = vec![1, 2, 3]; thread::spawn(move || { // data 通过捕获传入,不是作为参数 println!("{:?}", data); }); // thread::spawn 的闭包不接受参数,只能通过捕获环境变量获取数据 // 以下是错误的示例: // thread::spawn(|data| { // ? 编译错误!spawn 闭包不接受参数 // println!("{:?}", data); // }); ``` #### 6.4 线程内部循环(第 12 行) ```rust for _ in 0..100 { ``` **解析:** - 每个线程内部执行 100 次循环 - 10 个线程 × 100 次 = 1000 次总操作 #### 6.5 获取锁(第 13 行) ```rust let mut num = counter.lock().unwrap(); ``` **解析:** - **`counter.lock()`**: - 尝试获取 `Mutex` 的锁 - 返回 `LockResult<MutexGuard<i32>>` - 如果锁已被其他线程持有,当前线程会阻塞等待 - 成功后返回 `MutexGuard` 智能指针 - **`.unwrap()`**: - 解包 `Result` 类型 - 如果成功,获取 `MutexGuard` - 如果失败(锁被污染),程序会 panic(这种情况极少发生) - **`let mut num`**: - 将 `MutexGuard` 绑定到可变变量 `num` - `MutexGuard` 实现了 `DerefMut`,可以像引用一样操作 **MutexGuard 是什么?** 这是一个非常重要的概念!让我们详细理解 `MutexGuard` 的工作原理。 #### 关键问题:MutexGuard vs 简单类型复制 **简单类型的复制(值复制):** ```rust let x = 42; let y = x; // 复制 x 的值给 y y += 1; // y 变成 43 println!("x = {}, y = {}", x, y); // 输出: x = 42, y = 43 // x 和 y 是独立的,修改 y 不影响 x ``` **MutexGuard 的情况(智能指针):** ```rust let counter = Arc::new(Mutex::new(42)); { let mut num = counter.lock().unwrap(); // num 不是简单的值复制 *num += 1; // 修改的是 Mutex 内部的数据 println!("num = {}", *num); // 输出: num = 43 } // num 离开作用域,锁自动释放 println!("counter = {}", *counter.lock().unwrap()); // 输出: counter = 43 // 修改确实影响到了 counter! ``` #### MutexGuard 的本质 `MutexGuard` 是一个**智能指针**(Smart Pointer),它的作用是: 1. **持有锁**:Guard 存在期间,锁一直被持有 2. **提供访问**:通过解引用操作符 `*` 访问底层的数据 3. **自动释放**:Guard 被销毁时,自动释放锁 **数据结构示意:** ```rust // 简化的 MutexGuard 结构(伪代码) struct MutexGuard<'a, T> { lock: &'a Mutex<T>, // 持有 Mutex 的引用 data: &'a mut T, // 持有数据的可变引用 } ``` **内存关系图:** ``` ┌─────────────────────────────────────────────────────────┐ │ Arc<Mutex<i32>> │ │ ┌────────────┐ │ │ │ 引用计数 │ │ │ │ = 2 │ │ │ └─────┬──────┘ │ │ │ │ │ ┌─────▼─────────────┐ │ │ │ Mutex<i32> │ │ │ │ ┌─────────────┐ │ │ │ │ │ 锁状态 │ │ │ │ │ │ (已锁定) │ │ │ │ │ └──────┬──────┘ │ │ │ │ │ │ │ │ │ ┌──────▼──────┐ │ │ │ │ │ data │ │ │ │ │ │ = 42 │ │?──┐ │ │ │ └─────────────┘ │ │ 引用 │ │ └───────────────────┘ │ │ └──────────────────────────┼─────────────────────────────┘ │ │ ┌──────────────────────────┼─────────────────────────────┐ │ MutexGuard │ │ │ ┌──────────────────────┐│ │ │ │ lock: &Mutex │├─────────────────────────────┘ │ │ data: &mut i32 ──────┘┘ 指向上面的 data │ └──────────────────────┘ └──────────────────────────┘ ``` #### 与引用的区别 **普通可变引用:** ```rust let mut value = 42; let ref_value = &mut value; // ref_value 是可变引用 *ref_value += 1; // 通过引用修改 value // value 现在是 43 ``` **MutexGuard(智能指针):** ```rust let mutex = Mutex::new(42); let guard = mutex.lock().unwrap(); // guard 是 MutexGuard *guard += 1; // 通过 guard 访问和修改数据 // guard 离开作用域时,自动释放锁 ``` #### Deref 和 DerefMut trait `MutexGuard` 实现了 `Deref` 和 `DerefMut` trait,这使得它可以像引用一样使用: ```rust // Deref trait(不可变访问) impl<T: ?Sized> Deref for MutexGuard<'_, T> { type Target = T; fn deref(&self) -> &Self::Target { // 返回内部数据的不可变引用 } } // DerefMut trait(可变访问) impl<T: ?Sized> DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut Self::Target { // 返回内部数据的可变引用 } } ``` **实际效果:** ```rust let mut num = counter.lock().unwrap(); // 这些操作是等价的: *num += 1; // 使用解引用操作符 MutexGuard::deref_mut(&mut num); // 显式调用方法 ``` #### 为什么需要 MutexGuard? **问题 1:如何保证锁一定被释放?** 如果使用手动锁管理: ```rust // ? 假设的 API(不安全) let mut num = counter.lock(); // 获取锁 *num += 1; counter.unlock(); // 手动释放锁 - 如果这里 panic 了怎么办? ``` 如果 `*num += 1` 发生 panic,`unlock()` 永远不会被调用,锁永远不会被释放! **解决方案:RAII(资源获取即初始化)** ```rust // ? 实际的 API(安全) { let mut num = counter.lock().unwrap(); // 获取锁,创建 Guard *num += 1; // 即使这里 panic,Guard 的 drop 仍会执行 } // Guard 离开作用域,自动释放锁 ``` `MutexGuard` 实现了 `Drop` trait: ```rust impl<T: ?Sized> Drop for MutexGuard<'_, T> { fn drop(&mut self) { // 自动释放锁 self.lock.unlock(); } } ``` **问题 2:如何防止忘记释放锁?** 使用 `MutexGuard`,编译器会保证: 1. Guard 离开作用域时,`drop()` 一定会被调用 2. `drop()` 中会释放锁 3. 即使发生 panic,也能保证锁被释放 #### 完整示例对比 **示例 1:简单类型复制** ```rust fn main() { let x = 42; let y = x; // 复制值 y += 1; println!("x = {}", x); // x = 42(未改变) println!("y = {}", y); // y = 43(独立变化) } ``` **示例 2:MutexGuard 智能指针** ```rust use std::sync::Mutex; fn main() { let counter = Mutex::new(42); { let mut num = counter.lock().unwrap(); // num 是 MutexGuard println!("Before: num = {}", *num); // num = 42 *num += 1; // 修改 Mutex 内部的数据 println!("After: num = {}", *num); // num = 43 } // Guard 离开作用域,自动释放锁 let value = *counter.lock().unwrap(); println!("counter = {}", value); // counter = 43(已被修改) } ``` **示例 3:多个 Guard 共享同一份数据** ```rust use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let counter1 = Arc::clone(&counter); let handle1 = thread::spawn(move || { let mut guard1 = counter1.lock().unwrap(); // guard1 持有锁 *guard1 += 10; println!("Thread 1: guard1 = {}", *guard1); // guard1 = 10 }); // guard1 离开作用域,释放锁 let counter2 = Arc::clone(&counter); let handle2 = thread::spawn(move || { thread::sleep(std::time::Duration::from_millis(10)); // 等待线程1 let mut guard2 = counter2.lock().unwrap(); // guard2 获取锁 *guard2 += 20; println!("Thread 2: guard2 = {}", *guard2); // guard2 = 30 }); // guard2 离开作用域,释放锁 handle1.join().unwrap(); handle2.join().unwrap(); let value = *counter.lock().unwrap(); println!("Final: counter = {}", value); // counter = 30 } ``` #### 总结:MutexGuard 的核心特性 | 特性 | 说明 | |------|------| | **智能指针** | 持有对数据的可变引用,不是简单的值复制 | | **持有锁** | Guard 存在期间,锁一直被持有 | | **自动释放** | 离开作用域时自动释放锁(RAII) | | **解引用** | 可以用 `*` 操作符访问底层的数据 | | **唯一性** | 同一时刻只能有一个 Guard 存在 | **关键理解:** ``` 简单类型复制: x = 42, y = x → y 是独立的副本 普通引用: y = &x → y 引用 x,没有生命周期管理 智能指针: y = MutexGuard → y 管理 x 的访问,带有额外功能(锁管理) ``` #### 6.6 修改数据(第 14 行) ```rust *num += 1; ``` **解析:** - **`*num`**:解引用 `MutexGuard`,获取底层的 `i32` 值 - **`+= 1`**:增加 1 - `MutexGuard` 离开作用域时自动释放锁(第 15 行) **锁的生命周期:** ```rust { // 锁在 lock() 时获取 let mut num = counter.lock().unwrap(); // 创建 MutexGuard *num += 1; // 通过 Guard 访问和修改数据 } // num 离开作用域,MutexGuard 被 drop,锁自动释放 ``` #### 6.7 保存句柄(第 17 行) ```rust handles.push(handle); ``` **解析:** - 将新创建的线程句柄添加到向量中 - 用于后续等待所有线程完成 --- ### 7. 等待所有线程完成(第 20-22 行) ```rust for handle in handles { handle.join().unwrap(); } ``` **解析:** - **`for handle in handles`**: - 遍历所有线程句柄 - 获取每个 `JoinHandle` 的所有权 - **`handle.join()`**: - 阻塞当前(主)线程,等待子线程结束 - 返回 `Result<T>`,其中 `T` 是闭包的返回值(这里是 `()`) - 如果线程 panic,`join()` 返回 `Err` - **`.unwrap()`**: - 如果线程成功完成,不做任何事 - 如果线程 panic,主线程也会 panic **重要说明:关于 join() 的行为和并发性** #### 关键问题:join() 是阻塞等待,那程序还是并发的吗? 这是一个非常重要的概念!让我们详细理解: 1. **join() 是阻塞等待**: - 第1次循环:`handle.join()` 会阻塞主线程,等待第1个线程执行完毕 - 第2次循环:`handle.join()` 会阻塞主线程,等待第2个线程执行完毕 - ...依此类推 2. **但这并不意味着线程是顺序执行的!** 关键理解:**所有10个线程已经在前面被创建并开始并行执行了!** ```rust // 第一个循环:创建并启动所有10个线程 for _ in 0..10 { let handle = thread::spawn(move || { // 线程立即开始执行,与其他线程并行运行 }); handles.push(handle); } // 此时:10个线程可能同时在不同的CPU核心上运行! // 第二个循环:等待所有线程完成 for handle in handles { handle.join().unwrap(); // 阻塞等待 } ``` #### 时间线示意 ``` 时刻 0: 主线程创建线程1 → 线程1开始运行 时刻 1: 主线程创建线程2 → 线程2开始运行 ... 时刻 9: 主线程创建线程10 → 线程10开始运行 ↓ 所有10个线程都在并行执行 ↓ 时刻 10: 主线程调用 join(线程1) → 等待线程1完成 (此时线程2-10可能还在运行) 时刻 11: 线程1完成 时刻 12: 主线程调用 join(线程2) → 等待线程2完成 (此时线程3-10可能还在运行) ... ``` #### 详细执行时序图 ``` 主线程: 创建线程1 ─┐ 主线程: 创建线程2 ──┼─→ 所有10个线程已经在并行运行! 主线程: ... │ (可能同时在不同CPU核心上执行) 主线程: 创建线程10─┘ ┌─────────────────────────────────────────────────────────┐ │ 同时运行的 10 个线程 │ ├─────────────────────────────────────────────────────────┤ │ │ │ 线程 1: ████████ 完成 │ │ 线程 2: ████████████ 完成 │ │ 线程 3: ██████████ 完成 │ │ ... │ │ 线程 10: ████████████ 完成 │ │ │ └─────────────────────────────────────────────────────────┘ 主线程: join(线程1) ← 等待线程1完成(此时其他线程可能还在运行) 主线程: join(线程2) ← 等待线程2完成(此时线程3-10可能还在运行) 主线程: ... 主线程: join(线程10) ``` #### 为什么仍然需要加锁? 虽然线程并行执行,但它们都要访问同一个 `counter`: - **没有锁的情况**: ``` 时刻 0: 线程A读取 counter = 10 时刻 1: 线程B读取 counter = 10 时刻 2: 线程A写入 11 时刻 3: 线程B写入 11 ← 线程A的操作被覆盖了! ``` - **有锁的情况**: ``` 时刻 0: 线程A获取锁 时刻 1: 线程A读取 10,写入 11,释放锁 时刻 2: 线程B获取锁(等待A完成) 时刻 3: 线程B读取 11,写入 12,释放锁 ``` #### join() 的真正作用 - ? **不是**控制线程的执行顺序(线程已经在并行执行了) - ? **而是**确保主线程在所有子线程完成之前不会退出 #### 如果不调用 join() 会怎样? ```rust // 危险示例 for _ in 0..10 { thread::spawn(|| { // 线程代码 }); } // main 函数返回,程序终止(即使子线程还在运行) ``` 主线程可能立即退出,程序终止,即使子线程还没执行完! #### 完整示例对比 **示例 1:不使用 join()** ```rust use std::thread; use std::time::Duration; fn main() { for i in 0..10 { thread::spawn(move || { println!("Thread {} working...", i); thread::sleep(Duration::from_secs(1)); println!("Thread {} done!", i); }); } // 主线程立即退出,子线程可能还没完成 // 输出可能不完整或没有输出 } ``` **示例 2:使用 join()** ```rust use std::thread; use std::time::Duration; fn main() { let mut handles = vec![]; for i in 0..10 { let handle = thread::spawn(move || { println!("Thread {} working...", i); thread::sleep(Duration::from_secs(1)); println!("Thread {} done!", i); }); handles.push(handle); } // 等待所有线程完成 for handle in handles { handle.join().unwrap(); } println!("All threads completed!"); } ``` #### 总结:join() 的核心要点 | 特性 | 说明 | |------|------| | **阻塞等待** | `join()` 会阻塞主线程,等待指定的线程完成 | | **顺序等待** | 按顺序等待每个线程,但不影响线程的并行执行 | | **不是顺序执行** | 线程已经在并行运行了,`join()` 只是等待它们完成 | | **保证完成** | 确保主线程在所有子线程完成之前不会退出 | | **收集结果** | 可以获取线程的返回值 | **关键理解:** ``` 创建阶段: 线程1、线程2、...、线程10 同时开始运行 ↓ 执行阶段: 所有线程并行执行(在不同CPU核心上) ↓ 等待阶段: join(线程1) → join(线程2) → ... → join(线程10) 主线程顺序等待,但子线程早已在并行执行 ``` **执行时序:** ``` 主线程: 创建线程1 ─┐ 主线程: 创建线程2 ─┼─→ 主线程: 等待(join) 主线程: ... │ 等待所有子线程完成后继续 主线程: 创建线程10─┘ ``` --- ### 8. 输出最终结果(第 24 行) ```rust println!("Final counter: {}", *counter.lock().unwrap()); ``` **解析:** - **`counter.lock().unwrap()`**: - 获取锁(此时所有其他线程已完成) - 获取 `MutexGuard<i32>` - **`*`**:解引用,获取 `i32` 值 - **`println!`**:格式化输出 - 输出示例:`Final counter: 1000` --- ## 三、核心概念详解 ### 3.1 Arc(原子引用计数) **定义:** ```rust pub struct Arc<T> { // 内部实现细节 } ``` **特点:** | 特性 | 说明 | |------|------| | 线程安全 | 引用计数操作是原子的 | | 多所有者 | 允许多个所有者共享数据 | | 不可变 | 只提供共享引用,不提供可变性 | **与 Rc 的区别:** ```rust use std::rc::Rc; // 单线程使用 use std::sync::Arc; // 多线程使用 ``` **Arc 的所有权模型:** ``` 引用计数 = 1 → 引用计数 = 2 → 引用计数 = 1 Arc1 Arc1, Arc2 Arc1 (Arc2 被销毁) ``` ### 3.2 Mutex(互斥锁) **定义:** ```rust pub struct Mutex<T> { // 内部实现细节 } ``` **作用:** 1. **内部可变性**:通过不可变引用修改数据 2. **互斥访问**:同时只有一个线程能访问数据 **锁的操作:** ```rust let mutex = Mutex::new(5); // 获取锁 { let mut data = mutex.lock().unwrap(); *data += 1; // 修改数据 } // 锁自动释放 // 锁已释放,可以再次获取 let data = mutex.lock().unwrap(); ``` ### 3.3 线程创建与管理 **spawn 函数签名:** ```rust pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T + Send + 'static, T: Send + 'static, ``` **约束解释:** | 约束 | 含义 | |------|------| | `FnOnce()` | 闭包只能调用一次 | | `Send` | 闭包和返回值可以在线程间传递 | | `'static` | 闭包捕获的数据拥有静态生命周期(不依赖栈) | **JoinHandle 方法:** ```rust impl<T> JoinHandle<T> { pub fn join(self) -> Result<T>; pub fn thread(&self) -> &Thread; } ``` ### 3.4 move 关键字 **作用:** 强制闭包获取捕获变量的所有权 **核心原理:** `move` 关键字告诉编译器:"闭包捕获的所有变量,将其所有权从外部作用域转移到闭包内部"。 ``` 外部作用域 闭包内部 data ──move──→ [ data ] 所有权转移 ↑ 原变量不再有效 ``` **示例对比:** ```rust // 示例 1: 没有 move - 借用 let data = vec![1, 2, 3]; let closure = || { println!("Length: {}", data.len()); // 借用 data }; println!("Still accessible: {}", data.len()); // ? data 仍然可用 closure(); ``` ```rust // 示例 2: 使用 move - 所有权转移 let data = vec![1, 2, 3]; let closure = move || { println!("Length: {}", data.len()); // data 的所有权转移到闭包 }; // println!("{}", data.len()); // ? 编译错误!data 已经被移动 closure(); // closure(); // ? 再次调用也会错误,因为闭包只能调用一次(FnOnce) ``` ```rust // 示例 3: Copy 类型 - move 会复制而非转移 let num = 42; let closure = move || { println!("Number: {}", num); // i32 实现 Copy,num 被复制 }; println!("Number: {}", num); // ? num 仍然可用(因为是 Copy 类型) ``` **在多线程中的必要性:** ```rust use std::thread; let data = vec![1, 2, 3]; // 错误:没有 move let handle = thread::spawn(|| { println!("{:?}", data); // ? 编译错误 }); // 原因:新线程可能比主线程活得久,data 可能被先释放 drop(data); // data 在这里被释放 handle.join().unwrap(); // 但子线程还在使用 data! ``` ```rust use std::thread; let data = vec![1, 2, 3]; // 正确:使用 move let handle = thread::spawn(move || { println!("{:?}", data); // ? data 的所有权转移到子线程 }); // data 不再在这里可用 handle.join().unwrap(); // 等待子线程完成 ``` **闭包与所有权转移总结:** | 特性 | 不使用 `move` | 使用 `move` | |------|-------------|-----------| | 捕获方式 | 借用(引用) | 转移所有权 | | 原变量仍可用 | ? | ?(除非是 Copy 类型)| | 闭包类型 | `Fn` 或 `FnMut` | `FnOnce` | | 线程安全 | ?(可能悬垂引用)| ? | | 适用场景 | 短生命周期的闭包 | 线程、async 等 | **特殊说明:** `Arc::clone()` 不转移所有权,而是增加引用计数,因此可以配合 `move` 使用: ```rust let counter = Arc::new(Mutex::new(0)); let counter_clone = Arc::clone(&counter); // 增加引用计数 let handle = thread::spawn(move || { // counter_clone 的所有权转移到闭包 // 但底层数据仍然被 counter 引用 }); // counter 仍然可用 ``` --- ## 四、执行流程图 ``` ┌─────────────────────────────────────────────────────────────┐ │ 主线程开始 │ └─────────────────┬───────────────────────────────────────────┘ │ ▼ ┌─────────────────────┐ │ 创建 Arc<Mutex<0>> │ └─────────┬───────────┘ │ ▼ ┌─────────────────────┐ │ 创建空 handles 向量 │ └─────────┬───────────┘ │ ▼ ┌─────────────────────┐ │ for i in 0..10 │?─────────┐ └─────────┬───────────┘ │ │ │ ▼ │ ┌─────────────────────┐ │ │ Arc::clone │ │ └─────────┬───────────┘ │ │ │ ▼ │ ┌─────────────────────┐ │ │ thread::spawn │──────────┤ 创建新线程 └─────────┬───────────┘ │ │ │ ▼ │ ┌─────────────────────┐ │ │ push handle │ │ └─────────┬───────────┘ │ │ │ └──────────────────────┘(重复10次) ┌─────────────────────────────────────────────────────────────┐ │ 同时运行的 10 个线程 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ 线程 1: lock() → +1 → unlock() → lock() → +1... │ │ 线程 2: lock() → +1 → unlock() → lock() → +1... │ │ ... │ │ 线程 10: lock() → +1 → unlock() → lock() → +1... │ │ │ │ 每个线程执行 100 次 += 1 操作 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────┐ │ for handle in... │?─────────┐ └─────────┬───────────┘ │ │ │ ▼ │ ┌─────────────────────┐ │ │ handle.join() │──────────┤ 等待线程结束 └─────────┬───────────┘ │ │ │ └──────────────────────┘(重复10次) │ ▼ ┌─────────────────────┐ │ 输出最终结果 │ │ Final counter: 1000│ └─────────────────────┘ ``` --- ## 五、为什么结果是 1000? ### 5.1 数学计算 ``` 线程数量 × 每个线程的操作次数 = 最终计数器值 10 × 100 = 1000 ``` ### 5.2 并发安全性保证 **问题:** 如果没有锁会怎样? ```rust // 错误示例(假设没有 Mutex) let counter = Arc::new(0); // ? i32 是 Copy,不能用 Arc // 或者使用 unsafe 的可变引用 // 可能的结果:0-1000 之间的任意值(数据竞争) ``` **数据竞争(Data Race):** ``` 时刻 0: counter = 10 时刻 1: 线程A读取 counter = 10 时刻 2: 线程B读取 counter = 10 时刻 3: 线程A写入 11 时刻 4: 线程B写入 11 ← 线程A的操作被覆盖了! ``` **Mutex 的保证:** ``` 时刻 0: counter = 10 时刻 1: 线程A获取锁 时刻 2: 线程A读取 10,写入 11,释放锁 时刻 3: 线程B获取锁(等待A完成) 时刻 4: 线程B读取 11,写入 12,释放锁 ``` ### 5.3 内存顺序 Rust 的 `Mutex` 保证: 1. **原子性**:每次操作要么全部完成,要么都不完成 2. **可见性**:一个线程的修改对其他线程立即可见 3. **有序性**:操作按代码顺序执行 --- ## 六、常见问题 ### Q1: 为什么需要 Arc? **A:** 因为 `thread::spawn` 的闭包必须拥有 `Send + 'static` 的数据。 ```rust // 没有 Arc:所有权只能属于一个线程 let counter = Mutex::new(0); let handle1 = thread::spawn(move || { // counter 的所有权转移到线程1 *counter.lock().unwrap() += 1; }); // 编译错误:counter 已经被移动 let handle2 = thread::spawn(move || { *counter.lock().unwrap() += 1; // ? }); ``` 使用 `Arc` 解决: ```rust // 使用 Arc:多个线程共享所有权 let counter = Arc::new(Mutex::new(0)); let counter1 = Arc::clone(&counter); let handle1 = thread::spawn(move || { *counter1.lock().unwrap() += 1; // ? }); let counter2 = Arc::clone(&counter); let handle2 = thread::spawn(move || { *counter2.lock().unwrap() += 1; // ? }); ``` ### Q2: 为什么需要 Mutex? **A:** 因为 Rust 的所有权规则不允许通过共享引用修改数据。 ```rust // 错误示例 let counter = Arc::new(0); *counter += 1; // ? Arc 只提供不可变引用 ``` 使用 `Mutex` 提供内部可变性: ```rust let counter = Arc::new(Mutex::new(0)); *counter.lock().unwrap() += 1; // ? ``` ### Q3: 什么是 unwrap()? **A:** `unwrap()` 是 `Result` 和 `Option` 的方法: ```rust // 对于 Result<T, E> match result { Ok(value) => value, Err(err) => panic!("..."), // unwrap 在 Err 时 panic } ``` **为什么这里可以安全使用?** - `Mutex::lock()` 只会在锁被污染时返回 `Err`(极罕见) - `JoinHandle::join()` 的 `Err` 只在子线程 panic 时发生 - 在示例代码中,这些情况不会发生 ### Q4: 线程执行的顺序是确定的吗? **A:** 不确定。线程调度的顺序由操作系统决定。 **可能的执行顺序:** ``` 顺序 1: 线程1 → 线程2 → ... → 线程10 顺序 2: 线程10 → 线程9 → ... → 线程1 顺序 3: 线程1、3、5 并行,然后 2、4、6... ``` **但结果总是 1000**,因为 `Mutex` 保证了互斥访问。 ### Q5: 如果不调用 join() 会怎样? **A:** 主线程可能提前退出,导致程序终止。 ```rust // 危险示例 for _ in 0..10 { thread::spawn(|| { // 如果主线程在这里退出,子线程可能未完成 }); } // main 函数返回,程序终止(即使子线程还在运行) ``` ### Q6: 能否不用 Arc,只用 Mutex? **A:** 可以,但只能在单个线程内使用。 ```rust // 单线程使用 Mutex let counter = Mutex::new(0); { let mut data = counter.lock().unwrap(); *data += 1; } { let mut data = counter.lock().unwrap(); *data += 1; } ``` 多线程必须使用 `Arc` 来共享所有权。 --- ## 七、扩展示例 ### 7.1 多个共享变量 ```rust use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let sum = Arc::new(Mutex::new(0)); let mut handles = vec![]; for i in 0..5 { let counter = Arc::clone(&counter); let sum = Arc::clone(&sum); let handle = thread::spawn(move || { for _ in 0..100 { let mut num = counter.lock().unwrap(); *num += 1; } let mut s = sum.lock().unwrap(); *s += i * 100; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Counter: {}", *counter.lock().unwrap()); println!("Sum: {}", *sum.lock().unwrap()); } ``` ### 7.2 带返回值的线程 ```rust use std::thread; fn main() { let handle = thread::spawn(|| { let result = 1 + 1; result // 返回值 }); let result = handle.join().unwrap(); println!("Result: {}", result); } ``` ### 7.3 使用 RwLock(读写锁) ```rust use std::sync::{Arc, RwLock}; use std::thread; fn main() { let data = Arc::new(RwLock::new(0)); let mut handles = vec![]; // 写线程 for _ in 0..5 { let data = Arc::clone(&data); handles.push(thread::spawn(move || { let mut num = data.write().unwrap(); *num += 1; })); } // 读线程 for _ in 0..5 { let data = Arc::clone(&data); handles.push(thread::spawn(move || { let num = data.read().unwrap(); println!("Read: {}", *num); })); } for handle in handles { handle.join().unwrap(); } } ``` **RwLock vs Mutex:** | 特性 | Mutex | RwLock | |------|-------|--------| | 同时读 | ? | ? | | 同时写 | ? | ? | | 读写同时 | ? | ? | --- ## 八、性能考虑 ### 8.1 锁的竞争 **问题:** 锁竞争会导致性能下降 ```rust // 高竞争示例 for _ in 0..1000 { let counter = Arc::clone(&counter); thread::spawn(move || { for _ in 0..10000 { *counter.lock().unwrap() += 1; // 频繁获取锁 } }); } ``` **优化:减少锁的持有时间** ```rust for _ in 0..1000 { let counter = Arc::clone(&counter); thread::spawn(move || { let mut local_sum = 0; for _ in 0..10000 { local_sum += 1; // 不使用锁 } *counter.lock().unwrap() += local_sum; // 最后加锁一次 }); } ``` ### 8.2 线程数量 **经验法则:** - CPU 密集型:线程数 ≈ CPU 核心数 - I/O 密集型:线程数可以更多 ```rust use std::thread; fn main() { let num_threads = thread::available_parallelism() .map(|n| n.get()) .unwrap_or(4); println!("Recommended threads: {}", num_threads); } ``` --- ## 九、调试技巧 ### 9.1 打印线程ID ```rust use std::thread; fn main() { let handle = thread::spawn(|| { println!("Thread ID: {:?}", thread::current().id()); }); handle.join().unwrap(); println!("Main Thread ID: {:?}", thread::current().id()); } ``` ### 9.2 使用日志 ```rust use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let handle = thread::spawn({ let counter = Arc::clone(&counter); move || { println!("Thread: acquiring lock..."); let mut num = counter.lock().unwrap(); println!("Thread: lock acquired"); *num += 1; println!("Thread: releasing lock"); } }); println!("Main: acquiring lock..."); let mut num = counter.lock().unwrap(); println!("Main: lock acquired"); *num += 1; println!("Main: releasing lock"); handle.join().unwrap(); } ``` --- ## 十、总结 ### 关键要点 1. **Arc**:多线程间共享数据 2. **Mutex**:保证数据修改的互斥性 3. **thread::spawn**:创建新线程 4. **move**:转移所有权到新线程 5. **join**:等待线程完成 ### Rust 并发的优势 - **编译时保证**:编译器会阻止数据竞争 - **零成本抽象**:不需要运行时垃圾回收 - **类型安全**:通过类型系统保证线程安全 ### 学习路径 ``` 基础 → 所有权系统 → 闭包 → 线程 → Arc/Mutex → 高级并发 ``` --- ## 十一、参考资源 ### 官方文档 - [Rust Book - 并发编程](https://doc.rust-lang.org/book/ch16-00-concurrency.html) - [std::thread 模块](https://doc.rust-lang.org/std/thread/index.html) - [std::sync 模块](https://doc.rust-lang.org/std/sync/index.html) ### 推荐阅读 - 《Rust 程序设计语言》第 16 章 - 《Rust By Example》 - 并发章节 - 《Rust Atomics and Locks》 by Mara Bos ---

 


参考