How to run RabbitMQ with Node.JS

This tutorial explores how to work with RabbitMQ in NodeJS. Code is available on Github.

This article assumes that you have a running RabbitMQ server installed, if not - get started with a free RabbitMQ plan at CloudAMQP.

There are multiple NodeJS clients for RabbitMQ. We focus on the actively developed amqplib and Rascal libraries. Whereas amqplib, as a barebones implementation of AMQP, gives the developer flexibility, Rascal handles common tasks to reduce the potential for programming errors.

What is amqplib?

The amqplib library is among the few remaining unabandoned NodeJS RabbitMQ libraries. No assumptions are made regarding your use of AMQP. Connections are not retried, the topology is not automatically created, and you must handle message failures.

Amqplib exposes synchronous and asynchronous methods over the same API. You may choose between implementing an application using promises or callbacks.

Publishing messages in amqplib

The amqplib library uses the AMQP 0.9.1 protocol. Channels created on connections push messages to RabbitMQ.

Use the following code to create your publisher:

const amqplib = require('amqplib');
var assert = require('assert');
var util = require('util');

var rabbit_user= process.env.RABBITMQ_USER;
var rabbit_pwd = process.env.RABBITMQ_PWD;
var rabbit_host = process.env.RABBITMQ_HOST;
var rabbit_port = process.env.RABBITMQ_PORT;
var vhost = process.env.RABBIT_VHOST;

var amql_url = util.format("amqp://%s:%s@%s:%s/%s", rabbit_user, rabbit_pwd, rabbit_host, rabbit_port, vhost);

async function produce(){
    console.log("Publishing");
    var conn = await amqplib.connect(amql_url, "heartbeat=60");
    var ch = await conn.createChannel()
    var exch = 'test_exchange';
    var q = 'test_queue';
    var rkey = 'test_route';
    var msg = 'Hello World!';
    await ch.assertExchange(exch, 'direct', {durable: true}).catch(console.error);
    await ch.assertQueue(q, {durable: true});
    await ch.bindQueue(q, exch, rkey);
    await ch.publish(exch, rkey, Buffer.from(msg));
    setTimeout( function()  {
        ch.close();
        conn.close();},  500 );
}
produce();

This producer connects to the broker, creates our exchange and queue, binds the queue to the exchange, and then publishes our test message. We close the channel and connection when finished.

Consuming messages in amqplib

Programs using amqplib use the consume function to receive messages from RabbitMQ:

const amqplib = require('amqplib');
var assert = require('assert');
var util = require('util');

var rabbit_user= process.env.RABBITMQ_USER;
var rabbit_pwd = process.env.RABBITMQ_PWD;
var rabbit_host = process.env.RABBITMQ_HOST;
var rabbit_port = process.env.RABBITMQ_PORT;
var vhost = process.env.RABBIT_VHOST;

var amql_url = util.format("amqp://%s:%s@%s:%s/%s", rabbit_user, rabbit_pwd, rabbit_host, rabbit_port, vhost);

async function do_consume() {
    var conn = await amqplib.connect(amql_url, "heartbeat=60");
    var ch = await conn.createChannel()
    var q = 'test_queue';
    await conn.createChannel();
    await ch.assertQueue(q, {durable: true});
    await ch.consume(q, function (msg) {
        console.log(msg.content.toString());
        ch.ack(msg);
        ch.cancel('myconsumer');
        }, {consumerTag: 'myconsumer'});
    setTimeout( function()  {
        ch.close();
        conn.close();},  500 );
}

do_consume();

We supply a message handler in the call to consume, closing the connection and channel when the consumer terminates. There is no need to use await with the call to do_consume.

A rascal broker

As a wrapper around amqplib, Rascal attempts to simplify the use of RabbitMQ and other AMQP based frameworks in nodejs. The inability of amqplib to deal with connection issues and unparseable content can pose a serious problem for microservices.

Rascal attempts to resolve these issues and add common features such as:

  • automated queue and exchange creation
  • automated reconnection and resubscription
  • flow control
  • redelivery limits
  • handles for specific failures
  • safe defaults
  • encryption

The library stores publication and subscription profiles which manage connections to the underlying RabbitMQ broker.

