RabbitMQ delayed message exchange plugin with Node.JS

The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the user adds a delay header to a message.

Sometimes you may want to delay the delivery of messages for a certain time so that subscribers doesn't see them immediately. The AMQP protocol doesn't have a native delayed queue feature. People has instead combined the message TTL function and the dead-lettering function to achieve this - guide for that can be found here.

There is also a plugin available for delayed messages, RabbitMQ delayed messages exchange plugin (targets RabbitMQ 3.5.8 and later versions and Erlang 18.0+). This plugin adds a new exchange type to RabbitMQ and messages routed to that exchange can be delayed for a specified amount of time. CloudAMQP customers on dedicated plans can enable the plugin from the plugins tab.

More information and information about how to install the plugin for non-customers can be found here.

x-delayed-message, x-delayed-type

We will start by declare an exchange providing the 'x-delayed-message' exchange type and also provided an 'x-delayed-type' argument. 'x-delayed-type' tell the plugin how the exchange should behave after the given delay time has passed by - in this case we want it to behave as a direct exchange.

pubChannel.assertExchange(exchange, "x-delayed-message", {autoDelete: false, durable: true, passive: true,  arguments: {'x-delayed-type':  "direct"}})

x-delay

When we have the exchange declared, we can publish messages providing a header telling the plugin how long time it should delay our message. Don't forget to bind a queue to the exchange.

pubChannel.publish(exchange, routingKey, content, {headers: {"x-delay": delay}},
function(err, ok) {
  if (err) {
    console.error("[AMQP] publish", err);
    pubChannel.connection.close();
  }
});

The plugin will proceed to route the message without delay if no x-delay header is provided.

Code example

When running the full code given below, a connection will be established between the RabbiMQ instance and your application. Queues and exchanges will be declared and created if they do not already exist and finally a message will be published with the 'x-delay' header - one message will be sent every 10 second. The publish method will queue messages internally if the connection is down and resend them later. The consumer subscribes to the jobs-queue. The messages are handled one by one and sent to the work process.

delay exchange message
delay exchange message
[AMQP] connected
Worker is started
work sent: 18:19:35 --- received: 18:19:45
work sent: 18:19:45 --- received: 18:19:55
work sent: 18:19:55 --- received: 18:20:05
work sent: 18:20:05 --- received: 18:20:15
var amqp = require('amqplib/callback_api');
var amqpConn = null;

function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 1000);
    }

    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });

    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
        return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}

function whenConnected() {
  startPublisher();
  startWorker();
}

var pubChannel = null;
var offlinePubQueue = [];
var exchange = 'my-delay-exchange';

function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
      ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;
    //assert the exchange: 'my-delay-exchange' to be a x-delayed-message,
    pubChannel.assertExchange(exchange, "x-delayed-message", {autoDelete: false, durable: true, passive: true,  arguments: {'x-delayed-type':  "direct"}})
    //Bind the queue: "jobs" to the exchnage: "my-delay-exchange" with the binding key "jobs"
    pubChannel.bindQueue('jobs', exchange ,'jobs');

    while (true) {
      var m = offlinePubQueue.shift();
      if (!m) break;
      publish(m[0], m[1], m[2]);
    }
  });
}

function publish(routingKey, content, delay) {
  try {
    pubChannel.publish(exchange, routingKey, content, {headers: {"x-delay": delay}},
      function(err, ok) {
        if (err) {
          console.error("[AMQP] publish", err);
          offlinePubQueue.push([exchange, routingKey, content]);
          pubChannel.connection.close();
        }
    });
  } catch (e) {
    console.error("[AMQP] failed", e.message);
    offlinePubQueue.push([routingKey, content, delay]);
  }
}
// A worker that acks messages only if processed succesfully
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      work(msg, function(ok) {
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }

    function work(msg, cb) {
      console.log(msg.content.toString() + " --- received: " + current_time());
      cb(true);
    }
  });
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

function current_time(){
  now = new Date();
  hour = "" + now.getHours(); if (hour.length == 1) { hour = "0" + hour; }
  minute = "" + now.getMinutes(); if (minute.length == 1) { minute = "0" + minute; }
  second = "" + now.getSeconds(); if (second.length == 1) { second = "0" + second; }
  return hour + ":" + minute + ":" + second;
}

//Publish a message every 10 second. The message will be delayed 10seconds.
setInterval(function() {
  var date = new Date();
  publish("jobs", new Buffer("work sent: " + current_time()), 10000);
}, 10000);

start();

As always, please email us at if you have any suggestions or feedback.

CloudAMQP - industry leading RabbitMQ as a service

Start your managed cluster today. CloudAMQP is 100% free to try.

13,000+ users including these smart companies