JavaScript developers have several options for AMQP client libraries:
amqp-client.js is CloudAMQP's official AMQP client for JavaScript. The high-level API manages connections for you: it reconnects automatically with exponential backoff, recovers your consumers after a reconnect, and publishes persistent messages by default. It's written in TypeScript, ships type definitions, and has zero runtime dependencies. It's the recommended choice for new JavaScript projects.
$ npm install @cloudamqp/amqp-client --save
Create a file named
publish.js:
import { AMQPSession } from "@cloudamqp/amqp-client"
// Open a session to CloudAMQP using the URL from the environment
const url = process.env.CLOUDAMQP_URL
const session = await AMQPSession.connect(url)
// Get the queue and publish one message to it
const queue = await session.queue("my_queue")
await queue.publish("Hello World")
console.log("Message published")
// Stop the session once the message has been published
await session.stop()
Create a file named
subscribe.js:
import { AMQPSession } from "@cloudamqp/amqp-client"
// Open a session to CloudAMQP using the URL from the environment
const session = await AMQPSession.connect(process.env.CLOUDAMQP_URL)
console.log("[*] Waiting for messages. To exit press CTRL+C")
const queue = await session.queue("my_queue")
// Handler for each delivery. The message is acked after the handler returns,
// and nacked + requeued if it throws. The subscription survives reconnects.
const onMessage = async (msg) => {
  console.log(`[x] Received: ${msg.bodyString()}`)
}
await queue.subscribe(onMessage)
In separate terminal windows, run:
# Terminal 1
$ export CLOUDAMQP_URL=amqps://user:pass@change-me.rmq.cloudamqp.com/user
$ node --enable-source-maps subscribe.js
[*] Waiting for messages. To exit press CTRL+C
# Terminal 2
$ export CLOUDAMQP_URL=amqps://user:pass@change-me.rmq.cloudamqp.com/user
$ node --enable-source-maps publish.js
Message published
Expected output in Terminal 1:
[x] Received: Hello World
The high-level API includes built-in support for:
See the GitHub repository and the API documentation for the full reference. A low-level API, AMQPClient, is also available when you want to manage channels yourself.
The same client runs in Node.js, Bun, Deno, and the browser. It picks TCP or WebSockets automatically from the connection URL, so a browser connects over WebSockets with no code changes. See AMQP over WebSockets for details.
amqplib is a widely used client with a callback- and promise-based API. It's a solid choice when you want explicit control over connections and channels.
Start by adding amqplib as a dependency to your package.json file.
The following code snippet shows how to connect and how to publish and consume messages. The publish method will queue messages internally if the connection is down and resend them later.
The full code example can be found on GitHub.
var amqp = require('amqplib/callback_api');
// Current AMQP connection. If it is closed, or cannot be established at all,
// start() schedules another connection attempt.
let amqpConn = null;

/**
 * Opens the AMQP connection and, once it is up, stores it in amqpConn and
 * calls whenConnected(). A failed attempt or a closed connection schedules a
 * new call to start() after one second.
 */
function start() {
  const url = process.env.CLOUDAMQP_URL + "?heartbeat=60";
  const retry = () => setTimeout(start, 1000);

  amqp.connect(url, (connectErr, conn) => {
    if (connectErr) {
      console.error("[AMQP]", connectErr.message);
      return retry();
    }

    conn.on("error", (connErr) => {
      // "Connection closing" is deliberately not logged.
      if (connErr.message === "Connection closing") return;
      console.error("[AMQP] conn error", connErr.message);
    });
    conn.on("close", () => {
      console.error("[AMQP] reconnecting");
      return retry();
    });

    console.log("[AMQP] connected");
    amqpConn = conn;
    whenConnected();
  });
}
/**
 * Called once a connection has been established: starts the publisher and
 * then the worker.
 */
function whenConnected() {
  startPublisher();
  startWorker();
}
// Confirm channel used by publish(); null until the first channel is opened.
let pubChannel = null;
// Messages that could not be published, stored as [exchange, routingKey, content].
const offlinePubQueue = [];

/**
 * Opens a confirm channel on the current connection, makes it the publishing
 * channel, and re-sends every message waiting in offlinePubQueue.
 */
function startPublisher() {
  amqpConn.createConfirmChannel((err, ch) => {
    if (closeOnErr(err)) return;
    ch.on("error", (chErr) => {
      console.error("[AMQP] channel error", chErr.message);
    });
    ch.on("close", () => {
      console.log("[AMQP] channel closed");
    });
    pubChannel = ch;

    // Take the backlog out in one go. publish() puts a message back on
    // offlinePubQueue when it fails synchronously, so draining the live
    // queue with shift() would never terminate. A message that fails again
    // now simply waits for the next time this function runs.
    const backlog = offlinePubQueue.splice(0);
    for (const [exchange, routingKey, content] of backlog) {
      publish(exchange, routingKey, content);
    }
  });
}
/**
 * Publishes a persistent message on the confirm channel.
 *
 * If handing the message to the channel throws, or the confirm callback
 * reports an error, the message is appended to offlinePubQueue. A confirm
 * error additionally closes the publishing channel's connection.
 *
 * @param exchange   exchange name passed to the channel
 * @param routingKey routing key passed to the channel
 * @param content    message body passed to the channel
 */
function publish(exchange, routingKey, content) {
  const keepForLater = () => {
    offlinePubQueue.push([exchange, routingKey, content]);
  };
  const onConfirm = (err) => {
    if (!err) return;
    console.error("[AMQP] publish", err);
    keepForLater();
    pubChannel.connection.close();
  };

  try {
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, onConfirm);
  } catch (e) {
    // Also reached while pubChannel is still null, i.e. before a channel exists.
    console.error("[AMQP] publish", e.message);
    keepForLater();
  }
}
/**
 * Starts a worker that consumes the "jobs" queue and acks a message only if
 * it was processed successfully; otherwise the message is rejected and
 * requeued.
 */
function startWorker() {
  amqpConn.createChannel((err, ch) => {
    if (closeOnErr(err)) return;
    ch.on("error", (chErr) => {
      console.error("[AMQP] channel error", chErr.message);
    });
    ch.on("close", () => {
      console.log("[AMQP] channel closed");
    });

    // Passes a delivery to work() and acks or requeues it according to the
    // outcome work() reports.
    const processMsg = (msg) => {
      // amqplib invokes the consumer callback with null when the broker
      // cancels the consumer. There is no message to process or ack, so
      // close the connection through closeOnErr() instead of letting work()
      // dereference null.
      if (msg === null) {
        closeOnErr(new Error("consumer cancelled by server"));
        return;
      }
      work(msg, (ok) => {
        try {
          if (ok) {
            ch.ack(msg);
          } else {
            ch.reject(msg, true);
          }
        } catch (e) {
          closeOnErr(e);
        }
      });
    };

    ch.prefetch(10);
    ch.assertQueue("jobs", { durable: true }, (assertErr) => {
      if (closeOnErr(assertErr)) return;
      ch.consume("jobs", processMsg, { noAck: false });
      console.log("Worker is started");
    });
  });
}
/**
 * Handles one job: logs the message content as a string and reports success.
 *
 * @param msg message whose `content` is logged via toString()
 * @param cb  callback invoked with `true`
 */
function work(msg, cb) {
  const body = msg.content.toString();
  console.log("Got msg ", body);
  cb(true);
}
/**
 * If `err` is truthy, logs it and closes the current connection.
 *
 * @param err error value to check
 * @returns {boolean} true when an error was handled, false when there was none
 */
function closeOnErr(err) {
  if (err) {
    console.error("[AMQP] error", err);
    amqpConn.close();
    return true;
  }
  return false;
}
// Demo driver: every second, publish one message with an empty exchange name
// and the routing key "jobs".
setInterval(() => {
  // Buffer.from is a factory function, not a constructor, so it is called
  // without `new`.
  publish("", "jobs", Buffer.from("work work work"));
}, 1000);

// Open the connection.
start();
Here's a video recording showing how to set up a CloudAMQP instance and connect to it using Node.js and the amqplib library: