Rust语言从入门到精通系列 - GRPC框架入门指北

4 分钟阅读

gRPC 是 Google 开源的高性能、通用的 RPC 框架,它采用了基于 HTTP/2 协议的二进制传输协议,支持多种语言,包括 Rust。Rust 语言 GRPC 模块是一个用于 Rust 语言的 gRPC 客户端和服务器实现,它提供了一个简单易用的 API,可以方便地创建和使用 gRPC 服务。

基础用法

创建 gRPC 服务器

在 Rust 语言 GRPC 模块中,可以使用ServerBuilder结构体来创建 gRPC 服务器。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
use grpc::{Server, ServerBuilder};

fn main() {
    let mut server = ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();
    server.wait();
}

struct GreeterImpl {}

impl proto::greeter_server::Greeter for GreeterImpl {
    fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
        let mut r = proto::HelloReply::new();
        r.set_message(format!("Hello, {}!", req.get_name()));
        grpc::SingleResponse::completed(r)
    }
}

这个示例中,我们创建了一个ServerBuilder对象,并通过http字段设置了服务器的端口号。然后我们使用add_service方法将我们实现的Greeter服务添加到服务器中。最后,我们通过build方法构建了服务器,并通过start方法启动了服务器。服务器启动后,我们通过wait方法等待客户端连接。

创建 gRPC 客户端

在 Rust 语言 GRPC 模块中,可以使用Client结构体来创建 gRPC 客户端。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
use grpc::{ChannelBuilder, Client};

fn main() {
    let ch = ChannelBuilder::new_plain();
    let client = Client::new(ch);
    let mut req = proto::HelloRequest::new();
    req.set_name("world".to_string());
    let resp = client.say_hello(grpc::RequestOptions::new(), req);
    println!("{}", resp.wait().unwrap().get_message());
}

这个示例中,我们创建了一个ChannelBuilder对象,并使用Client结构体创建了一个 gRPC 客户端。然后我们创建了一个HelloRequest对象,并设置了它的name字段。最后,我们使用say_hello方法向服务器发送请求,并通过wait方法等待响应。响应对象是一个SingleResponse对象,我们通过unwrap方法获取了它的值,并打印了它的message字段。

使用流式 RPC

在 Rust 语言 GRPC 模块中,可以使用流式 RPC 来传输流数据。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, WriteFlags};

fn main() {
    let mut server = ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();

    let ch = ChannelBuilder::new_plain();
    let client = Client::new(ch);

    let reqs = vec![
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
    ];

    let (mut tx, rx) = client.say_hello_stream(grpc::RequestOptions::new()).unwrap();
    for req in reqs {
        tx = tx.send((req, WriteFlags::default())).unwrap();
    }
    tx.close().unwrap();

    for resp in rx.wait() {
        println!("{}", resp.unwrap().get_message());
    }
}

struct GreeterImpl {}

impl proto::streaming::Greeter for GreeterImpl {
    fn say_hello_stream(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
        let (tx, rx) = grpc::channel::mpsc::channel(0);
        std::thread::spawn(move || {
            for req in _stream.into_iter() {
                let mut r = proto::HelloReply::new();
                r.set_message(format!("Hello, {}!", req.get_name()));
                tx.send((r, WriteFlags::default())).unwrap();
            }
            tx.close().unwrap();
        });
        grpc::StreamingResponse::new(rx)
    }
}

这个示例中,我们创建了一个Greeter服务,并实现了一个say_hello_stream方法,该方法接收一个StreamingRequest对象,并返回一个StreamingResponse对象。在该方法中,我们使用mpsc::channel方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new方法将通道包装成一个StreamingResponse对象,并将其返回给客户端。

在客户端中,我们创建了一个say_hello_stream方法,并使用send方法向服务器发送请求。然后我们通过wait方法等待响应,并打印了响应的message字段。

使用双向流式 RPC

在 Rust 语言 GRPC 模块中,可以使用双向流式 RPC 来传输双向流数据。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};

