Saturday, January 14, 2017

Play and SSE (with Akka Streams)

One of the things I like most about my job is that it requires constant learning. Cloud, microservices, reactive systems, etc. are more than buzzwords: they are here to stay for a long time, and every developer needs to embrace the change... and so do I :-)

Let's begin with...

Some history

Last year, my company decided to migrate its main product to the cloud. This product, like many existing software products, is a monolith (it's made up of more than one monolith, to be honest; it was born in the mighty .ear era, when that kind of architecture was perfectly viable and acceptable), and it cannot be deployed as-is on a modern PaaS like Docker Datacenter... ok, that's not completely true, but let's assume that it is :-)

Given that, we (the R&D team) were given the opportunity to investigate new technologies that could be used to improve, or rewrite from scratch, our product. My task (not only mine, to tell the truth) was to investigate the Lightbend platform.


The Lightbend platform

A brief visit to the corporate website can give you some basic information about the Lightbend platform. Its main components are:

  • Akka
  • Play
  • Lagom
  • ConductR
I decided to start with Akka, the actor-based, message-driven runtime at the core of the platform. It didn't take long to understand the huge potential of this toolkit! After playing with Akka for a while (writing some actors, starting some clusters, playing with streams), and with great satisfaction I have to say, I switched to Play. What I said about Akka also applies to Play: it's a great tool for writing scalable, non-blocking web applications.

While studying Play, I decided to implement, as a personal exercise (let's call it a proof of concept), a simple webapp that pushes messages to the browser using Server-Sent Events (SSE).

I won't describe SSE in this post, as there are many websites that can give you all the information you need. It is enough to say that it is a very lightweight protocol, and if you need to push data to a browser, it's surely the best way to go. Another viable option is WebSocket, but it has some drawbacks, mainly the fact that it requires a protocol upgrade, from http to ws.

The use case I had in mind was quite simple. When a user points their browser to a specific web page, an SSE channel is opened. Clearly we can have many active users (that is, many open pages and, therefore, many open channels) simultaneously. When someone posts a message from another web page, this message must be pushed to all the open SSE channels and displayed in every browser showing the page.
Nothing special, you may guess, except for the fact that I wanted the SSE channel to be backed by an Akka Stream (more on that later), a stream fed by an Akka actor acting as the publisher.

Like all modern human beings, I started my little project by googling around, but what I found didn't satisfy me completely, so I decided to implement it myself. The source code is available here. Make sure to check out the develop branch.

This is how it works:

  • start the application with the command activator run (you have to install Typesafe Activator)
  • point your browser to the page http://localhost:9000/dashboard
    • if you like, open that page in multiple browser tabs or in another browser
  • with a REST client (like Postman), perform a POST request to http://localhost:9000/message/[your_text_here]
  • The message you typed as a path parameter will be displayed in the dashboard page
Nothing special indeed :-)

Ok, it's time for some explanations about the code.

I'll start with a picture explaining how to open an SSE channel from the server to the browser, backed by an Akka Stream.



If you're not totally new to Play, you may know that Play lets clients interact through a Controller object, which can be reached through one or more routes declared in a dedicated configuration file.

Here is a snippet of the routes configuration file:

GET     /dashboard       controller.Application.dashboard
GET     /stream          controller.Application.stream
POST    /message/:ean    controller.Application.message(ean: String)


The Controller, through a StreamSupervisor (an internal actor, don't mind it), creates a PublisherActor (we will have one PublisherActor per open SSE channel) that registers itself with the PublishersManagerActor. The PublisherActor is the actor that will actually push the messages onto the Akka Stream, materialized over an SSE channel.

The PublishersManagerActor manages all the available publishers (it simply caches them in a map, with no eviction; DON'T DO this in production!!!!) and, when it receives a message from the controller, forwards the message to the registered publishers, which will send it over the SSE channel.

Let's show this in a sequence diagram.



The real beauty is that all the interactions with the actors depicted in the previous sequence diagram are completely asynchronous, in a fire-and-forget fashion.
This is the real strength of Akka!! As an example, consider the message interaction between the Controller and the PublishersManagerActor in the picture above: it is completely asynchronous, so the Controller can return an HTTP 200 status code to the client without waiting for the publishers to complete their work.

This is the relevant code of the controller class, called Application:


@Singleton
public class Application extends Controller {

    private ActorSelection publishersManager;


    @Inject
    public Application( ActorSystem actorSystem ) {
        publishersManager = actorSystem.actorSelection( "/user/" + Constants.PUBLISHERS_MANAGER_ACTOR );
    }

    /**
     * home page
     */
    public Result dashboard() {
        Logger.debug( "Application: dashboard" );
        return ok( dashboard.render() );
    }


    /**
     * Sends a message to all the registered SSE publishers
     * @param text the text to be sent
     * @return a result object
     */
    public Result message( String text ) {
        Logger.debug( "Application: message [{}]", text );
        publishersManager.tell( new Message( text ), ActorRef.noSender() );
        return ok();
    }

    /**
     * Opens a SSE connection backed by an Akka stream
     *
     * @return a chunked result
     */
    public Result stream() {

        Logger.debug( "Application: start stream" );

        Source<EventSource.Event, ?> eventSource =
                Source.actorPublisher( PublisherActor.props() ).
                        map( msg -> EventSource.Event.event( (String) msg ) );

        return ok().chunked( eventSource.via( EventSource.flow() ) ).as( MimeTypes.EVENT_STREAM );
    }
}



The Application.dashboard() method only renders the dashboard.scala.html template. This template contains the JavaScript code that opens the SSE channel by invoking the /stream route, which is bound to the Application.stream() method (see the source code for the details).

Before analyzing the Application.stream() method, just a few words about the constructor of the controller class. You can see that, using the actor selection feature, we can store a reference to the PublishersManagerActor. The Application.message() method uses this reference to send a message to all registered publishers.
The PublishersManagerActor is created by Play, using the Guice framework, at application startup. You can find the code that wires up the publishers manager in the Module class of the project.

Now let's analyze the Application.stream() method, the most important one of the Application class. With this method, called via JavaScript from the /dashboard web page, we can open an SSE channel from the server to the browser.

This method is heavily based on the Akka Streams library. The Akka Streams topic is very wide, so we'll introduce here just the basic concepts. If you want to know more about it, and I encourage you to do so, visit this page. An Akka Stream is composed of at least three components:


  • Source: the actual source of the data. It can be an actor, as in this tutorial, but it can be anything that is able to generate a stream of data.
  • Flow: represents a transformation phase. With a Flow we can transform the data coming from the Source into a format that can be ingested by the Sink.
  • Sink: the actual destination of the stream of data.
When we declare a stream, we are actually creating a description of the stream itself. In Akka Streams terms, we call this a blueprint. The process that actually translates a blueprint into a real flow of data is called materialization.
The most important feature of reactive streams, and therefore of Akka Streams, is back-pressure: the ability of Source and Sink to coordinate on the optimal throughput of the stream, in order not to overwhelm the Sink's capacity.
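To make these concepts a bit more concrete, here is a minimal, self-contained sketch (not part of the project code; the class name and the data are made up) that declares a Source, a Flow and a Sink and then materializes the resulting blueprint:

import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class StreamBlueprintExample {

    public static void main( String[] args ) {
        ActorSystem system = ActorSystem.create( "example" );
        Materializer materializer = ActorMaterializer.create( system );

        // Source: emits the integers from 1 to 5
        Source<Integer, NotUsed> source = Source.range( 1, 5 );

        // Flow: transforms every element into a String
        Flow<Integer, String, NotUsed> flow = Flow.of( Integer.class ).map( i -> "element " + i );

        // Sink: prints every element; materializes to a CompletionStage that
        // completes when the stream finishes
        Sink<String, CompletionStage<Done>> sink = Sink.foreach( System.out::println );

        // Up to this point nothing has run: source + flow + sink is only a blueprint.
        // runWith() materializes it, i.e. allocates the actors that actually move the data.
        CompletionStage<Done> done = source.via( flow ).runWith( sink, materializer );

        // shut the actor system down once the (very short) stream has completed
        done.thenRun( system::terminate );
    }
}

In Application.stream() the same three pieces are present, just less visible: the publisher actor is the Source, EventSource.flow() provides the Flow, and the chunked HTTP response acts, in practice, as the Sink.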

Back to the Application.stream() method now. Here is the code again:


    public Result stream() {

        Source<EventSource.Event, ?> eventSource =
                Source.actorPublisher( PublisherActor.props() ).
                        map( msg -> EventSource.Event.event( (String) msg ) );

        return ok().chunked( eventSource.via( EventSource.flow() ) ).as( MimeTypes.EVENT_STREAM );

    }


Our Source will be an Akka actor, so we start the blueprint with the Source.actorPublisher() method, which accepts a Props object. Not all actors are suitable to act as publishers, only those that extend the AbstractActorPublisher class. We'll show the code of the actor later; for now, it's better to go on with the stream definition.

The play.libs.EventSource class is the core class of the Play framework related to SSE, and in Play 2.5 it was completely rewritten to adapt it to Akka Streams. That said, the stream of data produced by the publisher actor must be transformed into EventSource.Event objects, and we do this using the Source.map() method. What we obtain is an instance of Source<EventSource.Event, ?>. As you may have noticed, the EventSource.Event object wraps a simple String.

Now we have to introduce a transformation phase (a Flow). This can be achieved using the Source.via() method, passing it a Flow<EventSource.Event, ByteString, ?> instance created with the EventSource.flow() method.

The final step is the materialization of the stream. This is performed by the chunked() method, which accepts a Source<ByteString, ?> object, the result of the transformation performed by the Flow.

The work is done now. The last thing we have to do is simply declare the MIME type of the Result object returned by the controller, using the as() method.


We described how to wire the stream; now we'll see how to feed it with a publisher actor. This is the code, inspired by what you can find here:

public class PublisherActor extends AbstractActorPublisher<String> {

    /**
     * Buffer of messages
     */
    private final List<String> buf = new ArrayList<>();

    public static Props props() {
        return Props.create( PublisherActor.class );
    }

    private ActorSelection publishersManager;

    public PublisherActor() {
        Logger.info( "PublisherActor: building actor" );

        receive( ReceiveBuilder.

                match( Message.class, msg -> {
                    Logger.info( "PublisherActor: received: {}", msg.text );
                    handleMessage( msg.text );
                } ).
                match( ActorPublisherMessage.Request.class, request -> {
                    Logger.info( "PublisherActor: received request: {}", request );

                    deliverBuf();

                } ).
                match( ActorPublisherMessage.Cancel.class, cancel -> {
                    Logger.info( "PublisherActor: received cancel: {}. Unregistering", cancel );
                    publishersManager.tell( new Protocol.Unregister(), context().self() );
                    context().stop( self() );

                } ).
                matchAny( msg -> {
                    Logger.info( "PublisherActor: received (and ignored) {}", msg );
                } ).
                build() );

        publishersManager = context().actorSelection( "/user/" + PUBLISHERS_MANAGER_ACTOR );
        Logger.debug( "PublisherActor: registering myself ({}) to PublishersManagerActor", context().self() );
        publishersManager.tell( new Register(), context().self() );
    }

    void handleMessage( String message ) {
        if ( buf.isEmpty() && totalDemand() > 0 ) {
            Logger.debug( "PublisherActor: calling onNext with '{}'", message );
            onNext( message );
        }
        else {
            Logger.debug( "PublisherActor: buffering {}", message );
            buf.add( message );
            deliverBuf();
        }
    }

    void deliverBuf() {
        Logger.debug( "PublisherActor: totalDemand() {}", totalDemand() );
        while ( totalDemand() > 0 ) {
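            // totalDemand() is a long: if the demand fits in an int we can deliver
            // (up to) that many buffered messages in one shot and stop;
            // otherwise we deliver Integer.MAX_VALUE elements and loop again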
            if ( totalDemand() <= Integer.MAX_VALUE ) {
                final List<String> took =
                        buf.subList( 0, Math.min( buf.size(), (int) totalDemand() ) );
                took.forEach( this::onNext );
                buf.removeAll( took );
                break;
            }
            else {
                final List<String> took =
                        buf.subList( 0, Math.min( buf.size(), Integer.MAX_VALUE ) );
                took.forEach( this::onNext );
                buf.removeAll( took );
            }
        }
    }
}

A publisher actor must extend the AbstractActorPublisher<T> class. When we use a publisher actor, back-pressure is handled via the ActorPublisherMessage.Request and ActorPublisherMessage.Cancel messages. The most important is the Request message, which is sent by the Sink to inform the Source about how many objects it can accept. This number is returned by the AbstractActorPublisher.totalDemand() method. Internally we use a buffer that can grow without limit (warning!!!!! this is only a proof of concept; in a real-life use case you should probably use something better than this) to keep the messages that cannot be pushed to the Sink because of back-pressure. To actually push a message to the Sink, it's enough to call the AbstractActorPublisher.onNext() method. In our code this happens when the actor receives a Message object.

Another important thing to notice is how we register the PublisherActor with the manager that keeps track of all the available publishers. This is done in the constructor:

        publishersManager = context().actorSelection( "/user/" + PUBLISHERS_MANAGER_ACTOR );
    
        publishersManager.tell( new Register(), context().self() );

We simply obtain a reference to the PublishersManagerActor, created at application startup, via the actor selection facility, and send it a Register message. When the PublishersManagerActor receives a Register message, it caches the reference to the publisher, an ActorRef object, in its internal map (warning!!!!! this is only a proof of concept; in a real-life use case you should probably use something better than this, like a cache with an eviction policy).
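The Protocol messages (Register, Unregister and Message) are not shown in this post; here is a minimal sketch of what they might look like (the real definitions are in the repository, so treat the class layout below as an assumption based on how the messages are used in the snippets above):

// Hypothetical sketch of the actor protocol; the actual classes live in the project sources.
public final class Protocol {

    /** Sent by a PublisherActor to the manager to announce itself. */
    public static final class Register {}

    /** Sent by a PublisherActor to the manager before stopping. */
    public static final class Unregister {}

    /** Carries the text to be pushed to every open SSE channel. */
    public static final class Message {

        public final String text;

        public Message( String text ) {
            this.text = text;
        }
    }
}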

This is the code of the manager actor.

public class PublishersManagerActor extends AbstractActor {

    private Map<String, ActorRef> publishers = new HashMap<>();

    public PublishersManagerActor() {

        receive( ReceiveBuilder.
                match( Protocol.Register.class, msg -> {
                    Logger.info( "registering {}", sender() );
                    publishers.put( sender().toString(), sender() );
                } ).
                match( Protocol.Unregister.class, msg -> {

                    Logger.info( "unregistering {}", sender() );
                    publishers.remove( sender().toString() );

                } ).
                match( Protocol.Message.class, msg -> {

                    publishers.values().
                         forEach( ref -> ref.forward( msg, context() ) );

                } ).
                build() );
    }
}

It's a very simple actor. Just notice how the references collected via Register messages are used when the actor receives a Message object: the message is forwarded to every cached publisher.

If you take another look at the code of the Application controller class, you can clearly see that the Application.message() method sends a Message object to the PublishersManagerActor, leaving to it the task of forwarding the message to all the registered publishers and, therefore, to all the open SSE streams.

Some screenshots now.

Once the application is started, browse to http://localhost:9000/dashboard (don't expect any beautifully designed page, I'm not a front-end guy....).


Now, with an HTTP client like Postman, send a POST request to http://localhost:9000/message/[place_your_text_here] (in a real-life use case the text should be placed in the body but, again, it's a POC, so do it as homework, if you like).

When you send the POST request, the message will be displayed in the dashboard page. If you open the same page in multiple tabs, or in other browsers, all the open pages will show the message.


That's all folks! Enjoy...

Disclaimer 1: as I said many times in this post, this has to be considered only as a proof of concept, implemented to discover how to use an actor-backed Akka Stream to feed an SSE channel. Many things are simplified (the cache of publishers, the internal buffer of messages, etc.) and therefore this code MUST NOT be considered production-ready. I hope it can help anyway. Let me know if this code can be improved in any way... after all, I'm learning :-)

Disclaimer 2: the Akka Streams (and reactive streams) topic is very complex, and I have only scratched the surface of it with this post. Feel free to learn more about it.

Disclaimer 3: I'm not a native English speaker, so forgive me if this post is not always clear.







Friday, March 27, 2015

Fork Join to transform list of objects - Java 8

Following the previous post, here is a little update if you're using Java 8. The TransformFunction interface can be turned into a functional interface.

@FunctionalInterface
public interface TransformFunction<T, S> {

        T transform (S source);

}

Having done this, you can trigger the process that converts a list of Integer into a list of String objects by typing something like this:

ForkJoinPool pool = new ForkJoinPool();
ListTransformer<String, Integer> transformer = new ListTransformer<>(sources, source -> Integer.toString(source));
List<String> result = pool.invoke(transformer);

You can notice how it all becomes simpler.
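As a further small variation on the snippet above (just a sketch, reusing the same pool and sources list), a method reference works too, since String.valueOf(Object) fits the TransformFunction<String, Integer> shape:

List<String> result = pool.invoke( new ListTransformer<String, Integer>( sources, String::valueOf ) );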

Hope it can help!




Thursday, March 12, 2015

Fork Join to transform list of objects

I'm starting this new blog with a little framework aimed at simplifying the processing of lists in Java.

I'm quite sure that in your career you have faced the following use case: given a list of objects, you have to transform it into a new list of objects, where each object is obtained through a transformation (complicated or simple) of the original objects.

The simple way to solve this problem is to iterate through the original list, transform each object and add it to a new list. It looks something like this:


List<Object> newList = new ArrayList<Object>();
for (Object originalObject : originalList) {
    newList.add(transform(originalObject));
}


It is simple, isn't it?

The problem may arise when you have a huge number of objects.

Keep in mind that we're not talking about big data, so 'huge' should be understood as thousands of objects: a size that is too small to start thinking about a complicated map/reduce algorithm but, at the same time, big enough to impact performance if the process is executed sequentially as shown in the code snippet above.

The solution I developed is based on the Fork/Join pool introduced in Java 7. Probably with Java 8, with its support for lambdas, there is a more elegant and concise solution, but our target JVM at the moment is JDK 7, so I focused on this platform.

This is the generic list transformer.



import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class ListTransformer<T, S> extends RecursiveTask<List<T>> {

    private static final long serialVersionUID = 1L;

    static final int LIMIT = 1000;

    private List<S> sources;
    private TransformFunction<T, S> function;

    public ListTransformer ( List<S> sources, TransformFunction<T, S> function ) {
        super ();
        this.sources = sources;
        this.function = function;
    }

    @Override
    protected List<T> compute () {

        if ( sources.size() < LIMIT ) {
            List<T> preparedEntities = new ArrayList<T> ();
            for ( S source : sources ) {
                preparedEntities.add ( function.transform( source ) );
            }
            return preparedEntities;
        }
        else {
            int divider = sources.size () / 2;

            ListTransformer<T, S> curLeft = new ListTransformer<T, S> ( sources.subList ( 0, divider ), function );
            ListTransformer<T, S> curRight = new ListTransformer<T, S> ( sources.subList ( divider, sources.size() ), function );

            curRight.fork ();
            List<T> leftResult = curLeft.compute ();
            List<T> rightResult = curRight.join ();
            leftResult.addAll ( rightResult );

            return leftResult;
        }
    }

    public interface TransformFunction<T, S> {

        T transform (S source);
    }
}


The class is parameterized: S should be understood as the source object type, T as the transformed object type. The process is quite simple. If the size of the list is below a reasonable limit, we apply the transformation directly; otherwise we split the list in half and create two new subtasks: one half is forked (scheduled on the pool) while the other is computed in the current thread, and then we join the forked task. Each subtask can in turn split itself until the sublists are small enough to apply the transformation.

Let's see it in action. Suppose you have to deal with the trivial task of transforming a list of Integer into a list of String. This is a test case that shows how to implement this task with the Fork/Join approach:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import org.junit.Assert;
import org.junit.Test;

public class ListTransformerTest {

    @Test
    public void testList() {
        List<Integer> source = new ArrayList<Integer>();
        for (int i = 0; i < 5000; i++) {
            source.add(i);
        }

        ForkJoinPool pool = new ForkJoinPool ();

        List<String> transform = pool.invoke ( new ListTransformer<String, Integer> ( source,
                new ListTransformer.TransformFunction<String, Integer>() {

                    @Override
                    public String transform ( Integer source ) {
                        return source.toString ();
                    }

                } ) );

        for (int i = 0; i < 5000; i++) {
            Assert.assertEquals( "" + i, transform.get ( i ) );
        }
    }
}


The test checks that the order of the new list is what we expect. This is important, because I don't think you want the new list to contain objects in random order :-)

Now it's time to demonstrate that this is not only an exercise, but that it can actually give you much better performance. I created a simple JMH benchmark (in a future post I will talk about this great framework) to compare the performance of the Fork/Join approach with the traditional sequential approach. The tests were conducted on a late 2014 MacBook Pro (Core i7 2.4 GHz, 16 GB RAM).
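The benchmark class itself is not shown in this post; just to give an idea of the setup, here is a hedged sketch of what it might look like (class structure, parameter names and setup are assumptions; only the method names testNew and testStandard and the list sizes come from the results below):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State( Scope.Benchmark )
public class ListTransformerBenchmark {

    @Param( { "1000", "2000", "5000", "50000" } )
    int size;

    List<Integer> source;

    @Setup
    public void setup() {
        source = new ArrayList<Integer>();
        for ( int i = 0; i < size; i++ ) {
            source.add( i );
        }
    }

    @Benchmark
    public List<String> testNew() {
        // Fork/Join transformation: a new pool is created on every invocation,
        // which is part of the overhead discussed below
        ForkJoinPool pool = new ForkJoinPool();
        return pool.invoke( new ListTransformer<String, Integer>( source,
                new ListTransformer.TransformFunction<String, Integer>() {

                    @Override
                    public String transform( Integer value ) {
                        return value.toString();
                    }
                } ) );
    }

    @Benchmark
    public List<String> testStandard() {
        // plain sequential transformation
        List<String> result = new ArrayList<String>();
        for ( Integer value : source ) {
            result.add( value.toString() );
        }
        return result;
    }
}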

Benchmark                              (size)   Mode  Cnt      Score       Error  Units
ListTransformerBenchmark.testNew         1000  thrpt    5  26182,426 ±  899,082  ops/s
ListTransformerBenchmark.testNew         2000  thrpt    5  16690,305 ± 2213,340  ops/s
ListTransformerBenchmark.testNew         5000  thrpt    5   9268,967 ±  257,888  ops/s
ListTransformerBenchmark.testNew        50000  thrpt    5   1356,710 ±   81,285  ops/s
ListTransformerBenchmark.testStandard    1000  thrpt    5  32834,898 ±  870,725  ops/s
ListTransformerBenchmark.testStandard    2000  thrpt    5  15759,069 ±  884,292  ops/s
ListTransformerBenchmark.testStandard    5000  thrpt    5   6198,490 ±  253,743  ops/s
ListTransformerBenchmark.testStandard   50000  thrpt    5    561,933 ±   27,503  ops/s

The testNew method, as you may guess, is the one that uses the Fork/Join transformation, while the testStandard method uses a simple loop.

The results show that for a small list the traditional approach is faster, probably because the overhead necessary to create the Fork/Join pool is heavier than the transformation process itself. When the list reaches a size of a few thousand objects, the multithreaded approach shines. We can see that for a list of 5000 objects, the Fork/Join transformation reaches about 9269 operations per second, while the sequential process stops at about 6198 operations per second. A nice performance gain, I think.

For bigger lists, the Fork/Join approach performs even better. For a list of 50000 objects, the multithreaded transformer delivers roughly 2.4 times the throughput of the sequential version.

If you're in need of extreme optimization on your codebase, I think that this is a valuable solution.

Cheers