The previous article in this series explored how to get started with RabbitMQ Streams. A Stream comes with sensible default settings, but in some scenarios you might want to customize them. This article looks at some of these configurations as well as the limitations of RabbitMQ Streams.
RabbitMQ Streams Configurations
You can configure a Stream in RabbitMQ using queue arguments specified with a policy key or at the time of queue declaration. Read on to learn about these configurations.
Data Retention Configurations
Since RabbitMQ Streams are immutable, append-only logs, they tend to grow indefinitely. As this is undesirable, a Stream can be configured to discard old messages through a retention policy. Hold on, a retention policy?
Yes, through a retention policy, you can configure a Stream to truncate its messages once it reaches a given size or a specified age. Truncating messages entails deleting an entire segment file. But what is a segment file?
RabbitMQ Streams do not persist messages in a big, single file. Instead, a Stream is broken down into smaller files known as segment files. A Stream truncates its size by deleting a segment file and all its messages. To configure a Stream's retention strategy, you can adopt size or time-based retention strategies.
- Size-based retention strategy - the Stream is configured to truncate its size once the total size of the stream reaches a given value.
- Time-based retention strategy - the Stream is configured to truncate a segment file once that segment reaches a given age.
Setting up the size-based retention strategy requires providing the following arguments when declaring the Stream. As mentioned earlier, this can also be done through a policy:
- x-max-length-bytes
- x-stream-max-segment-size-bytes
On the other hand, setting up the time-based retention strategy requires providing the following arguments when declaring the Stream:
- x-max-age
- x-stream-max-segment-size-bytes
Notice how the x-stream-max-segment-size-bytes argument is required in both strategies? We will explain this. First, let’s make sense of these arguments.
The x-max-length-bytes argument controls the maximum size of the RabbitMQ Stream. When it is set, RabbitMQ deletes segment files from the beginning of the Stream once the Stream’s total size reaches the value of x-max-length-bytes. For example, if the maximum size of a Stream is set to 100000000, the Stream will discard the oldest messages when its disk usage hits 100000000 bytes. RabbitMQ does not provide a default value for this.
The unit could be KB, MB, GB, or TB. When you provide a value for this argument without a unit, it defaults to bytes.
The x-max-age argument controls how long a message survives in a RabbitMQ Stream. The unit of this configuration can be years (Y), months (M), days (D), hours (h), minutes (m), or seconds (s). For example, if the x-max-age of a Stream is set to 30D, the Stream will discard segment files that have been there for 30 days or more. RabbitMQ does not provide a default value for this.
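As a quick illustration of the age format described above, here is a minimal sketch of a client-side check for values such as 30D. The helper name is_valid_max_age and the exact pattern are assumptions made for this example. The broker's own parsing may be stricter or more lenient, so treat this as a sanity check before declaring a Stream, not as the broker's rule.

```python
import re

# A positive integer followed by exactly one unit letter:
# Y (years), M (months), D (days), h (hours), m (minutes), s (seconds).
# The pattern itself is an assumption made for this sketch.
_MAX_AGE_PATTERN = re.compile(r"[1-9][0-9]*[YMDhms]")


def is_valid_max_age(value: str) -> bool:
    """Return True if value looks like '<positive integer><unit>', e.g. '30D'."""
    return _MAX_AGE_PATTERN.fullmatch(value) is not None


print(is_valid_max_age("30D"))      # True
print(is_valid_max_age("30 days"))  # False
```

Note that the units are case-sensitive: M means months while m means minutes.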
As mentioned earlier, RabbitMQ Streams encompass one or more segment files on disk, and the x-stream-max-segment-size-bytes argument controls the size of each segment. For example, if the maximum size of the segment file of a Stream is set to 50000, each segment file will have a maximum size of 50000 bytes. RabbitMQ provides a default value for this: 500000000 bytes.
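To make the retention arguments concrete, the sketch below builds the argument dictionaries for both strategies and passes them to a queue declaration. It assumes an AMQP 0.9.1 client whose channel exposes queue_declare(queue=..., durable=..., arguments=...), as pika's channels do. The helper names retention_arguments and declare_stream are hypothetical and only serve this example.

```python
def retention_arguments(max_length_bytes=None, max_age=None,
                        segment_size_bytes=500_000_000):
    """Build queue arguments for a Stream with optional retention limits.

    segment_size_bytes defaults to the 500000000 bytes mentioned above.
    """
    arguments = {
        "x-queue-type": "stream",
        "x-stream-max-segment-size-bytes": segment_size_bytes,
    }
    if max_length_bytes is not None:
        arguments["x-max-length-bytes"] = max_length_bytes
    if max_age is not None:
        arguments["x-max-age"] = max_age
    return arguments


def declare_stream(channel, name, arguments):
    """Declare the Stream on `channel` (assumed to be a pika-style channel)."""
    # Streams are persistent, so the queue is declared as durable.
    return channel.queue_declare(queue=name, durable=True, arguments=arguments)


# Size-based retention: keep roughly 100000000 bytes of messages.
size_based = retention_arguments(max_length_bytes=100_000_000)

# Time-based retention: discard segments older than 30 days.
time_based = retention_arguments(max_age="30D")
```

With a real connection, the call would look like declare_stream(channel, "events", size_based), where "events" is a placeholder Stream name.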
Now, back to the question: why is the x-stream-max-segment-size-bytes argument required in both retention strategies?
The x-max-age and x-max-length-bytes arguments are important for the retention of messages in RabbitMQ Streams, but the retention is evaluated on a per-segment basis. Essentially, Streams only apply the retention policies whenever an existing segment file has reached its maximum size and is closed in favor of a new one.
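As a result, without a segment size the Stream would never know when to close the current segment file and create a new one and, by extension, when to invoke the retention policy. This is why the x-stream-max-segment-size-bytes argument matters in both the size-based and time-based retention strategies: if you leave it out, the default of 500000000 bytes applies, and retention is only evaluated each time a segment of that size fills up.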
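The per-segment evaluation described above can be illustrated with a small toy model. This is not how RabbitMQ is implemented. It is a simplified sketch, with a hypothetical ToyStream class, that only tracks segment sizes and applies size-based retention at the moment a segment is closed.

```python
class ToyStream:
    """Toy model: retention is evaluated only when a segment rolls over."""

    def __init__(self, max_length_bytes, segment_size_bytes):
        self.max_length_bytes = max_length_bytes
        self.segment_size_bytes = segment_size_bytes
        self.segments = [0]  # size of each segment; the last one is active

    def total_bytes(self):
        return sum(self.segments)

    def append(self, message_bytes):
        self.segments[-1] += message_bytes
        if self.segments[-1] >= self.segment_size_bytes:
            # Close the full segment, open a new one, then apply retention.
            self.segments.append(0)
            self._apply_retention()

    def _apply_retention(self):
        # Drop the oldest closed segments while the Stream is over its limit.
        while self.total_bytes() > self.max_length_bytes and len(self.segments) > 1:
            self.segments.pop(0)


stream = ToyStream(max_length_bytes=100, segment_size_bytes=50)
for _ in range(14):
    stream.append(10)
print(stream.total_bytes())  # 140: over the limit, but no segment has rolled yet

stream.append(10)            # fills the active segment and triggers retention
print(stream.total_bytes())  # 100
```

The model shows why a Stream can temporarily exceed its configured maximum size: nothing is deleted until the active segment fills up and is closed.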
Note that the x-max-age and x-max-length-bytes arguments can be combined, together with the x-stream-max-segment-size-bytes argument. In that case, the Stream will only discard messages when both conditions are true. Not clear?
Okay, for example, if the x-max-length-bytes is 100 (not ideal) and the x-max-age is 30D, the Stream will only discard segment files that have been in the Stream for more than 30 days once the Stream's disk usage reaches 100 bytes. In essence, even if there are segment files whose age has exceeded the x-max-age limit, the Stream won’t discard them until the max length is exceeded, and vice versa.
Controlling the Initial Replication Factor
Remember, Streams are persistent and replicated. When a Stream is declared, RabbitMQ creates replicas of it on randomly selected nodes in the cluster. However, the number of replicas can be controlled in two ways:
- With the x-initial-cluster-size queue argument when declaring the Stream via an AMQP client.
- With the initial-cluster-size queue argument when declaring the Stream via the stream plugin.
This argument controls the number of nodes in the cluster on which the Stream will be replicated. Like quorum queues and replicated classic queues, Streams are affected by cluster size. The more replicas a Stream has, the more data needs to be replicated, which lowers throughput. It is recommended to use an odd cluster size, such as 1, 3, or 5, so that the replicas can form a quorum.
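The recommendation for an odd number of replicas follows from simple majority arithmetic, sketched below. The function names are made up for this example, and the arguments dictionary at the end assumes the Stream is declared over AMQP 0.9.1 with the x-initial-cluster-size argument mentioned above.

```python
def majority(replicas: int) -> int:
    """Smallest number of replicas that forms a majority (quorum)."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can be lost while a majority is still available."""
    return replicas - majority(replicas)


for replicas in (1, 2, 3, 4, 5):
    print(replicas, majority(replicas), tolerated_failures(replicas))

# Queue arguments for a Stream that starts with three replicas.
arguments = {"x-queue-type": "stream", "x-initial-cluster-size": 3}
```

Going from 3 to 4 replicas adds replication work without tolerating an extra failure (both survive the loss of one replica), whereas 5 replicas tolerate two.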
RabbitMQ Stream Leader Election Configuration
Even though a Stream usually has replicas across nodes, there is always one leader replica. All Stream operations go through the leader replica first and are then replicated to the other nodes. Which node becomes the leader is controlled in three ways:
- By passing the x-queue-leader-locator argument when declaring the Stream
- By setting the queue-leader-locator policy key
- By defining queue_leader_locator in the configuration file
The supported values for leader election configuration are:
- client-local: This is the default value. The client that declares the Stream is connected to some node, and the client-local value elects that node to be the leader.
- balanced: If there are fewer than 1000 queues, the node hosting the minimum number of Stream leaders becomes the leader. Otherwise, a random node becomes the leader.
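The two strategies can be modeled with a short sketch. This is a toy rendering of the description above, not the broker's actual code. The function pick_leader, its parameters, and the node names are all hypothetical.

```python
import random


def pick_leader(strategy, client_node, stream_leaders_per_node, total_queues):
    """Toy model of leader placement for a newly declared Stream.

    stream_leaders_per_node maps node name -> number of Stream leaders hosted.
    """
    if strategy == "client-local":
        # The node the declaring client is connected to becomes the leader.
        return client_node
    if strategy == "balanced":
        if total_queues < 1000:
            # Pick the node currently hosting the fewest Stream leaders.
            return min(stream_leaders_per_node, key=stream_leaders_per_node.get)
        # With many queues, fall back to a random node.
        return random.choice(list(stream_leaders_per_node))
    raise ValueError(f"unsupported strategy: {strategy!r}")


leaders = {"rabbit@a": 4, "rabbit@b": 1, "rabbit@c": 3}
print(pick_leader("client-local", "rabbit@a", leaders, total_queues=10))  # rabbit@a
print(pick_leader("balanced", "rabbit@a", leaders, total_queues=10))      # rabbit@b
```

When declaring over AMQP 0.9.1, the chosen strategy would be passed as a queue argument, for example {"x-queue-type": "stream", "x-queue-leader-locator": "balanced"}.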
RabbitMQ Streams Limitations
Streams store messages as AMQP 1.0 encoded data. When publishing using AMQP 0.9.1, a conversion is done under the hood. While this conversion usually works well, sometimes it doesn’t. For example, if the header of an AMQP 0.9.1 message contains complex values like arrays/lists, the header will not be converted. That is because headers in an AMQP 1.0 message can only contain values of simple types, such as strings and numbers.
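One way to guard against this limitation is to inspect headers before publishing to a Stream over AMQP 0.9.1. The sketch below is a hypothetical pre-publish check, not part of any client library. It flags header values that are lists, tuples, or dictionaries, which is this example's own approximation of "complex values".

```python
def complex_headers(headers):
    """Return the sorted names of headers holding list, tuple, or dict values."""
    return sorted(
        name for name, value in headers.items()
        if isinstance(value, (list, tuple, dict))
    )


headers = {
    "trace-id": "abc-123",
    "retries": 3,
    "tags": ["billing", "eu"],
    "meta": {"source": "api"},
}
print(complex_headers(headers))  # ['meta', 'tags']
```

A publisher could log these names, or flatten the values into strings, before sending the message.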
UI Metric Accuracy
When working with Streams, sometimes the Management UI does not reflect the precise message count. In streams, offset tracking information also counts as messages, making the message count artificially larger than it is. This should make no practical difference in most systems.
This series explored the fundamentals of RabbitMQ Streams, from when to use Streams in part 1 to how to get started with them in part 2. This article took a step further to cover some optional configurations that make it easier to tweak a Stream for a specific use case.
Overall, Streams weren’t created to replace queues, but to complement them. Streams open up new possibilities for RabbitMQ use cases.
Ready to start using RabbitMQ in your architecture? CloudAMQP is one of the world’s largest RabbitMQ cloud hosting providers. In addition to RabbitMQ, we also created our in-house message broker, LavinMQ with a throughput of around 1,000,000 messages/sec.
Email us at email@example.com with any suggestions, questions, or feedback.