Rust 2026 经验谈 - 异步流与迭代

2069 字
10 分钟
Rust 2026 经验谈 - 异步流与迭代

异步编程中,我们不仅需要”等一个值”(Future),还需要”持续接收一组值”——这就是 Stream。Stream 是异步世界的 Iterator,但它的设计、使用和背压控制比 Iterator 复杂得多。本文从 Stream trait 本身出发,覆盖 async generator、async-stream crate、背压模式,以及 Stream 与 Iterator 的本质差异。

Stream trait:异步迭代的基础#

定义与核心方法#

Stream 定义在 futures-core 中(而非标准库),核心方法只有一个:

// futures-core 中的 Stream trait
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}

Iterator::next() 返回 Option<Item> 不同,Stream::poll_next 返回 Poll<Option<Item>>

  • Poll::Pending:值尚未就绪,需要等待唤醒
  • Poll::Ready(Some(item)):产出一个值
  • Poll::Ready(None):流结束

futures-util 中的实用 Stream#

futures-util 提供了丰富的 Stream 适配器,用法类似 Iterator:

use futures::stream::{self, StreamExt};
use tokio;
#[tokio::main]
async fn main() {
// 从迭代器创建 Stream
let mut s = stream::iter(vec![1, 2, 3, 4, 5])
.map(|x| x * 2)
.filter(|x| futures::future::ready(x % 4 == 0));
while let Some(item) = s.next().await {
println!("{item}");
}
}

常用适配器一览:

适配器作用背压行为
.map()转换每个元素透传
.filter()过滤元素透传
.buffered(n)并发执行 n 个 Future有限并发
.throttle(duration)限速输出延迟丢弃
.chunks(n)批量聚合透传
.take(n)取前 n 个提前终止

踩坑提醒:StreamExt::collect() 会把整个 Stream 收集到 Vec,如果 Stream 是无限的,这会永远等待。务必确认 Stream 有限再 collect。

async fn 返回 Stream 的模式#

问题:async fn 不能直接返回 Stream#

Rust 目前不支持 async fn 返回 impl Stream,因为 async fn 只能返回 impl Future<Output = T>。要返回 Stream,需要手动实现或借助 crate。

模式一:手动实现 Stream#

use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
struct CounterStream {
count: u32,
limit: u32,
}
impl Stream for CounterStream {
type Item = u32;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.count < self.limit {
let val = self.count;
self.count += 1;
Poll::Ready(Some(val))
} else {
Poll::Ready(None)
}
}
}

手动实现的代价:你需要正确处理 PinContextPoll 三件套。对于复杂逻辑(涉及 I/O、定时器),手写 poll_next 既繁琐又容易出错。

模式二:async-stream crate(推荐)#

async-stream 允许你用 async 语法写 Stream 生成逻辑:

use async_stream::stream;
use futures::stream::StreamExt;
use tokio::time::{sleep, Duration};
fn countdown(from: u32) -> impl futures::Stream<Item = u32> {
stream! {
for i in (0..from).rev() {
sleep(Duration::from_millis(100)).await;
yield i;
}
}
}
#[tokio::main]
async fn main() {
let mut s = countdown(5);
while let Some(val) = s.next().await {
println!("倒计时: {val}");
}
}

async-streamstream! 宏通过内部生成一个 async fn + 手动状态机来实现,性能与手写相当。它的 try_stream! 变体支持 ? 错误传播:

use async_stream::try_stream;
use futures::stream::StreamExt;
fn lines_from_file(path: &str) -> impl futures::Stream<Item = Result<String, std::io::Error>> {
let path = path.to_owned();
try_stream! {
let content = tokio::fs::read_to_string(&path).await?;
for line in content.lines() {
yield line.to_owned();
}
}
}

模式三:channel 转 Stream#

当数据源是外部推送(如 WebSocket、消息队列)时,channel 转 Stream 是最实用的模式:

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use futures::stream::StreamExt;
async fn websocket_messages() -> impl futures::Stream<Item = String> {
let (tx, rx) = mpsc::channel(128);
// 模拟外部推送
tokio::spawn(async move {
for i in 0..10 {
if tx.send(format!("msg-{i}")).await.is_err() {
break;
}
}
});
// ReceiverStream 包装 mpsc::Receiver 以实现 Stream
ReceiverStream::new(rx)
}

Tokio 的 mpsc::Receiver 需要通过 tokio-stream crate 的 ReceiverStream 包装后才能实现 Stream。这种模式天然支持背压:channel 满时,发送方 send().await 会等待。

async generator:gen blocks(Nightly 状态)#

Rust 社区一直在推进 gen blocks 语法,让生成器的写法更接近 Python 的 yield

// nightly-only,尚未稳定,仅作演示
#![feature(gen_blocks)]
fn counter() -> impl Iterator<Item = u32> {
gen {
let mut i = 0;
loop {
yield i;
i += 1;
}
}
}

异步版本(async gen blocks 同样仅限 nightly):

// nightly-only,极度实验性,仅作演示
#![feature(gen_blocks)]
fn async_counter() -> impl futures::Stream<Item = u32> {
async gen {
let mut i = 0;
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
yield i;
i += 1;
}
}
}

截至 Rust 1.96,gen blocks 和 async gen blocks 仍在 nightly 且 API 可能变更。生产环境请使用 async-stream crate,不要依赖 nightly 的 gen blocks。

gen blocks 的核心价值:让编译器自动生成状态机,消除手写 poll_next 的样板代码。一旦稳定,它将取代 async-stream crate 成为首选方案。

背压控制模式#

背压(backpressure)是 Stream 体系的核心问题:如果消费者处理速度慢于生产者,数据会无限堆积。Rust 的异步生态提供了多种背压控制手段。

模式一:有限并发(buffered / buffer_unordered)#

