During my time working as a consultant, I have learned that one of the most popular patterns clients ask for is the fork-join pattern. The idea is that in the middle of a Mule flow, we need to do some work in parallel, wait for all paths to finish execution, and then continue.
Why would you need the fork-join pattern? There are at least a hundred and one reasons, but let me give you a small example. Imagine you have a flow where you are required to do some lookups online, perhaps calling two different external web services. Since external web services tend to be on the slow side when compared to internal services, you might decide to do these lookups in parallel. Once you collect the lookup data from both services, you carry on with your flow. Basically, what we are after here is a reduction in response times.
A very good explanation of how this can currently be achieved in Mule can be found here.
This is all well and good, but in my opinion this is far too complicated an implementation for such a simple pattern. And complexity usually means worse performance, higher implementation costs, and higher maintenance costs. I knew I had to do something, and I did.
First I created a simple flow with the fork-join pattern as you would normally do in Mule, and stress-tested it. I discovered that, on average, just to do the fork-join, Mule was taking about 100ms under light load on a Mac with a dual-core, Hyper-Threaded i7 processor. In my book this looked a bit on the high side.
Then I decided to implement a custom message processor (all the code is available in the GIST at the end of this blog). This message processor is quite simple. As a configuration, it accepts a list of message processors. The idea is that each of these message processors will receive a copy of the current message, and then they are executed in parallel. Threads for execution are taken from a thread pool which is configurable too. The result of the message processor is a MuleMessageCollection, which contains a list of MuleMessages with the response from each execution, in the same order as configured. The following GIST shows an example of how you would configure this message processor.
Here we are invoking two web services in parallel, storing the result of each lookup in a flow variable (with the help of an enricher). As you can see, the ParallelMessageProcessor accepts a list of MessageProcessors, in this case they are called lookupWs1 and lookupWs2. These message processors can be any kind of message processors, but here we are calling other flows, which internally invoke the external web services.
Once we get the response from each individual web service, we use the combine-collections-transformer to transform the MuleMessageCollection into a single MuleMessage, and then extract each individual result using MEL. The maximum number of active threads in the internal thread pool is also configurable.
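To give a feel for the configuration described above, here is a rough sketch of what the flow might look like. Note that the element and property names for the custom processor (`maxActiveThreads`, `processors`) are assumptions for illustration only; the actual GIST at the end of this post defines the real configuration.

```xml
<!-- Hypothetical sketch only: the property names on the custom processor
     are illustrative, not taken from the actual gist -->
<flow name="forkJoinFlow">
    <custom-processor class="com.example.ParallelMessageProcessor">
        <spring:property name="maxActiveThreads" value="10"/>
        <spring:property name="processors">
            <spring:list>
                <spring:ref bean="lookupWs1"/>
                <spring:ref bean="lookupWs2"/>
            </spring:list>
        </spring:property>
    </custom-processor>
    <!-- flatten the resulting MuleMessageCollection into a single MuleMessage -->
    <combine-collections-transformer/>
</flow>
```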
The following GIST is the code for this message processor. As you can see, it’s very simple and easy to follow. The only restriction is that we need Java 1.6 (or above), as we use the ThreadPoolExecutor to execute the message processors in parallel. Having said that, this is not really an issue, as Java 1.6 has become a requirement for the later versions of Mule anyway.
The process() method is where all the magic happens. For each configured message processor, we create a ProcessorRunner which clones the current event and uses the copy to execute its message processor. Once the ProcessorRunners are created, we pass them to the invokeAll() method of the ThreadPoolExecutor.
The invokeAll() method blocks until all the tasks have completed, and returns a list of Future objects from which we can retrieve each response (or any exception thrown along the way).
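To make the mechanics concrete, here is a minimal, Mule-free sketch of the same fork-join idea using only java.util.concurrent. The class and method names are illustrative and not taken from the actual GIST; the point is simply how invokeAll() fans out the tasks and preserves their order in the results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ForkJoinSketch {

    // Runs all tasks in parallel on a bounded pool and returns their
    // results in the same order as the task list.
    public static List<String> processInParallel(List<Callable<String>> tasks,
                                                 int maxThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            // invokeAll() blocks until every task has completed; the returned
            // Futures are in the same order as the submitted task list.
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> results = new ArrayList<String>();
            for (Future<String> f : futures) {
                results.add(f.get()); // rethrows any exception from the task
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for the two web-service lookups (Java 1.6 style)
        List<Callable<String>> lookups = new ArrayList<Callable<String>>();
        lookups.add(new Callable<String>() {
            public String call() { return "result-ws1"; }
        });
        lookups.add(new Callable<String>() {
            public String call() { return "result-ws2"; }
        });
        System.out.println(processInParallel(lookups, 2));
    }
}
```

In the real message processor, each Callable would be a ProcessorRunner wrapping a cloned MuleEvent instead of returning a plain String, but the blocking and ordering behaviour of invokeAll() is exactly the same.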
Using the exact same tests as before, the new ParallelMessageProcessor happily invoked the web services in parallel, and the fork-join overhead dropped to less than 5ms under the same load.
Hope you find this ParallelMessageProcessor as useful as I did!