Home About us Support Partners SIGN UP NOW

Connecting MQTT clients via ESP32

Text Copied
Introduction

This guide offers a comprehensive tutorial on connecting an MQTT client running on an ESP32 device to our CrystalMQ broker or any broker of your choice. It covers essential tasks such as establishing connections, subscribing to topics, unsubscribing, and exchanging messages. By following these steps, you can effectively integrate MQTT communication into your IoT projects.

Prerequisites

Before you begin, ensure you have:

  • An ESP32 development board with Arduino IDE setup
  • Access to any MQTT broker instance (IP address or domain name)
  • WiFi connection credentials for the ESP32

Dependency Installation

Install the Arduino IDE

Download and install the Arduino IDE from the official Arduino website.

Install the ESP32 Board Package

  • Open the Arduino IDE
  • Go to File > Preferences
  • In the Additional Boards Manager URLs field, add the following URL

https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json

If there are already URLs in this field, you can separate them with commas.

  • Click OK to save the preferences
  • Go to Tools > Board > Boards Manager
  • In the Boards Manager, search for ESP32 and install the esp32 package by Espressif Systems

Select the ESP32 Board

  • Go to Tools > Board
  • Scroll down and select your specific ESP32 board (e.g., ESP32 Dev Module)

Install Required Libraries

  • Go to Sketch > Include Library > Manage Libraries
  • In the Library Manager, search for the following libraries and install them as needed
    • WiFi (for WiFi connectivity)
    • ESPAsyncWebServer (for creating web servers)
    • PubSubClient (for MQTT communication)
    • Adafruit Sensor (if using Adafruit sensors)
    • DHT sensor library (for DHT sensors)
    • ArduinoJson (for JSON parsing and serialization)

Simply click on the desired library and then click the Install button.

Verify Installation

  • Open an example sketch to verify the installation.
  • Go to File > Examples and look for examples under ESP32 or the installed libraries.
  • Select an example sketch, compile, and upload it to your ESP32 board to ensure everything is set up correctly.

Define Constants

Replace your_wifi_ssid, your_wifi_password, mqtt_server, and mqtt_port with your actual WiFi and MQTT broker details.

/ WiFi
const char *ssid = "xxxxx"; // Enter your WiFi ssid
const char *password = "xxxxx"; // Enter WiFi password
// MQTT Broker
const char *mqtt_server = "public-mqtt-broker.bevywise.com";
const int mqtt_port = 1883;
const char *mqtt_username = "username"; (optional)
const char *mqtt_password = "pwd"; (optional)

Connecting to MQTT Broker

This section has code snippets of various ways to connect to MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use. Also, obtain the corresponding connection parameters of the MQTT Broker (Address, Port, Username/Password, CA Certificate)

MQTT Over TCP

Use the following code to connect the client over TCP. Define the Macro ADDRESS using MQTT Broker's connection parameters.

For MQTT 3.1.1 :

WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{
delay(500);
Serial.println("Connecting to WiFi..");
}
const char *client_id = “esp_client”;
client.setServer(mqtt_server,mqtt_port);

For MQTT 5 :

AsyncMqttClient mqttClient;
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
// You can subscribe to topics here
// mqttClient.subscribe("test/topic", 1);
}
void setup() {
Serial.begin(115200);
connectToWifi();
mqttClient.onConnect(onMqttConnect);
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setClientId(client_id);
// Set the protocol version to MQTT 5.0
mqttClient.setProtocolVersion(5);
mqttClient.connect();
}

MQTT Over TLS / SSL

Use the following code to connect securely to MQTT Broker over TLS. Define the Macro ADDRESS using MQTT Broker's connection parameters.

#include <WiFiClientSecure.h>
// Root CA certificate for the MQTT broker
const char* root_ca = \
"-----BEGIN CERTIFICATE-----\n" \
"YOUR_ROOT_CA_CERTIFICATE\n" \
"-----END CERTIFICATE-----\n";
// init wifi secure client
WiFiClientSecure espClient;
PubSubClient client(espClient);
espClient.setCACert(root_ca);
espClient.setCertificate(server_cert); //for client verification
espClient.setPrivateKey(server_key); // for client verification

Set TLS parameters before calling the MQTTClient_connect to connect the client to the mQTT Broker securely over TLS.

