MQTT Message Server and Angular Frontend
- Basic Setup
- Connections
- Writing Incoming Messages to Database
- API & Frontend
- Generating Channel IDs
- Angular Frontend
- Connecting the Frontend
- Addendum
Basic Setup
Msg Server
git clone https://github.com/mpolinowski/mqtt-msg
cd mqtt-msg
npm install
Database
docker pull mongo:4.4.14
docker pull mongo-express:1.0.0-alpha.4
docker-compose.yml
version: '3.8'

services:

  mongodb:
    image: mongo:4.4.14
    container_name: mongodb
    restart: unless-stopped
    ports:
      - 27017:27017
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    volumes:
      - type: bind
        source: /opt/mongo/db
        target: /data/db
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo mongodb:27017/test --quiet
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - mongodb_network

  mongo-express:
    image: mongo-express:1.0.0-alpha.4
    container_name: mongo-express
    restart: unless-stopped
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongodb:27017/
      ME_CONFIG_BASICAUTH_USERNAME: admin
      ME_CONFIG_BASICAUTH_PASSWORD: example
    depends_on:
      - mongodb
    networks:
      - mongodb_network

networks:
  mongodb_network:
    name: mongodb_network
Create directories for data storage:
mkdir -p /opt/mongo/{db,configdb}
If the application runs on a different server, open the required ports, then start the database:
ufw allow 8081/tcp
ufw allow 27017/tcp
ufw reload
ufw status
docker-compose up -d
docker ps -a
Then log in to the Mongo-Express web frontend on your server IP and port 8081 with your basicAuth username and password (see docker-compose.yml).
Connections
Connecting to your MQTT Broker
We will use the mqtt NPM package (npm install mqtt) to connect to the MQTT broker running on an INSTAR WQHD camera with the following options:
// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
// Clean session
clean: true,
connectTimeout: 4000,
// Auth
clientId: 'mqtt_msg',
username: 'admin',
password: 'admin',
}
const client = connect(brokerURL, options)
We can test the connection by publishing a heartbeat from our Msg client (see app.ts) and reading the message back through an MQTT subscription:
initMqttListener() {
console.log('[INFO] :: Msg server connecting to MQTT broker')
// MQTT Subscription
client.on('connect', function() {
console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
client.subscribe(topicPrefix + '#', function(err: any) {
if (err) {
console.log('[ERROR] :: MQTT error', err)
} else {
client.publish(topicPrefix + 'heartbeat', 'Msg Service is online')
console.log('[INFO] :: Topic', topicPrefix + 'heartbeat', 'was updated')
}
})
})
// Read MQTT Messages
client.on('message', function(topic: any, message: any) {
// Read topic/message from buffer
console.log('[MQTT] ::', topic.toString(), '"' + message.toString() + '"')
// Disconnect from the broker once the first message has been read
client.end()
})
}
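The subscription above uses the filter msg/# (topicPrefix plus the multi-level wildcard), so the client also receives its own heartbeat on msg/heartbeat. The following is a minimal sketch of how such a filter is compared against a topic. The helper topicMatches is hypothetical and not part of the mqtt package; the broker performs this matching for us, and the sketch ignores special cases such as topics starting with $.

```typescript
// Hypothetical helper illustrating MQTT topic-filter matching.
// Not part of the mqtt package; the broker does this for us.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/')
  const topicLevels = topic.split('/')

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i]
    // '#' must be the last level; it matches the parent level and everything below it
    if (level === '#') return i === filterLevels.length - 1
    // The topic ran out of levels before the filter did
    if (i >= topicLevels.length) return false
    // '+' matches exactly one level; any other level must match literally
    if (level !== '+' && level !== topicLevels[i]) return false
  }
  // Without a trailing '#', both must have the same number of levels
  return filterLevels.length === topicLevels.length
}

const topicPrefix = 'msg/'
console.log(topicMatches(topicPrefix + '#', 'msg/heartbeat'))
console.log(topicMatches(topicPrefix + '#', 'status/heartbeat'))
```

The first call matches and the second does not, which is why only topics below msg/ show up in the console output of the Msg server.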
Connecting to your MongoDB Instance
We will use the mongoose NPM package (npm install mongoose) to connect to the database with the following options:
// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
"auth": {
"authSource": "admin"
},
"user": "root",
"pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'
The following code connects to our MongoDB instance and creates a collection for the mqtt-msg model inside the test database:
initDB() {
connect_db().catch(err => console.log('[ERROR] :: Database error', err));
async function connect_db() {
await mongoose.connect(mongoURL, mongoAuth);
}
const createCollection = mongoose.model('mqtt-msg', { name: String})
const test = new createCollection({ name: 'Msg Client'})
test.save().then(() => console.log('[INFO] :: Msg server connected to database'))
}
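The connection string used above embeds the user name and password directly. As a small sketch, assuming you want to assemble that string from separate values, the credentials can be percent-encoded so that characters such as @ or : do not break the URL. The helper buildMongoUrl and the MongoTarget interface are hypothetical names introduced only for this illustration.

```typescript
// Hypothetical helper: assembles the MongoDB connection string used in this article.
interface MongoTarget {
  user: string
  pass: string
  host: string
  port: number
}

function buildMongoUrl(target: MongoTarget): string {
  // Percent-encode credentials so reserved characters stay inside the userinfo part
  const user = encodeURIComponent(target.user)
  const pass = encodeURIComponent(target.pass)
  return `mongodb://${user}:${pass}@${target.host}:${target.port}/?retryWrites=true&w=majority`
}

const exampleUrl = buildMongoUrl({
  user: 'root',
  pass: 'example',
  host: '192.168.2.111',
  port: 27017
})
console.log(exampleUrl)
```

With the values from the docker-compose.yml file this yields the same string as the hard-coded mongoURL.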
Spinning up the Webserver
We will use the express NPM package (npm install express) to create a webserver with the following options:
// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888
To get started, we simply return a timestamp on the web root / and use the listen callback to start our Msg client, which so far only establishes the connections to our broker and database:
// Run Webfront
app.get('/', (req: any, res: any) => {
res.json({ server: "running", timestamp: new Date().getTime() })
})
// Webserver
app.listen(port, () => {
console.log('[INFO] :: Webserver running on Port', port)
new MsgBot()
})
Test
Run the app with npm start and check your console output. If everything works, you will see the following lines:
[INFO] :: Webserver running on Port 8888
[INFO] :: Msg service is online
[INFO] :: Msg server connecting to MQTT broker
[INFO] :: Connected to MQTT Broker mqtt://192.168.2.115:1883
[INFO] :: Topic msg/heartbeat was updated
[MQTT] :: msg/heartbeat "Msg Service is online"
[INFO] :: Msg server connected to database
You can find the entire app.ts file from this stage in the addendum below: Establishing Connections.
Writing Incoming Messages to Database
We can now remove the test from our database initialization:
initDB() {
connect_db().catch(err => console.log('[ERROR] :: Database error', err));
async function connect_db() {
await mongoose.connect(mongoURL, mongoAuth);
}
}
Instead, we move the database write down into the MQTT message handler, which parses incoming (JSON-formatted) MQTT messages and writes them to our database:
initMqttListener() {
console.log('[INFO] :: Msg server connecting to MQTT broker')
// MQTT Subscription
client.on('connect', function() {
console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
client.subscribe(topicPrefix + '#', function(err: any) {
if (err) {
console.log('[ERROR] :: MQTT error', err)
} else {
client.publish(topicPrefix + 'server', JSON.stringify({ msg: 'Msg Service is online' }))
console.log('[INFO] :: Topic', topicPrefix + 'server', 'was updated')
}
})
})
// Read MQTT Messages
client.on('message', function(topic: any, message: any) {
// Read topic/message from buffer
const msg = message.toString()
console.log('[MQTT] ::', topic.toString(), '"' + msg + '"')
// Write message to database
// Define Collection model
const msgModel = mongoose.model('mqtt-msg', { msg: String, createdOn: Date, topicId: String })
// Use JSON for messages
const msgObject = JSON.parse(msg)
// Add date to each message object
msgObject.createdOn = new Date()
// Add ID to each message object
msgObject.topicId = topic
new msgModel(msgObject).save().then(() => console.log('[INFO] :: Msg saved to database'))
// Disconnect from the broker once the first message has been handled
client.end()
})
}
See Writing Messages to Database for full application code.
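The message handler above assumes that every payload is valid JSON; JSON.parse throws on anything else, for example on a plain-text heartbeat. The following is a minimal sketch, under the assumption that non-JSON payloads should simply be skipped, of how the parsing step could be isolated. The function toMsgDocument and the MsgDocument interface are hypothetical and only mirror the fields of the model shown above (msg, createdOn, topicId).

```typescript
// Shape of the documents written by the message handler in this article
interface MsgDocument {
  msg: string
  createdOn: Date
  topicId: string
}

// Hypothetical helper: turns a raw MQTT payload into a document, or null if it is unusable.
// `payload` only needs a toString() method, which Node.js Buffers provide.
function toMsgDocument(
  topic: string,
  payload: { toString(): string },
  now: Date = new Date()
): MsgDocument | null {
  let parsed: unknown
  try {
    parsed = JSON.parse(payload.toString())
  } catch {
    // Not JSON (e.g. a plain-text heartbeat): skip instead of crashing the handler
    return null
  }
  if (typeof parsed !== 'object' || parsed === null) return null

  const msg = (parsed as { msg?: unknown }).msg
  if (typeof msg !== 'string') return null

  return { msg, createdOn: now, topicId: topic }
}

console.log(toMsgDocument('msg/server', JSON.stringify({ msg: 'Msg Service is online' })))
console.log(toMsgDocument('msg/heartbeat', 'Msg Service is online'))
```

Inside the handler, a non-null result could be passed to new msgModel(...).save() as before, while a null result would only be logged.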
API & Frontend
Generating Channel IDs
To be able to show those messages in a user interface, we have to provide an API. We can start by creating session IDs for communication channels:
// Return random channel ID
app.get('/channelId', (req: any, res: any) => {
const {client1, client2} = req.query
const channelId = topicPrefix + btoa(client1 + '_' + client2 + '_' + new Date().getTime)
res.json({ channelId })
})
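This creates an Express route that returns an ID for each pair of clients. Note that getTime is referenced without being called, so the Base64 part of the ID below decodes to me_you_function getTime() { [native code] } and is the same on every request for a given pair; call new Date().getTime() if the ID should include the timestamp: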
curl 'http://localhost:8888/channelId?client1=me&client2=you'
{"channelId":"msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ=="}
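As a sketch of the intended behaviour, the ID can be built from the two client names plus an actual timestamp value and decoded again for inspection. The helpers makeChannelId and decodeChannelId are hypothetical names; the sketch assumes ASCII client names (a limitation of btoa) and a runtime that provides the global btoa and atob functions.

```typescript
// Hypothetical helper: builds a channel ID from two client names and a timestamp.
// Assumes ASCII-only client names, because btoa cannot encode other characters.
function makeChannelId(
  prefix: string,
  client1: string,
  client2: string,
  now: number = Date.now()
): string {
  return prefix + btoa(`${client1}_${client2}_${now}`)
}

// Hypothetical helper: recovers the readable part of a channel ID
function decodeChannelId(prefix: string, channelId: string): string {
  return atob(channelId.slice(prefix.length))
}

const sampleId = makeChannelId('msg/', 'me', 'you', 1650000000000)
console.log(sampleId)
console.log(decodeChannelId('msg/', sampleId))
```

An ID built this way depends only on the names and the timestamp, so it is time-based rather than random. Standard Base64 output can also contain the characters + and /, which have a special meaning in MQTT topics, so a URL-safe alphabet may be the better choice for topic names.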
Angular Frontend
Scaffold the Angular app:
npm install -g @angular/cli
ng new frontend
cd frontend
ng generate library msg-server
This will create a library for our message server in frontend/projects/msg-server/src/lib that we need to import as MsgServerModule in frontend/src/app/app.module.ts:
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { MsgServerModule } from 'projects/msg-server/src/public-api';
import { AppRoutingModule } from './app-routing.module';
import { AppComponent } from './app.component';
@NgModule({
declarations: [
AppComponent
],
imports: [
BrowserModule,
AppRoutingModule,
MsgServerModule
],
providers: [],
bootstrap: [AppComponent]
})
export class AppModule { }
Now remove all the placeholder HTML from frontend/src/app/app.component.html and replace it with:
<lib-msg-server></lib-msg-server>
<router-outlet></router-outlet>
Run the app with npm start and verify that it is running on http://localhost:4200/. You should see a simple "msg-server works!" message.
We can now replace this HTML with a code snippet from codepen.io. The line we saw above is generated in frontend/projects/msg-server/src/lib/msg-server.component.ts, which we can change to load our template code from separate files:
import { Component, OnInit } from '@angular/core';
@Component({
selector: 'lib-msg-server',
templateUrl: './msg-server.component.html',
styleUrls: [
'./msg-server.component.css'
]
})
export class MsgServerComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
}
Then copy and paste the code from codepen.io:
- HTML into frontend/projects/msg-server/src/lib/msg-server.component.html
- CSS into frontend/projects/msg-server/src/lib/msg-server.component.css
Make changes to the HTML and CSS as you see fit.
Connecting the Frontend
Install the following libraries:
npm install ngx-mqtt
npm install ngx-mqtt-chat
Import NgxMqttChatModule into frontend/projects/msg-server/src/lib/msg-server.module.ts:
import { CommonModule } from '@angular/common';
import { NgModule } from '@angular/core';
import { FormsModule } from '@angular/forms';
import { NgxMqttChatModule } from 'ngx-mqtt-chat';
import { MsgServerComponent } from './msg-server.component';
@NgModule({
declarations: [
MsgServerComponent
],
imports: [
CommonModule,
NgxMqttChatModule,
FormsModule
],
exports: [
MsgServerComponent
]
})
export class MsgServerModule { }
Then provide the connection data for your MQTT broker in frontend/projects/msg-server/src/lib/msg-server.component.ts (the module seems to support only WebSocket connections, without authentication):
import { Component, OnInit } from '@angular/core';
import { NgxMqttChatService } from 'ngx-mqtt-chat';
@Component({
selector: 'lib-msg-server',
templateUrl: './msg-server.component.html',
styleUrls: [
'./msg-server.component.css'
]
})
export class MsgServerComponent implements OnInit {
chatView=false;
constructor(
private messagingService: NgxMqttChatService
) { }
ngOnInit(): void {
let mqtt = {
manageConnectionManually: false,
hostname: '192.168.2.111',
port: 1885,
path: ''
} as any
this.messagingService.connect(mqtt);
this.messagingService.publishToTopic(
'msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ==',
{ msg:"Client connected..." }
)
}
}
Here the messaging service uses the channel ID msg/bWVfeW91X2Z1bmN0aW9uIGdldFRpbWUoKSB7IFtuYXRpdmUgY29kZV0gfQ== created earlier to publish a status message.
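Instead of hard-coding the channel ID, the frontend could request it from the /channelId route of the Msg server. The following is a minimal sketch of building that request URL. The helper channelIdUrl is a hypothetical name, and the base URL http://localhost:8888 is taken from the curl example in this article; adjust it to wherever your Msg server runs.

```typescript
// Hypothetical helper: builds the request URL for the /channelId route of the Msg server
function channelIdUrl(baseUrl: string, client1: string, client2: string): string {
  // Encode the client names so spaces or '&' cannot break the query string
  const query = `client1=${encodeURIComponent(client1)}&client2=${encodeURIComponent(client2)}`
  // Drop trailing slashes from the base URL to avoid '//channelId'
  return `${baseUrl.replace(/\/+$/, '')}/channelId?${query}`
}

const requestUrl = channelIdUrl('http://localhost:8888', 'me', 'you')
console.log(requestUrl)
```

In the Angular component this URL could be requested with HttpClient, and the channelId field of the JSON response would then replace the hard-coded topic. Because the frontend and the Msg server run on different ports, the Express app would probably also need to allow cross-origin requests.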
Addendum
Establishing Connections
// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888
// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
"auth": {
"authSource": "admin"
},
"user": "root",
"pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'
// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
// Clean session
clean: true,
connectTimeout: 4000,
// Auth
clientId: 'mqtt_msg',
username: 'admin',
password: 'admin',
}
const client = connect(brokerURL, options)
// Msg Bot
export class MsgBot {
constructor() {
console.log('[INFO] :: Msg service is online')
this.initMqttListener()
this.initDB()
}
initDB() {
connect_db().catch(err => console.log('[ERROR] :: Database error', err));
async function connect_db() {
await mongoose.connect(mongoURL, mongoAuth);
}
const createCollection = mongoose.model('mqtt-msg', { name: String})
const test = new createCollection({ name: 'Msg Client'})
test.save().then(() => console.log('[INFO] :: Msg server connected to database'))
}
initMqttListener() {
console.log('[INFO] :: Msg server connecting to MQTT broker')
// MQTT Subscription
client.on('connect', function() {
console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
client.subscribe(topicPrefix + '#', function(err: any) {
if (err) {
console.log('[ERROR] :: MQTT error', err)
} else {
client.publish(topicPrefix + 'heartbeat', 'Msg Service is online')
console.log('[INFO] :: Topic', topicPrefix + 'heartbeat', 'was updated')
}
})
})
// Read MQTT Messages
client.on('message', function(topic: any, message: any) {
// Read topic/message from buffer
console.log('[MQTT] ::', topic.toString(), '"' + message.toString() + '"')
client.end()
})
}
}
// Run Webfront
app.get('/', (req: any, res: any) => {
res.json({ server: "running", timestamp: new Date().getTime() })
})
// Webserver
app.listen(port, () => {
console.log('[INFO] :: Webserver running on Port', port)
new MsgBot()
})
Writing Messages to Database
// Express setup
const express = require('express')
const app = express()
const port = process.env.PORT || 8888
// MongoDB setup
const mongoose = require('mongoose')
const mongoAuth = {
"auth": {
"authSource": "admin"
},
"user": "root",
"pass": "example"
}
const mongoURL = 'mongodb://root:example@192.168.2.111:27017/?retryWrites=true&w=majority'
// INSTAR MQTT Broker setup
const {connect} = require('mqtt')
const brokerURL = 'mqtt://192.168.2.115:1883'
const topicPrefix = 'msg/'
const options = {
// Clean session
clean: true,
connectTimeout: 4000,
// Auth
clientId: 'msg_server',
username: 'admin',
password: 'admin',
}
const client = connect(brokerURL, options)
// Msg Bot
export class MsgBot {
constructor() {
console.log('[INFO] :: Msg service is online')
this.initMqttListener()
this.initDB()
}
initDB() {
connect_db().catch(err => console.log('[ERROR] :: Database error', err));
async function connect_db() {
await mongoose.connect(mongoURL, mongoAuth);
}
}
initMqttListener() {
console.log('[INFO] :: Msg server connecting to MQTT broker')
// MQTT Subscription
client.on('connect', function() {
console.log('[INFO] :: Connected to MQTT Broker ' + brokerURL)
client.subscribe(topicPrefix + '#', function(err: any) {
if (err) {
console.log('[ERROR] :: MQTT error', err)
} else {
client.publish(topicPrefix + 'server', JSON.stringify({ msg: 'Msg Service is online' }))
console.log('[INFO] :: Topic', topicPrefix + 'server', 'was updated')
}
})
})
// Read MQTT Messages
client.on('message', function(topic: any, message: any) {
// Read topic/message from buffer
const msg = message.toString()
console.log('[MQTT] ::', topic.toString(), '"' + msg + '"')
// Write message to database
// Define Collection model
const msgModel = mongoose.model('mqtt-msg', { msg: String, createdOn: Date, topicId: String })
// Use JSON for messages
const msgObject = JSON.parse(msg)
// Add date to each message object
msgObject.createdOn = new Date()
// Add ID to each message object
msgObject.topicId = topic
new msgModel(msgObject).save().then(() => console.log('[INFO] :: Msg saved to database'))
client.end()
})
}
}
// Run Webfront
app.get('/', (req: any, res: any) => {
res.json({ server: "running", timestamp: new Date().getTime() })
})
// Webserver
app.listen(port, () => {
console.log('[INFO] :: Webserver running on Port', port)
new MsgBot()
})