Rust 2026 经验谈 - 无锁并发模式
无锁(lock-free)并发是高性能系统编程的圣杯。Rust 的原子类型和内存序提供了构建无锁数据结构的原语,但正确使用它们需要对硬件内存模型有深刻理解。本文从实战角度出发,详解 Rust 中无锁编程的核心模式与踩坑经验。
Atomic 类型全览
标准库原子类型
Rust 标准库在 std::sync::atomic 模块中提供了一组原子类型,每个对应一种基础类型:
use std::sync::atomic::{ AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize, AtomicU8, AtomicU16, AtomicU32, AtomicU64, AtomicUsize, AtomicPtr,};分类:
| 类别 | 类型 | 典型用途 |
|---|---|---|
| 布尔 | AtomicBool | 开关标志、终止信号 |
| 有符号整数 | AtomicI8 ~ AtomicI64, AtomicIsize | 计数器(可为负)、差值 |
| 无符号整数 | AtomicU8 ~ AtomicU64, AtomicUsize | 引用计数、索引、长度 |
| 指针 | AtomicPtr<T> | 无锁链表、指针摇摆 |
核心操作
每个原子类型提供三类基本操作:
use std::sync::atomic::{AtomicU32, Ordering};
let counter = AtomicU32::new(0);
// 1. load / store——读取 / 写入let val = counter.load(Ordering::SeqCst);counter.store(42, Ordering::SeqCst);
// 2. fetch_xxx——读取并修改(返回旧值)let old = counter.fetch_add(1, Ordering::SeqCst); // 原子自增let old = counter.fetch_sub(1, Ordering::SeqCst); // 原子自减let old = counter.fetch_and(0xFF, Ordering::SeqCst); // 原子位与let old = counter.fetch_or(0x100, Ordering::SeqCst); // 原子位或let old = counter.fetch_xor(0x1, Ordering::SeqCst); // 原子位异或let old = counter.fetch_max(100, Ordering::SeqCst); // 原子取最大值let old = counter.fetch_min(0, Ordering::SeqCst); // 原子取最小值
// 3. compare_exchange / compare_exchange_weak——CASlet result = counter.compare_exchange( 42, // 期望值 100, // 新值 Ordering::SeqCst, // 成功时的 ordering Ordering::SeqCst, // 失败时的 ordering);AtomicPtr 的特殊操作
AtomicPtr<T> 是无锁数据结构的关键原语:
use std::sync::atomic::{AtomicPtr, Ordering};use std::ptr;
let head: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
// 读取指针let current = head.load(Ordering::Acquire);
// CAS 替换指针(实现无锁 push)let new_node = Box::into_raw(Box::new(Node { next: ptr::null_mut(), data: 42 }));loop { let old = head.load(Ordering::Acquire); unsafe { (*new_node).next = old; } if head.compare_exchange_weak( old, new_node, Ordering::Release, Ordering::Relaxed ).is_ok() { break; }}踩坑:AtomicPtr 存储的是裸指针,不管理内存。你必须在合适时机用 Box::from_raw 回收,否则泄漏。
Memory Ordering 实战详解
内存序是无锁编程最难的部分。选错 ordering 不会 panic,但会导致其他线程看到不一致的数据——这是最隐蔽的 bug。
五种 Ordering 的语义
use std::sync::atomic::Ordering;| Ordering | 语义 | 保证 | 使用场景 |
|---|---|---|---|
Relaxed | 无同步,只保证原子性 | 单个变量的操作原子 | 计数器、统计 |
Release | 写操作:之前的写对 Acquire 可见 | 写-写不重排 | ”发布”数据 |
Acquire | 读操作:之后的读能看到 Release 的写 | 读-读不重排 | ”获取”数据 |
AcqRel | 读写操作:同时具备 Acquire + Release | 双向不重排 | read-modify-write |
SeqCst | 全局总序 | 所有线程看到相同顺序 | 最严格、最保守 |
Relaxed:只管原子,不管顺序
use std::sync::atomic::{AtomicU64, Ordering};use std::thread;
static COUNTER: AtomicU64 = AtomicU64::new(0);
fn relaxed_counter() { let handles: Vec<_> = (0..10) .map(|_| { thread::spawn(|| { for _ in 0..1_000_000 { COUNTER.fetch_add(1, Ordering::Relaxed); } }) }) .collect();
for h in handles { h.join().unwrap(); }
// 保证最终值是 10_000_000(原子性保证不丢失更新) assert_eq!(COUNTER.load(Ordering::Relaxed), 10_000_000);}踩坑:Relaxed 不保证其他变量的写入顺序。如果线程 A 先写 data = 42 再 flag.store(1, Relaxed),线程 B 读到 flag == 1 不代表能看到 data == 42。
Acquire/Release:生产者-消费者同步
这是最常用的非 SeqCst 组合:
use std::sync::atomic::{AtomicBool, Ordering};
static DATA: std::sync::OnceLock<String> = std::sync::OnceLock::new();static READY: AtomicBool = AtomicBool::new(false);
fn producer() { // 先写数据 DATA.set("hello".to_owned()).unwrap(); // 再设置标志(Release 保证上面的写对 Acquire 可见) READY.store(true, Ordering::Release);}
fn consumer() { // 循环等待标志(Acquire 保证后续读能看到 Release 之前的写) while !READY.load(Ordering::Acquire) { std::hint::spin_loop(); } // 安全:DATA 一定已经初始化 let data = DATA.get().unwrap(); assert_eq!(data, "hello");}核心原则:Release store 之前的所有写操作,对执行了 Acquire load 且读到该值的线程可见。这就是”发布-获取”语义。
AcqRel:读写同时需要同步
fetch_add/fetch_sub 等既读又写的操作,需要同时 Acquire 和 Release 时使用:
use std::sync::atomic::{AtomicUsize, Ordering};
struct RwLockFree { readers: AtomicUsize, writer: AtomicBool,}
impl RwLockFree { fn read_enter(&self) -> bool { loop { // 先检查是否有写者 if self.writer.load(Ordering::Acquire) { return false; } // 原子增加读者计数 // AcqRel:Acquire 检查 writer,Release 让 writer 看到 reader 增加 let old = self.readers.fetch_add(1, Ordering::AcqRel); if old > 0 || !self.writer.load(Ordering::Acquire) { return true; } // 回退 self.readers.fetch_sub(1, Ordering::Relaxed); } }}SeqCst:最严格,也最慢
use std::sync::atomic::{AtomicUsize, Ordering};
fn seqlock_example() { let a = AtomicUsize::new(0); let b = AtomicUsize::new(0);
std::thread::scope(|s| { s.spawn(|| { a.store(1, Ordering::SeqCst); b.store(1, Ordering::SeqCst); }); s.spawn(|| { let rb = b.load(Ordering::SeqCst); let ra = a.load(Ordering::SeqCst); // SeqCst 保证:如果 rb == 1,则 ra 必然 == 1 // 用 Relaxed 则无法保证 if rb == 1 { assert_eq!(ra, 1); } }); });}选型建议:
| 场景 | 推荐 Ordering | 原因 |
|---|---|---|
| 简单计数 | Relaxed | 不需要同步 |
| 标志+数据 | Release/Acquire | 标准发布-获取 |
| CAS 循环修改 | AcqRel(成功)/ Acquire(失败) | 读写同步 |
| 多变量一致性 | SeqCst | 需要全局序 |
compare_exchange 循环实现 CAS
CAS(Compare-And-Swap)是无锁算法的基石。Rust 中通过 compare_exchange 和 compare_exchange_weak 实现。
compare_exchange vs compare_exchange_weak
use std::sync::atomic::{AtomicU32, Ordering};
let val = AtomicU32::new(5);
// compare_exchange:只在当前值等于 expected 时才替换// 如果不等于,返回 Err(当前值)let result = val.compare_exchange( 5, // expected 10, // new Ordering::SeqCst, Ordering::SeqCst,);assert_eq!(result, Ok(5)); // 返回 Ok(旧值)assert_eq!(val.load(Ordering::SeqCst), 10);
// compare_exchange_weak:在 ARM 等平台上可能 spuriously 失败// 即使当前值等于 expected,也可能返回 Err// 但在 CAS 循环中用 weak 版本更高效(避免额外的分支)实战经验:在 CAS 循环中始终用 compare_exchange_weak。weak 版本在 LL/SC 架构(ARM、RISC-V)上避免了一次额外的 load 指令,性能更好。在 x86 上两者等价(x86 CAS 是单指令)。
典型 CAS 循环:原子 fetch_update
use std::sync::atomic::{AtomicU32, Ordering};
fn atomic_square(val: &AtomicU32) -> u32 { loop { let current = val.load(Ordering::Acquire); let new = current * current; // 如果 val 仍然是 current,替换为 new // 否则重试 match val.compare_exchange_weak( current, new, Ordering::Release, Ordering::Relaxed, ) { Ok(old) => return old, Err(_) => continue, // 其他线程修改了,重试 } }}无锁栈(Treiber Stack)
这是最经典的无锁数据结构:
use std::sync::atomic::{AtomicPtr, Ordering};use std::ptr;
struct Node<T> { data: T, next: *mut Node<T>,}
pub struct LockFreeStack<T> { head: AtomicPtr<Node<T>>,}
impl<T> LockFreeStack<T> { pub const fn new() -> Self { Self { head: AtomicPtr::new(ptr::null_mut()) } }
pub fn push(&self, data: T) { let new_node = Box::into_raw(Box::new(Node { data, next: ptr::null_mut(), }));
// CAS 循环:将 new_node 插入链表头部 loop { let old_head = self.head.load(Ordering::Acquire); unsafe { (*new_node).next = old_head; }
if self.head.compare_exchange_weak( old_head, new_node, Ordering::Release, Ordering::Relaxed, ).is_ok() { return; } // CAS 失败,其他线程修改了 head,重试 } }
pub fn pop(&self) -> Option<T> { loop { let old_head = self.head.load(Ordering::Acquire); if old_head.is_null() { return None; }
let new_head = unsafe { (*old_head).next };
if self.head.compare_exchange_weak( old_head, new_head, Ordering::AcqRel, Ordering::Acquire, ).is_ok() { // 成功取走节点 let node = unsafe { Box::from_raw(old_head) }; return Some(node.data); } // CAS 失败,重试 } }}
impl<T> Drop for LockFreeStack<T> { fn drop(&mut self) { while self.pop().is_some() {} }}踩坑:Treiber Stack 的 pop 有 ABA 问题——如果线程 A 读到 head = P,然后被调度走,线程 B pop 了 P 并 push 了新节点恰好也在地址 P,线程 A 的 CAS 会错误成功。解决方案是用带版本号的指针(如 AtomicUsize 中高位存版本号,低位存地址)。
SeqLock 模式实现
SeqLock(顺序锁)是一种”乐观读”模式:读者不加锁,而是读一个序号,读数据,再检查序号是否变化。如果序号变了(写者在这期间写了),重试。
use std::sync::atomic::{AtomicUsize, Ordering};use std::cell::UnsafeCell;
pub struct SeqLock<T> { seq: AtomicUsize, data: UnsafeCell<T>,}
// SAFETY: SeqLock 内部用 seq 保证同步访问unsafe impl<T: Send> Sync for SeqLock<T> {}
impl<T: Copy> SeqLock<T> { pub const fn new(data: T) -> Self { Self { seq: AtomicUsize::new(0), data: UnsafeCell::new(data), } }
pub fn read(&self) -> T { loop { // 1. 读序号(Acquire 保证后续数据读取在序号读取之后) let seq1 = self.seq.load(Ordering::Acquire);
// 如果序号是奇数,说明写者正在写,重试 if seq1 & 1 == 1 { std::hint::spin_loop(); continue; }
// 2. 读数据 // SAFETY: 我们在 seq1 和 seq2 之间检查了序号 // 如果序号没变,数据在这期间没被修改 let data = unsafe { *self.data.get() };
// 3. 再读序号,检查是否变化 let seq2 = self.seq.load(Ordering::Acquire);
if seq1 == seq2 { return data; } // 序号变了,重试 } }
pub fn write(&self, new_data: T) { // 1. 序号加 1(奇数表示正在写) let old_seq = self.seq.fetch_add(1, Ordering::Release);
// 2. 写数据 unsafe { *self.data.get() = new_data; }
// 3. 序号再加 1(偶数表示写完) // Release 保证数据写在序号更新之前对读者可见 self.seq.fetch_add(1, Ordering::Release); }}使用场景:读多写少、数据较小(Copy 类型)、对读延迟敏感。Linux 内核大量使用 SeqLock 读取 jiffies 等频繁更新的时间数据。
踩坑:
- T 必须是 Copy:否则读者可能在写者写一半时读到半新半旧的数据,drop 时未定义行为
- 不能有指针:如果 T 含指针,半新半旧的读取可能导致悬垂指针
- 写者饥饿:如果写者太频繁,读者可能一直重试
版本 2:用 AtomicUsize 存储小数据
对于 u64 等小数据,如果平台支持 AtomicU64,直接用原子操作更简单:
use std::sync::atomic::{AtomicU64, Ordering};
pub struct AtomicTimestamp { value: AtomicU64,}
impl AtomicTimestamp { pub fn read(&self) -> u64 { self.value.load(Ordering::Relaxed) }
pub fn update(&self, new_val: u64) { self.value.store(new_val, Ordering::Relaxed); }}如果只需要最终一致性,Relaxed 足矣。
常见无锁数据结构思路
1. 无锁队列(Michael-Scott Queue)
生产者-消费者场景最常用的无锁数据结构:
思路:- 链表实现,dummy head 节点- push:CAS 修改 tail.next,再 CAS 推进 tail- pop:CAS 推进 head- 需要 Hazard Pointer 或 EBR 解决内存回收Rust 实现可用 crossbeam-queue 的 ArrayQueue(有界)和 SegQueue(无界)。
2. 无锁环形缓冲区(SPSC)
单生产者单消费者场景,无需 CAS:
use std::sync::atomic::{AtomicUsize, Ordering};use std::cell::UnsafeCell;
pub struct SpscRingBuffer<T> { buf: UnsafeCell<Vec<T>>, capacity: usize, head: AtomicUsize, // 读位置 tail: AtomicUsize, // 写位置}
unsafe impl<T: Send> Sync for SpscRingBuffer<T> {}
impl<T: Default + Copy> SpscRingBuffer<T> { pub fn new(capacity: usize) -> Self { let mut buf = Vec::with_capacity(capacity); for _ in 0..capacity { buf.push(T::default()); } Self { buf: UnsafeCell::new(buf), capacity, head: AtomicUsize::new(0), tail: AtomicUsize::new(0), } }
pub fn push(&self, item: T) -> bool { let tail = self.tail.load(Ordering::Relaxed); let next_tail = (tail + 1) % self.capacity; if next_tail == self.head.load(Ordering::Acquire) { return false; // 满 } unsafe { (*self.buf.get())[tail] = item; } self.tail.store(next_tail, Ordering::Release); true }
pub fn pop(&self) -> Option<T> { let head = self.head.load(Ordering::Relaxed); if head == self.tail.load(Ordering::Acquire) { return None; // 空 } let item = unsafe { (*self.buf.get())[head] }; self.head.store((head + 1) % self.capacity, Ordering::Release); Some(item) }}关键:SPSC 环形缓冲区中,生产者只写 tail,消费者只写 head,没有竞争,因此用 Acquire/Release 就足够,不需要 CAS。
3. 无锁 HashMap
真正的无锁 HashMap 极其复杂(需要 resize 等),实践中通常用分片锁方案:
use std::sync::atomic::AtomicUsize;
// 思路:将 HashMap 分成 N 个 shard// 每个 shard 用 RwLock 或 SeqLock// 用 AtomicUsize 做负载均衡和 shard 选择// 这是 "lock-striping" 而非真正无锁// 真正无锁的 HashMap 参见: crossbeam-skiplist 或 flurry内存回收:无锁的最大难题
无锁数据结构中,节点何时回收是核心难题。Rust 中主要方案:
| 方案 | 机制 | 优点 | 缺点 |
|---|---|---|---|
| Hazard Pointer | 每线程声明”正在使用”的指针 | 确切回收 | 实现复杂,性能开销 |
| EBR (Epoch-Based Reclamation) | 全局 epoch,延迟回收 | 性能好 | 最坏情况下延迟高 |
| QSBR (Quiescent State) | 每线程声明”不持有引用” | 无额外读开销 | 需要主动声明 |
crossbeam-epoch | EBR 的 Rust 实现 | 可用、稳定 | 需要 unsafe |
实战建议:优先用 crossbeam 系列 crate,它们已经处理了内存回收。自己写无锁数据结构时,先用 crossbeam-epoch 管理内存。
实战经验总结
1. 优先用 SeqCst,有性能问题再降级
不要过早优化 ordering。先用 SeqCst 确保正确性,再用 benchmark 确认 ordering 是瓶颈,再逐步降级。一个错误的 Ordering 比 SeqCst 的开销严重一万倍。
2. Ordering 的正确性无法用测试验证
内存序错误只在特定硬件(ARM、PowerPC)和特定调度下才会暴露。x86 是强序模型(TSO),大多数 Relaxed 错误在 x86 上不会暴露。必须在 ARM 上做 CI 测试。
3. CAS 循环要防止活锁
// 活锁:多个线程同时 CAS,互相失败loop { let current = val.load(Ordering::Acquire); if val.compare_exchange_weak( current, f(current), Ordering::Release, Ordering::Relaxed ).is_ok() { break; } // 加退避减少竞争 std::hint::spin_loop();}对于高竞争场景,考虑指数退避或用锁替代。
4. 无锁不等于更快
在低竞争场景下,Mutex 往往比无锁方案更快,因为:
- Cache bounce:CAS 失败导致缓存行无效化
- 重试开销:CAS 循环反复 load
- 代码复杂度:维护成本高
用 benchmark 做决策,而非直觉。
5. AtomicPtr + Box::into_raw / Box::from_raw 是 Rust 无锁编程的标准范式
Rust 的所有权模型使得无锁指针管理比 C 更安全——至少你能在类型系统中表达”这里不管理生命周期,但最终要回收”。
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!
TinyZ's Blog