Rust语言从入门到精通系列 - GRPC框架入门指北

gRPC 是 Google 开源的高性能、通用的 RPC 框架,它采用了基于 HTTP/2 协议的二进制传输协议,支持多种语言,包括 Rust。Rust 语言 GRPC 模块是一个用于 Rust 语言的 gRPC 客户端和服务器实现,它提供了一个简单易用的 API,可以方便地创建和使用 gRPC 服务。
重要提示: 以下示例使用的
grpccrate(grpc-rs)已废弃且不再维护。现代 Rust gRPC 开发推荐使用 tonic,它是基于 Hyper 1.x 和 prost 的高性能 gRPC 框架。下方保留旧 API 示例仅供参考,并附带 tonic 替代代码。
基础用法
创建 gRPC 服务器
在 Rust 语言 GRPC 模块中,可以使用ServerBuilder结构体来创建 gRPC 服务器。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{Server, ServerBuilder};
fn main() { let mut server = ServerBuilder::new_plain(); server.http.set_port(50051); server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {})); let server = server.build().unwrap(); server.start(); server.wait();}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl { fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); grpc::SingleResponse::completed(r) }}tonic 替代代码:
use tonic::{transport::Server, Request, Response, Status};
pub mod greeter { tonic::include_proto!("greeter");}
use greeter::greeter_server::{Greeter, GreeterServer};use greeter::{HelloReply, HelloRequest};
#[derive(Default)]pub struct GreeterImpl {}
#[tonic::async_trait]impl Greeter for GreeterImpl { async fn say_hello( &self, request: Request<HelloRequest>, ) -> Result<Response<HelloReply>, Status> { let reply = HelloReply { message: format!("Hello, {}!", request.into_inner().name), }; Ok(Response::new(reply)) }}
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { let addr = "[::1]:50051".parse()?; let greeter = GreeterImpl::default(); Server::builder() .add_service(GreeterServer::new(greeter)) .serve(addr) .await?; Ok(())}这个示例中,我们创建了一个ServerBuilder对象,并通过http字段设置了服务器的端口号。然后我们使用add_service方法将我们实现的Greeter服务添加到服务器中。最后,我们通过build方法构建了服务器,并通过start方法启动了服务器。服务器启动后,我们通过wait方法等待客户端连接。
创建 gRPC 客户端
在 Rust 语言 GRPC 模块中,可以使用Client结构体来创建 gRPC 客户端。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{ChannelBuilder, Client};
fn main() { let ch = ChannelBuilder::new_plain(); let client = Client::new(ch); let mut req = proto::HelloRequest::new(); req.set_name("world".to_string()); let resp = client.say_hello(grpc::RequestOptions::new(), req); println!("{}", resp.wait().unwrap().get_message());}tonic 替代代码:
use tonic::transport::Channel;
pub mod greeter { tonic::include_proto!("greeter");}
use greeter::greeter_client::GreeterClient;use greeter::HelloRequest;
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { let channel = Channel::from_static("http://[::1]:50051").connect().await?; let mut client = GreeterClient::new(channel); let request = tonic::Request::new(HelloRequest { name: "world".into(), }); let response = client.say_hello(request).await?; println!("{}", response.into_inner().message); Ok(())}这个示例中,我们创建了一个ChannelBuilder对象,并使用Client结构体创建了一个 gRPC 客户端。然后我们创建了一个HelloRequest对象,并设置了它的name字段。最后,我们使用say_hello方法向服务器发送请求,并通过wait方法等待响应。响应对象是一个SingleResponse对象,我们通过unwrap方法获取了它的值,并打印了它的message字段。
使用流式 RPC
在 Rust 语言 GRPC 模块中,可以使用流式 RPC 来传输流数据。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, WriteFlags};
fn main() { let mut server = ServerBuilder::new_plain(); server.http.set_port(50051); server.add_service(proto::streaming::create_greeter_server(GreeterImpl {})); let server = server.build().unwrap(); server.start();
let ch = ChannelBuilder::new_plain(); let client = Client::new(ch);
let reqs = vec![ proto::HelloRequest::new(), proto::HelloRequest::new(), proto::HelloRequest::new(), ];
let (mut tx, rx) = client.say_hello_stream(grpc::RequestOptions::new()).unwrap(); for req in reqs { tx = tx.send((req, WriteFlags::default())).unwrap(); } tx.close().unwrap();
for resp in rx.wait() { println!("{}", resp.unwrap().get_message()); }}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl { fn say_hello_stream(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> { let (tx, rx) = grpc::channel::mpsc::channel(0); std::thread::spawn(move || { for req in _stream.into_iter() { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); tx.send((r, WriteFlags::default())).unwrap(); } tx.close().unwrap(); }); grpc::StreamingResponse::new(rx) }}提示: 流式 RPC 在 tonic 中使用
tonic::Streaming和tokio::sync::mpsc实现更为简洁,参考 tonic 流式示例。
这个示例中,我们创建了一个Greeter服务,并实现了一个say_hello_stream方法,该方法接收一个StreamingRequest对象,并返回一个StreamingResponse对象。在该方法中,我们使用mpsc::channel方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new方法将通道包装成一个StreamingResponse对象,并将其返回给客户端。
在客户端中,我们创建了一个say_hello_stream方法,并使用send方法向服务器发送请求。然后我们通过wait方法等待响应,并打印了响应的message字段。
使用双向流式 RPC
在 Rust 语言 GRPC 模块中,可以使用双向流式 RPC 来传输双向流数据。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
fn main() { let mut server = ServerBuilder::new_plain(); server.http.set_port(50051); server.add_service(proto::streaming::create_greeter_server(GreeterImpl {})); let server = server.build().unwrap(); server.start();
let ch = ChannelBuilder::new_plain(); let client = Client::new(ch);
let (mut tx, rx) = client.say_hello_bidi(grpc::RequestOptions::new()).unwrap(); let reqs = vec![ proto::HelloRequest::new(), proto::HelloRequest::new(), proto::HelloRequest::new(), ]; std::thread::spawn(move || { for req in reqs { tx = tx.send((req, WriteFlags::default())).unwrap(); let resp = rx.into_future().wait().unwrap().0; println!("{}", resp.unwrap().get_message()); } tx.close().unwrap(); });}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl { fn say_hello_bidi(&self, _m: grpc::RequestOptions, stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> { let (tx, rx) = grpc::channel::mpsc::channel(0); std::thread::spawn(move || { for req in stream.into_iter() { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); tx.send((r, WriteFlags::default())).unwrap(); } tx.close().unwrap(); }); grpc::StreamingResponse::new(rx) }}提示: 双向流式 RPC 在 tonic 中通过
Request<Streaming<T>>和Response<Streaming<T>>实现,参考 tonic 双向流示例。
这个示例中,我们创建了一个Greeter服务,并实现了一个say_hello_bidi方法,该方法接收一个StreamingRequest对象,并返回一个StreamingResponse对象。在该方法中,我们使用mpsc::channel方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new方法将通道包装成一个StreamingResponse对象,并将其返回给客户端。
在客户端中,我们使用say_hello_bidi方法向服务器发送请求,并通过into_future方法获取响应。然后我们通过println方法打印了响应的message字段。
进阶用法
使用 tokio
在 Rust 语言 GRPC 模块中,可以使用 tokio 来实现异步 RPC。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
#[tokio::main]async fn main() { let mut server = ServerBuilder::new_plain(); server.http.set_port(50051); server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {})); let server = server.build().unwrap(); server.start();
let ch = ChannelBuilder::new_plain(); let client = Client::new(ch);
let mut req = proto::HelloRequest::new(); req.set_name("world".to_string()); let resp = client.say_hello_async(grpc::RequestOptions::new(), req).await.unwrap(); println!("{}", resp.get_message());}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl { fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); grpc::SingleResponse::completed(r) }}提示: tonic 本身就是基于 tokio 的异步框架,无需额外配置即可使用
#[tokio::main]。
这个示例中,我们使用tokio::main宏来创建异步运行时。在服务器和客户端中,我们使用async关键字来定义异步函数。在客户端中,我们使用await关键字来等待异步响应。
tokio 使用流式 RPC
下面是一个使用 tokio 和流式 RPC 的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};use tokio::sync::mpsc;
#[tokio::main]async fn main() { let mut server = ServerBuilder::new_plain(); server.http.set_port(50051); server.add_service(proto::streaming::create_greeter_server(GreeterImpl {})); let server = server.build().unwrap(); server.start();
let ch = ChannelBuilder::new_plain(); let client = Client::new(ch);
let (mut tx, rx) = mpsc::channel(10); let mut stream = client.say_hello_streaming(grpc::RequestOptions::new()).unwrap(); tokio::spawn(async move { while let Some(req) = rx.recv().await { stream.send((req, WriteFlags::default())).unwrap(); } stream.close().unwrap(); });
let reqs = vec![ proto::HelloRequest::new(), proto::HelloRequest::new(), proto::HelloRequest::new(), ]; for req in reqs { tx.send(req).await.unwrap(); }
for resp in stream.into_stream().await { println!("{}", resp.unwrap().get_message()); }}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl { fn say_hello_streaming(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> { let (tx, rx) = grpc::channel::mpsc::channel(0); tokio::spawn(async move { for req in _stream.into_async_iter().await { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); tx.send((r, WriteFlags::default())).unwrap(); } tx.close().unwrap(); }); grpc::StreamingResponse::new(rx) }}提示: tonic 的流式 RPC 原生支持 tokio 异步,无需手动管理
WriteFlags。
这个示例中,我们使用tokio::sync::mpsc库来创建一个通道,用于传输流数据。在客户端中,我们使用say_hello_streaming方法向服务器发送请求,并将请求通过通道发送给异步任务。在异步任务中,我们使用into_async_iter方法将请求流转换成异步迭代器,并将响应通过通道发送给客户端。在客户端中,我们使用into_stream方法将响应流转换成异步流,并等待响应。
使用 TLS 加密
在 Rust 语言 GRPC 模块中,可以使用 TLS 加密来保护通信安全。下面是一个简单的示例:
// ⚠️ 以下使用已废弃的 grpc crate (grpc-rs),仅作参考// ⚠️ rustls::internal::pemfile 和 rustls::NoClientAuth 在新版 rustls 中已移除use grpc::{ChannelBuilder, Client};use rustls::{Certificate, PrivateKey, ServerConfig};use std::fs::File;use std::io::BufReader;
fn main() { let mut config = ServerConfig::new(rustls::NoClientAuth::new()); let cert_file = &mut BufReader::new(File::open("server.crt").unwrap()); let key_file = &mut BufReader::new(File::open("server.key").unwrap()); let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap(); let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap(); config.set_single_cert(cert_chain, keys.remove(0)).unwrap(); let mut server = grpc_tls::ServerBuilder::new_plain(); server.http.set_port(50051); server.http.set_tls(config); server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {})); let server = server.build().unwrap(); server.start();
let mut config = rustls::ClientConfig::new(); let cert_file = &mut BufReader::new(File::open("client.crt").unwrap()); let key_file = &mut BufReader::new(File::open("client.key").unwrap()); let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap(); let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap(); config.set_single_client_cert(cert_chain, keys.remove(0)); let ch = ChannelBuilder::new_tls().rustls_config(config); let client = Client::new(ch); let mut req = proto::HelloRequest::new(); req.set_name("world".to_string()); let resp = client.say_hello(grpc::RequestOptions::new(), req); println!("{}", resp.wait().unwrap().get_message());}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl { fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> { let mut r = proto::HelloReply::new(); r.set_message(format!("Hello, {}!", req.get_name())); grpc::SingleResponse::completed(r) }}提示: tonic 原生支持 TLS,使用
tonic::transport::Server::builder().tls_config()即可配置,无需grpc_tls或手动操作rustls::internal::pemfile。参考 tonic TLS 示例。
这个示例中,我们使用rustls库来创建 TLS 配置,并使用grpc_tls::ServerBuilder和ChannelBuilder::new_tls方法来创建带有 TLS 加密的服务器和客户端。在服务器中,我们使用set_single_cert方法来设置服务器证书和私钥。在客户端中,我们使用set_single_client_cert方法来设置客户端证书和私钥。
总结
本教程介绍了 GRPC 的基础使用方法,并针对 tokio 结合 GRPC 的进阶使用进入入门级的探讨。希望能帮助同学们掌握 Rust 语言 GRPC 的应用。
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!