Sometimes you want to delay the delivery of messages for a certain time so that subscribers don't see them immediately. AMQP doesn't have a native delayed queue feature, but with RabbitMQ's protocol extensions we can easily emulate one by combining the message TTL feature and the dead-lettering feature.

Note: A plugin for delayed messages, the RabbitMQ Delayed Message Exchange plugin, is available for RabbitMQ 3.5.3 and later. It adds a new exchange type to RabbitMQ; messages routed through an exchange of that type are delayed if the publisher adds a delay header to the message. Read more about the plugin here.
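
If the plugin is enabled on your broker, publishing through it from Bunny might look like the sketch below. The helper name publish_delayed and the exchange name delayed.exchange are hypothetical and chosen only for illustration; the x-delayed-message exchange type, the x-delayed-type argument and the x-delay header are the names the plugin uses.

```ruby
# Hypothetical helper (not part of Bunny): publishes +payload+ through a
# delayed-message exchange. +channel+ is assumed to be a Bunny channel,
# for example the result of B.create_channel.
def publish_delayed(channel, payload, routing_key:, delay_ms:)
  exchange = channel.exchange(
    'delayed.exchange',                          # hypothetical exchange name
    type: 'x-delayed-message',                   # exchange type added by the plugin
    arguments: { 'x-delayed-type' => 'direct' }  # how to route once the delay has passed
  )
  # The x-delay header carries the delay in milliseconds.
  exchange.publish(payload, routing_key: routing_key,
                            headers: { 'x-delay' => delay_ms })
end
```

The destination queue still has to be bound to this exchange with a matching routing key before the delayed message can reach it.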

Dead lettering: redirect a message after expiration or rejection

If a queue is declared with the x-dead-letter-exchange property, messages that are rejected or nacked (without requeue), or whose TTL expires, are sent to the specified dead-letter exchange. If you also specify x-dead-letter-routing-key, the routing key of the message is changed when it is dead lettered.
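
As a minimal sketch, the two dead-letter arguments can be built as a plain Ruby hash before the queue is declared. The helper name dead_letter_arguments is hypothetical; only the argument keys come from RabbitMQ.

```ruby
# Hypothetical helper (not part of Bunny or RabbitMQ): builds the optional
# queue arguments that enable dead lettering.
def dead_letter_arguments(exchange:, routing_key: nil)
  args = { 'x-dead-letter-exchange' => exchange }
  # Only override the routing key when one is given; without this argument
  # the message keeps its original routing key when it is dead lettered.
  args['x-dead-letter-routing-key'] = routing_key if routing_key
  args
end

# '' is the default exchange, which routes by queue name.
args = dead_letter_arguments(exchange: '', routing_key: 'work.now')
p args
```

With Bunny, the resulting hash would be passed as the arguments: option of the queue declaration, for example ch.queue('work.later', arguments: args).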

Message TTL: discard unconsumed messages after a certain time

By declaring a queue with the x-message-ttl property, messages will be discarded from the queue if they haven't been consumed within the time specified.
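
Because x-message-ttl is given in milliseconds, a small conversion helper can keep the units explicit. The sketch below is an illustration only; message_ttl_arguments is a hypothetical name, not part of any library.

```ruby
# Hypothetical helper: converts a delay in seconds into the whole number of
# milliseconds that the x-message-ttl queue argument expects.
def message_ttl_arguments(seconds)
  raise ArgumentError, 'TTL must not be negative' if seconds.negative?

  { 'x-message-ttl' => (seconds * 1000).round }
end

ttl_args = message_ttl_arguments(3)
p ttl_args
```

The hash can be merged with other queue arguments before it is passed to the queue declaration.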

How to combine TTL and DLX to delay message delivery

By combining these two features, we publish a message to a queue that expires its messages after the TTL and then reroutes them to the dead-letter exchange with the dead-letter routing key, so that they end up in a queue we consume from.

Step by step:

  • Declare the delayed queue
    • Add the x-dead-letter-exchange argument property, and set it to the default exchange "".
    • Add the x-dead-letter-routing-key argument property, and set it to the name of the destination queue.
    • Add the x-message-ttl argument property, and set it to the number of milliseconds you want to delay the message.
  • Subscribe to the destination queue

Example from github.com/cloudamqp/rabbitmq-delayed-messages

The following Ruby snippet, which relies on the excellent Bunny library, demonstrates how delayed messages can be implemented.

require 'bunny'

B = Bunny.new ENV['CLOUDAMQP_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name
  ch.queue(DELAYED_QUEUE, arguments: {
    # set the dead-letter exchange to the default exchange
    'x-dead-letter-exchange' => '',
    # when the message expires, change its routing key to the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 3000
  })
  # publish to the default exchange with the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
  puts "#{Time.now}: Published the message"
  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue
  q = ch.queue DESTINATION_QUEUE, durable: true
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message"
  end
end

subscribe()
publish()

sleep

If you have any questions, please feel free to contact support@cloudamqp.com for further assistance.