How to run RabbitMQ with Python

RabbitMQ is a message broker that acts as the middleman between applications. Thanks to its many client libraries, it works with a wide range of programming languages. This tutorial demonstrates how to connect to RabbitMQ in Python.

This lesson assumes you have a running RabbitMQ server. If not, you can get started with a free RabbitMQ plan from CloudAMQP. For your convenience, all code shown in this lesson is available on GitHub.

There is a growing number of ways to connect to RabbitMQ using Python, including the use of ‘weak typing’ and even the asyncio framework.

The following Python-focused client libraries are in active development, as of this writing (2020-09-21):

These tools allow Python clients and AMQP brokers to work together in your application. This article, like the official RabbitMQ tutorial, uses Pika, which continues to accommodate changes to the language and recently added support for the Python asyncio framework.

Using a requirements file

Python projects commonly track their dependencies in a file named requirements.txt. By convention, the file lives in the root folder of your project.

Install Pika, the RabbitMQ Python library, by adding the following to requirements.txt:

pika >= 1.1.0

Here we have specified the use of the latest version greater than or equal to 1.1.0. You can replace >= with == for an explicit release.

Run the following command from your root directory to complete the installation:

pip install -r requirements.txt

Note that you must install pip on your system prior to execution.
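
To confirm that the installation worked, you can ask Python which version of Pika it sees. The following is a minimal sketch that uses only the standard library's importlib.metadata (Python 3.8 or later). The helper name installed_version is our own and is not part of Pika.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("pika")
    if found is None:
        print("pika is not installed; run: pip install -r requirements.txt")
    else:
        print("pika", found, "is installed")
```

If the script reports that pika is missing, check that pip installed into the same Python environment you are running.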

Types of connections in Pika

Pika allows you to choose the type of connection you create. Available options are:

  • The asynchronous select connection adapter (SelectConnection). SelectConnection is driven by callbacks. For example, add_on_open_callback registers a callback that is invoked once the connection has been opened. The SelectConnection can be useful if your RabbitMQ broker or your connection is slow or overloaded.
  • The blocking connection (BlockingConnection) waits for each request to complete. It blocks the executing thread until a call such as channel_open or exchange_declare has returned. It's often simpler to use this sort of serialized logic.
  • An asyncio connection (AsyncioConnection) connects through a special adapter. Python's asyncio allows functions to avoid blocking while waiting on I/O. A RabbitMQ client sends a request and then allows other tasks to run on the same thread while waiting for a response. This suits consumers well, since they spend most of their time waiting for messages from the broker.
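
To make the non-blocking idea concrete, here is a small sketch that needs no broker. It uses asyncio.sleep as a stand-in for a network round trip, so fake_broker_request is purely hypothetical and not a Pika function. It only illustrates how two pending requests can share one thread.

```python
import asyncio


async def fake_broker_request(name, delay, log):
    """Stand-in for a broker round trip; asyncio.sleep simulates the network wait."""
    log.append(f"{name}: sent")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"{name}: response")


async def main():
    log = []
    # Both "requests" run on one thread; the slow one does not hold up the fast one.
    await asyncio.gather(
        fake_broker_request("slow", 0.2, log),
        fake_broker_request("fast", 0.05, log),
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The fast request finishes while the slow one is still waiting. A blocking connection would instead handle the two requests one after the other.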

Connecting with Pika using a Blocking Connection

The blocking connection works well when sending messages at irregular intervals. Each request blocks until the server responds; for example, opening the connection blocks until the connection has been established.

Establish a connection as follows:

import os

import pika

url = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672/")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()  # open a channel on the connection
channel.exchange_declare("test_exchange")  # a direct exchange by default
channel.queue_declare(queue="test_queue")
channel.queue_bind("test_queue", "test_exchange", "tests")
channel.basic_publish(exchange="test_exchange",
                      routing_key="tests",
                      body="Hello CloudAMQP!")
channel.close()
connection.close()

The code above establishes a connection to your server and declares test_exchange and test_queue. The queue is then bound to test_exchange with the routing key tests, and a message is published to the exchange with that key.

Full code for the blocking connection can be found here: https://github.com/cloudamqp/python-amqp-example
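
pika.URLParameters reads all connection details from a single URL. To see what such a URL contains, the sketch below splits one with the standard library's urllib.parse. This is an illustration only, not Pika's own parser. The helper describe_amqp_url is hypothetical, and treating an empty path as the default virtual host "/" is an assumption that mirrors common AMQP client behaviour.

```python
from urllib.parse import unquote, urlparse


def describe_amqp_url(url):
    """Split an AMQP URL into the parts a client needs to connect."""
    parts = urlparse(url)
    # Assumption: an empty path or a bare "/" means the default virtual host "/".
    vhost = unquote(parts.path[1:]) or "/"
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,  # None when the URL does not name a port
        "virtual_host": vhost,
    }


if __name__ == "__main__":
    print(describe_amqp_url("amqp://guest:guest@localhost:5672/"))
```

Keeping the URL in an environment variable such as CLOUDAMQP_URL, as the example above does, keeps credentials out of your source code.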

Using a non-blocking Pika Connection to send messages

Pika also lets you avoid waiting for server responses by running its I/O loop in a separate thread or process. Use the SelectConnection as follows:

import os
import threading

import pika


class RabbitConnectionExample:
    """
    RabbitMQ operations
    """

    def __init__(self):
        """
        Initializes the class
        """
        self._url = os.environ['RABBITMQ_URL']
        self._connection = None
        self._barrier = threading.Barrier(2, timeout=120)

    def connection_callback(self, conn):
        """
        Run on connecting to the server

        :param conn: The connection created in the previous step
        """
        conn.channel(on_open_callback=self.channel_callback)

    def channel_callback(self, ch):
        """
        Publish to the channel. You can use other methods with callbacks but only the channel
        creation method provides a channel. Other methods provide a frame you can choose to
        discard.

        :param ch: The channel established
        """
        # test_exchange must already exist, e.g. declared by the blocking example
        properties = pika.BasicProperties(content_type='application/json')
        ch.basic_publish(exchange='test_exchange',
                         routing_key='tests',
                         properties=properties,
                         body="Hello CloudAMQP!")
        self._barrier.wait(timeout=1)
        ch.close()
        self._connection.close()

    def close_callback(self, conn, reason):
        """
        Stop the I/O loop once the connection has closed

        :param conn: The connection that was closed
        :param reason: The exception describing why it closed
        """
        conn.ioloop.stop()

    def run(self):
        """
        Runs the example
        """
        def run_io_loop(conn):
            conn.ioloop.start()

        params = pika.URLParameters(self._url)
        self._connection = pika.SelectConnection(
            params,
            on_open_callback=self.connection_callback,
            on_close_callback=self.close_callback)
        if self._connection:
            t = threading.Thread(target=run_io_loop, args=(self._connection, ))
            t.start()
            # Wait until the message has been published, then let the loop finish
            self._barrier.wait(timeout=30)
            t.join()
        else:
            raise ValueError


RabbitConnectionExample().run()

The library chooses a polling mechanism based on your operating system to avoid blocking requests. You will need to create a workflow with a callback for each step required to send or receive messages, and then stop the loop when finished.
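
The publishing example above sets content_type to application/json but sends a plain string. The broker does not check the body against content_type, so it is up to the publisher to make the two agree. Below is a minimal sketch of helpers that could produce and read a JSON body. The names encode_json_body and decode_json_body are our own and not part of Pika.

```python
import json


def encode_json_body(payload):
    """Serialize a Python object to UTF-8 JSON bytes for use as a message body."""
    return json.dumps(payload).encode("utf-8")


def decode_json_body(body):
    """Reverse of encode_json_body, for use on the consumer side."""
    return json.loads(body.decode("utf-8"))


if __name__ == "__main__":
    body = encode_json_body({"greeting": "Hello CloudAMQP!"})
    print(body)
    print(decode_json_body(body))
```

The bytes returned by encode_json_body can be passed as the body argument of basic_publish.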

Creating a consumer in Pika

Pika is an AMQP 0.9.1 based library. Use the basic_consume method to receive messages. With a BlockingConnection you can also iterate over channel.consume, as shown here:

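import os
import pika

url = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@localhost:5672/")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()
try:
    # consume() keeps yielding messages until you interrupt it (Ctrl+C)
    for method_frame, properties, body in channel.consume('test_queue'):
        print(body.decode())
        channel.basic_ack(method_frame.delivery_tag)
except KeyboardInterrupt:
    channel.cancel()  # stop consuming and requeue unacknowledged messages
channel.close()
connection.close()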

basic_consume works with both the BlockingConnection and the SelectConnection and takes a callback that handles each message. With the BlockingConnection, you can instead iterate over the generator returned by channel.consume, as in the example above.
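
A per-message handler for basic_consume could look like the sketch below. The handler takes the four arguments Pika passes to an on_message_callback. FakeChannel is a hypothetical stand-in that only records acknowledgements, so the handler can be tried without a broker.

```python
from types import SimpleNamespace


def on_message(channel, method, properties, body):
    """Handle one delivery, then acknowledge it so the broker can discard it."""
    print("Received", method.delivery_tag, body.decode("utf-8", errors="replace"))
    channel.basic_ack(delivery_tag=method.delivery_tag)


class FakeChannel:
    """Stand-in for a Pika channel; it only records which deliveries were acked."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


if __name__ == "__main__":
    channel = FakeChannel()
    on_message(channel, SimpleNamespace(delivery_tag=1), None, b"Hello CloudAMQP!")
    print("acked:", channel.acked)
```

With a real BlockingConnection you would register the handler with channel.basic_consume(queue="test_queue", on_message_callback=on_message) and then call channel.start_consuming().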

Using Python Asyncio to create a consumer

You can use the asyncio framework to create a consumer in Pika using AMQP 0.9.1. The process is similar to that used in the SelectConnection.

First, convert the run function from our SelectConnection example to the following:

import asyncio
import sys

import pika
from pika.adapters.asyncio_connection import AsyncioConnection

def run(self):
  """
  Run the example.
  """
  print(sys.platform)
  if sys.platform == 'win32':
      asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  params = pika.URLParameters(self._url)
  # The three callbacks are methods you define on the class
  self._connection = AsyncioConnection(
      parameters=params,
      on_open_callback=self.open_connection_callback,
      on_open_error_callback=self.open_connection_error_callback,
      on_close_callback=self.close_connection_callback)
  if self._connection:
      self._connection.ioloop.run_forever()
      return self._closing

We switched to the AsyncioConnection, which avoids blocking a thread until data is available.

To create a consumer with either the SelectConnection or the AsyncioConnection, set the prefetch count and switch from basic_publish to basic_consume after binding your queue in queue_bind_callback:

def on_message(self, _unused_channel, basic_deliver, _properties, body):
  """
  Called when your consumer receives a message.

  :param _unused_channel:  Unused channel receiving the message
  :param basic_deliver:  pika.spec.Basic.Deliver object with delivery metadata
  :param _properties:  incoming message properties
  :param body:  incoming message body
  """
  print("Received {}\n{}".format(basic_deliver.delivery_tag, str(body)))

def qos_callback(self, _unused_frame):
  """
  Called after the QOS request succeeds

  :param _unused_frame:
  """
  print("Consuming from Queue")
  self._channel.add_on_cancel_callback(self.consumer_cancelled)
  self._consumer_tag = self._channel.basic_consume("test_queue", self.on_message,
                                      auto_ack=True)
  self._consuming = True
  self._was_consuming = True

def queue_bind_callback(self, _unused_frame):
  """
  Queue bind callback

  :param _unused_frame: Unused frame created by the queue bind call
  """
  print("Setting Prefetch")
  self._channel.basic_qos(prefetch_count=1, callback=self.qos_callback)

A broader Python asyncio tutorial can help you to understand the underpinnings of the framework.

CloudAMQP works with any library that can connect to RabbitMQ. So, let's start publishing and consuming messages today!

Further Reading

Python and AMQP 1.0 using Qpid Proton

RabbitMQ tutorials

Happy coding!
