Rust 2026 经验谈 - 异步 + FFI:桥接同步与异步世界

2130 字
11 分钟
Rust 2026 经验谈 - 异步 + FFI:桥接同步与异步世界

Rust 的异步运行时基于 epoll/kqueue/io_uring 等非阻塞 I/O,但现实世界中大量 C 库是阻塞的:数据库驱动、加密库、文件系统操作、硬件 SDK。如何在异步运行时中安全高效地调用这些阻塞代码,是 Rust 生产环境的核心挑战。本文系统梳理异步与 FFI 桥接的各个模式。

block_in_place vs spawn_blocking#

Tokio 提供了两种在异步上下文中执行阻塞代码的方式,它们的行为差异微妙但关键。

spawn_blocking:最安全的选择#

use tokio::task;
async fn call_blocking_c_lib(input: &[u8]) -> Vec<u8> {
// 将阻塞调用提交到专门的阻塞线程池
let input = input.to_owned();
task::spawn_blocking(move || {
// 这里的代码运行在阻塞线程池中
// 不会占用异步工作线程
expensive_c_computation(&input)
})
.await
.expect("spawn_blocking panicked")
}
fn expensive_c_computation(data: &[u8]) -> Vec<u8> {
// 模拟阻塞的 C 库调用
std::thread::sleep(std::time::Duration::from_millis(100));
data.to_vec()
}

spawn_blocking 的工作原理:

  1. 将闭包提交到一个独立的多线程线程池(与异步运行时分离)
  2. 异步工作线程立即释放,可以调度其他 task
  3. 闭包执行完毕后,结果通过 oneshot channel 传回异步世界

block_in_place:在当前线程上阻塞#

use tokio::task;
async fn call_blocking_in_place(input: &[u8]) -> Vec<u8> {
let input = input.to_owned();
// 在当前线程上阻塞,但告知运行时可以借用此线程
task::block_in_place(move || {
expensive_c_computation(&input)
})
}

block_in_place 的工作原理:

  1. 当前异步工作线程上执行闭包
  2. 通知运行时”这个线程暂时不可用”,运行时会增加一个工作线程来补偿
  3. 闭包返回后,当前线程恢复为异步工作线程

关键差异对比#

维度spawn_blockingblock_in_place
执行线程专门的阻塞线程池当前异步工作线程
线程创建无(复用阻塞池)可能触发新工作线程
闭包约束'static + Send'static + Send
task 间影响临时减少异步工作线程数
适用场景长时间阻塞短时间阻塞
current-thread 运行时支持不支持(panic)
多次调用线程池复用,高效可能反复创建线程
与 task::spawn 交互安全安全

选型建议:

  • 默认用 spawn_blocking。它是更安全、更可预测的选择。
  • block_in_place 适合阻塞时间极短(< 1ms)的场景,避免 spawn_blocking 的闭包提交开销。
  • current_thread 运行时中,只能用 spawn_blockingblock_in_place 会 panic。
  • 在多层嵌套中,block_in_place 可以在 spawn_blocking 闭包内使用,但反过来不行。

常见错误:在 async fn 中直接阻塞#

// 错误!会阻塞异步工作线程,影响同一线程上的其他 task
async fn bad_blocking() -> String {
std::thread::sleep(std::time::Duration::from_secs(5)); // 整个线程卡住!
String::from("done")
}
// 正确:用 spawn_blocking
async fn good_blocking() -> String {
tokio::task::spawn_blocking(|| {
std::thread::sleep(std::time::Duration::from_secs(5));
String::from("done")
})
.await
.unwrap()
}

直接在 async fn 中调用阻塞函数是新手最常犯的错误,后果是整个工作线程被卡住,同一线程上的其他 task 全部无法调度。

criterion 异步 Benchmark 实操#

对异步代码做 benchmark 比同步代码复杂,因为需要运行时。以下是 2026 年的推荐实践:

基本模式#

