17 Dec 2009

Using Spring JMS Template for sending messages to ActiveMQ

Something that took me a long time to really understand. And I only got there by debugging through the low-level Spring and ActiveMQ code.
There are various articles out there that warn you about using JmsTemplate for producing messages outside a container. A runtime container will usually pool your JMS resources (connection, session and producer), but when using JmsTemplate in a standalone application, there is no pooling per se. The JmsTemplate is certainly not responsible for pooling resources.

I have seen a number of applications that use JmsTemplate outside of any application server container for producing messages. It seems to be more popular than using the plain JMS APIs. No wonder, as Spring JMS provides such a nice layer of abstraction that shields all the low-level JMS code, as in this snippet:

ActiveMQConnectionFactory cf2 = new ActiveMQConnectionFactory("tcp://localhost:61616");
JmsTemplate template = new JmsTemplate(cf2);
template.send("MyQueue", new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
        TextMessage tm = session.createTextMessage();
        tm.setText("A new message");
        return tm;
    }
});
However something that is easily forgotten is that this code will create a new JMS connection, session and producer for each call to JmsTemplate.send(). Creating these JMS objects is an expensive operation, and according to the JMS spec these resources are meant to be re-used and not re-created for every message. Performance is going to suffer dramatically when they are not re-used.
That’s pretty clear to most developers but very easily forgotten.
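To see why re-creating these objects hurts so much, here is a tiny pool sketch in plain Java. It is not JMS or ActiveMQ code, just an illustration of how a pool turns ten create operations into one:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PoolSketch {
    static int created = 0;

    // stand-in for an expensive resource such as a JMS connection;
    // an illustration only, not ActiveMQ's pool implementation
    static class Connection {
        Connection() {
            created++;
        }
    }

    static final Queue<Connection> pool = new ArrayDeque<>();

    static Connection borrow() {
        Connection c = pool.poll();
        return (c != null) ? c : new Connection(); // reuse if available
    }

    static void release(Connection c) {
        pool.offer(c); // hand the connection back instead of closing it
    }

    public static void main(String[] args) {
        // without a pool: ten sends create ten connections
        for (int i = 0; i < 10; i++) {
            new Connection();
        }
        System.out.println("without pool: " + created); // 10

        created = 0;
        // with a pool: ten sequential sends reuse a single connection
        for (int i = 0; i < 10; i++) {
            Connection c = borrow();
            release(c);
        }
        System.out.println("with pool: " + created); // 1
    }
}
```

A pooling connection factory does conceptually the same for JMS connections, sessions and producers.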

So what is really needed in order to pool all these JMS resources when sending messages with JmsTemplate? I personally was confused about whether I needed to configure some Jencks container in order to have the JMS resources pooled, or whether I could use plain ActiveMQ connection factories, and what the configuration needs to look like. The various documentations I found only added to that confusion.

The answer is pretty simple: All that is needed is to use the org.apache.activemq.pool.PooledConnectionFactory. It will serve JMS connections from its internal pool and wrap them in a PooledConnection instance. The purpose of the PooledConnection is to also pool any JMS sessions and producers that get created. The pool can be configured of course.
For using the JmsTemplate there is no need to configure any containers like Jencks and others. Also see http://activemq.apache.org/jmstemplate-gotchas.html for some additional information on this topic.
The following simple and complete Spring configuration is enough:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

  <bean id="connectionFactory"
        class="org.apache.activemq.pool.PooledConnectionFactory">
    <constructor-arg value="failover:(tcp://localhost:61616)"/>
  </bean>

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"/>
    <property name="sessionTransacted" value="false"/>
    <property name="receiveTimeout" value="5000"/>
  </bean>

</beans>

It pre-configures the JmsTemplate to use the ActiveMQ PooledConnectionFactory so it can be used straight away in a Java program:

package org.apache.activemq.test.jmstemplate;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class JmsTemplateTest {

    public static void main(String[] args) throws Exception {

        ApplicationContext ctx =
            new ClassPathXmlApplicationContext("spring-config.xml"); // file name illustrative
        JmsTemplate template = (JmsTemplate) ctx.getBean("jmsTemplate");

        for (int i = 0; i < 10; i++) {
            template.send("MyQueue", new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage tm = session.createTextMessage();
                    tm.setText("This is a test message");
                    return tm;
                }
            });
        }
    }
}

A Maven based test case is provided. It can be checked out using

svn checkout http://abloggerscode.googlecode.com/svn/trunk/JmsTemplateDemo

Once checked out, follow the instructions in README.txt.

Be careful when using Spring's SingleConnectionFactory. It does re-use the underlying JMS connection, but it does not re-use the JMS session and producer when configured to internally work with the ActiveMQConnectionFactory!

4 Nov 2009

Does ActiveMQ 5.3 broker get hung if there are millions of messages on a queue and direct JDBC is used?

That was an interesting ActiveMQ issue I had to work on last week.
We used ActiveMQ with direct JDBC persistence to a MySQL database.

And we noticed that when we had 2 million persistent messages on queue A, we could not even process any messages on an empty queue B. The broker seemed to be hung, although it still reacted to commands from the web console or JMX.

When looking into the cause of this more deeply we found this:

In ActiveMQ there is a cleanup task thread that periodically checks for expired or acknowledged messages. When direct JDBC persistence is used, this task runs a SQL DELETE statement against the JDBC database table where the JMS messages are stored.

Depending on the JDBC database used, that SQL statement might lock the entire database table during the run of that statement. This is certainly the case when using MySQL but not when using Apache Derby. Other JDBC databases might lock the entire table as well.

During that time no other persistent message (no matter for what queue) can be processed, as all JMS messages for all queues are stored in the same database table, called activemq_msgs. For a database table with millions of messages, this might take several minutes or more (I had to wait around 15 minutes for that statement to complete).
Also, the cleanup task is scheduled to run at a fixed rate (every 2 minutes or so), so once it has completed it is already late on schedule and therefore gets kicked off again straight away, not leaving much time for other threads to insert new messages into the database. So the threads managing the other queues starve and might perhaps never get the processing time to handle their messages.

This behavior can be confirmed by raising the log level for the JDBC persistence adapter to DEBUG in the broker's logging configuration. The log then shows the cleanup statement executing for a long time and being kicked off again as soon as it has finished.
Also, when attaching jconsole to the broker and capturing a stack trace of the thread that is supposed to process new messages, it can be seen waiting for a long time on the JDBC database driver to complete the SQL insert statement.

Name: QueueThread:queue://JMSLoadTest.queue.0
Total blocked: 1 Total waited: 0

Stack trace:
java.net.SocketInputStream.socketRead0(Native Method)

The broker itself is not hung but the cleanup task locks the entire JDBC database. If the broker is left running for a longer time, it will process new messages, but only a few within a longer time frame.
I raised MB-558 (mirrored in AMQ-2470) for this problem and Gary Tully fixed it by changing the cleanup task from running at a fixed rate to running with a fixed delay.
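The difference between the two scheduling modes can be sketched with plain java.util.concurrent (this is not the broker's actual scheduler code). With a fixed rate, a task that overruns its period is re-launched immediately to catch up; with a fixed delay, there is always a full idle period between two runs, leaving other threads room to work:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SchedulingSketch {
    static final AtomicInteger fixedRateRuns = new AtomicInteger();
    static final AtomicInteger fixedDelayRuns = new AtomicInteger();

    // a "cleanup" task that takes much longer than its scheduling period
    static void slowCleanup(AtomicInteger counter) {
        counter.incrementAndGet();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);

        // fixed rate: a late task is re-launched immediately to catch up,
        // so an overrunning cleanup leaves no idle time in between runs
        ses.scheduleAtFixedRate(() -> slowCleanup(fixedRateRuns),
                0, 10, TimeUnit.MILLISECONDS);

        // fixed delay: the next run starts a full 10 ms AFTER the previous
        // one finished, guaranteeing a breathing window between runs
        ses.scheduleWithFixedDelay(() -> slowCleanup(fixedDelayRuns),
                0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(1000);
        ses.shutdownNow();
        System.out.println("fixed rate runs:  " + fixedRateRuns.get());
        System.out.println("fixed delay runs: " + fixedDelayRuns.get());
    }
}
```

With fixed delay the overrunning task can never fall behind its own schedule, which is exactly why the fix stops the cleanup from hogging the database.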

13 May 2009

Understanding Authentication and Authorization in ServiceMix

I spent a good few hours lately trying to understand how authentication and authorization work inside ServiceMix 3. It is this gathered knowledge that I want to share in this post. For those in a hurry, there's an executive summary at the end. For this article I assume you know the basic concepts of JAAS. Some reference material on JAAS can be found here and here.

Such a complex topic is easier to understand if it is based on a valid use case that others can reproduce easily. A pretty good candidate is the cxf-ws-security demo in ServiceMix 3.2. Anyone not familiar with that demo might want to build and run it prior to reading on.
In addition I encourage you to debug through the ServiceMix source code while you read through this article. Simply set the SERVICEMIX_DEBUG=1 environment variable and attach a Java debugger to ServiceMix. You can then place breakpoints into the relevant source code that I discuss below and walk through the call stack and source code as you read along.

I will not try to discuss all of the security aspects in ServiceMix 3 here as such scope is far too broad. Rather this article is based on running the cxf-ws-security demo and aims to explain how an incoming SOAP/HTTP request gets authenticated and authorized in ServiceMix 3.

Alright, the stage is set, let's start.
The demo's use-case is
External CXF Java client -> CXF-BC consumer -> CXF-SE component

as shown in the demo's README.txt. With regards to security we want to make sure that the client gets authenticated based on a username/password combination before verifying whether the client has permission to call the service (authorization). Sounds simple enough, but it involves a good number of components, as we will see next.

As documented in the cxf-ws-security demo, an external SOAP client sends a SOAP/HTTP request to the CXF-BC consumer component deployed in ServiceMix. The SOAP request includes a large WSSE Security header carrying a WS-Security UsernameToken profile along with an XML-Signature and some XML-encrypted content. I will not focus on XML-Signature and XML-Encryption here (some information in this regard was posted previously). For this discussion the UsernameToken profile is of most interest; it is included in the SOAP request as follows (omitting some details for clarity):

<soap:Envelope xmlns:soap="..." >

It is this username/password combination in the SOAP security header that will be used for authentication inside ServiceMix.
So the request first hits the Jetty server in the servicemix-cxf-bc component, from where it gets passed into the CXF interceptor stack. A good number of CXF interceptors are configured by default. In our demo the list of CXF interceptors to be executed is as follows:

DEBUG - PhaseInterceptorChain:
receive [AttachmentInInterceptor, LoggingInInterceptor]
post-stream [StaxInInterceptor]
read [ReadHeadersInterceptor]
pre-protocol [MustUnderstandInterceptor, SAAJInInterceptor, WSS4JInInterceptor, ...]
unmarshal [JbiOperationInterceptor]
pre-invoke [JbiInWsdl1Interceptor, JbiInInterceptor]
invoke [JbiInvokerInterceptor, UltimateReceiverMustUnderstandInterceptor]
post-invoke [JbiPostInvokerInterceptor, OutgoingChainInterceptor]

Of these, the WSS4JInInterceptor, the JbiJAASInterceptor and the JbiInvokerInterceptor are of primary interest for this discussion.
In the configuration of the demo's ws-security-cxfbc-su component (in ws-security-cxfbc-su/src/main/resources/xbean.xml) we explicitly added and configured the org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor that will perform the WS-Security functions. It is the first security-relevant interceptor that we want to look at more closely.


This interceptor handles the entire SOAP WS-Security header. In our example it needs to decrypt all the encrypted parts of the SOAP request, perform the XML-Signature check and extract the username/password information from the WS-Security header. Which functions to perform is specified in xbean.xml, where this interceptor is configured (see the "action" list):

<bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor">
  <constructor-arg>
    <map>
      <entry key="action" value="Timestamp Signature Encrypt UsernameToken"/>
      <entry key="..." .../>
    </map>
  </constructor-arg>
</bean>

As the name already suggests, this interceptor uses Apache WSS4J to perform these WS-Security functions. This line of code inside the CXF WSS4JInInterceptor.handleMessage() method calls into WSS4J:

Vector wsResult = getSecurityEngine().processSecurityHeader(

I will not cover the Apache WSS4J specifics here but want to mention that the WSS4JInInterceptor configuration includes a password callback implementation that is used to obtain passwords for decryption and UsernameToken verification. This is also specified in the xbean.xml of the servicemix-cxf-bc-su component:

<entry key="passwordCallbackClass"

See the Apache WSS4J documentation for more detailed information on WSS4J.

The outcome of each of the WS-Security functions performed by WSS4J is stored in a java.util.Vector of org.apache.ws.security.WSSecurityEngineResult objects called wsResult, which is then added as a property to the SoapMessage under the key WSHandlerConstants.RECV_RESULTS. This vector carries all the results from processing the WS-Security header, such as the principal, SAML token, X.509 certificates, etc.

Other interceptors that get invoked later will require these results for their own processing.

One of the vector elements will contain the result of parsing the WS-Security UsernameToken profile in form of a org.apache.ws.security.WSUsernameTokenPrincipal object instance. This hosts the username and password information that was received with the SOAP request.

That is roughly what the WSS4JInInterceptor does with our SOAP request.
The next CXF interceptor to be invoked is the JbiJAASInterceptor.

It is invoked right after the WSS4JInInterceptor and is rather complex. I will try my best to explain it in the most simple terms.
It first of all extracts the WSHandlerConstants.RECV_RESULTS vector that was set by the previous WSS4JInInterceptor

// in JbiJAASInterceptor.handleMessage()
List<Object> results = (Vector<Object>)message.get(WSHandlerConstants.RECV_RESULTS);

and next checks if this Vector contains any org.apache.ws.security.WSUsernameTokenPrincipal object:

for (Iterator it = hr.getResults().iterator(); it.hasNext();) {
    WSSecurityEngineResult er = (WSSecurityEngineResult) it.next();
    if (er != null && er.getPrincipal() instanceof WSUsernameTokenPrincipal) {
        WSUsernameTokenPrincipal p =
            (WSUsernameTokenPrincipal) er.getPrincipal();
        subject.getPrincipals().add(p);
        this.authenticationService.authenticate(subject, domain, p.getName(), p.getPassword());
    }
}

If yes, then the principal gets added to the javax.security.auth.Subject instance and the authentication service gets invoked. It is this security Subject instance that will carry all credentials information after a successful authentication!

JAAS will be used now to perform the authentication.
At first a new JAAS javax.security.auth.login.LoginContext gets created while also registering a JAAS callback handler:

//in method authenticate()
LoginContext loginContext = new LoginContext(domain, subject, new CallbackHandler() {
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (int i = 0; i < callbacks.length; i++) {
            if (callbacks[i] instanceof NameCallback) {
                ((NameCallback) callbacks[i]).setName(user);
            } else if (callbacks[i] instanceof PasswordCallback && credentials instanceof String) {
                ((PasswordCallback) callbacks[i]).setPassword(((String) credentials).toCharArray());
            } else if (callbacks[i] instanceof CertificateCallback && credentials instanceof X509Certificate) {
                ((CertificateCallback) callbacks[i]).setCertificate((X509Certificate) credentials);
            } else {
                throw new UnsupportedCallbackException(callbacks[i]);
            }
        }
    }
});
This callback handler that we pass into the LoginContext constructor is able to deal with different types of callbacks (see the handle() method) and will be invoked later in order to take the username and password of the incoming request and place them into the callback objects.
Once the LoginContext is instantiated, we can perform the authentication by calling

loginContext.login();
This API call internally invokes the configured JAAS login modules to perform their respective types of authentication (username/password based or certificate based, etc). If the call to login() returns without throwing an exception, the overall authentication succeeded. It is therefore the login module implementation that really decides whether a client is authenticated or not.

How are the login modules resolved and configured at runtime? Well, the domain argument of the LoginContext constructor is an index into a configuration that determines which LoginModules to use for the authentication. In our case the domain value will be "servicemix-domain", which matches the domain defined in conf/login.properties:

/* conf/login.properties */
servicemix-domain {
    org.apache.servicemix.jbi.security.login.PropertiesLoginModule sufficient
        ...;
    org.apache.servicemix.jbi.security.login.CertificatesLoginModule sufficient
        ...;
};

By default this registers two login modules, which will be invoked in order until the call to login() succeeds. Actually, JAAS will invoke initialize(), login() and commit() on the configured login modules in this order. Each login module is configured with a set of property files (we'll explore that later).

So the call to loginContext.login() internally invokes

  • PropertiesLoginModule.initialize()

  • PropertiesLoginModule.login()

  • PropertiesLoginModule.commit()

in this order. If the call to any of these methods raises an exception, the CertificatesLoginModule will be tried. If the CertificatesLoginModule also raises an exception, then the authentication fails.
The various classes used in JAASAuthenticationService.authenticate() method (LoginContext, LoginModule, CallbackHandler, etc) are pure JAAS terminology and not ServiceMix specific. If you are not familiar with these, check the JAAS documentation.
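For readers who want a runnable picture of that terminology, here is a minimal, self-contained JAAS flow in plain Java: a toy login module with hard-coded credentials (a stand-in for the PropertiesLoginModule, not its actual code) plus a programmatic Configuration (a stand-in for conf/login.properties). All names in it are made up for illustration:

```java
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.FailedLoginException;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
import java.util.Collections;
import java.util.Map;

public class JaasFlowDemo {

    // toy login module with hard-coded credentials
    public static class DemoLoginModule implements LoginModule {
        private Subject subject;
        private CallbackHandler handler;
        private String user;

        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) {
            this.subject = subject;
            this.handler = handler;
        }

        public boolean login() throws LoginException {
            // ask the callback handler for username and password,
            // just like PropertiesLoginModule.login() does
            NameCallback nc = new NameCallback("Username: ");
            PasswordCallback pc = new PasswordCallback("Password: ", false);
            try {
                handler.handle(new Callback[] {nc, pc});
            } catch (Exception e) {
                throw new LoginException(e.toString());
            }
            user = nc.getName();
            if (!"smx".equals(user) || !"smx".equals(new String(pc.getPassword()))) {
                throw new FailedLoginException("Password does not match");
            }
            return true;
        }

        public boolean commit() {
            subject.getPrincipals().add(() -> user); // add the authenticated principal
            return true;
        }

        public boolean abort() { return true; }
        public boolean logout() { return true; }
    }

    public static Subject authenticate(String user, String password) throws LoginException {
        // programmatic equivalent of a login.properties domain entry
        Configuration conf = new Configuration() {
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(DemoLoginModule.class.getName(),
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        Collections.emptyMap())
                };
            }
        };
        Subject subject = new Subject();
        // the callback handler copies the caller's credentials into the callbacks,
        // mirroring what JAASAuthenticationService.authenticate() does
        LoginContext lc = new LoginContext("demo-domain", subject, callbacks -> {
            for (Callback cb : callbacks) {
                if (cb instanceof NameCallback) {
                    ((NameCallback) cb).setName(user);
                } else if (cb instanceof PasswordCallback) {
                    ((PasswordCallback) cb).setPassword(password.toCharArray());
                }
            }
        }, conf);
        lc.login(); // runs initialize() -> login() -> commit() on the module
        return subject;
    }

    public static void main(String[] args) throws Exception {
        Subject subject = authenticate("smx", "smx");
        System.out.println("principals after login: " + subject.getPrincipals().size());
    }
}
```

The moving parts are exactly the ones discussed above: LoginContext, Configuration, CallbackHandler and LoginModule, only without the ServiceMix wiring around them.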

Time to examine the org.apache.servicemix.jbi.security.login.PropertiesLoginModule next and see how it performs the authentication.

This JAAS login module is used for username/password based login and is configured with two property files in conf/login.properties. The file users-passwords.properties (to be found in your ServiceMix conf/ directory) stores usernames and their passwords. The other properties file, groups.properties, maps users to their roles. If authentication succeeds, each authenticated user gets one or more roles assigned, according to this properties file. The authorization decision that is performed later will be based on the user's roles, not the user's name (role-based access control)!
PropertiesLoginModule.initialize() does not do much; it only resolves the two property file names from the login module configuration in login.properties.
In PropertiesLoginModule.login() these files get loaded into memory. Notice that any changes to either users-passwords.properties or groups.properties will be picked up on the next invocation! So you can adjust the user and role mappings at runtime.
The next step is to instantiate a javax.security.auth.callback.Callback array containing a NameCallback and a PasswordCallback.

// in method PropertiesLoginModule.login()
Callback[] callbacks = new Callback[2];
callbacks[0] = new NameCallback("Username: ");
callbacks[1] = new PasswordCallback("Password: ", false);

These callbacks are then passed into the callback handler that was created with the LoginContext earlier in JAASAuthenticationService.authenticate() (see previous sample code above):

//in method PropertiesLoginModule.login()
callbackHandler.handle(callbacks);
The callback handler now iterates through all the callbacks (in our case two, namely the NameCallback and the PasswordCallback) and checks what type they are. Here is the code quoted above again:

//in method JAASAuthenticationService.authenticate()
LoginContext loginContext = new LoginContext(domain, subject, new CallbackHandler() {
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        for (int i = 0; i < callbacks.length; i++) {
            if (callbacks[i] instanceof NameCallback) {
                ((NameCallback) callbacks[i]).setName(user);
            } else if (callbacks[i] instanceof PasswordCallback && credentials instanceof String) {
                ((PasswordCallback) callbacks[i]).setPassword(((String) credentials).toCharArray());
            } else if (callbacks[i] instanceof CertificateCallback && credentials instanceof X509Certificate) {
                ((CertificateCallback) callbacks[i]).setCertificate((X509Certificate) credentials);
            } else {
                throw new UnsupportedCallbackException(callbacks[i]);
            }
        }
    }
});
For the NameCallback it sets the username that was extracted from the WS UsernameToken profile of the incoming SOAP request. And for the PasswordCallback it sets the corresponding password. If the callback was of type CertificateCallback, it would set the X.509 certificate of the incoming WSSE security header (this is not the case in our example).
Next we jump back into our PropertiesLoginModule.login() method. The two callback objects now contain the client's username and password (as sent with the SOAP request), so we can authenticate them.
The rest is very simple. The username and password extracted from the callbacks are compared to the user/password combinations loaded from users-passwords.properties, and if they don't match a FailedLoginException is raised.

// method PropertiesLoginModule.login() continued
user = ((NameCallback) callbacks[0]).getName();
char[] tmpPassword = ((PasswordCallback) callbacks[1]).getPassword();
if (tmpPassword == null) {
    tmpPassword = new char[0];
}
String password = users.getProperty(user);
if (password == null) {
    throw new FailedLoginException("User does not exist");
}
if (!password.equals(new String(tmpPassword))) {
    throw new FailedLoginException("Password does not match");
}

It is this code where the authentication either succeeds or fails. If authentication fails, a FailedLoginException is raised, causing a call to PropertiesLoginModule.abort(); otherwise PropertiesLoginModule.commit() is invoked. Inside commit() we only add the relevant security roles from groups.properties to the authenticated user and add the entire authenticated principal to the Java security Subject:

public boolean commit() throws LoginException {
    principals.add(new UserPrincipal(user));

    for (Enumeration enumeration = groups.keys(); enumeration.hasMoreElements();) {
        String name = (String) enumeration.nextElement();
        String[] userList = ((String) groups.getProperty(name) + "").split(",");
        for (int i = 0; i < userList.length; i++) {
            if (user.equals(userList[i])) {
                principals.add(new GroupPrincipal(name));
            }
        }
    }
    subject.getPrincipals().addAll(principals);
    // ... remainder omitted
    return true;
}

The Java security Subject now carries all credential information including its user roles.
The list of roles will be required at last to perform authorization.

Note: All these operations are performed from within JbiJAASInterceptor.handleMessage(). This interceptor handles all the complex JAAS authentication part. Thus when this interceptor finishes, the client request is either authenticated or rejected. The remaining CXF interceptors can now be invoked but they won't perform any security related functions.

Finally the JbiInvokerInterceptor will use the org.apache.servicemix.jbi.security.SecuredBroker to place the constructed JBI message onto the ServiceMix Normalized Message Router (NMR). Before the message is sent to the NMR, the SecuredBroker checks whether the caller is authorized. Once the JBI message containing the payload of the SOAP message is put onto the NMR, it is delivered to the target service (in our case the demo's ws-security-cxfse-su component) without any further security checks. So this authorization check is the final security operation in our scenario.

The information about which roles are allowed to invoke which services is specified in conf/security.xml in your ServiceMix installation. Out of the box there is an <authorizationMap> entry as follows:

<sm:authorizationMap id="authorizationMap">
  <sm:authorizationEntry service="*:*" roles="*" />
</sm:authorizationMap>

So out of the box anyone can invoke on any service. This basically turns off authorization. The source code actually checks if roles="*" and bypasses authorization completely:

// inside SecuredBroker.sendExchangePacket()
if (!acls.contains(GroupPrincipal.ANY)) {
    //perform authorization, details omitted.
}

... whereby GroupPrincipal.ANY resolves to "*".
If you want to enable authorization you need to specify one or more authorization entries in security.xml where the roles are not just "*" (meaning everyone). You would generally list some of the role names from your conf/groups.properties file.
Of course you can apply fine-grained access control to a particular service by specifying the service attribute in the <authorizationEntry> element. The syntax is
<sm:authorizationEntry service="{namespace}:servicename" roles="admin"/>

where {namespace}:servicename is the service QName as typically defined in your xbean.xml service configuration. So in our example, in order to limit access to the cxf-se service to only the administrator role, you would specify:

<sm:authorizationEntry service="{http://apache.org/hello_world_soap_http}:SOAPServiceWSSecurity"
roles="admin" />

So this list of authorization entries from security.xml is available to the SecuredBroker. It also has access to the javax.security.auth.Subject instance that contains all the client's security credentials, including the authenticated user roles, and it can extract the name of the target service from the JBI MessageExchange. Hence it is just a question of checking, against the authorization configuration from security.xml, that a user role in the security Subject is allowed to invoke the service named in the MessageExchange.
If the caller is not authorized, a security exception is thrown; otherwise the exchange is put onto the NMR, from where it is routed and dispatched to the target service. That's it.
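The role-vs-ACL check just described can be sketched in a few lines of plain Java (the Role class here is a hypothetical stand-in for ServiceMix's GroupPrincipal, not its actual implementation):

```java
import java.security.Principal;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class AuthzSketch {
    // hypothetical role principal, standing in for ServiceMix's GroupPrincipal
    static class Role implements Principal {
        private final String name;
        Role(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // mirrors the logic described above: roles="*" bypasses the check,
    // otherwise some role of the Subject must appear in the ACL set
    static boolean authorized(Set<Principal> subjectPrincipals, Set<String> acls) {
        if (acls.contains("*")) {
            return true;
        }
        for (Principal p : subjectPrincipals) {
            if (acls.contains(p.getName())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<Principal> principals = new HashSet<>(Arrays.asList(new Role("admin")));
        System.out.println(authorized(principals, new HashSet<>(Arrays.asList("*"))));     // true
        System.out.println(authorized(principals, new HashSet<>(Arrays.asList("admin")))); // true
        System.out.println(authorized(principals, new HashSet<>(Arrays.asList("guest")))); // false
    }
}
```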

Executive Summary:

  • The WSS4JInInterceptor runs the WS-Security related functions, extracts the username and password from the SOAP security header and puts them into a WSUsernameTokenPrincipal object.
  • The JbiJAASInterceptor is the next CXF interceptor to be invoked; it checks for the presence of a WSUsernameTokenPrincipal, extracts the credential information and invokes the JAAS authentication service.

  • The JAAS authentication service uses the configured login module class (PropertiesLoginModule as set in conf/login.properties) to delegate the authentication decision.

  • The PropertiesLoginModule calls back on the registered JAAS callback handler to retrieve the username/password and authenticates these credentials against the definitions in users-passwords.properties. If authentication succeeds, it assigns the list of roles to the authenticated Subject. The call stack then returns all the way back to the JbiJAASInterceptor.

  • The JbiInvokerInterceptor dispatches the request and uses the SecuredBroker to send the message exchange to the NMR.

  • The SecuredBroker performs authorization based on the user role names stored in the Java security Subject and based on the authorization configuration made in conf/security.xml.

I hope I could shed some light on the mysteries of authentication/authorization in ServiceMix 3.
Unfortunately the ServiceMix documentation does not cover these concepts in much detail.

Looking forward to any feedback from you.

16 Apr 2009

Slower performance with the new servicemix-jms endpoints?

There are two servicemix-jms components available in the later versions of ServiceMix 3: the traditional JMS endpoint that is configured using <jms:endpoint>, and the new JMS endpoint that uses <jms:consumer> and <jms:provider>. The new JMS endpoints reuse much of Spring JMS. Very recently I noticed a pretty large performance difference between these two JMS consumers, in that the new servicemix-jms consumer was much slower than the traditional consumer, while using a pretty default configuration for both.
The reason for that is that out of the box the new JMS endpoint is configured to use the Spring JMS DefaultMessageListenerContainer without any caching, so all JMS resources get re-created and closed for every message being sent or received. This of course degrades performance a lot.

There are two simple solutions:

1) Configure caching for this DMLC

<jms:consumer service="test:NewJMSConsumer"

Using cacheLevel="3" (CACHE_CONSUMER) is the highest cache option and caches pretty much all JMS resources for each listener thread (JMS connection, session and message consumer). Other levels are 2 (CACHE_SESSION), 1 (CACHE_CONNECTION) and 0 (CACHE_NONE). If no cacheLevel is specified, CACHE_NONE is used!
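For illustration, a consumer with caching enabled might be configured roughly as follows (the service name and the cacheLevel attribute are taken from the truncated snippet above; the remaining attribute names and values are made-up placeholders):

```xml
<jms:consumer service="test:NewJMSConsumer"
              endpoint="jmsEndpoint"
              destinationName="my.input.queue"
              connectionFactory="#connectionFactory"
              cacheLevel="3" />
```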

2) Or configure for a different Spring JMS listener

<jms:consumer service="test:NewJMSConsumer"

The ServerSessionMessageListenerContainer type internally uses a ServerSessionPool (which as the name suggests pools the server sessions) and also results in reasonable performance. However this listener type might not be suitable for all deployments.

Both options should boost performance and perform about as fast as the old JMS endpoint. These jms consumer attributes are both documented, but it is easy to overlook them and just use the default configuration.
I recommend solution 1: using the DefaultMessageListenerContainer and enabling caching.

Finally, there seems to be no reason why caching could not be turned on by default. So I raised JIRA SM-1841 at Apache.

23 Jan 2009

Using the Camel Aggregator correctly

I was recently working on a project that required the use of a Camel aggregator. Since I had not used the aggregator much before I started by reading through the Camel Aggregator documentation. Some questions remained though.
In my use case I needed to aggregate every five incoming messages of the same format into a new exchange. Sounds very simple, but I did not find it that straightforward. And that's the motivation for this post.
To have an example at hand, let's say I want to aggregate messages of the following general XML format (with different values of course):

<artist genre='Alternative'>
  <name>The White Stripes</name>
  <album>Get Behind Me Satan</album>
  <album>White Blood Cells</album>
</artist>

Suppose that's the message format of the incoming messages. I will build the aggregator according to this format.
As mentioned in the Camel documentation, each aggregator is configured by a correlation expression and an aggregation strategy.
The correlation expression defines which messages should be aggregated together. When the correlation expression is evaluated at runtime on a particular message, it returns a value that is used as the correlation id under which messages are aggregated. All messages that evaluate to the same correlation id are aggregated together.

A mistake I made initially was to assume that messages that do not match my correlation expression would not be aggregated at all. But every correlation expression always evaluates to some correlation id for every message. Suppose I want to aggregate all those messages that have an alternative genre (genre='Alternative') and I define my aggregation strategy accordingly (we will explore that later). All messages that have a different genre set will still be aggregated somehow, depending on what id the correlation expression evaluates to for these messages at runtime. We will see more detailed examples of that later, but this is important to keep in mind.
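This keying behavior, where every message ends up under some correlation id, can be modeled with a plain map (a toy model, not Camel internals):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CorrelationSketch {
    // Toy model of the aggregator's keying: the correlation expression maps
    // EVERY message to some id; there is no "does not correlate" bucket.
    static Map<Object, List<String>> aggregate(List<String> messages,
            Function<String, Object> correlationExpression) {
        Map<Object, List<String>> byCorrelationId = new LinkedHashMap<>();
        for (String message : messages) {
            Object id = correlationExpression.apply(message);
            byCorrelationId.computeIfAbsent(id, k -> new ArrayList<>()).add(message);
        }
        return byCorrelationId;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
                "<artist genre='Alternative'>...</artist>",
                "<artist genre='Jazz'>...</artist>",
                "<order id='42'/>");
        // stand-in for an XPath expression evaluated to a String "true"/"false"
        Map<Object, List<String>> batches = aggregate(messages,
                m -> String.valueOf(m.startsWith("<artist")));
        // even the non-matching <order> message lands in a batch, under "false"
        System.out.println(batches.keySet()); // [true, false]
    }
}
```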
The aggregation strategy on the other hand contains the logic for combining all message exchanges into a single aggregated message. The default aggregation strategy is org.apache.camel.processor.aggregate.UseLatestAggregationStrategy, which only aggregates the latest incoming message, discarding older messages. This is great for throttling purposes but did not serve my use case.

Generally a Camel aggregator is set up in Java DSL as follows (I will only use Java examples, XML based configuration will be quite similar):

from("somewhere").aggregate("somehow").to("some destination");

Let's not care too much about the from(…) and to(…) part of this route, they will be defined by your business use case. It is the aggregate(…) part that we are interested in here.
As mentioned above the aggregator is configured via the correlation expression (which messages to aggregate together) and the aggregation strategy (how to aggregate messages). So it comes as no surprise that the aggregate() method can take the following parameters:

public AggregatorType aggregate(Expression correlationExpression, AggregationStrategy strategy);

When defining a correlation expression, one can either correlate on a particular message header or on the message content. Pretty much all examples I have seen so far did correlate messages on the existence of a particular message header element. In Camel headers are named and can take values - they are basically (name, value) pairs.
Suppose some of my messages also have a header called "MusicCollection" with a value "Yes" set. I can then aggregate my messages based on this header and define my correlation expression as follows:

.aggregate(header("MusicCollection"), …).

This correlation expression will evaluate to the value of the header "MusicCollection" at runtime (the String "Yes" in the above example). If no such header is set, the correlation expression will evaluate to null at runtime, which can still be used as a key for aggregating messages.
But what if I don't want to or cannot use message headers? Perhaps the message was received from an external system without any headers. I can then correlate messages based on their message payload instead. Using an XPath expression I can test if a specific element or structure is present in the message payload. In my example the messages have <artist> as the root element, so it seems natural to define a correlation expression like this:

.aggregate(xpath("/artist"), …).

This however won't work as expected. Internally the aggregator uses the Camel XPathBuilder to evaluate the XPath expression, and the above expression will return a new instance of org.w3c.dom.NodeList for every new message. Aggregation however can only occur on a correlation expression that evaluates to the same object or primitive value at runtime for all messages to be aggregated (as that object or primitive value is used internally as a HashMap key under which the aggregated exchanges are stored). So we need to define an XPath expression that evaluates to the same object (or key) at runtime. The trick is to select a proper XPath expression and to convert the evaluation result into a type like java.lang.String, as in the following example:

.aggregate(…).xpath("name(/artist)='artist'", String.class).

I had to move the XPath expression after the aggregate() method call because of the additional String.class argument. The above example will take the name of the root element of my message (which will be "artist") and test if it equals the string "artist". For the sample message shown in the beginning, this expression will evaluate to the String "true". For any other XML messages that don't start with <artist>, this XPath expression will evaluate to the String value "false" and these messages will still be aggregated according to my aggregation strategy.
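You can reproduce this behavior with plain JDK XPath, without any Camel on the classpath. The sketch below (the helper names are mine, not Camel API) shows both halves of the argument: node-set results never compare equal, even for identical messages, so they make useless HashMap keys, whereas String results do compare equal, and the boolean test converts to the Strings "true" and "false":

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import java.io.ByteArrayInputStream;
import org.w3c.dom.Document;

public class XPathKeyDemo {
    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Evaluate an expression to a String -- roughly what passing String.class
    // to Camel's xpath() asks for.
    static String asString(String xml, String expr) {
        try {
            return XPathFactory.newInstance().newXPath().evaluate(expr, parse(xml));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Evaluate an expression to a node set -- a fresh NodeList object per call.
    static Object asNodeSet(String xml, String expr) {
        try {
            return XPathFactory.newInstance().newXPath()
                    .evaluate(expr, parse(xml), XPathConstants.NODESET);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String artist = "<artist genre=\"Alternative\"><name>White Stripes</name></artist>";
        String other  = "<album><title>Elephant</title></album>";
        // Node-set results never compare equal, even for identical input:
        System.out.println(asNodeSet(artist, "/artist").equals(asNodeSet(artist, "/artist"))); // false
        // String results work as stable keys:
        System.out.println(asString(artist, "name(/artist)='artist'")); // true
        System.out.println(asString(other,  "name(/artist)='artist'")); // false
    }
}
```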
Aggregating on only the root XML element does not seem very useful. I might want to aggregate on the genre attribute of the <artist> element instead. Here is the corresponding correlation expression:

.aggregate(…).xpath("/artist/@genre", String.class).

This does serve my use case. If I want to aggregate all messages with an "Alternative" genre I could use:

.aggregate(…).xpath("name(/artist[@genre='Alternative'])", String.class).

Notice this will aggregate all messages starting with <artist genre="Alternative"> together into a new exchange. All other messages that have a different genre will also be aggregated together into a different exchange, no matter what their genre value is. If you wondered why I use the XPath function name() here, it is because name() will return the String "artist" as the correlation key for the example message above (the name of the XML element that matches my XPath expression). If I omitted the name() function, it would return a string representation of the node set itself. In the case of my example message the correlation key would be the string "White Stripes Elephant Get Behind Me Satan White Blood Cells". Using "artist" as the key seems better somehow.
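Again this is easy to verify with plain JDK XPath (the helper method and sample document below are mine, not Camel code): with name() the key is just the element name, without it the key is the node's string value, i.e. all of its nested text:

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;
import java.io.ByteArrayInputStream;
import org.w3c.dom.Document;

public class NameFunctionDemo {
    // Evaluate an XPath expression against an XML string, returning a String result.
    static String evaluate(String xml, String expr) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")));
            return XPathFactory.newInstance().newXPath().evaluate(expr, doc);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String xml = "<artist genre=\"Alternative\"><name>White Stripes</name>"
                   + "<album>Elephant</album></artist>";
        // With name(): the correlation key is simply the element name.
        System.out.println(evaluate(xml, "name(/artist[@genre='Alternative'])")); // artist
        // Without name(): the key is the node's string value (all nested text).
        System.out.println(evaluate(xml, "/artist[@genre='Alternative']")); // White StripesElephant
    }
}
```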
If you are unsure to what key your own correlation expression evaluates at runtime, place a breakpoint into method org.apache.camel.processor.aggregate.DefaultAggregationCollection.add() and debug your Camel route from there. Check the value of the computed correlationKey object.
You can of course define your correlation expression in different expression languages than XPath such as XQuery, SQL and others.

That should cover the correlation expression part of the aggregator. The second part deals with the aggregation strategy. Remember, the strategy defines how messages are aggregated together. The default strategy used by Camel is the UseLatestAggregationStrategy, which out of all received messages keeps only the latest one. If however I want to aggregate multiple messages together into a single message, I need to write my own aggregation strategy. This isn't a big deal and the Camel aggregator documentation shows an example implementation. I have re-used that code for my example as well.

public class MyAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Message newIn = newExchange.getIn();
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newIn.getBody(String.class);
        newIn.setBody(oldBody + newBody);
        return newExchange;
    }
}
In this implementation we extract the message bodies of both the old and the new Exchange, simply concatenate the two together and place the result on the new Exchange. Nothing fancy here. We can then instantiate this class and pass its reference into the aggregator:

.aggregate(…, new MyAggregationStrategy()).
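Outside of Camel, what the aggregator does with such a concatenating strategy amounts to a left fold over the incoming message bodies. A tiny sketch (plain strings instead of Exchanges; class and method names are mine):

```java
import java.util.List;

public class FoldSketch {
    // Mimics repeated calls to MyAggregationStrategy.aggregate(): the previously
    // aggregated body is concatenated with each newly arriving body.
    static String aggregateAll(List<String> bodies) {
        String aggregated = null;
        for (String body : bodies) {
            aggregated = (aggregated == null) ? body : aggregated + body;
        }
        return aggregated;
    }

    public static void main(String[] args) {
        System.out.println(aggregateAll(List.of("<a/>", "<b/>", "<c/>"))); // <a/><b/><c/>
    }
}
```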

So we are done, right? Well, what we have not defined yet is a completion criterion that tells the aggregator when the aggregation is done. There are again multiple ways of doing so. If I want to aggregate a fixed number of incoming messages, I can easily use the batch options:

.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
batchSize(5).

In this example the aggregation will finish whenever it has processed 5 incoming messages, no matter how they got aggregated (into which exchanges). Or I could specify a timeout after which the aggregated message is sent, irrespective of how many messages have been aggregated:

.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
batchTimeout(5000).

If I want to finish the aggregation when there are 5 messages aggregated together into one new exchange, then a more complex logic is required, which I can specify using the completedPredicate() expression:

.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
completedPredicate(header("aggregated").isEqualTo(5)).

The example above assumes that there is a header called "aggregated" set and whenever its value reaches the number 5, the aggregation will finish. Of course some application level code needs to set this header and also increment the counter. This would typically be done in the custom aggregation strategy implementation, where increasing that number can be based on any application logic (see attached demo, which illustrates this approach).
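As a plain-Java sketch of that counting idea (class and method names here are hypothetical; in the real demo the counter lives in the "aggregated" message header, incremented by the custom aggregation strategy, and the check sits in the completedPredicate):

```java
public class CountingAggregator {
    private int aggregated = 0;              // plays the role of the "aggregated" header
    private final StringBuilder body = new StringBuilder();

    // Called once per incoming message, like AggregationStrategy.aggregate().
    public void aggregate(String newBody) {
        body.append(newBody);
        aggregated++;                        // increment the counter on every message
    }

    // Plays the role of the completedPredicate: done once 5 messages are in.
    public boolean isCompleted() {
        return aggregated >= 5;
    }

    public String getBody() {
        return body.toString();
    }
}
```

Here the counting logic is trivial, but as the article notes, incrementing the counter could be based on any application-level condition.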
I can also call a particular method on a Spring bean for deciding when to finish the aggregation by using a BeanExpression:

.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
completedPredicate(new BeanExpression("aggregatorStrategy", "isCompleted"))

This requires a Spring bean named "aggregatorStrategy" defined in the Camel Spring configuration (your camel-context.xml) which exposes a public method called isCompleted().

<bean id="aggregatorStrategy" class="MyAggregationStrategy"/>

Check the attached demo for an example.
Finally, the entire aggregator code in Java DSL for my use case now reads:

.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
completedPredicate(new BeanExpression("aggregatorStrategy", "isCompleted"))

Using a Camel aggregator can appear rather complex at first. But once you get to see some working examples, it becomes much clearer (as with all things in life). I personally had some trouble getting the aggregator to work with an XPath based correlation expression. That part I wanted to share most of all in this article.

And finally here is the demo for this article. After extracting, follow the steps in README.txt to run it quickly.