CloudAMQP with Node.js Getting started

Node.js developers have a number of options for AMQP client libraries; three of them are shown with examples below.

WARNING: We do not recommend node-amqp as a client library. It is currently an unmaintained project that lacks some basic features and has serious bugs.

Access CloudAMQP from Node.js using amqplib

A good option to start out with is amqplib.

Start by adding amqplib as a dependency to your package.json file.

Code example Publish and Subscribe

The following code snippet shows how to connect and how to publish and consume messages. The publish method will queue messages internally if the connection is down and resend them later.

Full code example can be found on GitHub.

var amqp = require('amqplib/callback_api');

// Current connection. It stays null until the first successful connect and is
// replaced by start() every time a new connection is established.
var amqpConn = null;

/**
 * Connects to the broker named by CLOUDAMQP_URL (with a 60 second heartbeat)
 * and keeps the connection alive: a failed attempt, or a connection that
 * closes later, schedules another call to start() one second afterwards.
 * On success the connection is stored in amqpConn and whenConnected() runs.
 */
function start() {
  const url = process.env.CLOUDAMQP_URL + "?heartbeat=60";

  // Returns the timer handle so the callbacks below can hand it back.
  function retryLater() {
    return setTimeout(start, 1000);
  }

  function onConnError(connErr) {
    // Errors whose message is "Connection closing" are not logged.
    if (connErr.message === "Connection closing") return;
    console.error("[AMQP] conn error", connErr.message);
  }

  function onConnClose() {
    console.error("[AMQP] reconnecting");
    return retryLater();
  }

  amqp.connect(url, function(connectErr, connection) {
    if (connectErr) {
      console.error("[AMQP]", connectErr.message);
      return retryLater();
    }

    connection.on("error", onConnError);
    connection.on("close", onConnClose);
    console.log("[AMQP] connected");
    amqpConn = connection;
    whenConnected();
  });
}

/**
 * Runs after a connection has been established: brings up the publisher
 * channel first, then the worker.
 */
function whenConnected() {
  [startPublisher, startWorker].forEach(function(boot) {
    boot();
  });
}

// Confirm channel used by publish(); null until startPublisher() has opened one.
var pubChannel = null;
// Messages that could not be sent yet, stored as [exchange, routingKey, content].
var offlinePubQueue = [];

/**
 * Opens a confirm channel on the current connection, stores it in pubChannel
 * and re-sends every message that was waiting in offlinePubQueue.
 * If the channel cannot be created, closeOnErr() handles the error and
 * nothing else happens.
 */
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;

    // Drain a snapshot of the backlog rather than looping until the live
    // queue is empty: publish() puts a message back on offlinePubQueue when
    // sending fails synchronously, so a message that keeps failing would
    // otherwise make this loop spin forever. Re-queued messages wait for the
    // next call to startPublisher().
    var pending = offlinePubQueue.splice(0, offlinePubQueue.length);
    pending.forEach(function(m) {
      publish(m[0], m[1], m[2]);
    });
  });
}

/**
 * Publishes a persistent message through pubChannel.
 *
 * The message is put on offlinePubQueue when the publish call throws (which
 * includes the case where pubChannel is still null) or when the confirm
 * callback reports an error; in the latter case the channel's connection is
 * closed as well.
 *
 * @param {string} exchange - exchange to publish to
 * @param {string} routingKey - routing key for the message
 * @param {*} content - message body, passed to the channel unchanged
 */
function publish(exchange, routingKey, content) {
  const requeue = function() {
    offlinePubQueue.push([exchange, routingKey, content]);
  };

  const onConfirm = function(confirmErr, _ok) {
    if (!confirmErr) return;
    console.error("[AMQP] publish", confirmErr);
    requeue();
    pubChannel.connection.close();
  };

  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, onConfirm);
  } catch (publishErr) {
    console.error("[AMQP] publish", publishErr.message);
    requeue();
  }
}
// A worker that acks messages only if they were processed successfully
/**
 * Opens a channel on the current connection, makes sure the durable "jobs"
 * queue exists and consumes from it with manual acknowledgements.
 * Each delivery is handed to work(); the message is acked when work() reports
 * success and rejected with requeue otherwise. Channel or queue setup errors
 * are passed to closeOnErr().
 */
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });

    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      // amqplib invokes the consumer callback with null when the broker
      // cancels the consumer. There is no message to process then, and
      // passing null on to work() would throw on msg.content. Treat it like
      // any other failure so the connection is closed and re-established.
      if (!msg) {
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }

      work(msg, function(ok) {
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true); // true: ask the broker to requeue the message
        } catch (e) {
          // ack/reject throw when the channel is no longer usable
          closeOnErr(e);
        }
      });
    }
  });
}