use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId};
use tokio::runtime::Runtime;
fn bench_async_function(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
c.bench_function("async_computation", |b| {
b.to_async(&rt).iter(|| async {
some_async_work().await
})
});
}
async fn some_async_work() -> u64 {
tokio::time::sleep(std::time::Duration::from_micros(10)).await;
42
}
criterion_group!(benches, bench_async_function);
criterion_main!(benches);

b.to_async(&rt) 是 criterion 提供的异步 benchmark 适配器,它在指定的运行时上执行 async 闭包。

对比 spawn_blocking 开销#

fn bench_spawn_vs_block_in_place(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("blocking_methods");
group.bench_function("spawn_blocking", |b| {
b.to_async(&rt).iter(|| async {
tokio::task::spawn_blocking(|| {
std::thread::sleep(std::time::Duration::from_micros(100));
})
.await
.unwrap()
})
});
group.bench_function("block_in_place", |b| {
b.to_async(&rt).iter(|| async {
tokio::task::block_in_place(|| {
std::thread::sleep(std::time::Duration::from_micros(100));
})
})
});
group.finish();
}

实测结果(在我的 i7-13700K 上):

  • 阻塞 100μs 的任务,spawn_blocking 均值约 115μs(含线程池调度)
  • block_in_place 均值约 102μs(直接在当前线程)
  • 差异随阻塞时间缩小而变得显著:10μs 任务差异可达 30%

注意事项#

  1. 不要在 benchmark 中创建新运行时——在函数外创建一次,传引用进去
  2. 异步 benchmark 不测量运行时启动时间——那不是你的代码开销
  3. black_box 防止优化消除——criterion::black_box(result)
  4. 对含 I/O 的 benchmark 要谨慎——结果不稳定,考虑 mock

与 C 库的异步交互模式#

模式一:回调转 Future#

很多 C 库采用回调模式(注册回调函数,异步通知结果)。将其转为 Rust Future 的标准模式:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
struct CCallbackFuture {
result: Arc<Mutex<Option<i32>>>,
waker: Arc<Mutex<Option<Waker>>>,
}
impl Future for CCallbackFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut result = self.result.lock().unwrap();
if let Some(val) = result.take() {
Poll::Ready(val)
} else {
// 保存 waker,C 回调触发时唤醒
*self.waker.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending
}
}
}
// C 回调函数
unsafe extern "C" fn on_complete(value: i32, user_data: *mut std::ffi::c_void) {
let data = unsafe { &*(user_data as *const CallbackData) };
*data.result.lock().unwrap() = Some(value);
if let Some(waker) = data.waker.lock().unwrap().take() {
waker.wake();
}
}
struct CallbackData {
result: Arc<Mutex<Option<i32>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

关键要点:

  • Arc<Mutex<>> 在 C 回调和 Rust Future 间共享状态
  • 回调中必须唤醒 waker,否则 Future 永远 Pending
  • user_data 指针的生命周期管理是最大的安全风险——确保 Future 活着时 user_data 有效

模式二:轮询模式(Polling C API)#

某些 C 库提供非阻塞 poll 接口:

use tokio::io::Interest;
use tokio::net::UdpSocket;
async fn poll_c_fd() -> std::io::Result<()> {
// 将 C 库的 fd 注册到 tokio 的 epoll
// 注意:from_raw_fd 需要 unsafe,且要求 fd 有效且未被 tokio 接管
let socket = unsafe { UdpSocket::from_raw_fd(c_library_fd) };
loop {
socket.ready(Interest::READABLE).await?;
// fd 可读,调用 C 库的非阻塞读取
let result = unsafe { c_lib_nonblocking_read() };
if result > 0 {
// 处理数据
}
}
}

模式三:io_uring 集成思路#

io_uring 是 Linux 5.1+ 的异步 I/O 接口,与 Rust 异步运行时有天然的匹配性:

// 使用 io_uring 的概念模型(通过 tokio-uring 或 glommio)
async fn uring_read_example() -> std::io::Result<Vec<u8>> {
// tokio-uring 的写法
use tokio_uring::fs::File;
let file = File::open("huge_file.dat").await?;
let buf = vec![0u8; 4096];
let (res, buf) = file.read_at(buf, 0).await?;
Ok(buf[..res].to_vec())
}

io_uring 集成的挑战:

