Rust 2026 经验谈 - 无锁并发模式

3054 字
15 分钟
Rust 2026 经验谈 - 无锁并发模式

无锁(lock-free)并发是高性能系统编程的圣杯。Rust 的原子类型和内存序提供了构建无锁数据结构的原语,但正确使用它们需要对硬件内存模型有深刻理解。本文从实战角度出发,详解 Rust 中无锁编程的核心模式与踩坑经验。

Atomic 类型全览#

标准库原子类型#

Rust 标准库在 std::sync::atomic 模块中提供了一组原子类型,每个对应一种基础类型:

use std::sync::atomic::{
AtomicBool, AtomicI8, AtomicI16, AtomicI32, AtomicI64, AtomicIsize,
AtomicU8, AtomicU16, AtomicU32, AtomicU64, AtomicUsize,
AtomicPtr,
};

分类

类别类型典型用途
布尔AtomicBool开关标志、终止信号
有符号整数AtomicI8 ~ AtomicI64, AtomicIsize计数器(可为负)、差值
无符号整数AtomicU8 ~ AtomicU64, AtomicUsize引用计数、索引、长度
指针AtomicPtr<T>无锁链表、指针摇摆

核心操作#

每个原子类型提供三类基本操作:

use std::sync::atomic::{AtomicU32, Ordering};
let counter = AtomicU32::new(0);
// 1. load / store——读取 / 写入
let val = counter.load(Ordering::SeqCst);
counter.store(42, Ordering::SeqCst);
// 2. fetch_xxx——读取并修改(返回旧值)
let old = counter.fetch_add(1, Ordering::SeqCst); // 原子自增
let old = counter.fetch_sub(1, Ordering::SeqCst); // 原子自减
let old = counter.fetch_and(0xFF, Ordering::SeqCst); // 原子位与
let old = counter.fetch_or(0x100, Ordering::SeqCst); // 原子位或
let old = counter.fetch_xor(0x1, Ordering::SeqCst); // 原子位异或
let old = counter.fetch_max(100, Ordering::SeqCst); // 原子取最大值
let old = counter.fetch_min(0, Ordering::SeqCst); // 原子取最小值
// 3. compare_exchange / compare_exchange_weak——CAS
let result = counter.compare_exchange(
42, // 期望值
100, // 新值
Ordering::SeqCst, // 成功时的 ordering
Ordering::SeqCst, // 失败时的 ordering
);

AtomicPtr 的特殊操作#

AtomicPtr<T> 是无锁数据结构的关键原语:

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
let head: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
// 读取指针
let current = head.load(Ordering::Acquire);
// CAS 替换指针(实现无锁 push)
let new_node = Box::into_raw(Box::new(Node { next: ptr::null_mut(), data: 42 }));
loop {
let old = head.load(Ordering::Acquire);
unsafe { (*new_node).next = old; }
if head.compare_exchange_weak(
old, new_node, Ordering::Release, Ordering::Relaxed
).is_ok() {
break;
}
}

踩坑AtomicPtr 存储的是裸指针,不管理内存。你必须在合适时机用 Box::from_raw 回收,否则泄漏。

Memory Ordering 实战详解#

内存序是无锁编程最难的部分。选错 ordering 不会 panic,但会导致其他线程看到不一致的数据——这是最隐蔽的 bug。

五种 Ordering 的语义#

use std::sync::atomic::Ordering;
Ordering语义保证使用场景
Relaxed无同步,只保证原子性单个变量的操作原子计数器、统计
Release写操作:之前的写对 Acquire 可见写-写不重排”发布”数据
Acquire读操作:之后的读能看到 Release 的写读-读不重排”获取”数据
AcqRel读写操作:同时具备 Acquire + Release双向不重排read-modify-write
SeqCst全局总序所有线程看到相同顺序最严格、最保守

Relaxed:只管原子,不管顺序#

use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
static COUNTER: AtomicU64 = AtomicU64::new(0);
fn relaxed_counter() {
let handles: Vec<_> = (0..10)
.map(|_| {
thread::spawn(|| {
for _ in 0..1_000_000 {
COUNTER.fetch_add(1, Ordering::Relaxed);
}
})
})
.collect();
for h in handles {
h.join().unwrap();
}
// 保证最终值是 10_000_000(原子性保证不丢失更新)
assert_eq!(COUNTER.load(Ordering::Relaxed), 10_000_000);
}

踩坑:Relaxed 不保证其他变量的写入顺序。如果线程 A 先写 data = 42flag.store(1, Relaxed),线程 B 读到 flag == 1 不代表能看到 data == 42

Acquire/Release:生产者-消费者同步#

