Apache ActiveMQ Artemis and Spring Boot using MQTT
Let’s add something to our existing Artemis and Spring Boot integration. The previous example was based on JMS (AMQP) communication. To extend it, today we add an MQTT producer and consumer. Just a little reminder: this implementation is a proof of concept meant only to explore how things are supposed to work. It is not production-quality code.
- We add a new dependency to the pom. Details about its usage are available here: https://docs.spring.io/spring-integration/docs/current/reference/html/mqtt.html
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.5</version>
</dependency>
- The next step is to create several @Bean configurations.
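import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MQTTConfig {

    // Shared Paho client factory: broker URI and credentials.
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{"tcp://0.0.0.0:1883"});
        options.setUserName("admin");
        options.setPassword("admin".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    // Producer side: publishes everything arriving on mqttOutboundChannel.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("javaTestClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("prices.mqtt.east");
        messageHandler.setDefaultQos(1);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Consumer side: decorates each received payload and prints the message.
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", received from MQTT")
                .handle(message -> System.out.println("MQTT Message received: " + message))
                .get();
    }

    // Subscribes to the same topic the producer publishes to.
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("javaMqqtClient",
                mqttClientFactory(), "prices.mqtt.east");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }

    // Spring generates the implementation; calls are forwarded to mqttOutboundChannel.
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MQTTGateway {
        void sendToMqtt(String data);
    }
}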
- To be able to send a message over MQTT, let’s create a small @RestController. We inject the MQTTGateway interface, which is magically bound to a proper implementation, so the controller can simply rely on it.
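import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mqtt")
@AllArgsConstructor
public class MQQTProducerController {

    // Injected through the Lombok-generated constructor.
    private final MQTTConfig.MQTTGateway mqttGateway;

    @GetMapping
    public String send(@RequestParam String m) {
        mqttGateway.sendToMqtt(m);
        return "Message sent: " + m;
    }
}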
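The "magic" behind MQTTGateway is, roughly, a proxy created at runtime that turns each method call into a message on the request channel. The sketch below imitates that idea with plain JDK classes only. It is a simplified illustration, not how Spring Integration is actually implemented (its GatewayProxyFactoryBean does far more); GatewayProxySketch, SimpleChannel and createGateway are hypothetical names invented for this example.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class GatewayProxySketch {

    // Same shape as the MQTTGateway interface from the post.
    public interface Gateway {
        void sendToMqtt(String data);
    }

    // Hypothetical stand-in for a direct channel: delivers synchronously to subscribers.
    public static class SimpleChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        public void send(String payload) {
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    // Creates a runtime implementation of Gateway that forwards its argument to the channel.
    public static Gateway createGateway(SimpleChannel channel) {
        return (Gateway) Proxy.newProxyInstance(
                Gateway.class.getClassLoader(),
                new Class<?>[]{Gateway.class},
                (proxy, method, args) -> {
                    // This sketch supports only the single gateway method.
                    if (!"sendToMqtt".equals(method.getName())) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    channel.send((String) args[0]);
                    return null; // void method
                });
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        // Plays the role of the outbound handler subscribed to the channel.
        channel.subscribe(payload -> System.out.println("publishing: " + payload));
        Gateway gateway = createGateway(channel);
        gateway.sendToMqtt("my_message");
    }
}
```

The controller never sees any of this: it depends only on the interface, which is why swapping the transport behind the channel would not require touching it.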
To send a message using the MQTT producer, just call this URL: http://localhost:8080/mqtt?m=my_message
What I can see now is that both receivers, MQTT and JMS, get the message, and the console shows the expected output. Now let’s try to send a message using JMS: http://localhost:8080/amq?m=my_message
This way, strangely, the message is received by the JMS receiver only; there is no console message from the MQTT receiver. I have no idea why that is, yet. To see what is required for JMS and MQTT to work together, see https://stackoverflow.com/questions/66932646/message-sent-using-jms-producer-is-not-received-in-mqtt-receiver-in-the-same-spr
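For repeated experiments it can be handy to call both endpoints from code instead of the browser. Below is a minimal sketch using the JDK 11+ HttpClient. It assumes the application runs on localhost:8080 and exposes the /mqtt and /amq endpoints described above; SendMessageClient, buildUri and send are hypothetical helper names, not part of the project.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SendMessageClient {

    // Assumption: the Spring Boot application from this post runs locally on port 8080.
    static final String BASE_URL = "http://localhost:8080";

    // Builds e.g. http://localhost:8080/mqtt?m=my_message, URL-encoding the message text.
    public static URI buildUri(String path, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE_URL + path + "?m=" + encoded);
    }

    // Issues a GET request against the given endpoint and returns the response body.
    public static String send(String path, String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildUri(path, message)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Publish once through the MQTT producer, once through the JMS producer.
        System.out.println(send("/mqtt", "my_message"));
        System.out.println(send("/amq", "my_message"));
    }
}
```

Encoding the parameter matters as soon as the message contains spaces or characters such as & or =, which would otherwise be misread as part of the query string.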