use futures::stream::{self, StreamExt};
async fn fetch_url(url: &str) -> String {
// 模拟 HTTP 请求
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
format!("response from {url}")
}
async fn bounded_concurrency() {
let urls = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
// 最多 3 个并发请求
let results: Vec<String> = stream::iter(urls)
.map(|url| fetch_url(url))
.buffered(3) // 关键:限制并发度
.collect()
.await;
println!("获取到 {} 个结果", results.len());
}

buffered(n) 保证同时最多 n 个 Future 在执行。更精细的选择:

  • buffered(n):保持顺序
  • buffer_unordered(n):不保持顺序,稍快

模式二:Channel 背压#

use tokio::sync::mpsc;
async fn producer(tx: mpsc::Sender<u32>) {
for i in 0..1000 {
// channel 满时自动等待 —— 天然背压
if tx.send(i).await.is_err() {
break; // 消费者已关闭
}
}
}
async fn consumer(mut rx: mpsc::Receiver<u32>) {
while let Some(val) = rx.recv().await {
// 慢消费:模拟处理耗时
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
println!("处理: {val}");
}
}
#[tokio::main]
async fn main() {
// channel 容量 16:生产者最多领先消费者 16 条消息
let (tx, rx) = mpsc::channel(16);
let p = tokio::spawn(producer(tx));
let c = tokio::spawn(consumer(rx));
let _ = p.await;
let _ = c.await;
}

Channel 容量选择经验:

  • 容量 1:最严格背压,几乎同步
  • 容量 16~128:通用推荐,平衡吞吐与延迟
  • 容量 1024+:高吞吐场景,但需注意内存开销

模式三:组合——Stream + Semaphore#

更灵活的背压控制,适合不同优先级的任务:

use tokio::sync::Semaphore;
use futures::stream::{self, StreamExt};
async fn rate_limited_processing() {
let semaphore = std::sync::Arc::new(Semaphore::new(5));
let items: Vec<u32> = (0..20).collect();
let handles: Vec<_> = stream::iter(items)
.map(|item| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
// 最多 5 个并发
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
item * 2
}
})
.buffer_unordered(20) // buffer 允许 20 个 task 在跑
.collect()
.await;
println!("结果: {handles:?}");
}

这里 Semaphore 控制真正的并发度,buffer_unordered 控制缓冲区大小。两层分离,各司其职。

Stream 与 Iterator 的对比#

本质差异#

维度IteratorStream
同步/异步同步异步
阻塞行为next() 立即返回next().await 可能等待
背压天然存在(拉取模型)需要显式控制
组合子丰富且稳定丰富但在 futures-util
标准库std::iter不在标准库
零开销有调度开销
可组合性同步组合需要 async 组合

为什么 Stream 不在标准库?#

这是一个历史和政治问题。Stream 曾计划进入标准库(std::stream),但因为:

  1. 异步运行时碎片化(tokio vs async-std vs smol)
  2. Stream 的 GAT 关联类型曾依赖 unstable 特性
  3. 社区对 Stream 应该支持 async fn next() 还是 poll_next 有分歧

目前 Streamfutures-core 中定义,所有运行时都依赖它。2026 年仍有提议将 Stream 稳定化进入标准库,但短期内不太可能落地。

从 Iterator 到 Stream 的转换#

use futures::stream::{self, StreamExt};
// Iterator -> Stream(零开销)
let s1 = stream::iter(0..10);
// Stream -> Iterator(需要阻塞)
// 不能直接转换!Stream 的 next() 是异步的
// 必须在运行时中 block_on:
use tokio::runtime::Runtime;
let rt = Runtime::new().unwrap();
let iter = rt.block_on(async {
let mut s = stream::iter(0..10);
let mut vec = vec![];
while let Some(v) = s.next().await {
vec.push(v);
}
vec
});

关键经验:不要试图在异步代码中混用 Iterator 和 Stream。选定一个模型后贯穿始终。如果你的数据源是同步的(如 VecHashMap),用 Iterator;如果涉及 I/O、网络、定时器,用 Stream。

实战经验总结#

1. 优先用 async-stream,别手写 poll_next#

手写 poll_next 的正确率在团队中极低。async-streamstream! 宏在 99% 的场景下够用,性能损失可忽略。只有在极度性能敏感的路径(如每秒百万级事件流)才考虑手写。

2. 背压不是可选项#

在微服务架构中,一个没有背压控制的 Stream 就是内存泄漏的定时炸弹。永远为生产者-消费者链路设置背压——channel 容量、buffered(n)、Semaphore,至少选一个。

3. collect() 前确认有限性#

// 危险:无限 Stream + collect = 永远等待
let all: Vec<_> = infinite_stream().collect().await;
// 安全:用 take 限制
let first_100: Vec<_> = infinite_stream().take(100).collect().await;

4. Stream 的生命周期比 Future 更复杂#

Future 的生命周期通常较短(等一个值),Stream 的生命周期可能很长(持续运行)。注意 Stream 中的借用不能跨 .await 点——这是异步借用检查器(borrow checker)的限制,与 Future 一致。

5. pin_project 处理自引用 Stream#

如果你手写 Stream 且内部有自引用结构,使用 pin_project_litepin-project crate 来安全地实现 Unpin

use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::Stream;
pin_project! {
struct MyStream {
data: Vec<u32>,
#[pin]
inner: SomeOtherStream,
}
}

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
Rust 2026 经验谈 - 异步流与迭代
https://tinyzzh.github.io/posts/rust-2026/2026-06-16-rust_2026_016_async_stream/
作者
TinyZ Zzh
发布于
2026-06-16
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
TinyZ Zzh
专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
232
分类
38
标签
255
总字数
384,983
运行时长
0
最后活动
0 天前

文章目录