Configuring rascal

Rascal creates a specialized broker for each connection which manages publication and consumption from RabbitMQ. Set up the broker using a json configuration file:

{
  "vhosts": {
    "test": {
      "connection": {
        "url": "amqp://user:pwd@127.0.0.1:5672/test"
        },
         "exchanges": {
            "test_exchange": {
               "assert": true,
               "type": "direct"
            }
         },
         "queues": [
            "test_queue"
         ],
         "bindings": {
            "b1": {
               "source": "test_exchange",
               "destination": "test_queue",
               "destinationType": "queue",
               "bindingKey": "test_route"
            }
         },
         "publications": {
            "demo_publication": {
               "vhost": "test",
               "exchange": "test_exchange",
               "routingKey": "test_route"
            }
         },
         "subscriptions": {
            "demo_subscription": {
               "queue": "test_queue",
               "prefetch": 1
            }
         }
      }
   }
}

This file tells Rascal to create the test_exchange and binds the test_queue to the exchange. The tool attempts to set reasonable default values for missing variables.

A Rascal publisher

Rascal brokers perform most of the work we needed to publish to RabbitMQ using amqplib. Creating the broker gives us access to an underlying publisher:

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config.json');

async function rascal_produce(){
    console.log("Publishing");
    var msg = 'Hello World!';
    const broker = await Broker.create(config);
    broker.on('error', console.error);
    const publication = await broker.publish('demo_publication', msg);
    publication.on('error', console.error);
    console.log("Published")
}

rascal_produce().catch(console.error);

The Rascal broker created and bound test_queue to test_exchange in the create function. We use the publish method to send a message to RabbitMQ using the specified publisher from the config.json file.

Rascal Subscriber

Rascal uses subscription profiles when receiving messages.These profiles set up consumers:

const Broker = require('rascal').BrokerAsPromised;
const config = require('./config.json');

async function rascal_consume(){
  console.log("Consuming");
  const broker = await Broker.create(config);
  broker.on('error', console.error);
  const subscription = await broker.subscribe('demo_subscription', 'b1');
  subscription.on('message', (message, content, ackOrNack)=>{
      console.log(content);
      ackOrNack();
      subscription.cancel();
  });
  subscription.on('error', console.error);
  subscription.on(‘invalid_content’, (msg) =>{
console.log(‘Failed to parse message’);
});
}

rascal_consume().catch(console.error);

Our asynchronous consumer sends messages to the appropriately registered functions based on the configuration above. Error handlers allow us to log failures.

Does Rascal nack my messages?

It is important to be aware that Rascal nacks messages. This occurs when there is no invalid content listener or the application exceeds the subscriber’s redelivery limit. Invalid content may include unparseable characters or corrupted messages.

Negative acknowledgment allows for bulk rejection. To avoid constantly requeuing failed messages, specify a special invalid content listener as follows:

try{
  const subscription = await broker.subscribe(“my_subscriber”)
  subscription.on(“message”, (message, content, ackOrNack) => {
      // handle message
  }).on(“invalid_content”, (err, message, ackOrNack) => {
     // handle message with invalid content
  })
} catch(err){
  console.error(“Subscription does not exist”)
}

It is likely that a message with bad content will fail in other consumers as well. Handling this up front keeps your applications and microservices healthy. Use logstash or another third party tool to receive notifications and diagnose the underlying issue.

Which nodejs amqp library should I use?

The library you choose depends on the features you need to implement. If you are building complex software using enterprise integration patterns, Rascal offers additional benefits over the barebones amqplib.

Smaller applications may opt to use only amqplib. The features you need should dictate which library you choose.

RabbitMQ clients in Nodejs

The npm registry lists multiple RabbitMQ nodejs clients. While most are abandoned, Rascal and amqplib client are in active development. The amqplib client gives you flexibility through a barebones AMQP 0.9.1 API. You can use the Rascal wrapper to stabilize more complicated applications.

Any client that connects to RabbitMQ works with CloudAMQP.

CloudAMQP - industry leading RabbitMQ as a service

Start your managed cluster today. CloudAMQP is 100% free to try.

13,000+ users including these smart companies