Welcome to LavinMQ for beginners! This article is the second in a series of LavinMQ and will take you through code examples with Python, steps to set up your own LavinMQ instance, and a relatable example of a working LavinMQ system to follow for yourself. The guide goes on to explain the steps to set up a connection and the basics of publishing/consuming messages from a queue.
This article assumes that you have a running LavinMQ server installed, if not - get started with a free LavinMQ plan at CloudAMQP.
This tutorial follows the scenario used in the previous article, Part 1: LavinMQ for beginners - What is LavinMQ?
The example follows a web application that allows users to upload a profile picture. Once the image is uploaded the user can traditionally decide what part of the image they want to show, scale it up or down and move it around.
The web application takes these instructions and the image and sends a request to the part of the system that is responsible for "Image Processing", which usually includes downsizing and web optimization.
The website handles the information, scales the image, and saves it in the new format. In the example, the entire scaling process will take several seconds. Let’s get started.
The full code can be seen at https://github.com/cloudamqp/python-amqp-example.Getting started with LavinMQ and Python
Start by downloading the client-library for Python3. The recommended
library for Python is
Pika.
Put
pika==1.1.0
in your
requirement.txt
file.
Queues and exchanges will be declared and created if they do not already exist and, finally, a message is published. The publish method queues messages internally if the connection is down and resends them later. Once the consumer subscribes to the queue, the messages are handled one by one and sent to the image processor.
A default exchange, identify by the empty string ("") will be used. The default exchange means that messages are routed to the queue with the name specified by routing_key, if it exists. (The default exchange is a direct exchange with no name)
Tutorial Full code
# publish.py
import pika, os
# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='image_scaling') # Declare a queue
channel.basic_publish(exchange='',
routing_key='image_scaling',
body='Information about image scaling')
print(" [x] Sent Information about image scaling")
connection.close()
# example_consumer.py
import pika, os, time
def image_process_function(msg):
print(" Image processing")
print(" [x] Received " + str(msg))
time.sleep(5) # delays for 5 seconds
print(" Image processing finished");
return;
# Access the CLODUAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel() # start a channel
channel.queue_declare(queue='image_scaling') # Declare a queue
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
image_process_function(body)
# set up subscription on the queue
channel.basic_consume('image_scaling',
callback,
auto_ack=True)
# start consuming (blocks)
channel.start_consuming()
connection.close()
Tutorial source code - Publisher
# example_publisher.py
import pika, os
# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
Load client library and set up configuration parameters. Other connection parameter options for Pika can be found here: Connection Parameters.
connection = pika.BlockingConnection(params)
pika.BlockingConnection
establishes a connection with the LavinMQ server.
Start a channel
channel = connection.channel()
connection.channel
create a channel in the TCP connection.
Declare a queue
channel.queue_declare(queue='image_scaling') # Declare a queue
channel.queue_declare
creates a queue to which the message will be delivered. The queue will be given
the name
image_scaling.
Publish a message
channel.basic_publish(exchange='',
routing_key='image_scaling',
body='Information about image scaling')
print(" [x] Sent Information about image scaling")
channel.basic_publish
publish the message to the channel on the given exchange, with a routing key
and a body.
A default exchange, identify by the empty string ("") will be used. The
default exchange means that messages are routed to the queue with the name
specified by routing_key,
if it exists. (The default exchange is a direct exchange with no name).
Close the connection
connection.close()
The connection will be closed after the message has been published.
Consumer
Worker function
def image_process_function(msg):
print(" Image processing")
print(" [x] Received " + str(msg))
time.sleep(5) # delays for 5 seconds
print(" Image processing finished");
return;
image_process_function
is a todo-function. It will sleep for
5 seconds to simulate the image processing.
Function called for incoming messages
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
image_process_function(body)
The
callback
function will be called on every message received from the queue.
The function will call a function that simulate the
image processing.
#set up subscription on the queue
channel.basic_consume('image_scaling',
callback,
auto_ack=True)
basic_consume
binds messages to the consumer callback function.
channel.start_consuming() # start consuming (blocks)
connection.close()
start_consuming
starts to consume messages from the queue.
More information about Python and CloudAMQP can be found here.
Please email us at contact@cloudamqp.com if you have any suggestions or feedback.