Node.js developers has a number of options for AMQP client libraries, three of them are shown with examples below.

WARNING: We do not recommend node-amqp as client library. It is for the moment an unmaintained project that lacks some basic features and has serious bugs.

Access CloudAMQP from Node.js using amqplib

A good option to start out with is amqplib.

Start by adding amqplib as a dependency to your package.json file.

Code example Publish and Subscribe

The following code snippet show how to connect and how to publish and consume messages. The publish method will queue messages internally if the connection is down and resend them later.

Full code example can be found here on github.

var amqp = require('amqplib/callback_api');

// if the connection is closed or fails to be established at all, we will reconnect
var amqpConn = null;
function start() {
  amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) {
    if (err) {
      console.error("[AMQP]", err.message);
      return setTimeout(start, 1000);
    }
    conn.on("error", function(err) {
      if (err.message !== "Connection closing") {
        console.error("[AMQP] conn error", err.message);
      }
    });
    conn.on("close", function() {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });
    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}

function whenConnected() {
  startPublisher();
  startWorker();
}

var pubChannel = null;
var offlinePubQueue = [];
function startPublisher() {
  amqpConn.createConfirmChannel(function(err, ch) {
    if (closeOnErr(err)) return;
      ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });
    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;
    while (true) {
      var m = offlinePubQueue.shift();
      if (!m) break;
      publish(m[0], m[1], m[2]);
    }
  });
}

function publish(exchange, routingKey, content) {
  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true },
                      function(err, ok) {
                        if (err) {
                          console.error("[AMQP] publish", err);
                          offlinePubQueue.push([exchange, routingKey, content]);
                          pubChannel.connection.close();
                        }
                      });
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    offlinePubQueue.push([exchange, routingKey, content]);
  }
}
// A worker that acks messages only if processed succesfully
function startWorker() {
  amqpConn.createChannel(function(err, ch) {
    if (closeOnErr(err)) return;
    ch.on("error", function(err) {
      console.error("[AMQP] channel error", err.message);
    });

    ch.on("close", function() {
      console.log("[AMQP] channel closed");
    });

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, function(err, _ok) {
      if (closeOnErr(err)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      work(msg, function(ok) {
        try {
          if (ok)
            ch.ack(msg);
          else
            ch.reject(msg, true);
        } catch (e) {
          closeOnErr(e);
        }
      });
    }
  });
}

function work(msg, cb) {
  console.log("Got msg ", msg.content.toString());
  cb(true);
}

function closeOnErr(err) {
  if (!err) return false;
  console.error("[AMQP] error", err);
  amqpConn.close();
  return true;
}

setInterval(function() {
  publish("", "jobs", new Buffer("work work work"));
}, 1000);

start();

Access CloudAMQP from Node.js and amqp-coffee.

It is possible to use CloudAMQP with amqp-coffee, coffeescript and node.js.

Code example Publish and Subscribe

//node.coffee
AMQP = require('amqp-coffee')

//message to publish
msg = "Hello CloudAMQP"

//Creates a new amqp Connection.
amqpConnection = new AMQP {host: 'host', port:5672, vhost: 'your_vhost', login: 'your_vhost', password: 'your_password'}, (e, r)->
  if e?
    console.error "Error", e

  //Returns a channel that can be used to handle (declare, delete etc) queues.
  amqpConnection.queue {queue: "queueName"}, (e,q)->
    q.declare ()->
      q.bind "amq.direct", "queueName", ()->
      amqpConnection.publish "amq.direct", "queueName", msg, {confirm: true}, (err, res)->
      console.log "Message published: " + msg

    consumer = amqpConnection.consume "queueName", {prefetchCount: 2}, (message)->
      console.log("Message consumed: " + message.data.toString())
      message.ack()

    , (e,r)->
      console.log "Consumer setup"
      amqpConnection.publish "amqp.direct", "queueName", "message contents", {deliveryMode:2, confirm:true}, (e, r)->
        if !e? then console.log "Message Sent"

Access CloudAMQP from Node.js using Wascally

Another option is to connect via Wascally.

Start by adding wascally as a dependency to your package.json file.

Code example Publish and Subscribe

The following code snippets show how to connect and how to publish and consume messages:

//configuration.js
var config = {
  connection: {
    user: 'username', pass: 'password', server: 'server', port: 5672, vhost: 'username'
  },
  queues: [ { name: 'cloudamqp-queue', subscribe: true, durable: true } ],
  exchanges: [ { name: "exchange",type: "direct", persistent: true, durable: true } ],
  bindings: [ { exchange: 'exchange', target: 'cloudamqp-queue', keys: ["routingKey"] } ]
};

exports.config = config;
//subscriber.js
var rabbit = require("wascally");
var configuration = require( './configuration.js' )

rabbit.configure(configuration.config)
  .then(handleMessage)

function handleMessage(){
  //setting up the handler for the subscriber
  var handler = rabbit.handle("test.message", function( msg ) {
    try {
      console.log(msg.body);
      msg.ack();
    }
    catch( err ) {
      message.nack();
    }
  });
  console.log("waiting for message from publisher");
}
//publisher.js
var rabbit = require("wascally");
var configuration = require( './configuration.js' );

rabbit.configure(configuration.config)
  .then(publishMessage)

function publishMessage(){
  var message = "Hello CloudAMQP";
  var p = rabbit.publish("exchange", {
    type: "test.message",
    body: message,
    routingKey: "routingKey"
  });
}

Access CloudAMQP from Node.js using Jackrabbit

Another option is to connect via Jackrabbit. Jackrabbit is simple AMQP/RabbitMQ job queues for node.

Code example Publish and Subscribe

The following code snippet show how to connect and how to consume messages:

//subscriber.js
var jackrabbit = require('jackrabbit');
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";

var rabbit = jackrabbit(url);
var exchange = rabbit.default();

var hello = exchange.queue({ name: 'example_queue', durable: true });
hello.consume(onMessage);

function onMessage(data,ack) {
  console.log('received:', data);
  ack();
}

The following code snippet show how to connect and how to publish messages:

//publisher.js
var jackrabbit = require('jackrabbit');

// Get the URL from ENV or default to localhost
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";

// Connect to CloudAMQP
var rabbit = jackrabbit(url);
var exchange = rabbit.default();

var hello = exchange.queue({ name: 'example_queue', durable: true });

//publich message
exchange.publish({ msg: 'Hello CloudAMQP' }, { key: 'example_queue' });
exchange.on('drain', process.exit);

Alternative clients

amq
Extends amqplib with reconnection etc.
bramqp
Implementing 100% of the AMQP spec
rabbus
A highly opinionated set of messaging abstractions for RabbitMQ and NodeJS, built with Wascally
node-amqp
node-amqp is not recommended to use. node-amqp has a couple of drawbacks. It does not support framing (required for messages larger than 128KB) and it hides the channel concept so it is easy to shoot yourself in the foot, if you for instance call conn.exchange multiple times.

Video

Here's a video recording showing how to setup a CloudAMQP instance and connect to it using Node.js and the amqplib library: