Delayed messages with RabbitMQ

Sometimes you want to delay the delivery of messages for a certain time so that subscribers doesn't see them immediately. The AMQP protocol doesn't have a native delayed queue feature, but with RabbitMQ's AMQP protocol extensions we can easily emulate one by combining the message TTL function and the dead-lettering function.

Note: There is now a plugin available for delayed messages (RabbitMQ 3.5.3 and later versions), RabbitMQ delayed messages exchange plugin. The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the user adds a delay header to a message. Read more about the plugin: RabbitMQ delayed message exchange plugin

Dead lettering Redirect a message after expiration or rejection

If a queue is declared with the x-dead-letter-exchange property messages which is either rejected, nacked or the TTL for a message expires will be sent to the specified dead-letter-exchange, and if you specify x-dead-letter-routing-key the routing key of the message will be changed when dead lettered.

Message TTL Discard unconsumed messages after a certain time

By declaring a queue with the x-message-ttl property, messages will be discarded from the queue if they haven't been consumed within the time specified.

How to combine TTL and DLX to delay message delivery

By combining these to functions we publish a message to a queue which will expire its message after the TTL and then reroute it to the exchange and with the dead-letter routing key so that they end up in a queue which we consume from.

Step by step:

  • Declare the delayed queue
    • Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
    • Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
    • Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
  • Subscribe to the destination queue

Example from github.com/cloudamqp/rabbitmq-delayed-messages

The following Ruby snippet, which relays on the excellent Bunny library, demonstrates how delayed message can be implemented.

require 'bunny'

B = Bunny.new ENV['CLOUDAMQP_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name
  ch.queue(DELAYED_QUEUE, arguments: {
    # set the dead-letter exchange to the default queue
    'x-dead-letter-exchange' => '',
    # when the message expires, set change the routing key into the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 3000
  })
  # publish to the default exchange with the the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
  puts "#{Time.now}: Published the message"
  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue
  q = ch.queue DESTINATION_QUEUE, durable: true
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message"
  end
end

subscribe()
publish()

sleep

If you have any questions, please feel free to contact support@cloudamqp.com for further assistance.

Delayed messages with LavinMQ

The delayed message exchange has the same purpose, and inherently works the same for RabbitMQ and LavinMQ. You can set up a delayed message exchange in RabbitMQ, and then export+import those definitions to LavinMQ and expect that it works exactly the same for your application.

With that said, the delayed-exchange is implemented slightly different.

For example, the UI is different. Additionally, in LavinMQ, you can let the type be the regular type (topic, direct, fanout..) and then you add an args: {x-delayed-exchange: true} to indicate that the exchange is a delayed exchange

LavinMQ also handles the “rabbitmq way” where you send in type: x-delayed-message so clients can work with both brokers in the same way, but the user interface is a bit different. If you don’t specify a type it will become a direct exchange by default.

In the management interface for LavinMQ, you choose a type and then check the “delayed” box if you want the exchange to be delayed, but in the rabbitMQ interface, delayed is displayed as one of the types.

LavinMQ:



RabbitMQ: