Rust语言从入门到精通系列 - GRPC框架入门指北
gRPC 是 Google 开源的高性能、通用的 RPC 框架,它采用了基于 HTTP/2 协议的二进制传输协议,支持多种语言,包括 Rust。Rust 语言 GRPC 模块是一个用于 Rust 语言的 gRPC 客户端和服务器实现,它提供了一个简单易用的 API,可以方便地创建和使用 gRPC 服务。
基础用法
创建 gRPC 服务器
在 Rust 语言 GRPC 模块中,可以使用ServerBuilder
结构体来创建 gRPC 服务器。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use grpc::{Server, ServerBuilder};
fn main() {
let mut server = ServerBuilder::new_plain();
server.http.set_port(50051);
server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
server.wait();
}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl {
fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
grpc::SingleResponse::completed(r)
}
}
这个示例中,我们创建了一个ServerBuilder
对象,并通过http
字段设置了服务器的端口号。然后我们使用add_service
方法将我们实现的Greeter
服务添加到服务器中。最后,我们通过build
方法构建了服务器,并通过start
方法启动了服务器。服务器启动后,我们通过wait
方法等待客户端连接。
创建 gRPC 客户端
在 Rust 语言 GRPC 模块中,可以使用Client
结构体来创建 gRPC 客户端。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
use grpc::{ChannelBuilder, Client};
fn main() {
let ch = ChannelBuilder::new_plain();
let client = Client::new(ch);
let mut req = proto::HelloRequest::new();
req.set_name("world".to_string());
let resp = client.say_hello(grpc::RequestOptions::new(), req);
println!("{}", resp.wait().unwrap().get_message());
}
这个示例中,我们创建了一个ChannelBuilder
对象,并使用Client
结构体创建了一个 gRPC 客户端。然后我们创建了一个HelloRequest
对象,并设置了它的name
字段。最后,我们使用say_hello
方法向服务器发送请求,并通过wait
方法等待响应。响应对象是一个SingleResponse
对象,我们通过unwrap
方法获取了它的值,并打印了它的message
字段。
使用流式 RPC
在 Rust 语言 GRPC 模块中,可以使用流式 RPC 来传输流数据。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, WriteFlags};
fn main() {
let mut server = ServerBuilder::new_plain();
server.http.set_port(50051);
server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
let ch = ChannelBuilder::new_plain();
let client = Client::new(ch);
let reqs = vec![
proto::HelloRequest::new(),
proto::HelloRequest::new(),
proto::HelloRequest::new(),
];
let (mut tx, rx) = client.say_hello_stream(grpc::RequestOptions::new()).unwrap();
for req in reqs {
tx = tx.send((req, WriteFlags::default())).unwrap();
}
tx.close().unwrap();
for resp in rx.wait() {
println!("{}", resp.unwrap().get_message());
}
}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl {
fn say_hello_stream(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
let (tx, rx) = grpc::channel::mpsc::channel(0);
std::thread::spawn(move || {
for req in _stream.into_iter() {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
tx.send((r, WriteFlags::default())).unwrap();
}
tx.close().unwrap();
});
grpc::StreamingResponse::new(rx)
}
}
这个示例中,我们创建了一个Greeter
服务,并实现了一个say_hello_stream
方法,该方法接收一个StreamingRequest
对象,并返回一个StreamingResponse
对象。在该方法中,我们使用mpsc::channel
方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn
方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new
方法将通道包装成一个StreamingResponse
对象,并将其返回给客户端。
在客户端中,我们创建了一个say_hello_stream
方法,并使用send
方法向服务器发送请求。然后我们通过wait
方法等待响应,并打印了响应的message
字段。
使用双向流式 RPC
在 Rust 语言 GRPC 模块中,可以使用双向流式 RPC 来传输双向流数据。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
fn main() {
let mut server = ServerBuilder::new_plain();
server.http.set_port(50051);
server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
let ch = ChannelBuilder::new_plain();
let client = Client::new(ch);
let (mut tx, rx) = client.say_hello_bidi(grpc::RequestOptions::new()).unwrap();
let reqs = vec![
proto::HelloRequest::new(),
proto::HelloRequest::new(),
proto::HelloRequest::new(),
];
std::thread::spawn(move || {
for req in reqs {
tx = tx.send((req, WriteFlags::default())).unwrap();
let resp = rx.into_future().wait().unwrap().0;
println!("{}", resp.unwrap().get_message());
}
tx.close().unwrap();
});
}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl {
fn say_hello_bidi(&self, _m: grpc::RequestOptions, stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
let (tx, rx) = grpc::channel::mpsc::channel(0);
std::thread::spawn(move || {
for req in stream.into_iter() {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
tx.send((r, WriteFlags::default())).unwrap();
}
tx.close().unwrap();
});
grpc::StreamingResponse::new(rx)
}
}
这个示例中,我们创建了一个Greeter
服务,并实现了一个say_hello_bidi
方法,该方法接收一个StreamingRequest
对象,并返回一个StreamingResponse
对象。在该方法中,我们使用mpsc::channel
方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn
方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new
方法将通道包装成一个StreamingResponse
对象,并将其返回给客户端。
在客户端中,我们使用say_hello_bidi
方法向服务器发送请求,并通过into_future
方法获取响应。然后我们通过println
方法打印了响应的message
字段。
进阶用法
使用 tokio
在 Rust 语言 GRPC 模块中,可以使用 tokio 来实现异步 RPC。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
#[tokio::main]
async fn main() {
let mut server = ServerBuilder::new_plain();
server.http.set_port(50051);
server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
let ch = ChannelBuilder::new_plain();
let client = Client::new(ch);
let mut req = proto::HelloRequest::new();
req.set_name("world".to_string());
let resp = client.say_hello_async(grpc::RequestOptions::new(), req).await.unwrap();
println!("{}", resp.get_message());
}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl {
fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
grpc::SingleResponse::completed(r)
}
}
这个示例中,我们使用tokio::main
宏来创建异步运行时。在服务器和客户端中,我们使用async
关键字来定义异步函数。在客户端中,我们使用await
关键字来等待异步响应。
tokio 使用流式 RPC
下面是一个使用 tokio 和流式 RPC 的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let mut server = ServerBuilder::new_plain();
server.http.set_port(50051);
server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
let ch = ChannelBuilder::new_plain();
let client = Client::new(ch);
let (mut tx, rx) = mpsc::channel(10);
let mut stream = client.say_hello_streaming(grpc::RequestOptions::new()).unwrap();
tokio::spawn(async move {
while let Some(req) = rx.recv().await {
stream.send((req, WriteFlags::default())).unwrap();
}
stream.close().unwrap();
});
let reqs = vec![
proto::HelloRequest::new(),
proto::HelloRequest::new(),
proto::HelloRequest::new(),
];
for req in reqs {
tx.send(req).await.unwrap();
}
for resp in stream.into_stream().await {
println!("{}", resp.unwrap().get_message());
}
}
struct GreeterImpl {}
impl proto::streaming::Greeter for GreeterImpl {
fn say_hello_streaming(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
let (tx, rx) = grpc::channel::mpsc::channel(0);
tokio::spawn(async move {
for req in _stream.into_async_iter().await {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
tx.send((r, WriteFlags::default())).unwrap();
}
tx.close().unwrap();
});
grpc::StreamingResponse::new(rx)
}
}
这个示例中,我们使用tokio::sync::mpsc
库来创建一个通道,用于传输流数据。在客户端中,我们使用say_hello_streaming
方法向服务器发送请求,并将请求通过通道发送给异步任务。在异步任务中,我们使用into_async_iter
方法将请求流转换成异步迭代器,并将响应通过通道发送给客户端。在客户端中,我们使用into_stream
方法将响应流转换成异步流,并等待响应。
使用 TLS 加密
在 Rust 语言 GRPC 模块中,可以使用 TLS 加密来保护通信安全。下面是一个简单的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use grpc::{ChannelBuilder, Client};
use rustls::{Certificate, PrivateKey, ServerConfig};
use std::fs::File;
use std::io::BufReader;
fn main() {
let mut config = ServerConfig::new(rustls::NoClientAuth::new());
let cert_file = &mut BufReader::new(File::open("server.crt").unwrap());
let key_file = &mut BufReader::new(File::open("server.key").unwrap());
let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap();
let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap();
config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
let mut server = grpc_tls::ServerBuilder::new_plain();
server.http.set_port(50051);
server.http.set_tls(config);
server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
let server = server.build().unwrap();
server.start();
let mut config = rustls::ClientConfig::new();
let cert_file = &mut BufReader::new(File::open("client.crt").unwrap());
let key_file = &mut BufReader::new(File::open("client.key").unwrap());
let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap();
let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap();
config.set_single_client_cert(cert_chain, keys.remove(0));
let ch = ChannelBuilder::new_tls().rustls_config(config);
let client = Client::new(ch);
let mut req = proto::HelloRequest::new();
req.set_name("world".to_string());
let resp = client.say_hello(grpc::RequestOptions::new(), req);
println!("{}", resp.wait().unwrap().get_message());
}
struct GreeterImpl {}
impl proto::greeter_server::Greeter for GreeterImpl {
fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
let mut r = proto::HelloReply::new();
r.set_message(format!("Hello, {}!", req.get_name()));
grpc::SingleResponse::completed(r)
}
}
这个示例中,我们使用rustls
库来创建 TLS 配置,并使用grpc_tls::ServerBuilder
和ChannelBuilder::new_tls
方法来创建带有 TLS 加密的服务器和客户端。在服务器中,我们使用set_single_cert
方法来设置服务器证书和私钥。在客户端中,我们使用set_single_client_cert
方法来设置客户端证书和私钥。
总结
本教程介绍了 GRPC 的基础使用方法,并针对 tokio 结合 GRPC 的进阶使用进入入门级的探讨。希望能帮助同学们掌握 Rust 语言 GRPC 的应用。
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。欢迎转载、使用、重新发布,但务必保留文章署名 TinyZ Zzh (包含链接: https://tinyzzh.github.io ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。 如有任何疑问,请 与我联系 (tinyzzh815@gmail.com) 。
评论