CloudAMQP with JavaScript Getting started

JavaScript Client Libraries

JavaScript developers have several options for AMQP client libraries:

  • amqp-client.js - CloudAMQP's official TypeScript client with a high-level API: automatic reconnection, consumer recovery, message encoding, and RPC support. Runs in any JavaScript runtime (Node.js, Bun, Deno, the browser), with zero dependencies.
  • amqplib - A widely used callback/promise-based client for Node.js. A good choice if you want low-level control.

amqp-client.js

amqp-client.js is CloudAMQP's official AMQP client for JavaScript. The high-level API manages connections for you: it reconnects automatically with exponential backoff, recovers your consumers after a reconnect, and publishes persistent messages by default. It's written in TypeScript, ships type definitions, and has zero runtime dependencies. It's the recommended choice for new JavaScript projects.

Installation

$ npm install @cloudamqp/amqp-client --save

Publishing Messages

Create a file named publish.js:

import { AMQPSession } from "@cloudamqp/amqp-client"

// Open a session using the connection URL from the environment
const session = await AMQPSession.connect(process.env.CLOUDAMQP_URL)

// Get a handle to the queue and publish one message to it
const helloQueue = await session.queue("my_queue")
await helloQueue.publish("Hello World")
console.log("Message published")

// Stop the session once the message has been published
await session.stop()

Consuming Messages

Create a file named subscribe.js:

import { AMQPSession } from "@cloudamqp/amqp-client"

// Open a session using the connection URL from the environment
const session = await AMQPSession.connect(process.env.CLOUDAMQP_URL)

// Called once per delivered message. The message is acked after this
// handler returns, and nacked + requeued if it throws.
const handleMessage = async (msg) => {
  console.log(`[x] Received: ${msg.bodyString()}`)
}

// Start consuming. The subscription survives reconnects.
console.log("[*] Waiting for messages. To exit press CTRL+C")
const queue = await session.queue("my_queue")
await queue.subscribe(handleMessage)

Running the Application

In separate terminal windows, run:

# Terminal 1
$ export CLOUDAMQP_URL=amqps://user:pass@change-me.rmq.cloudamqp.com/user
$ node --enable-source-maps subscribe.js
[*] Waiting for messages. To exit press CTRL+C
# Terminal 2
$ export CLOUDAMQP_URL=amqps://user:pass@change-me.rmq.cloudamqp.com/user
$ node --enable-source-maps publish.js
Message published

Expected output in Terminal 1:

[x] Received: Hello World

Advanced Features

The high-level API includes built-in support for:

  • Automatic reconnection with exponential backoff and consumer recovery
  • Async iterator subscriptions
  • RPC client/server patterns
  • Automatic message encoding/decoding via codecs (JSON and text, gzip and deflate)
  • Persistent messages by default, so messages survive a broker restart

See the GitHub repository and the API documentation for the full reference. A low-level API, AMQPClient, is also available when you want to manage channels yourself.

Other runtimes and the browser

The same client runs in Node.js, Bun, Deno, and the browser. It picks TCP or WebSockets automatically from the connection URL, so a browser connects over WebSockets with no code changes. See AMQP over WebSockets for details.


amqplib

amqplib is a widely used client with a callback- and promise-based API. It's a solid choice when you want explicit control over connections and channels.

Start by adding amqplib as a dependency to your package.json file.

Code example Publish and Subscribe

The following code snippet shows how to connect and how to publish and consume messages. The example's publish function keeps messages in an in-memory queue when they cannot be sent (for example, while the connection is down) and resends them once a new channel is open.

The full code example can be found on GitHub.

var amqp = require('amqplib/callback_api');

// Current connection, shared by the publisher and the worker.
// Stays null until the first successful connect.
let amqpConn = null;

/**
 * Connects to the broker named by the CLOUDAMQP_URL environment variable
 * (with "?heartbeat=60" appended) and keeps the connection alive.
 *
 * If the connection cannot be established, or an established connection
 * closes, another attempt is scheduled one second later. After a successful
 * connect the connection is stored in `amqpConn` and `whenConnected()` runs.
 */
