Skip to main content

MQTT Clients

Guangzhou, China

Golang

Project Setup

echo $GOPATH
/home/myuser/go
mkdir /home/myuser/go/src/go-mqtt && cd /home/myuser/go/src/go-mqtt
go mod init go-mqtt
go get github.com/eclipse/paho.mqtt.golang@latest
cat go.mod

module go-mqtt

go 1.18

require (
github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
)

Connect the Client

The first step is to configure the client with the broker address, port, client id, username, etc. This is done in Paho MQTT via mqtt.NewClientOptions. I am going to set three callbacks:

  • OnConnect: called when the client is connected to the server.
  • OnConnectionLost: called when the client is disconnected from the server.
  • MessageHandler: called when a message is received from the server.

The MQTT Broker I am using is an INSTAR IP camera on the local IP 192.168.2.117:

MQTT Clients

nano /home/myuser/go/src/go-mqtt/main.go
package main

import (
"fmt"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler prints the payload and the topic of every message that
// is handed to it.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, topic := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, topic)
}

// connectHandler reports on stdout that the connection has been established.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler prints the error that came with the connection loss.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	reason := cause.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}

// Connection settings and the start-up timestamp.
var (
	// broker is the address of the MQTT broker.
	broker = "192.168.2.117"
	// port is the TCP port of the broker.
	port = 1883
	// now is the RFC 850 formatted time; it is evaluated once, when the
	// package is initialised.
	now = time.Now().Format(time.RFC850)
)

// main prepares the client options: broker URL, client ID, credentials and
// the three event callbacks.
func main() {
	// client configuration (every setter returns the options, so they chain)
	options := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)).
		SetClientID("go_mqtt_client").
		SetUsername("admin").
		SetPassword("instar")

	// mqtt event callbacks
	options.SetDefaultPublishHandler(messagePubHandler)
	options.OnConnect = connectHandler
	options.OnConnectionLost = connectionLostHandler
}

To connect call client.Connect(), wait for the connection and handle potential errors:

  // connect the client
client := mqtt.NewClient(options)
token := client.Connect()
// block until the connection attempt has completed
token.Wait()
if err := token.Error(); err != nil {
	panic(err)
}

Subscribe to a Topic

To subscribe, call the method client.Subscribe with three parameters:

  • topic: string with the subscription topic
  • qos: 0 (fire-and-forget), 1 (re-send if missed) or 2 (make sure it is only received once)
  • callback: a function to be called when a message in this topic is received. It can be nil so only the default handler will be called
  // subscribe topic
topic := "client/go/timestamp"
token = client.Subscribe(topic, 1, nil)
// Wait for the subscription to complete and only announce success when the
// token carries no error.
if token.Wait() && token.Error() != nil {
	fmt.Printf("Subscribing to topic %s failed: %s\n", topic, token.Error())
} else {
	// Printf instead of Println: the message contains a %s verb.
	fmt.Printf("Subscribed to topic %s\n", topic)
}

Publish to a Topic

To publish a message, call the method client.Publish. It receives four parameters:

  • topic: same topic as before - I am going to send a timestamp 10 times before disconnecting
  • qos: 0 (fire-and-forget), 1 (re-send if missed) or 2 (make sure it is only received once)
  • retained: boolean indicating whether the message must be retained by the server
  • payload: message to be published under the topic
  // publish the timestamp num times, pausing one second after each message
  num := 10
  for i := 0; i < num; i++ {
  	text := fmt.Sprintf("Message: %s", now)
  	token = client.Publish(topic, 0, false, text)
  	// report a failed publish instead of dropping the error silently
  	if token.Wait() && token.Error() != nil {
  		fmt.Printf("Publishing to topic %s failed: %s\n", topic, token.Error())
  	}
  	time.Sleep(time.Second)
  }

  // close the connection; the argument is the quiesce time in milliseconds
  client.Disconnect(100)

MQTT Clients

Encryption

Add tls.Config with your broker certificates:

