NATS with JetStream introduction

Sat, Apr 10, 2021 6-minute read

NATS describes itself like this: “NATS messaging enables the exchange of data that is segmented into messages among computer applications and services. These messages are addressed by subjects and do not depend on network location. This provides an abstraction layer between the application or service and the underlying physical network. Data is encoded and framed as a message and sent by a publisher. The message is received, decoded, and processed by one or more subscribers.” Simply said, if you are using RabbitMQ, ActiveMQ or any other MQ, chances are, that NATS can be very fast and easier to run&maintain alternative.

Here are a several features I would like to highlight:

  • it’s written in Golang thus it’s very performant and there is a version for Windows too. There are actually versions for many operating systems like Linux (of different flavours), FreeBSD, MacOS etc. see here

  • it’s very easy to setup. As many other Golang projects, the build is just a one binary, so for Windows we have nats-server.exe that does everything. To get NATS running in most default configuration, we can just run this exe file without any parameters.

$ nats-server.exe
[26404] 2021/04/12 07:31:56.012368 [INF] Starting nats-server
[26404] 2021/04/12 07:31:56.029109 [INF]   Version:  2.2.1
[26404] 2021/04/12 07:31:56.029679 [INF]   Git:      [0bdd8f8]
[26404] 2021/04/12 07:31:56.030862 [INF]   Name:     NCFMCCHJFJED2BF5AKDY7ZT6O6ZQO2F4O5A3GGAAYIB7XIIDMVL4424T
[26404] 2021/04/12 07:31:56.030862 [INF]   ID:       NCFMCCHJFJED2BF5AKDY7ZT6O6ZQO2F4O5A3GGAAYIB7XIIDMVL4424T
[26404] 2021/04/12 07:31:56.034875 [INF] Listening for client connections on 0.0.0.0:4222
[26404] 2021/04/12 07:31:56.296552 [INF] Server is ready
[26404] 2021/04/12 07:31:59.375904 [INF] Initiating Shutdown...
[26404] 2021/04/12 07:31:59.377554 [INF] Server Exiting..
  • NATS can provide communication of different scenarios. We have standard publish/subscribe, queue groups but also request/reply scenario I like most. In this scenario we, as a publisher, send a request and wait (timeout is used here) for any subscribers to reply. If there are many subscribers, the fastest wins. It’s a great alternative for e.g. standard REST based microservices or a way how to decouple our (majestic/modular) monolith application internally.

  • NATS, in the latest version (starts in v. 2.2.0), added support for MQTT. Messages are routed internally between those two protocols, thus you can have MQTT publisher sending messages to NATS and NATS native subscribers receiving those messages. Details are described here.

To run NATS with MQTT enabled we need a configuration file as the one below with minimal MQTT configuration. Actually we have a configuration for both - native NATS communication protocol port (4222) and standard MQTT protocol port configuration (1883) .

listen: 127.0.0.1:4222

mqtt {
    # Specify a host and port to listen for websocket connections
    #
    # listen: "host:port"
    # It can also be configured with individual parameters,
    # namely host and port.
    #
    # host: "hostname"
    port: 1883
    
    # TLS configuration.
    #
    
    
    # If no user name is provided when an MQTT client connects, will default
    # this user name in the authentication phase. If specified, this will
    # override, for MQTT clients, any `no_auth_user` value defined in the
    # main configuration file.
    # Note that this is not compatible with running the server in operator mode.
    #
    # no_auth_user: "my_username_for_apps_not_providing_credentials"
    
    # See below to know what is the normal way of limiting MQTT clients
    # to specific users.
    # If there are no users specified in the configuration, this simple authorization
    # block allows you to override the values that would be configured in the
    # equivalent block in the main section.
    #
    authorization {
    #     # If this is specified, the client has to provide the same username
    #     # and password to be able to connect.
        username: "admin"
        password: "admin"
        #
        #     # If this is specified, the password field in the CONNECT packet has to
        #     # match this token.
        #     # token: "my_token"
        #
        #     # This overrides the main's authorization timeout. For consistency
        #     # with the main's authorization configuration block, this is expressed
        #     # as a number of seconds.
        #     # timeout: 2.0
    }
    
    # This is the amount of time after which a QoS 1 message sent to
    # a client is redelivered as a DUPLICATE if the server has not
    # received the PUBACK packet on the original Packet Identifier.
    # The value has to be positive.
    # Zero will cause the server to use the default value (30 seconds).
    # Note that changes to this option is applied only to new MQTT subscriptions.
    #
    # Expressed as a time duration, with "s", "m", "h" indicating seconds,
    # minutes and hours respectively. For instance "10s" for 10 seconds,
    # "1m" for 1 minute, etc...
    #
    # ack_wait: "1m"
    
    # This is the amount of QoS 1 messages the server can send to
    # a subscription without receiving any PUBACK for those messages.
    # The valid range is [0..65535].
    #
    # The total of subscriptions' max_ack_pending on a given session cannot
    # exceed 65535. Attempting to create a subscription that would bring
    # the total above the limit would result in the server returning 0x80
    # in the SUBACK for this subscription.
    # Due to how the NATS Server handles the MQTT "#" wildcard, each
    # subscription ending with "#" will use 2 times the max_ack_pending value.
    # Note that changes to this option is applied only to new subscriptions.
    #
    # max_ack_pending: 100
}

To run NATS with this configuration file with JetStream persistence enabled, we can simply issue a following command:

$ nats-server.exe -js -c basic.config
[3556] 2021/04/12 07:43:16.003023 [INF] Starting nats-server
[3556] 2021/04/12 07:43:16.020761 [INF]   Version:  2.2.1
[3556] 2021/04/12 07:43:16.021287 [INF]   Git:      [0bdd8f8]
[3556] 2021/04/12 07:43:16.022333 [INF]   Name:     NDKHAYGOY36JWM53K6SRAUD6DN3J7KAOB2S6444TW7CCD325UHTAR4C5
[3556] 2021/04/12 07:43:16.022866 [INF]   Node:     5ytEAgrW
[3556] 2021/04/12 07:43:16.024457 [INF]   ID:       NDKHAYGOY36JWM53K6SRAUD6DN3J7KAOB2S6444TW7CCD325UHTAR4C5
[3556] 2021/04/12 07:43:16.025506 [INF] Using configuration file: basic.config
[3556] 2021/04/12 07:43:16.027598 [INF] Starting JetStream
[3556] 2021/04/12 07:43:16.028683 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[3556] 2021/04/12 07:43:16.029215 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[3556] 2021/04/12 07:43:16.030280 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[3556] 2021/04/12 07:43:16.031381 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[3556] 2021/04/12 07:43:16.032528 [INF]
[3556] 2021/04/12 07:43:16.033595 [INF]          https://docs.nats.io/jetstream
[3556] 2021/04/12 07:43:16.034659 [INF]
[3556] 2021/04/12 07:43:16.035713 [INF] ---------------- JETSTREAM ----------------
[3556] 2021/04/12 07:43:16.036246 [INF]   Max Memory:      11.94 GB
[3556] 2021/04/12 07:43:16.037312 [INF]   Max Storage:     1.00 TB
[3556] 2021/04/12 07:43:16.037898 [INF]   Store Directory: "C:\\Temp\\nats\\jetstream"
[3556] 2021/04/12 07:43:16.038940 [INF] -------------------------------------------
[3556] 2021/04/12 07:43:16.062642 [INF]   Restored 0 messages for Stream "$MQTT_msgs"
[3556] 2021/04/12 07:43:16.082644 [INF]   Restored 0 messages for Stream "$MQTT_rmsgs"
[3556] 2021/04/12 07:43:16.083645 [INF]   Recovering 1 Consumers for Stream - "$MQTT_rmsgs"