fn main() {
    let mut server = ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();

    let ch = ChannelBuilder::new_plain();
    let client = Client::new(ch);

    let (mut tx, rx) = client.say_hello_bidi(grpc::RequestOptions::new()).unwrap();
    let reqs = vec![
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
    ];
    std::thread::spawn(move || {
        for req in reqs {
            tx = tx.send((req, WriteFlags::default())).unwrap();
            let resp = rx.into_future().wait().unwrap().0;
            println!("{}", resp.unwrap().get_message());
        }
        tx.close().unwrap();
    });
}

struct GreeterImpl {}

impl proto::streaming::Greeter for GreeterImpl {
    fn say_hello_bidi(&self, _m: grpc::RequestOptions, stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
        let (tx, rx) = grpc::channel::mpsc::channel(0);
        std::thread::spawn(move || {
            for req in stream.into_iter() {
                let mut r = proto::HelloReply::new();
                r.set_message(format!("Hello, {}!", req.get_name()));
                tx.send((r, WriteFlags::default())).unwrap();
            }
            tx.close().unwrap();
        });
        grpc::StreamingResponse::new(rx)
    }
}

这个示例中,我们创建了一个Greeter服务,并实现了一个say_hello_bidi方法,该方法接收一个StreamingRequest对象,并返回一个StreamingResponse对象。在该方法中,我们使用mpsc::channel方法创建了一个通道,用于传输流数据。然后我们使用std::thread::spawn方法创建了一个线程,该线程会将接收到的请求转换成响应,并通过通道发送给客户端。最后,我们使用StreamingResponse::new方法将通道包装成一个StreamingResponse对象,并将其返回给客户端。

在客户端中,我们使用say_hello_bidi方法向服务器发送请求,并通过into_future方法获取响应。然后我们通过println方法打印了响应的message字段。

进阶用法

使用 tokio

在 Rust 语言 GRPC 模块中,可以使用 tokio 来实现异步 RPC。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};

#[tokio::main]
async fn main() {
    let mut server = ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();

    let ch = ChannelBuilder::new_plain();
    let client = Client::new(ch);

    let mut req = proto::HelloRequest::new();
    req.set_name("world".to_string());
    let resp = client.say_hello_async(grpc::RequestOptions::new(), req).await.unwrap();
    println!("{}", resp.get_message());
}

struct GreeterImpl {}

impl proto::greeter_server::Greeter for GreeterImpl {
    fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
        let mut r = proto::HelloReply::new();
        r.set_message(format!("Hello, {}!", req.get_name()));
        grpc::SingleResponse::completed(r)
    }
}

这个示例中,我们使用tokio::main宏来创建异步运行时。在服务器和客户端中,我们使用async关键字来定义异步函数。在客户端中,我们使用await关键字来等待异步响应。

tokio 使用流式 RPC

下面是一个使用 tokio 和流式 RPC 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use grpc::{Client, ClientStreamingSink, Server, ServerBuilder, ServerStreamingSink, StreamingSink, WriteFlags};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let mut server = ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.add_service(proto::streaming::create_greeter_server(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();

    let ch = ChannelBuilder::new_plain();
    let client = Client::new(ch);

    let (mut tx, rx) = mpsc::channel(10);
    let mut stream = client.say_hello_streaming(grpc::RequestOptions::new()).unwrap();
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            stream.send((req, WriteFlags::default())).unwrap();
        }
        stream.close().unwrap();
    });

    let reqs = vec![
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
        proto::HelloRequest::new(),
    ];
    for req in reqs {
        tx.send(req).await.unwrap();
    }

    for resp in stream.into_stream().await {
        println!("{}", resp.unwrap().get_message());
    }
}

struct GreeterImpl {}

