CloudAMQP with Crystal: Getting started

The recommended Crystal library for accessing RabbitMQ servers is AMQP::Client.

With only ~1000 lines of code and no dependencies, amqp-client is a slim library that does not compromise on functionality.

Messages are published as persistent, which means they are stored on disk and survive a broker crash or restart, provided the queue is declared durable.

The client also waits for message confirmation from the broker, ensuring that no messages are lost in transit. This feature can be disabled if performance is a priority.
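
The two guarantees above work together: a durable queue holds the message, and a confirmed publish tells you the broker accepted it. The following is a minimal sketch rather than a definitive implementation. It assumes a broker reachable at the URL shown, uses a hypothetical queue name ("durable-demo"), and assumes that `ch.queue` accepts a `durable:` argument and that `publish_confirm` returns a boolean.

```crystal
require "amqp-client"

# Assumes a local broker; replace the URL with your own instance URL.
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    # "durable-demo" is a hypothetical queue name. A durable queue
    # survives a broker restart.
    q = ch.queue("durable-demo", durable: true)

    # publish_confirm blocks until the broker has answered for this message.
    if q.publish_confirm("hello")
      puts "Broker confirmed the message"
    else
      puts "Broker did not confirm the message"
    end
  end
end
```

If throughput matters more than delivery guarantees, `q.publish` sends the same message without waiting for the confirm.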

The client is also thread-safe.

Get started with Crystal and AMQP::Client

Installation

  1. Add the dependency to your shard.yml:
    dependencies:
      amqp-client:
        github: cloudamqp/amqp-client.cr
  2. Then run:
    shards install

Usage

require "amqp-client"

AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    q = ch.queue("my-queue")
    q.subscribe(no_ack: false) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      ch.basic_ack(msg.delivery_tag)
    end

    # publish directly to a queue without confirm
    q.publish "msg"

    # publish directly to a queue, blocking while waiting for confirm
    q.publish_confirm "msg"

    # publish to any exchange/routing-key
    ch.basic_publish "msg", exchange: "amq.topic", routing_key: "a"

    # publish to any exchange/routing-key and wait for confirm
    ch.basic_publish_confirm "msg", exchange: "amq.topic", routing_key: "a"

    # This statement will block until a message has arrived
    # The only way to "escape" the block is to unsubscribe
    q.subscribe(tag: "myconsumer", block: true) do |msg|
      q.unsubscribe("myconsumer")
    end

    # Consume and ack, nack or reject msgs
    ch.basic_consume("queue", tag: "consumer-tag", no_ack: false, exclusive: false, block: false) do |msg|
      case msg.body_io.to_s
      when "ack"
        ch.basic_ack(msg.delivery_tag)
      when "reject"
        ch.basic_reject(msg.delivery_tag, requeue: true)
      when "nack"
        ch.basic_nack(msg.delivery_tag, requeue: true, multiple: true)
      end
    end

    ch.prefetch(count: 1000) # alias for basic_qos

    # the result carries the queue name, message count and consumer count
    declared = ch.queue_declare(name: "myqueue", passive: false, durable: true,
                                exclusive: false, auto_delete: false,
                                args: AMQP::Client::Arguments.new)
    q = ch.queue # temporary queue that is deleted when the channel is closed
    ch.queue_purge("myqueue")
    ch.queue_bind("myqueue", "amq.topic", "routing-key")
    ch.queue_unbind("myqueue", "amq.topic", "routing-key")
    # fetch a single message with manual ack; nothing is returned if the queue is empty
    if msg = ch.basic_get("myqueue", no_ack: false)
      ch.basic_ack(msg.delivery_tag)
    end
    ch.queue_delete("myqueue")
    ch.exchange_declare("my-exchange", type: "topic")
    ch.exchange_delete("my-exchange")
  end
end
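
The usage example hard-codes the connection URL. A possible way to keep credentials out of the source is to read the URL from the environment, sketched below with the Crystal standard library only. The variable name CLOUDAMQP_URL and the helper `broker_url` are assumptions made for illustration, not part of the library.

```crystal
require "uri"

# Fallback used when the environment variable is not set (local broker).
DEFAULT_URL = "amqp://guest:guest@localhost"

# Hypothetical helper: returns the broker URL from the environment,
# falling back to DEFAULT_URL. "CLOUDAMQP_URL" is an assumed variable name.
def broker_url(env_var = "CLOUDAMQP_URL") : String
  ENV.fetch(env_var, DEFAULT_URL)
end

# Parse the URL to show what will be used, without printing the password.
uri = URI.parse(broker_url)
puts "Connecting to #{uri.host} as #{uri.user} (TLS: #{uri.scheme == "amqps"})"
```

The string returned by `broker_url` can then be passed to `AMQP::Client.start` in place of the literal URL.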

GitHub repository: https://github.com/cloudamqp/amqp-client.cr