function start() {
  const retry = () => setTimeout(start, 1000);
  const url = `${process.env.CLOUDAMQP_URL}?heartbeat=60`;

  amqp.connect(url, (connectErr, conn) => {
    if (connectErr) {
      console.error("[AMQP]", connectErr.message);
      return retry();
    }

    conn.on("error", (connErr) => {
      // "Connection closing" is not reported; every other error is logged.
      if (connErr.message === "Connection closing") return;
      console.error("[AMQP] conn error", connErr.message);
    });
    conn.on("close", () => {
      console.error("[AMQP] reconnecting");
      return retry();
    });

    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}

/**
 * Runs once per successful (re)connect: starts the publisher first,
 * then the worker, on the new connection.
 */
function whenConnected() {
  [startPublisher, startWorker].forEach((boot) => boot());
}

// Confirm channel used by publish(); null until startPublisher() has opened one.
let pubChannel = null;
// Backlog of [exchange, routingKey, content] triples that could not be
// published yet. publish() appends to it; startPublisher() flushes it.
const offlinePubQueue = [];

/**
 * Opens a confirm channel on the current connection, stores it in
 * `pubChannel`, and re-publishes everything waiting in `offlinePubQueue`.
 *
 * If the channel cannot be created, the error is handed to `closeOnErr`
 * and nothing else happens.
 */
function startPublisher() {
  amqpConn.createConfirmChannel((err, ch) => {
    if (closeOnErr(err)) return;

    ch.on("error", (chErr) => {
      console.error("[AMQP] channel error", chErr.message);
    });
    ch.on("close", () => {
      console.log("[AMQP] channel closed");
    });

    pubChannel = ch;

    // Drain a snapshot of the backlog rather than the live queue. publish()
    // pushes a message back onto offlinePubQueue when it fails synchronously,
    // so shifting from the live queue would retry the same failing message
    // forever and block the event loop. Anything re-queued during this pass
    // waits for the next channel.
    const pending = offlinePubQueue.splice(0);
    for (const [exchange, routingKey, content] of pending) {
      publish(exchange, routingKey, content);
    }
  });
}

/**
 * Publishes `content` as a persistent message to `exchange` with
 * `routingKey` on the shared confirm channel (`pubChannel`).
 *
 * The message is appended to `offlinePubQueue` for a later retry when:
 *  - the publish call throws (for example because `pubChannel` is still
 *    null), or
 *  - the confirm callback reports an error, in which case the channel's
 *    connection is closed as well.
 */
function publish(exchange, routingKey, content) {
  const requeue = () => {
    offlinePubQueue.push([exchange, routingKey, content]);
  };

  const onConfirm = (err, _ok) => {
    if (!err) return;
    console.error("[AMQP] publish", err);
    requeue();
    pubChannel.connection.close();
  };

  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, onConfirm);
  } catch (e) {
    console.error("[AMQP] publish", e.message);
    requeue();
  }
}
// A worker that acks messages only if processed successfully
/**
 * Opens a channel on the current connection, asserts the durable "jobs"
 * queue and consumes from it with manual acknowledgements and a prefetch
 * of 10.
 *
 * Each delivery is handed to `work()`. The message is acked when `work`
 * reports success and rejected with requeue otherwise. Channel or queue
 * setup errors, and errors thrown while acking/rejecting, go to `closeOnErr`.
 */
function startWorker() {
  amqpConn.createChannel((err, ch) => {
    if (closeOnErr(err)) return;

    ch.on("error", (chErr) => {
      console.error("[AMQP] channel error", chErr.message);
    });
    ch.on("close", () => {
      console.log("[AMQP] channel closed");
    });

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, (queueErr, _ok) => {
      if (closeOnErr(queueErr)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });

    function processMsg(msg) {
      // amqplib invokes the consumer callback with null when the broker
      // cancels the consumer. There is no message to process, and passing
      // null on to work() would throw. Treat it as an error so the
      // connection is closed and the reconnect logic starts a new consumer.
      if (msg === null) {
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }

      work(msg, (ok) => {
        try {
          if (ok) {
            ch.ack(msg);
          } else {
            ch.reject(msg, true);
          }
        } catch (e) {
          // ack/reject throw when the channel is no longer usable.
          closeOnErr(e);
        }
      });
    }
  });
}

/**
 * Example job handler: logs the message body as text and reports success.
 *
 * @param {{content: Buffer}} msg - delivered message; only `content` is read.
 * @param {function(boolean): void} cb - called with true once the message is handled.
 */
function work(msg, cb) {
  const body = msg.content.toString();
  console.log("Got msg ", body);
  cb(true);
}

/**
 * Shared error check for the callbacks in this example.
 *
 * @param {*} err - error value passed to a callback; falsy means "no error".
 * @returns {boolean} false when there is no error; otherwise logs it,
 *   closes the shared connection and returns true so the caller can bail out.
 */
function closeOnErr(err) {
  if (err) {
    console.error("[AMQP] error", err);
    amqpConn.close();
    return true;
  }
  return false;
}

// Demo driver: publish one message per second to the "jobs" queue through
// the default exchange (""). Messages published before the connection is
// up are buffered by publish() and sent once a channel is available.
setInterval(function() {
  // Buffer.from is a static factory, not a constructor, so it is called without `new`.
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Open the connection; this also starts the publisher and the worker.
start();

Video

Here's a video recording showing how to set up a CloudAMQP instance and connect to it using Node.js and the amqplib library: