Sometimes you want to delay the delivery of messages for a certain time so that subscribers doesn't see them immediately. The AMQP protocol doesn't have a native delayed queue feature, but with RabbitMQ's AMQP protocol extensions we can easily emulate one by combining the message TTL function and the dead-lettering function.
Note: There is now a plugin available for delayed messages (RabbitMQ 3.5.3 and later versions), RabbitMQ delayed messages exchange plugin. The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the user adds a delay header to a message. Read more about the plugin: RabbitMQ delayed message exchange plugin
If a queue is declared with the x-dead-letter-exchange property messages which is either rejected, nacked or the TTL for a message expires will be sent to the specified dead-letter-exchange, and if you specify x-dead-letter-routing-key the routing key of the message will be changed when dead lettered.
By declaring a queue with the x-message-ttl property, messages will be discarded from the queue if they haven't been consumed within the time specified.
By combining these to functions we publish a message to a queue which will expire its message after the TTL and then reroute it to the exchange and with the dead-letter routing key so that they end up in a queue which we consume from.
Step by step:
The following Ruby snippet, which relays on the excellent Bunny library, demonstrates how delayed message can be implemented.
require 'bunny'
B = Bunny.new ENV['CLOUDAMQP_URL']
B.start
DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'
def publish
ch = B.create_channel
# declare a queue with the DELAYED_QUEUE name
ch.queue(DELAYED_QUEUE, arguments: {
# set the dead-letter exchange to the default queue
'x-dead-letter-exchange' => '',
# when the message expires, set change the routing key into the destination queue name
'x-dead-letter-routing-key' => DESTINATION_QUEUE,
# the time in milliseconds to keep the message in the queue
'x-message-ttl' => 3000
})
# publish to the default exchange with the the delayed queue name as routing key,
# so that the message ends up in the newly declared delayed queue
ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
puts "#{Time.now}: Published the message"
ch.close
end
def subscribe
ch = B.create_channel
# declare the destination queue
q = ch.queue DESTINATION_QUEUE, durable: true
q.subscribe do |delivery, headers, body|
puts "#{Time.now}: Got the message"
end
end
subscribe()
publish()
sleep
If you have any questions, please feel free to contact support@cloudamqp.com for further assistance.