Part 2: LavinMQ for beginners - Sample code for Python

Welcome to LavinMQ for beginners! This article is the second in a series about LavinMQ. It takes you through code examples in Python, the steps to set up your own LavinMQ instance, and a relatable example of a working LavinMQ system that you can follow yourself. The guide then explains how to set up a connection and the basics of publishing messages to, and consuming messages from, a queue.

This article assumes that you have a running LavinMQ server. If not, get started with a free LavinMQ plan at CloudAMQP.

This tutorial follows the scenario used in the previous article, Part 1: LavinMQ for beginners - What is LavinMQ?

The example follows a web application that allows users to upload a profile picture. Once the image is uploaded, the user can typically decide which part of the image to show, scale it up or down, and move it around.

The web application takes these instructions and the image and sends a request to the part of the system that is responsible for "Image Processing", which usually includes downsizing and web optimization.

The website handles the information, scales the image, and saves it in the new format. In the example, the entire scaling process will take several seconds. Let’s get started.

The full code can be seen at https://github.com/cloudamqp/python-amqp-example.

Getting started with LavinMQ and Python

Start by downloading the client library for Python 3. The recommended library for Python is Pika. Put pika==1.1.0 in your requirements.txt file.

Queues and exchanges are declared and created if they do not already exist, and finally a message is published. Note that Pika's BlockingConnection does not buffer and resend messages when the connection is down; the publish call raises an exception instead, so any retry logic is up to the application. Once the consumer subscribes to the queue, the messages are handled one by one and sent to the image processor.

The default exchange, identified by the empty string (""), will be used. With the default exchange, messages are routed to the queue whose name matches routing_key, if it exists. (The default exchange is a direct exchange with no name.)
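The routing rule of the default exchange can be illustrated without a broker. The following is a toy in-memory model, not LavinMQ or Pika code: the ToyBroker class and its publish_default method are hypothetical names made up for this sketch. It only shows that a message lands in the queue named by routing_key when that queue exists, and is not routed otherwise.

```python
# Toy in-memory model of default-exchange routing. This is NOT a real broker;
# ToyBroker and publish_default are hypothetical names used for illustration.
from collections import deque


class ToyBroker:
    def __init__(self):
        self.queues = {}  # queue name -> deque of message bodies

    def queue_declare(self, queue):
        # Declaring is idempotent: an existing queue is left untouched.
        self.queues.setdefault(queue, deque())

    def publish_default(self, routing_key, body):
        # Default exchange: deliver to the queue named by routing_key, if any.
        target = self.queues.get(routing_key)
        if target is None:
            return False  # no queue with that name, so nothing is routed
        target.append(body)
        return True


broker = ToyBroker()
broker.queue_declare("image_scaling")
routed = broker.publish_default("image_scaling", "Information about image scaling")
unrouted = broker.publish_default("no_such_queue", "lost message")
print(routed, unrouted, len(broker.queues["image_scaling"]))  # True False 1
```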

Tutorial source code - Full code

# example_publisher.py
import pika, os
# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='image_scaling') # Declare a queue
channel.basic_publish(exchange='',
                      routing_key='image_scaling',
                      body='Information about image scaling')

print(" [x] Sent Information about image scaling")
connection.close()

# example_consumer.py
import pika, os, time

def image_process_function(msg):
  print(" Image processing")
  print(" [x] Received " + str(msg))

  time.sleep(5) # delays for 5 seconds
  print(" Image processing finished")

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='image_scaling') # Declare a queue

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  image_process_function(body)

# set up subscription on the queue
channel.basic_consume('image_scaling',
  callback,
  auto_ack=True)

# start consuming (blocks)
channel.start_consuming()
connection.close()

Tutorial source code - Publisher

# example_publisher.py
import pika, os
# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)

Load the client library and set up the configuration parameters. Other connection parameter options for Pika are described in the Pika documentation under Connection Parameters.
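To see what the fallback URL encodes, it can be taken apart with the standard library. This is only a sketch for inspection: the describe_amqp_url helper is a hypothetical name, and Pika does its own parsing inside pika.URLParameters. The point to notice is that the path %2f is the URL-encoded form of "/", the default virtual host.

```python
# Inspect an AMQP URL with the standard library only.
# describe_amqp_url is a hypothetical helper, not part of Pika.
from urllib.parse import unquote, urlparse


def describe_amqp_url(url):
    parts = urlparse(url)
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        # Drop the leading "/" of the path, then URL-decode the rest.
        "virtual_host": unquote(parts.path[1:]),
    }


info = describe_amqp_url("amqp://guest:guest@localhost:5672/%2f")
print(info)
```

The password is deliberately left out of the returned dictionary so that it does not end up in logs.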

connection = pika.BlockingConnection(params)

pika.BlockingConnection establishes a connection with the LavinMQ server.

Start a channel

channel = connection.channel()

connection.channel creates a channel in the TCP connection.

Declare a queue

channel.queue_declare(queue='image_scaling') # Declare a queue

channel.queue_declare creates a queue to which the message will be delivered. The queue will be given the name image_scaling.

Publish a message

channel.basic_publish(exchange='',
                      routing_key='image_scaling',
                      body='Information about image scaling')
print(" [x] Sent Information about image scaling")

channel.basic_publish publishes the message to the given exchange, with a routing key and a body. The default exchange, identified by the empty string (""), is used here. With the default exchange, messages are routed to the queue whose name matches routing_key, if it exists. (The default exchange is a direct exchange with no name.)
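In a real application the body would carry the actual scaling instructions rather than a fixed string. A minimal sketch of one way to do that is shown here, assuming a JSON payload. The field names (image_id, width, height), the publish_scaling_job helper, and the RecordingChannel stand-in are all hypothetical; the stand-in only exists so the sketch runs without a broker, and a real Pika channel would be passed in its place.

```python
import json


def publish_scaling_job(channel, image_id, width, height):
    # Hypothetical payload layout; adapt the fields to your image processor.
    payload = {"image_id": image_id, "width": width, "height": height}
    body = json.dumps(payload).encode("utf-8")
    # Same call as in the tutorial: default exchange, queue name as routing key.
    channel.basic_publish(exchange="", routing_key="image_scaling", body=body)
    return body


class RecordingChannel:
    """Stand-in for a Pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((exchange, routing_key, body))


fake_channel = RecordingChannel()
sent_body = publish_scaling_job(fake_channel, "avatar-42", 200, 200)
print(fake_channel.published[0])
```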

Close the connection

connection.close()

The connection will be closed after the message has been published.

Consumer

Worker function

def image_process_function(msg):
  print(" Image processing")
  print(" [x] Received " + str(msg))

  time.sleep(5) # delays for 5 seconds
  print(" Image processing finished")

image_process_function is a placeholder function. It sleeps for 5 seconds to simulate the image processing.
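One detail worth knowing: Pika hands the message body to the callback as bytes, so str(msg) prints the bytes representation, including the b'...' wrapper. The sketch below shows the difference, assuming the publisher sent UTF-8 text. The body_as_text helper is a hypothetical name, not part of Pika.

```python
def body_as_text(body):
    # Assumes the publisher sent UTF-8 encoded text.
    if isinstance(body, bytes):
        return body.decode("utf-8")
    return str(body)


raw = b"Information about image scaling"
as_repr = " [x] Received " + str(raw)
as_text = " [x] Received " + body_as_text(raw)
print(as_repr)  #  [x] Received b'Information about image scaling'
print(as_text)  #  [x] Received Information about image scaling
```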

Function called for incoming messages

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  image_process_function(body)

The callback function will be called for every message received from the queue. It calls the function that simulates the image processing.

# set up subscription on the queue
channel.basic_consume('image_scaling',
  callback,
  auto_ack=True)

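basic_consume subscribes the callback function to the image_scaling queue. With auto_ack=True, a message counts as acknowledged as soon as it has been delivered to the consumer.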

channel.start_consuming() # start consuming (blocks)
connection.close()

start_consuming starts consuming messages from the queue. The call blocks, so connection.close() only runs once consuming has stopped.
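Because the tutorial uses auto_ack=True, a message that is delivered while the worker crashes is not redelivered. A possible alternative, sketched here and not part of the tutorial code, is to acknowledge each message manually after processing has finished. The make_acking_callback helper and the FakeChannel stand-in are hypothetical names; basic_ack and delivery_tag are the names Pika uses on its channel and method objects.

```python
from types import SimpleNamespace


def make_acking_callback(process):
    # Wrap a processing function so the message is acknowledged only
    # after processing returns without raising an exception.
    def callback(ch, method, properties, body):
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback


class FakeChannel:
    """Stand-in for a Pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


processed = []
acking_callback = make_acking_callback(processed.append)
fake = FakeChannel()
acking_callback(fake, SimpleNamespace(delivery_tag=1), None,
                b"Information about image scaling")
print(fake.acked, processed)
```

With a real channel, the callback would be registered with auto_ack=False, for example channel.basic_consume('image_scaling', acking_callback, auto_ack=False).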

More information about Python and CloudAMQP can be found here.

Please email us at contact@cloudamqp.com if you have any suggestions or feedback.
