23 Jan 2009

Using the Camel Aggregator correctly

I was recently working on a project that required the use of a Camel aggregator. Since I had not used the aggregator much before I started by reading through the Camel Aggregator documentation. Some questions remained though.
In my use case I needed to aggregate every five incoming messages of the same format together into a new exchange. Sounds very simple but I did not find it that straight forward. And that's the motivation for this post.
To have an example at hand; let's say I want to aggregate messages of the following general XML format (with different values of course):

<artist genre='Alternative'>
<name>The White Stripes</name>
<album>Get Behind Me Satan</album>
<album>White Blood Cells</album>

Suppose that's the message format of the incoming messages. I will build the aggregator according to this format.
As mentioned in the Camel documentation, each aggregator is configured by a correlation expression and an aggregation strategy.
The correlation expression is used to define which messages should be aggregated together. When the correlation expression is being evaluated at runtime on a particular message, it returns a value that is used as the correlation id under which messages are being aggregated. All messages that evaluate to the same correlation id are aggregated together. A mistake I made initially was assuming that all messages that do not match my correlation expression will not be aggregated. But every correlation expression will always evaluate to a particular correlation id for every message. Suppose I want to aggregate all those messages that have an alternative genre (genre='Alternative') and I define my aggregation strategy accordingly (we will explore that later). All messages that have a different genre set will still be aggregated somehow, depending on what id the correlation expression evaluates to for these messages at runtime. We will see more detailed examples of that later, but this is important to keep in mind.
The aggregation strategy on the other hand contains the logic for combining all message exchanges into a single aggregated message. The default aggregation strategy is org.apache.camel.processor.aggregate.UseLatestAggregationStrategy, which only aggregates the latest incoming message, discarding older messages. This is great for throttling purposes but did not serve my use case.

Generally a Camel aggregator is set up in Java DSL as follows (I will only use Java examples, XML based configuration will be quite similar):

from("somewhere").aggregate("somehow").to("some destination");

Let's not care too much about the from(…) and to(…) part of this route, they will be defined by your business use case. It is the aggregate(…) part that we are interested in here.
As mentioned above the aggregator is configured via the correlation expression (which messages to aggregate together) and the aggregation strategy (how to aggregate messages). So it comes as no surprise that the aggregate() method can take the following parameters:

public AggregatorType aggregate(Expression correlationExpression, AggregationStrategy strategy);

When defining a correlation expression, one can either correlate on a particular message header or on the message content. Pretty much all examples I have seen so far did correlate messages on the existence of a particular message header element. In Camel headers are named and can take values - they are basically (name, value) pairs.
Suppose some of my messages also have a header called "MusicCollection" with a value "Yes" set. I can then aggregate my messages based on this header and define my correlation expression as follows:

.aggregate(header("MusicCollection"), …).

This correlation expression will evaluate to the value of the header "MusicCollection" at runtime (the String "Yes" in the above example). If no such header is set, the correlation expression will evaluate to null at runtime, which can still be used as a key for aggregating messages.
But what If I don't want to or cannot use message headers? Perhaps the message was received from an external system without any headers. I can then also correlate messages based on their message payload as well. Using an XPath expression I can test if a specific element or structure is present in the message payload. In my example the messages start with as the root element, so it seems natural to define a correlation expression like this:

.aggregate(xpath("/artist", …).

This however won't work as expected. Internally the aggregator will use the Camel XPathBuilder to evaluate the XPath expression and the above expression will return a new instance of org.w3c.dom.NodeList for every new message. The aggregation however can only occur on a correlation expression that evaluates to the same object or primitive value at runtime for all messages to be aggregated (as that object or primitive value is used internally as a HashMap key for storing the aggregated messages into exchanges). So we need to define an XPath expression that evaluates to the same object (or key) at runtime. The trick is to select a proper XPath expression and to cast the evaluation result into a type like java.lang.String, such as in the following example:

.aggregate(…).xpath("name(/artist)='artist'", String.class).

I had to move the XPath expression after the aggregate() method call because of the additional String.class argument. The above example will take the name of the root element of my message (which will be "artist") and test if it equals the string "artist". For the sample message shown in the beginning, this expression will evaluate to the String "true". For any other XML messages that don't start with , this XPath expression will evaluate to the String value "false" and these messages will still be aggregated according to my aggregation strategy.
Aggregating on only the root XML element does not seem very useful. I might want to aggregate on the genre attribute of the element instead. Here is the corresponding correlation expression:

.aggregate(…).xpath("/artist/@genre", String.class).

This does serve my use case. If I want to aggregate all messages with an "Alternative" genre I could use:

.aggregate(…).xpath("name(/artist[@genre='Alternative'])", String.class).

Notice this will aggregate all messages starting with together into a new exchange. All other messages that have a different genre will also be aggregated together into a different exchange, no matter what their genre value is. If you wondered why I use the XPath function name() here, then that is because using name() will return the String "artist" as the correlation key for the example message above (the name of the XML element that matches my XPath expression). If I omitted the name() function, it would return a string representation of the node set itself. In the case of my example message the correlation key would be the string "White Stripes Elephant Get Behind Me Satan White Blood Cells". Using "artist" as the key seems better somehow.
If you are unsure to what key your own correlation expression evaluates at runtime, place a breakpoint into method org.apache.camel.processor.aggregate.DefaultAggregationCollection.add() and debug your Camel route from there. Check the value of the computed correlationKey object.
You can of course define your correlation expression in different expression languages than XPath such as XQuery, SQL and others.

That should cover the correlation expression part of the aggregator. The second part deals with the aggregation strategy. Remember, the strategy defines how messages are aggregated together. The default strategy used by Camel is the UseLatestAggregationStrategy, which out of x received messages only takes the latest message. If however I want to aggregate multiple messages together into a single message, I need to write my own aggregation strategy. This isn't a big deal and the Camel aggregator documentation shows an example implementation. I have re-used that code for my example as well.

public class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newIn.getBody(String.class);
newIn.setBody(oldBody + newBody);
return newExchange;

In this implementation we extract the messages of both Exchanges old and new, then simply concatenate the two messages together and place the result on the new Exchange. Nothing fancy here. We can then instantiate this class and pass its reference into the aggregator:

.aggregate(…, new MyAggregationStrategy()).

So we are done, right? Well, what we have not defined yet is a criterion as to when the aggregation is done. There are again multiple ways of doing so. If I want to aggregate a fixed number of incoming messages, I can easily use the batch options:

.aggregate(new MyAggregationStrategy()).
xpath("("/artist/@genre", String.class).

In this example the aggregation will finish whenever it has processed 5 incoming messages no matter as to how they got aggregated (into which exchanges). Or I could specify a timeout after which the aggregated message is sent, irrespective of how many message have been aggregated:

.aggregate(new MyAggregationStrategy()).
xpath("("/artist/@genre", String.class).

If I want to finish the aggregation when there are 5 messages aggregated together into one new exchange, then a more complex logic is required, which I can specify using the completedPredicate() expression:

.aggregate(new MyAggregationStrategy()).
xpath("("/artist/@genre", String.class).

The example above assumes that there is a header called "aggregated" set and whenever its value reaches the number 5, the aggregation will finish. Of course some application level code needs to set this header and also increment the counter. This would typically be done in the custom aggregation strategy implementation, where increasing that number can be based on any application logic (see attached demo, which illustrates this approach).
I can also call a particular method on a Spring bean for deciding when to finish the aggregation by using a BeanExpression:

.aggregate(new MyAggregationStrategy()).
xpath("("/artist/@genre", String.class).
new BeanExpression("aggregatorStrategy", "isCompleted"))

This requires a Spring bean with name "aggregatorStrategy" being defined in the Camel Spring configuration (your camel-context.xml) which exposes a public method called isCompleted().

<bean id="aggregatorStrategy"

Check the attached demo for an example.
Finally, the entire aggregator code in Java DSL for my use case now reads:

aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).

Using a Camel aggregator can appear rather complex at first. But once you get to see some working examples it becomes much clearer (as with all things in life). I personally had some troubles getting the aggregator to work with an XPath based correlation expression. That part I wanted to share most of all in this article.

And finally here is the demo for this article. After extracting, follow the steps in README.txt to run it quickly.


Claus Ibsen said...

Great explanation of the aggregator. It does take a little while to get familiar with.

In Camel 1.5 there is a outBatchSize you can use to set to 5 so Camel will complete when it has aggregated 5 exchanges into a single Exchange.

James Strachan said...

Great post! :)

Torsten Mielke said...

Thanks for your feedback Claus. I actually tested the outBatchSize(5) option as well and found that it this makes the aggregator wait until it has created 5 new exchanges to be sent out in one go.
outBatchSize() does not care how many messages were aggregated into one new exchange but rather how many new exchanges were created while aggregating.
That is why I had to define my own completedPredicate in the end.

Anonymous said...

Hi Torsten.
How have you bean?

Great post! It helped a lot understanding a little better the aggregator in Camel.

I still have a doubt: How can I configure it to ignore late messages that belongs to a closed aggregate?

I have been using batchTimeout, and when a late message arrives, it is part of a news agregate. It is not desired in my case.

Cheers, man!

Italo Stefani
Vetta Technologies

Torsten Mielke said...

Thanks for your feedback Italo.

You are right, by using the batchTimeout you won't discard older messages, they will be gathered into a new exchange.

If you need to discard outdated messages, perhaps a custom aggregation strategy can help you.

Italo Giovani Stefani said...

Hi Torsten.
I found a really simple solution to ignore late messages. I used the idempotentConsumer component after the aggregation. I am not sure if it works for all cases, but it is a tip.

.aggregator(header("fool"), new MyAggregator()).batchTimeout(2000L)
.idempotentConsumer(header("fool"), new MyMessageIdRepository())

MyMessageIdRepository can bem implemented as following:

class MyMessageIdRepository implements MessageIdRepository{
  private static Map repository = new HashMap();
  public boolean contains(String messageId) {
      return true;
      repository.put(messageId, messageId);
    return false;

The idempotentConsumer component works as filter for duplicate messages. My aggregation is by a header, and this header is preserved after the aggregation. So, if a late message arrives, it will generate a new aggregate with this header, which has already passed by MyMessageIdRepository.


Italo Giovani Stefani

Mahesh Reddy said...

batchsize() will only consider nr of incoming messages to look at before flushing the "aggregate exchange"s to the next processor right?

it will not impact the nr of exchanges that have been grouped into one aggregate exchange right?

i.e, if u want to process 5 pop genre exchanges downstream, u can;t use batchsize()

Celebrating Life...

Torsten Mielke said...

Hello Mahes,

Please note this blog post refers to the older Aggregator implementation in Camel
From Camel 2.3 onwards the aggregator has been rewritten and you should look at this doc if you on 2.3 or higher: http://camel.apache.org/aggregator2.html.

For the old aggregator you're right, the batchSize only configures how many exchanges the aggregator should process before completing the aggregation, irrespective of how they got aggregated (into which exchanges).

From the older Aggregator doc:
"This is the number of incoming exchanges that is processed by the aggregator and when this threshold is reached the batch is completed"

In Camel 2.3 or higher the property is called completionSize.

Hope this helps.

Mahesh Reddy said...

My mistake.. didn't notice that this article was before 2.3

BTW, I found this Fusesource's doc which is pretty good

Thanks! Torsten

Kico (Henrique Lobo Weissmann) said...

Great exlanation, really usefull!