  1. 与 tokio 的兼容性:tokio 默认用 epoll,io_uring 需要独立的运行时(tokio-uring)或替代运行时(glommio)
  2. 缓冲区所有权:io_uring 要求缓冲区在操作完成前保持有效,这与 Rust 的借用模型有摩擦
  3. 内核版本要求:需要 Linux 5.1+(某些特性需 5.6+),跨平台部署受限
  4. 调优参数:uring 的 entry 数量、fixed buffer、SQE 批量提交等需要针对场景调优

在异步运行时中调用阻塞系统库的经验#

经验一:识别你的阻塞边界#

不是所有 FFI 都是阻塞的。分类判断:

类型例子处理方式
CPU 密集加密、压缩、图像处理spawn_blocking
阻塞 I/O传统文件 I/O、数据库驱动spawn_blocking
非阻塞 I/Oepoll-based C 库fd 注册到运行时
回调式libcurl、libuv回调转 Future
长期阻塞硬件等待、串口读独立线程 + channel

经验二:控制阻塞线程池大小#

Tokio 的阻塞线程池默认最大 512 个线程。对于 CPU 密集型任务,你应该限制到 CPU 核心数:

use tokio::runtime::Builder;
let rt = Builder::new_multi_thread()
.worker_threads(4) // 异步工作线程
.max_blocking_threads(8) // 阻塞线程池上限
.build()
.unwrap();

过大的阻塞池会导致线程竞争和调度开销;过小会导致任务排队。经验公式:CPU 密集型设为核心数,I/O 密集型可设为核心数的 2~4 倍。

经验三:避免在 spawn_blocking 中持有异步锁#

// 危险:在阻塞代码中持有异步 Mutex 的锁
async fn bad_pattern() {
let data = Arc::new(tokio::sync::Mutex::new(vec![]));
let data_clone = data.clone();
tokio::task::spawn_blocking(move || {
let mut guard = data_clone.blocking_lock(); // 这可以工作...
guard.push(1);
// 但如果另一个 task 在等锁,可能死锁
})
.await
.unwrap();
}
// 更安全:在异步侧获取锁,将数据传入阻塞闭包
async fn good_pattern() {
let data = Arc::new(tokio::sync::Mutex::new(vec![]));
{
let mut guard = data.lock().await;
let inner = &mut *guard;
tokio::task::spawn_blocking(move || {
// 对 inner 的数据做阻塞处理
})
.await
.unwrap();
}
}

经验四:为 C 库的线程安全性建档#

调用 C 库前,确认其线程安全模型:

  • 线程安全:可以自由在 spawn_blocking 中并发调用
  • 线程局部状态:每个 spawn_blocking 闭包是独立线程,线程局部状态不共享——需要显式传递
  • 全局锁:C 库内部可能有全局 mutex,并发 spawn_blocking 调用实际串行执行,性能不如预期

经验五:cbindgen 与建桥的自动化#

当 Rust 异步代码需要暴露给 C 调用时:

// Rust 侧
#[unsafe(no_mangle)]
pub unsafe extern "C" fn rust_async_operation(
input: *const u8,
input_len: usize,
callback: unsafe extern "C" fn(*mut u8, usize),
) {
let input_data = unsafe { std::slice::from_raw_parts(input, input_len) }.to_vec();
// 在独立运行时中启动异步操作
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let result = async_process(&input_data).await;
callback(result.as_ptr() as *mut u8, result.len());
});
});
}

注意这里创建了独立运行时——因为 C 侧的调用不受 Rust 运行时管理,必须自行管理生命周期。

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
Rust 2026 经验谈 - 异步 + FFI:桥接同步与异步世界
https://tinyzzh.github.io/posts/rust-2026/2026-06-17-rust_2026_017_async_ffi/
作者
TinyZ Zzh
发布于
2026-06-17
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
TinyZ Zzh
专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
231
分类
38
标签
253
总字数
382,993
运行时长
0
最后活动
0 天前

文章目录