// NewTlsConfig builds the TLS configuration for the broker connection from
// the PEM certificate stored in cert/pcert.pem. The program is terminated
// when the file cannot be read or holds no parsable certificate.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	cert, err := ioutil.ReadFile("cert/pcert.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}

	// AppendCertsFromPEM reports false when nothing could be parsed; without
	// this check a damaged file would silently produce an empty pool.
	if !certpool.AppendCertsFromPEM(cert) {
		log.Fatalln("no valid certificate found in cert/pcert.pem")
	}

	return &tls.Config{
		// NOTE(review): InsecureSkipVerify turns off verification of the
		// broker's certificate chain and host name, so RootCAs has no effect
		// while it is true. Switch it to false once the certificate matches
		// the broker address.
		InsecureSkipVerify: true,
		RootCAs:            certpool,
	}
}

To use your INSTAR IP camera's broker with encryption, click on Download Camera's certificate in the MQTT configuration (see the first screenshot above) and save it to cert/. Alternatively, use the client.crt generated here.

E.g.:

-----BEGIN CERTIFICATE-----
MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL
BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG
A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU
BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv
by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE
BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES
MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp
dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg
UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW
Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA
s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH
3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo
E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT
MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV
6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL
BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC
6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf
+pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK
sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839
LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE
m/XriWr/Cq4h/JfB7NTsezVslgkBaoU=
-----END CERTIFICATE-----

And add this configuration to the options by the method SetTLSConfig:

	// client configuration
tlsConfig := NewTlsConfig()
// For a non-TLS connection use the tcp:// scheme instead of ssl://, i.e.
// fmt.Sprintf("tcp://%s:%d", broker, port)
brokerURL := fmt.Sprintf("ssl://%s:%d", broker, port)
opts := mqtt.NewClientOptions().
	SetTLSConfig(tlsConfig).
	AddBroker(brokerURL).
	SetClientID("go_mqtt_client").
	SetUsername("admin").
	SetPassword("instar")

Full Code Example (no-TLS)

package main

import (
"fmt"
"log"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Broker endpoint, topic and start-up timestamp.
var (
	// port is the TCP port of the broker.
	port = 1883
	// broker is the address of the MQTT broker.
	broker = "192.168.2.117"
	// topic is the topic that is subscribed and published to.
	topic = "client/go/timestamp"
	// now is the RFC 850 formatted time; it is evaluated once, when the
	// package is initialised.
	now = time.Now().Format(time.RFC850)
)

// messagePubHandler prints the payload and the topic of every message that
// is handed to it.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, topic := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, topic)
}

// connectHandler reports on stdout that the connection has been established.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler prints the error that came with the connection loss.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	reason := cause.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}

// main configures the client, connects to the broker, subscribes to the
// topic, publishes the messages and finally disconnects.
func main() {
	// client configuration (every setter returns the options, so they chain)
	opts := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port)).
		SetClientID("go_mqtt_client").
		SetUsername("admin").
		SetPassword("instar")

	// mqtt event callbacks
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectionLostHandler

	// connect the client and panic if the attempt ended with an error
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	subscribe(client)
	publish(client)

	// the argument is the quiesce time in milliseconds
	client.Disconnect(100)
}