这是最常用的非 SeqCst 组合:

use std::sync::atomic::{AtomicBool, Ordering};
static DATA: std::sync::OnceLock<String> = std::sync::OnceLock::new();
static READY: AtomicBool = AtomicBool::new(false);
fn producer() {
// 先写数据
DATA.set("hello".to_owned()).unwrap();
// 再设置标志(Release 保证上面的写对 Acquire 可见)
READY.store(true, Ordering::Release);
}
fn consumer() {
// 循环等待标志(Acquire 保证后续读能看到 Release 之前的写)
while !READY.load(Ordering::Acquire) {
std::hint::spin_loop();
}
// 安全:DATA 一定已经初始化
let data = DATA.get().unwrap();
assert_eq!(data, "hello");
}

核心原则Release store 之前的所有写操作,对执行了 Acquire load 且读到该值的线程可见。这就是”发布-获取”语义。

AcqRel:读写同时需要同步#

fetch_add/fetch_sub 等既读又写的操作,需要同时 Acquire 和 Release 时使用:

use std::sync::atomic::{AtomicUsize, Ordering};
struct RwLockFree {
readers: AtomicUsize,
writer: AtomicBool,
}
impl RwLockFree {
fn read_enter(&self) -> bool {
loop {
// 先检查是否有写者
if self.writer.load(Ordering::Acquire) {
return false;
}
// 原子增加读者计数
// AcqRel:Acquire 检查 writer,Release 让 writer 看到 reader 增加
let old = self.readers.fetch_add(1, Ordering::AcqRel);
if old > 0 || !self.writer.load(Ordering::Acquire) {
return true;
}
// 回退
self.readers.fetch_sub(1, Ordering::Relaxed);
}
}
}

SeqCst:最严格,也最慢#

use std::sync::atomic::{AtomicUsize, Ordering};
fn seqlock_example() {
let a = AtomicUsize::new(0);
let b = AtomicUsize::new(0);
std::thread::scope(|s| {
s.spawn(|| {
a.store(1, Ordering::SeqCst);
b.store(1, Ordering::SeqCst);
});
s.spawn(|| {
let rb = b.load(Ordering::SeqCst);
let ra = a.load(Ordering::SeqCst);
// SeqCst 保证:如果 rb == 1,则 ra 必然 == 1
// 用 Relaxed 则无法保证
if rb == 1 {
assert_eq!(ra, 1);
}
});
});
}

选型建议

场景推荐 Ordering原因
简单计数Relaxed不需要同步
标志+数据Release/Acquire标准发布-获取
CAS 循环修改AcqRel(成功)/ Acquire(失败)读写同步
多变量一致性SeqCst需要全局序

compare_exchange 循环实现 CAS#

CAS(Compare-And-Swap)是无锁算法的基石。Rust 中通过 compare_exchangecompare_exchange_weak 实现。

compare_exchange vs compare_exchange_weak#

use std::sync::atomic::{AtomicU32, Ordering};
let val = AtomicU32::new(5);
// compare_exchange:只在当前值等于 expected 时才替换
// 如果不等于,返回 Err(当前值)
let result = val.compare_exchange(
5, // expected
10, // new
Ordering::SeqCst,
Ordering::SeqCst,
);
assert_eq!(result, Ok(5)); // 返回 Ok(旧值)
assert_eq!(val.load(Ordering::SeqCst), 10);
// compare_exchange_weak:在 ARM 等平台上可能 spuriously 失败
// 即使当前值等于 expected,也可能返回 Err
// 但在 CAS 循环中用 weak 版本更高效(避免额外的分支)

实战经验:在 CAS 循环中始终用 compare_exchange_weakweak 版本在 LL/SC 架构(ARM、RISC-V)上避免了一次额外的 load 指令,性能更好。在 x86 上两者等价(x86 CAS 是单指令)。

典型 CAS 循环:原子 fetch_update#

use std::sync::atomic::{AtomicU32, Ordering};
fn atomic_square(val: &AtomicU32) -> u32 {
loop {
let current = val.load(Ordering::Acquire);
let new = current * current;
// 如果 val 仍然是 current,替换为 new
// 否则重试
match val.compare_exchange_weak(
current,
new,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(old) => return old,
Err(_) => continue, // 其他线程修改了,重试
}
}
}

无锁栈(Treiber Stack)#

这是最经典的无锁数据结构:

use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
struct Node<T> {
data: T,
next: *mut Node<T>,
}
pub struct LockFreeStack<T> {
head: AtomicPtr<Node<T>>,
}
impl<T> LockFreeStack<T> {
pub const fn new() -> Self {
Self { head: AtomicPtr::new(ptr::null_mut()) }
}
pub fn push(&self, data: T) {
let new_node = Box::into_raw(Box::new(Node {
data,
next: ptr::null_mut(),
}));
// CAS 循环:将 new_node 插入链表头部
loop {
let old_head = self.head.load(Ordering::Acquire);
unsafe { (*new_node).next = old_head; }
if self.head.compare_exchange_weak(
old_head,
new_node,
Ordering::Release,
Ordering::Relaxed,
).is_ok() {
return;
}
// CAS 失败,其他线程修改了 head,重试
}
}
pub fn pop(&self) -> Option<T> {
loop {
let old_head = self.head.load(Ordering::Acquire);
if old_head.is_null() {
return None;
}
let new_head = unsafe { (*old_head).next };
if self.head.compare_exchange_weak(
old_head,
new_head,
Ordering::AcqRel,
Ordering::Acquire,
).is_ok() {
// 成功取走节点
let node = unsafe { Box::from_raw(old_head) };
return Some(node.data);
}
// CAS 失败,重试
}
}
}
impl<T> Drop for LockFreeStack<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
}
}

踩坑:Treiber Stack 的 pop 有 ABA 问题——如果线程 A 读到 head = P,然后被调度走,线程 B pop 了 P 并 push 了新节点恰好也在地址 P,线程 A 的 CAS 会错误成功。解决方案是用带版本号的指针(如 AtomicUsize 中高位存版本号,低位存地址)。

SeqLock 模式实现#

SeqLock(顺序锁)是一种”乐观读”模式:读者不加锁,而是读一个序号,读数据,再检查序号是否变化。如果序号变了(写者在这期间写了),重试。

use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
pub struct SeqLock<T> {
seq: AtomicUsize,
data: UnsafeCell<T>,
}
// SAFETY: SeqLock 内部用 seq 保证同步访问
unsafe impl<T: Send> Sync for SeqLock<T> {}
impl<T: Copy> SeqLock<T> {
pub const fn new(data: T) -> Self {
Self {
seq: AtomicUsize::new(0),
data: UnsafeCell::new(data),
}
}
pub fn read(&self) -> T {
loop {
// 1. 读序号(Acquire 保证后续数据读取在序号读取之后)
let seq1 = self.seq.load(Ordering::Acquire);
// 如果序号是奇数,说明写者正在写,重试
if seq1 & 1 == 1 {
std::hint::spin_loop();
continue;
}
// 2. 读数据
// SAFETY: 我们在 seq1 和 seq2 之间检查了序号
// 如果序号没变,数据在这期间没被修改
let data = unsafe { *self.data.get() };
// 3. 再读序号,检查是否变化
let seq2 = self.seq.load(Ordering::Acquire);
if seq1 == seq2 {
return data;
}
// 序号变了,重试
}
}
pub fn write(&self, new_data: T) {
// 1. 序号加 1(奇数表示正在写)
let old_seq = self.seq.fetch_add(1, Ordering::Release);
// 2. 写数据
unsafe {
*self.data.get() = new_data;
}
// 3. 序号再加 1(偶数表示写完)
// Release 保证数据写在序号更新之前对读者可见
self.seq.fetch_add(1, Ordering::Release);
}
}

使用场景:读多写少、数据较小(Copy 类型)、对读延迟敏感。Linux 内核大量使用 SeqLock 读取 jiffies 等频繁更新的时间数据。

踩坑

  1. T 必须是 Copy:否则读者可能在写者写一半时读到半新半旧的数据,drop 时未定义行为
  2. 不能有指针:如果 T 含指针,半新半旧的读取可能导致悬垂指针
  3. 写者饥饿:如果写者太频繁,读者可能一直重试

版本 2:用 AtomicUsize 存储小数据#

对于 u64 等小数据,如果平台支持 AtomicU64,直接用原子操作更简单:

use std::sync::atomic::{AtomicU64, Ordering};
pub struct AtomicTimestamp {
value: AtomicU64,
}
impl AtomicTimestamp {
pub fn read(&self) -> u64 {
self.value.load(Ordering::Relaxed)
}
pub fn update(&self, new_val: u64) {
self.value.store(new_val, Ordering::Relaxed);
}
}

如果只需要最终一致性,Relaxed 足矣。

常见无锁数据结构思路#

1. 无锁队列(Michael-Scott Queue)#

生产者-消费者场景最常用的无锁数据结构:

思路:
- 链表实现,dummy head 节点
- push:CAS 修改 tail.next,再 CAS 推进 tail
- pop:CAS 推进 head
- 需要 Hazard Pointer 或 EBR 解决内存回收

