The RabbitMQ federation plugin

Federation has several use cases: collecting messages from multiple clusters in a central cluster, distributing the load of one queue across multiple clusters, and migrating to another cluster without stopping all producers and consumers.

RabbitMQ provides two types of federation: exchange federation and queue federation. A federated exchange routes messages published to an upstream server to a local queue. A federated queue allows a local consumer to receive messages from an upstream queue.

[Figure: instance map showing the upstream and downstream servers]

Exchange federation

Normally, messages are published to an exchange and then routed to one or more queues. With exchange federation, RabbitMQ can also distribute those messages to other clusters: messages arriving at a federated exchange are forwarded to the downstream clusters as well. The downstream cluster consumes messages from the upstream cluster and republishes them on its own local exchange, as if they had been published there directly.

Exchange federation is useful when, for example, you have clusters in multiple regions but want to collect all messages in a central cluster where your consumers can process them.

Federation acts as an intelligent shovel: it creates a queue on the upstream cluster, binds it to the exchange you are federating, consumes from that queue and republishes the messages on the local exchange. If the connection breaks, messages queue up in the upstream queue. When the link reconnects, all messages that were published during the outage are transferred.
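
To make the buffering behaviour concrete, here is a small in-memory model of an exchange federation link. It is only a sketch: it does not talk to RabbitMQ, and the class names, fields and the `transfer` method are hypothetical names invented for this illustration.

```javascript
// Toy in-memory model of exchange federation (not RabbitMQ code).
// All class, field and method names are hypothetical.
class UpstreamExchange {
  constructor() {
    this.localQueue = [];      // queue bound on the upstream for its own consumers
    this.federationQueue = []; // queue the federation link creates on the upstream
  }
  publish(msg) {
    // The exchange routes a copy of the message to every bound queue.
    this.localQueue.push(msg);
    this.federationQueue.push(msg);
  }
}

class FederationLink {
  constructor(upstream, downstreamQueue) {
    this.upstream = upstream;
    this.downstreamQueue = downstreamQueue;
    this.connected = true;
  }
  // Consume everything waiting upstream and republish it downstream.
  transfer() {
    if (!this.connected) return 0;
    const moved = this.upstream.federationQueue.splice(0);
    this.downstreamQueue.push(...moved);
    return moved.length;
  }
}

const upstream = new UpstreamExchange();
const downstreamQueue = [];
const link = new FederationLink(upstream, downstreamQueue);

upstream.publish('m1');
link.transfer();          // m1 reaches the downstream

link.connected = false;   // simulate a network outage
upstream.publish('m2');
upstream.publish('m3');
link.transfer();          // nothing moves, m2 and m3 wait upstream

link.connected = true;    // the link is re-established
link.transfer();          // the backlog is delivered

console.log(downstreamQueue);     // downstream copy of every message
console.log(upstream.localQueue); // the upstream keeps its own copy
```

Note that the upstream's own queue still holds every message: exchange federation copies messages rather than moving them.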

Queue federation

Queue federation connects an upstream queue to a downstream queue and transfers messages when the downstream queue has consumers with spare capacity. This creates intelligent load balancing between clusters. It is also useful when migrating between two clusters: consumers and publishers can be moved in any order, and messages are not duplicated (as they are with exchange federation) but transferred to the new cluster once your consumers are there. The federated queue only retrieves messages when it has run out of messages locally, it has consumers that need messages, and the upstream queue has "spare" messages that are not being consumed. Federated queues allow higher capacity than a single queue. Read Cluster migration with RabbitMQ Queue Federation for more information on the topic.
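
The three conditions above can be written down as a single predicate. The sketch below is only an illustration of that rule: the function name and the fields `readyMessages`, `waitingConsumers` and `spareMessages` are hypothetical and do not correspond to anything in the RabbitMQ API.

```javascript
// Hypothetical helper: should the federated (downstream) queue pull
// a message from its upstream right now?
function shouldPullFromUpstream(downstream, upstream) {
  const outOfLocalMessages = downstream.readyMessages === 0;
  const consumersNeedMessages = downstream.waitingConsumers > 0;
  const upstreamHasSpare = upstream.spareMessages > 0;
  return outOfLocalMessages && consumersNeedMessages && upstreamHasSpare;
}

// Local backlog exists, so there is no reason to fetch from upstream.
const busyLocally = shouldPullFromUpstream(
  { readyMessages: 5, waitingConsumers: 2 },
  { spareMessages: 10 }
);

// Local queue is empty, consumers are idle, upstream has a backlog.
const idleLocally = shouldPullFromUpstream(
  { readyMessages: 0, waitingConsumers: 2 },
  { spareMessages: 10 }
);

// No consumers downstream, so messages stay where they are.
const noConsumers = shouldPullFromUpstream(
  { readyMessages: 0, waitingConsumers: 0 },
  { spareMessages: 10 }
);

console.log(busyLocally, idleLocally, noConsumers);
```

The last case is what makes migration safe: messages stay on the old cluster until consumers actually appear on the new one.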

Federation example setup

Before we show an example of exchange federation, let us define the concepts of upstream and downstream servers. Upstream servers are the servers to which messages are originally published. Downstream servers are the servers that messages are forwarded to. The downstream cluster can be thought of as subscribing to messages from the upstream cluster. With exchange federation, messages published to the upstream exchange are copied to the federated exchange, as though they had been published directly to it.

Let us pretend that we have our nodes set up in Amazon: one node in ‘US-East-1’, one in ‘US-West-1’ and one in ‘EU-Central-1’. Together, those nodes make up our RabbitMQ fleet. With exchange federation, the nodes in ‘US-East-1’ and ‘US-West-1’ can be defined as the upstream servers of the node in ‘EU-Central-1’. Any message published to the exchange on ‘US-East-1’ or ‘US-West-1’ will then be replicated to the corresponding exchange on the node in ‘EU-Central-1’.

A use case for federated queues is when a broker is operating at its maximum capacity. By federating queues, other brokers and their consumers can help out by taking messages from the queue on the upstream server. For example, you might have producers and consumers in multiple locations, and you want your queues in "US-East-1" to get help from "EU-Central-1" when one is busier than the other.

Clustering, federation, shovel

The Shovel plugin is another plugin that can be used to balance the load of a queue. It lets you define replication relationships between a queue on one RabbitMQ server and an exchange on another.

Definition from RabbitMQ: "A plug-in for RabbitMQ that shovels messages from a queue on one broker to an exchange on another broker."

You can set up a queue on one broker that receives messages from a queue on another broker, which reduces the load on the source queue. The Shovel plugin takes messages from a queue in one broker (the source, comparable to the "upstream server") and publishes them to an exchange in another broker (the destination, comparable to the "downstream server").

Unlike clustering, federation and shovel can be used over a WAN. Clustering is for joining several brokers in one location to enable scalable routing.
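
As a rough illustration of the source and destination roles, the sketch below builds the management HTTP API request that would define a dynamic shovel. It only constructs and prints the request, nothing is sent. The helper name, the shovel name, the URIs and the queue and exchange names are placeholders chosen for this example.

```javascript
// Sketch: describe the HTTP API call that defines a dynamic shovel.
// Helper name and all argument values are placeholders.
function shovelRequest(vhost, name, opts) {
  return {
    method: 'PUT',
    path: '/api/parameters/shovel/' +
      encodeURIComponent(vhost) + '/' + encodeURIComponent(name),
    body: {
      value: {
        'src-uri': opts.srcUri,             // broker to take messages from
        'src-queue': opts.srcQueue,         // source queue
        'dest-uri': opts.destUri,           // broker to publish to
        'dest-exchange': opts.destExchange, // destination exchange
      },
    },
  };
}

const shovelReq = shovelRequest('/', 'demo-shovel', {
  srcUri: 'amqp://source.example.com',
  srcQueue: 'demoqueue',
  destUri: 'amqp://destination.example.com',
  destExchange: 'demo.exchange',
});

console.log(shovelReq.method, shovelReq.path);
console.log(JSON.stringify(shovelReq.body, null, 2));
```

The default virtual host `/` has to be URL-encoded as `%2F` in the path, which is why the sketch runs both path segments through `encodeURIComponent`.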

Getting Started with RabbitMQ Federations and Node.js

It is time to set up exchange federation between two RabbitMQ nodes. All CloudAMQP dedicated instances come with the federation plugin enabled. This example shows how to set up federation from the management interface. Any language can be used to create the bindings, queues and exchanges. An example of how to set up the queues, exchanges and bindings in Node.js is shown at the bottom of this article.

In this example, we have an application architecture similar to the one in the map above, with one server (CloudAMQP instance) in AWS EU-Central-1 and one server (CloudAMQP instance) in AWS US-East-1. Both RabbitMQ servers are set up and ready to be federated as soon as they are created. If you need more information about how to set up CloudAMQP instances, check out the CloudAMQP Documentation and compare different plans on the CloudAMQP Plans & Pricing page.

In this example, we will create a queue named 'demoqueue' on both the upstream and the downstream server. We will also create a federated exchange and bind the exchange on the upstream server to the federation link on the downstream server. Messages published to the exchange on the upstream server will be routed to the 'demoqueue' there. They will also be copied to the exchange on the downstream server, and from there routed to the 'demoqueue' on the downstream server.

Define the Upstream for the downstream server

We start by setting up the downstream cluster (the server that messages will be forwarded to, in this case 'EU-Central-1'). To do this we need the URI of the upstream server. Open the management interface for the downstream server, go to the 'Admin' -> 'Federation Upstreams' screen and press 'Add a new upstream'. Fill in the required information; the URI should be the URI of the upstream server. The expiry time is the time in milliseconds after which an upstream queue for a federated exchange may be deleted if the connection to the upstream broker is lost. The message TTL is set to one hour in this example, meaning that messages stay in the upstream queue for up to one hour. If one cluster goes down, the other cluster will have all the messages from the last hour.

Note: Make sure the correct virtual host is selected at every step.

[Screenshot: adding a federation upstream]
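
The same upstream can also be defined through the management HTTP API instead of the form. The sketch below only builds and prints such a request, it does not send anything. The helper name, the upstream name and the URI are placeholders, and using one hour for both `expires` and `message-ttl` is an assumption made for this example.

```javascript
// Sketch: describe the HTTP API call that defines a federation upstream.
// Helper name, upstream name and URI are placeholders.
function federationUpstreamRequest(vhost, name, upstreamUri) {
  const ONE_HOUR_MS = 60 * 60 * 1000;
  return {
    method: 'PUT',
    path: '/api/parameters/federation-upstream/' +
      encodeURIComponent(vhost) + '/' + encodeURIComponent(name),
    body: {
      value: {
        uri: upstreamUri,           // URI of the upstream server
        expires: ONE_HOUR_MS,       // expiry of the upstream queue, in ms (assumed value)
        'message-ttl': ONE_HOUR_MS, // how long messages are kept upstream, in ms
      },
    },
  };
}

const upstreamReq = federationUpstreamRequest(
  '/',
  'us-east-1-upstream',
  'amqps://user:password@upstream.example.com/vhost'
);

console.log(upstreamReq.method, upstreamReq.path);
console.log(JSON.stringify(upstreamReq.body, null, 2));
```

The request goes to the downstream server, since that is where the upstream is defined.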

Define the Policy

Create a policy that selects a set of exchanges, and applies a single upstream to the objects.

Navigate to 'Admin' -> 'Policies' and press 'Add / update a policy'.

[Screenshot: adding a federation policy]

The "pattern" argument is a regular expression used to match exchange or queue names. In this case we tell the policy to federate all exchanges whose names begin with "federation.".
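
A quick way to check which names a pattern will catch is to try it as a regular expression. The snippet below assumes the pattern is `^federation\.` (the exact value entered in the form is not shown in the text) and uses JavaScript's regex engine, which behaves the same as RabbitMQ's for a simple pattern like this. The list of names is made up for the example.

```javascript
// Assumed policy pattern: names that begin with "federation."
const pattern = /^federation\./;

// Hypothetical exchange names to test against the pattern
const exchangeNames = [
  'federation.exchange',
  'federation.logs',
  'federationX',            // no dot after the prefix
  'my.federation.exchange', // prefix is not at the start
  'amq.direct',
];

const federated = exchangeNames.filter((name) => pattern.test(name));
console.log(federated);
```

The escaped dot matters: an unescaped `.` matches any character, so `^federation.` would also match `federationX`.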

A policy can apply a single upstream or an upstream set to exchanges and/or queues. In this example we apply all upstreams: federation-upstream-set is set to all.

NOTE: Policies are matched every time an exchange or queue is created.

NOTE: Any other matching policy with a priority greater than 0 will take precedence over this policy.
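
The precedence rule in the note can be sketched as follows: of all policies whose pattern and apply-to setting match an object, only the one with the highest priority takes effect. The `effectivePolicy` function is a hypothetical helper written for this illustration, and both policy names and the second policy's definition are invented.

```javascript
// Hypothetical helper: pick the policy that takes effect for one object.
// objectKind is 'exchanges' or 'queues'.
function effectivePolicy(objectName, objectKind, policies) {
  const matching = policies.filter((p) =>
    new RegExp(p.pattern).test(objectName) &&
    (p['apply-to'] === 'all' || p['apply-to'] === objectKind));
  if (matching.length === 0) return null;
  // Only one policy applies at a time: the one with the highest priority.
  return matching.reduce((best, p) => (p.priority > best.priority ? p : best));
}

// The federation policy from this example, with priority 0
const federationPolicy = {
  name: 'federate-exchanges',
  pattern: '^federation\\.',
  'apply-to': 'exchanges',
  priority: 0,
  definition: { 'federation-upstream-set': 'all' },
};

// An unrelated, made-up policy that also matches and has a higher priority
const otherPolicy = {
  name: 'limits',
  pattern: '.*',
  'apply-to': 'all',
  priority: 1,
  definition: { 'max-length': 1000 },
};

const alone = effectivePolicy('federation.exchange', 'exchanges', [federationPolicy]);
const shadowed = effectivePolicy('federation.exchange', 'exchanges', [federationPolicy, otherPolicy]);

console.log(alone.name);
console.log(shadowed.name);
```

In the second case the exchange would not be federated, because the federation definition lives in the policy that lost. One way around this is to put all the settings an object needs into a single policy.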

Setting up Upstream server

We then need to create the queues, exchanges and bindings for the downstream server. The fastest and easiest way to do this is from code, in any programming language, but it can also be done directly from the management interface. Sample code for Node.js and the client library amqplib can be found at the bottom of this article.

  • Define a queue, in this example named ‘demoqueue’.
  • Create an exchange, in this example named ‘federation.exchange’.
  • Bind the exchange to the queue, in this example bind ‘federation.exchange’ to ‘demoqueue’. The exchanges shown for the downstream server should look like the picture below.
  • Check that federation.exchange has a binding to the federated upstream exchange.
    [Screenshot: federation upstream]

The federated exchange will connect to all its upstream exchanges using AMQP. The exchanges for the downstream server should look like the picture below.

[Screenshot: federation upstream]

Test the federation by publishing one message to the exchange, federation.exchange, on the upstream server. You will see that the message ends up in the 'demoqueue' on both the upstream and the downstream server.

Node.js sample code

The sample code below shows how to set up a connection to the RabbitMQ server. It also shows how to declare the demoqueue and bind federation.exchange to that queue. You have to create the queues and the exchanges on both the upstream and the downstream server.

Start by adding amqplib as a dependency to your package.json file.

var queue = 'demoqueue';
var exchange = 'federation.exchange';

function bail(err) {
  console.error(err);
  process.exit(1);
}

function publisher(conn) {
  conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    // Declare the exchange and the queue, then bind them together
    ch.assertExchange(exchange, 'fanout');
    ch.assertQueue(queue);
    ch.bindQueue(queue, exchange, '');
    // Buffer.from replaces the deprecated new Buffer() constructor
    ch.publish(exchange, '', Buffer.from('Hello CloudAMQP'));
  }
}

function consumer(conn) {
  conn.createChannel(on_open);
  function on_open(err, ch) {
    if (err != null) bail(err);
    ch.assertQueue(queue);
    ch.consume(queue, function(msg) {
      if (msg !== null) {
        console.log(msg.content.toString());
        ch.ack(msg);
      }
    });
  }
}
// Get the URL from ENV or default to localhost
var url = process.env.CLOUDAMQP_URL || "amqp://localhost";
require('amqplib/callback_api')
.connect(url, {timeout: 1}, function(err, conn) {
  if (err != null) bail(err);
  consumer(conn);
  publisher(conn);
});
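
For readers who prefer promises, the same declarations can be sketched against amqplib's promise API. The function below takes an already opened channel, so it can be exercised here with a stub that merely records the calls. The stub, the `setupAndPublish` name and the dry run are additions for this illustration and not part of the original sample.

```javascript
// Declares the demo topology on an open channel and publishes one message.
// `ch` is expected to behave like an amqplib promise-API channel.
async function setupAndPublish(ch, exchange, queue, text) {
  await ch.assertExchange(exchange, 'fanout');
  await ch.assertQueue(queue);
  await ch.bindQueue(queue, exchange, '');
  return ch.publish(exchange, '', Buffer.from(text));
}

// Against a real broker (assumes amqplib is installed and CLOUDAMQP_URL is set),
// inside an async function:
//   const conn = await require('amqplib').connect(process.env.CLOUDAMQP_URL);
//   const ch = await conn.createChannel();
//   await setupAndPublish(ch, 'federation.exchange', 'demoqueue', 'Hello CloudAMQP');
//   await ch.close();
//   await conn.close();

// Dry run against a stub channel that only records the calls it receives
const calls = [];
const stubChannel = {};
for (const method of ['assertExchange', 'assertQueue', 'bindQueue', 'publish']) {
  stubChannel[method] = (...args) => {
    calls.push([method, ...args]);
    return true;
  };
}

const dryRun = setupAndPublish(stubChannel, 'federation.exchange', 'demoqueue', 'Hello CloudAMQP')
  .then(() => console.log(calls.map((c) => c[0]).join(' -> ')));
```

Awaiting each declaration makes the ordering explicit: the binding is only requested once both the exchange and the queue are known to exist.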

If you would like to set up federation from the console, I recommend this article from Jaxenter.com: Distributed log aggregation with RabbitMQ Federation.

More information about how to get started with CloudAMQP and guides for various languages and platforms can be found in our documentation.

Please email us at contact@cloudamqp.com if you have any suggestions, questions or feedback.
