Skip to main content

Go Paho MQTT Client

Victoria Harbour, Hongkong

I want to create a Go MQTT client that connects to the INSTAR MQTT Broker on an IN-8015 FHD camera with the local IP 192.168.2.77. The client should be able to subscribe to topics as well as publish MQTT updates to the broker.

Project initialization

This project uses paho.mqtt.golang as its MQTT client library. Initialize the Go module and install the library:

go mod init go/paho-mqtt
go get github.com/eclipse/paho.mqtt.golang

Connect to the MQTT broker

./src/client/main.go

package main

import (
"fmt"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler prints the topic and payload of each message it is
// invoked with.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	topic, payload := m.Topic(), m.Payload()
	fmt.Printf("Topic: %s | %s\n", topic, payload)
}

// connectHandler writes a one-line notice to stdout when it is called for an
// established broker connection.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Print("Connected\n")
}

// connectLostHandler reports a lost broker connection together with the
// error that was passed to it.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	// Terminate the line so that later output does not run into this message.
	fmt.Printf("Connect lost: %+v\n", err)
}

// main configures the MQTT client options and connects to the camera's
// broker. It panics if the connection attempt reports an error.
func main() {
	// The broker runs on the camera itself, so this must be the camera's IP
	// (192.168.2.77), matching the "cameras/77/..." topics used elsewhere.
	const (
		broker = "192.168.2.77"
		port   = 1883
	)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("admin")
	opts.SetPassword("instar")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	// Wait blocks until the connection attempt has finished; only then is
	// the token's error meaningful.
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
}
  • messagePubHandler: default handler that prints incoming messages for subscriptions that have no callback of their own
  • connectHandler: callback for the connection
  • connectLostHandler: callback for connection loss
  • ClientOptions: used to set broker, port, client id, username, password and other options.

Topic Subscription

// sub subscribes to the camera's LWT connection status topic with QoS 1 and
// reports on stdout whether the subscription succeeded.
//
// A nil callback is passed to Subscribe, which leaves message delivery to the
// client's default publish handler.
func sub(client mqtt.Client) {
	topic := "cameras/77/status/connection"

	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Only claim success when the broker actually accepted the subscription.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribing to LWT %s failed: %v\n", topic, err)
		return
	}
	fmt.Println("Subscribed to LWT", topic)
}

Topic Update

// publish sends the camera to PTZ preset 2 and then back to preset 1,
// pausing 15 seconds after every command that was sent successfully.
//
// A failed publish is reported on stdout and the sequence moves on to the
// next preset without waiting.
func publish(client mqtt.Client) {
	const topic = "cameras/77/features/ptz/preset/raw"

	for _, preset := range []int{2, 1} {
		payload := fmt.Sprintf("%d", preset)
		token := client.Publish(topic, 0, false, payload)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Printf("Publishing preset %d to %s failed: %v\n", preset, topic, err)
			// Nothing was sent, so there is no camera movement to wait for.
			continue
		}
		// Pause before the next command (and before returning to the caller).
		time.Sleep(15 * time.Second)
	}
}

Testing

Now bringing it all together:

package main

import (
"fmt"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler prints the topic and payload of each message it is
// invoked with.
var messagePubHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	topic, payload := m.Topic(), m.Payload()
	fmt.Printf("Topic: %s | %s\n", topic, payload)
}

// connectHandler writes a one-line notice to stdout when it is called for an
// established broker connection.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Print("Connected\n")
}

// connectLostHandler reports a lost broker connection together with the
// error that was passed to it.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, err error) {
	// Terminate the line so that later output does not run into this message.
	fmt.Printf("Connect lost: %+v\n", err)
}

// main builds the client options, connects to the broker at 192.168.2.77:1883,
// subscribes to the connection status topic, runs the PTZ preset sequence and
// finally disconnects. It panics if the connection attempt reports an error.
func main() {
	const (
		host    = "192.168.2.77"
		tcpPort = 1883
	)

	// The option setters return the options value, so they can be chained.
	options := mqtt.NewClientOptions().
		AddBroker(fmt.Sprintf("tcp://%s:%d", host, tcpPort)).
		SetClientID("go_mqtt_client").
		SetUsername("admin").
		SetPassword("instar").
		SetDefaultPublishHandler(messagePubHandler)
	options.OnConnect = connectHandler
	options.OnConnectionLost = connectLostHandler

	conn := mqtt.NewClient(options)

	// Block until the connection attempt has finished before reading its error.
	pending := conn.Connect()
	pending.Wait()
	if err := pending.Error(); err != nil {
		panic(err)
	}

	sub(conn)
	publish(conn)

	// The argument is the quiesce time in milliseconds granted before closing.
	conn.Disconnect(250)
}

// publish sends the camera to PTZ preset 2 and then back to preset 1,
// pausing 15 seconds after every command that was sent successfully.
//
// A failed publish is reported on stdout and the sequence moves on to the
// next preset without waiting.
func publish(client mqtt.Client) {
	const topic = "cameras/77/features/ptz/preset/raw"

	for _, preset := range []int{2, 1} {
		payload := fmt.Sprintf("%d", preset)
		token := client.Publish(topic, 0, false, payload)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Printf("Publishing preset %d to %s failed: %v\n", preset, topic, err)
			// Nothing was sent, so there is no camera movement to wait for.
			continue
		}
		// Pause before the next command (and before returning to the caller).
		time.Sleep(15 * time.Second)
	}
}

// sub subscribes to the camera's LWT connection status topic with QoS 1 and
// reports on stdout whether the subscription succeeded.
//
// A nil callback is passed to Subscribe, which leaves message delivery to the
// client's default publish handler.
func sub(client mqtt.Client) {
	topic := "cameras/77/status/connection"

	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	// Only claim success when the broker actually accepted the subscription.
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribing to LWT %s failed: %v\n", topic, err)
		return
	}
	fmt.Println("Subscribed to LWT", topic)
}

Running the Program

go run .\main.go
Connected
Subscribed to LWT cameras/77/status/connection
Topic: cameras/77/status/connection | {"val":"online"}

The program will pan your camera to preset position 2, return it to preset 1 and then stop running.