MQTT Clients
Golang
Project Setup
echo $GOPATH
/home/myuser/go
mkdir /home/myuser/go/src/go-mqtt && cd /home/myuser/go/src/go-mqtt
go mod init go-mqtt
go get github.com/eclipse/paho.mqtt.golang@latest
cat go.mod
module go-mqtt
go 1.18
require (
github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
)
Connect the Client
The first step is to configure the client with the broker address, port, client id, username, etc. This is done in Paho MQTT via mqtt.NewClientOptions
. I am going to set three callbacks:
- OnConnect: called when the client is connected to the server.
- OnConnectionLost: called when the client is disconnected from the server.
- MessageHandler: called when a message is received from the server.
The MQTT Broker I am using is an INSTAR IP camera on the local IP 192.168.2.117
:
nano /home/myuser/go/src/go-mqtt/main.go
package main
import (
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// messagePubHandler prints the payload and the topic of a message received
// from the server.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, receivedOn := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, receivedOn)
}

// connectHandler reports that the client is connected to the server.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler reports why the connection to the server was lost.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	reason := err.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}
// Broker address and port, plus the program start time formatted as RFC 850,
// which is used as the message payload.
var (
	broker = "192.168.2.117"
	port   = 1883
	now    = time.Now().Format(time.RFC850)
)
func main() {
// client configuration
options := mqtt.NewClientOptions()
options.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
options.SetClientID("go_mqtt_client")
options.SetUsername("admin")
options.SetPassword("instar")
// mqtt event callbacks
options.SetDefaultPublishHandler(messagePubHandler)
options.OnConnect = connectHandler
options.OnConnectionLost = connectionLostHandler
}
To connect call client.Connect()
, wait for the connection and handle potential errors:
// create the client, start connecting and block until the attempt has finished
client := mqtt.NewClient(options)
token := client.Connect()
if done := token.Wait(); done {
	// a failed connection attempt aborts the program
	if err := token.Error(); err != nil {
		panic(err)
	}
}
Subscribe to a Topic
To subscribe, call the method client.Subscribe
with three parameters:
- topic: string with the subscription topic.
- qos: 0 (fire-and-forget), 1 (re-send if missed) or 2 (make sure it is only received once).
- callback: a function to be called when a message on this topic is received. It can be nil, in which case only the default handler is called.
// subscribe topic (QoS 1, nil callback: messages go to the default handler)
topic := "client/go/timestamp"
token = client.Subscribe(topic, 1, nil)
// Wait for the subscription to complete and report the real outcome;
// Printf (not Println) is required for the %s verb to be expanded.
if token.Wait() && token.Error() != nil {
	fmt.Printf("Subscribe to topic %s failed: %s\n", topic, token.Error())
} else {
	fmt.Printf("Subscribed to topic %s\n", topic)
}
Publish to a Topic
To publish a message, call the method client.Publish
. It receives four parameters:
- topic: same topic as before - I am going to send a timestamp 10 times before disconnecting.
- qos: 0 (fire-and-forget), 1 (re-send if missed) or 2 (make sure it is only received once).
- retained: boolean indicating whether the message must be retained by the server.
- payload: message to be published under the topic.
// publish the start-up timestamp num times (QoS 0, not retained), pausing one
// second after every message
num := 10
for remaining := num; remaining > 0; remaining-- {
	token = client.Publish(topic, 0, false, fmt.Sprintf("Message: %s", now))
	token.Wait()
	time.Sleep(time.Second)
}
// Disconnect takes a quiesce time in milliseconds
client.Disconnect(100)
Encryption
Add tls.Config
with your broker certificates:
// NewTlsConfig builds the TLS client configuration for the broker connection.
//
// It reads the PEM file cert/pcert.pem, adds the certificates it contains to a
// fresh pool and returns a config that uses that pool as RootCAs. The process
// is terminated through log.Fatalln when the file cannot be read or when it
// does not contain any parsable certificate.
//
// NOTE(review): InsecureSkipVerify is true, which makes crypto/tls accept any
// certificate and host name presented by the server, so RootCAs is not
// consulted during the handshake. Set it to false once the broker certificate
// matches the address you connect to.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	cert, err := ioutil.ReadFile("cert/pcert.pem")
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports false when nothing could be parsed; without
	// this check a broken file would silently produce an empty pool.
	if !certpool.AppendCertsFromPEM(cert) {
		log.Fatalln("cert/pcert.pem: no valid PEM certificate found")
	}
	return &tls.Config{
		InsecureSkipVerify: true,
		RootCAs:            certpool,
	}
}
To use your INSTAR IP camera's broker with encryption, click on Download Camera's certificate in the MQTT configuration (see the first screenshot above) and save it in the cert/ directory (the code above reads cert/pcert.pem). Or use the client.crt generated here.
E.g.:
-----BEGIN CERTIFICATE-----
MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL
BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG
A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU
BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv
by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE
BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES
MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp
dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ
KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg
UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW
Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA
s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH
3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo
E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT
MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV
6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL
BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC
6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf
+pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK
sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839
LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE
m/XriWr/Cq4h/JfB7NTsezVslgkBaoU=
-----END CERTIFICATE-----
And add this configuration to the options by the method SetTLSConfig:
// client configuration with TLS
tlsConfig := NewTlsConfig()
// ssl:// selects the encrypted transport; use the tcp:// line for a non-TLS broker
// brokerURL := fmt.Sprintf("tcp://%s:%d", broker, port)
brokerURL := fmt.Sprintf("ssl://%s:%d", broker, port)
opts := mqtt.NewClientOptions().SetTLSConfig(tlsConfig)
opts.AddBroker(brokerURL).
	SetClientID("go_mqtt_client").
	SetUsername("admin").
	SetPassword("instar")
