Setting up a Java MQTT Client

Introduction

This documentation guides you through the process of connecting an MQTT client to our CrystalMQ broker or any broker of your choice using the Eclipse Paho Java client library. It covers setting up connections via TCP, secure ports, and websockets, configuring MQTT authentication, utilizing advanced features, and performing basic operations like publish, subscribe, unsubscribe, and disconnecting clients.

Pre-requisites

Ensure you have:

  • JDK installed (Java Development Kit)
  • Eclipse IDE (or any Java IDE of your choice)
  • Any MQTT broker instance running or accessible

Dependency Installation

Setting up the development environment

Configure your Java environment to include the Eclipse Paho MQTT client library. This library facilitates MQTT communication and can be integrated into your Java project via Maven or by manual library inclusion.

Download the JARs:

org.eclipse.paho.client.mqttv3-1.2.5.jar for the MQTT 3.1.1 client
org.eclipse.paho.mqttv5.client-1.2.5.jar for the MQTT 5 client

Connecting to MQTT Broker

This section contains code snippets for the various ways to connect to an MQTT Broker. Ensure that the MQTT Broker supports the connection type that you would like to use, and obtain its corresponding connection parameters (Address, Port, Username/Password, CA Certificate).
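As a small illustration of how these parameters combine, the sketch below builds the broker URL for each connection type. The class name `BrokerUrls` is hypothetical, and the port numbers are simply the ones used in the snippets on this page (1883/8883 for TCP/TLS, 10433/11433 for WebSocket/secure WebSocket); your broker may listen on different ports.

```java
/** Hypothetical helper: builds a server URI from the connection parameters. */
final class BrokerUrls {

    private BrokerUrls() {}

    static String build(String host, boolean tls, boolean webSocket) {
        // Scheme: tcp / ssl for plain MQTT, ws / wss for MQTT over WebSocket
        String scheme = webSocket ? (tls ? "wss" : "ws") : (tls ? "ssl" : "tcp");
        // Ports assumed from this guide's examples; adjust them to your broker
        int port = webSocket ? (tls ? 11433 : 10433) : (tls ? 8883 : 1883);
        return scheme + "://" + host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost", false, false));
        System.out.println(build("localhost", true, true));
    }
}
```

The resulting string can be passed as the first argument of the `MqttClient` constructor used in the sections that follow.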

MQTT Over TCP

Use the following code to connect the client over TCP.

Set hostip, client_id, and the credentials using the MQTT Broker's connection parameters.

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

boolean TLS = false; // Enable TLS
boolean AUTH = true;
int port = TLS? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT 5

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

boolean AUTH = true;
int port = 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectionOptions connOpts = new MqttConnectionOptions();
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.getBytes()); // MQTT 5 passwords are byte arrays
}
connOpts.setCleanStart(true); // MQTT 5 uses Clean Start instead of Clean Session

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT Over TLS / SSL

Use the following code to connect securely to MQTT Broker over TLS.

Set hostip, client_id, and the credentials using the MQTT Broker's connection parameters.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

boolean TLS = true;
boolean AUTH = true;
int port = TLS? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
try {
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Replace with your root cert path
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

Set the TLS parameters before calling client.connect() so that the client connects to the MQTT Broker securely over TLS.

If the MQTT Broker is hosted on a trusted server and no custom CA certificate is needed, the JVM's default SSL socket factory (which validates the server against the default trust store) can be used to set the TLS options:

boolean TLS = true; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

try {
client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();

if (TLS) {
// An ssl:// URL needs an SSLSocketFactory (javax.net.ssl), not a plain SocketFactory
connOpts.setSocketFactory(SSLSocketFactory.getDefault());
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

If the MQTT Broker has Server Certificate issued from a Trusted CA, then the Server Certificate can be verified using:

boolean TLS = true; // Use TLS
int port = TLS ? 8883 : 1883;
MqttClient client;
String brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

try {
client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();

if (TLS) {
SSLSocketFactory socketFactory = createSSLSocketFactory();
if (socketFactory != null) {
connOpts.setSocketFactory(socketFactory);
} else {
System.out.println("Error creating SSL socket factory.");
return;
}
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

// Continue with MQTT operations...

} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

// Helper method used above to build the SSL socket factory
private static SSLSocketFactory createSSLSocketFactory() {
try {
// Load your trusted CA certificates
KeyStore trustedStore = KeyStore.getInstance(KeyStore.getDefaultType());
// Load your trusted CA certificates here
// trustedStore.load(new FileInputStream("path_to_trusted_ca_cert"), "password".toCharArray());

// Create and initialize SSLContext
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null); // Use default trust managers

return sslContext.getSocketFactory();
} catch (Exception e) {
System.out.println("Error creating SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
return null;
}

If the MQTT Broker has a self-signed Server Certificate then the Server Certificate can be verified using the Root Certificate obtained from the MQTT Broker:

// Load the root certificate obtained from the broker
CertificateFactory cf = CertificateFactory.getInstance("X.509");
X509Certificate rootCert;
try (FileInputStream in = new FileInputStream("path_of_the_root_file")) {
rootCert = (X509Certificate) cf.generateCertificate(in);
}

// Trust only this root certificate
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca-certificate", rootCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, tmf.getTrustManagers(), null);

// Configure MQTT connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setSocketFactory(sslContext.getSocketFactory());
connOpts.setUserName(client_username); // Add username and password
connOpts.setPassword(client_password.toCharArray());

// Create the MQTT client and connect to the broker
MqttClient client = new MqttClient("ssl://public-mqtt-broker.bevywise.com:8883", client_id, new MemoryPersistence());
client.connect(connOpts);
System.out.println("Connected successfully with MQTT broker.");

MQTT Over WebSocket

Define the MQTT Broker Address like this to connect the client over WebSocket.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// For MQTT 5, use these imports instead:
// import org.eclipse.paho.mqttv5.client.*;
// import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

int port;
String brokerUrl = "";
boolean TLS = false; // set true to enable TLS
boolean AUTH = true;
boolean web_socket = true;
if (web_socket)
port = TLS ? 11433 : 10433;
else
port = TLS ? 8883 : 1883;
MqttClient client;
if (web_socket)
brokerUrl = String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
try {
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Replace with your root cert path
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

MQTT Over Secure WebSocket

Use the following code to connect the client over Secure WebSocket.

Set TLS Options as given in MQTT Over TLS section.

Set hostip, client_id, and the credentials using the MQTT Broker's connection parameters.

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// For MQTT 5, use these imports instead:
// import org.eclipse.paho.mqttv5.client.*;
// import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

int port;
String brokerUrl = "";
boolean TLS = true; // set true to enable TLS
boolean AUTH = true;
boolean web_socket = true;
if (web_socket)
port = TLS ? 11433 : 10433;
else
port = TLS ? 8883 : 1883;
MqttClient client;
if (web_socket)
brokerUrl = String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);
MemoryPersistence persistence = new MemoryPersistence();

client = new MqttClient(brokerUrl, client_id, persistence);

MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
try {
connOpts.setSocketFactory(getSocketFactory("root.crt")); // Replace with your root cert path
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

Configuring MQTT Authentication

To connect to an MQTT Broker that requires an MQTT Username and MQTT Password for authentication, add the username and password to the connection options like this:

if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);

Advanced Features

Setting Up Last Will & Testament

Configure the Last Will and Testament feature to specify a message that the broker will publish if the client unexpectedly disconnects. This helps inform other subscribers of the disconnected client's status.

Use the following code to set Last Will in the Connection Options:

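// Topic and payload the broker publishes if this client disconnects unexpectedly
final String willTopic = "will_topic/device1";
final String willMessage = "offline"; // example payload, choose your own

// Set the Last Will (MQTT 3.1.1 MqttConnectOptions) before calling client.connect(connOpts)
connOpts.setWill(willTopic, willMessage.getBytes(), 1, false);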

Adjusting Keep Alive

MQTT maintains client-broker connections with a keep-alive mechanism. Adjust the keep-alive interval to control how frequently the client sends PINGREQ messages to the broker.

Modify the code below to suit your requirements:

connOpts.setKeepAliveInterval(40); // keep-alive interval in seconds
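To make the effect of the interval concrete: the MQTT specification lets the broker close the connection if it receives no packet from the client within one and a half times the keep-alive period. The sketch below works out that grace period; the class and method names are hypothetical and are not part of the Paho API.

```java
/** Hypothetical helper: computes the broker-side timeout implied by a keep-alive value. */
final class KeepAliveMath {

    private KeepAliveMath() {}

    static long brokerTimeoutMillis(int keepAliveSeconds) {
        // Keep Alive is a 16-bit field in the CONNECT packet; 0 disables the mechanism
        if (keepAliveSeconds < 0 || keepAliveSeconds > 65535) {
            throw new IllegalArgumentException("keep-alive must be between 0 and 65535 seconds");
        }
        // The broker waits 1.5 x keep-alive before treating the client as gone
        return keepAliveSeconds * 1500L;
    }

    public static void main(String[] args) {
        System.out.println(brokerTimeoutMillis(40)); // 60000
    }
}
```

With the 40-second interval used above, a client that goes silent would be dropped after about 60 seconds, which is also when a Last Will message would be published.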

Configuring Session Persistence

Session data of an MQTT Client include the Subscriptions made by the Client and any data that the Client would receive with QoS>0. The Client can get the MQTT Broker to store its session data across connections.

MQTT 3.1.1 Clients can set Clean Session = 0 to request the MQTT Broker to keep its session information stored across connections.

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
connOpts.setSocketFactory(SSLSocketFactory.getDefault()); // Use default SSL socket factory
}

connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
connOpts.setCleanSession(cleanSession); // Set Clean Session flag

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");

// Continue with MQTT operations...

} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

MQTT 5 Clients can set Clean Start = 0 and Session Expiry Interval = 'N' to request the MQTT Broker to keep its session information stored across connections for 'N' seconds.

MQTT 5

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;

MemoryPersistence persistence = new MemoryPersistence();

try {
client = new MqttAsyncClient(brokerUrl, client_id, persistence);

MqttConnectionOptions connOpts = new MqttConnectionOptions();
if (TLS) {
connOpts.setSocketFactory(SSLSocketFactory.getDefault()); // Use default SSL socket factory
}
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.getBytes()); // MQTT 5 passwords are byte arrays
connOpts.setCleanStart(cleanStart); // Set Clean Start flag
connOpts.setSessionExpiryInterval(sessionExpiryInterval); // Set Session Expiry Interval (seconds, as a Long)

// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
IMqttToken connectToken = client.connect(connOpts);
connectToken.waitForCompletion();
System.out.println("Connected");

// Continue with MQTT operations...
} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}

Setting Maximum Packet Size

MQTT5 Client can request the MQTT Broker to only send data packets less than a specific size by setting it like this:

MQTT 5

connOpts.setMaximumPacketSize(256L); // size in bytes, passed as a Long

Publish

Sending Data

Efficiently distribute data to multiple subscribers by publishing it to designated topics with the following code snippet:

MQTT 3.1.1

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

String topic = "cmq/topic";
String message = "hello world";
int qos = 1;
boolean retain = false;
client.publish(topic, message.getBytes(), qos, retain);

MQTT 5

import org.eclipse.paho.mqttv5.client.*;
import org.eclipse.paho.mqttv5.common.MqttMessage;

// Publish message
String topic = "cmq/topic";
String message = "hello world";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(1);
mqttMessage.setRetained(false);

// Publish message asynchronously (client is an MqttAsyncClient)
IMqttToken publishToken = client.publish(topic, mqttMessage);
publishToken.waitForCompletion();
System.out.println("Message published");

Setting Retained Messages

Enable the retain flag when publishing a message so that the broker stores the last message for that topic. New subscribers then receive the most recent retained message as soon as they subscribe.

To implement this, use the following code snippet:

boolean retain = true; // ask the broker to keep this message as the topic's retained message
client.publish(topic, message.getBytes(), qos, retain);

Specifying QoS Levels

MQTT provides three levels of Quality of Service (QoS) for message delivery:

  • QoS 0 (At most once)
  • QoS 1 (At least once)
  • QoS 2 (Exactly once)

Specify the required QoS level when publishing MQTT messages using this code:

int qos = 1;
client.publish(topic, message.getBytes(), qos, retain);
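Because the publish call takes the QoS as a plain integer, an out-of-range value is easy to pass by mistake. One possible guard, shown here as a sketch, is a small enum that names the three levels listed above. `QosLevel` is a hypothetical type and is not part of the Paho library.

```java
/** Hypothetical enum naming the three MQTT QoS levels. */
enum QosLevel {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2);

    private final int value;

    QosLevel(int value) {
        this.value = value;
    }

    /** The integer to hand to publish() or subscribe(). */
    int value() {
        return value;
    }

    /** Converts user input such as "0/1/2" into a level, rejecting anything else. */
    static QosLevel fromInt(int value) {
        for (QosLevel level : values()) {
            if (level.value == value) {
                return level;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2 but was " + value);
    }
}
```

A call would then read `client.publish(topic, message.getBytes(), QosLevel.AT_LEAST_ONCE.value(), retain);`.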

Message Expiry Interval

The 'Message expiry interval' property sets a message's life span in seconds; if undelivered within this time, the broker discards it. MQTT5 supports this feature. MQTT5 Clients can set this while publishing data.

MQTT 5

import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

String topic = "topic1";
String messageContent = "hello world";
int qos = 1;
boolean retain = false;
MqttMessage message = new MqttMessage(messageContent.getBytes());
message.setQos(qos);
message.setRetained(retain);
MqttProperties properties = new MqttProperties();
properties.setMessageExpiryInterval(60L); // message expires after 60 seconds
message.setProperties(properties);
client.publish(topic, message);

Topic Alias

The 'Topic Alias' property allows clients to use a short alias instead of a full topic name, reducing message packet size and improving network efficiency.

import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;

MqttConnectionOptions options = new MqttConnectionOptions();
options.setTopicAliasMaximum(10); // highest topic alias value this client accepts
client.connect(options);

Properties associated with MQTT PUBLISH enhance message handling, providing context or instructions for brokers and clients. These properties, including message expiry intervals and topic aliases, optimize message delivery and network bandwidth.
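To illustrate the idea behind topic aliases, the conceptual sketch below shows the bookkeeping a sender could do: the first publish on a topic carries the full name together with a new alias, and later publishes can carry only the alias. `TopicAliasTable` is a hypothetical class written for this explanation; it is not how the Paho library implements the feature.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sender-side table mapping topic names to alias numbers. */
final class TopicAliasTable {

    private final int maximum;
    private final Map<String, Integer> aliases = new HashMap<>();

    TopicAliasTable(int maximum) {
        this.maximum = maximum; // the Topic Alias Maximum announced by the receiver
    }

    /** True if the topic already has an alias, so the full name can be omitted. */
    boolean isKnown(String topic) {
        return aliases.containsKey(topic);
    }

    /** Returns the topic's alias, assigning the next free one; 0 means none is available. */
    int aliasFor(String topic) {
        Integer existing = aliases.get(topic);
        if (existing != null) {
            return existing;
        }
        if (aliases.size() >= maximum) {
            return 0; // table full: keep sending the full topic name
        }
        int next = aliases.size() + 1; // valid aliases start at 1
        aliases.put(topic, next);
        return next;
    }
}
```

The saving grows with topic length and message rate: a long topic name is replaced by a two-byte alias on every publish after the first.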

Subscribe

Subscribing to Topic Filter

To receive data published by other clients, this client has to subscribe to a matching Topic Filter like this:

String topic = "cmq/topic";
int qos = 1;
client.subscribe(topic, qos);

This topic filter can match an exact topic, or it can contain wildcards: + matches exactly one topic level and # matches any number of remaining levels.
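The matching itself is done by the broker, but it can help to see the wildcard rules spelled out. The sketch below is a simplified matcher written for illustration: the class name `TopicFilters` is hypothetical, and the special handling of topics that begin with `$` is left out.

```java
/** Hypothetical helper showing how + and # wildcards match topic names. */
final class TopicFilters {

    private TopicFilters() {}

    static boolean matches(String filter, String topic) {
        // Keep empty levels: "a//b" has three levels
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);

        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // '#' is only valid as the last level and matches everything below,
                // including the parent level itself ("a/#" matches "a")
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false; // filter is longer than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        // Without '#', both must have the same number of levels
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("cmq/+/temp", "cmq/device1/temp")); // true
        System.out.println(matches("cmq/#", "cmq/device1/temp"));      // true
        System.out.println(matches("cmq/+", "cmq/device1/temp"));      // false
    }
}
```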

Receiving Data

To receive data sent for the subscriptions, a callback function needs to be defined like this:

// Set callback
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
// Handle reconnection or other actions
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message received:");
System.out.println(" Topic: " + topic);
System.out.println(" Message: " + new String(message.getPayload()));
System.out.println(" QoS: " + message.getQos());
System.out.println(" Retained: " + message.isRetained());
// Process the received message
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete for message ID: " + token.getMessageId());
// Handle delivery confirmation
}
});

Unsubscribing from Topics

To stop receiving updates from a topic, use the code provided to unsubscribe.

// Unsubscribe from the topic
String topic="cmq/topic";
client.unsubscribe(topic);

Disconnecting the Client

Ensure a proper termination of your client's connection with the broker to avoid issues and resource leaks on both sides, thereby maintaining system stability.

Use the following code to disconnect the client from the broker:

try {
client.disconnect();
System.out.println("Disconnected from broker.");
} catch (MqttException e) {
System.out.println("Error disconnecting from broker: " + e.getMessage());
e.printStackTrace();
}

Building Your Business Logic

You have the opportunity to develop and customize your own intricate business logic within this environment, tailoring it precisely to your specific needs and objectives.

Implementing Best Practices

Client Identification Management

Assign distinct client IDs to individual devices to ensure accurate identification. For private instances, allocate unique IDs to each client; in shared environments, append a random string to guarantee ID uniqueness.
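One way to append a random string is sketched below. The class name `ClientIds`, the `crystalmq_` prefix, and the suffix length are illustrative choices; keeping the ID to letters and digits and at most 23 characters stays within what every MQTT 3.1.1 broker is required to accept.

```java
import java.security.SecureRandom;

/** Hypothetical helper: creates client IDs with a random alphanumeric suffix. */
final class ClientIds {

    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    private ClientIds() {}

    static String withRandomSuffix(String prefix, int suffixLength) {
        StringBuilder id = new StringBuilder(prefix);
        for (int i = 0; i < suffixLength; i++) {
            // Pick one random character per position
            id.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return id.toString();
    }

    public static void main(String[] args) {
        // Prints a 20-character ID that starts with "crystalmq_"
        System.out.println(withRandomSuffix("crystalmq_", 10));
    }
}
```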

Data Structuring

Plan your data architecture proactively. Whether handling plain text, JSON-formatted data, or numerical values, ensure the structure aligns effectively with your application's specific requirements.
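As an example of a JSON-formatted payload, the sketch below builds the same temp, pressure, and status reading that the complete sample program on this page publishes. The class name `SensorPayload` is hypothetical; a JSON library would be a better fit once the structure grows beyond a few numeric fields.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: formats a sensor reading as a compact JSON payload. */
final class SensorPayload {

    private SensorPayload() {}

    static String toJson(int temp, int pressure, int status) {
        // Only numeric fields are used, so no string escaping is needed here
        return "{\"temp\":" + temp + ",\"pressure\":" + pressure + ",\"status\":" + status + "}";
    }

    static byte[] toBytes(String json) {
        // Use an explicit charset so publisher and subscriber agree on the encoding
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toJson(25, 80, 1)); // {"temp":25,"pressure":80,"status":1}
    }
}
```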

Robust Error Handling

Implement strong error management to handle MQTT connection failures, subscription problems, and message publishing errors effectively.

Securing Credentials

Safeguard sensitive information like usernames, passwords, and client IDs by not hard-coding them in your source code. Use environment variables or secure configuration files instead.
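A minimal sketch of the environment-variable approach follows. The variable names `MQTT_USERNAME` and `MQTT_PASSWORD` and the class `MqttCredentials` are assumptions made for this example; use whatever names fit your deployment.

```java
import java.util.Map;

/** Hypothetical holder for credentials read from the environment. */
final class MqttCredentials {

    final String username;
    final char[] password;

    private MqttCredentials(String username, char[] password) {
        this.username = username;
        this.password = password;
    }

    /** Reads the credentials from the given environment map (pass System.getenv() in production). */
    static MqttCredentials fromEnvironment(Map<String, String> env) {
        String user = env.get("MQTT_USERNAME");
        String pass = env.get("MQTT_PASSWORD");
        if (user == null || pass == null) {
            throw new IllegalStateException("MQTT_USERNAME and MQTT_PASSWORD must be set");
        }
        return new MqttCredentials(user, pass.toCharArray());
    }

    public static void main(String[] args) {
        try {
            MqttCredentials credentials = fromEnvironment(System.getenv());
            System.out.println("Loaded credentials for user: " + credentials.username);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The fields map directly onto the connection options shown earlier: `connOpts.setUserName(credentials.username)` and `connOpts.setPassword(credentials.password)`.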

Regular Testing & Monitoring

Continuously test MQTT communication and monitor client metrics such as connection status, message throughput, and error rates to quickly identify and fix issues.

Optimizing Session Management

Choose between clean and persistent sessions (setCleanSession(true) or setCleanSession(false) in MQTT 3.1.1; Clean Start and Session Expiry Interval in MQTT 5) based on your need to retain subscriptions and queued messages across client connections.

Reconnect on Disconnect

Add code to attempt reconnection to the MQTT Broker when there is an unexpected disconnection. This helps your client recover quickly and reduces the risk of losing data while it is offline.
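One common way to do this is to retry with an exponentially growing delay, so that a broker that is down is not flooded with connection attempts. The sketch below shows only the retry logic; `ReconnectBackoff` is a hypothetical class, and the `connectAttempt` supplier stands in for your own code that calls `client.connect(connOpts)` and returns true on success. The Paho MQTT 3.1.1 client also offers `connOpts.setAutomaticReconnect(true)` if you prefer to let the library handle it.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a connection attempt with exponential backoff. */
final class ReconnectBackoff {

    private ReconnectBackoff() {}

    /** Delay before the next attempt: base, 2x base, 4x base, ... capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Runs connectAttempt until it succeeds or maxAttempts is reached. */
    static boolean retry(BooleanSupplier connectAttempt, int maxAttempts,
                         long baseMillis, long maxMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (connectAttempt.getAsBoolean()) {
                return true;
            }
            if (attempt == maxAttempts - 1) {
                break; // no need to wait after the final attempt
            }
            try {
                Thread.sleep(delayMillis(attempt, baseMillis, maxMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return false;
    }
}
```

With a base of 1000 ms and a cap of 60000 ms, the waits are 1 s, 2 s, 4 s, 8 s and so on, never exceeding one minute.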

Download Code

Download the complete code for a client that uses the Java MQTT Client Library to connect with our CrystalMQ broker or any broker of your choice.

MQTT 3.1.1

//paho mqttv3
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLContext;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.KeyStore;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.TrustManagerFactory;
import java.security.KeyManagementException;
import java.util.Random;
import java.util.Scanner;
public class MqttClientv3 {
public static void main(String[] args) {
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // set true to Enable TLS
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean retain = false;
String brokerUrl= "";
int port;
if(websocket_connection)
port= TLS ? 11433:10433;
else
port = TLS ? 8883 : 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL

// Creating a new MQTT client instance
MqttClient client;
if(websocket_connection)
brokerUrl=String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);

MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
// Setting callbacks
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Received message on topic: " + topic);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered successfully.");
}
});
// Setting connection options
MqttConnectOptions connOpts = new MqttConnectOptions();
if (TLS) {
try {
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); // Replace with your root cert path
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.toCharArray());
}
connOpts.setCleanSession(true);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Example topic to subscribe
final String subscribeTopic = "will_topic/device1";
// Subscribing to a topic
client.subscribe(subscribeTopic);
// Publishing messages or subscribing/unsubscribing based on user input
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
switch (choice) {
case 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
retain = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
for (int i = 0; i < count; i++) {
String message = "{\"temp\":" + (new Random().nextInt(50) + 1) + ",\"pressure\":" + (new Random().nextInt(100) + 1) + ",\"status\":" + (new Random().nextInt(2)) + "}";
try {
client.publish(topic, message.getBytes(), qos, retain);
System.out.println("Published message: " + message);
} catch (MqttException e) {
System.out.println(" Error publishing message: " + e.getMessage());
e.printStackTrace();
}
Thread.sleep(6000); // 6 seconds
}
break;
case 2:
System.out.print(" Enter Topic QoS(0/1/2): ");
String[] subInput = scanner.nextLine().split(" ");
topic = subInput[0];
qos = Integer.parseInt(subInput[1]);
try {
client.subscribe(topic, qos);
System.out.println(" Subscribed to topic: " + topic);
} catch (MqttException e) {
System.out.println(" Error subscribing to topic: " + e.getMessage());
e.printStackTrace();
}
break;
case 3:
try {
client.disconnect();
System.out.println("Disconnected from broker.");
} catch (MqttException e) {
System.out.println("Error disconnecting from broker: " + e.getMessage());
e.printStackTrace();
}
break;
case 4:
System.out.print(" Enter Topic to unsubscribe: ");
topic = scanner.nextLine();
try {
client.unsubscribe(topic);
System.out.println(" Unsubscribed from topic: " + topic);
} catch (MqttException e) {
System.out.println(" Error unsubscribing from topic: " + e.getMessage());
e.printStackTrace();
}
break;
default:
System.out.println("Invalid choice.");
}
}
} catch (MqttException | InterruptedException e) {
System.out.println("An error occurred: " + e.getMessage());
e.printStackTrace();
}
}
// Method to create SSL socket factory (if needed)
private static SSLSocketFactory getSocketFactory(String caCrtFile) throws Exception {
CertificateFactory cf = CertificateFactory.getInstance("X.509");
FileInputStream caInput = new FileInputStream(caCrtFile);
X509Certificate caCert = (X509Certificate) cf.generateCertificate(caInput);
caInput.close();
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
keyStore.load(null, null);
keyStore.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(keyStore);
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
return context.getSocketFactory();
}
}

MQTT 5

//paho mqttv5
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLContext;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.security.KeyStore;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.KeyManagerFactory;
import java.security.KeyManagementException;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
public class MqttClientv5 {
public static void main(String[] args)throws Exception{
String client_id = "your_client_id";
String hostip = "public-mqtt-broker.bevywise.com";
boolean TLS = false; // set true to Enable TLS
boolean AUTH = true;
String client_username = "Access-Key";
String client_password = "Access-Token";
boolean websocket_connection=false; //set true to Enable websocket
String topic = "";
int qos = 0;
boolean retain = false;
String brokerUrl= "";
int port;
if(websocket_connection)
port = TLS ? 11433 : 10433;
else
port = TLS ? 8883 : 1883; // Use 8883 for SSL/TLS, 1883 for non-SSL

// Creating a new MQTT client instance
MqttClient client;
if(websocket_connection)
brokerUrl=String.format("%s://%s:%d", TLS ? "wss" : "ws", hostip, port);
else
brokerUrl = String.format("%s://%s:%d", TLS ? "ssl" : "tcp", hostip, port);

MemoryPersistence persistence = new MemoryPersistence();
try {
client = new MqttClient(brokerUrl, client_id, persistence);
// Setting callbacks
client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
System.out.println("Disconnected from broker.");
}
@Override
public void mqttErrorOccurred(MqttException exception) {
System.out.println("MQTT Error: " + exception.getMessage());
exception.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Received message on topic: " + topic);
System.out.println("Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttToken token) {
System.out.println("Message delivered successfully.");
}
@Override
public void authPacketArrived(int reasonCode, MqttProperties properties) {
System.out.println("Auth packet arrived with reason code: " + reasonCode);
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("Connected to broker: " + serverURI);
}
});
// Setting connection options
MqttConnectionOptions connOpts = new MqttConnectionOptions();
if (TLS) {
try {
connOpts.setSocketFactory(getSocketFactory("/path/to/your/root.crt")); // Replace with your root cert path
} catch (Exception e) {
System.out.println("Error setting up SSL socket factory: " + e.getMessage());
e.printStackTrace();
}
}
if (AUTH) {
connOpts.setUserName(client_username);
connOpts.setPassword(client_password.getBytes()); // Convert to byte array
}
connOpts.setCleanStart(true); // Clean start for MQTTv5
connOpts.setMaxInflight(50);
// Connect to MQTT broker
System.out.println("Connecting to broker: " + brokerUrl);
client.connect(connOpts);
System.out.println("Connected");
// Example topic to subscribe
final String subscribeTopic = "will_topic/device1";
// Subscribing to a topic
client.subscribe(subscribeTopic, qos);
// Publishing messages or subscribing/unsubscribing based on user input
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.println("1. Publish\n2. Subscribe\n3. Disconnect\n4. Unsubscribe");
int choice = scanner.nextInt();
scanner.nextLine(); // consume newline
switch (choice) {
case 1:
System.out.print("Enter Topic QoS(0/1/2) Retain(0/1) Count: ");
String[] input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
retain = Integer.parseInt(input[2]) == 1;
int count = Integer.parseInt(input[3]);
for (int i = 0; i < count; i++) {
// String message = "{\"temp\":" + (new Random().nextInt(50) + 1) + ",\"pressure\":" + (new Random().nextInt(100) + 1) + ",\"status\":" + (new Random().nextInt(2) == 0 ? "ON" : "OFF") + "}";
String message = "hello world";
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(qos);
mqttMessage.setRetained(retain);
System.out.println("Message: " + message);
client.publish(topic, mqttMessage);
System.out.println("Packet: " + i + " - sent");
}
break;
case 2:
System.out.print("Enter Topic QoS(0/1/2): ");
input = scanner.nextLine().split(" ");
topic = input[0];
qos = Integer.parseInt(input[1]);
client.subscribe(topic, qos);
break;
case 3:
client.disconnect();
System.out.println("Disconnected from broker.");
return;
case 4:
System.out.print("Enter Topic: ");
topic = scanner.nextLine();
client.unsubscribe(topic);
break;
default:
System.out.println("Invalid choice. Please try again.");
}
}
} catch (MqttException e) {
System.out.println("Error connecting to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
}
private static SSLSocketFactory getSocketFactory(String rootCertPath) throws Exception {
CertificateFactory cf = CertificateFactory.getInstance("X.509");
FileInputStream fis = new FileInputStream(rootCertPath);
X509Certificate caCert = (X509Certificate) cf.generateCertificate(fis);
fis.close();
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(null, null);
ks.setCertificateEntry("ca-certificate", caCert);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(ks);
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, tmf.getTrustManagers(), null);
return sslContext.getSocketFactory();
}
}

Create Executable Bundle

Steps for Connecting MqttClientv3:

Compile: javac -cp /path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3.java

Run : java -cp .:/path/to/org.eclipse.paho.client.mqttv3-1.2.5.jar MqttClientv3

(or) Run with JAR:

  • mkdir -p build/lib build/classes build/META-INF/
  • cp MqttClientv3*.class build/classes/ (the wildcard also copies the callback's inner class file)
  • cp /path/of/org.eclipse.paho.client.mqttv3-1.2.5.jar build/lib/
  • Create a file named MANIFEST.MF with the following contents:
      Manifest-Version: 1.0
      Main-Class: MqttClientv3
      Class-Path: lib/org.eclipse.paho.client.mqttv3-1.2.5.jar
  • cp MANIFEST.MF build/META-INF/
  • cd build
  • jar cvfm MqttClientv3.jar META-INF/MANIFEST.MF -C classes . -C lib .
  • java -jar MqttClientv3.jar

Steps for Connecting MqttClientv5:

Compile: javac -cp /path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5.java

Run: java -cp .:/path/to/org.eclipse.paho.mqttv5.client-1.2.5.jar MqttClientv5

(or) Run with JAR:

  • mkdir -p build/lib build/classes build/META-INF/
  • cp MqttClientv5*.class build/classes/ (the wildcard also copies the callback's inner class file)
  • cp /path/of/org.eclipse.paho.mqttv5.client-1.2.5.jar build/lib/
  • Create a file named MANIFEST.MF with the following contents:
      Manifest-Version: 1.0
      Main-Class: MqttClientv5
      Class-Path: lib/org.eclipse.paho.mqttv5.client-1.2.5.jar
  • cp MANIFEST.MF build/META-INF/
  • cd build
  • jar cvfm MqttClientv5.jar META-INF/MANIFEST.MF -C classes . -C lib .
  • java -jar MqttClientv5.jar

Use the build anywhere; keep the lib folder next to the JAR so that the Class-Path entry resolves.

Connect your client to our state-of-the-art CrystalMQ broker or any broker of your choice. This powerful combination will ensure optimal performance and reliability for all your messaging needs, paving the way for a robust and efficient system integration.