In this talk from RabbitMQ Summit 2019 we listen to Madhav Sathe and Marcial Rosales from Pivotal.
In this live-coding session we explore a variety of event processing and streaming solutions built with RabbitMQ. We will show how Spring Cloud Streams and RabbitMQ provide abstractions for developers to build event driven apps. You will learn tools and techniques to solve stateless and stateful streaming problems using RabbitMQ and Reactive Streams.
Madhav Sathe ( Twitter, GitHub ) works as a cloud applications and platform architect at Pivotal. He helps customers in their application modernization journey and cloud strategy. Prior to joining Pivotal, Madhav spent most of his time at Oracle, first as developer and then as a product manager. Madhav developed and managed Oracle's IT analytics, application and middleware monitoring, management solutions.
Marcial Rosales ( Twitter, Github) spent most of his career implementing trading platforms, and now works as Principal Software Engineer at Pivotal. When he joined Pivotal 4 years ago, he worked with customers on their cloud-native transformation journey to Cloud Foundry. In the last year, he helps customers get the best experience using RabbitMQ.
Rapidly Building Event Driven and Streaming Applications with RabbitMQ
In today’s session we’re going to talk about building resilient streaming applications with RabbitMQ using the active stack or reactor. Our agenda for this presentation includes abstractions provided by the Spring cloud stream for event-driven micro services. Then we will discuss about the requirements for building reliable and efficient streaming applications.
The Developer Stack
One of the first requirements for building any event-driven application is the ability to filter events in real-time. That is essential for stately streaming applications. This developer stack has been out there for quite some time and people are using it in production. It is basically a binder-based-integration-based Spring for developer stack where the binder implements a lot of abstractions for creating the resources necessary for your application such as queues. The binder can be swapped out for RabbitMQ or Apache Kafka and you can have your own binder for your own messaging system. From the application micro services point of view, the Spring boot gets the experience of Spring cloud stream for the input and output channels.
Here is a classic process or application which just means that it consumes data from one channel and produces data for another channel. The input and output both in this particular case will be your RabbitMQ queues. This is a classic Spring cloud stream abstraction provided by the actual business line. The business code here is one line and the rest of it is an abstraction provided by the Spring cloud stream.
Building Stateful Applications
Before we start discussing, we would like you to know why we’re giving you this talk and what work we’ve done behind in this deck. We started working after continuously hearing from customers that they wanted to build streaming applications that process things in real-time, especially the stateful ones.
“I wonder whether RabbitMQ is the right thing to use or not. If it is the right thing to use, then which is the API we should use?” Our first reaction was what do you mean by streaming a stateful application? I’m pretty sure that we can do streaming RabbitMQ every day, so what is the story about? So, the first thing that the customer said, “Yes, I can process these things in real-time using the current stack”. But what if I want to perform windowing aggregations? What if I want to process multiple events out in the window of time? So, we talked about processing multiple events not just one at a time. We said let’s take that as a challenge and let’s try to build ourselves a streaming application and experience first-hand what are the challenges that we need to face and how to overcome those challenges. The outcome of that is the work that we’ve done here.
Distributed Word Count
We said let’s build a distributive word count. It’s simple and everyone has seen what a word count is. It has all the ingredients that every straight-forward streaming application has. In the file we first started to write the streaming application–a landing port, a stream data from file straight to RabbitMQ. Let’s say we send those lines into RabbitMQ. There’s a queue, which has those lines. The line gives a hint of the stack we’re using.
We also have an extra application called the Line Splitter, which receives those lines in real-time and is going to extract those words to somehow filter them and then send them to RabbitMQ. Eventually we have an application called word-counter. It will receive those words in real-time and is going to count them in memory. Why not? People are wondering right now if I can use my stack like Java RabbitMQ or Spring AMQP. Yes, bring it on. We can write this application without any problem. As soon as we mention things like strong guarantee of delivery and many other attributes or capabilities we expect from a streaming application then it gets very interesting.
RabbitMQ Stream Processing Framework
In order to build this demo, we have to use a different framework. The framework that we chose was what we call our Stream Processing Foundation. It is made up of two libraries of Reactor RabbitMQ (pure Java) and project Reactor (pure Java). The versions are important because the customer didn’t know that this thing existed. Yet, it is extremely powerful. Why? Because, right out of the box I can perform a wealth of operations that I can do over a stream of data.
E.g we can stream data from different queues and merge. But all that comes with certain awareness and a cost. We are moving from procedural programming to functional programming, which is a big change for a developer. The Stream Processing Foundation is just a foundation. In order to build our distributed application, we realized that we needed extra capabilities that can be found in the repository, private and incubated. We don’t know yet how this is going to be retro-fitted into the RabbitMQ core or if it is going to be a separate library, which I will show you.
Regardless of what you want to do, I don’t want to lose any data. So, let’s go for the first challenge: guarantee of delivery. Obviously, we want to send messages to RabbitMQ and want to use parity of confirmation because we want to have a guarantee of delivery.
What happens if you get a negative publisher confirmation? That is an infrastructure problem and not a business problem. Why we could get negative publisher information? If RabbitMQ has a failure or if we have something in the queue like a maximum depth that is going to be beyond this point, then reject any further publishing. So, this is a very interesting point to have into the back-pressure mechanism in RabbitMQ. What we would expect from any streaming framework is don’t bother me with failures and deal with it. Eventually we expect that after a certain time it will be resending.
Live Code (line-importer)
I’m going to show you some codes. It doesn’t really matter if you understand the API. They are brand new right now and it’s functional programming, but all I want you to grasp is a glimpse of what is possible. In the live code, I will show you the Spring boot application. Although this is Spring, but all the coding that you will see here is entirely Java. It is not dependent on Spring. What I want to do is just copy a bit of the Flux <String> code into the clipboard. Flux String is just a convenient class that reads a file from the URI and returns the Flux <String>. This represents any stream that releases elements.
What do you want to do in a stream? I want to inspect it, or probably want to log it or filter or even transform it. All these things are already available in the concept of a Flux.
I am going to add a filter to all these lines and trim all the spaces. I am going to keep only those lines that are longer than zero. At this point, I have a pipeline ready to be involved and that’s going to be read from a file. It’s not that simple, but I’m going to give you the details.
Notice that I’m adding rabbit.send (lines). The line is now so expressive and extremely powerful. It does exactly what you want i.e. not bother you with failures and only tell you when something is done. In other words, it will send a string flux<string> lines = FluxSources and return Flux<string> sentLines = rabbit.send ( lines ). It is eventually going to be exchanged with a queue.
This bootcamp for me is a very revolutionary concept. The RabbitMQ reactive dominic queue, introduced a year ago uses this API. How cool it would be to treat a queue like a stream where if I put something in the stream and it goes back to the queue! On the contrary, a queue can be treated like a stream where I can do it from the stream whatever is in the queue. No channel and no publishing, just streams!
Let’s say we do this here:
rabbit.using(settings).<String>createStream( topicName: "lines");
In this case, this are the business objects in a string. But, imagine this could be anything. Under the code we’re using block serialization and deserialization by default. So, now this is going to determine what lines will stream. Now I can do something like this is a dream that I told you. However, this will connect the stream to queue.
I will do this now:
TopicStream<String> lineStream = rabbit.using(settings).<String>createStream ( topicName: "lines"); Flux<String> sentLines = lineStream.send ( lines );
With all the guarantees that the code provides, it is very much focused on what we’re trying to achieve. But we don’t want to finish and probably want to change suppressing with something else. Well, let’s say I want to calculate the throughput of things and send it to the RabbitMQ queue. What about if I want to save the last line sent to the file so that I can recover. Also, if I can do something like the below:
sentLines.window(Duration.ofSeconds(1)) Flux<Flux<String>> .flatMap(w -> w.count()) .scan ((previousCount, count) -> Long.sum(previousCount, count)) .subscribe (total -> System.out.printf(“Sent %d lines so far”));
Here, we want to window all the center lines and calculate how many things appear (in that window). Upon closing, the window emits a result in number .count() and then this can operate. What it does is process things as they go through the pipeline taking into account the accumulated value from the previous events. So, I can calculate 9.5/2 to five, seven, and so on. I can print out the total center lines per second.
This can be a bit too much to absorb now, but all I’m going to tell you is that a streaming application is not just about dealing with RabbitMQ queues. It’s even beyond RabbitMQ queues. If you have a ready-to-use toolbox and have RabitMQ’s support in it, you can also use Redis because it has a reactive RabbitMQ queue, and a reactive API. You can use a relation at the base that has a reactive driver.
Distributed Word Count
The challenge with distributed word count that we’re talking about here is end-to-end pipeline delivery. But, how about intermediately processing the RabbitMQ data as a processor and sending the data to another RabbitMQ. For that, we will read the data from lines-q-1 and then process that data into line-splitter, later producing the splitter line into multiple words with each other becoming a separate message on words-q-1. After all, at the end we want to count the words, so we have to split the line.
This basically represents the business logic for the Words Count application. So, line splitter is going to split that; however, on what basis will the line splitter let the lines know that the line has been processed and it can be deleted? Only upon confirmation that the processing is complete, wherein the word has been successfully published to the words queue. In this case, it is a word queue. It can also be your Redis database or any other reactive data sink.
Once confirmation, three messages are received here as three words were sent during the process. An acknowledgement is sent to the lines queue. That means there’s end-to-end delivery guarantee established here. If there’s a failure, then that means the line splitter will not send an acknowledgement to the lines queue. Just receiving the line from the lines queue is not a successful processed line. In fact, it also has to perceive the data as well.
For guaranteed delivery, the primary principle is to never acknowledge a message until it has been successfully processed. The second principle is how do we efficiently manage these delivery guarantees? Can we unite just thousands of lines, perhaps you’re receiving from the lines queue - does it makes sense to acknowledge just one by one? It’s not a good utilization of your network resources. In that case, the idea will be that you receive multiple lines as a stream and the acknowledgement is sent in a windowed manner to the lines-queue-1. Hence, multiple acknowledgements are sent together which guarantees the delivery and improves the performance.
Code Walkthrough (line-splitter)
In order to build scalable applications today, the reactive must be able to send data on demand no matter whether there are millions of information. This is a unique way to have predictable responses.
Next, we’ll take a look at the code for the line-splitter application. The first part is just a resource definition. Here we’re just going to need the lines resource and the words resource. Lines resource indicates the wiring necessary to get the data from the lines-queue-1 and words resource represents the wiring that is necessary to write data to the words queue.
Traditionally, if you’re reading the data you want to get delivered and receive acknowledgement, then you need an acknowledgeable delivery. That has a mechanism in place allowing you to know whether you received the message or not. It is not a message that is typed but is essentially a byte array. If you strongly want to receive a typed notification, then that must be converted to your business object. In this case the business object is type <String>.
As soon as you convert it to your business object, you lose the ability to send an acknowledgement. For that purpose, the abstract built here is a transactional topic stream that gives you the business object and a strongly typed stream as well it is encapsulated in the transaction semantics. This abstraction will be purposeful if there is enough customer requirement and it will be absorbed in the upstream open source as well.
Similarly, we also have strongly typed stream for words as well. Hence, we have the strongly typed stream for both lines (input) and lines (output). Both are strongly typed; both provide transactional semantics.
The first thing here is that we’re going to receive the line in a streaming manner where it is received on the flux. Flux is a way of receiving one or more collection of data stream in a strongly typed transactional manner. Once that is received, the stream is processed and that produces a bunch of words again as a stream. Here we’re going to process the line stream and get the words in the output stream. Notice that we’re going to process this TL (transactional line) and split it into different words based on the spaces. We will apply some filters based on business logic, counting the words that are only alphanumeric and more than four letters.
The important action that happens in this abstraction here guarantees end-to-end delivery. Only upon the delivery of these different individual words that have been spun out of that line, the TL can be successfully processed. So, for each word we’re going to find a separate stream from this TL and map it back to the parent. For every word that gets strongly typed to the transactional semantics stream, each of them will be mapped to the parent. Upon completion of the process, we’re going to say that it is committed. However, the commit is not yet done in reality–it is the intent of the commit. The process to commit knows that it has to commit only after committing all the words that is the child of this particular stream or child of this specific line. Here is when we take the word stream and map it to the line.
If you notice here, variable names are also chosen carefully, and the words are mapped to the line. These are going to be sent over the wire as a stream to the downstream RabbitMQ. Upon successfully sending those messages in this particular line, we are sending those messages to the RabbitMQ. After that we receive a publish confirmation and a strongly typed notification. Now, we’re going to start committing each and every word in a stream. Only when all these words are committed successfully will this be acted upon.
Transactional Semantics Across Streams
We read the lines from the lines-queue-1 as a line strongly typed object but encapsulated with the transaction semantics. Next, using the splitter we spun multiple words out of it. But they are mapped with the line to the parent. Those words are then sent to the words queue stream and the intent to commit is declared. From the lines point of view, it has split the word and completed the processing. However, the commit is not yet acted upon since the words are yet to be written to the queue.
Once the words are successfully written to the queue and the publish confirmation happens, each of these words start to commit. When each of these words start to commit and communicate back to that particular line, then only that line is acted upon and the acknowledgement is sent back to the lines queue. The acknowledgement for multiple lines is grouped together for network resource efficiency. This guarantees delivery in streaming across distributed systems. The lines of code for the queue provides end-to-end delivery guarantees efficiently across the distributed system.
Here the upstream queue could be one system and the downstream could be another system, completely running on different clusters. It could even be Redis or any other reactive data sync. That’s the power of this reactor API along with the built-in transaction semantics.
Using the reactive RabbitMQ plus the reactor can be a very compelling framework for streaming applications, which is not just about streaming data to RabbitMQ. You can stream lots of applications if you have a toolbox with you. You can do so much with your code that will be simpler, more robust and efficient. This is about a synchronous programming paradigm, which is not that simple. As soon as your application becomes complex, you have to have a very good tool set.
There’s another factor that we want to emphasize is the back pressure–the key feature of the reactive. If the data system sync is unable to take more data, the producer will slow down upon request. The end-to-end pipeline performs the back pressure as well. So, for some reason if the line splitter slows down, it means that the lines queue is going to fill up and the file reader application slows down and stops reading the data from the file.
The end-to-end back pressure in a reactive way is happening in a minimalistic code from the business point of view. Also, for the variation statefulness, the change lock in the distributed word count diagram connected internally with the reactor is going to go to Redis. If those change locks are not crowded with the word stream only then they are going to come back. The word counts have been counted completely and it can be saved and removed from RabbitMQ.
The final note is that if you’re going to build this process out of the reactor you need to understand the level of difficulty. It is not trivial; however, it is very simple to do it only because we have the reactor. Without the reactor with us, we obviously cannot attempt to do this. With this we now have stateful streaming applications with partitioning that is supported right out-of-the-box. So, you don’t even know that partitioning is happening.
Can this be scaled? Yes, of course it can be and distributed to the consumers in the partition. This is because consumers are running an algorithm that allows them to distribute the partitions and their own application instances. Hence, if one application goes awry, the other one takes over. So, partitioning, a certain type, and end-to-end delivery guarantee–these are the three important features that you should always have in any streaming framework beyond all the complex operations for doing analytics such as aggregation, windowing, and merging.