// subscribe registers the client for topic with QoS 1. No dedicated callback
// is passed, so received messages are delivered to the default publish
// handler. A failed subscription is logged and no success message is printed.
func subscribe(client mqtt.Client) {
	token := client.Subscribe(topic, 1, nil)
	if token.Wait() && token.Error() != nil {
		log.Printf("subscribe to topic %s: %v", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}

// publish sends the start-up timestamp to topic ten times (QoS 0, not
// retained) and sleeps one second after each message. A failed publish is
// logged and the loop carries on with the next message.
func publish(client mqtt.Client) {
	const num = 10
	// the payload does not change between iterations, so build it once
	text := fmt.Sprintf("Message %s", now)
	for i := 0; i < num; i++ {
		token := client.Publish(topic, 0, false, text)
		if token.Wait() && token.Error() != nil {
			log.Printf("publish to topic %s: %v", topic, token.Error())
		}
		time.Sleep(time.Second)
	}
}

Full Code Example (self-signed TLS Certificate)

package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Broker endpoint, credentials, certificate location and start-up timestamp.
var (
	// port is the TCP port of the broker.
	port = 8883
	// broker is the address of the MQTT broker.
	broker = "192.168.2.117"
	// topic is the topic that is subscribed and published to.
	topic = "client/go/timestamp"
	// clientID identifies this client towards the broker.
	clientID = "go_mqtt_client"
	// username and password are the broker login.
	username = "admin"
	password = "instar"
	// certificate is the path of the PEM file read by NewTlsConfig.
	certificate = "cert/pcert.pem"
	// now is the RFC 850 formatted time; it is evaluated once, when the
	// package is initialised.
	now = time.Now().Format(time.RFC850)
)

// messagePubHandler prints the payload and the topic of every message that
// is handed to it.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, topic := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, topic)
}

// connectHandler reports on stdout that the connection has been established.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler prints the error that came with the connection loss.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	reason := cause.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}

// main configures the client for a TLS connection, connects to the broker,
// subscribes to the topic, publishes the messages and finally disconnects.
func main() {
	// client configuration (every setter returns the options, so they chain)
	tlsConfig := NewTlsConfig()
	opts := mqtt.NewClientOptions().
		SetTLSConfig(tlsConfig).
		AddBroker(fmt.Sprintf("ssl://%s:%d", broker, port)).
		SetClientID(clientID).
		SetUsername(username).
		SetPassword(password)

	// mqtt event callbacks
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectionLostHandler

	// connect the client and panic if the attempt ended with an error
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	subscribe(client)
	publish(client)

	// the argument is the quiesce time in milliseconds
	client.Disconnect(100)
}

// subscribe registers the client for topic with QoS 1. No dedicated callback
// is passed, so received messages are delivered to the default publish
// handler. A failed subscription is logged and no success message is printed.
func subscribe(client mqtt.Client) {
	token := client.Subscribe(topic, 1, nil)
	if token.Wait() && token.Error() != nil {
		log.Printf("subscribe to topic %s: %v", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}

// publish sends the start-up timestamp to topic ten times (QoS 0, not
// retained) and sleeps one second after each message. A failed publish is
// logged and the loop carries on with the next message.
func publish(client mqtt.Client) {
	const num = 10
	// the payload does not change between iterations, so build it once
	text := fmt.Sprintf("Message %s", now)
	for i := 0; i < num; i++ {
		token := client.Publish(topic, 0, false, text)
		if token.Wait() && token.Error() != nil {
			log.Printf("publish to topic %s: %v", topic, token.Error())
		}
		time.Sleep(time.Second)
	}
}

// NewTlsConfig builds the TLS configuration for a broker that uses a
// self-signed certificate. The PEM file named by certificate is loaded into
// the root pool; the program is terminated when the file cannot be read or
// holds no parsable certificate.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	cert, err := ioutil.ReadFile(certificate)
	if err != nil {
		log.Fatalln(err.Error())
	}

	// AppendCertsFromPEM reports false when nothing could be parsed; without
	// this check a damaged file would silently produce an empty pool.
	if !certpool.AppendCertsFromPEM(cert) {
		log.Fatalln("no valid certificate found in " + certificate)
	}

	return &tls.Config{
		// NOTE(review): InsecureSkipVerify turns off verification of the
		// broker's certificate chain and host name, so RootCAs has no effect
		// while it is true. Switch it to false once the certificate matches
		// the broker address.
		InsecureSkipVerify: true,
		RootCAs:            certpool,
	}
}

Full Code Example (CA TLS Certificate)

package main

import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"

MQTT "github.com/eclipse/paho.mqtt.golang"
)

// Broker endpoints, credentials, certificate locations and start-up timestamp.
var (
	// port is the plain (non-TLS) broker port.
	port = 1888
	// portSSL is the TLS broker port.
	portSSL = 8883
	// broker is the address of the MQTT broker.
	broker = "192.168.2.111"
	// topic is the topic that is subscribed and published to.
	topic = "go/client/hi"
	// clientID identifies this client towards the broker.
	clientID = "go_mqtt_client"
	// username and password are the broker login.
	username = "admin"
	password = "instar"
	// caCert, clientCert and clientKey are the paths of the CA certificate
	// and of the client certificate/key pair.
	caCert     = "cert/instar/ca.crt"
	clientCert = "cert/instar/client.crt"
	clientKey  = "cert/instar/client.key"
	// now is the RFC 850 formatted time; it is evaluated once, when the
	// package is initialised.
	now = time.Now().Format(time.RFC850)
)

// NewTLSConfig assembles the client-side TLS configuration: the CA
// certificates from caCert become the root pool and the key pair from
// clientCert/clientKey is presented to the broker. It panics when the client
// certificate or key cannot be loaded or parsed.
func NewTLSConfig() *tls.Config {
	// Import trusted certificates from the CA file. Loading stays
	// best-effort as before, but a problem is now reported instead of being
	// ignored without a trace.
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile(caCert)
	switch {
	case err != nil:
		fmt.Printf("Warning: cannot read CA certificate %s: %s\n", caCert, err)
	case !certpool.AppendCertsFromPEM(pemCerts):
		fmt.Printf("Warning: no certificate found in %s\n", caCert)
	}

	// Import client certificate/key pair
	cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
	if err != nil {
		panic(err)
	}

	// Parse the leaf certificate so it can be inspected (see the
	// commented-out print below).
	cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		panic(err)
	}
	// fmt.Println(cert.Leaf)

	// ClientAuth and ClientCAs are server-side settings and were set to
	// their zero values, so they are omitted here.
	return &tls.Config{
		// RootCAs = certs used to verify the server certificate.
		RootCAs: certpool,
		// NOTE(review): InsecureSkipVerify turns off verification of the
		// server's certificate chain and host name, so RootCAs has no effect
		// while it is true. Switch it to false once the certificate matches
		// the broker address.
		InsecureSkipVerify: true,
		// Certificates = list of certs the client sends to the server.
		Certificates: []tls.Certificate{cert},
	}
}

