Rust语言从入门到精通系列 - 物联网消息传输协议MQTT(入门)

1528 字
8 分钟
Rust语言从入门到精通系列 - 物联网消息传输协议MQTT(入门)

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,用于在低带宽和不稳定的网络环境中传输消息。MQTT协议基于发布/订阅模式,包含了许多特性,如QoS,保留消息,遗嘱消息等,使得它非常适合物联网设备之间的通信。

Rust是一种系统级编程语言,具有内存安全和高性能的特性。Rust语言的主要目标是提供一种安全、并发、实用的编程语言,使得开发者可以轻松地编写高性能的系统级应用程序。本教程将介绍如何使用Rust语言和rumqttc模块来实现MQTT协议的基础应用和进阶应用。

rumqttc模块简介#

rumqttc是一个基于Rust语言实现的MQTT客户端库,它提供了连接MQTT服务器、订阅主题、发布消息等基本功能,并支持TLS加密连接。rumqttc的API简单易用,适合初学者和中级开发者使用。

在Cargo.toml文件中添加rumqtt模块依赖, 示例配置如下:

[dependencies]
rumqttc = "0.21.0"

应用实践#

连接MQTT服务器#

使用rumqttc连接MQTT服务器非常简单,只需要指定服务器地址和端口号即可。以下是一个连接到本地MQTT服务器的示例代码:

use rumqttc::{Client, MqttOptions};
fn main() {
let mqtt_options = MqttOptions::new("test-1", "localhost", 1883);
let (mut client, _) = Client::new(mqtt_options, 10);
client
.connect()
.expect("Failed to connect to MQTT server");
// ...
}

其中,test-1是客户端ID,可以自行定义。10是消息队列的大小,表示可以同时处理的未确认消息数量。

订阅主题#

订阅MQTT主题可以接收来自其他客户端的消息。使用rumqttc订阅主题也非常简单,只需要指定主题名称和消息处理函数即可。以下是一个订阅主题的示例代码:

use rumqttc::{Client, MqttOptions, QoS};
fn main() {
let mqtt_options = MqttOptions::new("test-2", "localhost", 1883);
let (mut client, mut connection) = Client::new(mqtt_options, 10);
client
.subscribe("test/topic", QoS::AtLeastOnce)
.expect("Failed to subscribe to topic");
for message in connection.iter() {
if let Ok(message) = message {
println!("{}", message.payload_str());
}
}
}

其中,test/topic是要订阅的主题名称,QoS::AtLeastOnce表示消息至少被处理一次,即使出现网络故障或客户端宕机也不会丢失。connection.iter()返回一个迭代器,可以用来不断接收来自服务器的消息。

发布消息#

发布MQTT消息可以向其他客户端发送数据。使用rumqttc发布消息也非常简单,只需要指定主题名称和消息内容即可。以下是一个发布消息的示例代码:

use rumqttc::{Client, MqttOptions, QoS, ReconnectOptions, Transport};
fn main() {
let mqtt_options = MqttOptions::new("test-3", "localhost", 1883);
let (mut client, mut connection) = Client::new(mqtt_options, 10);
let reconnection_options = ReconnectOptions::Always(10);
client
.publish("test/topic", QoS::AtLeastOnce, false, "Hello, world!")
.expect("Failed to publish message");
}

其中,"Hello, world!"是要发送的消息内容,可以是字符串、字节流或其他数据类型。

保留消息#

这个示例演示如何使用rumqttc模块发送和接收保留消息。

use rumqttc::{Client, MqttOptions, QoS};
fn main() {
let mqtt_options = MqttOptions::new("test-retain", "localhost", 1883);
let (mut client, _) = Client::new(mqtt_options, 10);
client
.publish("test/topic", QoS::AtLeastOnce, true, "hello world".to_owned())
.unwrap();
// 注意:rumqttc 没有提供 get_retained() 方法。
// 订阅者订阅主题时会自动收到该主题的保留消息,
// 无需主动获取。
}

这个示例中,我们创建了一个MQTT客户端,连接到本地的MQTT服务器,然后发送了一条保留消息到test/topic主题。在调用publish方法时,我们指定了消息的QoS为AtLeastOnce,表示消息至少要被传输一次,但不保证只传输一次。第三个参数true表示该消息为保留消息。