If the MQTT Broker is hosted in a trusted server and the server verification is not required, the following code can be used to set TLS Options:

// MQTT Broker settings
const char* mqtt_server = "your_mqtt_server";
const uint16_t mqtt_port = 8883; // Typically, port 8883 is used for MQTT over TLS
const char* client_id = generateClientId(crystal_, length);
// Initialize WiFi and MQTT clients
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected"); }
void connectToMqtt() {
// Disable server certificate verification
espClient.setInsecure();
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT... ");
if (mqttClient.connect(client_id)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:

/ Root CA certificate for the MQTT broker
const char* root_ca = \
"-----BEGIN CERTIFICATE-----\n" \
"YOUR_ROOT_CA_CERTIFICATE\n" \
"-----END CERTIFICATE-----\n";
// Initialize WiFi and MQTT clients
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void connectToMqtt() {
// Set the root CA certificate
espClient.setCACert(root_ca);
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT... ");
if (mqttClient.connect(client_id)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:

// Self-signed root certificate for the MQTT broker
const char* root_ca = \
"-----BEGIN CERTIFICATE-----\n" \
"YOUR_SELF_SIGNED_ROOT_CERTIFICATE\n" \
"-----END CERTIFICATE-----\n";
// Initialize WiFi and MQTT clients
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void connectToMqtt() {
// Set the self-signed root CA certificate
espClient.setCACert(root_ca);
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT... ");
if (mqttClient.connect(client_id)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

MQTT Over WebSocket

Define the MQTT Broker Address like this to connect the client over WebSocket.

#include <PubSubClient.h>
#include <WebSocketsClient.h>
// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port = 10443; // WebSocket without TLS usually runs on port 80
const char* client_id = generateClientId(crystal_, length);
// Initialize WiFi and MQTT clients
WiFiClient espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void connectToMqtt() {
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT over WebSocket... ");
if (mqttClient.connect(client_id)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

MQTT Over Secure WebSocket

Use the following code to connect the client over Secure WebSocket. Set TLS Options as given in MQTT Over TLS section. Define the Macro ADDRESS using MQTT Broker's connection parameters.

// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port =11443; // WebSocket with TLS usually runs on port 443
const char* client_id = "esp_client";
// Root CA certificate for the MQTT broker
const char* root_ca = \
"-----BEGIN CERTIFICATE-----\n" \
"YOUR_ROOT_CA_CERTIFICATE\n" \
"-----END CERTIFICATE-----\n";
// Initialize WiFi and MQTT clients
WiFiClientSecure espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void connectToMqtt() {
// Set the root CA certificate for TLS
espClient.setCACert(root_ca);
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT over WebSocket with TLS... ");
if (mqttClient.connect(client_id)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

Configuring MQTT Authentication

To connect to MQTT Broker that requires MQTT Username and MQTT Password for authentication, add to username and password to the connection options like this:

#include <WiFi.h>
#include <WiFiClient.h>
#include <PubSubClient.h>
// MQTT Broker settings
const char* mqtt_server = "public-mqtt-broker.bevywise.com";
const uint16_t mqtt_port = 1883; // Use port 1883 for non-TLS
const char* mqtt_user = "your_mqtt_username";
const char* mqtt_password = "your_mqtt_password";
const char* client_id_prefix = "esp_client_";
// Initialize WiFi and MQTT clients
WiFiClient espClient;
PubSubClient mqttClient(espClient);
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
String generateClientId(String prefix, int length) {
const char chars[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
int charsLength = sizeof(chars) - 1;
String clientId = prefix;
// Initialize random seed
randomSeed(analogRead(0));
for (int i = 0; i < length; i++) {
clientId += chars[random(0, charsLength)];
}
return clientId;
}
void connectToMqtt() {
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCallback(mqttCallback);
String client_id = generateClientId(client_id_prefix, 10);
while (!mqttClient.connected()) {
Serial.print("Connecting to MQTT... ");
if (mqttClient.connect(client_id.c_str(), mqtt_user, mqtt_password)) {
Serial.println("connected");
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic");
} else {
Serial.print("failed with state ");
Serial.print(mqttClient.state());
delay(2000);
}
}
}
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Handle incoming messages
Serial.print("Message arrived on topic: ");
Serial.print(topic);
Serial.print(". Message: ");
for (unsigned int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
}
Serial.println();
}
void setup() {
Serial.begin(115200);
connectToWifi();
connectToMqtt();
}
void loop() {
// Ensure the MQTT client stays connected
if (!mqttClient.connected()) {
connectToMqtt();
}
mqttClient.loop();
}

Advanced Features

Setting Up Last Will & Testament

Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.

Use the following code to set Last Will in the Connection Options:

// LWT settings
const char* lwt_topic = "test/lwt";
const char* lwt_message = "ESP32 disconnected";
const int lwt_qos = 1;
const bool lwt_retain = true;
//Function to reconnect MQTTBroker
void reconnect()
{
while (!mqttClient.connected())
{
Serial.print("Attempting MQTT connection...");
// Attempt to connect with the MQTT username, password, and LWT
if(mqttClient.connect("ESP32Client",mqtt_user,mqtt_password, lwt_topic,lwt_qos, lwt_retain, lwt_message)) {
Serial.println("connected"); // Subscribe to a topic mqttClient.subscribe("test/topic");
}
else
{
Serial.print("failed, rc=");
Serial.print(mqttClient.state());
Serial.println(" try again in 5 seconds"); // Wait 5 seconds before retrying
delay(5000); } } }

Adjusting Keep Alive

MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.

Modify the code below to suit your requirements:

// MQTT keep-alive interval (in seconds)
const int mqtt_keep_alive = 60; // Adjust as needed
void setup()
{
mqttClient.setKeepAlive(mqtt_keep_alive);
}

Configuring Session Persistence

Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0.

The Client can get the MQTT Broker to store its session data across connections.

MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.

For MQTT 3.1.1 :

if (mqttClient.connect("ESP32ClientClean", mqtt_user, mqtt_password, nullptr, 0, false, nullptr, true))
{
Serial.println("connected");
}
if (mqttClient.connect("ESP32ClientPersistent", mqtt_user, mqtt_password, nullptr, 0, false, nullptr, false))
{
Serial.println("connected");
}

MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.

For MQTT 5 :

// MQTT client settings
const char* client_id = "ESP32Client";
const bool clean_start = false;
const uint32_t session_expiry_interval = 3600; // Session expiry interval in seconds
AsyncMqttClient mqttClient;
void connectToWifi() {
Serial.println("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
// Subscribe to topics or publish messages here
// mqttClient.subscribe("your/topic", 0);
}
void setup() {
Serial.begin(115200);
connectToWifi();
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCredentials(mqtt_user, mqtt_password);
// Configure MQTT 5 features
mqttClient.setCleanSession(clean_start);
mqttClient.setSessionExpiry(session_expiry_interval);
mqttClient.connect(client_id);
}
void loop() {
// Nothing to do here, AsyncMqttClient handles everything
}

Setting Maximum Packet Size

MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:

For MQTT 5 :

const char* data = “largest data to send”;
const int chunkSize = 1024;
int dataLength = strlen(data);
int numChunks = (dataLength + chunkSize - 1) / chunkSize;
// Calculate number of chunks needed
for (int i = 0; i < numChunks; i++)
{
char chunk[chunkSize + 1]; // Buffer for chunk
// Copy chunk of data into buffer
strncpy(chunk, data + i * chunkSize, chunkSize);
chunk[chunkSize] = '\0'; // Null-terminate the string
// Publish chunk as MQTT message
mqttClient.publish("large_data_topic", chunk, false); // false for QoS 0 Serial.print("Published chunk ");
}

Publish

Sending Data

Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:

For MQTT 3.1.1

mqttclient.publish("topic",message.c_str());

For MQTT 5

// Example publishing
String message = "Hello, MQTT!";
mqttClient.publish("topic", 0, false, message.c_str());
}

Setting Retained Messages

Enable the retain flag when publishing a message to ensure the broker stores the last message for each topic. This guarantees that new subscribers receive the most recent message upon connecting.

To implement this, use the following code snippet:

// Example: Publish a retained message every 10 seconds
static unsigned long lastPublish = 0;
if (millis() - lastPublish > 10000)
{
lastPublish = millis();
if (mqttClient.publish("topic", "Device Status = 1", true))
{
Serial.println("Retained message published successfully");
}
else
{
Serial.println("Retained message publish failed");
}
}

Specifying QoS Levels

MQTT provides three levels of Quality of Service (QoS) for message delivery:

  • QoS 0 (At most once)
  • QoS 1 (At least once)
  • QoS 2 (Exactly once)
Specify the required QoS level when publishing MQTT messages using this code:

if (mqttClient.publish("topic", "Device status with DoS 0", false, 0))
{
Serial.println("Message with QoS 0 published successfully");
}
else
{
Serial.println("Message with QoS 0 publish failed");
}
// Publish a message with QoS 1
if (mqttClient.publish("topic", "Devicce status with QoS 1", false, 1))
{
Serial.println("Message with QoS 1 published successfully");
}
else
{
Serial.println("Message with QoS 1 publish failed");
}
// Publish a message with QoS 2
if (mqttClient.publish("topic", "Device status with QoS 2", false, 2))
{
Serial.println("Message with QoS 2 published successfully");
}
else
{
Serial.println("Message with QoS 2 publish failed");
}

Message Expiry Interval

The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.

For MQTT 5

int expiryInterval = 60;
const char* message = “device status messge”;
mqttClient.beginPublish(topic, strlen(message), false, expiryInterval *1000); // Expiry interval in milliseconds
mqttClient.print(payload);
mqttClient.endPublish();

Topic Alias

The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.

For MQTT 5

// Topic Alias maximum value (adjust based on MQTT broker capabilities)
const int MAX_TOPIC_ALIAS = 10;
const char* message = “Device status message”;
// Publish with topic alias if supported by the broker
mqttClient.beginPublish(topic, strlen(message), false);
mqttClient.writeTopic(topic); // Publish full topic name
mqttClient.writeTopicAlias(1); // Set topic alias, e.g., 1 mqttClient.print(message);
mqttClient.endPublish();

Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.

Subscribe

Subscribing to Topic Filter

To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:

mqttClient.subscribe("cmq/topic");

This topic filter can match with an exact topic or it can have wildcards like # and +

Sending Data

To receive data sent for the subscriptions, a callback function needs to be defined like this:

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
Serial.println("Message received:");
Serial.print(" Topic: ");
Serial.println(topic);
Serial.print(" Payload: ");
Serial.println(payload);
// You can add more processing here based on your application needs
}
void setup() {
Serial.begin(115200);
connectToWifi();
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onMessage(onMqttMessage); // Set the callback function for incoming messages
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCredentials(mqtt_user, mqtt_password);
mqttClient.connect(client_id);
}

Unsubscribing from Topics

To stop receiving updates from a topic, use the code provided to unsubscribe.

// Unsubscribe from the topicawait
mqttClient.unsubscribe("cmq/topic");

Disconnecting the Client

Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.

Use the following code to disconnect the client from the broker:

mqttClient.disconnect();

Building Your Business Logic

You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.

Implementing Best Practices

Unique Client ID Management

Assign a specific client ID to each device for accurate identification. In private settings, designate unique IDs for each client. In shared environments, append a random string to each client ID to maintain uniqueness.

Data Design

Strategically plan your data structure beforehand. Whether you are working with plain text, JSON-formatted data, or numerical data, ensure the design aligns with your application's needs and use case.

Robust Error Handling

Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.

Securing Credentials

Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.

Regular Testing & Monitoring

Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.

Optimizing Session Management

Choose between clean and persistent sessions (`clean: true` or `clean: false`) based on your need to retain subscriptions and queued messages across client connections.

Reconnect on Disconnect

Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This will ensure that your client stays connected and does not lose any data.

Download Code

Download the complete code for client that uses ESP32 MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.

For MQTT 3.1.1

//This code reads the availability status and productivity status of machine to mqttbroker
//include necessary libraries
#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include "time.h"
// WiFiUDP ntpUDP;
const char *ntpServer = "time.google.com";
WiFiClient ProxSen;
PubSubClient client(ProxSen);
unsigned long epochTime = 0;
unsigned long currentMillis1 = 0;
const char* ssid = "xxxxxxxxxx"; // replace xxx.. with your WIFI SSID
const char* password = "xxxxxxxxxx"; // repace with WIFI Password
const char* mqtt_server = "xxxxxxxxxxxxx"; //MQTT server name
const char* mqtt_user = "xxxxxxxxxxxxx"; //MQTT broker user name
const char* mqtt_password = "xxxxxxxxxx"; //MQTT broker password
const int ProxSenPin = 33; //availablity status read
const int productivityPin = 26; // produtivity status read
const unsigned long aggregationInterval = 60000; // 1 minute
unsigned int availabilityValues[60]; // Store availability and productivity values for 1 minutes
// unsigned int changeoverValues[300];
unsigned int valueIndex = 0;
int availability = 0;
int productivity = 1;
bool produceFlag = false;
unsigned long getTime()
{
time_t now;
Serial.println("inside gettime");
struct tm timeinfo;
if (!getLocalTime(&timeinfo))
{
Serial.println("Failed to obtain time");
getTime();
}
time(&now);
return now;
}
// To read the availability of the machine
void IRAM_ATTR availIsr()
{
availability = digitalRead(ProxSenPin) ? 0 : 1; //device works at negataive logic
}
// To read the productivity of the machine
void IRAM_ATTR produceIsr()
{
produceFlag = digitalRead(productivityPin) ? false : true;
}
//Connection with mqtt broker
void connectToMqtt()
{
while (!client.connected())
{
Serial.println("Connecting to MQTT...");
if (client.connect("ProxSen", mqtt_user, mqtt_password))
{
Serial.println("Connected to MQTT");
}
else
{
Serial.print("Failed to connect to MQTT, rc=");
Se}
rial.print(client.state());
Serial.println(" Retrying in 5 seconds...");
delay(5000);
}
}
void setup()
{
Serial.begin(115200);
//configure WIFI connection
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{
delay(5000);
Serial.print(".");
}
Serial.println("WiFi connected");
//configure MQTTBroker
client.setServer(mqtt_server,1883);
configTime(0, 0, ntpServer);
epochTime = getTime();
connectToMqtt();
pinMode(ProxSenPin, INPUT);
attachInterrupt(digitalPinToInterrupt(ProxSenPin), availIsr, CHANGE);
pinMode(productivityPin, INPUT);
attachInterrupt(digitalPinToInterrupt(productivityPin), produceIsr, CHANGE);
availability = digitalRead(ProxSenPin)?0:1;
delay(25);
}
void loop()
{
epochTime = epochTime + 1;
availabilityValues[valueIndex] = availability;
valueIndex++;
//publish the production data at the part end time
if(produceFlag)
{
DynamicJsonDocument jsonDocument1(1000);
String jsonProductivity;
produceFlag = false;
jsonDocument1["production_status"] = 1;
jsonDocument1["timestamp"] = epochTime;
serializeJson(jsonDocument1,jsonProductivity);
if (!client.connected())
{
connectToMqtt();
}
client.publish("ProxSen/productivity",jsonProductivity.c_str());
}
//publish machine status every 5 seconds
if(millis() - currentMillis1 >= 5000)
{
currentMillis1 = millis();
DynamicJsonDocument jsonDocument2(1000);
String jsonAvailStatus;
jsonDocument2["machine_status"] = availability;
jsonDocument2["timestamp"] = epochTime;
serializeJson(jsonDocument2,jsonAvailStatus);
if (!client.connected())
{
connectToMqtt();
}
client.publish("ProxSen/machine_status", jsonAvailStatus.c_str());
}
delay(1000);
}

For MQTT 5

#include <Arduino.h>
#include <WiFi.h>
#include <AsyncMqttClient.h>
#include <ArduinoJson.h>
#include "time.h"
// WiFi credentials
const char* ssid = "your_SSID"; // Replace with your WiFi SSID
const char* password = "your_PASSWORD"; // Replace with your WiFi password
// MQTT Broker settings const char* mqtt_server = "public-mqtt-broker.bevywise.com"; // MQTT broker server name
const uint16_t mqtt_port = 1883; // MQTT broker port (use 1883 for non-TLS)
const char* mqtt_user = "your_mqtt_username"; // MQTT broker username
const char* mqtt_password = "your_mqtt_password"; // MQTT broker password
// MQTT client settings
const char* client_id = "ESP32Client";
AsyncMqttClient mqttClient;
WiFiClient wifiClient;
unsigned long epochTime = 0;
unsigned long currentMillis1 = 0;
const char* ntpServer = "time.google.com";
const int ProxSenPin = 33; // Availability status pin
const int productivityPin = 26; // Productivity status pin
const unsigned long aggregationInterval = 60000; // 1 minute
unsigned int availabilityValues[60]; // Store availability values for 1 minute
unsigned int valueIndex = 0;
int availability = 0;
bool produceFlag = false;
void IRAM_ATTR availIsr() {
availability = digitalRead(ProxSenPin) ? 0 : 1; // Inverse logic (negative logic)
}
void IRAM_ATTR produceIsr() {
produceFlag = digitalRead(productivityPin) ? false : true;
}
void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
}
unsigned long getTime() {
time_t now;
struct tm timeinfo;
if (!getLocalTime(&timeinfo)) {
Serial.println("Failed to obtain time");
getTime();
}
time(&now);
return now;
}
void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
// Subscribe to necessary topics if needed
mqttClient.subscribe("ProxSen/#");
// Example publishing on connect (if needed)
String message = "Hello, MQTT!";
mqttClient.publish("ProxSen/test", 0, false, message.c_str());
}
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Serial.println("Disconnected from MQTT.");
if (WiFi.isConnected()) {
connectToMqtt();
}
}
void setup() {
Serial.begin(115200);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("WiFi connected");
mqttClient.setServer(mqtt_server, mqtt_port);
mqttClient.setCredentials(mqtt_user, mqtt_password);
mqttClient.setClientId(client_id);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
// Configure MQTT 5 specific settings (optional)
mqttClient.setCleanSession(true); // Set clean session to true or false based on your requirement
// Connect to MQTT broker
connectToMqtt();
// Setup interrupts for availability and productivity pins
pinMode(ProxSenPin, INPUT);
attachInterrupt(digitalPinToInterrupt(ProxSenPin), availIsr, CHANGE);
pinMode(productivityPin, INPUT);
attachInterrupt(digitalPinToInterrupt(productivityPin), produceIsr, CHANGE);
availability = digitalRead(ProxSenPin) ? 0 : 1;
delay(25);
}
void loop() {
epochTime = epochTime + 1;
availabilityValues[valueIndex] = availability;
valueIndex++;
// Publish productivity data when produceFlag is set
if (produceFlag) {
DynamicJsonDocument jsonDocument1(1000);
String jsonProductivity;
produceFlag = false;
jsonDocument1["production_status"] = 1;
jsonDocument1["timestamp"] = epochTime;
serializeJson(jsonDocument1, jsonProductivity);
if (mqttClient.connected()) {
mqttClient.publish("ProxSen/productivity", 0, false, jsonProductivity.c_str());
}
}
// Publish machine status every 5 seconds
if (millis() - currentMillis1 >= 5000) {
currentMillis1 = millis();
DynamicJsonDocument jsonDocument2(1000);
String jsonAvailStatus;
jsonDocument2["machine_status"] = availability;
jsonDocument2["timestamp"] = epochTime;
serializeJson(jsonDocument2, jsonAvailStatus);
if (mqttClient.connected()) {
mqttClient.publish("ProxSen/machine_status", 0, false, jsonAvailStatus.c_str());
}
}
// Handle MQTT client library keep alive and message processing
if (mqttClient.connected()) {
mqttClient.loop();
} else {
connectToMqtt();
}
delay(1000);
}

Create Executable Bundle

Steps to Flash the Code into ESP32

Following are the steps to upload the code into the ESP32 processor :

  • Prepare the board: Hold down the boot button, then press and release the reset button.
  • Select the board and port: Go to Tools, then Port, and select the ESP32 dev module board and the correct COM port. If you're not sure which port to use, try selecting one and see if it works.
  • Verify the code: Click the Verify icon in the top left corner to compile the code and check for errors.
  • Upload the code: Click the Upload icon to compile the code again and push it to the ESP32.
  • Check the output: Open the serial monitor to see the output. If the output is gibberish, you might need to change the baud rate to match the code and the serial monitor.

Connect your client to our state-of-the-art CrystalMQ broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.