Rust 2026 经验谈 - Tokio 2026 实战
Tokio 是 Rust 生态中事实标准的异步运行时,但”用 Tokio”和”用好 Tokio”之间隔着运行时配置、spawn 策略、cooperative yielding、cancel safety 等一堆实战知识。本文聚焦 Tokio 1.x 在 2026 年的最佳实践,帮你避开生产中反复出现的那些坑。
Tokio 运行时配置
multi-thread vs current-thread
use tokio::runtime::Runtime;
// multi-thread:多线程调度器(生产默认选择)let rt = Runtime::new().unwrap();
// current-thread:单线程调度器(测试、轻量工具)let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap();| 维度 | multi-thread | current-thread |
|---|---|---|
| 并行度 | N 个 worker 线程 | 1 个线程 |
| 任务调度 | work-stealing | FIFO |
| 适用场景 | 服务端、高并发 | 测试、CLI、嵌入式 |
| 启动开销 | 较大(N 个线程) | 极小 |
选型决策树
需要真正的并行(CPU 多核利用)?├── 是 → multi-thread└── 否 → 是测试吗? ├── 是 → current-thread(更快的启动,更易调试) └── 否 → 是单线程嵌入式? ├── 是 → current-thread └── 否 → multi-thread(大多数情况的安全选择)worker_threads 配置
let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) // 4 个 worker 线程 .max_blocking_threads(32) // 阻塞线程池上限 .enable_all() .build() .unwrap();worker_threads 的选择:
- IO 密集型:不需要太多 worker 线程——它们大部分时间在等 IO。4-8 个通常足够,即使你用 64 核机器
- 混合型:如果 async 任务中有 CPU 计算部分,增加 worker 线程。但更好的做法是用
spawn_blocking - 默认值:
num_cpus——对大多数场景合理,但 IO 密集型时可能过多
踩坑:过度配置 worker_threads
// 反模式:IO 密集型配 64 个 workerlet rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(64) // 每个线程 ~8MB 栈 = 512MB 内存! .build() .unwrap();
// 正确:IO 密集型 4-8 个 worker 足够let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .build() .unwrap();运行时配置的其他参数
let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .max_blocking_threads(512) // spawn_blocking 线程池上限 .thread_stack_size(2 * 1024 * 1024) // 自定义栈大小(默认 2MB) .thread_name("my-worker") // 线程名(方便调试) .thread_name_fn(|| format!("worker-{}", THREAD_COUNTER.fetch_add(1, Ordering::Relaxed))) .on_thread_start(|| log::info!("worker started")) .on_thread_stop(|| log::info!("worker stopped")) .enable_io() // 启用 IO(epoll/kqueue/IOCP) .enable_time() // 启用定时器 .build() .unwrap();spawn 策略
tokio::spawn vs spawn_blocking vs spawn_local
// 1. tokio::spawn:在 worker 线程上调度 async 任务let handle: JoinHandle<Result<Data>> = tokio::spawn(async move { fetch_data(url).await});
// 2. tokio::task::spawn_blocking:在独立线程池上调度同步(阻塞)代码let handle: JoinHandle<Result<Data>> = tokio::task::spawn_blocking(move || { blocking_c_library_call() // 不可以 .await});
// 3. tokio::task::spawn_local:在当前线程上调度(!Send 任务)let handle: JoinHandle<()> = tokio::task::spawn_local(async move { // 这个 Future 不需要 Send,只在本线程运行 let not_send = Rc::new(42); println!("{}", not_send);});何时用哪个?
tokio::spawn:
- 99% 的情况——标准的 async 任务调度
- Future 必须
Send + 'static - 可以在任意 worker 线程上运行
spawn_blocking:
- 调用同步阻塞 API(文件 IO、DNS 解析、C FFI、CPU 密集计算)
- 闭包不能
.await - 独立线程池,不会阻塞 worker 线程
spawn_local:
- Future 不是
Send(包含Rc、RefCell等) - 只能在
LocalSet中使用 - 常见于 GUI 应用、单线程事件循环
// spawn_local 的典型场景:GUI 应用use tokio::task::LocalSet;
#[tokio::main]async fn main() { let local = LocalSet::new();
local.spawn_local(async move { let not_send = Rc::new(0); // 这个 Rc 不会跨线程,安全 loop { do_gui_work(¬_send).await; } });
local.await; // 运行 LocalSet 中的所有任务}spawn 的常见陷阱
陷阱 1:JoinHandle 被丢弃 → 任务被取消
let handle = tokio::spawn(async { do_work().await;});// handle 被 drop → 任务被取消(不会等待完成)!如果你不想等结果但也不想让任务被取消,用 tokio::spawn 然后 “detach”(不保存 JoinHandle):
tokio::spawn(async { do_work().await;}); // JoinHandle 被 drop,但任务不会被取消(tokio 的设计)实际上在 Tokio 中,JoinHandle::drop 不会取消任务——任务会继续运行。但 JoinHandle 被丢弃后你无法再 join 它,也无法检测它是否 panic。所以最好显式处理:
let handle = tokio::spawn(async { do_work().await });
tokio::spawn(async move { if let Err(e) = handle.await { tracing::error!("task panicked: {:?}", e); }});陷阱 2:在 async fn 中调用 std::thread::spawn
// 反模式:绕过 Tokio 的调度async fn bad() { std::thread::spawn(|| { // 这个线程不在 Tokio 运行时中 // 不能使用 tokio::spawn、tokio::fs 等 });}Task Budgeting 与 Cooperative Yielding
这是 Tokio 1.x 引入的关键机制,也是最容易困惑的部分。
问题:一个任务霸占 worker 线程
// 没有协作式让出:循环可能永远不返回控制权async fn busy_loop() { let mut i = 0; loop { i += 1; // 没有 .await → 永远不 yield → 其他任务饿死 }}但即使是”有 .await”的场景也可能出问题:
async fn rapid_poll() { let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap(); loop { // 每次 accept 成功都立即进入下一轮 // 如果连接速率极高,其他任务可能饿死 let (stream, _) = listener.accept().await.unwrap(); handle(stream); }}Tokio 的解决方案:task budget
Tokio 给每个任务一个”预算”——每次 poll 消耗一个预算单位。预算用完时,即使 IO 就绪,poll 也返回 Pending,强制让出。
// Tokio 内部的 budget 逻辑(简化版)fn poll_accept(listener: &TcpListener, cx: &mut Context) -> Poll<io::Result<TcpStream>> { if budget_remaining() == 0 { // 预算用完,强制让出 cx.waker().wake_by_ref(); // 安排下次 poll return Poll::Pending; }
match listener.accept() { Ok(stream) => { consume_budget(); // 消耗一个预算 Poll::Ready(Ok(stream)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { register_waker(cx); Poll::Pending } Err(e) => Poll::Ready(Err(e)), }}默认预算:128 次 poll。一个任务连续 poll 128 次后必须让出。
踸踩的坑:budget 耗尽导致的”延迟”
async fn process_connections(listener: TcpListener) { loop { let (stream, _) = listener.accept().await.unwrap(); // 在极高速连接下,128 次 accept 后 budget 耗尽 // 下一次 poll 返回 Pending → 延迟一个 tick // 这可能导致 P99 延迟抖动 }}解决方案:在每个连接处理中 spawn 新任务,让 budget 自然重置:
async fn process_connections(listener: TcpListener) { loop { let (stream, _) = listener.accept().await.unwrap(); tokio::spawn(async move { handle_connection(stream).await; }); // spawn 后当前任务的 budget 压力减轻 }}tokio::select! 模式与 Cancel Safety
tokio::select! 是 Tokio 中最强大也最危险的宏——它同时等待多个分支,第一个就绪的分支被执行,其余被取消。
基本用法
use tokio::sync::mpsc;
async fn event_loop(mut rx: mpsc::Receiver<Event>, shutdown: tokio::sync::watch::Receiver<bool>) { loop { tokio::select! { Some(event) = rx.recv() => { handle_event(event).await; } _ = shutdown.changed() => { tracing::info!("shutdown signal received"); break; } else => { // 所有分支都返回 Pending/关闭 tracing::info!("all channels closed"); break; } } }}Cancel Safety:select! 最大的坑
当一个分支被取消时,该分支中的 async 操作可能处于部分完成状态。
// 反模式:cancel 不安全的操作async fn bad_select() { let buf = b"hello world";
tokio::select! { // 如果 timeout 先就绪,write 可能已经写了部分数据 // 但不知道写了多少!下次 write 从哪里开始? result = socket.write(buf) => { process(result); } _ = tokio::time::sleep(Duration::from_secs(5)) => { // timeout:write 被取消,可能已写部分数据 } }}什么是 cancel-safe?
一个操作是 cancel-safe 的,如果它被取消后,下次调用可以从正确的状态继续。
| 操作 | Cancel Safety | 原因 |
|---|---|---|
tokio::time::sleep | 安全 | 无副作用,重新 sleep 即可 |
mpsc::Receiver::recv | 安全 | 消息还在队列中 |
TcpStream::read | 安全 | 取消时保证无数据被读取 |
TcpStream::write | 不安全 | 可能已写部分数据 |
TcpStream::read_exact | 不安全 | 缓冲区可能被部分填充 |
broadcast::recv | 安全 | 消息不会丢失 |
解决 cancel safety 的模式
模式 1:优先使用 cancel-safe 的操作
TcpStream::read 和 mpsc::Receiver::recv 本身就是 cancel-safe 的,在 select! 中可以直接使用:
// read 是 cancel-safe 的:取消时保证无数据被读取tokio::select! { result = socket.read(&mut buf) => { // 安全:buf 未被部分修改 } _ = tokio::time::sleep(timeout) => { // 超时取消,无数据丢失 }}注意:read_exact 反而不是 cancel-safe 的——取消时缓冲区可能已被部分填充,且不知道已读多少字节。
模式 2:把不安全的操作隔离到独立任务中
// 用 oneshot 通信,而非直接 selectlet (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move { let data = socket.read(&mut buf).await; let _ = tx.send(data);});
tokio::select! { result = rx => { // 读取完成 } _ = tokio::time::sleep(timeout) => { // 超时,读取任务继续运行(不被取消) // 需要清理:abort 读取任务 }}模式 3:用 tokio::select! 的 biased 选项控制优先级
tokio::select! { biased; // 按书写顺序优先检查
_ = shutdown.changed() => { // 优先检查 shutdown break; }
Some(event) = rx.recv() => { handle_event(event).await; }}biased 改变 select 的调度策略:默认是随机选择就绪分支,biased 则按书写顺序从上到下检查。这在 shutdown 信号需要优先处理时很有用。
select! 中的借用规则
// 错误:两个分支同时借用 conn// tokio::select! {// _ = conn.read(&mut buf1) => {}// _ = conn.write(&mut buf2) => {} // 编译错误:conn 已被借用// }
// 正确:用 &mut 分开// 或者用 split 把 TcpStream 分成 read half 和 write halflet (read_half, mut write_half) = conn.split();
tokio::select! { result = read_half.read(&mut buf) => {} _ = write_half.write_all(response) => {}}tokio::join! vs try_join!
join!:等待所有,忽略错误
let (result1, result2, result3) = tokio::join!( task1(), task2(), task3(),);// 三个任务都完成,即使有的 Err// result1: Result<A, E1>, result2: Result<B, E2>, ...join! 的特点:
- 并发执行所有 future
- 全部等待完成
- 不会提前返回——即使某个任务失败,其他任务继续运行
- 返回所有结果的元组
try_join!:任一失败则全部取消
match tokio::try_join!(task1(), task2(), task3()) { Ok((r1, r2, r3)) => { // 全部成功 } Err(e) => { // 任一失败,其他被取消 }}try_join! 的特点:
- 所有 Future 必须返回
Result - 任一返回
Err,立即取消其余并返回该Err - 适用于”全部成功才算成功”的场景(如分布式事务)
实战选型
// 场景 1:服务启动——所有组件都要成功async fn start_server() -> Result<()> { let (db, cache, mq) = tokio::try_join!( connect_db(), connect_cache(), connect_mq(), )?; // 任何一个连接失败都不启动 Ok(())}
// 场景 2:服务关闭——所有组件都要关闭,但失败只记录日志async fn shutdown(db: Db, cache: Cache, mq: Mq) { let (db_result, cache_result, mq_result) = tokio::join!( db.close(), cache.close(), mq.close(), ); if let Err(e) = db_result { tracing::error!("db close failed: {}", e); } // 即使 db 关闭失败,cache 和 mq 也要尝试关闭}
// 场景 3:竞速——任一完成即可async fn fetch_with_fallback(primary: &str, fallback: &str) -> Data { tokio::select! { data = fetch(primary) => data, data = fetch(fallback) => data, }}运行时生命周期管理
#[tokio::main] 的便利与限制
#[tokio::main]async fn main() { // 自动创建 multi-thread 运行时 // 等待所有 spawned 任务完成后退出}#[tokio::main] 展开后等价于:
fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { // 你的 async main })}手动管理运行时的场景
场景 1:需要自定义配置
fn main() { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .max_blocking_threads(256) .enable_io() .enable_time() .build() .expect("failed to create runtime");
rt.block_on(async { app().await; });}场景 2:在非 async 代码中使用 Tokio
fn main() { // 先创建运行时 let rt = tokio::runtime::Runtime::new().unwrap();
// 同步初始化阶段 let config = load_config();
// 在同步代码中 spawn async 任务 let handle = rt.spawn(async { async_initialization().await });
// 阻塞等待 let result = rt.block_on(handle).unwrap();
// 进入 async 主循环 rt.block_on(async { serve(result).await; });}场景 3:多运行时(高级,通常不推荐)
fn main() { // IO 运行时 let io_rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .enable_io() .build() .unwrap();
// 计算运行时 let compute_rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(8) .build() .unwrap();
// 在 IO 运行时中运行主循环 io_rt.block_on(async { // IO 任务在本运行时 let data = fetch_data().await;
// 计算任务在计算运行时 let result = compute_rt.spawn(async move { heavy_computation(data).await }).await.unwrap();
write_result(result).await; });}警告:多运行时容易导致死锁——如果运行时 A 的任务等待运行时 B 的任务完成,而 B 又等待 A。除非你有明确的隔离理由,否则用单运行时。
优雅关闭
use tokio::signal;use tokio::sync::watch;
async fn serve_with_graceful_shutdown() { let (shutdown_tx, shutdown_rx) = watch::channel(false);
// 监听 SIGTERM / Ctrl+C tokio::spawn(async move { signal::ctrl_c().await.ok(); tracing::info!("shutdown signal received"); let _ = shutdown_tx.send(true); });
// 启动服务器,传入 shutdown 信号 let server = hyper::Server::bind(&addr) .serve(make_service) .with_graceful_shutdown(async { shutdown_rx.changed().await.ok(); });
if let Err(e) = server.await { tracing::error!("server error: {}", e); }
// 服务器停止后,等待 spawned 任务完成 tracing::info!("server stopped, waiting for tasks to finish..."); tokio::time::sleep(Duration::from_secs(5)).await;}运行时句柄(Handle)的传递
use tokio::runtime::Handle;
fn sync_function_that_needs_async(handle: Handle) { // 在同步代码中 spawn async 任务 handle.spawn(async { async_work().await; });
// 或阻塞等待 async 操作 let result = handle.block_on(async_work());}Handle 是对运行时的轻量引用(只是一个 Arc),可以 clone 和跨线程传递。常用场景:
- 从同步库中 spawn async 任务
- 在
spawn_blocking闭包中回到 async 世界
let handle = tokio::runtime::Handle::current();
tokio::task::spawn_blocking(move || { let result = blocking_c_call();
// 回到 async 世界处理结果 handle.spawn(async move { process_async(result).await; });}).await?;实战经验总结
- IO 密集型 4-8 个 worker 足够:不要配
num_cpus,那是 CPU 密集型的策略 - spawn_blocking 处理一切阻塞操作:文件 IO、DNS、C FFI——worker 线程是宝贵资源
- select! 的 cancel safety 是必修课:
write/read_exact不 cancel-safe,read/recv是 - join! vs try_join! vs select!:全等待 / 全成功才算 / 任一即可,三个不同语义
- budget 是隐式保护:但极高频场景下可能导致延迟抖动,用 spawn 分散
- 单运行时优于多运行时:除非有明确的隔离需求
- 优雅关闭是生产必需品:signal 监听 + grace period + drain
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!
TinyZ's Blog