订阅者订阅该主题时,MQTT服务器会自动推送最新的保留消息给订阅者,无需主动调用方法获取。

断开连接#

使用完rumqttc后,需要手动断开与MQTT服务器的连接。以下是一个断开连接的示例代码:

use rumqttc::{Client, MqttOptions};
fn main() {
let mqtt_options = MqttOptions::new("test-4", "localhost", 1883);
let (mut client, _) = Client::new(mqtt_options, 10);
client
.connect()
.expect("Failed to connect to MQTT server");
// ...
client.disconnect().expect("Failed to disconnect from MQTT server");
}

使用Last Will和Testament#

这个示例演示如何使用rumqttc模块设置Last Will和Testament。

use rumqttc::{Client, LastWill, MqttOptions, QoS};
fn main() {
let mut mqtt_options = MqttOptions::new("test-lwt", "localhost", 1883);
let last_will = LastWill::new("test/topic", QoS::AtLeastOnce, "offline".to_owned());
mqtt_options.set_last_will(last_will);
let (mut client, _) = Client::new(mqtt_options, 10);
// Do something here
}

这个示例中,我们创建了一个MQTT客户端,连接到本地的MQTT服务器。然后我们使用LastWill::new方法创建了一个Last Will和Testament,指定了主题、QoS和消息内容。注意:Last Will必须在创建Client之前通过mqtt_options.set_last_will()方法设置,而不是在Client创建之后调用。

TLS加密连接#

为了保护MQTT通信的安全性,可以使用TLS加密连接。使用rumqttc实现TLS加密连接也非常简单,只需要指定证书和私钥即可。以下是一个使用TLS加密连接的示例代码:

use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use rumqttc::{Client, MqttOptions};
use rumqttc::tokio_rustls::rustls::{ClientConfig, RootCertStore};
fn main() {
let mut mqtt_options = MqttOptions::new("test-5", "localhost", 8883);
let mut root_store = RootCertStore::empty();
let cert_file = &mut BufReader::new(File::open(Path::new("ca.crt")).unwrap());
root_store.add_pem_file(cert_file).unwrap();
let config = ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
mqtt_options.set_tls(config);
let (mut client, _) = Client::new(mqtt_options, 10);
client
.connect()
.expect("Failed to connect to MQTT server");
// ...
}

其中,ca.crt是CA证书。TLS配置必须在创建Client之前通过mqtt_options.set_tls()方法设置,使用rustls的ClientConfig::builder()模式构建TLS配置。

多线程处理消息#

使用多线程可以提高消息处理的效率和并发性。以下是一个使用多线程处理消息的示例代码:

use std::thread;
use rumqttc::{Client, MqttOptions, QoS};
fn main() {
let mqtt_options = MqttOptions::new("test-6", "localhost", 1883);
let (mut client, mut connection) = Client::new(mqtt_options, 10);
client
.subscribe("test/topic", QoS::AtLeastOnce)
.expect("Failed to subscribe to topic");
for message in connection.iter() {
if let Ok(message) = message {
let payload = message.payload.clone();
let topic = message.topic.clone();
thread::spawn(move || {
println!("Received message: {} from topic: {}", payload, topic);
});
}
}
}

其中,thread::spawn()创建一个新线程来处理消息,可以使用闭包来捕获消息的内容。

总结#

rumqttc模块是一个非常方便的MQTT客户端库,它提供了一系列API,可以方便地实现MQTT协议的功能。本教程提供了几个常见的基础应用示例,希望对您有所帮助。

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

赞助
Rust语言从入门到精通系列 - 物联网消息传输协议MQTT(入门)
https://tinyzzh.github.io/posts/2023-05-16-rust_lang_tutorial_235_mqtt_basic/
作者
TinyZ Zzh
发布于
2023-05-16
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
TinyZ Zzh
专注于高并发服务器、网络游戏相关(Java、PHP、Unity3D、Unreal Engine等)技术,热爱游戏事业, 正在努力实现自我价值当中。
公告
欢迎来到我的博客!这是一则示例公告。
音乐
封面

音乐

暂未播放

0:00 0:00
暂无歌词
分类
标签
站点统计
文章
211
分类
38
标签
200
总字数
337,853
运行时长
0
最后活动
0 天前

文章目录