A lot of new features have been introduced in the Mule 3.5 release. In this post I briefly explain how four of these new features can be used:
- The batch module
- The watermark and cron scheduler on the poll message processor
- The new database connector
Then I show how to apply these new features to achieve similar functionality to that described in a previous blog post.
The batch module lets us design a Mule application to do batch processing in a simpler and more intuitive fashion.
In the new batch module, a batch job is divided into four parts: Input, Load and Dispatch (performed automatically by Mule), Process, and On Complete.
These are explained below.
Input Phase
As the name implies, the input phase is the part of the batch job where the data is loaded and prepared for processing by the batch module. Since a batch job can also be invoked with ‘batch execute’ (even from a flow), the input phase is optional. This is similar to the behaviour of (private) flows, where the message source can be another flow, a batch job or an inbound endpoint.
Load and Dispatch Phase
The “load and dispatch” phase is done automatically by Mule; here Mule puts each message from the payload on a queue for the process phase.
Process Phase
The process phase is divided into a series of batch steps. Each batch step can contain multiple message processors (outbound endpoints, flow references, Java components, etc.), and each step can be executed depending on whether the message succeeded or failed in the previous step.
On Complete Phase
The “On Complete” phase exposes the results of the batch job, which can be used for reporting.
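The four phases described above can be sketched as follows. This is only a minimal outline, not a definitive configuration: the names exampleBatch, firstStep and triggerBatch are made up for illustration, the namespace declarations are omitted, and the input phase is left out because the job is started from a flow with batch:execute.

```xml
<!-- Hypothetical job name; the batch namespace is assumed to be declared on the mule element -->
<batch:job name="exampleBatch">
    <!-- A batch:input element would go here; it is optional when the job is started with batch:execute -->

    <!-- Load and Dispatch happens automatically between the input and the process phase -->

    <batch:process-records>
        <batch:step name="firstStep">
            <logger level="INFO" message="#['Processing record: ' + payload]"/>
        </batch:step>
    </batch:process-records>

    <batch:on-complete>
        <!-- In this phase the payload holds the result of the batch job -->
        <logger level="INFO" message="#['Successful records: ' + payload.successfulRecords]"/>
    </batch:on-complete>
</batch:job>

<!-- A flow that prepares a collection and hands it over to the batch job -->
<flow name="triggerBatch">
    <set-payload value="#[[1, 2, 3]]"/>
    <batch:execute name="exampleBatch"/>
</flow>
```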
Poll message processor
The poll processor has been improved so that its schedule can now also be defined with a cron expression.
It is also important to note that if you are going to use the polling frequency, this should now be defined in a sub-element (called fixed-frequency-scheduler) instead of an attribute on the poll processor itself.
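As a rough sketch, the two scheduling styles might be configured like this. The flow names, the cron expression and the frequency are arbitrary examples, and the schedulers prefix is assumed to be bound to the http://www.mulesoft.org/schema/mule/schedulers namespace.

```xml
<!-- Variant 1: cron scheduler (this expression fires every five minutes) -->
<flow name="cronPollFlow">
    <poll>
        <schedulers:cron-scheduler expression="0 0/5 * * * ?"/>
        <set-payload value="polled by cron"/>
    </poll>
    <logger level="INFO" message="#[payload]"/>
</flow>

<!-- Variant 2: fixed frequency, now a sub-element rather than an attribute on poll -->
<flow name="fixedFrequencyPollFlow">
    <poll>
        <fixed-frequency-scheduler frequency="10" timeUnit="SECONDS"/>
        <set-payload value="polled at a fixed frequency"/>
    </poll>
    <logger level="INFO" message="#[payload]"/>
</flow>
```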
Watermark enables us to get a certain value (for example the biggest id) from the polled items and save this value to an object store (if the object store is not defined, one is created and used automatically by the watermark).
To use this feature we need to specify either a selector together with a selector-expression, or an update-expression.
The selector attribute can take MIN, MAX, FIRST or LAST. The selector-expression is the expression that extracts the required element, for example #[message.payload['Id']].
The update-expression, on the other hand, lets us define a custom expression that computes the new watermark value (this would usually be a variable we set in the flow).
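To make the difference concrete, here are the two styles side by side. Treat this as a sketch under assumptions: the variable names lastId and newWatermark are hypothetical, and each watermark element is meant to sit inside a poll element, before the polled message processor.

```xml
<!-- Style 1: selector plus selector-expression.
     The watermark keeps the largest 'Id' found in the polled items. -->
<watermark variable="lastId"
           default-expression="#[0]"
           selector="MAX"
           selector-expression="#[message.payload['Id']]"/>

<!-- Style 2: update-expression.
     The watermark is set to whatever the flow stored in the
     (hypothetical) flow variable 'newWatermark'. -->
<watermark variable="lastId"
           default-expression="#[0]"
           update-expression="#[flowVars['newWatermark']]"/>
```

Only one of the two elements would be used in a given poll; they are shown together purely for comparison.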
Note: when debugging in Anypoint Studio, the watermark value is retained between different executions of the application, because by default it is persisted in a persistent object store. If this is not the desired behaviour during development, one can change the object store to an in-memory one as described in another blog post, or else change the run configuration to clear the application data.
The new database connector
Mule now features a new database connector, which brings big improvements over the previous JDBC connector, including streaming for selects, bulk mode support for inserts, support for auto-generated keys, DataSense support, and better support for dynamic queries.
Revisiting Large Dataset Retrieval in Mule with the new Mule features
In this section I define a couple of ways to do batch jobs with the new database connector and the watermark feature.
In the first example I show how we can define a batch job that polls at specified intervals for the next values. A limitation of the first example might be that we want all the values available in the database to be polled at a specified time (especially when using cron). This is handled in the second example.
In order to start creating the batch job, we first need to define the Spring bean for the data source (in this case it uses the derby data source).
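A data source bean along these lines could be used. This is a sketch: the bean id dataSource and the database name exampleDB are assumptions, and the spring prefix is assumed to be bound to the Spring beans namespace.

```xml
<spring:beans>
    <!-- Embedded Derby data source; 'exampleDB' is a made-up database name -->
    <spring:bean id="dataSource" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <spring:property name="databaseName" value="exampleDB"/>
        <!-- Ask Derby to create the database if it does not exist yet -->
        <spring:property name="createDatabase" value="create"/>
    </spring:bean>
</spring:beans>
```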
Then we need to specify the database configuration as follows:
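One way to wire the data source into the new connector is the generic configuration element, shown here as an assumption (the original post may have used a different configuration element). The name dbConfig is hypothetical, and dataSource refers to a Spring bean such as a Derby EmbeddedDataSource.

```xml
<!-- Database connector configuration backed by the Spring data source bean -->
<db:generic-config name="dbConfig" dataSource-ref="dataSource"/>
```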
Having specified the database configuration we can start creating the batch job.
First we need to define our input phase. This contains a poll element (which lets us specify either a cron expression or a fixed frequency) that polls a database select.
To use a fixed frequency, specify the polling element as follows:
To use a cron scheduler, specify the polling element as follows:
Now we can define the query using the new database connector. With the new database connector there are three ways to do a select query:
- ‘Parameterized’
- ‘Dynamic’
- ‘From Template’
In this case we use a ‘Parameterized’ query since our select statement does not need to be dynamic, and since we are not going to reuse the query in our application.
Our Polling element should now look something similar to the following:
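A poll element wrapping a parameterized select might look roughly like this. The table name table1, the configuration name dbConfig and the ten second frequency are assumptions made for the sketch.

```xml
<poll>
    <fixed-frequency-scheduler frequency="10" timeUnit="SECONDS"/>
    <!-- 'dbConfig' and 'table1' are hypothetical names -->
    <db:select config-ref="dbConfig">
        <db:parameterized-query><![CDATA[select * from table1]]></db:parameterized-query>
    </db:select>
</poll>
```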
Assuming we want to do a database query that will get us only the next items in the database we can use the watermark; in this case we can specify a selector and a selector-expression.
The “selector” attribute can take MIN, MAX, FIRST or LAST. In this example we use MAX, since the numbers generated automatically by the database are in ascending order. The selector-expression is the expression that extracts the required element, in this case #[message.payload['key1']].
We should also define the default-expression (the expression to be invoked at the first run) as well as the variable that we use to hold the watermark result (in this example we name it ‘Id’).
This should look similar to the following:
Furthermore we should amend our query to select only the elements that are greater than the value of the ‘Id’ variable.
The poll element should now look like this:
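Putting the watermark and the amended query together, the poll element could be sketched as below. The variable name Id and the column key1 come from the text above; the table name table1, the configuration name dbConfig, the default value 0 and the frequency are assumptions.

```xml
<poll>
    <fixed-frequency-scheduler frequency="10" timeUnit="SECONDS"/>
    <!-- Remember the largest key1 seen so far in the 'Id' variable; start from 0 on the first run -->
    <watermark variable="Id"
               default-expression="#[0]"
               selector="MAX"
               selector-expression="#[message.payload['key1']]"/>
    <db:select config-ref="dbConfig">
        <!-- Only fetch rows that are newer than the current watermark -->
        <db:parameterized-query><![CDATA[
            select * from table1 where key1 > #[flowVars['Id']]
        ]]></db:parameterized-query>
    </db:select>
</poll>
```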
Now that we have the input phase defined we can start defining the processing phase.
As shown previously, each item on the list is transformed into a record during the ‘Load and Dispatch’ phase, and each record is then processed in the processing phase in a series of one or many batch steps.
We first need to group these items with a batch commit; its size attribute defines the number of items we want committed in each batch. After we define the batch commit, we can go ahead and define the database insert endpoint with a parameterized query. Since we want the results to be inserted in bulk, we should set the bulk mode attribute on the database endpoint to true.
As an extra step, after this batch step we can also define a batch step which handles failing messages only (notice the accept-policy). For the sake of simplicity we are just going to define a logger.
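The commit and the bulk insert can be sketched as a single batch step. The step name, the commit size of 100, the target table table2 and the column value1 are all assumptions for illustration.

```xml
<batch:step name="insertStep">
    <!-- Collect up to 100 records before handing them to the insert -->
    <batch:commit size="100">
        <!-- With bulkMode enabled the query is executed once per element of the committed list -->
        <db:insert config-ref="dbConfig" bulkMode="true">
            <db:parameterized-query><![CDATA[
                insert into table2 (key1, value1) values (#[payload['key1']], #[payload['value1']])
            ]]></db:parameterized-query>
        </db:insert>
    </batch:commit>
</batch:step>
```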
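A step that only sees failed records could look like this sketch; the step name and the log message are made up.

```xml
<!-- ONLY_FAILURES restricts this step to records that failed in an earlier step -->
<batch:step name="failureStep" accept-policy="ONLY_FAILURES">
    <logger level="ERROR" message="#['Record failed: ' + payload]"/>
</batch:step>
```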
As the processing phase is now defined, the only missing piece is the ‘on complete’ phase.
As described previously, the on complete phase is used to gather the results of the batch we processed. Here we are just going to use a logger to show this feature; in real world applications these values can be put to much more creative use, for example in reporting.
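As an illustration, a logger in the on complete phase can read the counters from the batch result that Mule places in the payload. The wording of the message is my own.

```xml
<batch:on-complete>
    <logger level="INFO"
            message="#['Total: ' + payload.totalRecords + ', successful: ' + payload.successfulRecords + ', failed: ' + payload.failedRecords]"/>
</batch:on-complete>
```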
The following is the complete flow for this example:
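What follows is a reconstruction in the spirit of the steps above rather than the original listing. Everything that the text does not name is an assumption: the job name exampleBatch, the tables table1 and table2, the column value1, the configuration name dbConfig, the database name exampleDB, the frequency and the commit size.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:db="http://www.mulesoft.org/schema/mule/db"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
        http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd">

    <!-- Data source and connector configuration (hypothetical names) -->
    <spring:beans>
        <spring:bean id="dataSource" class="org.apache.derby.jdbc.EmbeddedDataSource">
            <spring:property name="databaseName" value="exampleDB"/>
            <spring:property name="createDatabase" value="create"/>
        </spring:bean>
    </spring:beans>
    <db:generic-config name="dbConfig" dataSource-ref="dataSource"/>

    <batch:job name="exampleBatch">
        <!-- Input phase: poll for rows newer than the watermark -->
        <batch:input>
            <poll>
                <fixed-frequency-scheduler frequency="10" timeUnit="SECONDS"/>
                <watermark variable="Id" default-expression="#[0]"
                           selector="MAX" selector-expression="#[message.payload['key1']]"/>
                <db:select config-ref="dbConfig">
                    <db:parameterized-query><![CDATA[
                        select * from table1 where key1 > #[flowVars['Id']]
                    ]]></db:parameterized-query>
                </db:select>
            </poll>
        </batch:input>

        <!-- Process phase: bulk insert, then log any failures -->
        <batch:process-records>
            <batch:step name="insertStep">
                <batch:commit size="100">
                    <db:insert config-ref="dbConfig" bulkMode="true">
                        <db:parameterized-query><![CDATA[
                            insert into table2 (key1, value1) values (#[payload['key1']], #[payload['value1']])
                        ]]></db:parameterized-query>
                    </db:insert>
                </batch:commit>
            </batch:step>
            <batch:step name="failureStep" accept-policy="ONLY_FAILURES">
                <logger level="ERROR" message="#['Record failed: ' + payload]"/>
            </batch:step>
        </batch:process-records>

        <!-- On complete phase: report the outcome -->
        <batch:on-complete>
            <logger level="INFO"
                    message="#['Successful: ' + payload.successfulRecords + ', failed: ' + payload.failedRecords]"/>
        </batch:on-complete>
    </batch:job>
</mule>
```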
Now that we have seen how we can implement this very easily, we can move on and discuss how we can refine this to replicate the same functional behaviour as the “Large Dataset Retrieval in Mule” blog post.
There are three major differences between the “Large Dataset Retrieval in Mule” blog post implementation and the implementation above:
- The Id is not being reset to ‘0’ after we complete the whole batch.
- We are not polling immediately after but we are waiting for the next polling frequency to be met.
- An HTTP endpoint is used to start the batch.
For the first requirement we need to change the watermark from using a selector-expression to using an update-expression instead.
For the second requirement, we need:
- To make sure that the polls run immediately one after the other
- To make sure that no other scheduler starts before that poll completes.
The third requirement can be achieved by stopping and starting the batch input phase, however I do think that this is beyond the scope of this blog post.
The first requirement is pretty easy to implement; we just need to make a couple of simple changes.
The first change is to modify the watermark as follows:
We also need to set “myId” to ‘0’ when no records are left to be processed, and to the biggest value when further records exist, such as the following:
We also need to modify the database query so that it is ordered by key1 in ascending order, and change the filter so that it selects the values between the current id and the current id plus the number of items we want to get:
The second requirement is a little bit more tricky to implement. For this requirement we need to make sure that another poll does not happen during the current poll; to do this we should make sure that the scheduler is stopped just after the database select is done.
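The three changes for the first requirement can be sketched together in one input phase. This is an approximation under assumptions, not the original listing: the page size of 10, the table name table1, the configuration name dbConfig and the frequency are made up, while the names Id, myId and key1 come from the text.

```xml
<batch:input>
    <poll>
        <fixed-frequency-scheduler frequency="10" timeUnit="SECONDS"/>
        <!-- The watermark now takes its new value from the 'myId' flow variable -->
        <watermark variable="Id"
                   default-expression="#[0]"
                   update-expression="#[flowVars['myId']]"/>
        <db:select config-ref="dbConfig">
            <!-- Fetch the next page of at most 10 ids, in ascending order -->
            <db:parameterized-query><![CDATA[
                select * from table1
                where key1 > #[flowVars['Id']] and key1 <= #[flowVars['Id'] + 10]
                order by key1 asc
            ]]></db:parameterized-query>
        </db:select>
    </poll>
    <!-- Empty result: reset to 0. Otherwise: remember the last (largest) key1 of the ordered result -->
    <set-variable variableName="myId"
                  value="#[payload.size() > 0 ? payload[payload.size() - 1]['key1'] : 0]"/>
</batch:input>
```

Because the rows are ordered by key1, the last element of the result carries the biggest value, which is why the sketch reads it from the end of the list.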
We also need to manually poll for further records in case the current result had records to process. In case there are no further records to be processed we need to make sure that the scheduler is kept running so it runs automatically within the specified interval.
This requires a bit of custom Java code that retrieves the scheduler for us, so that we can stop, start and schedule the poller.
This is the Java code I’ve used for this:
Please note that this assumes that there is only one scheduler whose name starts with the batch name followed by ‘/’; this can be refined further by using a regular expression.
The following is the whole batch job for the second example:
I hope you enjoyed this blog post.
(This post was first published on 7th May 2014 on blog.ricston.com)