Skip to main content

Rust - MQTT Hello World

Guangzhou, China

Rust is a multi-paradigm programming language designed for performance and safety, especially safe concurrency. Rust is syntactically similar to C++, but can guarantee memory safety by using a borrow checker to validate references. Rust achieves memory safety without garbage collection, and reference counting is optional.

Rust Up

Install Rust using rustup. To download Rustup and install Rust, run the following in your terminal, then follow the on-screen instructions:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

In the Rust development environment, all tools are installed to the ~/.cargo/bin directory, and this is where you will find the Rust toolchain, including rustc, cargo, and rustup:

rustc --version
rustc 1.63.0 (4b91a6ea7 2022-08-08)

You can update your installation by running:

rustup update
stable-x86_64-unknown-linux-gnu unchanged - rustc 1.63.0 (4b91a6ea7 2022-08-08)

If at any point you would like to uninstall Rust, you can run:

rustup self uninstall

Hello World

This is the source code of the traditional Hello World program.

hello.rs

// This is the main function
fn main() {
// Statements here are executed when the compiled binary is called

// Print text to the console
println!("Hello World!");
}

A binary can be generated using the Rust compiler rustc:

rustc hello.rs

rustc will produce a hello binary that can be executed.

./hello
Hello World!

Rusty MQTT

FAIL: I don't know how to add a user login and the connection to my broker fails. The official documentation does not mention username/password ¯\(ツ)/¯ I will try using Go instead.

Now I want to use the paho-mqtt client library in a Rust project, and implement connect, subscribe, messaging and unsubscribe, etc., between the client and MQTT broker.

Initialisation the Project

paho-mqtt is the most versatile and widely used MQTT client in the current Rust. The current latest version 0.11.1 supports MQTT v5, 3.1.1, 3.1, and also supports data transfer via standard TCP, SSL / TLS, WebSockets, and QoS support 0, 1, 2, etc:

cargo new mqtt-example
cd mqtt-example
nano Cargo.toml

Edit the Cargo.toml file in the project, and add the address of the paho-mqtt library to the dependencies and specify the binary file corresponding to the subscribe, publish code file:

[package]
name = "mqtt-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git", branch = "master" }

[[bin]]
name = "sub"
path = "src/sub/main.rs"

[[bin]]
name = "pub"
path = "src/pub/main.rs"

MQTT Subscription

I am going to use an INSTAR MQTT Camera as my MQTTv5 Broker:

  • Broker IP: 192.168.2.115
  • MQTT Service Port: 1883
  • Broker Login: admin/instar

Rust - MQTT Hello World

src/sub/main.rs

use std::{
env,
process,
thread,
time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://192.168.2.115:1883";
const DFLT_CLIENT:&str = "rust_subscribe";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// The qos list that match topics above.
const DFLT_QOS:&[i32] = &[0, 1];

// Reconnect to the broker when connection is lost.
fn try_reconnect(cli: &mqtt::Client) -> bool
{
println!("Connection lost. Waiting to retry connection");
for _ in 0..12 {
thread::sleep(Duration::from_millis(5000));
if cli.reconnect().is_ok() {
println!("Successfully reconnected");
return true;
}
}
println!("Unable to reconnect after several attempts.");
false
}

// Subscribes to multiple topics.
fn subscribe_topics(cli: &mqtt::Client) {
if let Err(e) = cli.subscribe_many(DFLT_TOPICS, DFLT_QOS) {
println!("Error subscribes topics: {:?}", e);
process::exit(1);
}
}

fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();

// Create a client.
let mut cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});

// Initialize the consumer before connecting.
let rx = cli.start_consuming();

// Define the set of options for the connection.
let lwt = mqtt::MessageBuilder::new()
.topic("test")
.payload("Consumer lost connection")
.finalize();
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(false)
.will_message(lwt)
.finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}

// Subscribe topics.
subscribe_topics(&cli);

println!("Processing requests...");
for msg in rx.iter() {
if let Some(msg) = msg {
println!("{}", msg);
}
else if !cli.is_connected() {
if try_reconnect(&cli) {
println!("Resubscribe topics...");
subscribe_topics(&cli);
} else {
break;
}
}
}

// If still connected, then disconnect now.
if cli.is_connected() {
println!("Disconnecting");
cli.unsubscribe_many(DFLT_TOPICS).unwrap();
cli.disconnect(None).unwrap();
}
println!("Exiting");
}

MQTT Publication

src/pub/main.rs

use std::{
env,
process,
time::Duration
};

extern crate paho_mqtt as mqtt;

const DFLT_BROKER:&str = "tcp://192.168.2.115:1883";
const DFLT_CLIENT:&str = "rust_publish";
const DFLT_TOPICS:&[&str] = &["rust/mqtt", "rust/test"];
// Define the qos.
const QOS:i32 = 1;

fn main() {
let host = env::args().nth(1).unwrap_or_else(||
DFLT_BROKER.to_string()
);

// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id(DFLT_CLIENT.to_string())
.finalize();

// Create a client.
let cli = mqtt::Client::new(create_opts).unwrap_or_else(|err| {
println!("Error creating the client: {:?}", err);
process::exit(1);
});

// Define the set of options for the connection.
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();

// Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e);
process::exit(1);
}

// Create a message and publish it.
// Publish message to 'test' and 'hello' topics.
for num in 0..5 {
let content = "Hello world! ".to_string() + &num.to_string();
let mut msg = mqtt::Message::new(DFLT_TOPICS[0], content.clone(), QOS);
if num % 2 == 0 {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[1]);
msg = mqtt::Message::new(DFLT_TOPICS[1], content.clone(), QOS);
} else {
println!("Publishing messages on the {:?} topic", DFLT_TOPICS[0]);
}
let tok = cli.publish(msg);

if let Err(e) = tok {
println!("Error sending message: {:?}", e);
break;
}
}


// Disconnect from the broker.
let tok = cli.disconnect(None);
println!("Disconnect from the broker");
tok.unwrap();
}

Compile Binary Files

The following command generates the sub, pub binary file in the mqtt-example/target/debug directory:

cargo build
Finished dev [unoptimized + debuginfo] target(s) in 0.07s

Execute the sub binary file and wait for the message to be published.

./sub
Unable to connect:
Paho(-1)

Executing the pub binary file, you can see that messages have been published to the topics rust/test and rust/mqtt, respectively.

./pub
Unable to connect:
Paho(-1)