25 Oct 2012

Integrate Log4j Nagios Appender into Karaf/ServiceMix

This is perhaps a post for a rather restricted audience.

As ServiceMix/Karaf users know, the Pax Logging framework used in Karaf integrates nicely with Apache Log4J out of the box. In fact, the Karaf logging configuration file located in etc/org.ops4j.pax.logging.cfg uses Log4J configuration syntax.
Log4J integration comes out of the box in Karaf: the Pax Logging Service OSGi bundle includes the core Log4J classes, so the default Log4J appenders are supported.

Now there are loads of additional appenders for Log4J available (a non-exhaustive list is here). One such appender is the Log4J Nagios appender, which pushes logging messages to Nagios (via an NSCA server).

I have never used Nagios before but had the task to get this Nagios appender working in Fuse ESB Enterprise 7.0.2. Fuse ESB is based on Apache ServiceMix, so the outlined solution applies to ServiceMix 4.x and Karaf as well.
As things were not dead simple, I am taking the time to document my solution in the hope that it saves someone else's time.

To install and set up Nagios, I fired up a Ubuntu VM and installed the nagios and nsca packages using the Ubuntu package manager. With regards to configuring Nagios, I followed this article and this document explaining the NSCA server setup. Although I had no previous knowledge of Nagios, I got it set up and running within an hour thanks to the referenced articles.

Turning back to ServiceMix/Karaf: the Nagios Log4J appender comes as a plain jar file (not OSGi enabled). A possible Nagios Log4J configuration is given at the end of this post.

The problem is that logging is performed by the Pax Logging Service in Karaf. So how do you tell the pax-logging-service system bundle that it should also load the Nagios Log4J appender from a different jar file deployed into Karaf?
There are probably other ways to resolve this, but I found it easiest to use the OSGi fragment bundle concept.

In OSGi there is the concept of a fragment bundle. From the OSGi Wiki:
"A Bundle fragment, or simply a fragment, is a bundle whose contents are made available to another bundle (the fragment host). Importantly, fragments share the classloader of their parent bundle."

It's important to note that fragments use the classloader of their parent or host bundle.
By using a fragment bundle you can extend the classes that can be loaded by the host bundle, without having to modify the OSGi Import-Package list.

With respect to my use case this means: by making the Nagios Log4J jar file a fragment bundle of the Pax Logging Service bundle, the Pax Logging Service bundle will be able to load the Nagios Log4J appender classes and send logging statements to the Nagios NSCA server.

The Nagios Log4J jar does not contain any OSGi metadata, so I had to add it manually. I extracted the jar file and modified META-INF/MANIFEST.MF to contain these headers:

Manifest-Version: 1.0
Ant-Version: Apache Ant 1.8.2
Created-By: 1.6.0_13-b03 (Sun Microsystems Inc.)
Bundle-Name: log4j-nagios
Bundle-SymbolicName: org.apache.log4j.nagios
Bundle-Version: 2.0.0
Bundle-ManifestVersion: 2
Fragment-Host: org.ops4j.pax.logging.pax-logging-service
Export-Package: org.apache.log4j.nagios;version="2.0.0"



Notice the Fragment-Host header: it sets the host to the pax-logging-service OSGi bundle.
Further, there is no need to define an Import-Package list, as all required Log4J classes are made available by the host bundle.

I then rebuilt the jar file and named it log4j-nagios-appender-2.0.0.osgi.jar.
If you don't want to run these steps yourself, you can download the OSGi enabled jar file using the above link.
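If you rebuild the appender jar from source with Maven anyway, the same headers could also be generated by the maven-bundle-plugin instead of editing the manifest by hand. The following plugin configuration is just a sketch of that idea (it is not part of the original appender build):

<plugin>
  <groupId>org.apache.felix</groupId>
  <artifactId>maven-bundle-plugin</artifactId>
  <extensions>true</extensions>
  <configuration>
    <instructions>
      <!-- same headers as in the manually edited MANIFEST.MF above -->
      <Bundle-SymbolicName>org.apache.log4j.nagios</Bundle-SymbolicName>
      <Bundle-Version>2.0.0</Bundle-Version>
      <Fragment-Host>org.ops4j.pax.logging.pax-logging-service</Fragment-Host>
      <Export-Package>org.apache.log4j.nagios;version="2.0.0"</Export-Package>
    </instructions>
  </configuration>
</plugin>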



Deploying this new jar is easy. Perhaps the simplest approach is to start with a fresh container (having no data/ folder or an empty one).
Assuming etc/org.ops4j.pax.logging.cfg already configures Nagios logging (see the example config below), you can simply copy log4j-nagios-appender-2.0.0.osgi.jar to the ServiceMix deploy/ folder and start up ServiceMix.

It may raise the following exception on the first startup

java.lang.ClassNotFoundException: org.apache.log4j.nagios.NagiosAppender not found 
by org.ops4j.pax.logging.pax-logging-service [3]

but you can ignore it. Because the pax-logging-service bundle starts before the Nagios Log4J fragment bundle, Pax Logging cannot load the Nagios appender right at startup. However, once the Nagios Log4J fragment bundle attaches to the pax-logging-service bundle, the Nagios appender classes get loaded, logging via that appender starts, and messages get pushed to the NSCA server.
On subsequent restarts of Karaf the bundles are already wired together (i.e. the pax-logging-service knows there is a fragment bundle), so this exception is not raised anymore.



Hope this helps.




Example org.ops4j.pax.logging.cfg configuration using Nagios appender:

################################################################################
#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
################################################################################

# Root logger
log4j.rootLogger=INFO, out, osgi:* , NAGIOS
log4j.throwableRenderer=org.apache.log4j.OsgiThrowableRenderer

# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n

# File appender
log4j.appender.out=org.apache.log4j.RollingFileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n
log4j.appender.out.file=${karaf.data}/log/karaf.log
log4j.appender.out.append=true
log4j.appender.out.maxFileSize=1MB
log4j.appender.out.maxBackupIndex=10

# Sift appender

log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.sift.key=bundle.name
log4j.appender.sift.default=karaf
log4j.appender.sift.appender=org.apache.log4j.FileAppender
log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.sift.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n
log4j.appender.sift.appender.file=${karaf.data}/log/$\\{bundle.name\\}.log
log4j.appender.sift.appender.append=true



# Nagios Log4J configuration
# ------------------------------------------------------------

# set the appender for Nagios
log4j.appender.NAGIOS=org.apache.log4j.nagios.NagiosAppender

# Nagios configurations
log4j.appender.NAGIOS.Host=192.168.178.44
log4j.appender.NAGIOS.Port=5667
log4j.appender.NAGIOS.ServiceNameDefault=FuseESB

log4j.appender.NAGIOS.MDCCanonicalHostNameKey=nagios_canonical_hostname


# It may be required to set a Nagios config file if non-default
# data encryption algorithms are used.
log4j.appender.NAGIOS.ConfigFile=/opt/fuse/SMX/fuse-esb-7.0.2.fuse-097/send_nsca.cfg

# mapping warning levels.
log4j.appender.NAGIOS.Log4j_Level_INFO=NAGIOS_OK
log4j.appender.NAGIOS.Log4j_Level_WARN=NAGIOS_WARN
log4j.appender.NAGIOS.Log4j_Level_ERROR=NAGIOS_CRITICAL
log4j.appender.NAGIOS.Log4j_Level_FATAL=NAGIOS_CRITICAL

# set the layout for appender Nagios
log4j.appender.NAGIOS.layout=org.apache.log4j.PatternLayout
log4j.appender.NAGIOS.layout.conversionPattern=server: %X{nagios_canonical_hostname}: %m%n



21 Mar 2012

I have messages on a queue but they don't get dispatched to the connected consumer.



Another somewhat tricky lesson learned on ActiveMQ.

Suppose you have a network of two broker instances.
Your JMS clients (i.e. producers and consumers) kind of randomly connect to one of the two brokers at runtime. That way, load is distributed across both brokers.
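Such a two-broker network could, for example, be wired up with a network connector on each broker. The following snippet is just a sketch; broker names and URIs are made up:

<!-- in brokerA's configuration, pointing at brokerB (and the mirror image on brokerB) -->
<networkConnectors>
  <networkConnector name="to-brokerB" uri="static:(tcp://brokerB:61616)" />
</networkConnectors>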

Because consumers connect to a broker randomly, you're smart and configure replayWhenNoConsumers using the following policy configuration:


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  </networkBridgeFilterFactory>
</policyEntry>


With replayWhenNoConsumers, messages that were passed along the broker network can be replayed back to the original broker in case the broker they ended up on has no consumers connected. This is a good thing to do when your consumers randomly pick a broker at runtime and the number of consumers per destination is rather low.


Now, at runtime you may occasionally observe that on one broker instance a particular queue has a QueueSize greater than 0 and a consumer connected, but the messages do not get dispatched to that consumer.

In addition when you try to browse the queue using either the ActiveMQ web console or jconsole, no messages are shown. The browse command simply returns an empty set of messages.

Restarting the consumer does not get these messages dispatched. Restarting the broker, however, does help, and all remaining messages get dispatched correctly.



What's going on?

A broker by default has duplicate detection turned on. This is to prevent a producer from sending the same message twice within a small time window (e.g. due to a network fault and producer reconnect). It's the combination of duplicate detection and replayWhenNoConsumers that causes this behavior.


Let's use a little example to illustrate this in more detail:

The broker network consists of two broker instances, brokerA and brokerB.

Consider a producer connecting to brokerA and sending a few messages to queue1. Slightly later a consumer connects to brokerB. BrokerA forwards all the messages on queue1 to brokerB. BrokerB dispatches the messages to the connected consumer.

Let's assume the consumer disconnects from brokerB before it has processed all messages. Shortly thereafter the consumer (or a new consumer) connects to brokerA again, ready to process any messages on queue1. BrokerA itself does not have any messages on queue1 stored locally, as it passed them all to brokerB.

However, thanks to the replayWhenNoConsumers policy configuration, brokerB replays all messages on queue1 back to brokerA. Without replayWhenNoConsumers, the messages would not be replayed back to brokerA and would instead be stuck on brokerB until a consumer reconnects to brokerB and consumes them.

As these messages are replayed back to brokerA within a short time frame, the duplicate detection in brokerA's cursor marks them as duplicates: brokerA initially received these messages (with the same message ids) from the connected producer. Because they are seen as duplicates, they won't get dispatched to the consumer, yet they are already enqueued on the queue.

You can't browse these messages either, because a queue browser is just another JMS client from the broker's point of view, and it is the same cursor that dispatches messages to a queue browser.


The solution is to disable duplicate detection in the cursor:


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" enableAudit="false">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  </networkBridgeFilterFactory>
</policyEntry>


With duplicate detection disabled, the replayed messages are no longer marked as duplicates, so they get dispatched correctly to any local consumers. The broker additionally has producer-side duplicate detection turned on by default in the persistence adapter, so duplicate messages sent by external producers due to a failover reconnect will still be detected!


When messages get replayed to a broker over the network bridge, these messages are not duplicates. So disabling duplicate detection will in general not cause any duplicate messages over the bridge. Only "in general", because if the network bridge between the two broker instances dies before a message got acknowledged, the bridge gets re-established later and the same message is resent. So if the bridge dies while messages are replayed back, we could potentially receive duplicate messages.

In an older version of this blog post I suggested setting auditNetworkProducers=true on the broker's transport connector. However, this configuration is known to cause problems in certain broker topologies and is therefore not the proper solution to this issue.

Rather than setting auditNetworkProducers=true, we can allow the duplicate message that was received via the network bridge to be written to the store. Later, when the cursor tries to dispatch the message to a client, it should still detect that it is a duplicate, and from version 5.10 onwards it should move that duplicate to the ActiveMQ.DLQ queue. In older versions of the broker, however, that duplicate message would be trapped on the queue, as the broker does not move duplicate messages to the DLQ. This behaviour was introduced with ENTMQ-496 / AMQ-4952.



Update, 08.04.2016:
I updated the last part of this article and removed the auditNetworkProducers option as it is known to cause problems in certain broker topologies or when used in conjunction with replayWhenNoConsumers=true.

Update, 23.10.2013:
We just found a bug in ActiveMQ versions 5.8.0 to 5.10-SNAPSHOT (but very likely also in older versions) where you could get into the situation described above despite setting replayWhenNoConsumers=true and enableAudit=false. This bug is logged at ENTMQ-444 and has a JUnit test attached.


1 Mar 2012

Camel JMS with transactions - lessons learned


Note: I have recently (2012-07-16) updated point 1) below to correct a little mistake. If you are revisiting this post, you may want to re-read point 1).

This blog tries to summarize a couple of lessons learned when using the camel-jms component with transactions.

Setting the scene

A typical camel-jms component configuration that configures for transactions could look as follows:


<bean id="jms" class="org.apache.camel.component.jms.JmsComponent"> 
  <property name="configuration" ref="jmsConfig" />
 < /bean>
  
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration" >
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" /> 
  <property name="transacted" value="true" /> 
  <property name="transactionManager" ref="jmsTransactionManager" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
 < /bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
 < /bean>           

<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop" >
  <property name="maxConnections" value="2" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
 < /bean> 

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
  <property name="brokerURL" value="tcp://localhost:61617" /> 
  <property name="watchTopicAdvisories" value="false" />
 < /bean>


This configuration defines a camel-jms component which uses the ActiveMQ PooledConnectionFactory, enables transactions (via transacted=true) and configures an external transaction manager (Spring's JmsTransactionManager). Finally, the camel-jms component's cache level is set to CACHE_CONSUMER.

Fairly standard stuff so far.


Lessons learned

1) Despite setting a cache level of CACHE_CONSUMER the configured transaction manager creates its own JMS connection and JMS session.

CACHE_CONSUMER is actually the cache level setting for Spring's DefaultMessageListenerContainer (DMLC) class, which is used by camel-jms internally. It's the DMLC that is responsible for receiving a message from the broker, starting and committing transactions, etc. Camel simply hooks in as a message listener on the DMLC, so all the low-level JMS work is handled by Spring.
In the JmsConfiguration bean example above, all the specified properties configure Spring's DMLC.

At runtime, Spring's DMLC ensures a new transaction is spawned before an attempt is made to receive a message. As you surely know, the DMLC does not support batched transactions (which is basically the reason why camel-jms does not support them either).

The low-level JMS code reads as follows:


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...



In the Spring XML configuration above we configured a transaction manager, so this code first creates a new transaction and then enters the doReceiveAndExecute() method, which tries to receive a message and dispatches it to the configured MessageListener (Camel in this case).

The call to getTransaction() is invoked on the configured JmsTransactionManager. The transaction manager does not re-use the already instantiated JMS connection or session but uses the registered connection factory to obtain a new connection and to create a new JMS session. Have a look at JmsTransactionManager.doBegin():



protected void doBegin(Object transaction, TransactionDefinition definition) {
    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
    }
    JmsTransactionObject txObject = (JmsTransactionObject) transaction;
    Connection con = null;
    Session session = null;
    try {
        con = createConnection();
        session = createSession(con);
        if (logger.isDebugEnabled()) {
            logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
        }
        txObject.setResourceHolder(new JmsResourceHolder(getConnectionFactory(), con, session));
        txObject.getResourceHolder().setSynchronizedWithTransaction(true);
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getResourceHolder().setTimeoutInSeconds(timeout);
        }
        TransactionSynchronizationManager.bindResource(
                getConnectionFactory(), txObject.getResourceHolder());
    }
    catch (JMSException ex) {


The created JMS session and connection are then linked to the transaction object.
This session and connection will not be used for receiving a message from the JMS broker but only for committing the transaction. As part of the cleanup after the commit, the session and connection are closed correctly.
So even though you specify a cacheLevelName of CACHE_CONSUMER in your Camel JmsConfiguration, a new connection is still requested by the transaction manager for every message. If you did not configure a pooled ConnectionFactory (unlike the above Camel configuration), you would create a new physical TCP connection into the broker, and a JMS session on top of it, for every message processed in your Camel route.
If you're using the ActiveMQ PooledConnectionFactory, the transaction manager requests a new connection and a new session from the pool each time, avoiding the overhead of creating a new physical TCP connection into the broker for every new transaction.

Note that for receiving the message and dispatching it to the configured MessageListener (Camel in our case), the DMLC correctly uses the cached consumer. It's only for the transaction management that a new JMS connection and session are created.

After a message is received and dispatched, the transaction commits, and the cleanup after the commit calls Connection.close(), which returns the extra connection and session back to the pool. I have discussed this behavior with the Spring folks in this forum post.
Bottom line: When using a camel-jms configuration as above, it's really important to use a pooled ConnectionFactory such as ActiveMQ's PooledConnectionFactory or Spring's CachingConnectionFactory. Not using a pooled ConnectionFactory means you open a new JMS connection and session to the broker for every transaction / message received.

2) Spring's DefaultMessageListenerContainer allows using local JMS transactions without setting a transaction manager.

Perhaps this is known to everyone but me. My perception so far was that you need to configure a transaction manager if you want to use transactions in Spring JMS.
But the following DMLC configuration in XML is perfectly valid and works:


<bean id="DMLC" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="sessionTransacted" value="true" />
  <property name="destinationName" value="requestQueue" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="messageListener" ref="msgListener" />
</bean>


This is a plain Spring JMS configuration example, no Camel involved.
Using this config for setting up a JMS consumer will honor the cache level setting of CACHE_CONSUMER.
This config will still use local JMS transactions but not via an external transaction manager. In this case the DMLC will handle the transaction directly and hence can respect the cache level settings.

Actually, the javadoc of Spring's AbstractPollingMessageListenerContainer.setTransactionManager() has a little note that mentions this option:

"Note: Consider the use of local JMS transactions instead. Simply switch the "sessionTransacted" flag to "true" in order to use a locally transacted JMS Session for the entire receive processing, including any Session operations performed by a SessionAwareMessageListener (e.g. sending a response message)."
So the problem discussed in 1) could be resolved by not setting an external transaction manager in the camel-jms component configuration.



3) Camel (up to version 2.9) does not allow using local JMS transactions without an external transaction manager set.

The JmsConfiguration class tries to get the transaction manager and raises an IllegalArgumentException if none was set. The code reads as follows in Camel 2.9:

/* JmsConfiguration.java */

protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
  JmsEndpoint endpoint) throws Exception {


  ...
  PlatformTransactionManager tm = getTransactionManager();
  if (tm != null) {
    container.setTransactionManager(tm);
  } else if (transacted) {
    throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
  }
  ...
}


The camel-activemq component is slightly different as it creates a default JmsTransactionManager if none is specified. I have raised Improvement CAMEL-5055.

Bottom line: You cannot use transactions in camel-jms without specifying a transaction manager. As a result you should always use a pooled connection factory (see point 1).

Update: As of Camel 2.10 it is now possible to use transactions without specifying a transaction manager in the Camel JmsComponent configuration.
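For illustration, here is a minimal sketch of such a Camel 2.10+ configuration, derived from the jmsConfig bean at the beginning of this post (this is my assumption of the setup, not an excerpt from a tested configuration):

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
  <!-- transacted without an explicitly injected transactionManager (possible as of Camel 2.10) -->
  <property name="transacted" value="true" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>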



4) Using Camel to set up a transacted durable topic subscriber requires configuring it with its own non-shared PooledConnectionFactory instance.

Using the camel-jms configuration shown at the beginning of this post, we can define a Camel route as follows:

public void configure() throws Exception {

  from("jms:topic:TestTopic?clientId=testclient1&durableSubscriptionName=testdurasub1")
    .id("transacted-route")
    .to("log:transacted-route?showAll=true&showBody=true")
    .to(...);
}


This route definition creates a durable topic subscriber. This is achieved by specifying a clientId on the JMS connection and a durable subscription name on the TopicConsumer. 

In addition the initial Spring config configures the camel-jms component to be transactional using an external transaction manager.  

As outlined in point 1) the external transaction manager bypasses any caching at DMLC level. So the connection is returned to the pool after every usage. The pool will have one connection that has the clientId "testclient1" set. 

Now imagine what happens if the maximum pool size of this PooledConnectionFactory is set to >1 and another Camel route is also using the same PooledConnectionFactory. 
Then both Camel routes will request a connection from the same pool and it is very likely that the DMLC of the above route retrieves a connection that does not have the clientId set. So it uses a different connection than it should for receiving a message from the broker using its durable subscription.

That can lead to various errors. 
If the second Camel route also creates a durable subscriber but with a different client id (and uses transactions) then problems like these can occur:


org.apache.camel.component.jms.JmsMessageListenerContainer WARN  Could not refresh JMS Connection for destination 'test' - retrying in 5000 ms. Cause: Setting clientID on a used Connection is not allowed.


The second route retrieved the connection that already had a client id set and tries to set its own client id, so the connection to the broker will not succeed.

A durable subscriber cannot share the connection with other consumers because of the client id that is specifically set on the connection. Hence a Camel route that creates a durable subscriber should not share the PooledConnectionFactory with any other routes. 

In addition, the connection pool's maximum size should be set to 1 connection only, so that the route always reuses the connection that has the client id set correctly.

When the durable subscriber is not transacted, things are slightly better: the DMLC cache level settings are respected, and with CACHE_CONNECTION or higher the connection is not returned to the pool until the route is shut down (which destroys the DMLC and returns the connection to the pool). However, if the route is restarted later (within the same JVM instance), you run into the same problem again and get to see the above warning.

Also related to this is bug MB-1106

Bottom line: 
When using Camel to create durable topic subscribers, always assign a non-shared PooledConnectionFactory with maxConnections=1 to that Camel route. 
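Such a dedicated pool could look like the following sketch (the bean name is an example; the wrapped connection factory mirrors the one from the configuration at the top of this post):

<bean id="durableSubscriberPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
  <!-- a single pooled connection so the connection carrying the clientId is always reused -->
  <property name="maxConnections" value="1" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>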


5) Set prefetch=1 on the ActiveMQConnectionFactory when using ActiveMQ's PooledConnectionFactory with XA transactions.

When using XA transactions across two different transactional resources, you will most likely need to turn off JMS resource caching in the Spring DMLC configuration by using a cache level name of CACHE_NONE. Caching at DMLC level often does not work correctly with XA transactions.

Using a connection factory that supports pooling, like ActiveMQ's PooledConnectionFactory, is highly encouraged in this case.
However, ActiveMQ's PooledConnectionFactory does not support pooling of JMS consumers, so the consumer still gets created and destroyed for every single message to be consumed. With the default prefetch of 1000 on the ActiveMQConnectionFactory, the broker will try to send up to 1000 messages to the consumer, although the consumer will only ever process one message and then get destroyed (by Spring's DMLC, due to the CACHE_NONE configuration). So in the worst case the broker eagerly prefetches 1000 messages, of which 999 get discarded. This places quite some overhead on the broker and the consumer.

By reducing the prefetch to just one message, there is no overhead from messages being dispatched eagerly to consumers. This can yield a significant performance improvement, depending on your use case. If processing the messages takes much more time than the low-level JMS operations, you may not notice much of a difference. However, if you need to route messages from one destination to another as fast as possible without much processing, the improvement may be high. In both cases it reduces the overhead of handling prefetched messages inside the broker and the consumer.
So the suggestion is to use a brokerURL with a prefetch of 1, e.g.

failover:(tcp://localhost:61614)?jms.prefetchPolicy.all=1
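Applied to the ActiveMQConnectionFactory bean from the configuration at the top of this post, that could look like the following sketch:

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <!-- prefetch of 1 for all destinations; avoids discarding prefetched messages with CACHE_NONE -->
  <property name="brokerURL" value="failover:(tcp://localhost:61614)?jms.prefetchPolicy.all=1" />
  <property name="watchTopicAdvisories" value="false" />
</bean>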


6) Use Spring's CachingConnectionFactory
With the optimization of point 5) we still create a new JMS consumer for every message to be consumed, as ActiveMQ's PooledConnectionFactory does not pool consumers.
Creating a new JMS consumer is a synchronous call into the broker: the consumer gets registered in the broker and a response is sent back, and the consumer thread is blocked until it receives that response. If a new consumer is created for every message, the client-side thread spends a significant amount of time waiting for the broker's response.
The following CPU sampler report from jvisualvm, for a demo that tries to move messages from one destination to another as fast as possible using transactions, illustrates this:



This report shows that about 16% of the Camel thread's overall CPU time is spent on creating consumers. Out of these 16%, the thread spends 14% waiting for the broker's response to registering the new consumer. For high-throughput scenarios this may not be ideal.

Spring's CachingConnectionFactory does allow caching consumers. When caching of consumers is enabled, the overhead of recreating a JMS consumer for every message can be avoided. However, this comes at a cost.
You need to be very careful when caching/pooling consumers in the CachingConnectionFactory because of the prefetch. If multiple consumers get created and each consumer has a couple of messages prefetched, it's possible that consumers remain idle in the cache while holding prefetched messages that never get processed. This results in the broker's JMX statistics showing a number of messages in flight and an equal number of messages on the queue which never seem to get processed.

If you want to use the CachingConnectionFactory with consumer caching enabled, you then need to either
1) configure each Camel route instance with its own CachingConnectionFactory instance, i.e. don't share a CachingConnectionFactory between two Camel routes. In that case you can work with prefetches > 1 and get even more performance from prefetching messages eagerly to consumers.
If you use concurrentConsumers=x on the camel-jms endpoint configuration, then set sessionCacheSize=x on the CachingConnectionFactory as well!
Alternatively,
2) use prefetch=0 on the ActiveMQConnectionFactory that is wrapped inside the CachingConnectionFactory. This turns on polling mode, and the broker will never eagerly prefetch messages in advance. Such a configuration will probably wipe out the performance improvements you can get from consumer caching in general, as the consumer needs to actively ask the broker for a message and never has any messages prefetched.
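As an illustration of option 1), a dedicated CachingConnectionFactory per route could be configured roughly as follows (a sketch; the bean name and cache size are examples, and cacheConsumers is shown explicitly although it is the default):

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
  <!-- cache sessions (and their consumers) for this route; match sessionCacheSize to concurrentConsumers -->
  <property name="sessionCacheSize" value="10" />
  <property name="cacheConsumers" value="true" />
</bean>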

The bottom line is that you have to be very careful when caching consumers because of the prefetch; otherwise you may experience stuck messages. On the other hand, you can achieve good performance improvements in certain high-throughput scenarios by caching consumers.

I highly suggest testing your configuration properly before deploying to production.


Additional updates added later:


7) Even with transacted=false set on the JmsComponent configuration, transactions are used if a transaction manager is configured.

So a configuration like


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
  <property name="transactionManager" ref="SpringJMSTransactionManager" />  
  <property name="transacted" value="false" />
   <property name="concurrentConsumers" value="10" />
   <property name="cacheLevelName" value="CACHE_CONSUMER" />
   <property name="acknowledgementMode" value="1"/>
</bean>

which explicitly sets transacted=false but also configures a transaction manager will still use local JMS transactions.
The reason is again in Spring's code, which checks whether a transaction manager is configured and, if so, immediately creates a transaction that is later committed.


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...
 
So referencing a transaction manager in the JmsComponent configuration is enough to turn on transactions. On the other hand, setting transacted=false is not enough to switch transactions off; you also need to remove the transaction manager from the Camel JmsComponent configuration.
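For illustration, here is the configuration from above with the transactionManager reference removed (a sketch derived from the bean shown earlier in this point); with this setup no transactions are used:

<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
  <!-- no transactionManager referenced and transacted=false: transactions are really off -->
  <property name="transacted" value="false" />
  <property name="concurrentConsumers" value="10" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="acknowledgementMode" value="1"/>
</bean>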
Enabling debug logging for org.springframework.transaction is always a good way to check whether transactions are used in the Camel route or not.
If they are used, the logging output should periodically show messages like


DEBUG JmsTransactionManager - Creating new transaction with name [JmsConsumer[test.in]]: 
PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG JmsTransactionManager - Created JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:6:1,started=false} }] from 
Connection [PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:mac.fritz.box-49502-1364202453607-1:6,
clientId=ID:mac.fritz.box-49502-1364202453607-0:6,started=false}] }]
DEBUG JmsTransactionManager - Committing JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:5:1,started=false} }]

even on an idle route that does not process any messages.

8) Configuring a cacheLevelName >= CACHE_CONNECTION will only use one JMS connection into the broker regardless of how many concurrentConsumers are configured.

Consider this Camel and JMS configuration:


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
   <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
   <property name="transacted" value="false" />
   <property name="concurrentConsumers" value="20" />
   <property name="cacheLevelName" value="CACHE_CONSUMER" />
   <property name="acknowledgementMode" value="1"/>
</bean>


<bean id="AMQPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
   <property name="maxConnections" value="20" />
   <property name="maximumActive" value="20" />
   <property name="connectionFactory" ref="AMQJMSConnectionFactory" />
</bean>
 
which configures 20 concurrent JMS consumers in the Camel route and a connection pool of 20 connections. This sounds reasonable at first glance, as you would think that each concurrent consumer can then use its own JMS connection.
But with a cacheLevelName > CACHE_NONE this is not the case. Spring internally asks the PooledConnectionFactory for only one connection (i.e. it calls PooledConnectionFactory.createConnection() only once) and then creates all JMS sessions and consumers on top of this one connection. This in itself is not a problem, as the JMS specification explicitly allows multiple JMS sessions to share the same connection. However, it then does not make much sense to configure maxConnections=20 on the PooledConnectionFactory if this connection factory is not shared by multiple Camel routes.
You can confirm that only one connection is created in the above configuration by checking the log file for the presence of 

INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

(assuming the failover transport is used, which is highly recommended).
There should be only one such line in the log.

Now, to make things more complicated, this changes when you use JMS transactions. Recall point 1) of this post: the transaction manager creates its own connection into the broker and does not cache it. Instead it calls ConnectionFactory.createConnection() when creating a new transaction (which happens for every message to be consumed) and calls Connection.close() as part of the cleanup routine after each transaction is committed or rolled back.
In addition, the PooledConnectionFactory eagerly fills the pool with another connection on every call to createConnection() until it reaches the configured maxConnections limit. So with the configuration above, where maxConnections=20, the transaction manager causes 20 connections to be created in the pool as it repeatedly calls PooledConnectionFactory.createConnection(). Yet only two of these 20 connections are used at any time: one by the transaction manager and one by the Spring consumer. The Spring consumer uses the same connection over its lifetime (due to a cache level > CACHE_NONE), but the transaction manager requests a new connection from the pool for every new message.
The effect is that 20 connections are made into the broker and only two of them are ever used at the same time. The Camel log should contain 20 lines of
INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

Therefore the recommendation is to reduce the pool size to 1, or at most 2, connections. Using just one connection is fine, as the transaction manager and the Spring consumer can share it: they are invoked in sequence and never use the connection in parallel.
By reconfiguring the PooledConnectionFactory to only 1 or 2 connections, you save heap memory in the Camel route and resources in the ActiveMQ broker (heap memory and threads, as by default the broker creates a thread per connection).
Again, I am assuming that the PooledConnectionFactory instance is used by only one Camel route and not shared by multiple routes (a scenario I have seen quite often with customers). If you deploy multiple Camel routes, then it is indeed recommended to share the same PooledConnectionFactory; in that case, size the pool's maxConnections to 1 connection * the number of Camel routes.

Summary: For each Camel route you only need to configure a PooledConnectionFactory pool size of 1 connection.