Apache ActiveMQ Artemis and Spring Boot using MQTT

Sat, Apr 3, 2021

Let’s add something to our existing Artemis and Spring Boot integration. The previous example was based on the JMS (AMQP) communication protocol. Today we extend it with an MQTT producer and consumer. Just a little reminder: this implementation is purely a proof of concept to explore how things are supposed to work. It’s not production-quality code.

  • First, add the Spring Integration MQTT dependency to the pom.xml.
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.4.5</version>
</dependency>
  • The next step is to create several @Bean configurations.
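import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration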
public class MQTTConfig {

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[]{"tcp://0.0.0.0:1883"});
    options.setUserName("admin");
    options.setPassword("admin".toCharArray());
    factory.setConnectionOptions(options);

    return factory;
  }

  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
            new MqttPahoMessageHandler("javaTestClient", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("prices.mqtt.east");
    messageHandler.setDefaultQos(1);
    return messageHandler;
  }

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  @Bean
  public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p + ", received from MQTT")
            .handle(message -> System.out.println("MQTT Message received: " + message))
            .get();
  }

  @Bean
  public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("javaMqqtClient",
            mqttClientFactory(), "prices.mqtt.east");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
  }

  @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  public interface MQTTGateway {
    void sendToMqtt(String data);
  }
}
  • To be able to send a message over MQTT, let’s create a small @RestController. We inject the MQTTGateway interface, for which Spring Integration generates an implementation at runtime, so the controller can simply rely on it.
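import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController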
@RequestMapping("/mqtt")
@AllArgsConstructor
public class MQQTProducerController {

  private final MQTTConfig.MQTTGateway mqttGateway;

  @GetMapping
  public String send(@RequestParam String m) {
    mqttGateway.sendToMqtt(m);
    return "Message sent: " + m;
  }
}
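
How can an interface without any implementation be injected and called? The sketch below illustrates the general idea with a plain JDK dynamic proxy: an implementation is generated at runtime and every call is forwarded to a "channel". It is only a conceptual illustration, not Spring Integration's actual code. MqttGatewaySketch and GatewayProxyDemo are hypothetical names, and the Consumer<String> merely stands in for mqttOutboundChannel.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the MQTTConfig.MQTTGateway interface.
interface MqttGatewaySketch {
    void sendToMqtt(String data);
}

final class GatewayProxyDemo {

    // Generates an implementation of the interface at runtime.
    // A call to sendToMqtt is forwarded to the given "channel" with its argument as payload.
    static MqttGatewaySketch createGateway(Consumer<String> channel) {
        return (MqttGatewaySketch) Proxy.newProxyInstance(
                MqttGatewaySketch.class.getClassLoader(),
                new Class<?>[]{MqttGatewaySketch.class},
                (proxy, method, args) -> {
                    // Only the gateway method is supported in this sketch.
                    if (!method.getName().equals("sendToMqtt")) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    channel.accept((String) args[0]);
                    return null; // the gateway method returns void
                });
    }

    public static void main(String[] args) {
        // A list plays the role of the outbound channel here (an assumption for the demo).
        List<String> sent = new ArrayList<>();
        MqttGatewaySketch gateway = createGateway(sent::add);
        gateway.sendToMqtt("my_message");
        System.out.println(sent); // prints [my_message]
    }
}
```

In the real application the generated gateway puts the message on mqttOutboundChannel, where the mqttOutbound handler picks it up and publishes it to the broker.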

Calling http://localhost:8080/mqtt?m=my_message sends a message using the MQTT producer. What I can see now is that both the MQTT and the JMS receivers get the message, and we can see the corresponding console output.

Now let’s try to send a message using JMS: http://localhost:8080/amq?m=my_message . Strangely, this time the message is received by the JMS receiver only; there is no console message from the MQTT receiver. I have no idea why yet. To see what is required for JMS and MQTT to work together, see https://stackoverflow.com/questions/66932646/message-sent-using-jms-producer-is-not-received-in-mqtt-receiver-in-the-same-spr
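
If you prefer to trigger both endpoints from code rather than from the browser, here is a minimal sketch using the JDK's own HTTP client (Java 11+). It assumes the application runs on localhost:8080 and that the /amq endpoint from the previous article is still in place. MqttEndpointClient and its methods are hypothetical helper names, not part of the project.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class MqttEndpointClient {

    // Builds e.g. http://localhost:8080/mqtt?m=my_message, URL-encoding the message.
    static URI buildSendUri(String baseUrl, String path, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + path + "?m=" + encoded);
    }

    // Sends a GET request and returns the response body as text.
    static String send(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        // Assumed base URL; adjust it if the application listens elsewhere.
        String baseUrl = "http://localhost:8080";
        System.out.println(send(buildSendUri(baseUrl, "/mqtt", "my_message")));
        System.out.println(send(buildSendUri(baseUrl, "/amq", "my_message")));
    }
}
```

Running it while watching the application console makes it easy to compare which receivers react to each of the two calls.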