// f prints the topic and then the payload of every message handed to it.
var f MQTT.MessageHandler = func(_ MQTT.Client, m MQTT.Message) {
	topicName := m.Topic()
	fmt.Printf("TOPIC: %s\n", topicName)
	body := m.Payload()
	fmt.Printf("MSG: %s\n", body)
}

// main connects to the broker over TLS, subscribes to topic, publishes five
// numbered messages (one per tick of a one-second ticker) and disconnects.
func main() {
	tlsconfig := NewTLSConfig()

	opts := MQTT.NewClientOptions()
	// opts.AddBroker(fmt.Sprintf("mqtt://%s:%d", broker, port))
	opts.AddBroker(fmt.Sprintf("mqtts://%s:%d", broker, portSSL))
	opts.SetClientID(clientID).SetTLSConfig(tlsconfig)
	opts.SetDefaultPublishHandler(f)
	opts.SetUsername(username)
	opts.SetPassword(password)

	// Start the connection
	c := MQTT.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// The subscription result used to be discarded; report a failure so a
	// missing echo of the published messages can be explained.
	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribing to topic %s failed: %s\n", topic, token.Error())
	}

	// time.Tick gives no handle to stop its ticker; NewTicker plus Stop
	// releases it once the loop is done.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	i := 0
	for range ticker.C {
		if i == 5 {
			break
		}
		text := fmt.Sprintf("%d Timestamp: %s", i, now)
		if token := c.Publish(topic, 0, false, text); token.Wait() && token.Error() != nil {
			fmt.Printf("Publishing to topic %s failed: %s\n", topic, token.Error())
		}
		i++
	}

	// the argument is the quiesce time in milliseconds
	c.Disconnect(250)
}