CloudAMQP with Java Getting started

RabbitMQ has developed an excellent Java AMQP library. The full API documentation for the library can be found here. There are different ways to use the spring framework with RabbitMQ, the first spring example show spring-amqp with java and the second example uses bean templates.

Access CloudAMQP from Java with amqp-client

Begin to add the AMQP library as an dependency in your pom.xml file (use the latest version):

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.10.0</version>
  <scope>compile</scope>
</dependency>

Fetch the CLOUDAMQP_URL environment variable and subtitute with a url to a local RabbitMQ instance if you couldn't be found.

Create a ConnectionFactory and configure it with the URL. All connections are created from this factory and on the connection we create a Channel.

Declare a queue and publish a message to the "default exchange" with the queue name as routing key. This is a shortcut as all queues by default a bound to the "default queue" with the it's name as routing parameter.

Then start a consumer which listens to that queue prints out the message body to the console.

The full code can be seen at github.com/cloudamqp/java-amqp-example.

Code example Publish and subscribe

public static void main(String[] args) throws Exception {
  String uri = System.getenv("CLOUDAMQP_URL");
  if (uri == null) uri = "amqp://guest:guest@localhost";

  ConnectionFactory factory = new ConnectionFactory();
  factory.setUri(uri);

  //Recommended settings
  factory.setConnectionTimeout(30000);

  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();

  String queue = "hello";     //queue name
  boolean durable = false;    //durable - RabbitMQ will never lose the queue if a crash occurs
  boolean exclusive = false;  //exclusive - if queue only will be used by one connection
  boolean autoDelete = false; //autodelete - queue is deleted when last consumer unsubscribes

  channel.queueDeclare(queue, durable, exclusive, autoDelete, null);
  String message = "Hello CloudAMQP!";

  String exchangeName = "";
  String routingKey = "hello";
  channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
  System.out.println(" [x] Sent '" + message + "'");

  DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  };
  channel.basicConsume(queue, true, deliverCallback, consumerTag -> { });
}

Access CloudAMQP with spring-amqp and java

If you are using the Spring Framework for Java you can use the spring-amqp library for RabbitMQ. Good documentation about the library can be found in the Spring documentation for AMQP. Begin by adding follwing dependencie to your POM.xml-file dependencies section (use the latest version):

<dependencies>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
  </dependency>
</dependencies>

Fetch the CLOUDAMQP_URL environment variable from the console page for your instance.

Code example Publish and subscribe

Create a CachingConnectionFactory and configure it with username, password and vhost. All connections are created from this factory and on the connection we create a Channel.

public static void main(String[] args) {
  // set up the connection
  CachingConnectionFactory connectionFactory=new CachingConnectionFactory("bunny.cloudamqp.com");
  connectionFactory.setUsername("rozcdysg");
  connectionFactory.setPassword("Mx9GntDW4WBJvmY2_M_Qr2_a4gRGc3_G");
  connectionFactory.setVirtualHost("rozcdysg");

  //Recommended settings
  connectionFactory.setRequestedHeartBeat(30);
  connectionFactory.setConnectionTimeout(30000);

  //Set up queue, exchanges and bindings
  RabbitAdmin admin = new RabbitAdmin(connectionFactory);
  Queue queue = new Queue("myQueue");
  admin.declareQueue(queue);
  TopicExchange exchange = new TopicExchange("myEExchange");
  admin.declareExchange(exchange);
  admin.declareBinding(
    BindingBuilder.bind(queue).to(exchange).with("foo.*"));

  //Set up the listener
  SimpleMessageListenerContainer container =
    new SimpleMessageListenerContainer(connectionFactory);
  Object listener = new Object() {
    public void handleMessage(String foo) {
      System.out.println(foo);
    }
  };

  //Send a message
  MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
  container.setMessageListener(adapter);
  container.setQueueNames("myQueue");
  container.start();

  RabbitTemplate template = new RabbitTemplate(connectionFactory);
  template.convertAndSend("myExchange", "foo.bar", "Hello CloudAMQP!");
  try{
    Thread.sleep(1000);
  } catch(InterruptedException e) {
     Thread.currentThread().interrupt();
  }
  container.stop();
}

Access CloudAMQP with spring-amqp and beans

If you are using the Spring Framework for Java you can use the spring-amqp library for RabbitMQ. Good documentation about the library can be found in the Spring documentation for AMQP. Begin by adding follwing dependencie to your POM.xml-file dependencies section (use the latest version):

<dependencies>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.5.RELEASE</version>
  </dependency>
</dependencies>

Fetch the CLOUDAMQP_URL environment variable from the console page for your instance.

Code example Publish and subscribe

Start to create the XML bean definitions for sender-context.xml and listener-context.xml


<!-- /cloudamqp/src/main/resources/listener-context.xml -->

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
                      http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                      http://www.springframework.org/schema/rabbit
                      http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
  <rabbit:connection-factory id="connectionFactory"
                             host="host"
                             virtual-host="vhost"
                             username="username"
                             password="password" />
  <rabbit:admin connection-factory="connectionFactory" />
  <!-- Create queue -->
  <rabbit:queue id="mySpringQueue" auto-delete="false" name="mySpringQueue" />
  <!-- create myExchange and bind mySpringQueue to myExchange-->
  <rabbit:topic-exchange id="myExchange" name="myExchange">
    <rabbit:bindings>
      <rabbit:binding queue="mySpringQueue" pattern="my.*"></rabbit:binding>
    </rabbit:bindings>
  </rabbit:topic-exchange>
  <!-- instantiate listener -->
  <bean id="myListener" class="com.cloudamqp.amqp.Listener" />
  <rabbit:listener-container id="myListenerContainer" connection-factory="connectionFactory">
    <rabbit:listener ref="myListener" queues="mySpringQueue" /></rabbit:listener-container>
  </beans>

<!-- /cloudamqp/src/main/resources/listener-context.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
                           http://www.springframework.org/schema/rabbit
                           http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
   <!--  create a rabbit connection factory with specified parameters -->
   <rabbit:connection-factory id="connectionFactory"
                              host="host"
                              virtual-host="vhost"
                              username="username"
                              password="password" />
   <rabbit:admin connection-factory="connectionFactory" />
   <!-- create a bean that sends messages to myExhange-->
   <rabbit:template id="cloudamqpTemplate" connection-factory="connectionFactory"  exchange="myExchange"/>
 </beans>

Create a Listener class that implements MessageListener. This class will listen for messages from myQueue, as specified in the listener-contet.xml file.


package com.cloudamqp.amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class Listener implements MessageListener {
  public void onMessage(Message message) {
    String messageBody= new String(message.getBody());
    System.out.println("Message received: "+messageBody);
  }
}

Create a ListenerContainer class that reads resources from the context file.


package com.cloudamqp.amqp;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ListenerContainer {
  public static void main(String[] args) {
    ApplicationContext c1 = new ClassPathXmlApplicationContext("listener-context.xml");
  }
}

Create a sender class tha sends messages to the exchange my.routingkey.


package com.cloudamqp.amqp;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Sender {
  public static void main(String[] args) throws Exception {
    ApplicationContext context = new ClassPathXmlApplicationContext("sender-context.xml");
    AmqpTemplate aTemplate = (AmqpTemplate) context.getBean("cloudamqpTemplate");
    for (int i = 1; i < 6; i++)
    {
      aTemplate.convertAndSend("my.routingkey", "Hello CloudAMQP, Message # " +i);
    }
  }
}

Run the ListenerContainer as a Java application and the Sender as a Java application. The listener will display the messages send by the sender.

Java AMQP resources