Rust 2026 经验谈 - Tokio 2026 实战

3069 字
15 分钟
Rust 2026 经验谈 - Tokio 2026 实战

Tokio 是 Rust 生态中事实标准的异步运行时,但”用 Tokio”和”用好 Tokio”之间隔着运行时配置、spawn 策略、cooperative yielding、cancel safety 等一堆实战知识。本文聚焦 Tokio 1.x 在 2026 年的最佳实践,帮你避开生产中反复出现的那些坑。

Tokio 运行时配置#

multi-thread vs current-thread#

use tokio::runtime::Runtime;
// multi-thread:多线程调度器(生产默认选择)
let rt = Runtime::new().unwrap();
// current-thread:单线程调度器(测试、轻量工具)
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
维度multi-threadcurrent-thread
并行度N 个 worker 线程1 个线程
任务调度work-stealingFIFO
适用场景服务端、高并发测试、CLI、嵌入式
启动开销较大(N 个线程)极小

选型决策树#

需要真正的并行(CPU 多核利用)?
├── 是 → multi-thread
└── 否 → 是测试吗?
├── 是 → current-thread(更快的启动,更易调试)
└── 否 → 是单线程嵌入式?
├── 是 → current-thread
└── 否 → multi-thread(大多数情况的安全选择)

worker_threads 配置#

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 4 个 worker 线程
.max_blocking_threads(32) // 阻塞线程池上限
.enable_all()
.build()
.unwrap();

worker_threads 的选择:

  • IO 密集型:不需要太多 worker 线程——它们大部分时间在等 IO。4-8 个通常足够,即使你用 64 核机器
  • 混合型:如果 async 任务中有 CPU 计算部分,增加 worker 线程。但更好的做法是用 spawn_blocking
  • 默认值num_cpus——对大多数场景合理,但 IO 密集型时可能过多

踩坑:过度配置 worker_threads

// 反模式:IO 密集型配 64 个 worker
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(64) // 每个线程 ~8MB 栈 = 512MB 内存!
.build()
.unwrap();
// 正确:IO 密集型 4-8 个 worker 足够
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();

运行时配置的其他参数#

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(512) // spawn_blocking 线程池上限
.thread_stack_size(2 * 1024 * 1024) // 自定义栈大小(默认 2MB)
.thread_name("my-worker") // 线程名(方便调试)
.thread_name_fn(|| format!("worker-{}", THREAD_COUNTER.fetch_add(1, Ordering::Relaxed)))
.on_thread_start(|| log::info!("worker started"))
.on_thread_stop(|| log::info!("worker stopped"))
.enable_io() // 启用 IO(epoll/kqueue/IOCP)
.enable_time() // 启用定时器
.build()
.unwrap();

spawn 策略#

tokio::spawn vs spawn_blocking vs spawn_local#

// 1. tokio::spawn:在 worker 线程上调度 async 任务
let handle: JoinHandle<Result<Data>> = tokio::spawn(async move {
fetch_data(url).await
});
// 2. tokio::task::spawn_blocking:在独立线程池上调度同步(阻塞)代码
let handle: JoinHandle<Result<Data>> = tokio::task::spawn_blocking(move || {
blocking_c_library_call() // 不可以 .await
});
// 3. tokio::task::spawn_local:在当前线程上调度(!Send 任务)
let handle: JoinHandle<()> = tokio::task::spawn_local(async move {
// 这个 Future 不需要 Send,只在本线程运行
let not_send = Rc::new(42);
println!("{}", not_send);
});

何时用哪个?#

tokio::spawn

  • 99% 的情况——标准的 async 任务调度
  • Future 必须 Send + 'static
  • 可以在任意 worker 线程上运行

spawn_blocking

  • 调用同步阻塞 API(文件 IO、DNS 解析、C FFI、CPU 密集计算)
  • 闭包不能 .await
  • 独立线程池,不会阻塞 worker 线程

spawn_local

  • Future 不是 Send(包含 RcRefCell 等)
  • 只能在 LocalSet 中使用
  • 常见于 GUI 应用、单线程事件循环
// spawn_local 的典型场景:GUI 应用
use tokio::task::LocalSet;
#[tokio::main]
async fn main() {
let local = LocalSet::new();
local.spawn_local(async move {
let not_send = Rc::new(0);
// 这个 Rc 不会跨线程,安全
loop {
do_gui_work(&not_send).await;
}
});
local.await; // 运行 LocalSet 中的所有任务
}

spawn 的常见陷阱#

陷阱 1:JoinHandle 被丢弃 → 任务被取消

let handle = tokio::spawn(async {
do_work().await;
});
// handle 被 drop → 任务被取消(不会等待完成)!

如果你不想等结果但也不想让任务被取消,用 tokio::spawn 然后 “detach”(不保存 JoinHandle):

tokio::spawn(async {
do_work().await;
}); // JoinHandle 被 drop,但任务不会被取消(tokio 的设计)

实际上在 Tokio 中,JoinHandle::drop 不会取消任务——任务会继续运行。但 JoinHandle 被丢弃后你无法再 join 它,也无法检测它是否 panic。所以最好显式处理:

let handle = tokio::spawn(async { do_work().await });
tokio::spawn(async move {
if let Err(e) = handle.await {
tracing::error!("task panicked: {:?}", e);
}
});

陷阱 2:在 async fn 中调用 std::thread::spawn

// 反模式:绕过 Tokio 的调度
async fn bad() {
std::thread::spawn(|| {
// 这个线程不在 Tokio 运行时中
// 不能使用 tokio::spawn、tokio::fs 等
});
}

Task Budgeting 与 Cooperative Yielding#

这是 Tokio 1.x 引入的关键机制,也是最容易困惑的部分。

问题:一个任务霸占 worker 线程#

// 没有协作式让出:循环可能永远不返回控制权
async fn busy_loop() {
let mut i = 0;
loop {
i += 1;
// 没有 .await → 永远不 yield → 其他任务饿死
}
}

但即使是”有 .await”的场景也可能出问题:

async fn rapid_poll() {
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
loop {
// 每次 accept 成功都立即进入下一轮
// 如果连接速率极高,其他任务可能饿死
let (stream, _) = listener.accept().await.unwrap();
handle(stream);
}
}

Tokio 的解决方案:task budget#

Tokio 给每个任务一个”预算”——每次 poll 消耗一个预算单位。预算用完时,即使 IO 就绪,poll 也返回 Pending,强制让出。

// Tokio 内部的 budget 逻辑(简化版)
fn poll_accept(listener: &TcpListener, cx: &mut Context) -> Poll<io::Result<TcpStream>> {
if budget_remaining() == 0 {
// 预算用完,强制让出
cx.waker().wake_by_ref(); // 安排下次 poll
return Poll::Pending;
}
match listener.accept() {
Ok(stream) => {
consume_budget(); // 消耗一个预算
Poll::Ready(Ok(stream))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
register_waker(cx);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}

默认预算:128 次 poll。一个任务连续 poll 128 次后必须让出。

踸踩的坑:budget 耗尽导致的”延迟”#

async fn process_connections(listener: TcpListener) {
loop {
let (stream, _) = listener.accept().await.unwrap();
// 在极高速连接下,128 次 accept 后 budget 耗尽
// 下一次 poll 返回 Pending → 延迟一个 tick
// 这可能导致 P99 延迟抖动
}
}

解决方案:在每个连接处理中 spawn 新任务,让 budget 自然重置:

async fn process_connections(listener: TcpListener) {
loop {
let (stream, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
handle_connection(stream).await;
});
// spawn 后当前任务的 budget 压力减轻
}
}

tokio::select! 模式与 Cancel Safety#

tokio::select! 是 Tokio 中最强大也最危险的宏——它同时等待多个分支,第一个就绪的分支被执行,其余被取消。

基本用法#

use tokio::sync::mpsc;
async fn event_loop(mut rx: mpsc::Receiver<Event>, shutdown: tokio::sync::watch::Receiver<bool>) {
loop {
tokio::select! {
Some(event) = rx.recv() => {
handle_event(event).await;
}
_ = shutdown.changed() => {
tracing::info!("shutdown signal received");
break;
}
else => {
// 所有分支都返回 Pending/关闭
tracing::info!("all channels closed");
break;
}
}
}
}

Cancel Safety:select! 最大的坑#

当一个分支被取消时,该分支中的 async 操作可能处于部分完成状态。

// 反模式:cancel 不安全的操作
async fn bad_select() {
let buf = b"hello world";
tokio::select! {
// 如果 timeout 先就绪,write 可能已经写了部分数据
// 但不知道写了多少!下次 write 从哪里开始?
result = socket.write(buf) => {
process(result);
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
// timeout:write 被取消,可能已写部分数据
}
}
}

什么是 cancel-safe?

一个操作是 cancel-safe 的,如果它被取消后,下次调用可以从正确的状态继续。

操作Cancel Safety原因
tokio::time::sleep安全无副作用,重新 sleep 即可
mpsc::Receiver::recv安全消息还在队列中
TcpStream::read安全取消时保证无数据被读取
TcpStream::write不安全可能已写部分数据
TcpStream::read_exact不安全缓冲区可能被部分填充
broadcast::recv安全消息不会丢失

解决 cancel safety 的模式#

模式 1:优先使用 cancel-safe 的操作

TcpStream::readmpsc::Receiver::recv 本身就是 cancel-safe 的,在 select! 中可以直接使用:

// read 是 cancel-safe 的:取消时保证无数据被读取
tokio::select! {
result = socket.read(&mut buf) => {
// 安全:buf 未被部分修改
}
_ = tokio::time::sleep(timeout) => {
// 超时取消,无数据丢失
}
}

注意:read_exact 反而不是 cancel-safe 的——取消时缓冲区可能已被部分填充,且不知道已读多少字节。

模式 2:把不安全的操作隔离到独立任务中

// 用 oneshot 通信,而非直接 select
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let data = socket.read(&mut buf).await;
let _ = tx.send(data);
});
tokio::select! {
result = rx => {
// 读取完成
}
_ = tokio::time::sleep(timeout) => {
// 超时,读取任务继续运行(不被取消)
// 需要清理:abort 读取任务
}
}

模式 3:用 tokio::select! 的 biased 选项控制优先级

tokio::select! {
biased; // 按书写顺序优先检查
_ = shutdown.changed() => {
// 优先检查 shutdown
break;
}
Some(event) = rx.recv() => {
handle_event(event).await;
}
}

biased 改变 select 的调度策略:默认是随机选择就绪分支,biased 则按书写顺序从上到下检查。这在 shutdown 信号需要优先处理时很有用。

select! 中的借用规则#

// 错误:两个分支同时借用 conn
// tokio::select! {
// _ = conn.read(&mut buf1) => {}
// _ = conn.write(&mut buf2) => {} // 编译错误:conn 已被借用
// }
// 正确:用 &mut 分开
// 或者用 split 把 TcpStream 分成 read half 和 write half
let (read_half, mut write_half) = conn.split();
tokio::select! {
result = read_half.read(&mut buf) => {}
_ = write_half.write_all(response) => {}
}

tokio::join! vs try_join!#

join!:等待所有,忽略错误#

let (result1, result2, result3) = tokio::join!(
task1(),
task2(),
task3(),
);
// 三个任务都完成,即使有的 Err
// result1: Result<A, E1>, result2: Result<B, E2>, ...

join! 的特点:

  • 并发执行所有 future
  • 全部等待完成
  • 不会提前返回——即使某个任务失败,其他任务继续运行
  • 返回所有结果的元组

try_join!:任一失败则全部取消#

match tokio::try_join!(task1(), task2(), task3()) {
Ok((r1, r2, r3)) => {
// 全部成功
}
Err(e) => {
// 任一失败,其他被取消
}
}

try_join! 的特点:

  • 所有 Future 必须返回 Result
  • 任一返回 Err,立即取消其余并返回该 Err
  • 适用于”全部成功才算成功”的场景(如分布式事务)

实战选型#

// 场景 1:服务启动——所有组件都要成功
async fn start_server() -> Result<()> {
let (db, cache, mq) = tokio::try_join!(
connect_db(),
connect_cache(),
connect_mq(),
)?;
// 任何一个连接失败都不启动
Ok(())
}
// 场景 2:服务关闭——所有组件都要关闭,但失败只记录日志
async fn shutdown(db: Db, cache: Cache, mq: Mq) {
let (db_result, cache_result, mq_result) = tokio::join!(
db.close(),
cache.close(),
mq.close(),
);
if let Err(e) = db_result {
tracing::error!("db close failed: {}", e);
}
// 即使 db 关闭失败,cache 和 mq 也要尝试关闭
}
// 场景 3:竞速——任一完成即可
async fn fetch_with_fallback(primary: &str, fallback: &str) -> Data {
tokio::select! {
data = fetch(primary) => data,
data = fetch(fallback) => data,
}
}

运行时生命周期管理#

#[tokio::main] 的便利与限制#

#[tokio::main]
async fn main() {
// 自动创建 multi-thread 运行时
// 等待所有 spawned 任务完成后退出
}

#[tokio::main] 展开后等价于:

fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
// 你的 async main
})
}

手动管理运行时的场景#

场景 1:需要自定义配置

fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.max_blocking_threads(256)
.enable_io()
.enable_time()
.build()
.expect("failed to create runtime");
rt.block_on(async {
app().await;
});
}

场景 2:在非 async 代码中使用 Tokio

fn main() {
// 先创建运行时
let rt = tokio::runtime::Runtime::new().unwrap();
// 同步初始化阶段
let config = load_config();
// 在同步代码中 spawn async 任务
let handle = rt.spawn(async {
async_initialization().await
});
// 阻塞等待
let result = rt.block_on(handle).unwrap();
// 进入 async 主循环
rt.block_on(async {
serve(result).await;
});
}

场景 3:多运行时(高级,通常不推荐)

fn main() {
// IO 运行时
let io_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_io()
.build()
.unwrap();
// 计算运行时
let compute_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(8)
.build()
.unwrap();
// 在 IO 运行时中运行主循环
io_rt.block_on(async {
// IO 任务在本运行时
let data = fetch_data().await;
// 计算任务在计算运行时
let result = compute_rt.spawn(async move {
heavy_computation(data).await
}).await.unwrap();
write_result(result).await;
});
}

警告:多运行时容易导致死锁——如果运行时 A 的任务等待运行时 B 的任务完成,而 B 又等待 A。除非你有明确的隔离理由,否则用单运行时。

优雅关闭#

use tokio::signal;
use tokio::sync::watch;
async fn serve_with_graceful_shutdown() {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// 监听 SIGTERM / Ctrl+C
tokio::spawn(async move {
signal::ctrl_c().await.ok();
tracing::info!("shutdown signal received");
let _ = shutdown_tx.send(true);
});
// 启动服务器,传入 shutdown 信号
let server = hyper::Server::bind(&addr)
.serve(make_service)
.with_graceful_shutdown(async {
shutdown_rx.changed().await.ok();
});
if let Err(e) = server.await {
tracing::error!("server error: {}", e);
}
// 服务器停止后,等待 spawned 任务完成
tracing::info!("server stopped, waiting for tasks to finish...");
tokio::time::sleep(Duration::from_secs(5)).await;
}

运行时句柄(Handle)的传递#

use tokio::runtime::Handle;
fn sync_function_that_needs_async(handle: Handle) {
// 在同步代码中 spawn async 任务
handle.spawn(async {
async_work().await;
});
// 或阻塞等待 async 操作
let result = handle.block_on(async_work());
}

Handle 是对运行时的轻量引用(只是一个 Arc),可以 clone 和跨线程传递。常用场景:

  • 从同步库中 spawn async 任务
  • spawn_blocking 闭包中回到 async 世界
let handle = tokio::runtime::Handle::current();
tokio::task::spawn_blocking(move || {
let result = blocking_c_call();
// 回到 async 世界处理结果
handle.spawn(async move {
process_async(result).await;
});
}).await?;

实战经验总结#

  1. IO 密集型 4-8 个 worker 足够:不要配 num_cpus,那是 CPU 密集型的策略
  2. spawn_blocking 处理一切阻塞操作:文件 IO、DNS、C FFI——worker 线程是宝贵资源
  3. select! 的 cancel safety 是必修课write/read_exact 不 cancel-safe,read/recv
  4. join! vs try_join! vs select!:全等待 / 全成功才算 / 任一即可,三个不同语义
  5. budget 是隐式保护:但极高频场景下可能导致延迟抖动,用 spawn 分散
  6. 单运行时优于多运行时:除非有明确的隔离需求
  7. 优雅关闭是生产必需品:signal 监听 + grace period + drain

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
Rust 2026 经验谈 - Tokio 2026 实战
https://tinyzzh.github.io/posts/rust-2026/2026-06-15-rust_2026_015_tokio_2026/
作者
TinyZ Zzh
发布于
2026-06-15
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
TinyZ Zzh
专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
229
分类
38
标签
250
总字数
379,217
运行时长
0
最后活动
0 天前

文章目录