Rust 实现可用 crossbeam-queueArrayQueue(有界)和 SegQueue(无界)。

2. 无锁环形缓冲区(SPSC)#

单生产者单消费者场景,无需 CAS:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::UnsafeCell;
pub struct SpscRingBuffer<T> {
buf: UnsafeCell<Vec<T>>,
capacity: usize,
head: AtomicUsize, // 读位置
tail: AtomicUsize, // 写位置
}
unsafe impl<T: Send> Sync for SpscRingBuffer<T> {}
impl<T: Default + Copy> SpscRingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let mut buf = Vec::with_capacity(capacity);
for _ in 0..capacity {
buf.push(T::default());
}
Self {
buf: UnsafeCell::new(buf),
capacity,
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
}
}
pub fn push(&self, item: T) -> bool {
let tail = self.tail.load(Ordering::Relaxed);
let next_tail = (tail + 1) % self.capacity;
if next_tail == self.head.load(Ordering::Acquire) {
return false; // 满
}
unsafe {
(*self.buf.get())[tail] = item;
}
self.tail.store(next_tail, Ordering::Release);
true
}
pub fn pop(&self) -> Option<T> {
let head = self.head.load(Ordering::Relaxed);
if head == self.tail.load(Ordering::Acquire) {
return None; // 空
}
let item = unsafe { (*self.buf.get())[head] };
self.head.store((head + 1) % self.capacity, Ordering::Release);
Some(item)
}
}

关键:SPSC 环形缓冲区中,生产者只写 tail,消费者只写 head,没有竞争,因此用 Acquire/Release 就足够,不需要 CAS。

3. 无锁 HashMap#

真正的无锁 HashMap 极其复杂(需要 resize 等),实践中通常用分片锁方案:

use std::sync::atomic::AtomicUsize;
// 思路:将 HashMap 分成 N 个 shard
// 每个 shard 用 RwLock 或 SeqLock
// 用 AtomicUsize 做负载均衡和 shard 选择
// 这是 "lock-striping" 而非真正无锁
// 真正无锁的 HashMap 参见: crossbeam-skiplist 或 flurry

内存回收:无锁的最大难题#

无锁数据结构中,节点何时回收是核心难题。Rust 中主要方案:

方案机制优点缺点
Hazard Pointer每线程声明”正在使用”的指针确切回收实现复杂,性能开销
EBR (Epoch-Based Reclamation)全局 epoch,延迟回收性能好最坏情况下延迟高
QSBR (Quiescent State)每线程声明”不持有引用”无额外读开销需要主动声明
crossbeam-epochEBR 的 Rust 实现可用、稳定需要 unsafe

实战建议:优先用 crossbeam 系列 crate,它们已经处理了内存回收。自己写无锁数据结构时,先用 crossbeam-epoch 管理内存。

实战经验总结#

1. 优先用 SeqCst,有性能问题再降级#

不要过早优化 ordering。先用 SeqCst 确保正确性,再用 benchmark 确认 ordering 是瓶颈,再逐步降级。一个错误的 Ordering 比 SeqCst 的开销严重一万倍

2. Ordering 的正确性无法用测试验证#

内存序错误只在特定硬件(ARM、PowerPC)和特定调度下才会暴露。x86 是强序模型(TSO),大多数 Relaxed 错误在 x86 上不会暴露。必须在 ARM 上做 CI 测试

3. CAS 循环要防止活锁#

// 活锁:多个线程同时 CAS,互相失败
loop {
let current = val.load(Ordering::Acquire);
if val.compare_exchange_weak(
current, f(current), Ordering::Release, Ordering::Relaxed
).is_ok() {
break;
}
// 加退避减少竞争
std::hint::spin_loop();
}

对于高竞争场景,考虑指数退避或用锁替代。

4. 无锁不等于更快#

在低竞争场景下,Mutex 往往比无锁方案更快,因为:

  • Cache bounce:CAS 失败导致缓存行无效化
  • 重试开销:CAS 循环反复 load
  • 代码复杂度:维护成本高

用 benchmark 做决策,而非直觉。

5. AtomicPtr + Box::into_raw / Box::from_raw 是 Rust 无锁编程的标准范式#

Rust 的所有权模型使得无锁指针管理比 C 更安全——至少你能在类型系统中表达”这里不管理生命周期,但最终要回收”。

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
Rust 2026 经验谈 - 无锁并发模式
https://tinyzzh.github.io/posts/rust-2026/2026-06-21-rust_2026_021_lock_free/
作者
TinyZ Zzh
发布于
2026-06-21
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
TinyZ Zzh
专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
235
分类
38
标签
262
总字数
391,680
运行时长
0
最后活动
0 天前

文章目录