/**
 * Example job handler: prints the message body as a string and reports
 * success to the caller.
 *
 * @param {{content: {toString: function(): string}}} msg - delivered message
 * @param {function(boolean)} cb - called with true once the work is done
 */
function work(msg, cb) {
  const body = msg.content.toString();
  console.log("Got msg ", body);
  cb(true);
}

/**
 * Error gate used by the channel setup callbacks.
 *
 * @param {*} err - error value from a callback; falsy means "no error"
 * @returns {boolean} false when there is no error; true after the error has
 *   been logged and the connection has been asked to close
 */
function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  try {
    // Closing a connection that is already closing or closed can throw, and
    // several callers may report an error for the same connection. That must
    // not turn a handled error into an uncaught exception.
    if (amqpConn) amqpConn.close();
  } catch (closeErr) {
    console.error("[AMQP] close failed", closeErr.message);
  }
  return true;
}

// Demo producer: once per second, publish a message to the "" exchange with
// routing key "jobs". Buffer.from is a factory function, so it is called
// without `new`.
setInterval(function() {
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Open the connection; publisher and worker are started once it is up.
start();

Code example Publish and Subscribe (amqp-coffee, written in CoffeeScript)

AMQP = require('amqp-coffee')

# message to publish
msg = "Hello CloudAMQP"

# Creates a new amqp Connection.
amqpConnection = new AMQP {host: 'host', port:5672, vhost: 'your_vhost', login: 'your_vhost', password: 'your_password'}, (e, r)->
  if e?
    # Nothing below can work without a connection, so stop here.
    console.error "Error", e
    return

  # Returns a channel that can be used to handle (declare, delete etc) queues.
  amqpConnection.queue {queue: "queueName"}, (e,q)->
    q.declare ()->
      # Publish from inside the bind callback so the message is only sent once
      # the queue is bound, and log from inside the publish callback so the
      # outcome is reported after the publish has completed.
      q.bind "amq.direct", "queueName", ()->
        amqpConnection.publish "amq.direct", "queueName", msg, {confirm: true}, (err, res)->
          if err?
            console.error "Publish failed", err
          else
            console.log "Message published: " + msg

    consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
      console.log("Message consumed: " + message.data.toString())
      message.ack()

    , (e,r)->
      console.log "Consumer setup"
      # The exchange is "amq.direct", the same one the queue is bound to above.
      amqpConnection.publish "amq.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
        if !e? then console.log "Message Sent"

Access CloudAMQP from Node.js using Jackrabbit

Another option is to connect via Jackrabbit. Jackrabbit provides simple AMQP/RabbitMQ job queues for Node.js.

Code example Publish and Subscribe

The following code snippet shows how to connect and how to consume messages:

//subscriber.js
var jackrabbit = require('jackrabbit');

// Broker address: CLOUDAMQP_URL when it is set, otherwise a broker on localhost.
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";

/**
 * Handles one delivered message: prints the payload, then acknowledges it
 * by calling ack with an empty string.
 */
function onMessage(data,ack) {
  console.log('received:', data);
  ack("");
}

// Connect, take the default exchange and declare the durable queue on it.
var rabbit = jackrabbit(url);
var exchange = rabbit.default();
var hello = exchange.queue({ name: 'example_queue', durable: true });

// Start consuming; every delivery goes to onMessage.
hello.consume(onMessage);

The following code snippet shows how to connect and how to publish messages:

//publisher.js
var jackrabbit = require('jackrabbit');

// Broker address: CLOUDAMQP_URL when it is set, otherwise a broker on localhost.
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";

// Open the connection and take the default exchange from it.
var rabbit = jackrabbit(url);
var exchange = rabbit.default();

// Declare the durable queue before anything is published to it.
var queueOptions = { name: 'example_queue', durable: true };
var hello = exchange.queue(queueOptions);

// Publish one message, using the queue name as the routing key.
var payload = { msg: 'Hello CloudAMQP' };
exchange.publish(payload, { key: 'example_queue' });

Alternative clients

bramqp
Implementing 100% of the AMQP spec

Video

Here's a video recording showing how to set up a CloudAMQP instance and connect to it using Node.js and the amqplib library: