4 Nov 2009

Does ActiveMQ 5.3 broker get hung if there are millions of messages on a queue and direct JDBC is used?


That was an interesting ActiveMQ issue I had to work on last week.
We used ActiveMQ 5.3.0.3-fuse with direct JDBC persistence to a MySQL database.

We noticed that when there were 2 million persistent messages on queue A, we could not process any messages on an empty queue B. The broker seemed to hang, although it still reacted to commands from the web or JMX console.

When looking into the cause of this more deeply we found this:

In ActiveMQ there is a cleanup task thread that periodically checks for expired or acknowledged messages. In case of using a direct JDBC persistence, this task runs the following SQL command against the JDBC database table where the JMS messages are stored:


DELETE FROM ACTIVEMQ_MSGS WHERE ( EXPIRATION<>0 AND EXPIRATION<?) OR ID <=
( SELECT min(ACTIVEMQ_ACKS.LAST_ACKED_ID) FROM ACTIVEMQ_ACKS WHERE
ACTIVEMQ_ACKS.CONTAINER=ACTIVEMQ_MSGS.CONTAINER)


Depending on the JDBC database used, that SQL statement might lock the entire database table during the run of that statement. This is certainly the case when using MySQL but not when using Apache Derby. Other JDBC databases might lock the entire table as well.

During that time no other persistent message (no matter for which queue) can be processed, as all JMS messages for all queues are stored in the same database table called activemq_msgs. For a database table with millions of messages, this might take several minutes or more (I had to wait around 15 minutes for that statement to complete).
Also, the cleanup task is scheduled to run at a fixed rate (every 2 minutes or so). By the time one run completes, the next run is already overdue and gets kicked off again straight away, leaving little time for other threads to insert new messages into the database. So the threads managing the other queues starve and might never get the chance to process their messages.

This behavior can be confirmed by adding the following logging configuration


log4j.logger.org.apache.activemq.store.jdbc=DEBUG


It will log the above SQL statement being executed for a long time and being called repeatedly as soon as it has finished.
Also, when attaching jconsole to the broker and capturing a stack trace of the thread that is supposed to process new messages, that thread should be seen waiting for a long time on the JDBC database driver to complete its SQL statement.


Name: QueueThread:queue://JMSLoadTest.queue.0
State: RUNNABLE
Total blocked: 1 Total waited: 0

Stack trace:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:129)
com.mysql.jdbc.util.ReadAheadInputStream.fill(ReadAheadInputStream.java:113)
com.mysql.jdbc.util.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:160)
com.mysql.jdbc.util.ReadAheadInputStream.read(ReadAheadInputStream.java:188)
com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:2428)
com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2882)
com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:2871)
com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3414)
com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:1936)
com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2060)
com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2542)
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1734)
com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1876)
org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
org.apache.commons.dbcp.DelegatingPreparedStatement.executeQuery(DelegatingPreparedStatement.java:91)
org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecoverNextMessages(DefaultJDBCAdapter.java:709)
org.apache.activemq.store.jdbc.JDBCMessageStore.recoverNextMessages(JDBCMessageStore.java:230)
org.apache.activemq.store.ProxyMessageStore.recoverNextMessages(ProxyMessageStore.java:83)
org.apache.activemq.broker.region.cursors.QueueStorePrefetch.doFillBatch(QueueStorePrefetch.java:75)
org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:210)
org.apache.activemq.broker.region.cursors.AbstractStoreCursor.hasNext(AbstractStoreCursor.java:119)
org.apache.activemq.broker.region.cursors.StoreQueueCursor.hasNext(StoreQueueCursor.java:131)
org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1243)
org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1378)
org.apache.activemq.broker.region.Queue.iterate(Queue.java:1086)
org.apache.activemq.thread.DeterministicTaskRunner.runTask(DeterministicTaskRunner.java:84)
org.apache.activemq.thread.DeterministicTaskRunner$1.run(DeterministicTaskRunner.java:41)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
java.lang.Thread.run(Thread.java:595)


The broker itself is not hung, but the cleanup task locks the entire message table. If the broker is left running for a longer time, it will process new messages, but only a few within a long time frame.
I raised MB-558 (mirrored in AMQ-2470) for this problem and Gary Tully fixed it by changing the cleanup task from running at a fixed rate to running at a fixed delay.
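
To illustrate why that change helps, here is a minimal sketch that simulates the start times of a slow periodic task under both policies. It is modelled on the documented semantics of java.util.concurrent.ScheduledExecutorService (scheduleAtFixedRate versus scheduleWithFixedDelay), not on the broker's actual scheduler code, and the class and method names are made up for illustration. Times are in minutes, using the 2 minute period and 15 minute run time mentioned above.

```java
import java.util.Arrays;

/** Simulates when a slow periodic task starts under the two scheduling policies. */
final class CleanupScheduleSketch {

    /** Fixed rate: run k is due at k * period, but cannot start before run k-1 has ended. */
    static long[] fixedRateStarts(long period, long duration, int runs) {
        long[] starts = new long[runs];
        long previousEnd = 0;
        for (int k = 0; k < runs; k++) {
            long due = k * period;
            starts[k] = Math.max(due, previousEnd); // an overdue run starts immediately
            previousEnd = starts[k] + duration;
        }
        return starts;
    }

    /** Fixed delay: each run starts `delay` after the previous run has ended. */
    static long[] fixedDelayStarts(long delay, long duration, int runs) {
        long[] starts = new long[runs];
        long previousEnd = -delay; // chosen so that the first run starts at time 0
        for (int k = 0; k < runs; k++) {
            starts[k] = previousEnd + delay;
            previousEnd = starts[k] + duration;
        }
        return starts;
    }

    /** Idle time between the end of the first run and the start of the second. */
    static long idleBetweenRuns(long[] starts, long duration) {
        return starts[1] - (starts[0] + duration);
    }

    public static void main(String[] args) {
        long[] rate = fixedRateStarts(2, 15, 3);
        long[] delay = fixedDelayStarts(2, 15, 3);
        System.out.println("fixed rate:  " + Arrays.toString(rate)
                + ", idle " + idleBetweenRuns(rate, 15));
        System.out.println("fixed delay: " + Arrays.toString(delay)
                + ", idle " + idleBetweenRuns(delay, 15));
    }
}
```

With a fixed rate the runs follow each other back to back, so other threads never see an unlocked table. With a fixed delay there is always a gap of the configured delay in which inserts and reads can proceed.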

13 May 2009

Understanding Authentication and Authorization in ServiceMix


I spent a good few hours lately trying to understand how authentication and authorization work inside ServiceMix 3. It is this gathered knowledge that I want to share in this post. For those in a hurry, there's an executive summary at the end. For this article I assume you know the basic concepts of JAAS. Some reference material on JAAS can be found here and here.

Such a complex topic is easier to understand if it is based on a valid use case that others can reproduce easily. A pretty good candidate is the cxf-ws-security demo in ServiceMix 3.2. Anyone not familiar with that demo might want to build and run it prior to reading on.
In addition I encourage you to debug through the ServiceMix source code while you read through this article. Simply set the SERVICEMIX_DEBUG=1 environment variable and attach a Java debugger to ServiceMix. You can then place breakpoints into the relevant source code that I discuss below and walk through the call stack and source code as you read along.

I will not try to discuss all of the security aspects in ServiceMix 3 here as such scope is far too broad. Rather this article is based on running the cxf-ws-security demo and aims to explain how an incoming SOAP/HTTP request gets authenticated and authorized in ServiceMix 3.

Alright, the stage is set, let's start.
The demo's use-case is
External CXF Java client -> CXF-BC consumer -> CXF-SE component

as shown in the demo's README.txt. With regard to security we want to make sure that the client gets authenticated based on a username/password combination before verifying whether the client has permission to call the service (authorization). Sounds simple enough, but it involves a good number of components, as we will see next.

As documented in the cxf-ws-security demo, an external SOAP client sends a SOAP/HTTP request to the CXF-BC consumer component deployed in ServiceMix. The SOAP request includes a large WSSE Security header carrying a WS-Security UsernameToken profile alongside an XML-Signature and some XML-Encrypted content. I will not focus on XML-Signature and XML-Encryption here (some information in this regard was posted previously). For this discussion the UsernameToken profile is of most interest and is included in the SOAP request as follows (omitting some details for clarity):

<soap:Envelope xmlns:soap="..." >
<soap:Header>
...
<wsse:Security>
<wsse:UsernameToken>
<wsse:Username>alice</wsse:Username>
<wsse:Password>password</wsse:Password>
</wsse:UsernameToken>
...

It is this username/password combination in the SOAP security header that will be used for authentication inside ServiceMix.
So the request first hits the jetty server in the servicemix-cxf-bc component from where it gets passed into the CXF interceptor stack. There are a good couple of CXF interceptors configured by default. In our demo the list of CXF interceptors to be executed is as follows:

DEBUG - PhaseInterceptorChain:
receive [AttachmentInInterceptor, LoggingInInterceptor]
post-stream [StaxInInterceptor]
read [ReadHeadersInterceptor]
pre-protocol [MustUnderstandInterceptor, SAAJInInterceptor, WSS4JInInterceptor,
JbiJAASInterceptor]
unmarshal [JbiOperationInterceptor]
pre-invoke [JbiInWsdl1Interceptor, JbiInInterceptor]
invoke [JbiInvokerInterceptor, UltimateReceiverMustUnderstandInterceptor]
post-invoke [JbiPostInvokerInterceptor, OutgoingChainInterceptor]

Of these, the WSS4JInInterceptor, the JbiJAASInterceptor and the JbiInvokerInterceptor are of primary interest for this discussion.
In the configuration of the demo's ws-security-cxfbc-su component (in ws-security-cxfbc-su/src/main/resources/xbean.xml) we explicitly added and configured the org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor that will perform the WS-Security functions. It is the first security-relevant interceptor that we want to look at more closely.

org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor.handleMessage()

This interceptor handles the entire SOAP WS-Security header. In our example it needs to decrypt all the encrypted parts of the SOAP request, perform the XML-Signature check and extract the username/password information from the WS-Security header. What functions to perform is specified in xbean.xml, where this interceptor is configured (see the "action" list):

<bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor"
id="WSS4J">
<constructor-arg>
<map>
<entry key="action" value="Timestamp Signature Encrypt UsernameToken"/>
<entry key="..." .../>
...
</map>
</constructor-arg>
</bean>

As the name suggests, this interceptor uses Apache WSS4J to perform these WS-Security functions. This line of code inside the CXF WSS4JInInterceptor.handleMessage() method calls into WSS4J:

Vector wsResult = getSecurityEngine().processSecurityHeader(
doc.getSOAPPart(),
actor,
cbHandler,
reqData.getSigCrypto(),
reqData.getDecCrypto()
);

I will not cover the Apache WSS4J specifics here but want to mention that the WSS4JInInterceptor configuration includes a password callback implementation that is used to obtain the password for decryption and UsernameToken verification. This is also specified in the xbean.xml of the servicemix-cxf-bc-su component:

<entry key="passwordCallbackClass"
value="org.apache.servicemix.samples.cxf_ws_security.KeystorePasswordCallback"/>

See the Apache WSS4J documentation for more detailed information on WSS4J.

The outcome of each of the WS-Security functions performed by WSS4J is stored in a java.util.Vector of org.apache.ws.security.WSSecurityEngineResult objects called wsResult, which is then added as a property to the SoapMessage using the key WSHandlerConstants.RECV_RESULTS. This vector carries all the results from processing the WS-Security header, such as the principal, SAML token, X.509 certificates, etc.

Other interceptors that get invoked later will require these results for their own processing.

One of the vector elements will contain the result of parsing the WS-Security UsernameToken profile in form of a org.apache.ws.security.WSUsernameTokenPrincipal object instance. This hosts the username and password information that was received with the SOAP request.

That is roughly what the WSS4JInInterceptor does with our SOAP request.
The next CXF interceptor to be invoked is:

org.apache.servicemix.cxfbc.interceptors.JbiJAASInterceptor.handleMessage()
This one is invoked right after the WSS4JInInterceptor and is rather complex. I will try my best to explain it in the simplest terms.
First of all it extracts the WSHandlerConstants.RECV_RESULTS vector that was set by the previous WSS4JInInterceptor

// in JbiJAASInterceptor.handleMessage()
List<Object> results = (Vector<Object>)message.get(WSHandlerConstants.RECV_RESULTS);

and next checks if this Vector contains any org.apache.ws.security.WSUsernameTokenPrincipal object:

for (Iterator it = hr.getResults().iterator(); it.hasNext();) {
WSSecurityEngineResult er = (WSSecurityEngineResult) it.next();
if (er != null && er.getPrincipal() instanceof
WSUsernameTokenPrincipal){
WSUsernameTokenPrincipal p =
(WSUsernameTokenPrincipal)er.getPrincipal();
subject.getPrincipals().add(p);

this.authenticationService.authenticate(subject, domain, p.getName(), p.getPassword());
}
}

If yes, then the principal gets added to the javax.security.auth.Subject instance and the authentication service gets invoked. It is this security Subject instance that will carry all credentials information after a successful authentication!

org.apache.servicemix.jbi.security.auth.impl.JAASAuthenticationService.authenticate()
JAAS will be used now to perform the authentication.
At first a new JAAS javax.security.auth.login.LoginContext gets created while also registering a JAAS callback handler:

//in method authenticate()
LoginContext loginContext = new LoginContext(domain, subject, new CallbackHandler() {
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (int i = 0; i < callbacks.length; i++) {
if (callbacks[i] instanceof NameCallback) {
((NameCallback) callbacks[i]).setName(user);
} else if (callbacks[i] instanceof PasswordCallback && credentials instanceof String) {
((PasswordCallback) callbacks[i]).setPassword(((String) credentials).toCharArray());
} else if (callbacks[i] instanceof CertificateCallback && credentials instanceof X509Certificate) {
((CertificateCallback) callbacks[i]).setCertificate((X509Certificate) credentials);
} else {
throw new UnsupportedCallbackException(callbacks[i]);
}
}
}
});

This callback handler that we pass into the LoginContext constructor is able to deal with different types of callbacks (see the handle() method) and will be invoked later in order to take the username and password of the incoming request and place it into the callback object.
Once the LoginContext has been instantiated, we can perform the authentication by calling

loginContext.login();

This API call internally invokes the configured JAAS login modules to perform their respective types of authentication (username/password based or certificate based, etc). If the call to login() returns without throwing an exception, the overall authentication succeeded. It is therefore the login module implementation that really decides whether a client is authenticated or not.

How are the login modules resolved and configured at runtime? Well, the domain argument of the LoginContext constructor specifies the index into a configuration that determines which LoginModules to use for the authentication. In our case the domain value will be "servicemix-domain", which matches the domain defined in conf/login.properties:

/* conf/login.properties */
servicemix-domain {
org.apache.servicemix.jbi.security.login.PropertiesLoginModule
sufficient
org.apache.servicemix.security.properties.user="users-passwords.properties"
org.apache.servicemix.security.properties.group="groups.properties";

org.apache.servicemix.jbi.security.login.CertificatesLoginModule
sufficient
org.apache.servicemix.security.certificates.user="users-credentials.properties"
org.apache.servicemix.security.certificates.group="groups.properties";
};

By default this registers two login modules which will be invoked in order until the call to login() succeeds. Actually the JAAS javax.security.auth.login.LoginContext will invoke initialize(), login() and commit() on the configured login modules in this order. Each login module class is configured with a couple of config files (we'll explore that later).

So the call to loginContext.login() internally invokes

  • PropertiesLoginModule.initialize()

  • PropertiesLoginModule.login()

  • PropertiesLoginModule.commit()


in this order. If the call to any of these methods raises an exception, the CertificatesLoginModule will be tried. If the CertificatesLoginModule also raises an exception, then the authentication fails.
The various classes used in JAASAuthenticationService.authenticate() method (LoginContext, LoginModule, CallbackHandler, etc) are pure JAAS terminology and not ServiceMix specific. If you are not familiar with these, check the JAAS documentation.
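
To make the "try the modules in order" behaviour concrete, here is a minimal, self-contained sketch using only JDK classes. It is not ServiceMix code: the class SufficientModulesSketch, the MapLoginModule, the domain name "demo-domain" and the user entries are all made up for illustration, and the configuration is built in code instead of being read from conf/login.properties. Two modules are registered as sufficient; the first one does not know the user, so the second one gets its turn.

```java
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.*;
import javax.security.auth.login.*;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.spi.LoginModule;

final class SufficientModulesSketch {

    /** Hypothetical login module; its options map plays the role of a user -> password file. */
    public static class MapLoginModule implements LoginModule {
        private Subject subject;
        private CallbackHandler handler;
        private Map<String, ?> users;
        private String user;

        public void initialize(Subject subject, CallbackHandler handler,
                               Map<String, ?> sharedState, Map<String, ?> options) {
            this.subject = subject;
            this.handler = handler;
            this.users = options;
        }

        public boolean login() throws LoginException {
            NameCallback name = new NameCallback("Username: ");
            PasswordCallback password = new PasswordCallback("Password: ", false);
            try {
                handler.handle(new Callback[] {name, password});
            } catch (Exception e) {
                throw new LoginException(e.toString());
            }
            Object expected = users.get(name.getName());
            char[] given = password.getPassword();
            if (expected == null || given == null || !expected.equals(new String(given))) {
                throw new FailedLoginException("Unknown user or wrong password");
            }
            user = name.getName();
            return true;
        }

        public boolean commit() {
            if (user == null) {
                return false; // this module did not authenticate the caller
            }
            final String name = user;
            subject.getPrincipals().add(new Principal() {
                public String getName() { return name; }
            });
            return true;
        }

        public boolean abort() { user = null; return true; }
        public boolean logout() { user = null; return true; }
    }

    /** Returns true if either of the two SUFFICIENT modules accepts the credentials. */
    static boolean canLogIn(final String user, final String password) {
        Map<String, String> firstStore = new HashMap<String, String>();
        firstStore.put("bob", "secret");        // made-up entry
        Map<String, String> secondStore = new HashMap<String, String>();
        secondStore.put("alice", "password");   // made-up entry
        final AppConfigurationEntry[] entries = {
            new AppConfigurationEntry(MapLoginModule.class.getName(),
                    LoginModuleControlFlag.SUFFICIENT, firstStore),
            new AppConfigurationEntry(MapLoginModule.class.getName(),
                    LoginModuleControlFlag.SUFFICIENT, secondStore)
        };
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String domain) {
                return entries; // same module list for every domain name in this sketch
            }
        };
        CallbackHandler handler = callbacks -> {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    ((NameCallback) callback).setName(user);
                } else if (callback instanceof PasswordCallback) {
                    ((PasswordCallback) callback).setPassword(password.toCharArray());
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        };
        // LoginContext loads module classes by name; make sure it can see this class.
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(SufficientModulesSketch.class.getClassLoader());
        try {
            new LoginContext("demo-domain", new Subject(), handler, config).login();
            return true;
        } catch (LoginException e) {
            return false; // no module accepted the credentials
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

Calling canLogIn("alice", "password") succeeds through the second module, while a wrong password or an unknown user is rejected by both.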

Time to examine the org.apache.servicemix.jbi.security.login.PropertiesLoginModule next and see how it performs the authentication.


org.apache.servicemix.jbi.security.login.PropertiesLoginModule
This JAAS login module is used for username/password based login and is configured for two property files in conf/login.properties. The file users-passwords.properties (to be found in your ServiceMix conf/ directory) stores usernames and their passwords. The other properties file groups.properties maps users to their roles. If authentication succeeds, each authenticated user gets one or more roles assigned, according to this properties file. The authorization decision that is performed later will be done on the user's role, not the user's name (role based access control)!
PropertiesLoginModule.initialize() does not do too much, it only resolves the two property file names from the login module configuration in login.properties.
In PropertiesLoginModule.login() these files get loaded into memory. Notice that any changes to either users-passwords.properties or groups.properties will be reloaded when dealing with the next invocation! So you can adjust the user and role mapping at runtime.
The next step is to instantiate a javax.security.auth.callback.Callback array containing a NameCallback and a PasswordCallback.

// in method PropertiesLoginModule.login()
Callback[] callbacks = new Callback[2];
callbacks[0] = new NameCallback("Username: ");
callbacks[1] = new PasswordCallback("Password: ", false);

These callbacks are then passed into the callback handler that was created with the LoginContext earlier in JAASAuthenticationService.authenticate() (see previous sample code above):

//in method PropertiesLoginModule.login()
callbackHandler.handle(callbacks);

The callback handler now iterates through all the callbacks (in our case two, namely the NameCallback and PasswordCallback) and checks of what type they are. Check the code that I quoted above already:

//in method JAASAuthenticationService.authenticate()
LoginContext loginContext = new LoginContext(domain, subject, new CallbackHandler() {
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (int i = 0; i < callbacks.length; i++) {
if (callbacks[i] instanceof NameCallback) {
((NameCallback) callbacks[i]).setName(user);
} else if (callbacks[i] instanceof PasswordCallback && credentials instanceof String) {
((PasswordCallback) callbacks[i]).setPassword(((String) credentials).toCharArray());
} else if (callbacks[i] instanceof CertificateCallback && credentials instanceof X509Certificate) {
((CertificateCallback) callbacks[i]).setCertificate((X509Certificate) credentials);
} else {
throw new UnsupportedCallbackException(callbacks[i]);
}
}
}
});

For the NameCallback it sets the username that was extracted from the WS UsernameToken profile of the incoming SOAP request. And for the PasswordCallback it sets the corresponding password. If the callback was of type CertificateCallback, it would set the X.509 certificate of the incoming WSSE security header (this is not the case in our example).
Next we jump back into our PropertiesLoginModule.login() method. The two callback objects now contain the client's username and password (as sent with the SOAP request), so we can authenticate them.
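
The round trip between login module and callback handler can be tried in isolation with plain JDK classes. The following is only a sketch: the class name CallbackRoundTripSketch and its two helper methods are hypothetical and simply mimic the two sides of the conversation described above.

```java
import java.io.IOException;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;

final class CallbackRoundTripSketch {

    /** The caller's side: a handler that answers name and password callbacks. */
    static CallbackHandler handlerFor(final String user, final String password) {
        return new CallbackHandler() {
            public void handle(Callback[] callbacks)
                    throws IOException, UnsupportedCallbackException {
                for (Callback callback : callbacks) {
                    if (callback instanceof NameCallback) {
                        ((NameCallback) callback).setName(user);
                    } else if (callback instanceof PasswordCallback) {
                        ((PasswordCallback) callback).setPassword(password.toCharArray());
                    } else {
                        throw new UnsupportedCallbackException(callback);
                    }
                }
            }
        };
    }

    /** The login module's side: create empty callbacks, let the handler fill them, read them. */
    static String[] collectCredentials(CallbackHandler handler) {
        NameCallback name = new NameCallback("Username: ");
        PasswordCallback password = new PasswordCallback("Password: ", false);
        try {
            handler.handle(new Callback[] {name, password});
        } catch (IOException | UnsupportedCallbackException e) {
            throw new IllegalStateException("Handler could not supply credentials", e);
        }
        return new String[] {name.getName(), new String(password.getPassword())};
    }

    public static void main(String[] args) {
        String[] credentials = collectCredentials(handlerFor("alice", "password"));
        System.out.println(credentials[0] + " / " + credentials[1]);
    }
}
```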
The rest is very simple. The username and password extracted from the callbacks are compared to the user/password combinations loaded from users-passwords.properties and if they don't match a FailedLoginException is raised.

// method PropertiesLoginModule.login() continued
user = ((NameCallback) callbacks[0]).getName();
char[] tmpPassword = ((PasswordCallback) callbacks[1]).getPassword();
if (tmpPassword == null) {
tmpPassword = new char[0];
}
String password = users.getProperty(user);
if (password == null) {
throw new FailedLoginException("User does not exist");
}
if (!password.equals(new String(tmpPassword))) {
throw new FailedLoginException("Password does not match");
}

It is this code where the authentication either succeeds or fails. If authentication fails, a FailedLoginException is raised, causing the call to PropertiesLoginModule.abort(), otherwise PropertiesLoginModule.commit() is invoked. Inside commit() we only add the relevant security roles from groups.properties to the authenticated user and add the entire authenticated principal to the java security Subject:

public boolean commit() throws LoginException {
principals.add(new UserPrincipal(user));

for (Enumeration enumeration = groups.keys(); enumeration.hasMoreElements();) {
String name = (String) enumeration.nextElement();
String[] userList = ((String) groups.getProperty(name) + "").split(",");
for (int i = 0; i < userList.length; i++) {
if (user.equals(userList[i])) {
principals.add(new GroupPrincipal(name));
break;
}
}
}
subject.getPrincipals().addAll(principals);
...
}

The Java security Subject now carries all credential information including its user roles.
The list of roles will be required later to perform authorization.
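
The role lookup done in commit() boils down to scanning a "group=user1,user2" properties file for the authenticated user. Here is a small standalone sketch of that idea. The class GroupLookupSketch and the sample entries are assumptions for illustration, and unlike the code quoted above it trims whitespace around user names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

final class GroupLookupSketch {

    /** Returns the names of all groups whose comma-separated member list contains the user. */
    static Set<String> rolesFor(Properties groups, String user) {
        Set<String> roles = new TreeSet<String>();
        for (String group : groups.stringPropertyNames()) {
            for (String member : groups.getProperty(group).split(",")) {
                if (member.trim().equals(user)) {
                    roles.add(group);
                    break;
                }
            }
        }
        return roles;
    }

    /** Parses properties from a string, standing in for loading groups.properties from disk. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        // Made-up content; a real groups.properties will list different users.
        Properties groups = parse("admin=smx,alice\nuser=alice,bob\n");
        System.out.println(rolesFor(groups, "alice"));
        System.out.println(rolesFor(groups, "bob"));
    }
}
```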

Note: All these operations are performed from within JbiJAASInterceptor.handleMessage(). This interceptor handles all the complex JAAS authentication part. Thus when this interceptor finishes, the client request is either authenticated or rejected. The remaining CXF interceptors can now be invoked but they won't perform any security related functions.

Finally the
org.apache.servicemix.cxfbc.JbiInvokerInterceptor.handleMessage()
will use the org.apache.servicemix.jbi.security.SecuredBroker to place the constructed JBI message onto the ServiceMix Normalized Message Router (NMR). But before doing so, the SecuredBroker will check if the caller is authorized. So this is the final authorization.

org.apache.servicemix.jbi.security.SecuredBroker.sendExchangePacket()
Before the message is sent to the NMR, the SecuredBroker checks if the caller is authorized. Once the JBI message containing the payload of the SOAP message is put onto the NMR it will be delivered to the target service (in our case the demo's ws-security-cxfse-su component) without performing any security checks. So this authorization is the final security operation in our scenario.

The information about which roles are allowed to invoke which services is specified in conf/security.xml in your ServiceMix installation. Out of the box there is an <authorizationMap> entry as follows:

<sm:authorizationMap id="authorizationMap">
<sm:authorizationEntries>
<sm:authorizationEntry service="*:*" roles="*" />
</sm:authorizationEntries>
</sm:authorizationMap>

So out of the box anyone can invoke on any service. This basically turns off authorization. The source code actually checks if roles="*" and bypasses authorization completely:

// inside SecuredBroker.sendExchangePacket()
if (!acls.contains(GroupPrincipal.ANY)) {
//perform authorization, details omitted.
}

... whereby GroupPrincipal.ANY resolves to "*".
If you want to enable authorization you need to specify one or more authorization entries in security.xml where the roles are not just "*", meaning everyone. You would generally list some of the role names from your conf/groups.properties file.
Of course you can fine-tune access control for a particular service by specifying the service attribute in the <authorizationEntry> element. The syntax is
 
<sm:authorizationEntry service="{namespace}:servicename" roles="admin"/>

as typically defined in your xbean.xml service configuration. So in our example, in order to limit access to only the administrator role for the cxf-se service, you would specify:

<sm:authorizationEntry service="{http://apache.org/hello_world_soap_http}:SOAPServiceWSSecurity"
roles="admin" />

So this list of authorization entries from security.xml is available to the SecuredBroker. It also has access to the javax.security.auth.Subject instance that contains all the client's security credentials, including the authenticated user roles, and it can extract the name of the target service from the JBI MessageExchange. Hence it is just a question of checking that a user role in the security Subject is allowed to invoke the service named in the MessageExchange. This check is performed using the authorization configuration from security.xml.
If the caller is not authorized a security exception is thrown. Otherwise the exchange is put onto the NMR, from where it is routed and dispatched to the target service.
That's it.
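
The decision itself can be pictured with a few lines of plain Java. This is a simplified sketch and not the SecuredBroker implementation: the class AuthorizationSketch and its map-based configuration are made up, and it simply prefers an exact service entry over the "*:*" entry, which may differ from how ServiceMix resolves overlapping entries.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AuthorizationSketch {
    static final String ANY_SERVICE = "*:*";
    static final String ANY_ROLE = "*";

    /** entries maps a service name (or "*:*") to the roles allowed to invoke it. */
    static boolean isAuthorized(Map<String, Set<String>> entries,
                                String service, Set<String> callerRoles) {
        Set<String> allowed = entries.containsKey(service)
                ? entries.get(service)
                : entries.get(ANY_SERVICE);
        if (allowed == null) {
            return false; // no matching entry: this sketch denies access
        }
        if (allowed.contains(ANY_ROLE)) {
            return true;  // roles="*" lets everyone through
        }
        for (String role : callerRoles) {
            if (allowed.contains(role)) {
                return true;
            }
        }
        return false;
    }

    /** Small helper to build a role set. */
    static Set<String> roles(String... names) {
        return new HashSet<String>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        String service = "{http://apache.org/hello_world_soap_http}:SOAPServiceWSSecurity";
        Map<String, Set<String>> entries = new HashMap<String, Set<String>>();
        entries.put(ANY_SERVICE, roles("*"));
        entries.put(service, roles("admin"));
        System.out.println(isAuthorized(entries, service, roles("user")));
        System.out.println(isAuthorized(entries, service, roles("admin", "user")));
    }
}
```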

Executive Summary:

  • The WSS4JInInterceptor runs the WS-Security related functions, extracts the WS username and password from the SOAP security header and puts them into a WSUsernameTokenPrincipal object.
  • The JbiJAASInterceptor is the next CXF interceptor to invoke on and checks for the presence of a WSUsernameTokenPrincipal, extracts the credential information and invokes on the JAAS authentication service.

  • The JAAS authentication service uses the configured login module class (PropertiesLoginModule as set in conf/login.properties) to delegate the authentication decision.

  • The PropertiesLoginModule calls back on the registered JAAS callback handler to retrieve the username/password and authenticates these credentials against the definitions in users-passwords.properties. If authentication succeeds, it assigns the list of roles to this authenticated Subject. The call stack returns all the way back to the JbiJAASInterceptor.

  • The JbiInvokerInterceptor dispatches the request and uses the SecuredBroker to send the message exchange to the NMR.

  • The SecuredBroker performs authorization based on the user role names stored in the Java security Subject and based on the authorization configuration made in conf/security.xml.


I hope I could shed some light on the mysteries of authentication/authorization in ServiceMix 3.
Unfortunately the ServiceMix documentation does not cover these concepts in much detail.

Looking forward to any feedback from you.

16 Apr 2009

Slower performance with the new servicemix-jms endpoints?


There are two servicemix-jms components available in the later versions of ServiceMix 3: the traditional jms endpoint that is configured using <jms:endpoint> and the new jms endpoint that uses <jms:consumer> and <jms:provider>. The new JMS endpoints reuse much of Spring JMS. Very recently I noticed a pretty large performance difference between these two JMS consumers: the new servicemix-jms consumer was much slower than the traditional consumer while using a pretty default configuration for both.
The reason is that out of the box the new JMS endpoint is configured to use the Spring JMS DefaultMessageListenerContainer without any caching, so all JMS resources get re-created and closed for every message being sent or received. This of course degrades performance a lot.

There are two simple solutions:

1) Configure caching for this DMLC

<jms:consumer service="test:NewJMSConsumer"
endpoint="endpoint"
targetService="test:bean"
targetEndpoint="endpoint"
destinationName="queue/TEST.FOO"
connectionFactory="#AMQConnectionFactory"
cacheLevel="3" />


Using cacheLevel="3" (CACHE_CONSUMER) is the highest cache option and caches pretty much all JMS resources for each listener thread (JMS connection, session and message consumer). Other levels are 2 (CACHE_SESSION), 1 (CACHE_CONNECTION) and 0 (CACHE_NONE). If no cacheLevel is specified, CACHE_NONE is used!

2) Or configure for a different Spring JMS listener

<jms:consumer service="test:NewJMSConsumer"
endpoint="endpoint"
targetService="test:bean"
targetEndpoint="endpoint"
destinationName="queue/TEST.FOO"
connectionFactory="#AMQConnectionFactory"
listenerType="server" />


The ServerSessionMessageListenerContainer type internally uses a ServerSessionPool (which as the name suggests pools the server sessions) and also results in reasonable performance. However this listener type might not be suitable for all deployments.

Both options should boost performance and be as fast as the old JMS endpoint. These jms consumer attributes are both documented but it is easy to overlook them and just use the default configuration.
I recommend solution 1, using the DefaultMessageListenerContainer with caching enabled.

Finally there seems to be no reason why caching could not be turned on by default. So I raised JIRA SM-1841 at Apache.

23 Jan 2009

Using the Camel Aggregator correctly

I was recently working on a project that required the use of a Camel aggregator. Since I had not used the aggregator much before I started by reading through the Camel Aggregator documentation. Some questions remained though.
In my use case I needed to aggregate every five incoming messages of the same format together into a new exchange. Sounds very simple but I did not find it that straightforward. And that's the motivation for this post.
To have an example at hand; let's say I want to aggregate messages of the following general XML format (with different values of course):

<artist genre='Alternative'>
<name>The White Stripes</name>
<album>Elephant</album>
<album>Get Behind Me Satan</album>
<album>White Blood Cells</album>
</artist>


Suppose that's the message format of the incoming messages. I will build the aggregator according to this format.
As mentioned in the Camel documentation, each aggregator is configured by a correlation expression and an aggregation strategy.
The correlation expression is used to define which messages should be aggregated together. When the correlation expression is being evaluated at runtime on a particular message, it returns a value that is used as the correlation id under which messages are being aggregated. All messages that evaluate to the same correlation id are aggregated together. A mistake I made initially was assuming that all messages that do not match my correlation expression will not be aggregated. But every correlation expression will always evaluate to a particular correlation id for every message. Suppose I want to aggregate all those messages that have an alternative genre (genre='Alternative') and I define my aggregation strategy accordingly (we will explore that later). All messages that have a different genre set will still be aggregated somehow, depending on what id the correlation expression evaluates to for these messages at runtime. We will see more detailed examples of that later, but this is important to keep in mind.
The aggregation strategy on the other hand contains the logic for combining all message exchanges into a single aggregated message. The default aggregation strategy is org.apache.camel.processor.aggregate.UseLatestAggregationStrategy, which only aggregates the latest incoming message, discarding older messages. This is great for throttling purposes but did not serve my use case.
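
The interplay of correlation id and batching can be sketched without Camel at all. The following plain Java example is not how Camel implements its aggregator; the class CorrelationSketch, its aggregate method and the batch size parameter are hypothetical and only illustrate the point made above: every message ends up under some correlation key, including those the route author did not have in mind.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class CorrelationSketch {

    /** Groups messages by correlation key and emits a batch once batchSize messages share a key. */
    static <M> List<List<M>> aggregate(List<M> messages,
                                       Function<M, Object> correlation,
                                       int batchSize) {
        Map<Object, List<M>> pending = new HashMap<Object, List<M>>();
        List<List<M>> completed = new ArrayList<List<M>>();
        for (M message : messages) {
            Object key = correlation.apply(message); // evaluated for every message
            List<M> batch = pending.get(key);
            if (batch == null) {
                batch = new ArrayList<M>();
                pending.put(key, batch);
            }
            batch.add(message);
            if (batch.size() == batchSize) {
                completed.add(batch);
                pending.remove(key);
            }
        }
        return completed; // incomplete batches stay behind in `pending`
    }

    public static void main(String[] args) {
        // Each string stands for a message whose correlation id is its genre.
        List<String> genres = Arrays.asList("Alternative", "Rock", "Alternative", "Rock", "Jazz");
        System.out.println(CorrelationSketch.<String>aggregate(genres, g -> g, 2));
    }
}
```

Note that the "Rock" messages are aggregated as well, although only "Alternative" may have been of interest, and the single "Jazz" message waits in an incomplete batch.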

Generally a Camel aggregator is set up in Java DSL as follows (I will only use Java examples, XML based configuration will be quite similar):


from("somewhere").aggregate("somehow").to("some destination");


Let's not care too much about the from(…) and to(…) part of this route, they will be defined by your business use case. It is the aggregate(…) part that we are interested in here.
As mentioned above the aggregator is configured via the correlation expression (which messages to aggregate together) and the aggregation strategy (how to aggregate messages). So it comes as no surprise that the aggregate() method can take the following parameters:


public AggregatorType aggregate(Expression correlationExpression, AggregationStrategy strategy);


When defining a correlation expression, one can either correlate on a particular message header or on the message content. Pretty much all examples I have seen so far did correlate messages on the existence of a particular message header element. In Camel headers are named and can take values - they are basically (name, value) pairs.
Suppose some of my messages also have a header called "MusicCollection" with a value "Yes" set. I can then aggregate my messages based on this header and define my correlation expression as follows:


.aggregate(header("MusicCollection"), …).


This correlation expression will evaluate to the value of the header "MusicCollection" at runtime (the String "Yes" in the above example). If no such header is set, the correlation expression will evaluate to null at runtime, which can still be used as a key for aggregating messages.
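To make the "null is still a key" point concrete, here is a minimal sketch in plain Java. It is not Camel code: the `Msg` class and the `groupByHeader` method are hypothetical stand-ins that only model how messages end up grouped under whatever value the correlation expression yields, including null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderCorrelationSketch {

    /** Hypothetical stand-in for a message: a header map plus a body. */
    public static final class Msg {
        final Map<String, String> headers = new HashMap<>();
        final String body;

        Msg(String body) {
            this.body = body;
        }

        Msg header(String name, String value) {
            headers.put(name, value);
            return this;
        }
    }

    /** Groups message bodies by the value of one header; a missing header yields the key null. */
    public static Map<String, List<String>> groupByHeader(List<Msg> messages, String headerName) {
        Map<String, List<String>> groups = new HashMap<>();
        for (Msg m : messages) {
            String key = m.headers.get(headerName); // null when the header is absent
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(m.body);
        }
        return groups;
    }

    /** Three sample messages: "a" and "c" carry the header, "b" does not. */
    public static Map<String, List<String>> demo() {
        List<Msg> messages = new ArrayList<>();
        messages.add(new Msg("a").header("MusicCollection", "Yes"));
        messages.add(new Msg("b"));
        messages.add(new Msg("c").header("MusicCollection", "Yes"));
        return groupByHeader(messages, "MusicCollection");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The message without the header is not dropped; it simply lands in its own group under the null key.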
But what if I don't want to or cannot use message headers? Perhaps the message was received from an external system without any headers. I can then correlate messages based on their payload instead. Using an XPath expression I can test if a specific element or structure is present in the message payload. In my example the messages start with <artist> as the root element, so it seems natural to define a correlation expression like this:


.aggregate(xpath("/artist"), …).



This however won't work as expected. Internally the aggregator will use the Camel XPathBuilder to evaluate the XPath expression and the above expression will return a new instance of org.w3c.dom.NodeList for every new message. The aggregation however can only occur on a correlation expression that evaluates to the same object or primitive value at runtime for all messages to be aggregated (as that object or primitive value is used internally as a HashMap key for storing the aggregated messages into exchanges). So we need to define an XPath expression that evaluates to the same object (or key) at runtime. The trick is to select a proper XPath expression and to cast the evaluation result into a type like java.lang.String, such as in the following example:


.aggregate(…).xpath("name(/artist)='artist'", String.class).


I had to move the XPath expression after the aggregate() method call because of the additional String.class argument. The above example takes the name of the root element of my message (which will be "artist") and tests whether it equals the string "artist". For the sample message shown in the beginning, this expression evaluates to the String "true". For any other XML message that does not start with <artist>, this XPath expression evaluates to the String "false", and these messages will still be aggregated according to my aggregation strategy.
Aggregating on only the root XML element does not seem very useful. I might want to aggregate on the genre attribute of the <artist> element instead. Here is the corresponding correlation expression:
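The difference between a node-set result and a String result can be reproduced outside Camel with the JDK's own javax.xml.xpath API. The sketch below is only an approximation of what the aggregator does internally: the class name, the helper methods and the sample payloads are made up for illustration, and Camel's XPathBuilder may differ in details.

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

public class XPathKeySketch {

    /** Evaluates the expression as a node set; every call returns a new NodeList object. */
    public static Object asNodeSet(String expression, String xml) {
        try {
            return XPathFactory.newInstance().newXPath().evaluate(
                    expression, new InputSource(new StringReader(xml)), XPathConstants.NODESET);
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    /** Evaluates the expression and converts the result to a String. */
    public static String asString(String expression, String xml) {
        try {
            return XPathFactory.newInstance().newXPath().evaluate(
                    expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    /** Counts how many distinct HashMap keys the given objects produce. */
    public static int distinctKeys(Object... keys) {
        Map<Object, Boolean> map = new HashMap<>();
        for (Object key : keys) {
            map.put(key, Boolean.TRUE);
        }
        return map.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample payloads, not the article's original message.
        String alt = "<artist genre=\"Alternative\"><name>White Stripes</name></artist>";
        String rock = "<artist genre=\"Rock\"><name>Some Band</name></artist>";

        // Two NodeList objects never collapse into one map key.
        System.out.println(distinctKeys(asNodeSet("/artist", alt), asNodeSet("/artist", rock)));
        // Two equal Strings do.
        String expr = "name(/artist)='artist'";
        System.out.println(distinctKeys(asString(expr, alt), asString(expr, rock)));
    }
}
```

With node-set results every message gets its own key, so nothing is ever aggregated together; with String results both sample messages share one key.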


.aggregate(…).xpath("/artist/@genre", String.class).


This does serve my use case. If I want to aggregate all messages with an "Alternative" genre I could use:


.aggregate(…).xpath("name(/artist[@genre='Alternative'])", String.class).


Notice that this will aggregate all messages starting with <artist genre="Alternative"> together into a new exchange. All other messages that have a different genre will also be aggregated together into a different exchange, no matter what their genre value is (for them name() is applied to an empty node set and returns an empty string). If you wondered why I use the XPath function name() here: name() returns the String "artist" as the correlation key for the example message above (the name of the XML element that matches my XPath expression). If I omitted the name() function, the expression would return the string value of the matched node instead. In the case of my example message the correlation key would be the string "White Stripes Elephant Get Behind Me Satan White Blood Cells". Using "artist" as the key seems better somehow.
If you are unsure to what key your own correlation expression evaluates at runtime, place a breakpoint into method org.apache.camel.processor.aggregate.DefaultAggregationCollection.add() and debug your Camel route from there. Check the value of the computed correlationKey object.
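A quicker, if rougher, check is to evaluate the expression with the JDK's XPath API outside of Camel. The following sketch assumes the expression uses no namespaces or custom functions. The class name, the `key` helper and the sample payloads are hypothetical, and Camel's own evaluation may differ in details, so the debugger remains the authoritative answer.

```java
import java.io.StringReader;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

public class CorrelationKeyCheck {

    /** Evaluates an XPath 1.0 expression against the XML text and returns the result as a String. */
    public static String key(String expression, String xml) {
        try {
            return XPathFactory.newInstance().newXPath()
                    .evaluate(expression, new InputSource(new StringReader(xml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical sample payloads, not the article's original message.
        String alt = "<artist genre=\"Alternative\"><name>White Stripes</name>"
                + "<album>Elephant</album></artist>";
        String rock = "<artist genre=\"Rock\"><name>Some Band</name></artist>";

        String[] expressions = {
            "/artist/@genre",                      // the genre value itself
            "name(/artist[@genre='Alternative'])", // element name, or empty if no match
            "/artist[@genre='Alternative']"        // concatenated text of the matched element
        };
        for (String expression : expressions) {
            System.out.println(expression + " -> [" + key(expression, alt)
                    + "] / [" + key(expression, rock) + "]");
        }
    }
}
```

Printing the keys side by side for a matching and a non-matching message shows quickly which messages would share an exchange.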
You can of course define your correlation expression in different expression languages than XPath such as XQuery, SQL and others.

That should cover the correlation expression part of the aggregator. The second part deals with the aggregation strategy. Remember, the strategy defines how messages are aggregated together. The default strategy used by Camel is the UseLatestAggregationStrategy, which out of x received messages only takes the latest message. If however I want to aggregate multiple messages together into a single message, I need to write my own aggregation strategy. This isn't a big deal and the Camel aggregator documentation shows an example implementation. I have re-used that code for my example as well.


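import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAggregationStrategy implements AggregationStrategy {
    // Concatenates the body of the previously aggregated exchange with the new one.
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Message newIn = newExchange.getIn();
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newIn.getBody(String.class);
        newIn.setBody(oldBody + newBody);
        // The new exchange now carries the combined body.
        return newExchange;
    }
}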


In this implementation we extract the messages of both Exchanges old and new, then simply concatenate the two messages together and place the result on the new Exchange. Nothing fancy here. We can then instantiate this class and pass its reference into the aggregator:


.aggregate(…, new MyAggregationStrategy()).


So we are done, right? Well, what we have not defined yet is a criterion as to when the aggregation is done. There are again multiple ways of doing so. If I want to aggregate a fixed number of incoming messages, I can easily use the batch options:


.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
batchSize(5).
to(…);


In this example the aggregation will finish whenever it has processed 5 incoming messages, no matter how they got aggregated (into which exchanges). Or I could specify a timeout after which the aggregated message is sent, irrespective of how many messages have been aggregated:



.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
batchTimeout(5000L).
to(…);


If I want to finish the aggregation when there are 5 messages aggregated together into one new exchange, then a more complex logic is required, which I can specify using the completedPredicate() expression:


.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
completedPredicate(header("aggregated").isEqualTo(5)).


The example above assumes that there is a header called "aggregated" set and whenever its value reaches the number 5, the aggregation will finish. Of course some application level code needs to set this header and also increment the counter. This would typically be done in the custom aggregation strategy implementation, where increasing that number can be based on any application logic (see attached demo, which illustrates this approach).
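The counting idea can be modelled in a few lines of plain Java. This is a simplified sketch, not the attached demo and not Camel's implementation: `Msg` is a hypothetical stand-in for an exchange whose "aggregated" header is represented as an int field, and `run` plays the role of the aggregator loop.

```java
import java.util.HashMap;
import java.util.Map;

public class CountingAggregationSketch {

    /** Hypothetical stand-in for an exchange: a body plus an "aggregated" counter header. */
    public static final class Msg {
        final String body;
        final int aggregated;

        Msg(String body, int aggregated) {
            this.body = body;
            this.aggregated = aggregated;
        }
    }

    /** Strategy: concatenates the bodies and increments the counter. */
    public static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return new Msg(newMsg.body, 1); // first message of its group
        }
        return new Msg(oldMsg.body + newMsg.body, oldMsg.aggregated + 1);
    }

    /** Completion predicate: done once the counter reaches the requested size. */
    public static boolean isCompleted(Msg msg, int size) {
        return msg.aggregated == size;
    }

    /**
     * Feeds (correlation key, body) pairs through the model and returns the body
     * of the first group that completes, or null if no group completes.
     */
    public static String run(String[][] keyedBodies, int size) {
        Map<String, Msg> pending = new HashMap<>();
        for (String[] pair : keyedBodies) {
            Msg merged = aggregate(pending.get(pair[0]), new Msg(pair[1], 0));
            if (isCompleted(merged, size)) {
                return merged.body;
            }
            pending.put(pair[0], merged);
        }
        return null;
    }

    public static void main(String[] args) {
        String[][] input = {{"Alternative", "a"}, {"Rock", "x"}, {"Alternative", "b"}, {"Alternative", "c"}};
        System.out.println(run(input, 3));
    }
}
```

Unlike a batch size, the counter is kept per correlation key, so the "Rock" message in the sample input does not bring the "Alternative" group any closer to completion.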
I can also call a particular method on a Spring bean for deciding when to finish the aggregation by using a BeanExpression:


.aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
completedPredicate(
new BeanExpression("aggregatorStrategy", "isCompleted")
).
to(…);


This requires a Spring bean with name "aggregatorStrategy" being defined in the Camel Spring configuration (your camel-context.xml) which exposes a public method called isCompleted().


<bean id="aggregatorStrategy"
class="com.progress.cs.camelaggregatortest.MyAggregationStrategy"/>


Check the attached demo for an example.
Finally, the entire aggregator code in Java DSL for my use case now reads:


from("…").
aggregate(new MyAggregationStrategy()).
xpath("/artist/@genre", String.class).
batchSize(5).
to("…");


Using a Camel aggregator can appear rather complex at first. But once you get to see some working examples it becomes much clearer (as with all things in life). I personally had some troubles getting the aggregator to work with an XPath based correlation expression. That part I wanted to share most of all in this article.

And finally here is the demo for this article. After extracting, follow the steps in README.txt to run it quickly.

29 Jul 2008

WS-Security woes in ServiceMix 3.2

Previously I had to get WS-Security UsernameToken based authentication working in ServiceMix 3.2 for a customer. All the customer wanted was to have an external SOAP client connect securely to a service deployed in ServiceMix and have the client authenticated and authorized by ServiceMix. Sounds like a straightforward use-case, and the fact that there already is a ws-security demo in ServiceMix made me hope this would be a simple project.
As Murphy's Law dictates, it turned out to be the opposite.

My initial hope was to simply define a CXF-BC consumer that is WS-Security enabled and that authenticates the client and passes on the request to a CXF-SE service. So overall use-case is
External client -> CXF-BC consumer -> CXF-SE service.
This can be set up using Maven in less than 10 minutes. The tricky part was to configure security. The ws-security demo of ServiceMix already shows the configuration for using XML-Signature and XML-Encryption, so I could simply copy that part. For using WS-Security functions ServiceMix simply leverages WSS4J, so it becomes a matter of correctly configuring WSS4J interceptors for the incoming request and outgoing reply.
Adding WS-Security UsernameToken based authentication configuration to the WSS4J interceptor is relatively simple too:

<bean class="org.apache.cxf.ws.security.wss4j.WSS4JInInterceptor"
id="TimestampSignEncrypt_Request">
<constructor-arg>
<map>
<entry key="action" value="Timestamp Signature Encrypt UsernameToken"/>
<entry key="signaturePropFile" value="alice.properties"/>
<entry key="decryptionPropFile" value="bob.properties"/>
<entry key="passwordCallbackClass"
value="org.apache.servicemix.samples.cxf_ws_security_usernametoken.KeystorePasswordCallback"/>
</map>
</constructor-arg>
</bean>

I simply had to add UsernameToken to the action entry in the above WSS4JInInterceptor configuration (the config for XML-Signature and XML-Encryption was taken directly from the ws-security demo). I won't cover the client configuration here, as this could be any SOAP client, but I should add that my client was configured to transmit the username "alice" with the password "password".
For authorization to work, I had to add these credentials to conf/users-passwords.properties

# users-passwords.properties
smx=smx
alice=password

and make alice a member of the administrators role in conf/groups.properties (for a real use-case I would have defined a new role):

# groups.properties
admin=smx,alice

Finally, to allow only users in the admin role to invoke my service, I had to change conf/security.xml:

<sm:authorizationMap id="authorizationMap">
<sm:authorizationEntries>
<sm:authorizationEntry service="*: SOAPServiceWSSecurity"
roles="admin" />
</sm:authorizationEntries>
</sm:authorizationMap>

Easy I thought and went to testing it. But it did not work. There was simply no authentication happening. Only when debugging through the code I realized the CXF-BC component did not delegate these credentials to JAAS for authentication! So there was no authentication possible in CXF-BC! I raised a bug report and took a deep breath...
The good news is that by now this bug got resolved and things work as expected. However you need a very recent version of ServiceMix to have this fix included. Now if a client sends a WS-Security header containing username and password, these will be authenticated correctly by JAAS inside ServiceMix.
Still, at the time of discovering the bug I had to get things working for the customer and got to know that the HTTP-BC component supports username/password based authentication. So I started to replace my CXF-BC consumer with an HTTP-BC consumer. And this is where the real pain started.
The configuration again is straightforward and does not look much different from the CXF-BC consumer configuration. However, for authentication to work in HTTP-BC, I must set soap="true" in my bc consumer config:

<beans xmlns:http="http://servicemix.apache.org/http/1.0"
xmlns:greeter="http://apache.org/hello_world_soap_http"
xmlns:soap="http://servicemix.apache.org/soap/1.0">
<http:endpoint service="greeter:SOAPServiceWSSecurityHTTP"
endpoint="endpoint"
role="consumer"
targetService="greeter:SOAPServiceWSSecurity"
targetEndpoint="endpoint"
locationURI="http://localhost:9000/"
defaultMep="http://www.w3.org/2004/08/wsdl/in-out"
soap="true">
<http:policies>
<soap:ws-addressing />
<soap:ws-security receiveAction="UsernameToken" keystore="default" />
</http:policies>
</http:endpoint>
</beans>


This is because the WSSecurityHandler that is called by the HTTP-BC consumer expects a DOM representation of the SOAP header, which won't get created unless soap="true" is set. With that I got the HTTP-BC component to authenticate the client, but the request failed to be un-marshaled by the CXF-SE runtime. Setting soap="true" in my http consumer had the side effect of stripping off the SOAP header from the request before sending it on to the next component inside ServiceMix. The CXF-SE runtime however requires a valid SOAP request, which it now did not get. Aargh...

So what to do with this dilemma? The easiest option I could think of was to insert a Camel component between the HTTP-BC consumer and the CXF-SE service that adds a default SOAP envelope to the In message using xslt transformation and removes the SOAP envelope from the Out message produced by the CXF-SE component again (as the http-bc component will add an envelope when soap="true" is set.) Not an ideal solution and rather a hack than a proper workaround. Another option would have been to use a custom CXF interceptor that takes care of adding/stripping the SOAP envelope but I felt a Camel XSLT component will be simpler to use.
I inserted the Camel component that added and removed the SOAP envelope. Still, I had a problem with the reply. The reply message had to be signed and encrypted, but that was not the case, despite my configuration being correct. After another few hours of debugging (I am new to the codebase) I realized this part had not yet been implemented and I had hit another bug. Because of that bug, the outgoing SOAP reply message can be neither encrypted nor signed when using the HTTP-BC component.

I found some other issues on the way that I raised accordingly. Password digests as defined by the WS-Security UsernameToken spec are currently not supported (passwords need to be sent in clear text in the SOAP header - but the header can still be encrypted, so this is no security problem). And the call to authorization in ServiceMix does not produce any logging. Finally I raised a few documentation issues.

Lessons learned:
As you can see I hit a number of issues on the way. But the good news is that these will now be addressed and fixed in future versions of ServiceMix, making the product more stable and easier to use. The most important bug is already resolved so there is no need to use a HTTP-BC consumer anymore when doing WS-Security (and I highly recommend against it).

The demo that I worked on is already incorporated into the cxf-ws-security demo of ServiceMix in the current release.

The most difficult part for me was hitting very low-level error messages that did not tell me much about the real cause of the problem. This can be quite frustrating, and the only way I got to understand the error was to debug through the source code. So error reporting needs a hell of a lot of improvement in ServiceMix.

Secondly, the low-level XML configuration is highly error-prone. ServiceMix really needs proper tooling that frees the developer from configuring XML files. Not only is it easy to make mistakes in the config, there are also many configuration options that have implications unknown to the regular developer (like the soap="true" story).

28 May 2008

... the namespace on the "message" element, is not a valid SOAP version.

Recently I was working on an SMX use case:
external client -> CXF BC consumer -> CXF SE

All configured nicely it still failed at runtime with

INFO: Interceptor has thrown exception,
unwinding now org.apache.cxf.binding.soap.SoapFault:
"http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper",
the namespace on the "message" element, is not a valid SOAP version.


It took me a while to figure out what's wrong. In the end I only had to set useJBIWrapper="true" in the config of my CXF SE component:


<beans xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0">
<cxfse:endpoint useJBIWrapper="true">
<cxfse:pojo>
<bean class="org.apache.servicemix.samples.cxf_ws_security.GreeterImpl" />
</cxfse:pojo>
</cxfse:endpoint>
</beans>


Those error messages really need to get improved in SMX. There are too many low level errors raised that give you no indication of what the real problem is.

15 Apr 2008

Chronic - a date and time parser for Ruby

I have recently come across Chronic, a very elegant date and time parser library for Ruby. It can be easily installed via RubyGems and can process all sorts of dates and times written in natural language formats. If you need to parse dates in Ruby, this might be the library you have been looking for.

Parsing US and non-US date formats is discussed in this interesting post.

7 Apr 2008

Kerberos authentication in Java

Have you used Kerberos authentication in Java using JGSS? The GSSContext interface defines the two methods initSecContext() and acceptSecContext() for an initiator and acceptor of a Kerberos authentication request. JGSS is configured in a file typically called jaas.conf, where you specify the Kerberos keytab files, Kerberos principal name and other properties that configure the authentication process.

According to the Kerberos authentication algorithm, only the initiator (or client) needs to contact the Kerberos authentication server, asking for a service ticket. The service ticket is later passed on to the server that the client wants to authenticate against and it contains all the information that the server (or acceptor) requires for authenticating the client. That implies the server does not need to communicate with the Kerberos authentication server when authenticating a client, as it is emphasized in pretty much every description of the Kerberos protocol.

Still when using JGSS you might have noticed that your service does contact the Kerberos authentication server as part of the authentication process. The server will request a ticket granting ticket from the KDC even though this is not needed. This behavior has caused me some headaches in the past as I did not understand why the server would need to contact the KDC.

There is a poorly documented property in jaas.conf that solves this issue. If you specify isInitiator=false in the com.sun.security.jgss.accept stanza of jaas.conf, the server will not request a TGT. Officially this property is only supported from Java SE 6 b89 onwards; however, I verified that the later patches of Java 5 also include it.
So when setting up jaas.conf, I recommend adding isInitiator=false to the configuration of the acceptor.
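For setups where the JAAS configuration is built in code rather than read from jaas.conf, the same option can be passed programmatically. The following is a minimal sketch under assumptions: the class name is made up, the keytab path and principal are placeholders, and the other Krb5LoginModule options shown (useKeyTab, keyTab, principal, storeKey) are a typical acceptor setup rather than something this article prescribes.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

/** Programmatic counterpart of a com.sun.security.jgss.accept stanza in jaas.conf. */
public class AcceptorJaasConfig extends Configuration {

    private final String keyTab;     // placeholder, e.g. a path to the service keytab
    private final String principal;  // placeholder, e.g. service/host@REALM

    public AcceptorJaasConfig(String keyTab, String principal) {
        this.keyTab = keyTab;
        this.principal = principal;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        if (!"com.sun.security.jgss.accept".equals(name)) {
            return null; // no configuration for other entry names
        }
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keyTab);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("isInitiator", "false"); // acceptor only: do not request a TGT
        return new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options)
        };
    }
}
```

Such a configuration would typically be installed with Configuration.setConfiguration(new AcceptorJaasConfig(…)) before the acceptor's GSSContext is created.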

This topic is also discussed here.

4 Apr 2008

Let's change the default transport protocol used in ServiceMix 3

Today I want to post about a potential issue that I came across recently when running tests using the FUSE ESB (which is IONA's distribution of Apache ServiceMix).

Currently ServiceMix 3.2 configures the underlying ActiveMQ broker to simply use tcp as the transport protocol. This is defined in conf/servicemix.properties:

activemq.url = tcp://${activemq.host}:${activemq.port}

While this configuration works fine and is sufficient for using ServiceMix in development, it will not recover from any network failures and is therefore not suitable for running in a production system. In case of a network failure or connection inactivity timeout, ActiveMQ will simply raise an exception but won't try to reconnect. You will need to restart ServiceMix in order to recover from such an error. For automatic reconnection to happen, the failover protocol should be used instead. Hence the default configuration should be changed to

activemq.url = failover://(tcp://${activemq.host}:${activemq.port})

The failover protocol should really be the default protocol and enabled out of the box. I logged SM-1302.

29 Feb 2008

Bug in gdm under SuSE 10.3 (and perhaps in other Linux distributions)

Anyone who is running Linux with GNOME in VMWare might have come across a rather serious problem with gdm lately after upgrading the system with the latest patches. I am running OpenSuSE 10.3 and installed the latest patches just a few days ago. The patch installation went fine, but next morning when I tried to reboot my Linux image, gdm failed to start up. It initially started but then crashed just before or shortly after showing the login prompt. When gdm was started the first time it ran a few seconds longer before crashing, but still not long enough to log in. On subsequent restarts, gdm crashed almost instantly on every attempt. It attempted 6 restarts, then raised the following error to the console, slept for 2 minutes and started over again:

"The display server has been shut down about 6 times in the last 90 seconds, it is likely that something bad is going on. Waiting for 2 minutes before trying again on display:0"

The log in /var/log/Xorg.0.log had the following lines at the end:
Backtrace:
0: /usr/bin/X(xf86SigHandler+0x81) [0x80e6d81]
1: [0xffffe420]
2: /usr/bin/X [0x817b185]
3: /usr/bin/X(CompositePicture+0x150) [0x8161be0]
4: /usr/bin/X [0x8167a1f]
5: /usr/bin/X [0x8164d75]
6: /usr/bin/X [0x815809e]
7: /usr/bin/X(Dispatch+0x1af) [0x808f68f]
8: /usr/bin/X(main+0x47e) [0x807717e]
9: /lib/libc.so.6(__libc_start_main+0xe0) [0xb7d1ffe0]
10: /usr/bin/X(FontFileCompleteXLFD+0x1e5) [0x8076501]

Fatal server error:
Caught signal 11. Server aborting


After various tries I was still unable to resolve the problem but found a way to bring up the X Window System. I had to log in as root from the console and run gdm -stop. The next time gdm was restarted (after the 2 min had elapsed) it did come up fine.

So I went on the web to see if someone else had encountered the same problem as well. When I checked SuSE's bug database, I realized I was not the first one to encounter this issue. A bug was already raised https://bugzilla.novell.com/show_bug.cgi?id=350318.

Apparently the culprit is a patch to the Control-Center2 (https://bugzilla.novell.com/show_bug.cgi?id=337434) that seems to break gdm.

This is quite a severe problem and only seems to affect Linux running inside VMWare. I am not yet sure if other distributions are affected as well; most likely they are.