1 Mar 2012

Camel JMS with transactions - lessons learned


Note: I have recently (2012-07-16) updated point 1) below to correct a little mistake. If you are revisiting this post, you may want to re-read point 1).

This post summarizes a couple of lessons learned when using the camel-jms component with transactions.

Setting the scene

A typical camel-jms component configuration that enables transactions could look as follows:


<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
  <property name="configuration" ref="jmsConfig" />
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
  <property name="transacted" value="true" />
  <property name="transactionManager" ref="jmsTransactionManager" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
</bean>

<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <property name="maxConnections" value="2" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61617" />
  <property name="watchTopicAdvisories" value="false" />
</bean>


This configuration defines a camel-jms component which uses the ActiveMQ PooledConnectionFactory, enables transactions (via transacted=true) and configures an external transaction manager (Spring JmsTransactionManager). Finally the camel-jms component's cache level is set to CACHE_CONSUMER.

Fairly standard stuff so far.


Lessons learned

1) Despite setting a cache level of CACHE_CONSUMER the configured transaction manager creates its own JMS connection and JMS session.

CACHE_CONSUMER is actually the cache level setting for Spring's DefaultMessageListenerContainer class (DMLC), which camel-jms uses internally. It's the DMLC that is responsible for receiving a message from the broker, starting and committing transactions, etc. Camel simply hooks in as a message listener in the DMLC, so all the low level JMS work is handled by Spring.
In the JmsConfiguration bean example above, all the specified properties configure Spring's DMLC.

At runtime Spring's DMLC ensures a new transaction is started before an attempt is made to receive a message. As you surely know, the DMLC does not support batched transactions (which is basically the reason why camel-jms does not support them either).

The low level JMS code reads as follows:


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...



In the Spring XML configuration above we configured a transaction manager, so this code first creates a new transaction and then enters the doReceiveAndExecute() method, which tries to receive a message and dispatches it to the configured MessageListener (Camel in this case).

The call to getTransaction() is invoked on the configured JmsTransactionManager. The transaction manager does not re-use the already instantiated JMS connection or session but uses the registered connection factory to obtain a new connection and to create a new JMS session. Have a look at JmsTransactionManager.doBegin():



/* JmsTransactionManager.java */
protected void doBegin(Object transaction, TransactionDefinition definition) {
  if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
  }
  JmsTransactionObject txObject = (JmsTransactionObject) transaction;
  Connection con = null;
  Session session = null;
  try {
    con = createConnection();
    session = createSession(con);
    if (logger.isDebugEnabled()) {
      logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
    }
    txObject.setResourceHolder(new JmsResourceHolder(getConnectionFactory(), con, session));
    txObject.getResourceHolder().setSynchronizedWithTransaction(true);
    int timeout = determineTimeout(definition);
    if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
      txObject.getResourceHolder().setTimeoutInSeconds(timeout);
    }
    TransactionSynchronizationManager.bindResource(
      getConnectionFactory(), txObject.getResourceHolder());
  }
  catch (JMSException ex) {
    ...


The created JMS session and connection are then linked to the transaction object.
This session and connection will not be used for receiving a message from the JMS broker; they are only used to commit the transaction. As part of the cleanup after that commit, the session and connection are released again.
So even though you specify a cacheLevelName of CACHE_CONSUMER in your Camel JmsConfiguration, the transaction manager still creates a new connection for every message. If you did not configure a pooled ConnectionFactory (unlike the Camel configuration above), you would create a new physical TCP connection into the broker, and a JMS session on top of it, for every message processed in your Camel route.
If you're using the ActiveMQ PooledConnectionFactory, the transaction manager requests a connection and a session from the pool each time, which avoids the overhead of creating a new physical TCP connection into the broker for every new transaction.

Note that for receiving the message and dispatching it to the configured MessageListener (Camel in our case), the DMLC correctly uses the cached consumer. It's only for the transaction management that a new JMS connection and session are created.

After a message has been received and dispatched, the transaction commits and the cleanup after the commit calls Connection.close(), which returns the extra connection and session to the pool. I have discussed this behavior with the Spring folks in this forum post.
Bottom line: When using a camel-jms configuration as above, it's really important to use a pooled ConnectionFactory such as ActiveMQ's PooledConnectionFactory or Spring's CachingConnectionFactory. Not using a pooled ConnectionFactory means you will open a new JMS connection and session to the broker for every transaction, i.e. for every message to be received.
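
To put a number on this, here is a minimal, self-contained Java sketch. It does not use the real JMS, Spring or ActiveMQ classes: CountingFactory and SimplePool are hypothetical stand-ins that only count how many "physical" connections get opened when something asks for a connection once per message, the way the transaction manager does. The toy pool also creates connections lazily, which is a simplification.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Toy model only: none of these types are real JMS, Spring or ActiveMQ classes. */
final class ConnectionCostSketch {

  /** Stand-in for a plain connection factory: every call opens a new physical connection. */
  static final class CountingFactory implements Supplier<Object> {
    int physicalConnections = 0;

    @Override
    public Object get() {
      physicalConnections++;
      return new Object();
    }
  }

  /** Stand-in for a pooling factory: returning a connection makes it reusable. */
  static final class SimplePool {
    private final CountingFactory target;
    private final Deque<Object> idle = new ArrayDeque<>();

    SimplePool(CountingFactory target) {
      this.target = target;
    }

    Object borrow() {
      return idle.isEmpty() ? target.get() : idle.pop();
    }

    void giveBack(Object connection) {
      idle.push(connection);
    }
  }

  /** One connection per transaction, discarded after the commit. */
  static int physicalConnectionsWithoutPool(int messages) {
    CountingFactory factory = new CountingFactory();
    for (int i = 0; i < messages; i++) {
      factory.get(); // opened when the transaction begins, closed for good after commit
    }
    return factory.physicalConnections;
  }

  /** One borrow/return cycle per transaction against the toy pool. */
  static int physicalConnectionsWithPool(int messages) {
    CountingFactory factory = new CountingFactory();
    SimplePool pool = new SimplePool(factory);
    for (int i = 0; i < messages; i++) {
      Object connection = pool.borrow(); // transaction begins
      pool.giveBack(connection);         // close() after commit hands it back
    }
    return factory.physicalConnections;
  }

  public static void main(String[] args) {
    System.out.println("without pool: " + physicalConnectionsWithoutPool(1000));
    System.out.println("with pool:    " + physicalConnectionsWithPool(1000));
  }
}
```

In this model the number of physical connections grows with the number of messages when no pool is used, and stays constant with a pool. Real pools behave differently in detail, so treat it as an illustration of the argument only.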

2) Spring's DefaultMessageListenerContainer allows the use of local JMS transactions without setting a transaction manager.

Perhaps this is known to everyone but me. My understanding so far was that you need to configure a transaction manager if you want to use transactions in Spring JMS.
But the following XML configuration of a DMLC is perfectly valid and works:


<bean id="DMLC" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="sessionTransacted" value="true" />
  <property name="destinationName" value="requestQueue" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="messageListener" ref="msgListener" />
</bean>


This is a plain Spring JMS configuration example, no Camel involved.
Using this config for setting up a JMS consumer will honor the cache level setting of CACHE_CONSUMER.
This config will still use local JMS transactions but not via an external transaction manager. In this case the DMLC will handle the transaction directly and hence can respect the cache level settings.

Actually the javadoc of Spring's AbstractPollingMessageListenerContainer.setTransactionManager() has a little note that mentions this option:

"Note: Consider the use of local JMS transactions instead. Simply switch the "sessionTransacted" flag to "true" in order to use a locally transacted JMS Session for the entire receive processing, including any Session operations performed by a SessionAwareMessageListener (e.g. sending a response message)."
So the problem discussed in 1) could be resolved by not setting an external transaction manager in the camel-jms component configuration.



3) Camel (up to version 2.9) does not allow for using local JMS transactions without an external TX manager set. 

The JmsConfiguration class tries to get the transaction manager and raises an IllegalArgumentException if none was set. The code reads as follows in Camel 2.9:

/* JmsConfiguration.java */

protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
  JmsEndpoint endpoint) throws Exception {


  ...
  PlatformTransactionManager tm = getTransactionManager();
  if (tm != null) {
    container.setTransactionManager(tm);
  } else if (transacted) {
    throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
  }
  ...
}


The camel-activemq component is slightly different as it creates a default JmsTransactionManager if none is specified. I have raised Improvement CAMEL-5055.

Bottom line: You cannot use transactions in camel-jms without specifying a transaction manager. As a result you should always use a pooled connection factory (see point 1).

Update: As of Camel 2.10 it is now possible to use transactions without specifying a transaction manager in the Camel JmsComponent configuration.



4) Using Camel to set up a transacted durable topic subscriber requires configuring it with its own non-shared PooledConnectionFactory instance.

Using the camel-jms configuration shown at the beginning of this post we can define a Camel route as follows:

public void configure() throws Exception {
  from("jms:topic:TestTopic?clientId=testclient1&durableSubscriptionName=testdurasub1")
    .id("transacted-route")
    .to("log:transacted-route?showAll=true&showBody=true")
    .to(...);
}


This route definition creates a durable topic subscriber. This is achieved by specifying a clientId on the JMS connection and a durable subscription name on the TopicConsumer. 

In addition the initial Spring config configures the camel-jms component to be transactional using an external transaction manager.  

As outlined in point 1) the external transaction manager bypasses any caching at DMLC level. So the connection is returned to the pool after every usage. The pool will have one connection that has the clientId "testclient1" set. 

Now imagine what happens if the maximum pool size of this PooledConnectionFactory is set to >1 and another Camel route is also using the same PooledConnectionFactory. 
Then both Camel routes will request a connection from the same pool and it is very likely that the DMLC of the above route retrieves a connection that does not have the clientId set. So it uses a different connection than it should for receiving a message from the broker using its durable subscription.

That can lead to various errors. 
If the second Camel route also creates a durable subscriber but with a different client id (and uses transactions) then problems like these can occur:


org.apache.camel.component.jms.JmsMessageListenerContainer WARN  Could not refresh JMS Connection for destination 'test' - retrying in 5000 ms. Cause: Setting clientID on a used Connection is not allowed.


This happens because the second route retrieved a connection that already had a client id set and then tried to set its own client id. As a result the connection to the broker does not succeed.

A durable subscriber cannot share the connection with other consumers because of the client id that is specifically set on the connection. Hence a Camel route that creates a durable subscriber should not share the PooledConnectionFactory with any other routes. 

In addition the connection pool's maximum size should be set to 1 connection only, so that it always reuses the connection that has the client id set correctly.

When the durable subscriber is not transacted, things are slightly better: the DMLC cache level settings are then respected, and with CACHE_CONNECTION or higher the connection is not returned to the pool until the route gets shut down (which destroys the DMLC and returns the connection to the pool). However, if the route is restarted later (within the same JVM instance), you would run into the same problem again and get to see the above warning.

Also related to this is bug MB-1106.

Bottom line: 
When using Camel to create durable topic subscribers, always assign a non-shared PooledConnectionFactory with maxConnections=1 to that Camel route. 
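
The client id clash can be reproduced in a small toy model. The following Java sketch is not based on the real JMS or ActiveMQ classes: ToyConnection and ToyPool are hypothetical, the pool simply hands out its connections in round-robin order once it is full, and a repeated call with the same client id is treated as a no-op (an assumption of this sketch). The exception message is borrowed from the warning quoted above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: these are not the real JMS or ActiveMQ classes. */
final class ClientIdPoolSketch {

  static final class ToyConnection {
    private String clientId;

    void setClientId(String id) {
      if (clientId != null && !clientId.equals(id)) {
        throw new IllegalStateException("Setting clientID on a used Connection is not allowed.");
      }
      clientId = id;
    }
  }

  /** Creates connections until maxConnections is reached, then reuses them round-robin. */
  static final class ToyPool {
    private final List<ToyConnection> connections = new ArrayList<>();
    private final int maxConnections;
    private int next = 0;

    ToyPool(int maxConnections) {
      this.maxConnections = maxConnections;
    }

    ToyConnection borrow() {
      if (connections.size() < maxConnections) {
        ToyConnection created = new ToyConnection();
        connections.add(created);
        return created;
      }
      ToyConnection reused = connections.get(next);
      next = (next + 1) % connections.size();
      return reused;
    }
  }

  /** Two durable subscribers take turns borrowing; returns how many attempts failed. */
  static int conflicts(ToyPool poolA, ToyPool poolB, int rounds) {
    int failures = 0;
    for (int i = 0; i < rounds; i++) {
      failures += tryUse(poolA, "testclient1");
      failures += tryUse(poolB, "testclient2");
    }
    return failures;
  }

  private static int tryUse(ToyPool pool, String clientId) {
    try {
      pool.borrow().setClientId(clientId);
      return 0;
    } catch (IllegalStateException e) {
      return 1;
    }
  }

  public static void main(String[] args) {
    ToyPool shared = new ToyPool(3);
    System.out.println("shared pool failures:     " + conflicts(shared, shared, 10));
    System.out.println("dedicated pools failures: "
        + conflicts(new ToyPool(1), new ToyPool(1), 10));
  }
}
```

With one dedicated single-connection pool per subscriber, each subscriber always gets back the connection that carries its own client id, which is the setup recommended in the bottom line above.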


5) Set prefetch=1 on the ActiveMQConnectionFactory when using ActiveMQ's PooledConnectionFactory with XA transactions.

When using XA transactions across two different transaction resources, you will most likely need to turn off JMS resource caching in the Spring DMLC configuration by using a cache level name of CACHE_NONE. Caching at DMLC level often does not work correctly with XA transactions.

Using a connection factory that supports pooling, like ActiveMQ's PooledConnectionFactory, is highly encouraged in this case.
However, ActiveMQ's PooledConnectionFactory does not support pooling of JMS consumers, so the consumer still gets created and destroyed for every single message to be consumed. With the default prefetch of 1000 on the ActiveMQConnectionFactory, the broker will try to send up to 1000 messages to the consumer, although the consumer will only ever process one message before it gets destroyed (by Spring's DMLC, due to the CACHE_NONE configuration). So in the worst case the broker eagerly prefetches 1000 messages, out of which 999 get discarded. This places quite some overhead on the broker and the consumer.
By reducing the prefetch to just one message, there is no such overhead from messages being dispatched eagerly to consumers. This can yield a significant performance improvement, depending on your use case. If processing the messages takes much more time than the low level JMS operations, you may not notice much of a difference. However, if you need to route messages from one destination to another as fast as possible without much processing, the improvement may be large. In both cases it reduces the overhead of handling prefetched messages inside the broker and the consumer.
So the suggestion is to use a brokerUrl with a prefetch of 1, e.g.

failover:(tcp://localhost:61614)?jms.prefetchPolicy.all=1
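
The worst case described above can be turned into simple arithmetic. The Java sketch below is a toy model and not a measurement: it assumes that every short-lived consumer is sent a full prefetch batch (or whatever is left on the queue), processes exactly one message and is then closed. The class and method names are made up for illustration.

```java
/** Worst-case toy model of broker dispatches when a consumer is created per message. */
final class PrefetchWasteSketch {

  /**
   * Each short-lived consumer is sent up to 'prefetch' messages, processes exactly one
   * and is then closed, so the remaining messages are dispatched again later.
   */
  static long totalDispatches(long queuedMessages, int prefetch) {
    if (prefetch < 1) {
      throw new IllegalArgumentException("this model needs prefetch >= 1");
    }
    long dispatches = 0;
    for (long remaining = queuedMessages; remaining > 0; remaining--) {
      dispatches += Math.min(prefetch, remaining);
    }
    return dispatches;
  }

  /** Dispatches that did not lead to a processed message. */
  static long wastedDispatches(long queuedMessages, int prefetch) {
    return totalDispatches(queuedMessages, prefetch) - queuedMessages;
  }

  public static void main(String[] args) {
    long queued = 10_000;
    System.out.println("prefetch=1000 wasted dispatches: " + wastedDispatches(queued, 1000));
    System.out.println("prefetch=1    wasted dispatches: " + wastedDispatches(queued, 1));
  }
}
```

In this model a prefetch of 1 produces no wasted dispatches at all, while a large prefetch multiplies the dispatch work for the same number of processed messages.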


6) Use Spring's CachingConnectionFactory
With the optimization of point 5) we still create a new JMS consumer for every message to be consumed, as there is no consumer pooling in ActiveMQ's PooledConnectionFactory.
Creating a new JMS consumer is a synchronous call into the broker. The consumer gets registered in the broker and a response is sent back from the broker. The consumer thread is blocked until it receives the broker's response. If a new consumer is created for every message, the client side thread spends a significant amount of time waiting for the broker's response.
A CPU sampler report from jvisualvm, taken for a demo that tries to move messages from one destination to another as fast as possible using transactions, illustrates this:

[Image: jvisualvm CPU sampler report]

This report shows that about 16% of the Camel thread's overall CPU time is spent on creating consumers. Out of these 16%, the thread waits 14% of the time for a response from the broker on registering the new consumer. For high throughput scenarios this may not be ideal.

Spring's CachingConnectionFactory does allow caching of consumers. When caching of consumers is enabled, the overhead of recreating a JMS consumer for every message can be avoided. However, this comes at a cost.
You need to be very careful when caching/pooling consumers in the CachingConnectionFactory because of the prefetch. If multiple consumers get created and each consumer gets a couple of messages prefetched, then it's possible that consumers remain idle in the cache but hold prefetched messages that never get processed. This results in the broker's JMX statistics showing a number of messages inflight and an equal number of messages on the queue which never seem to get processed.

If you want to use the CachingConnectionFactory with consumer caching enabled, you need to either
1) configure each Camel route instance with its own CachingConnectionFactory instance. That is, don't share a CachingConnectionFactory between two Camel routes. In that case you can work with a prefetch > 1 and get even more performance from prefetching messages eagerly to consumers.
If using concurrentConsumers=x on the camel-jms endpoint configuration, then set sessionCacheSize=x on the CachingConnectionFactory as well!
Alternatively
2) use prefetch=0 on the ActiveMQConnectionFactory that is wrapped inside the CachingConnectionFactory. This turns on polling mode and the broker will never eagerly prefetch messages in advance. Such a configuration will probably wipe out the performance improvements that you can get with consumer caching in general, as the consumer needs to actively ask the broker for a message and will never have any messages prefetched.

Bottom line: you have to be very careful when caching consumers because of the prefetch. Otherwise you may experience stuck messages. On the other hand you can achieve good performance improvements in certain high throughput scenarios by caching the consumer.

I strongly suggest testing your configuration properly before deploying to production.
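
How messages end up stuck behind idle cached consumers can be illustrated with a toy simulation. The Java sketch below involves no real broker, JMS or Spring classes. It assumes a broker that dispatches round-robin into per-consumer prefetch buffers and a client where only the first activeConsumers consumers ever process anything; with a prefetch of 0 the active consumers pull straight from the queue. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only: no real broker, JMS or Spring classes are involved. */
final class StuckPrefetchSketch {

  /** Returns how many messages stay parked in the prefetch buffers of idle consumers. */
  static int stuckMessages(int messages, int consumers, int activeConsumers, int prefetch) {
    if (activeConsumers > consumers) {
      throw new IllegalArgumentException("activeConsumers must not exceed consumers");
    }
    List<Deque<Integer>> buffers = new ArrayList<>();
    for (int i = 0; i < consumers; i++) {
      buffers.add(new ArrayDeque<>());
    }
    Deque<Integer> queue = new ArrayDeque<>();
    for (int m = 0; m < messages; m++) {
      queue.add(m);
    }

    boolean progress = true;
    while (progress) {
      progress = false;
      // Broker side: dispatch eagerly, round-robin, while a consumer's buffer has room.
      for (Deque<Integer> buffer : buffers) {
        if (!queue.isEmpty() && buffer.size() < prefetch) {
          buffer.add(queue.poll());
          progress = true;
        }
      }
      // Client side: every active consumer handles one message per round.
      for (int i = 0; i < activeConsumers; i++) {
        Integer message = (prefetch == 0) ? queue.poll() : buffers.get(i).poll();
        if (message != null) {
          progress = true;
        }
      }
    }

    int stuck = 0;
    for (int i = activeConsumers; i < consumers; i++) {
      stuck += buffers.get(i).size();
    }
    return stuck;
  }

  public static void main(String[] args) {
    System.out.println("prefetch=5, one idle consumer: " + stuckMessages(10, 2, 1, 5));
    System.out.println("prefetch=0, one idle consumer: " + stuckMessages(10, 2, 1, 0));
  }
}
```

The model mirrors the two options above: either make sure no cached consumer sits idle (activeConsumers equals consumers), or use a prefetch of 0 so that nothing is dispatched ahead of time.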


Additional updates added later:


7) Even with transacted=false set on the JmsComponent configuration, transactions are used if a transaction manager is configured.

So a configuration like


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
  <property name="transactionManager" ref="SpringJMSTransactionManager" />
  <property name="transacted" value="false" />
  <property name="concurrentConsumers" value="10" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="acknowledgementMode" value="1"/>
</bean>

which explicitly sets transacted=false but also configures a transaction manager, will use local JMS transactions.
The reason for that is again in Spring's code, which checks whether a transaction manager is configured and, if so, immediately creates a transaction which is later committed.


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...
 
So referencing the transaction manager in the JmsComponent configuration is enough to turn on transactions. On the other hand, it is not enough to say transacted=false in order to switch off transactions. You also need to remove the transaction manager from the Camel JmsComponent configuration if you want to turn off transactions.
Enabling debug logging for org.springframework.transaction is always a good way to check whether transactions are used in the Camel route or not.
If they are used, the logging output should periodically contain messages like


DEBUG JmsTransactionManager - Creating new transaction with name [JmsConsumer[test.in]]: 
PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG JmsTransactionManager - Created JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:6:1,started=false} }] from 
Connection [PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:mac.fritz.box-49502-1364202453607-1:6,
clientId=ID:mac.fritz.box-49502-1364202453607-0:6,started=false}] }]
DEBUG JmsTransactionManager - Committing JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:5:1,started=false} }]

even on an idle route that does not process any messages.
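
The rules from points 2), 3) and 7) can be summarized as a small decision function. This is only a summary sketch in plain Java, not code from Camel or Spring: the enum and method names are made up for illustration, and it models the behaviour described for Camel 2.10 and later, where transacted=true without a transaction manager is allowed.

```java
/** Summary sketch only: these names do not exist in Camel or Spring. */
final class TransactionModeSketch {

  enum Mode {
    NONE,                     // no transaction at all
    LOCAL_SESSION_TRANSACTED, // DMLC handles a locally transacted session itself
    TRANSACTION_MANAGER       // external transaction manager begins and commits
  }

  static Mode effectiveMode(boolean transacted, boolean transactionManagerSet) {
    if (transactionManagerSet) {
      // A configured transaction manager wins, whatever 'transacted' says.
      return Mode.TRANSACTION_MANAGER;
    }
    return transacted ? Mode.LOCAL_SESSION_TRANSACTED : Mode.NONE;
  }

  public static void main(String[] args) {
    System.out.println(effectiveMode(false, true));
    System.out.println(effectiveMode(true, false));
    System.out.println(effectiveMode(false, false));
  }
}
```

The first case is the surprising one from this point: transacted=false together with a transaction manager still ends up transactional.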

8) Configuring a cacheLevelName >= CACHE_CONNECTION will only use one JMS connection into the broker regardless of how many concurrentConsumers are configured.

Consider this Camel and JMS configuration:


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
   <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
   <property name="transacted" value="false" />
   <property name="concurrentConsumers" value="20" />
   <property name="cacheLevelName" value="CACHE_CONSUMER" />
   <property name="acknowledgementMode" value="1"/>
</bean>


<bean id="AMQPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
   <property name="maxConnections" value="20" />
   <property name="maximumActive" value="20" />
   <property name="connectionFactory" ref="AMQJMSConnectionFactory" />
</bean>
 
which configures 20 concurrent JMS consumers in the Camel route and also a connection pool of 20 connections. This sounds reasonable at first glance, as you would think that each concurrent consumer can then use its own JMS connection.
But with a cacheLevelName > CACHE_NONE this is not the case. Spring internally will ask the PooledConnectionFactory for only one connection (i.e. call PooledConnectionFactory.createConnection() only once). It then creates all JMS sessions and consumers on top of this one connection. This in itself is not a problem, as the JMS specification specifically allows multiple JMS sessions to share the same connection. However, it then does not make much sense to configure maxConnections=20 on the PooledConnectionFactory if this connection factory is not shared by multiple Camel routes.
You can confirm that only one connection is created in the above configuration by checking the log file for the presence of

INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

(assuming the failover transport is used which is highly recommended).
There should only be one such line in the log.

Now - to make things more complicated - this changes when you use JMS transactions. Recall point 1) of this post. The transaction manager creates its own connection into the broker and it does not cache any created connection. Instead it calls ConnectionFactory.createConnection() when creating a new transaction (this happens for every new message to be consumed) and calls Connection.close() as part of the cleanup routine after each transaction has been committed or rolled back.
In addition the PooledConnectionFactory eagerly fills the pool with another connection on every call to createConnection() until it reaches the configured maxConnections limit. So with the configuration above, where maxConnections=20, the transaction manager will cause 20 connections to be created in the pool as it repeatedly calls PooledConnectionFactory.createConnection(). So 20 active connections get created in the pool, but only two of these will be used at any time: one by the transaction manager and another by the Spring consumer. The Spring consumer will use the same connection over its lifetime (due to a cache level > CACHE_NONE), but the transaction manager will request a connection from the pool for every new message.
The effect is that 20 connections are made into the broker and only two of them are ever used at the same time. The Camel log should contain 20 lines of
INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

Therefore the recommendation is to reduce the pool size (maxConnections) to 1 or at most 2 connections. Using just one connection is fine, as the transaction manager and the Spring consumer can then share the same connection. They both get called in sequence and never use the connection in parallel.
By reconfiguring the PooledConnectionFactory to only 1 or 2 connections you save heap memory in the Camel route and you also save resources in the ActiveMQ broker (i.e. heap memory and threads, as by default the broker creates a thread per connection).
Again I am assuming that the PooledConnectionFactory instance is used by only one Camel route and not shared by multiple routes (which is a scenario I have seen quite often with customers). If you deploy multiple Camel routes, then it is indeed recommended to share the same PooledConnectionFactory. In that case, however, size the pool's maxConnections to 1 connection times the number of Camel routes.

Summary: For each Camel route you only need to configure a PooledConnectionFactory pool size of 1 connection.
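
The pool sizing argument can be checked with a few lines of arithmetic. The Java sketch below is a toy model of the behaviour described in this point, not of the actual PooledConnectionFactory code: it assumes the pool opens one more connection on every createConnection() call until maxConnections is reached, that the cached consumer asks for a connection once, and that the transaction manager asks once per transaction. The names are made up for illustration.

```java
/** Toy model of the pool behaviour described above; not the real PooledConnectionFactory. */
final class PoolSizingSketch {

  /** Connections opened to the broker after one consumer call plus one call per transaction. */
  static int connectionsOpened(int maxConnections, int transactions) {
    int opened = 0;
    int calls = 1 + transactions; // cached consumer once, transaction manager per message
    for (int i = 0; i < calls; i++) {
      if (opened < maxConnections) {
        opened++; // pool is still filling up, so another connection is opened
      }
      // otherwise an already pooled connection is handed out again
    }
    return opened;
  }

  /** At most the consumer's connection and the transaction manager's connection are in use. */
  static int connectionsInUseAtOnce(int maxConnections) {
    return Math.min(2, maxConnections);
  }

  public static void main(String[] args) {
    for (int max : new int[] {1, 2, 20}) {
      System.out.println("maxConnections=" + max
          + " opened=" + connectionsOpened(max, 1000)
          + " in use at once=" + connectionsInUseAtOnce(max));
    }
  }
}
```

In this model everything above two connections per route is opened but never needed, which is the reasoning behind the recommendation.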

4 comments:

Unknown said...

Thanks a lot for this thorough digging into the transaction semantics in using Camel Torsen. I cannot tell how many times I've been confused with the options available just for Spring & JMS, let alone together with Camel.

In my opinion your post should be linked from both the ActiveMQ and Camel wikis to form a set of best practices for transaction handling in Camel. Three thumbs up!

Unknown said...

Very interesting post. I learned a lot about Camel with this post.
I have one question, why do you use ActiveMQ PooledConnectionFactory for subscribers (or consumers)?
Is it not enough to use ActiveMQConnectionFactory with VirtualTopics to implement reliable topic subscribers?

Unknown said...

Thanks a lot for this useful post, I didn't know a PooledConnectionFactory is required. Can I ask you a question about the transaction timeout? I am wondering how to set a timeout for a transaction. Thanks!!

Ashwini Kuntamukkala said...

Excellent post! Thanks for making time to clarify these issues.