MQTT Message Server and Angular Frontend

Guangzhou, China

Basic Setup

Msg Server

git clone https://github.com/mpolinowski/mqtt-msg
cd mqtt-msg
npm install

Database

docker pull mongo:4.4.14
docker pull mongo-express:1.0.0-alpha.4

docker-compose.yml

version: '3.8'

services:

  mongodb:
    image: mongo:4.4.14
    container_name: mongodb
    restart: unless-stopped
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    volumes:
      - type: bind
        source: /opt/mongo/db
        target: /data/db
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo mongodb:27017/test --quiet
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - mongodb_network

  mongo-express:
    image: mongo-express:1.0.0-alpha.4
    container_name: mongo-express
    restart: unless-stopped
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongodb:27017/
      ME_CONFIG_BASICAUTH_USERNAME: admin
      ME_CONFIG_BASICAUTH_PASSWORD: example
    depends_on:
      - mongodb
    networks:
      - mongodb_network

networks:
  mongodb_network:
    name: mongodb_network

Create directories for data storage:

mkdir -p /opt/mongo/{db,configdb}

If the application runs on a different server, open the required ports, then start the database:

ufw allow 8081/tcp
ufw allow 27017/tcp
ufw reload
ufw status
docker-compose up -d
docker ps -a

Then log in to the Mongo Express web frontend on your server IP and port 8081 with your basic auth credentials (see ME_CONFIG_BASICAUTH_USERNAME and ME_CONFIG_BASICAUTH_PASSWORD in docker-compose.yml).

Connections

Connecting to your MQTT Broker

We will use the mqtt NPM package (npm install mqtt) to connect to the MQTT broker running on an INSTAR WQHD Camera with the following options:

// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
  // Clean session
  clean: true,
  connectTimeout: 4000,
  // Auth
  clientId: 'mqtt_msg',
  username: 'admin',
  password: 'admin',
}
const client = connect(brokerURL, options)
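An MQTT broker accepts only one active connection per client ID and disconnects the older session when a second client connects with the same ID. If you expect to run more than one instance of the Msg server, a random suffix avoids that. This is a minimal sketch; the helper name makeClientId is hypothetical and not part of the mqtt package:

```typescript
// Hypothetical helper: append a random 8-character hex suffix to a client ID prefix.
function makeClientId(prefix: string): string {
  let suffix = ''
  while (suffix.length < 8) {
    // Add one random hex digit (0-f) per iteration
    suffix += Math.floor(Math.random() * 16).toString(16)
  }
  return prefix + '_' + suffix
}

const exampleClientId = makeClientId('mqtt_msg')
console.log(exampleClientId) // e.g. mqtt_msg_3fa9c01b
```

The result could be passed as clientId in the options object instead of the fixed string.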

We can test the connection by publishing a heartbeat from our Msg client (see app.ts) and reading the message using an MQTT subscription:

initMqttListener() {
  console.log('[INFO] :: Msg server connecting to MQTT broker')

  // MQTT Subscription
  client.on('connect', function() {
    console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
    client.subscribe(topicPrefix + '#', function(err: any) {
      if (err) {
        console.log('[ERROR] :: MQTT error', err)
      } else {
        client.publish(topicPrefix + 'heartbeat', 'Msg Service is online')
        console.log('[INFO] :: Topic', topicPrefix + 'heartbeat', 'was updated')
      }
    })
  })

  // Read MQTT Messages
  client.on('message', function(topic: any, message: any) {
    // Read topic/message from buffer
    console.log('[MQTT] ::', topic.toString(), '"' + message.toString() + '"')
    client.end()
  })
}
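The subscription above uses the topic filter msg/#, where # is the MQTT multi-level wildcard. The broker does the matching itself; the sketch below only illustrates which topics such a filter covers. The function name topicMatches is hypothetical, and the special handling of topics starting with $ is left out:

```typescript
// Illustrative re-implementation of MQTT topic filter matching.
// '#' matches the parent level and everything below it, '+' matches exactly one level.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')
  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    if (level === '#') return true                 // covers all remaining levels
    if (i >= topicLevels.length) return false      // topic is shorter than the filter
    if (level !== '+' && level !== topicLevels[i]) return false
  }
  // Without '#', filter and topic must have the same number of levels
  return filterLevels.length === topicLevels.length
}

console.log(topicMatches('msg/#', 'msg/heartbeat'))   // true
console.log(topicMatches('msg/#', 'other/heartbeat')) // false
console.log(topicMatches('msg/+', 'msg/a/b'))         // false
```

Because the Msg client subscribes to msg/#, it also receives its own heartbeat on msg/heartbeat, which is what the test relies on.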

Connecting to your MongoDB Instance

We will use the mongoose NPM package (npm install mongoose) to connect to the database with the following options:

// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
  "auth": {
    "authSource": "admin"
  },
  "user": "root",
  "pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'
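The connection string embeds the user name and password. Characters such as @, : or / in a password have to be percent-encoded, otherwise the URL is parsed incorrectly. The following sketch shows one way to assemble the string; buildMongoUrl and the MongoTarget interface are hypothetical names used for illustration only:

```typescript
// Hypothetical description of a MongoDB connection target
interface MongoTarget {
  user: string
  pass: string
  host: string
  port: number
  params?: { [key: string]: string }
}

// Assemble a mongodb:// URL and percent-encode credentials and query parameters
function buildMongoUrl(target: MongoTarget): string {
  const credentials = encodeURIComponent(target.user) + ':' + encodeURIComponent(target.pass)
  const params = target.params || {}
  const query = Object.keys(params)
    .map(function (key) {
      return encodeURIComponent(key) + '=' + encodeURIComponent(String(params[key]))
    })
    .join('&')
  return 'mongodb://' + credentials + '@' + target.host + ':' + target.port + '/' + (query ? '?' + query : '')
}

const exampleUrl = buildMongoUrl({
  user: 'root',
  pass: 'example',
  host: '192.168.2.111',
  port: 27017,
  params: { retryWrites: 'true', w: 'majority' }
})
console.log(exampleUrl) // mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority
```

With the example credentials the result is identical to the hard-coded mongoURL; the encoding only makes a difference once the password contains reserved characters.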

The following code connects to our MongoDB instance and creates a collection mqtt-msg inside the test database:

initDB() {
  connect_db().catch(err => console.log('[ERROR] :: Database error', err));
  async function connect_db() {
    await mongoose.connect(mongoURL, mongoAuth);
  }

  const createCollection = mongoose.model('mqtt-msg', { name: String})

  const test = new createCollection({ name: 'Msg Client'})
  test.save().then(() => console.log('[INFO] :: Msg server connected to database'))
}

Spinning up the Webserver

We will use the express NPM package (npm install express) to create a webserver with the following options:

// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888

To get started, we simply return a timestamp on the web root / and use the app.listen callback to start our Msg client, which so far only establishes connections to our broker and database:

// Run Webfront
app.get('/', (req: any, res: any) => {
  res.json({ server: "running", timestamp: new Date().getTime() })
})


// Webserver
app.listen(port, () => {
  console.log('[INFO] :: Webserver running on Port', port)

  new MsgBot()
})

Test

Run the app with npm start and check your console output. If everything works you will see the following lines:

[INFO] :: Webserver running on Port 8888
[INFO] :: Msg service is online
[INFO] :: Msg server connecting to MQTT broker
[INFO] :: Connected to MQTT Broker mqtt://192.168.2.115:1883
[INFO] :: Topic msg/heartbeat was updated
[MQTT] :: msg/heartbeat "Msg Service is online"
[INFO] :: Msg server connected to database

You can find the entire app.ts file from this stage below: Establishing Connections.

Writing Incoming Messages to Database

We can now remove the test from our database initialization:

initDB() {
  connect_db().catch(err => console.log('[ERROR] :: Database error', err));
  async function connect_db() {
    await mongoose.connect(mongoURL, mongoAuth);
  }
}

And instead move it down into the MQTT function to handle incoming (JSON formatted) MQTT messages and write them into our database:

initMqttListener() {
  console.log('[INFO] :: Msg server connecting to MQTT broker')

  // MQTT Subscription
  client.on('connect', function() {
    console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
    client.subscribe(topicPrefix + '#', function(err: any) {
      if (err) {
        console.log('[ERROR] :: MQTT error', err)
      } else {
        client.publish(topicPrefix + 'server', JSON.stringify({ msg: 'Msg Service is online' }))
        console.log('[INFO] :: Topic', topicPrefix + 'server', 'was updated')
      }
    })
  })

  // Define Collection model once (compiling it again for every message throws an OverwriteModelError)
  const msgModel = mongoose.model('mqtt-msg', { msg: String, createdOn: Date, topicId: String })

  // Read MQTT Messages
  client.on('message', function(topic: any, message: any) {
    // Read topic/message from buffer
    const msg = message.toString()
    console.log('[MQTT] ::', topic.toString(), '"' + msg + '"')
    // Use JSON for messages and skip payloads that cannot be parsed
    let msgObject: any
    try {
      msgObject = JSON.parse(msg)
    } catch (err) {
      console.log('[ERROR] :: Message is not valid JSON', err)
      return
    }
    // Add date to each message object
    msgObject.createdOn = new Date()
    // Add ID to each message object
    msgObject.topicId = topic

    // Write message to database
    new msgModel(msgObject).save()
      .then(() => console.log('[INFO] :: Msg saved to database'))
      .catch((err: any) => console.log('[ERROR] :: Database error', err))
    // No client.end() here: the server stays connected and keeps listening for messages
  })
}

See Writing Messages to Database for full application code.
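The message handler expects a JSON payload with a msg field and adds createdOn and topicId before saving. That conversion can be isolated into a small function that is easy to test without a broker or database. This is a sketch under assumptions: toMsgDocument and MsgDocument are hypothetical names, and the payload is assumed to have already been converted to a string:

```typescript
// Shape of the documents stored in the mqtt-msg collection
interface MsgDocument {
  msg: string
  createdOn: Date
  topicId: string
}

// Convert an incoming payload into a document, or return null if it is unusable
function toMsgDocument(topic: string, payload: string, now: Date = new Date()): MsgDocument | null {
  let parsed: unknown
  try {
    parsed = JSON.parse(payload)
  } catch (err) {
    return null // payload is not valid JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return null
  const msg = (parsed as { msg?: unknown }).msg
  if (typeof msg !== 'string') return null // the msg field is required
  return { msg: msg, createdOn: now, topicId: topic }
}

console.log(toMsgDocument('msg/server', '{"msg":"Msg Service is online"}'))
console.log(toMsgDocument('msg/heartbeat', 'Msg Service is online')) // null
```

The second call shows why the heartbeat from the connection test was changed to a JSON string: a plain-text payload cannot be parsed and would be skipped.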

API & Frontend

Generating Channel IDs

To display those messages in a user interface, we have to provide an API. We can start by creating session IDs for communication channels:

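// Return channel ID
app.get('/channelId', (req: any, res: any) => {
  const {client1, client2} = req.query
  // Note: getTime is referenced here, not called, so btoa() encodes the function source instead of a timestamp
  const channelId = topicPrefix + btoa(client1 + '_' + client2 + '_' + new Date().getTime)
  res.json({ channelId })
})

This creates an Express route that returns an ID for each pair of clients. Because new Date().getTime is not called, the Base64 part of the response below decodes to me_you_function getTime() { [native code] } and stays the same for every request from the same pair. Use new Date().getTime() if each request should get its own, time-based ID: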

curl 'http://localhost:8888/channelId?client1=me&client2=you'

{"channelId":"msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ=="}
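Since the channel ID is only Base64-encoded, it can be decoded again to check what went into it. The sketch below builds an ID with the timestamp actually included and decodes both that ID and the one returned by the route above. The function names createChannelId and decodeChannelId are hypothetical; btoa and atob only handle Latin-1 characters, so client names are assumed to be plain ASCII:

```typescript
const channelPrefix = 'msg/'

// Build a channel ID from two client names and a timestamp (getTime() is called here)
function createChannelId(client1: string, client2: string, now: Date = new Date()): string {
  return channelPrefix + btoa(client1 + '_' + client2 + '_' + now.getTime())
}

// Decode the Base64 part of a channel ID for debugging
function decodeChannelId(channelId: string): string {
  return atob(channelId.slice(channelPrefix.length))
}

const fixedDate = new Date(1650000000000)
const channelId = createChannelId('me', 'you', fixedDate)
console.log(decodeChannelId(channelId))
// me_you_1650000000000

console.log(decodeChannelId('msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ=='))
// me_you_function getTime() { [native code] }
```

Two requests from the same pair within the same millisecond would still produce the same ID, so a random component would be needed if strict uniqueness matters.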

Angular Frontend

Scaffold the Angular app:

npm install -g @angular/cli
ng new frontend
cd frontend
ng generate library msg-server

This creates a library for our message server in frontend/projects/msg-server/src/lib, which we need to import as MsgServerModule in frontend/src/app/app.module.ts:

import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { MsgServerModule } from 'projects/msg-server/src/public-api';

import { AppRoutingModule } from './app-routing.module';
import { AppComponent } from './app.component';

@NgModule({
  declarations: [
    AppComponent
  ],
  imports: [
    BrowserModule,
    AppRoutingModule,
    MsgServerModule
  ],
  providers: [],
  bootstrap: [AppComponent]
})
export class AppModule { }

Now remove all the placeholder HTML from frontend/src/app/app.component.html and replace it with:

<lib-msg-server></lib-msg-server>

<router-outlet></router-outlet>

Run the app with npm start and verify that it is running on http://localhost:4200/. You should see a simple "msg-server works!" message.

We can now replace this HTML with a code snippet from codepen.io. The line we saw above is generated in frontend/projects/msg-server/src/lib/msg-server.component.ts, which we can change to load an external template and stylesheet:

import { Component, OnInit } from '@angular/core';

@Component({
  selector: 'lib-msg-server',
  templateUrl: './msg-server.component.html',
  // The stylesheet entry must point to the CSS file, not the HTML template
  styleUrls: [
    './msg-server.component.css'
  ]
})
export class MsgServerComponent implements OnInit {

  constructor() { }

  ngOnInit(): void {
  }

}

Then copy and paste the code from codepen.io:

  • HTML into frontend/projects/msg-server/src/lib/msg-server.component.html
  • CSS into frontend/projects/msg-server/src/lib/msg-server.component.css

Adjust the HTML and CSS as you see fit.

Connecting the Frontend

Install the following libraries:

npm install ngx-mqtt
npm install ngx-mqtt-chat

Import NgxMqttChatModule into frontend/projects/msg-server/src/lib/msg-server.module.ts:

import { CommonModule } from '@angular/common';
import { NgModule } from '@angular/core';
import { FormsModule } from '@angular/forms';

import { NgxMqttChatModule } from 'ngx-mqtt-chat';
import { MsgServerComponent } from './msg-server.component';

@NgModule({
  declarations: [
    MsgServerComponent
  ],
  imports: [
    CommonModule,
    NgxMqttChatModule,
    FormsModule
  ],
  exports: [
    MsgServerComponent
  ]
})
export class MsgServerModule { }

Then provide the connection data for your MQTT broker in frontend/projects/msg-server/src/lib/msg-server.component.ts (the module appears to support only WebSocket connections without authentication):

import { Component, OnInit } from '@angular/core';
import { NgxMqttChatService } from 'ngx-mqtt-chat';

@Component({
  selector: 'lib-msg-server',
  templateUrl: './msg-server.component.html',
  styleUrls: [
    './msg-server.component.css'
  ]
})
export class MsgServerComponent implements OnInit {
  chatView = false;

  constructor(
    private messagingService: NgxMqttChatService
  ) { }

  ngOnInit(): void {
    // Connection data for the MQTT broker
    const mqtt = {
      manageConnectionManually: false,
      hostname: '192.168.2.111',
      port: 1885,
      path: ''
    } as any
    this.messagingService.connect(mqtt);
    // Publish a status message to the channel
    this.messagingService.publishToTopic(
      'msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ==',
      { msg:"Client connected..." }
    )
  }

}

Here the messaging service is using the channel ID msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ== created earlier to publish a status message.

Addendum

Establishing Connections

// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888

// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
  "auth": {
    "authSource": "admin"
  },
  "user": "root",
  "pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'


// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
  // Clean session
  clean: true,
  connectTimeout: 4000,
  // Auth
  clientId: 'mqtt_msg',
  username: 'admin',
  password: 'admin',
}
const client = connect(brokerURL, options)


// Msg Bot
export class MsgBot {
  constructor() {
    console.log('[INFO] :: Msg service is online')
    this.initMqttListener()
    this.initDB()
  }

  initDB() {
    connect_db().catch(err => console.log('[ERROR] :: Database error', err));
    async function connect_db() {
      await mongoose.connect(mongoURL, mongoAuth);
    }

    const createCollection = mongoose.model('mqtt-msg', { name: String})

    const test = new createCollection({ name: 'Msg Client'})
    test.save().then(() => console.log('[INFO] :: Msg server connected to database'))
  }

  initMqttListener() {
    console.log('[INFO] :: Msg server connecting to MQTT broker')

    // MQTT Subscription
    client.on('connect', function() {
      console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
      client.subscribe(topicPrefix + '#', function(err: any) {
        if (err) {
          console.log('[ERROR] :: MQTT error', err)
        } else {
          client.publish(topicPrefix + 'heartbeat', 'Msg Service is online')
          console.log('[INFO] :: Topic', topicPrefix + 'heartbeat', 'was updated')
        }
      })
    })

    // Read MQTT Messages
    client.on('message', function(topic: any, message: any) {
      // Read topic/message from buffer
      console.log('[MQTT] ::', topic.toString(), '"' + message.toString() + '"')
      client.end()
    })
  }
}


// Run Webfront
app.get('/', (req: any, res: any) => {
  res.json({ server: "running", timestamp: new Date().getTime() })
})


// Webserver
app.listen(port, () => {
  console.log('[INFO] :: Webserver running on Port', port)

  new MsgBot()
})

Writing Messages to Database

// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888

// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
  "auth": {
    "authSource": "admin"
  },
  "user": "root",
  "pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'


// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
  // Clean session
  clean: true,
  connectTimeout: 4000,
  // Auth
  clientId: 'msg_server',
  username: 'admin',
  password: 'admin',
}
const client = connect(brokerURL, options)


// Msg Bot
export class MsgBot {

  constructor() {
    console.log('[INFO] :: Msg service is online')
    this.initMqttListener()
    this.initDB()
  }

  initDB() {
    connect_db().catch(err => console.log('[ERROR] :: Database error', err));
    async function connect_db() {
      await mongoose.connect(mongoURL, mongoAuth);
    }
  }

  initMqttListener() {
    console.log('[INFO] :: Msg server connecting to MQTT broker')

    // MQTT Subscription
    client.on('connect', function() {
      console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
      client.subscribe(topicPrefix + '#', function(err: any) {
        if (err) {
          console.log('[ERROR] :: MQTT error', err)
        } else {
          client.publish(topicPrefix + 'server', JSON.stringify({ msg: 'Msg Service is online' }))
          console.log('[INFO] :: Topic', topicPrefix + 'server', 'was updated')
        }
      })
    })

    // Define Collection model once (compiling it again for every message throws an OverwriteModelError)
    const msgModel = mongoose.model('mqtt-msg', { msg: String, createdOn: Date, topicId: String })

    // Read MQTT Messages
    client.on('message', function(topic: any, message: any) {
      // Read topic/message from buffer
      const msg = message.toString()
      console.log('[MQTT] ::', topic.toString(), '"' + msg + '"')
      // Use JSON for messages and skip payloads that cannot be parsed
      let msgObject: any
      try {
        msgObject = JSON.parse(msg)
      } catch (err) {
        console.log('[ERROR] :: Message is not valid JSON', err)
        return
      }
      // Add date to each message object
      msgObject.createdOn = new Date()
      // Add ID to each message object
      msgObject.topicId = topic

      // Write message to database
      new msgModel(msgObject).save()
        .then(() => console.log('[INFO] :: Msg saved to database'))
        .catch((err: any) => console.log('[ERROR] :: Database error', err))
      // No client.end() here: the server stays connected and keeps listening for messages
    })
  }
}


// Run Webfront
app.get('/', (req: any, res: any) => {
  res.json({ server: "running", timestamp: new Date().getTime() })
})


// Webserver
app.listen(port, () => {
  console.log('[INFO] :: Webserver running on Port', port)

  new MsgBot()
})