impl proto::streaming::Greeter for GreeterImpl {
    fn say_hello_streaming(&self, _m: grpc::RequestOptions, _stream: grpc::StreamingRequest<proto::HelloRequest>) -> grpc::StreamingResponse<proto::HelloReply> {
        let (tx, rx) = grpc::channel::mpsc::channel(0);
        tokio::spawn(async move {
            for req in _stream.into_async_iter().await {
                let mut r = proto::HelloReply::new();
                r.set_message(format!("Hello, {}!", req.get_name()));
                tx.send((r, WriteFlags::default())).unwrap();
            }
            tx.close().unwrap();
        });
        grpc::StreamingResponse::new(rx)
    }
}

这个示例中,我们使用tokio::sync::mpsc库来创建一个通道,用于传输流数据。在客户端中,我们使用say_hello_streaming方法向服务器发送请求,并将请求通过通道发送给异步任务。在异步任务中,我们使用into_async_iter方法将请求流转换成异步迭代器,并将响应通过通道发送给客户端。在客户端中,我们使用into_stream方法将响应流转换成异步流,并等待响应。

使用 TLS 加密

在 Rust 语言 GRPC 模块中,可以使用 TLS 加密来保护通信安全。下面是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use grpc::{ChannelBuilder, Client};
use rustls::{Certificate, PrivateKey, ServerConfig};
use std::fs::File;
use std::io::BufReader;

fn main() {
    let mut config = ServerConfig::new(rustls::NoClientAuth::new());
    let cert_file = &mut BufReader::new(File::open("server.crt").unwrap());
    let key_file = &mut BufReader::new(File::open("server.key").unwrap());
    let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap();
    let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap();
    config.set_single_cert(cert_chain, keys.remove(0)).unwrap();
    let mut server = grpc_tls::ServerBuilder::new_plain();
    server.http.set_port(50051);
    server.http.set_tls(config);
    server.add_service(proto::greeter_server::GreeterServer::new_service_def(GreeterImpl {}));
    let server = server.build().unwrap();
    server.start();

    let mut config = rustls::ClientConfig::new();
    let cert_file = &mut BufReader::new(File::open("client.crt").unwrap());
    let key_file = &mut BufReader::new(File::open("client.key").unwrap());
    let cert_chain = rustls::internal::pemfile::certs(cert_file).unwrap();
    let mut keys = rustls::internal::pemfile::rsa_private_keys(key_file).unwrap();
    config.set_single_client_cert(cert_chain, keys.remove(0));
    let ch = ChannelBuilder::new_tls().rustls_config(config);
    let client = Client::new(ch);
    let mut req = proto::HelloRequest::new();
    req.set_name("world".to_string());
    let resp = client.say_hello(grpc::RequestOptions::new(), req);
    println!("{}", resp.wait().unwrap().get_message());
}

struct GreeterImpl {}

impl proto::greeter_server::Greeter for GreeterImpl {
    fn say_hello(&self, _m: grpc::RequestOptions, req: proto::HelloRequest) -> grpc::SingleResponse<proto::HelloReply> {
        let mut r = proto::HelloReply::new();
        r.set_message(format!("Hello, {}!", req.get_name()));
        grpc::SingleResponse::completed(r)
    }
}

这个示例中,我们使用rustls库来创建 TLS 配置,并使用grpc_tls::ServerBuilderChannelBuilder::new_tls方法来创建带有 TLS 加密的服务器和客户端。在服务器中,我们使用set_single_cert方法来设置服务器证书和私钥。在客户端中,我们使用set_single_client_cert方法来设置客户端证书和私钥。

总结

本教程介绍了 GRPC 的基础使用方法,并针对 tokio 结合 GRPC 的进阶使用进入入门级的探讨。希望能帮助同学们掌握 Rust 语言 GRPC 的应用。

知识共享许可协议

本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。欢迎转载、使用、重新发布,但务必保留文章署名 TinyZ Zzh (包含链接: https://tinyzzh.github.io ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。 如有任何疑问,请 与我联系 (tinyzzh815@gmail.com)

TinyZ Zzh

TinyZ Zzh

专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。

评论

  点击开始评论...