Rust 2026 经验谈 - 异步流与迭代
异步编程中,我们不仅需要”等一个值”(Future),还需要”持续接收一组值”——这就是 Stream。Stream 是异步世界的 Iterator,但它的设计、使用和背压控制比 Iterator 复杂得多。本文从 Stream trait 本身出发,覆盖 async generator、async-stream crate、背压模式,以及 Stream 与 Iterator 的本质差异。
Stream trait:异步迭代的基础
定义与核心方法
Stream 定义在 futures-core 中(而非标准库),核心方法只有一个:
// futures-core 中的 Stream traitpub trait Stream { type Item;
fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>;}与 Iterator::next() 返回 Option<Item> 不同,Stream::poll_next 返回 Poll<Option<Item>>:
Poll::Pending:值尚未就绪,需要等待唤醒Poll::Ready(Some(item)):产出一个值Poll::Ready(None):流结束
futures-util 中的实用 Stream
futures-util 提供了丰富的 Stream 适配器,用法类似 Iterator:
use futures::stream::{self, StreamExt};use tokio;
#[tokio::main]async fn main() { // 从迭代器创建 Stream let mut s = stream::iter(vec![1, 2, 3, 4, 5]) .map(|x| x * 2) .filter(|x| futures::future::ready(x % 4 == 0));
while let Some(item) = s.next().await { println!("{item}"); }}常用适配器一览:
| 适配器 | 作用 | 背压行为 |
|---|---|---|
.map() | 转换每个元素 | 透传 |
.filter() | 过滤元素 | 透传 |
.buffered(n) | 并发执行 n 个 Future | 有限并发 |
.throttle(duration) | 限速输出 | 延迟丢弃 |
.chunks(n) | 批量聚合 | 透传 |
.take(n) | 取前 n 个 | 提前终止 |
踩坑提醒:StreamExt::collect() 会把整个 Stream 收集到 Vec,如果 Stream 是无限的,这会永远等待。务必确认 Stream 有限再 collect。
async fn 返回 Stream 的模式
问题:async fn 不能直接返回 Stream
Rust 目前不支持 async fn 返回 impl Stream,因为 async fn 只能返回 impl Future<Output = T>。要返回 Stream,需要手动实现或借助 crate。
模式一:手动实现 Stream
use futures::stream::Stream;use std::pin::Pin;use std::task::{Context, Poll};
struct CounterStream { count: u32, limit: u32,}
impl Stream for CounterStream { type Item = u32;
fn poll_next( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { if self.count < self.limit { let val = self.count; self.count += 1; Poll::Ready(Some(val)) } else { Poll::Ready(None) } }}手动实现的代价:你需要正确处理 Pin、Context、Poll 三件套。对于复杂逻辑(涉及 I/O、定时器),手写 poll_next 既繁琐又容易出错。
模式二:async-stream crate(推荐)
async-stream 允许你用 async 语法写 Stream 生成逻辑:
use async_stream::stream;use futures::stream::StreamExt;use tokio::time::{sleep, Duration};
fn countdown(from: u32) -> impl futures::Stream<Item = u32> { stream! { for i in (0..from).rev() { sleep(Duration::from_millis(100)).await; yield i; } }}
#[tokio::main]async fn main() { let mut s = countdown(5); while let Some(val) = s.next().await { println!("倒计时: {val}"); }}async-stream 的 stream! 宏通过内部生成一个 async fn + 手动状态机来实现,性能与手写相当。它的 try_stream! 变体支持 ? 错误传播:
use async_stream::try_stream;use futures::stream::StreamExt;
fn lines_from_file(path: &str) -> impl futures::Stream<Item = Result<String, std::io::Error>> { let path = path.to_owned(); try_stream! { let content = tokio::fs::read_to_string(&path).await?; for line in content.lines() { yield line.to_owned(); } }}模式三:channel 转 Stream
当数据源是外部推送(如 WebSocket、消息队列)时,channel 转 Stream 是最实用的模式:
use tokio::sync::mpsc;use tokio_stream::wrappers::ReceiverStream;use futures::stream::StreamExt;
async fn websocket_messages() -> impl futures::Stream<Item = String> { let (tx, rx) = mpsc::channel(128);
// 模拟外部推送 tokio::spawn(async move { for i in 0..10 { if tx.send(format!("msg-{i}")).await.is_err() { break; } } });
// ReceiverStream 包装 mpsc::Receiver 以实现 Stream ReceiverStream::new(rx)}Tokio 的 mpsc::Receiver 需要通过 tokio-stream crate 的 ReceiverStream 包装后才能实现 Stream。这种模式天然支持背压:channel 满时,发送方 send().await 会等待。
async generator:gen blocks(Nightly 状态)
Rust 社区一直在推进 gen blocks 语法,让生成器的写法更接近 Python 的 yield:
// nightly-only,尚未稳定,仅作演示#![feature(gen_blocks)]
fn counter() -> impl Iterator<Item = u32> { gen { let mut i = 0; loop { yield i; i += 1; } }}异步版本(async gen blocks 同样仅限 nightly):
// nightly-only,极度实验性,仅作演示#![feature(gen_blocks)]
fn async_counter() -> impl futures::Stream<Item = u32> { async gen { let mut i = 0; loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; yield i; i += 1; } }}截至 Rust 1.96,gen blocks 和 async gen blocks 仍在 nightly 且 API 可能变更。生产环境请使用 async-stream crate,不要依赖 nightly 的 gen blocks。
gen blocks 的核心价值:让编译器自动生成状态机,消除手写 poll_next 的样板代码。一旦稳定,它将取代 async-stream crate 成为首选方案。
背压控制模式
背压(backpressure)是 Stream 体系的核心问题:如果消费者处理速度慢于生产者,数据会无限堆积。Rust 的异步生态提供了多种背压控制手段。
模式一:有限并发(buffered / buffer_unordered)
use futures::stream::{self, StreamExt};
async fn fetch_url(url: &str) -> String { // 模拟 HTTP 请求 tokio::time::sleep(std::time::Duration::from_millis(100)).await; format!("response from {url}")}
async fn bounded_concurrency() { let urls = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
// 最多 3 个并发请求 let results: Vec<String> = stream::iter(urls) .map(|url| fetch_url(url)) .buffered(3) // 关键:限制并发度 .collect() .await;
println!("获取到 {} 个结果", results.len());}buffered(n) 保证同时最多 n 个 Future 在执行。更精细的选择:
buffered(n):保持顺序buffer_unordered(n):不保持顺序,稍快
模式二:Channel 背压
use tokio::sync::mpsc;
async fn producer(tx: mpsc::Sender<u32>) { for i in 0..1000 { // channel 满时自动等待 —— 天然背压 if tx.send(i).await.is_err() { break; // 消费者已关闭 } }}
async fn consumer(mut rx: mpsc::Receiver<u32>) { while let Some(val) = rx.recv().await { // 慢消费:模拟处理耗时 tokio::time::sleep(std::time::Duration::from_millis(50)).await; println!("处理: {val}"); }}
#[tokio::main]async fn main() { // channel 容量 16:生产者最多领先消费者 16 条消息 let (tx, rx) = mpsc::channel(16);
let p = tokio::spawn(producer(tx)); let c = tokio::spawn(consumer(rx));
let _ = p.await; let _ = c.await;}Channel 容量选择经验:
- 容量 1:最严格背压,几乎同步
- 容量 16~128:通用推荐,平衡吞吐与延迟
- 容量 1024+:高吞吐场景,但需注意内存开销
模式三:组合——Stream + Semaphore
更灵活的背压控制,适合不同优先级的任务:
use tokio::sync::Semaphore;use futures::stream::{self, StreamExt};
async fn rate_limited_processing() { let semaphore = std::sync::Arc::new(Semaphore::new(5)); let items: Vec<u32> = (0..20).collect();
let handles: Vec<_> = stream::iter(items) .map(|item| { let sem = semaphore.clone(); async move { let _permit = sem.acquire().await.unwrap(); // 最多 5 个并发 tokio::time::sleep(std::time::Duration::from_millis(100)).await; item * 2 } }) .buffer_unordered(20) // buffer 允许 20 个 task 在跑 .collect() .await;
println!("结果: {handles:?}");}这里 Semaphore 控制真正的并发度,buffer_unordered 控制缓冲区大小。两层分离,各司其职。
Stream 与 Iterator 的对比
本质差异
| 维度 | Iterator | Stream |
|---|---|---|
| 同步/异步 | 同步 | 异步 |
| 阻塞行为 | next() 立即返回 | next().await 可能等待 |
| 背压 | 天然存在(拉取模型) | 需要显式控制 |
| 组合子 | 丰富且稳定 | 丰富但在 futures-util |
| 标准库 | std::iter | 不在标准库 |
| 零开销 | 是 | 有调度开销 |
| 可组合性 | 同步组合 | 需要 async 组合 |
为什么 Stream 不在标准库?
这是一个历史和政治问题。Stream 曾计划进入标准库(std::stream),但因为:
- 异步运行时碎片化(tokio vs async-std vs smol)
Stream的 GAT 关联类型曾依赖 unstable 特性- 社区对 Stream 应该支持
async fn next()还是poll_next有分歧
目前 Stream 在 futures-core 中定义,所有运行时都依赖它。2026 年仍有提议将 Stream 稳定化进入标准库,但短期内不太可能落地。
从 Iterator 到 Stream 的转换
use futures::stream::{self, StreamExt};
// Iterator -> Stream(零开销)let s1 = stream::iter(0..10);
// Stream -> Iterator(需要阻塞)// 不能直接转换!Stream 的 next() 是异步的// 必须在运行时中 block_on:use tokio::runtime::Runtime;let rt = Runtime::new().unwrap();let iter = rt.block_on(async { let mut s = stream::iter(0..10); let mut vec = vec![]; while let Some(v) = s.next().await { vec.push(v); } vec});关键经验:不要试图在异步代码中混用 Iterator 和 Stream。选定一个模型后贯穿始终。如果你的数据源是同步的(如 Vec、HashMap),用 Iterator;如果涉及 I/O、网络、定时器,用 Stream。
实战经验总结
1. 优先用 async-stream,别手写 poll_next
手写 poll_next 的正确率在团队中极低。async-stream 的 stream! 宏在 99% 的场景下够用,性能损失可忽略。只有在极度性能敏感的路径(如每秒百万级事件流)才考虑手写。
2. 背压不是可选项
在微服务架构中,一个没有背压控制的 Stream 就是内存泄漏的定时炸弹。永远为生产者-消费者链路设置背压——channel 容量、buffered(n)、Semaphore,至少选一个。
3. collect() 前确认有限性
// 危险:无限 Stream + collect = 永远等待let all: Vec<_> = infinite_stream().collect().await;
// 安全:用 take 限制let first_100: Vec<_> = infinite_stream().take(100).collect().await;4. Stream 的生命周期比 Future 更复杂
Future 的生命周期通常较短(等一个值),Stream 的生命周期可能很长(持续运行)。注意 Stream 中的借用不能跨 .await 点——这是异步借用检查器(borrow checker)的限制,与 Future 一致。
5. pin_project 处理自引用 Stream
如果你手写 Stream 且内部有自引用结构,使用 pin_project_lite 或 pin-project crate 来安全地实现 Unpin:
use pin_project_lite::pin_project;use std::pin::Pin;use std::task::{Context, Poll};use futures::stream::Stream;
pin_project! { struct MyStream { data: Vec<u32>, #[pin] inner: SomeOtherStream, }}支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!
TinyZ's Blog