Full Code Example (no-TLS)
package main
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Broker endpoint, topic and the program start time (RFC 850) that is sent as
// the message payload.
var (
	port   = 1883
	broker = "192.168.2.117"
	topic  = "client/go/timestamp"
	now    = time.Now().Format(time.RFC850)
)
// messagePubHandler prints the payload and the topic of a message received
// from the server.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, receivedOn := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, receivedOn)
}

// connectHandler reports that the client is connected to the server.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler reports why the connection to the server was lost.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	reason := err.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}
func main() {
// client configuration
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("admin")
opts.SetPassword("instar")
// mqtt event callbacks
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectionLostHandler
// connect the client
client := mqtt.NewClient(opts)
token := client.Connect()
if token.Wait() && token.Error() != nil {
panic(token.Error())
}
subscribe(client)
publish(client)
client.Disconnect(100)
}
// subscribe registers client for topic with QoS 1. The callback is nil, so
// received messages are delivered to the default publish handler.
//
// It blocks until the subscription request has completed. A failure is logged
// and the success line is not printed, so the output reflects what actually
// happened.
func subscribe(client mqtt.Client) {
	token := client.Subscribe(topic, 1, nil)
	if token.Wait() && token.Error() != nil {
		log.Printf("Subscribe to topic %s failed: %s", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}
// publish sends the start-up timestamp to topic ten times (QoS 0, not
// retained), waiting for each message to be handed off and then pausing for
// one second.
func publish(client mqtt.Client) {
	num := 10
	for remaining := num; remaining > 0; remaining-- {
		token := client.Publish(topic, 0, false, fmt.Sprintf("Message %s", now))
		token.Wait()
		time.Sleep(time.Second)
	}
}
Full Code Example (self-signed TLS Certificate)
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Connection settings for the TLS example: broker endpoint, topic,
// credentials, the certificate file and the program start time (RFC 850) that
// is sent as the message payload.
var (
	port        = 8883
	broker      = "192.168.2.117"
	topic       = "client/go/timestamp"
	clientID    = "go_mqtt_client"
	username    = "admin"
	password    = "instar"
	certificate = "cert/pcert.pem"
	now         = time.Now().Format(time.RFC850)
)
// messagePubHandler prints the payload and the topic of a message received
// from the server.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	payload, receivedOn := m.Payload(), m.Topic()
	fmt.Printf("%s received on topic %s\n", payload, receivedOn)
}

// connectHandler reports that the client is connected to the server.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectionLostHandler reports why the connection to the server was lost.
var connectionLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	reason := err.Error()
	fmt.Printf("Connection Lost: %s\n", reason)
}
// main configures the client for an ssl:// connection, connects to the broker
// (panicking when the connection fails), then subscribes, publishes and
// disconnects.
func main() {
	// client configuration
	tlsConfig := NewTlsConfig()
	brokerURL := fmt.Sprintf("ssl://%s:%d", broker, port)
	opts := mqtt.NewClientOptions().SetTLSConfig(tlsConfig)
	opts.AddBroker(brokerURL).
		SetClientID(clientID).
		SetUsername(username).
		SetPassword(password)

	// mqtt event callbacks
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectionLostHandler

	// connect the client
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	subscribe(client)
	publish(client)

	// Disconnect takes a quiesce time in milliseconds
	client.Disconnect(100)
}
// subscribe registers client for topic with QoS 1. The callback is nil, so
// received messages are delivered to the default publish handler.
//
// It blocks until the subscription request has completed. A failure is logged
// and the success line is not printed, so the output reflects what actually
// happened.
func subscribe(client mqtt.Client) {
	token := client.Subscribe(topic, 1, nil)
	if token.Wait() && token.Error() != nil {
		log.Printf("Subscribe to topic %s failed: %s", topic, token.Error())
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}
// publish sends the start-up timestamp to topic ten times (QoS 0, not
// retained), waiting for each message to be handed off and then pausing for
// one second.
func publish(client mqtt.Client) {
	num := 10
	for remaining := num; remaining > 0; remaining-- {
		token := client.Publish(topic, 0, false, fmt.Sprintf("Message %s", now))
		token.Wait()
		time.Sleep(time.Second)
	}
}
// NewTlsConfig builds the TLS client configuration for a broker that uses a
// self-signed certificate.
//
// It reads the PEM file named by certificate, adds the certificates it
// contains to a fresh pool and returns a config that uses that pool as
// RootCAs. The process is terminated through log.Fatalln when the file cannot
// be read or when it does not contain any parsable certificate.
//
// NOTE(review): InsecureSkipVerify is true, which makes crypto/tls accept any
// certificate and host name presented by the server, so RootCAs is not
// consulted during the handshake. Set it to false once the broker certificate
// matches the address you connect to.
func NewTlsConfig() *tls.Config {
	certpool := x509.NewCertPool()
	cert, err := ioutil.ReadFile(certificate)
	if err != nil {
		log.Fatalln(err.Error())
	}
	// AppendCertsFromPEM reports false when nothing could be parsed; without
	// this check a broken file would silently produce an empty pool.
	if !certpool.AppendCertsFromPEM(cert) {
		log.Fatalln(certificate + ": no valid PEM certificate found")
	}
	return &tls.Config{
		InsecureSkipVerify: true,
		RootCAs:            certpool,
	}
}
Full Code Example (CA TLS Certificate)
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// Connection settings for the CA-certificate example: plain and TLS ports,
// broker address, topic, credentials, certificate/key file locations and the
// program start time (RFC 850) that is sent as part of the payload.
var (
	port       = 1888
	portSSL    = 8883
	broker     = "192.168.2.111"
	topic      = "go/client/hi"
	clientID   = "go_mqtt_client"
	username   = "admin"
	password   = "instar"
	caCert     = "cert/instar/ca.crt"
	clientCert = "cert/instar/client.crt"
	clientKey  = "cert/instar/client.key"
	now        = time.Now().Format(time.RFC850)
)
// NewTLSConfig builds the TLS client configuration for a broker that expects a
// client certificate.
//
// The CA file named by caCert is loaded into the RootCAs pool on a best-effort
// basis: when it cannot be read or contains no certificate, a warning is
// printed and the pool stays empty. The client certificate/key pair
// (clientCert, clientKey) is mandatory; the function panics when it cannot be
// loaded or its leaf certificate cannot be parsed.
func NewTLSConfig() *tls.Config {
	// Import trusted certificates from the CA file. A failure is reported
	// instead of being swallowed, but does not abort the program.
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile(caCert)
	switch {
	case err != nil:
		fmt.Printf("Warning: cannot read CA file %s: %s\n", caCert, err)
	case !certpool.AppendCertsFromPEM(pemCerts):
		fmt.Printf("Warning: no certificate found in CA file %s\n", caCert)
	}

	// Import client certificate/key pair
	cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
	if err != nil {
		panic(err)
	}

	// Parse the leaf so it is available as cert.Leaf, e.g. to print it out.
	cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		panic(err)
	}
	// fmt.Println(cert.Leaf)

	// ClientAuth and ClientCAs are server-side settings and have no effect in
	// a client configuration, so they are left at their zero values.
	return &tls.Config{
		// RootCAs = CA certs the client uses to verify the server cert.
		RootCAs: certpool,
		// InsecureSkipVerify = true makes the client accept any server
		// certificate and host name, i.e. RootCAs is not consulted.
		// NOTE(review): set to false once the broker certificate matches
		// the address used to connect.
		InsecureSkipVerify: true,
		// Certificates = list of certs the client sends to the server.
		Certificates: []tls.Certificate{cert},
	}
}
// f prints the topic and then the payload of a received message, one per line.
var f MQTT.MessageHandler = func(_ MQTT.Client, m MQTT.Message) {
	receivedOn := m.Topic()
	fmt.Printf("TOPIC: %s\n", receivedOn)
	body := m.Payload()
	fmt.Printf("MSG: %s\n", body)
}
// main connects to the broker over TLS (panicking when the connection fails),
// subscribes to topic and publishes five numbered timestamp messages, one per
// second, before disconnecting.
func main() {
	tlsconfig := NewTLSConfig()
	opts := MQTT.NewClientOptions()
	// opts.AddBroker(fmt.Sprintf("mqtt://%s:%d", broker, port))
	opts.AddBroker(fmt.Sprintf("mqtts://%s:%d", broker, portSSL))
	opts.SetClientID(clientID).SetTLSConfig(tlsconfig)
	opts.SetDefaultPublishHandler(f)
	opts.SetUsername(username)
	opts.SetPassword(password)

	// Start the connection
	c := MQTT.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// Wait for the subscription to complete before publishing and report a
	// failure instead of ignoring the token.
	if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe to topic %s failed: %s\n", topic, token.Error())
	}

	// A Ticker that is stopped explicitly replaces time.Tick, whose
	// underlying ticker cannot be stopped.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Publish on the first five ticks; the sixth tick ends the loop, which
	// leaves one more second for the last message to come back.
	i := 0
	for range ticker.C {
		if i == 5 {
			break
		}
		text := fmt.Sprintf("%d Timestamp: %s", i, now)
		c.Publish(topic, 0, false, text)
		i++
	}

	// Disconnect takes a quiesce time in milliseconds
	c.Disconnect(250)
}