Microservices and Message Queues, Part 6: Building the second service

In the previous article, we implemented the first service in our distributed architecture. That service, functioning as the producer, publishes messages to our RabbitMQ instance on CloudAMQP. In this article, we will build the second service, which will serve as the consumer of messages in our architecture.

As mentioned earlier, the GetMeHired-consumer will consume and process the messages published to our RabbitMQ instance on CloudAMQP. A message in this case is the job search query, and processing a message here entails grabbing the relevant jobs from the web and emailing those jobs to the users.

This service essentially does all the heavy lifting in our architecture.

With regard to the implementation details, we will define an APIRequestHelper class that encapsulates the logic of making requests to the Linkedin job search API to grab relevant jobs.

We will again define a CloudAMQPHelper class to streamline interfacing with our RabbitMQ instance on CloudAMQP. We will also have a TwilioHelper class that encapsulates the logic of working with Twilio SendGrid in our project.

We will also define the send_email function that uses the TwilioHelper class to forward relevant jobs to users. Lastly, we will implement the main process that uses the CloudAMQPHelper class to grab messages from CloudAMQP, the APIRequestHelper class to grab relevant jobs, and the send_email function to do what its name implies.

Also, note that the completed code for this project is hosted on GitHub. You can refer to it if you get stuck at any point. To begin, let's set up our development environment.

Setting up the project

Take the following steps to set up your development environment and subsequently the project:

Step 1: Creating a virtual environment

Even though we already created a virtual environment for the first service, we are still going to create a new virtual environment for the second service. This is mainly because the second service is meant to be a self-contained application with its own set of dependencies.

  • In your terminal, run python3 -m venv get-me-hired-consumer-env on Unix and macOS or python -m venv get-me-hired-consumer-env on Windows to create a virtual environment named get-me-hired-consumer-env.
  • Run source get-me-hired-consumer-env/bin/activate on Unix and macOS or .\get-me-hired-consumer-env\Scripts\activate on Windows to activate the virtual environment.

Step 2: Create a FastAPI project

With your virtual environment activated, take the following steps to bootstrap a FastAPI project.

  • In your terminal, run mkdir get-me-hired-consumer to create a new directory for our project.
  • In your terminal, run cd get-me-hired-consumer && touch __init__.py main.py requirements.txt. This command navigates into the get-me-hired-consumer folder and creates three files in it: __init__.py, main.py, and requirements.txt.

    If you are on Windows, the touch command will not work. In that case, only run the first part of the command to navigate into the newly created folder: cd get-me-hired-consumer. You can then open the folder in the IDE you are working with and manually create the three files.

  • If not done already, open the get-me-hired-consumer folder in your favorite IDE, and then copy and paste the snippet below into your requirements.txt file.
    fastapi==0.75.0
    pika==1.1.0
    uvicorn[standard]
    python-dotenv
    requests
    sendgrid
    
    
    These are the external dependencies we will need in our project for now.
  • In your terminal, run pip install -r requirements.txt. This command installs all the dependencies listed in the requirements.txt file. Ensure that you are in the project directory before running it.
  • Next, copy the snippet below and paste it into the main.py file.
    from fastapi import FastAPI
    
    app = FastAPI()
    
    # create a function which is called on incoming messages
    def callback(ch, method, properties, body):
      """ The logic for grabbing jobs and sending an email
          will be invoked here
      """
      pass
    
    def main():
      print("\n main function invoked \n")
    
    @app.on_event("startup")
    def startup_event():
      """ Code to run during startup """
      main()
    
    @app.on_event("shutdown")
    async def shutdown_event():
      """ Code to run during shutdown """
      pass
    

Step 3: Running the development server

In your terminal, run uvicorn main:app --reload.

In the terminal output, you should see the line INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit), which shows the URL where your app is being served on your local machine.

You should also see main function invoked logged to your console.

With our project now set up, we will proceed to implement the business logic. To begin, we will implement the CloudAMQP and APIRequest helper classes, in that order.

But before implementing these classes, remember to create a .env file in your project and add your CloudAMQP credentials to the file - we demonstrated how to do this in the previous post.
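Every helper in this service reads its credentials from the environment, so a missing entry would otherwise only surface later as a KeyError or a failed request. As an optional aid, the sketch below checks for the variables this article uses: CLOUDAMQP_URL now, and RAPID_API_KEY and SENDGRID_API_KEY in the later steps. The function name missing_env_vars is hypothetical and not part of the project.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variable names used across this article's .env file.
REQUIRED_VARS = ("CLOUDAMQP_URL", "RAPID_API_KEY", "SENDGRID_API_KEY")


def missing_env_vars(
    names: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    # Call load_dotenv() first if the values live in a .env file.
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
```

Passing the environment in as a parameter keeps the check easy to try out with a plain dictionary before wiring it into the service.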

Implementing the CloudAMQP helper class

The CloudAMQP helper class will encapsulate the logic of connecting to CloudAMQP, declaring the queue, and consuming messages. We will use this class to consume messages from our RabbitMQ instance on CloudAMQP.

Create a file cloudamqp_helper.py in the project directory and add the snippet below to it.

import os
from typing import Callable
import pika
# from pika.exchange_type import ExchangeType
from dotenv import load_dotenv

# Load the .env file
load_dotenv()

class CloudAMQPHelper:
  """ The interface between this project and CloudAMQP """

  QUEUE_NAME = "get_me_hired_queue"

  def __init__(self) -> None:
    """ Sets up a connection and a channel when this class is instantiated """
    url = os.environ["CLOUDAMQP_URL"]
    params = pika.URLParameters(url)

    self.__connection = pika.BlockingConnection(params) # Connect to CloudAMQP

  def __create_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
    """ Opens a new channel on the existing connection """
    channel = self.__connection.channel() # start a channel
    return channel

  def __create_queue(self) -> None:
    """ Declares a queue - always good to create the same queue
        from the consumer side as well since the action is idempotent
    """
    # Get channel
    channel = self.__create_channel()
    # Create a queue
    channel.queue_declare(queue=self.QUEUE_NAME)

  def consume_message(self, callback: Callable) -> None:
    """ Reads a message published to a queue it's bound to """
    self.__create_queue()
    # Get channel
    channel = self.__create_channel()
    channel.basic_consume(
        self.QUEUE_NAME,
        callback,
        auto_ack=True
    )
    # start consuming (blocks)
    channel.start_consuming()
    # Runs only after consuming stops; the attribute is the private __connection
    self.__connection.close()

# Create an instance
cloudamqp: CloudAMQPHelper = CloudAMQPHelper()

In the snippet above, we first used python-dotenv to load our .env file so that its values are accessible in our project. Next, we created the CloudAMQPHelper class, which exposes one public method, consume_message.
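Note that consume_message passes auto_ack=True, so the broker considers a message handled as soon as it is delivered, even if processing later fails. If you would rather acknowledge a message only after it has been processed, one possible variation is sketched below. It assumes you change the basic_consume call to auto_ack=False. The wrapper name with_manual_ack is hypothetical, while basic_ack and basic_nack are the channel methods pika provides for this.

```python
from typing import Any, Callable


def with_manual_ack(handler: Callable[[bytes], None]) -> Callable[..., None]:
    """Wrap a handler so the message is acked only after it succeeds.

    Intended for basic_consume(..., auto_ack=False).
    """

    def callback(ch: Any, method: Any, properties: Any, body: bytes) -> None:
        try:
            handler(body)
        except Exception as error:
            print(f"Processing failed: {error}")
            # Reject without requeueing so a bad message is not retried forever.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

Whether to requeue a failed message depends on your use case. This sketch drops it so that a malformed message cannot loop indefinitely.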

Next, let’s create our APIRequest helper class.

Implementing the APIRequest helper class

As its name implies, we will use this helper class to make calls to a third-party API, namely the Linkedin Jobs Search API on RapidAPI. To use this API, sign up for RapidAPI if you haven’t already.

Now take the following steps to implement the helper class.

Step 1: Add your RapidAPI’s API key to the .env file

To be able to make API calls to the Linkedin API, you will need your account’s API key.

When logged into RapidAPI, go to the Linkedin Jobs Search page and click on the Subscribe to Test button, and then subscribe to the basic/free plan. Once subscribed, the Subscribe to Test button will be replaced with the Test Endpoint button.

Free users are only entitled to 15 API calls per month for the Linkedin job search API, so keep that in mind and test the endpoints sparingly.

Now that you are subscribed to the API, you can grab your API key. To do that, go to the API’s page and copy the value of the X-RapidAPI-Key header in the Code Snippets tab. See the image below:

Figure 1 - RapidAPI Key

Next, add the entry RAPID_API_KEY = axx… to your .env file, where axx… is the API key you just copied on RapidAPI.

Next, let’s create the helper class.

Step 2: Create the helper class.

Create a file api_helper.py in your project directory, then copy and paste the code snippet here in the new file.
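For orientation, the only thing the rest of this article relies on is a jobs_api object with a get_jobs(payload=...) method. A minimal sketch of such a helper could look like the block below. Several details are assumptions rather than the project's actual code: the endpoint URL and host are read from the hypothetical RAPID_API_URL and RAPID_API_HOST environment variables, the API is assumed to accept a JSON POST and return a list of job dictionaries, and the post parameter exists only so the HTTP call can be swapped out when experimenting. Take the real endpoint details from the Code Snippets tab on RapidAPI.

```python
import os
from typing import Any, Callable, Optional


class APIRequestHelper:
    """Hypothetical sketch of a helper for a RapidAPI-hosted jobs API."""

    def __init__(self, url: str, api_key: str, api_host: str,
                 post: Optional[Callable[..., Any]] = None) -> None:
        self.url = url
        self.headers = {
            "content-type": "application/json",
            "X-RapidAPI-Key": api_key,
            "X-RapidAPI-Host": api_host,
        }
        self._post = post  # injectable for experiments; defaults to requests.post

    def get_jobs(self, payload: dict) -> list:
        """POST the search payload and return the decoded JSON response."""
        post = self._post
        if post is None:
            import requests  # imported lazily so the sketch loads without it
            post = requests.post
        response = post(self.url, json=payload, headers=self.headers, timeout=30)
        response.raise_for_status()
        return response.json()


# RAPID_API_URL and RAPID_API_HOST are placeholder names, not from the article.
jobs_api = APIRequestHelper(
    url=os.environ.get("RAPID_API_URL", ""),
    api_key=os.environ.get("RAPID_API_KEY", ""),
    api_host=os.environ.get("RAPID_API_HOST", ""),
)
```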

Great! Now we have all we need to connect to CloudAMQP and to send HTTP requests to the Linkedin job search API. Next, we will implement the logic needed to consume messages from CloudAMQP and, subsequently, send requests to the Linkedin API based on the search term in the message consumed.

Connecting to CloudAMQP and the Linkedin job search API

Here, the first goal is to grab messages published to our RabbitMQ instance on CloudAMQP by the first service. Second, we will use the content of the message, more specifically the “search_term” and “location” params, to make a request to the Linkedin API.

First, go to the main.py file and add these imports to the list of import statements at the very top of the file.

from cloudamqp_helper import cloudamqp
from api_helper import jobs_api
import json

Essentially, we just imported our helper instances and the json module from the standard library. Next, replace the body of the callback() function defined in the main.py file with the snippet below.

body = json.loads(body.decode('utf-8'))
search_term = body.get("search_term")
location = body.get("location")
email = body.get("email")


payload = {
  "search_terms": search_term,
  "location": location
}

jobs = jobs_api.get_jobs(payload=payload)

Next, replace the print statement in the main() function with the line below.

cloudamqp.consume_message(callback=callback)

In the snippet above, the consume_message() method will grab messages from CloudAMQP. Once a message is received, the callback() function is invoked. The callback function is responsible for making an API call to the Linkedin job search API and retrieving all relevant jobs.

You can now run your first service and publish a message to CloudAMQP. Alternatively, you can publish a message from the management interface. Once you’ve published a message, run this second service.
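To see what the callback does with a message without involving the broker, the decoding step can be tried in isolation. The sketch below assumes, as the callback does, that the producer publishes a JSON object with search_term, location, and email keys. The function name parse_job_query and the sample values are hypothetical.

```python
import json
from typing import Optional, Tuple


def parse_job_query(body: bytes) -> Tuple[dict, Optional[str]]:
    """Decode a queue message into an API payload and the recipient email."""
    message = json.loads(body.decode("utf-8"))
    payload = {
        "search_terms": message.get("search_term"),
        "location": message.get("location"),
    }
    return payload, message.get("email")


# Simulate the bytes that pika would hand to callback() as `body`.
sample_body = json.dumps(
    {"search_term": "python developer", "location": "Berlin", "email": "user@example.com"}
).encode("utf-8")
payload, email = parse_job_query(sample_body)
print(payload, email)
```

Because the keys are read with get(), a message that lacks one of them yields None for that field instead of raising an error, so you may want to validate the payload before calling the API.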

If everything works well, you will see a list of all the relevant jobs displayed on the console, and that’s it. Next, we will implement the logic that will then email these jobs to the user. At this point, your main.py should look like this.

Now that we are able to grab relevant jobs from the Linkedin jobs search API, we need to use Twilio SendGrid to email those jobs to the user.

Email relevant jobs to a user

Take the following steps to implement the logic that emails the link of a job posting to a user. Even though the Linkedin job search API returns lots of job postings, to simplify things, we will only email one job posting to the user.

Step 1: Create a Twilio SendGrid account

To begin here, sign up for a free SendGrid account if you haven't already.

Step 2: Grab the API key

Once logged in, go to the Settings section of the SendGrid UI and click the Create API Key button. Give your API key a name on the next screen and proceed to create it.

Once done, copy the API key displayed on your screen and create a new entry in your .env file with the API key as its value, as shown here: SENDGRID_API_KEY=your api key here

Step 3: Create a Twilio Sender Identity

Next, we need to create a Single Sender Identity. A Sender Identity represents your 'From' email address—the address your recipients see as the sender of your emails.

If you are yet to create a Sender Identity, you will be greeted with the page shown in the image below. Click on the Create a Single Sender button on that page.

Figure 2 - Single Sender identity

Next, fill in all the required information on the next page and click the create button. You will get an email verification link in your inbox. Click on the link to verify your email address.

Step 4: Create the Twilio helper class

Here, we will create the helper class that we will use to connect to Twilio and email the link to a job posting to the end user. Create a file twilio_helper.py in your project directory, and add the snippet below to it.

import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, To
from dotenv import load_dotenv

# Load the .env file
load_dotenv()

class TwilioHelper:
  """ The interface between this project and Twilio SendGrid """

  FROM_EMAIL = "your single sender email address"

  def __send_email(self, message: Mail):
    """ Sends the message; returns the status code, or None on failure """

    try:
      sg = SendGridAPIClient(os.environ.get("SENDGRID_API_KEY"))
      response = sg.send(message)
      code, body, headers = response.status_code, response.body, response.headers
      print(f"Response Code: {code} ")
      print(f"Response Body: {body} ")
      print(f"Response Headers: {headers} ")
      print("Email Sent!")
      return str(code)
    except Exception as e:
      # response is never assigned when sending fails, so return here
      print("Error: {0}".format(e))
      return None

  def email_job_listing(self, to_email: str, job: dict):
    """ Emails a single job posting to the given address """
    TO_EMAILS = [
      To(
        email=to_email,
        substitutions={
          "-job_url-": job.get("linkedin_job_url_cleaned"),
          "-company_name-": job.get("company_name"),
          "-job_location-": job.get("job_location"),
          "-date_posted-": job.get("posted_date")
        }
      )
    ]

    # Adjacent string literals are joined as-is, so separate them with <br>
    html_content=(
      "Hello there from GetMeHired! We've got a new job posting for you :)<br>"
      "Job URL: -job_url-<br>"
      "Company Name: -company_name-<br>"
      "Job Location: -job_location-<br>"
      "Date Posted: -date_posted-"
    )

    message = Mail(
      from_email=self.FROM_EMAIL,
      to_emails=TO_EMAILS,
      subject="New Job Posting - GetMeHired",
      html_content=html_content
    )

    self.__send_email(message)

twilio_api: TwilioHelper = TwilioHelper()

The class above allows us to connect to Twilio SendGrid. It exposes the email_job_listing method, which forwards a job posting to a user’s inbox.
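The substitutions dictionary maps placeholder tags such as -job_url- to the values that should appear in the email body. The replacement itself is performed by SendGrid when the email is sent. To preview the effect locally, the idea can be imitated with plain string replacement, as in the sketch below. This is only an illustration and not how SendGrid implements it. The function name render_substitutions and the sample job are hypothetical, and turning a missing value into an empty string is a choice made for this sketch.

```python
def render_substitutions(template: str, substitutions: dict) -> str:
    """Replace each -tag- in the template with its value (None becomes "")."""
    rendered = template
    for tag, value in substitutions.items():
        rendered = rendered.replace(tag, "" if value is None else str(value))
    return rendered


template = "Job URL: -job_url- | Company Name: -company_name-"
job = {"linkedin_job_url_cleaned": "https://example.com/jobs/1", "company_name": "Acme"}
preview = render_substitutions(template, {
    "-job_url-": job.get("linkedin_job_url_cleaned"),
    "-company_name-": job.get("company_name"),
})
print(preview)  # Job URL: https://example.com/jobs/1 | Company Name: Acme
```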

Next, we will invoke that function in our main.py file.

Step 5: Send email to users

Here, we will invoke the Twilio helper method that forwards jobs to users via email. To do that, first add the import below to the import statements at the very top of your main.py file.

from twilio_helper import twilio_api

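Next, go to the callback() function in the main.py file, and add the snippet below to the end of the function. Since jobs holds the postings returned by the API and email_job_listing expects a single posting, we pass only the first one.

if jobs:
  twilio_api.email_job_listing(to_email=email, job=jobs[0])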

In the end, your main.py file should look like this.

To test the entire project, you can run the first service, the producer, and then publish some messages to the RabbitMQ instance on CloudAMQP. After that, you can run the second service, the consumer. Once the consumer is up, it will grab all relevant jobs and email the first one to the email address submitted.

That’s it!

Conclusion

In this article, we built a minimal Python server with FastAPI. Overall, we are seeking to implement a distributed architecture with two services, and this is the second service. The goal of this service is to have a consumer that grabs messages from CloudAMQP.

This service is also responsible for processing the messages it receives. Processing a message here entails making an API call to the Linkedin jobs search API to grab relevant jobs matching the user’s search query. Additionally, this service also connects to Twilio and uses it to email jobs to users.

With this service completed, we are pretty much done with the implementation of our distributed architecture. That notwithstanding, we are still going to have one more article in this series to wrap things up.

This next article will cover some ways to extend/modify our rudimentary project to fit a real-world scenario.

We’d be happy to hear from you! Please leave your suggestions, questions, or feedback in the comment section below or get in touch with us at contact@cloudamqp.com.
