27 Feb 2017

How to add a JMS appender to the Pax Logging Service in JBoss Fuse?

Ever wondered whether it's possible to publish JBoss Fuse container logging messages to the embedded ActiveMQ broker?

Well... it is.

This solution has been verified on JBoss Fuse 6.3.0. The Pax Logging Service in JBoss Fuse is configured to use Log4J internally.
Technically it should therefore be possible to configure the Log4J JMSAppender, but that may be difficult due to class loading issues.

The Apache ServiceMix project offers a JMS appender for Pax Logging that is easier to use, and that is what this solution uses.

As this solution utilizes two bundles provided by Apache ServiceMix that we don't ship, these bundles are not supported by Red Hat (they are not shipped out of the box and hence are not certified). As a consequence we cannot fully support this solution.

Instructions

  • In your JBoss Fuse installation edit etc/system.properties and append these two lines at the end, replacing the values with your admin
    username and password defined in etc/users.properties
    activemq.jms.user=admin
    activemq.jms.password=admin
    

  • Next, edit etc/org.ops4j.pax.logging.cfg and replace
    log4j.rootLogger = INFO, out, osgi:VmLogAppender
    with
    log4j.rootLogger = INFO, out, osgi:VmLogAppender, osgi:JMSLogAppender

  • Start JBoss Fuse and use the Karaf shell for the remaining commands

  • The default broker name in JBoss Fuse is amq, so let's create the configuration needed by the Apache ServiceMix bundle and set this broker name
    config:edit org.apache.servicemix.activemq.service
    config:propset broker-name amq
    config:update
    
    This configuration will be used by a bundle deployed later. If you assign a different broker name in
    etc/io.fabric8.mq.fabric.server-broker.cfg, then you need to assign the same broker name in the config:propset Karaf command above.

  • Uninstall the JMS 2.0 API and install the JMS 1.1 API.
    There should be a more elegant way to do this but I have not found one yet.
    osgi:list -l -t 0 | grep jms-api/2.0.1
    and note the bundle id (199 typically)
    osgi:uninstall 199
    features:install jms-spec
    features:uninstall jms-spec
    features:install jms-spec/1.1
    packages:exports | grep javax.jms
    
    The last command should only output the JMS 1.1 API, e.g.
    294 javax.jms; version=1.1.0

  • While the previous sequence of Karaf commands correctly installs the JMS 1.1 API, it leaves all JMS-related bundles stopped.
    You could manually restart them, but the quickest way is to simply restart the JBoss Fuse container. This is only needed because of the sequence of Karaf commands in the previous step; it is not needed again later.

  • Install the Pax Logging bundles
    osgi:install -s mvn:org.apache.servicemix.logging/jms-appender/6.1.3
    osgi:install -s mvn:org.apache.servicemix.activemq/org.apache.servicemix.activemq.service/6.1.3
    

  • Check the broker statistics using
    activemq:dstat
    It should show a topic called Logging.Events with a number of messages enqueued. You can attach a topic subscriber to this topic to receive logging events, for example along the lines of the sketch below.
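
For example, a minimal standalone subscriber could look like the following sketch. It assumes the broker is reachable on the default Openwire port 61616 on localhost and reuses the admin credentials from etc/users.properties (adjust both as needed); the payload format produced by the appender is not covered here, so the sketch simply prints whatever it receives.

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class LoggingEventsSubscriber {
    public static void main(String[] args) throws Exception {
        // assumption: default transport URL and admin/admin credentials
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("Logging.Events");
        MessageConsumer consumer = session.createConsumer(topic);
        while (true) {
            Message message = consumer.receive();
            if (message instanceof TextMessage) {
                System.out.println(((TextMessage) message).getText());
            } else {
                System.out.println(message);
            }
        }
    }
}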


If you also want to change the JMS topic name used by this appender, you can run
config:edit org.apache.servicemix.logging.jms
config:propset destinationName LogEvents
config:update
Any new logging events will be directed to the new topic destination.
There is currently no way to specify a JMS queue as the destination.

This solution is based on SMX4-1205 but it was difficult to chase down all the details as I also ran into SM-2183.

The source code of this appender can be found at
https://github.com/apache/servicemix/tree/master/logging/jms-appender.

13 Apr 2016

Starving JMS consumers when not setting destination limits in ActiveMQ


My last post is fairly old; limited time is the reason.

This blog post could have two titles: the one above, but also "Stuck messages in ActiveMQ, not getting dispatched to an active consumer". Both can be symptoms of the same problem I will try to discuss in this post.

In my day-to-day job I analyse many ActiveMQ related problems and have seen a number of weird, unusual behaviours that are often caused by users not knowing the side effects of their ActiveMQ tuning activities. But the following discovery surprised even me.

When you download and extract one of the later (including the latest) versions of Apache ActiveMQ or JBoss A-MQ and look at the out of the box configuration, you will notice that the <destinationPolicy> section of the broker configuration does not configure any destination limits anymore. Older versions did configure these limits out of the box.


I was always a supporter of this configuration. Why would you want to restrict every queue cursor to a particular memory limit if most of the time your destinations have small backlogs? If a backlog accumulates on a particular queue, it's better to use the broker's full <memoryUsage> to cache messages in memory, irrespective of the destination, in order to dispatch them quickly when needed. This also utilizes the broker's <memoryUsage> better: queues on which a backlog builds up can use the broker's memory, while queues that have no backlog obviously don't need the memory at that moment. If the backlog grows too high, or if backlogs build up on too many queues, the broker enforces the overall <memoryUsage> limit across all destinations. So from this point of view setting no destination limits makes perfect sense.

However, we lately discovered a not so exotic use case where not setting a destination limit caused problems. Here are the details:

The Problem
We initially reproduced the problem in a test case that may be less likely to mirror a real production environment; however, it makes the situation easier to explain. In that test we only used two JMS queues. The broker configuration did not set any destination limits, and it does not matter how high the <memoryUsage> limit is set. The higher the limit, the more messages are needed in the test, but the problem can be reproduced with any <memoryUsage> limit. We used KahaDB as the persistence store.

The broker was started with a few messages on the first destination, let's say queue A, stored in KahaDB. As this queue had no consumers attached upon broker start, the messages remained in the store only and did not get loaded into the cursor cache. Note that messages only get loaded from the store into memory when there is demand, i.e. when there is an active consumer.

Now a producer connected to the second queue B and pushed messages until 70% of the broker's <memoryUsage> limit was used by queue B. Remember, no destination limits were set, so each destination can use up to the broker's full <memoryUsage> limit. However, the StoreQueueCursor used for a JMS queue stops caching more messages in memory once it reaches 70% (the threshold is configurable via cursorMemoryHighWaterMark). Any additional messages received from a producer are written to the store only and not accepted by the cursor. When it's time to dispatch these additional messages (i.e. once the cache runs empty again), they will be loaded from the KahaDB store.

So we had a few messages on queue A that were not loaded into memory but only resided in KahaDB, and we had a few tens of thousands of messages on queue B that were all loaded into memory and made the cursor for queue B use 70% of the configured <memoryUsage> limit. Since queue B did not configure any destination limit, it inherited the broker's <memoryUsage> limit and had therefore used 70% of it.

However, the same applied to all other JMS queues. They also did not set any destination limits and hence also inherited the <memoryUsage> limit of the broker, which was already 70% utilized (due to queue B).

Since there was no consumer connected to queue B, messages would not get removed from the queue and the <memoryUsage> would not decline.

Next a consumer connected to queue A, ready to receive messages. The cursor for queue A would typically go to the store now and load maxPageSize (200 by default) messages from the persistence store into memory in one batch. Just that it could not do so this time, because 70% of the broker's <memoryUsage> limit was already reached. Again, remember 70% is the tipping point at which the cursor stops accepting or loading more messages into its cache. The cursor's own MemoryPercentUsage JMX attribute for queue A was 0% (it had not loaded any messages into memory yet), but the broker's MemoryPercentUsage was already at 70%. The latter condition alone is enough to prevent the cursor for queue A from loading any more messages into memory. The broker needs to protect itself against running out of memory and has to enforce its <memoryUsage>. That's why a cursor loads a full page of maxPageSize (again, 200 by default) messages while the MemoryPercentUsage is below 70%, but stops loading any messages into memory once the 70% limit is reached.

The result is an active consumer on queue A that does not receive any messages, although there is a backlog of messages sitting in the persistence store. Unless a consumer drains off some messages on queue B and hence reduces the broker's MemoryPercentUsage below 70%, the cursor for queue A will not be able to load any messages from the store. The consumer for queue A is starved as a result.

A few consequences:
  • If there are multiple consumers on queue A, they will all get starved.
  • If there are other destinations with no messages loaded into memory but messages in the store and active consumers connected, they get starved as well.
  • You don't necessarily need one destination with no consumers that uses 70% of the broker's <memoryUsage> limit. There could be multiple destinations that have no consumers but larger backlogs which sum up to 70% of the broker's <memoryUsage> limit, reproducing the same behaviour.

How to identify this problem?
The following conditions should all be met when you run into this problem:
  • Do you see consumer(s) that receive no messages despite a queue backlog?
  • Does the destination to which the consumer(s) are connected show a MemoryPercentUsage of 0%?
  • Look at the broker's overall MemoryPercentUsage. Is it at 70% or higher?
  • Then drill into the JMX MemoryPercentUsage value of the various destinations and check for destinations that use a substantial portion of those 70% and that have no consumers attached (see the JMX sketch below).
  • If all of these conditions are met, you may have hit this problem.
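
Below is a minimal sketch of checking these values over JMX. The JMX service URL, the broker name (amq) and the absence of JMX credentials are assumptions taken for illustration; adapt them to your installation. The object name pattern matches ActiveMQ 5.8 and later.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class MemoryUsageCheck {
    public static void main(String[] args) throws Exception {
        // assumption: default JMX connector URL and no credentials; adjust as needed
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();

            // broker-wide memory usage; 70% or more indicates the situation described above
            ObjectName broker = new ObjectName(
                "org.apache.activemq:type=Broker,brokerName=amq");
            System.out.println("broker MemoryPercentUsage = "
                + mbs.getAttribute(broker, "MemoryPercentUsage"));

            // per-destination view: look for queues holding memory but having no consumers
            ObjectName queuePattern = new ObjectName(
                "org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName=*");
            for (ObjectName queue : mbs.queryNames(queuePattern, null)) {
                System.out.println(queue.getKeyProperty("destinationName")
                    + " MemoryPercentUsage=" + mbs.getAttribute(queue, "MemoryPercentUsage")
                    + " ConsumerCount=" + mbs.getAttribute(queue, "ConsumerCount")
                    + " QueueSize=" + mbs.getAttribute(queue, "QueueSize"));
            }
        } finally {
            connector.close();
        }
    }
}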

How to resolve this situation?
On a running broker you can either connect one or more consumers to queue A and start consuming messages or, if you can afford it from a business perspective, purge queue A. Both should bring the broker's <memoryUsage> below 70% and allow the cursors of other destinations to load messages from the store into their cursor cache.

Restarting the broker would also help, as after the restart messages only get loaded from the store if there are consumers connected. The many messages on queue A won't be loaded unless a consumer connects, and even then the cursor loads only maxPageSize messages in one go (200, as you surely know by now). The broker's <memoryUsage> should remain well below 70% in this case.

Configuring destination limits would typically also work around the problem. If you know that certain destinations may not have any consumers for a while, then explicitly configure decent memory limits for these destinations so they cannot take the entire broker's <memoryUsage>; a sketch follows below.
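
The following is a sketch of what such a per-destination limit looks like when a broker is configured programmatically; it is equivalent to a policyEntry with a memoryLimit in activemq.xml. The queue name pattern and the limit values are made-up examples.

import java.util.Arrays;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class DestinationLimitExample {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        // overall <memoryUsage> limit of the broker (example value)
        broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);

        // cap queues that are known to build up a backlog without consumers
        PolicyEntry backlogQueues = new PolicyEntry();
        backlogQueues.setQueue("BACKLOG.>");            // hypothetical destination pattern
        backlogQueues.setMemoryLimit(32 * 1024 * 1024); // per-destination limit
        backlogQueues.setCursorMemoryHighWaterMark(70); // the 70% threshold discussed above (default)

        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(Arrays.asList(backlogQueues));
        broker.setDestinationPolicy(policyMap);

        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }
}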

I raised this problem in ENTMQ-1543. However no fix was made as fixing turned out to be very difficult.


Still more?
Yes. With that much background we can come to the second symptom of this problem. Above I talked about one or more destinations with large backlogs and no consumers starving the consumers of other destinations.

If you take this further, perhaps queue A does have a consumer connected, but the consumer is much slower than the rate at which messages get produced. Perhaps it takes a few seconds to process each message (not unrealistic for certain use cases). Now imagine we have multiple destinations like queue A: slow consumers, large backlog of messages.

Together these destinations could use 70% of the broker's <memoryUsage>. Now think for a second about what happens to other destinations that have fast consumers and (almost) no backlog. These destinations could otherwise see a high throughput. Because the destinations with slow consumers and large backlogs together constantly hit 70% of the broker's <memoryUsage> limit, any new messages sent to the destinations with fast consumers and no backlog would not get loaded into the cursor cache of those destinations. It's the same problem as above. So these fast consumers don't receive messages until the slow consumers of other destinations have consumed some messages and reduced the broker's <memoryUsage> below 70%. In essence the fast consumers do not get starved completely, but they get slowed down pretty much to the speed of the slow consumers on the other destinations.

I produced a unit test that shows this problem in action. If you are interested, check out the code from demo StuckConsumerActiveMQ and follow the instructions in the test's README.md.


Again the solution to this problem is to set destination limits for at least the destinations that have slow consumers and a large message backlog.

Conclusion
And this is my general advice and the biggest takeaway from this post: if you know you will have destinations with large message backlogs building up for more than just short peak times, then consider configuring destination limits for these queues in order to avoid the problems discussed here.

A very good background read on message cursors in ActiveMQ is this blog post from my colleague here at Red Hat, Christian Posta: ActiveMQ: Understanding Memory Cursors

25 Mar 2014

ActiveMQ and the JMSRedelivered header



This post applies to ActiveMQ version 5.9.0 and below. ActiveMQ 5.9.0 is the latest released version at the time of writing this post. It also applies to JBoss A-MQ 6.0 and below.

The JMSRedelivered header is set on a JMS message to indicate that a redelivery of this message is occurring. Or, to quote the JMS 1.1 spec:
"If a client receives a message with the JMSRedelivered indicator set, it is likely, but not guaranteed, that this message was delivered but not acknowledged in the past."
In ActiveMQ messages can be prefetched to consumers. Let's say a consumer gets 1000 messages prefetched but for some reason disconnects after consuming and acknowledging 900 messages. When the broker loses the connection to the consumer, it knows that out of the 1000 prefetched messages only 900 were acknowledged, so the remaining 100 messages will get the JMSRedelivered flag set to true to indicate that these messages are being redelivered.
The same or another consumer reconnecting can examine this flag and perhaps process the message differently (e.g. run extra duplicate detection to verify this message has not been processed before), for example along the lines of the sketch below.
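
A minimal sketch of such a check in a consumer's MessageListener; the duplicate lookup itself is application specific and only hinted at here:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class RedeliveryAwareListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            if (message.getJMSRedelivered()) {
                // the message was likely delivered before without being acknowledged,
                // so run an application-specific duplicate check first
                if (alreadyProcessed(message.getJMSMessageID())) {
                    return; // skip the duplicate
                }
            }
            process(message);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean alreadyProcessed(String messageId) {
        // hypothetical lookup against a store of already processed message ids
        return false;
    }

    private void process(Message message) {
        // actual business logic goes here
    }
}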

If however your broker crashes or shuts down before redelivering the 100 messages that have JMSRedelivered=true set, then this redelivery flag will be lost by default. The reason is that ActiveMQ stores a message to persistent storage (e.g. KahaDB) only when it initially receives the message from the producer. It does not write the message to storage thereafter. So the JMSRedelivered flag is set on the JMS message that is held in memory only. After a restart of the broker none of the remaining 100 messages would have JMSRedelivered=true.

For the KahaDB persistence adapter there has been an improvement introduced in ActiveMQ 5.6 as part of AMQ-3519. With this improvement it is possible to configure KahaDB to persist the change to the JMSRedelivered flag for a message in the persistence store, if the consumer is transacted. In case the consumer rolls back the transaction, the broker will re-write the message to the store with an updated redelivery count. This obviously has a performance impact, which should generally be rather low, assuming that most transactions will commit and rollbacks occur only occasionally.

This behavior is not enabled out of the box. To enable it you need to
- set transactedIndividualAck=true on the ConnectionFactory, and
- add rewriteOnRedelivery=true property to the KahaDB persistence store configuration

Again, this feature only applies when using the KahaDB persistence store and transacted consumers. Non-transacted consumers cannot benefit from it. Also, if the broker crashes during such a rollback (before it rewrites the messages with an updated redelivery count to KahaDB), the broker will also lose the JMSRedelivered information.
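
On the client side, the first of these two settings could look like the following sketch (the broker URL is a placeholder; the broker-side rewriteOnRedelivery=true flag is set as an attribute on the kahaDB persistence adapter element of the broker configuration, as described above):

import javax.jms.Connection;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TransactedIndividualAckExample {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory("tcp://localhost:61616"); // placeholder URL
        // needed so the broker can track the redelivery state per message for transacted consumers
        factory.setTransactedIndividualAck(true);

        Connection connection = factory.createConnection();
        connection.start();
        // transacted session: on rollback the broker rewrites the message to KahaDB
        // with an updated redelivery count (when rewriteOnRedelivery=true is configured)
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        // ... consume messages, then session.commit() or session.rollback()
    }
}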

Now with AMQ-5068 we raised another improvement. Rather than the JMSRedelivered header only working for transacted consumers, it should work for every consumer, transacted and non-transacted. And it should also remain consistent if the broker crashes. This improvement will be implemented in ActiveMQ 5.10 and will be enabled via the property persistJMSRedelivered="true" on a policyEntry configuration.

This feature (when it gets implemented) will obviously not be enabled by default either. As every single JMS message needs to be rewritten to the store with an updated redelivery count prior to dispatching it to a consumer, this will have a significant performance overhead. We essentially write each message twice to the store: once when we receive it from the producer (before we ack it back to the producer) and again before we dispatch it to the consumer. Writing to the store requires a sync write, which typically has a very high performance cost.

Apparently WebSphere MQ has a similar feature when setting the "Harden get backout" attribute to HARDENBO. See this WebSphere MQ doc. They also issue a warning that this will drastically affect the message throughput.



Summary: As of ActiveMQ 5.9.0 you should not rely 100% on the JMSRedelivered header. If the broker crashes, it will not restore this redelivery information on restart, as the information is currently only held in memory. One exception is the configuration for transacted consumers introduced in AMQ-3519.

23 Oct 2013

"javax.jms.TransactionInProgressException: Cannot rollback() inside an XASession"

The following problem has come up repeatedly among ActiveMQ/Camel/Karaf users so I want to capture it here in this post.

Most of the time the use case is a Camel route deployed into Karaf, consuming messages using XA transactions from an ActiveMQ broker (using Aries as the TX manager). A possible example route can be found in the Camel-WMQ-AMQ-XA-TX demo in my fuse-demos GitHub repository. Deploying this demo to Fuse ESB Enterprise 7.1 will immediately reproduce the problem.

When Camel tries to commit the XA transaction it raises the following warning:
14:58:37,063 | WARN  | Consumer[ESB_IN] | PooledSession | 122 - org.apache.activemq.activemq-pool - 5.7.0.fuse-71-047 | 
Caught exception trying rollback() when putting session back into the pool, will invalidate. 
javax.jms.TransactionInProgressException: Cannot rollback() inside an XASession
    at org.apache.activemq.ActiveMQXASession.rollback(ActiveMQXASession.java:76)[125:org.apache.activemq.activemq-core:5.7.0.fuse-71-047]
    at org.apache.activemq.pool.PooledSession.close(PooledSession.java:120)[122:org.apache.activemq.activemq-pool:5.7.0.fuse-71-047]
    at org.springframework.jms.connection.JmsResourceHolder.closeAll(JmsResourceHolder.java:193)
    at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.releaseResource(ConnectionFactoryUtils.java:412)
    at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.releaseResource(ConnectionFactoryUtils.java:1)
    at org.springframework.transaction.support.ResourceHolderSynchronization.beforeCompletion(ResourceHolderSynchronization.java:72)
    at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerBeforeCompletion(TransactionSynchronizationUtils.java:106)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerBeforeCompletion(AbstractPlatformTransactionManager.java:940)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:738)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:723)
    at org.apache.aries.transaction.GeronimoPlatformTransactionManager.commit(GeronimoPlatformTransactionManager.java:76)
    at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_65]
    at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_65]
    at org.apache.aries.proxy.impl.ProxyHandler$1.invoke(ProxyHandler.java:54)[13:org.apache.aries.proxy.impl:1.0.0]
    at org.apache.aries.proxy.impl.ProxyHandler.invoke(ProxyHandler.java:119)[13:org.apache.aries.proxy.impl:1.0.0]
    at com.sun.proxy.$Proxy74.commit(Unknown Source)[148:org.springframework.transaction:3.0.7.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)[153:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)[153:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)[153:org.springframework.jms:3.0.7.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)[153:org.springframework.jms:3.0.7.RELEASE]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)[:1.6.0_65]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)[:1.6.0_65]
    at java.lang.Thread.run(Thread.java:695)[:1.6.0_65]


This problem is caused by a bug in activemq-pool of version 5.7.0 (and most likely earlier versions). It is raised under ENTMQ-441.

Version 5.8.0 of activemq-pool has this bug fixed already. So anyone running into this problem, check if you're using ActiveMQ 5.7.0 on the client side. If so, upgrade to ActiveMQ 5.8.0 on the client. JBoss Fuse 6.0 uses ActiveMQ 5.8.0.

If you are running on Fuse ESB Enterprise 7.x and you cannot upgrade to JBoss Fuse 6.0, then a possible solution is to replace the activemq-pool/5.7.0 bundle with version 5.8.0. You can follow these steps:

  • osgi:stop <bundleid of your Camel route doing XA>

  • osgi:list -l -t 0 | grep activemq-pool
    Take note of the bundle id.

  • osgi:uninstall <bundleid>

  • As the 5.8.0 version of activemq-pool is not an OSGi bundle, we need to wrap it accordingly
      osgi:install 'wrap:mvn:org.apache.activemq/activemq-pool/5.8.0 \
    $Bundle-SymbolicName=org.apache.activemq.activemq-pool&Bundle-Version=5.8.0& \
    Export-Package=org.apache.activemq.pool;version=5.8.0'

  • osgi:refresh <bundleid of your Camel route doing XA>

  • osgi:start <bundleid of your Camel route doing XA>


This should rewire your Camel bundle to use the new activemq-pool jar. If you still see the exception, run packages:imports and verify whether it pulls in activemq-pool from the new jar file. It may be that it now pulls it in from activemq-spring instead.
In this case I suggest changing the maven-bundle-plugin configuration to explicitly require activemq-pool version 5.8.0 or higher, using e.g. the following configuration

<plugin>
  <groupId>org.apache.felix</groupId>
  <artifactId>maven-bundle-plugin</artifactId>
  <extensions>true</extensions>
  <configuration>
    <instructions>
      ...
      <Import-Package>org.apache.activemq.pool;version="[5.8.0,6)",*</Import-Package>
    </instructions>
  </configuration>
</plugin>



You will need to rebuild and redeploy the Camel route.

2 Aug 2013

Creating ActiveMQ Broker cluster topologies using Fuse Fabric


Fuse Fabric is a very powerful open source integration platform (iPaaS) developed by Red Hat (formerly FuseSource). If you don't know what Fabric is, have a look at the project web site: http://fuse.fusesource.org/fabric/.
Introducing Fabric in this post would simply repeat the documentation on the Fabric web site, so I'll spare you that.

Instead, this blog post will attempt to dive a bit deeper into the fabric:mq-create command. I hope it will be useful to anyone who uses Fabric and wants to set up highly dynamic broker networks with it. Let's get right into it...



The fabric:mq-create command allows you to create broker instances that run in their own OSGi containers. For help on general usage type
fabric:mq-create --help


In Fabric every broker has a broker name and is part of a broker group. The broker name is the last argument specified on the mq-create command and names an individual broker. The group name is specified using the --group argument. If that argument is omitted, the broker is assigned the default group name 'default'. These concepts of broker names and group names are important, as the next sections will show.
fabric:mq-create creates a new Fabric profile that contains the specified broker configuration. The profile name is the same as the broker name which is specified as the last argument to mq-create. This profile can then be easily deployed to one or multiple containers.
The fabric:mq-create command is quite powerful and can also be used to create more complex broker topologies. Here is a quick walk-through of some of the most common topologies.


Single Master/Slave Broker Pair


For high availability of a broker one may want to configure a master/slave broker pair. The main idea in Fabric is that two broker instances that use the same group name and the same broker name will form a master/slave pair. Using this idea a master/slave pair can be created using
fabric:mq-create --create-container broker1,broker2 MasterSlaveBroker

This command creates a Fabric profile called MasterSlaveBroker. In this profile it configures an ActiveMQ broker with the same name, MasterSlaveBroker. As no --group argument was specified, the group name will be 'default'. The option --create-container broker1,broker2 also deploys this profile to two new OSGi container instances called broker1 and broker2. As both container instances deploy the very same profile, they will instantiate a broker with the same broker name and the same group name and as such form a master/slave pair.
Note: The option --create-container is really optional. It's also possible to simply create the Fabric profile first using mq-create and then, in a later step, deploy this profile to different containers using the fabric:container-create-* commands.

Broker network with two interconnected Master/Slave pairs


The previous topology can be extended to two broker pairs that are interconnected using a network connector. Each pair consists of a master and a slave broker instance. This topology can be created by invoking
fabric:mq-create
    --group network-brokers1
    --create-container broker1_1,broker1_2
    --networks network-brokers2 
    --networks-password admin
    --networks-username admin
    MasterSlaveBroker1

fabric:mq-create
    --group network-brokers2
    --create-container broker2_1,broker2_2
    --networks network-brokers1 
    --networks-password admin
    --networks-username admin
    MasterSlaveBroker2

These commands create two Fabric profiles. Each profile configures an ActiveMQ broker, named MasterSlaveBroker1 and MasterSlaveBroker2 respectively. Each broker configuration also sets a group name so that the broker instances will be part of that group. Further, MasterSlaveBroker1, which is part of the group network-brokers1, configures a network connector to the broker instances in the group network-brokers2, and vice versa.
Using --create-container we instantiate two OSGi containers in each command, which each deploy the relevant Fabric profile and create the broker instances. The two containers in each --create-container argument will form a master/slave broker pair as they both use the same broker name (either MasterSlaveBroker1 or MasterSlaveBroker2) and the same group name (either network-brokers1 or network-brokers2).
By default the brokers created by mq-create are secured and require authentication. So when configuring the network bridge it is necessary to supply username and password credentials in order to successfully establish the bridge. Username and password are supplied using the arguments --networks-username admin --networks-password admin.
Altogether these two commands create four broker instances, out of which only two will be master broker instances; the other two will be slave instances. Each master/slave pair belongs to one broker group. A network bridge between the two masters is established in both directions. If one of the master brokers dies or gets shut down, the slave broker will take over within a few seconds and the network bridges will get re-established from and to the new master broker.

Fully connected Broker Mesh


The above example sets up a master/slave pair for each broker group, where only one instance is active at a time. It's also possible to configure multiple active broker instances within the same group. For a broker instance to be active independently of other instances it simply needs a unique broker name within its group. These instances can also be networked into a full mesh topology.
When running
fabric:mq-create
    --group BrokerClusterMesh
    --networks BrokerClusterMesh
    --create-container MeshBroker1 
    --networks-password admin
    --networks-username admin
    BrokerClusterMesh1

fabric:mq-create
    --group BrokerClusterMesh
    --networks BrokerClusterMesh
    --create-container MeshBroker2 
    --networks-password admin
    --networks-username admin
    BrokerClusterMesh2

it will again create two broker profiles named BrokerClusterMesh1 and BrokerClusterMesh2. Each profile configures an ActiveMQ broker. Both broker configurations are part of the same group BrokerClusterMesh. Using --create-container, each profile gets deployed to exactly one OSGi container. Since both broker instances have their own broker name configured (BrokerClusterMesh1 and BrokerClusterMesh2), they will both be active broker instances within the same group BrokerClusterMesh. Using --networks BrokerClusterMesh a network bridge is configured in each broker that points to its own group name. In essence this creates a network bridge from each broker to all the other broker instances within the same group and forms a full mesh topology. In this example only two broker instances get created, so it's a fairly small mesh. However you can easily add another broker to the group, e.g. by running
fabric:mq-create
    --group BrokerClusterMesh 
    --networks BrokerClusterMesh 
    --create-container MeshBroker3 
    --networks-password admin 
    --networks-username admin
    BrokerClusterMesh3

and all broker instances (the new one as well as the two existing instances) will each reconfigure their network connectors to connect to all the other broker instances in this group. So a full mesh of 3 broker instances gets created. This mesh can be expanded with additional instances if needed. Once a new instance is introduced, all broker instances reconfigure their network bridges accordingly.
Note: It's generally not advisable to create large broker meshes (e.g. > 4 broker instances) as, depending on the use case, the advisory messages that these broker instances exchange will cause quite some chatter.

Full Broker Mesh with Master/Slave pair on each Broker


Combining the last two use cases, it's also possible to configure a full broker mesh where each broker consists of a master/slave pair. This is achieved using the following commands:
fabric:mq-create
    --group BrokerClusterMeshWithSlave 
    --networks BrokerClusterMeshWithSlave 
    --create-container MeshBroker1,MeshBroker2 
    --networks-password admin 
    --networks-username admin 
    BrokerClusterMeshWithSlave1

fabric:mq-create 
    --group BrokerClusterMeshWithSlave 
    --networks BrokerClusterMeshWithSlave 
    --create-container MeshBroker3,MeshBroker4 
    --networks-password admin 
    --networks-username admin 
    BrokerClusterMeshWithSlave2

These commands differ from the previous example only in the --create-container argument. The previous example deploys each broker configuration to only one container; now we deploy each broker configuration to two containers. The containers within one --create-container argument use exactly the same broker configuration (i.e. the same broker name and the same group name), so both instances will form a master/slave broker pair. Each master broker will create a network bridge to all other active broker instances within the same network group.
It's of course possible to add additional master/slave pairs to this broker group if needed, and all active broker instances (i.e. master broker instances) will reconfigure their network bridges dynamically as new brokers enter or leave the network group. To add another master/slave broker pair to the mesh you can simply run
fabric:mq-create 
    --group BrokerClusterMeshWithSlave 
    --networks BrokerClusterMeshWithSlave 
    --create-container MeshBroker5,MeshBroker6 
    --networks-password admin 
    --networks-username admin 
    BrokerClusterMeshWithSlave3


Additional notes


- Each of the broker profiles created by the above mq-create examples has the profile mq-base as its parent.
- The profile mq-base contains a broker.xml configuration file which serves as the base broker configuration for all the broker instances created above. So you could adjust this broker.xml in mq-base upfront to your needs (e.g. configure systemUsage and policy entries), then create your broker instances using mq-create, and they will all leverage this configuration.
- The broker configuration in mq-base does not configure any network bridges. The network bridge configuration is not stored in broker.xml when using mq-create. Instead it is stored in the configuration file org.fusesource.mq.fabric.server-.properties that is created when running mq-create. This file stores all network connector related configuration using the 'network.' prefix. E.g.
network.password=admin
network=BrokerClusterMeshWithSlave
group=BrokerClusterMeshWithSlave
network.userName=admin
broker-name=BrokerClusterMeshWithSlave1

The network bridge gets created and configured based on this configuration and not based on activemq.xml! This allows for fairly dynamic configuration changes at runtime.
- The network connector configuration of ActiveMQ (see http://activemq.apache.org/networks-of-brokers.html) allows further fine-grained configuration. These settings can be added manually to the configuration file org.fusesource.mq.fabric.server-.properties in each profile by prefixing the configuration property with 'network.'. E.g. in order to set the property decreaseNetworkConsumerPriority=true on the network connector, one can simply add
network.decreaseNetworkConsumerPriority=true

to org.fusesource.mq.fabric.server-.properties. Likewise with all the other network connector properties.

JMS connections are leaked when stopping a Camel route that consumes from JMS using Spring's SingleConnectionFactory

I came across this interesting problem today and want to capture it.
Suppose this simple Camel route, consuming from ActiveMQ and logging the message
<camelcontext id="camelContext" xmlns="http://camel.apache.org/schema/spring">
  <route id="jms-consumer4">
    <from uri="amq:queue:Test" />
    <to uri="log:JMSConsumer?level=INFO&showBody=true" />
  </route>
</camelcontext>

<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="singleCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>

<bean class="org.springframework.jms.connection.SingleConnectionFactory" id="singleCF">
  <property name="targetConnectionFactory" ref="AMQCF" />
  <property name="reconnectOnException" value="true" />
</bean>

<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>

After deploying this to ServiceMix (JBoss Fuse 6.0 in my test) it works just fine. The problem occurs when stopping this Camel route (via an osgi:stop of the corresponding bundle): the connection to the ActiveMQ broker does not get closed! Drilling into the JMX view of the broker, the connection is still registered in JMX. Even worse, each restart and stop of the Camel route leaks another connection.

I did a bit of root cause analysis and found:
When the Camel route is stopped it calls into SingleConnectionFactory.destroy(). This cleans up the internally held ActiveMQConnection. At this stage the connection is properly removed from the broker's JMX view, which you will probably only notice when debugging through the code.
However, the Spring JMS listener used by the Camel JMS consumer is still running; it detects that the connection is down and tries to transparently reconnect. This calls into SingleConnectionFactory.createConnection again (full stack trace below [1]). The SingleConnectionFactory happily reopens a new connection to the broker, which then remains open and registered in JMX although the route is stopped.


So how to resolve this?

Instead of Spring's SingleConnectionFactory I recommend using ActiveMQ's PooledConnectionFactory. The above Camel route configuration then becomes

<bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="amq">
  <property name="connectionFactory" ref="pooledCF" />
  <property name="useSingleConnection" value="true" />
  <property name="usePooledConnection" value="false" />
  <property name="preserveMessageQos" value="true" />
</bean>

<bean class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" id="pooledCF" init-method="start">
  <property name="connectionFactory" ref="AMQCF" />
  <property name="maxConnections" value="1" />
</bean>

<bean class="org.apache.activemq.ActiveMQConnectionFactory" id="AMQCF">
  <property name="userName" value="admin" />
  <property name="password" value="admin" />
  <property name="brokerURL" value="tcp://localhost:61616" />
  <property name="copyMessageOnSend" value="false" />
  <property name="useAsyncSend" value="true" />
</bean>

When using ActiveMQ's PooledConnectionFactory instead, things behave pretty much the same, with one subtle but important difference.
Similar to above, stopping the Camel bundle calls into PooledConnectionFactory.stop(). This internally closes all ActiveMQConnections (only one in this example but potentially more), which also unregisters the connection from the broker's JMX view. Now, Spring's JMS listener used by the Camel JMS consumer is still running; it detects the connection closure and tries to transparently reconnect. This calls into PooledConnectionFactory.createConnection(). This implementation, however, contains the following check:
if (stopped.get()) {
  LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
  return null;
}


The AtomicBoolean stopped has been set to true, so no new connection is established!
Spring's SingleConnectionFactory does not have this logic; it happily reopens a new connection after it got destroyed. Please note that the attributes init-method="start" and destroy-method="stop" on the PooledConnectionFactory bean definition are important, as otherwise you may also leak connections when shutting down your bundles.
[1]
Daemon Thread [Camel (camelContext) thread #21 - JmsConsumer[EwdTest1]] 
(Suspended (breakpoint at line 280 in org.springframework.jms.connection.SingleConnectionFactory))
owns: java.lang.Object (id=9254)
owns: java.lang.Object (id=9255)
owns: java.lang.Object (id=9286)
org.springframework.jms.connection.SingleConnectionFactory.initConnection() line: 280
org.springframework.jms.connection.SingleConnectionFactory.createConnection() line: 225
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.support.JmsAccessor).createConnection() line: 184
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).createSharedConnection() line: 404
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.AbstractJmsListeningContainer).refreshSharedConnection() line: 389
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).refreshConnectionUntilSuccessful() line: 869
org.apache.camel.component.jms.DefaultJmsMessageListenerContainer(org.springframework.jms.listener.DefaultMessageListenerContainer).recoverAfterListenerSetupFailure() line: 851
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run() line: 982
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(java.lang.Runnable) line: 895
java.util.concurrent.ThreadPoolExecutor$Worker.run() line: 918
java.lang.Thread.run() line: 680

9 Apr 2013

How to change colors in Karaf shell console?


Karaf uses colored output, which is great!
E.g. running log:tail or log:display prints the Karaf log in different colors depending on the log level of the log message.

However, sometimes the default colors don't work nicely on your terminal's background.
You could change the terminal's background, but would it not be nicer to simply reconfigure the color codes used by Karaf?

Of course this is possible.
Simply add something like the following to $KARAF_HOME/etc/org.apache.karaf.log.cfg


#
# ANSI Colors
#
fatalColor = 31;1
errorColor = 31;1
warnColor = 35
infoColor = 36
debugColor = 39
traceColor = 39


and restart your Karaf/ServiceMix/Fuse ESB Enterprise/JBoss Fuse container.
How do these values map to actual colors? See http://en.wikipedia.org/wiki/ANSI_escape_code#graphics, in particular the section on colors.

Appending a ";1" to the color value renders the text in bold as well. Other formatting options such as italic or blinking don't seem to be supported.






25 Oct 2012

Integrate Log4j Nagios Appender into Karaf/ServiceMix

This is perhaps a post for a rather restricted audience.

As ServiceMix/Karaf users know, the Pax Logging framework used in Karaf integrates nicely with Apache Log4J out of the box. In fact the Karaf logging configuration file located in etc/org.ops4j.pax.logging.cfg uses Log4J configuration syntax.
Log4J integration comes out of the box in Karaf: the Pax Logging Service OSGi bundle includes the core Log4J classes, so the default Log4J appenders are supported.

Now there are loads of additional appenders for Log4J available (a non-exhaustive list is here). One such appender is the Log4J Nagios appender, which pushes logging messages to Nagios (via an NSCA server).

I have never used Nagios before but had the task of getting this Nagios appender working in Fuse ESB Enterprise 7.0.2. Fuse ESB is based on Apache ServiceMix, so the outlined solution applies to ServiceMix 4.x and Karaf as well.
As things were not dead simple, I am taking the time to document my solution so that it can hopefully save someone else's time.

To install and set up Nagios, I fired up an Ubuntu VM and installed the nagios and nsca packages using the Ubuntu package manager. With regards to configuring Nagios I followed this article and this document explaining the NSCA server setup. Although I had no previous knowledge of Nagios, I got it set up and running within an hour thanks to the referenced articles.

Turning back to ServiceMix/Karaf: the Nagios Log4J appender comes as a plain jar file (not OSGi enabled). A possible Nagios Log4J configuration is given at the end of this post.

The problem is that logging is performed by the Pax Logging Service in Karaf. So how do you tell the pax-logging-service system bundle that it should also load the Nagios Log4J appender from a different jar file deployed into Karaf?
There are probably other ways to resolve this, but I found it easiest to use the OSGi fragment bundle concept.

In OSGi there is the concept of a fragment bundle. From the OSGi Wiki:
"A Bundle fragment, or simply a fragment, is a bundle whose contents are made available to another bundle (the fragment host). Importantly, fragments share the classloader of their parent bundle."

It's important to note that fragments use the classloader of their parent or host bundle.
By using a fragment bundle you can extend the classes that can be loaded by the host bundle without having to modify its OSGi Import-Package list.

With respect to my use case this means: by making the Nagios Log4J jar file a fragment bundle of the Pax Logging Service bundle, the Pax Logging Service bundle will be able to load the Nagios Log4J appender classes and send logging statements to the Nagios NSCA server.

The Nagios Log4J jar does not contain any OSGi metadata, so I had to add it manually. I extracted the jar file and modified META-INF/MANIFEST.MF to contain these headers:

Manifest-Version: 1.0
Ant-Version: Apache Ant 1.8.2
Created-By: 1.6.0_13-b03 (Sun Microsystems Inc.)
Bundle-Name: log4j-nagios
Bundle-SymbolicName: org.apache.log4j.nagios
Bundle-Version: 2.0.0
Bundle-ManifestVersion: 2
Fragment-Host: org.ops4j.pax.logging.pax-logging-service
Export-Package: org.apache.log4j.nagios;version="2.0.0"



Notice the Fragment-Host header: it sets the host to the pax-logging-service OSGi bundle.
Further there is no need to define an Import-Package list as all required Log4J classes will be made available by the host bundle.

I then rebuilt the jar file and named it log4j-nagios-appender-2.0.0.osgi.jar.
If you don't want to run these steps manually yourself, you can download the OSGi enabled jar file using the above link.



Deploying this new jar is easy. The perhaps simplest approach is to start with a fresh container (having no or an empty data/ folder).
Assuming etc/org.ops4j.pax.logging.cfg already configures Nagios logging (see the example config below), you can simply copy log4j-nagios-appender-2.0.0.osgi.jar to the ServiceMix deploy/ folder and start up ServiceMix.

It may raise the following exception on the first startup

java.lang.ClassNotFoundException: org.apache.log4j.nagios.NagiosAppender not found 
by org.ops4j.pax.logging.pax-logging-service [3]

but you can ignore that. Because the pax-logging-service bundle was started before the Nagios Log4J fragment bundle, Pax Logging is not able to load the Nagios appender right at startup. However, when the Nagios Log4J fragment bundle attaches to the pax-logging-service, the Nagios appender classes will get loaded and logging via that appender will start. Messages will get pushed to the NSCA server.
On subsequent restarts of Karaf the bundles are already wired together (i.e. the pax-logging-service knows there is a fragment bundle), so this exception will not be raised anymore.



Hope this helps.




Example org.ops4j.pax.logging.cfg configuration using Nagios appender:

################################################################################
#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
################################################################################

# Root logger
log4j.rootLogger=INFO, out, osgi:* , NAGIOS
log4j.throwableRenderer=org.apache.log4j.OsgiThrowableRenderer

# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n

# File appender
log4j.appender.out=org.apache.log4j.RollingFileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{bundle.id} - %X{bundle.name} - %X{bundle.version} | %m%n
log4j.appender.out.file=${karaf.data}/log/karaf.log
log4j.appender.out.append=true
log4j.appender.out.maxFileSize=1MB
log4j.appender.out.maxBackupIndex=10

# Sift appender

log4j.appender.sift=org.apache.log4j.sift.MDCSiftingAppender
log4j.appender.sift.key=bundle.name
log4j.appender.sift.default=karaf
log4j.appender.sift.appender=org.apache.log4j.FileAppender
log4j.appender.sift.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.sift.appender.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %m%n
log4j.appender.sift.appender.file=${karaf.data}/log/$\\{bundle.name\\}.log
log4j.appender.sift.appender.append=true



# Nagios Log4J configuration
# ------------------------------------------------------------

# set the appender for Nagios
log4j.appender.NAGIOS=org.apache.log4j.nagios.NagiosAppender

# Nagios configurations
log4j.appender.NAGIOS.Host=192.168.178.44
log4j.appender.NAGIOS.Port=5667
log4j.appender.NAGIOS.ServiceNameDefault=FuseESB

log4j.appender.NAGIOS.MDCCanonicalHostNameKey=nagios_canonical_hostname


# It may be required to set a Nagios config file if non-default
# data encryption algorithms are used.
log4j.appender.NAGIOS.ConfigFile=/opt/fuse/SMX/fuse-esb-7.0.2.fuse-097/send_nsca.cfg

# mapping warning levels.
log4j.appender.NAGIOS.Log4j_Level_INFO=NAGIOS_OK
log4j.appender.NAGIOS.Log4j_Level_WARN=NAGIOS_WARN
log4j.appender.NAGIOS.Log4j_Level_ERROR=NAGIOS_CRITICAL
log4j.appender.NAGIOS.Log4j_Level_FATAL=NAGIOS_CRITICAL

# set the layout for appender Nagios
log4j.appender.NAGIOS.layout=org.apache.log4j.PatternLayout
log4j.appender.NAGIOS.layout.conversionPattern=server: %X{nagios_canonical_hostname}: %m%n



21 Mar 2012

I have messages on a queue but they don't get dispatched to the connected consumer.



Another somewhat tricky lesson learned on ActiveMQ.

Suppose you have a network of two broker instances.
Your JMS clients (i.e. producers and consumers) kind of randomly connect to one of the two brokers at runtime. That way, load is distributed across both brokers.

Because consumers connect to a broker randomly, you're smart and configure replayWhenNoConsumers using the following policy configuration


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  <networkBridgeFilterFactory>
<policyEntry>


By using replayWhenNoConsumers messages that were passed along the broker network can be replayed back to the original broker in case a broker has no consumers connected. This is a good thing to do when your consumers randomly pick a broker at runtime and when the number of consumers per destination is rather low. 


Now at runtime you may occasionally observe that on one broker instance a particular queue has a QueueSize greater than 0 and a consumer connected, but the messages do not get dispatched to the connected consumer.

In addition when you try to browse the queue using either the ActiveMQ web console or jconsole, no messages are shown. The browse command simply returns an empty set of messages.

Restarting the consumer does not help to get these messages dispatched. Restarting the broker, however, does help, and all remaining messages get dispatched correctly.



What's going on?

A broker has duplicate detection turned on by default. This is to prevent a producer from sending the same message twice within a small time window (e.g. due to a network fault and producer reconnect). It's the combination of duplicate detection and replayWhenNoConsumers that is causing this behavior.


Let's use a little example to illustrate this in more detail:

The broker network consists of two broker instances, brokerA and brokerB.

Consider a producer connecting to brokerA and sending a few messages to queue1. Slightly later a consumer connects to brokerB. BrokerA forwards all the messages on queue1 to brokerB. BrokerB dispatches the messages to the connected consumer.

Let's assume the consumer disconnects from brokerB before it has processed all messages. Shortly thereafter the consumer (or a new consumer) connects to brokerA again, ready to process any messages on queue1. BrokerA itself does not have any messages on queue1 stored locally, as it passed them all to brokerB.

However, thanks to the replayWhenNoConsumers policy configuration, brokerB will replay all messages on queue1 back to brokerA. Without replayWhenNoConsumers set, messages would not be replayed back to brokerA and would instead be stuck on brokerB until a consumer reconnects to brokerB and consumes them.

As these messages are replayed back to brokerA within a short time frame, the duplicate detection in the broker's cursor on brokerA flags these messages as duplicates: brokerA initially received these messages (with the same message ids) from the connected producer. Because they are seen as duplicates, they won't get dispatched to the consumer, yet they are already enqueued on the queue.

You can't browse these messages either, because from a broker's point of view a queue browser is just another JMS client, and the cursor applies the same dispatch logic to a queue browser as well.


The solution is to disable duplicate detection in the cursor


<policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" enableAudit="false" >
  <networkBridgeFilterFactory>
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" />
  <networkBridgeFilterFactory>
<policyEntry>


With duplicate detection disabled, replayed messages are no longer marked as duplicates, so they get dispatched correctly to any local consumers. The broker additionally has producer-side duplicate detection turned on by default in the persistence adapter, so that duplicate messages sent by external producers due to a failover reconnect will still be detected!


When messages get replayed to a broker over the network bridge, these messages are not duplicates. So disabling duplicate detection will in general not cause any duplicate messages over the bridge. Only "in general", because if the network bridge between the two broker instances dies before a message got acked, the bridge gets re-established later and the same message is resent. So in case the bridge dies while messages are replayed back, we could potentially receive duplicate messages.

In an older version of this blog post I suggested setting auditNetworkProducers=true on the broker's transport connector. However, this configuration is known to cause problems in certain broker topologies and is therefore not the proper solution to this issue.

Rather than setting auditNetworkProducers=true, we can allow the duplicate message that was received via the network bridge to be written to the store. Later, when the cursor tries to dispatch the message to a client, it should still detect it's a duplicate, and from version 5.10 onwards it should move that duplicate to the ActiveMQ.DLQ queue. In older versions of the broker, however, that duplicate message would be trapped on the queue, as the broker did not move duplicate messages to the DLQ. This behaviour got introduced with ENTMQ-496 / AMQ-4952.



Update, 08.04.2016:
I updated the last part of this article and removed the auditNetworkProducers option as it is known to cause problems in certain broker topologies or when used in conjunction with replayWhenNoConsumers=true.

Update, 23.10.2013:
We just found a bug in ActiveMQ versions 5.8.0 to 5.10-SNAPSHOT (but very likely also in older versions) where you could get into the situation described above despite setting replayWhenNoConsumers=true and enableAudit=false. This bug is logged at ENTMQ-444 and has a JUnit test attached.


1 Mar 2012

Camel JMS with transactions - lessons learned


Note: I have recently (2012-07-16) updated point 1) below to correct a little mistake. If you are revisiting this post, you may want to re-read point 1).

This blog tries to summarize a couple of lessons learned when using the camel-jms component with transactions.

Setting the scene

A typical camel-jms component configuration for transactions could look as follows:


<bean id="jms" class="org.apache.camel.component.jms.JmsComponent"> 
  <property name="configuration" ref="jmsConfig" />
 < /bean>
  
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration" >
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" /> 
  <property name="transacted" value="true" /> 
  <property name="transactionManager" ref="jmsTransactionManager" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
 < /bean>

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsPooledConnectionFactory" />
 < /bean>           

<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop" >
  <property name="maxConnections" value="2" />
  <property name="connectionFactory" ref="jmsConnectionFactory" />
 < /bean> 

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
  <property name="brokerURL" value="tcp://localhost:61617" /> 
  <property name="watchTopicAdvisories" value="false" />
 < /bean>


This configuration defines a camel-jms component which uses the ActiveMQ PooledConnectionFactory, enables transactions (via transacted=true) and configures an external transaction manager (the Spring JmsTransactionManager). Finally, the camel-jms component's cache level is set to CACHE_CONSUMER.

Fairly standard stuff so far.


Lessons learned

1) Despite setting a cache level of CACHE_CONSUMER, the configured transaction manager creates its own JMS connection and JMS session.

CACHE_CONSUMER is actually the cache level setting for Spring's DefaultMessageListenerContainer class (DMLC), which is used by camel-jms internally. It's the DMLC that is responsible for receiving a message from the broker, starting and committing transactions, etc. Camel simply hooks in as a message listener on the DMLC, so all the low-level JMS work is handled by Spring.
In the JmsConfiguration bean example above, all the specified properties configure Spring's DMLC.

At runtime Spring's DMLC ensures a new transaction is spawned before an attempt is made to receive a message. As you surely know, the DMLC does not support batched transactions (which is basically the reason why camel-jms does not support them either).

The low-level JMS code reads as follows:


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...



In the Spring XML configuration above we configured a transaction manager, so this code first creates a new transaction and then enters the doReceiveAndExecute() method, which tries to receive a message and dispatches it to the configured MessageListener (Camel in this case).

The call to getTransaction() is invoked on the configured JmsTransactionManager. The transaction manager does not re-use the already instantiated JMS connection or session but uses the registered connection factory to obtain a new connection and to create a new JMS session. Have a look at JmsTransactionManager.doBegin():



protected void doBegin(Object transaction, TransactionDefinition definition) {
  if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
  }
  JmsTransactionObject txObject = (JmsTransactionObject) transaction;
  Connection con = null;
  Session session = null;
  try {
    con = createConnection();
    session = createSession(con);
    if (logger.isDebugEnabled()) {
      logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
    }
    txObject.setResourceHolder(new JmsResourceHolder(getConnectionFactory(), con, session));
    txObject.getResourceHolder().setSynchronizedWithTransaction(true);
    int timeout = determineTimeout(definition);
    if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
      txObject.getResourceHolder().setTimeoutInSeconds(timeout);
    }
    TransactionSynchronizationManager.bindResource(
        getConnectionFactory(), txObject.getResourceHolder());
  }
  catch (JMSException ex) {
  ...


The created JMS session and connection are then linked to the transaction object.
These session and connection objects will not be used for receiving a message from the JMS broker but only for committing the transaction. As part of the cleanup after that commit, the session and connection are closed again.
So even though you specify a cacheLevelName of CACHE_CONSUMER in your Camel JmsConfiguration, a new connection is still created by the transaction manager for every message. If you did not configure a pooled ConnectionFactory (unlike the above Camel configuration), you would create a new physical TCP connection into the broker, and a JMS session on top of it, for every message to be processed in your Camel route.
If you're using the ActiveMQ PooledConnectionFactory, the transaction manager requests a new connection and a new session from the pool each time, avoiding the overhead of creating a new physical TCP connection into the broker for every new transaction.

Note that for receiving the message and dispatching it to the configured MessageListener (Camel in our case), the DMLC correctly uses the cached consumer. It's only for the transaction management that a new JMS connection and session are created.

After a message has been received and dispatched, the transaction commits and the cleanup after the commit calls Connection.close(), which returns the extra connection and session back to the pool. I have discussed this behavior with the Spring folks in this forum post.
Bottom line: When using a camel-jms configuration as above, it's really important to use a pooled ConnectionFactory such as ActiveMQ's PooledConnectionFactory or Spring's CachingConnectionFactory. Not using a pooled ConnectionFactory means you will open a new JMS connection and session to the broker for every transaction / message to be received.

2) Spring's DefaultMessageListenerContainer allows you to use local JMS transactions without setting a transaction manager.

Perhaps this is known to everyone but me. My perception so far was that you need to configure a transaction manager if you want to use transactions in Spring JMS.
But the following configuration of a DMLC in XML is perfectly valid and works:


<bean id="DMLC" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="sessionTransacted" value="true" />
  <property name="destinationName" value="requestQueue" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="messageListener" ref="msgListener" />
< /bean>


This is a plain Spring JMS configuration example, no Camel involved.
Using this config for setting up a JMS consumer will honor the cache level setting of CACHE_CONSUMER.
This config will still use local JMS transactions but not via an external transaction manager. In this case the DMLC will handle the transaction directly and hence can respect the cache level settings.

Actually, the javadoc of Spring's AbstractPollingMessageListenerContainer.setTransactionManager() has a little note that mentions this option:

"Note: Consider the use of local JMS transactions instead. Simply switch the "sessionTransacted" flag to "true" in order to use a locally transacted JMS Session for the entire receive processing, including any Session operations performed by a SessionAwareMessageListener (e.g. sending a response message)."
So the problem discussed in 1) could be resolved by not setting an external transaction manager in the camel-jms component configuration.
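
For reference, here is a minimal plain-Java sketch of the same locally transacted consumer; the class name is made up and the connection factory and listener are placeholders for whatever your application provides:

import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class LocalTxConsumerSetup {

    // Same locally transacted consumer as the XML above, wired in plain Java (a sketch).
    public static DefaultMessageListenerContainer create(ConnectionFactory cachingConnectionFactory,
                                                         MessageListener msgListener) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(cachingConnectionFactory);
        dmlc.setSessionTransacted(true);   // local JMS transactions, no external transaction manager
        dmlc.setDestinationName("requestQueue");
        dmlc.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
        dmlc.setMessageListener(msgListener);
        dmlc.afterPropertiesSet();         // initialize the container
        dmlc.start();
        return dmlc;
    }
}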



3) Camel (up to version 2.9) does not allow using local JMS transactions without an external TX manager set.

The JmsConfiguration class tries to get the transaction manager and raises an IllegalArgumentException if none was set. The code reads as follows in Camel 2.9:

/* JmsConfiguration.java */

protected void configureMessageListenerContainer(DefaultMessageListenerContainer container,
  JmsEndpoint endpoint) throws Exception {


  ...
  PlatformTransactionManager tm = getTransactionManager();
  if (tm != null) {
    container.setTransactionManager(tm);
  } else if (transacted) {
    throw new IllegalArgumentException("Property transacted is enabled but a transactionManager was not injected!");
  }
  ...
}


The camel-activemq component is slightly different as it creates a default JmsTransactionManager if none is specified. I have raised Improvement CAMEL-5055.

Bottom line: You cannot use transactions in camel-jms without specifying a transaction manager. As a result you should always use a pooled connection factory (see point 1).

Update: As of Camel 2.10 it is now possible to use transactions without specifying a transaction manager in the Camel JmsComponent configuration.
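
A rough sketch of what such a configuration could look like in Java, assuming the lazyCreateTransactionManager option of camel-jms behaves as documented (transacted=true without an injected transactionManager and with lazyCreateTransactionManager=false lets the DMLC run with locally transacted sessions). Class and variable names are illustrative:

import javax.jms.ConnectionFactory;

import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;

public class LocalTxJmsComponentSetup {

    public static void register(CamelContext camelContext, ConnectionFactory pooledConnectionFactory) {
        JmsConfiguration config = new JmsConfiguration();
        config.setConnectionFactory(pooledConnectionFactory);
        config.setTransacted(true);
        config.setLazyCreateTransactionManager(false);  // do not auto-create a JmsTransactionManager
        config.setCacheLevelName("CACHE_CONSUMER");     // cache levels are honored again

        JmsComponent jms = new JmsComponent();
        jms.setConfiguration(config);
        camelContext.addComponent("jms", jms);
    }
}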



4) Using Camel to set up a transacted durable topic subscriber requires you to configure it with its own, non-shared PooledConnectionFactory instance.

Using the camel-jms configuration shown at the beginning of this post, we can define a Camel route as follows:

public void configure() throws Exception {

  from("jms:topic:TestTopic?clientId=testclient1&durableSubscriptionName=testdurasub1")
    .id("transacted-route")
    .to("log:transacted-route?showAll=true&showBody=true")
    .to(...);
}


This route definition creates a durable topic subscriber. This is achieved by specifying a clientId on the JMS connection and a durable subscription name on the TopicConsumer. 

In addition the initial Spring config configures the camel-jms component to be transactional using an external transaction manager.  

As outlined in point 1) the external transaction manager bypasses any caching at DMLC level. So the connection is returned to the pool after every usage. The pool will have one connection that has the clientId "testclient1" set. 

Now imagine what happens if the maximum pool size of this PooledConnectionFactory is set to >1 and another Camel route is also using the same PooledConnectionFactory. 
Then both Camel routes will request a connection from the same pool and it is very likely that the DMLC of the above route retrieves a connection that does not have the clientId set. So it uses a different connection than it should for receiving a message from the broker using its durable subscription.

That can lead to various errors. 
If the second Camel route also creates a durable subscriber but with a different client id (and uses transactions) then problems like these can occur:


org.apache.camel.component.jms.JmsMessageListenerContainer WARN  Could not refresh JMS Connection for destination 'test' - retrying in 5000 ms. Cause: Setting clientID on a used Connection is not allowed.


The second route retrieved a connection that already had a client id set and tries to set its own client id; the connection to the broker will not succeed.

A durable subscriber cannot share the connection with other consumers because of the client id that is specifically set on the connection. Hence a Camel route that creates a durable subscriber should not share the PooledConnectionFactory with any other routes. 

In addition, the connection pool's maximum size should be set to 1 connection only, so that the route always reuses the connection that has the correct client id set.

When the durable subscriber is not transacted, things are slightly better: the DMLC cache level settings will be respected, and with CACHE_CONNECTION or higher the connection is not returned to the pool until the route gets shut down (which destroys the DMLC and returns the connection to the pool). However, if the route is restarted later (within the same JVM instance), you would run into the same problem again and see the above warning.

Also related to this is bug MB-1106.

Bottom line: 
When using Camel to create durable topic subscribers, always assign a non-shared PooledConnectionFactory with maxConnections=1 to that Camel route. 
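
A minimal sketch of such a dedicated pool wired in Java (the class name and broker URL are illustrative):

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;

public class DurableSubscriberPool {

    // A dedicated pool for the durable-subscriber route only - never share it with other routes.
    public static PooledConnectionFactory create() {
        ActiveMQConnectionFactory amqCf =
            new ActiveMQConnectionFactory("failover:(tcp://localhost:61617)"); // illustrative URL
        amqCf.setWatchTopicAdvisories(false);

        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(amqCf);
        pool.setMaxConnections(1); // always reuse the one connection carrying the clientId
        pool.start();
        return pool;
    }
}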


5) Set prefetch=1 on the ActiveMQConnectionFactory when using ActiveMQ's PooledConnectionFactory with XA transactions.

When using XA transactions across two different transactional resources, you will most likely need to turn off JMS resource caching in the Spring DMLC configuration by using a cache level name of CACHE_NONE. Caching at DMLC level often does not work correctly with XA transactions.

Using a connection factory that supports pooling, like ActiveMQ's PooledConnectionFactory, is highly encouraged in this case.
However, ActiveMQ's PooledConnectionFactory does not support pooling of JMS consumers, so the consumer still gets created and destroyed for every single message to be consumed. With the default prefetch of 1000 on the ActiveMQConnectionFactory, the broker will try to send up to 1000 messages to the consumer, although the consumer will only ever process one message and then get destroyed (by Spring's DMLC, due to the CACHE_NONE configuration). So in the worst case the broker eagerly prefetches 1000 messages of which 999 get discarded, which places quite some overhead on both the broker and the consumer.
By reducing the prefetch to just one message there is no overhead from messages being dispatched eagerly to consumers. This can be a significant performance improvement, depending on your use case: if processing a message takes much more time than the low-level JMS operations, you may not notice much of a difference, but if you need to route messages from one destination to another as fast as possible without much processing, the improvement can be substantial. In either case it reduces the overhead of handling prefetched messages inside the broker and the consumer.
So the suggestion is to use a brokerUrl with a prefetch of 1, e.g.

failover:(tcp://localhost:61614)?jms.prefetchPolicy.all=1
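
Alternatively, the same prefetch can be set programmatically on the ActiveMQConnectionFactory that gets wrapped by the pool. This is just a sketch; the class name and broker URL are illustrative:

import org.apache.activemq.ActiveMQConnectionFactory;

public class LowPrefetchConnectionFactory {

    public static ActiveMQConnectionFactory create() {
        ActiveMQConnectionFactory cf =
            new ActiveMQConnectionFactory("failover:(tcp://localhost:61614)");
        cf.getPrefetchPolicy().setAll(1); // prefetch a single message for all consumer types
        return cf;
    }
}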


6) Use Spring's CachingConnectionFactory
With the optimization of point 5) we still create a new JMS consumer for every message to be consumed, as ActiveMQ's PooledConnectionFactory does not pool consumers.
Creating a new JMS consumer is a synchronous call into the broker: the consumer gets registered in the broker and a response is sent back, and the consumer thread is blocked until it receives that response. If a new consumer is created for every message, the client-side thread spends a significant amount of time waiting for the broker's response.
The following CPU sampler report from jvisualvm, taken from a demo that tries to move messages from one destination to another as fast as possible using transactions, illustrates this:



This report shows that about 16% of the Camel thread's overall CPU time is spent on creating consumers. Of these 16%, the thread spends 14% waiting for the broker's response to registering the new consumer. For high-throughput scenarios this may not be ideal.

Spring's CachingConnectionFactory does allow caching consumers. When consumer caching is enabled, the overhead of recreating a JMS consumer for every message can be avoided. However, this comes at a cost.
You need to be very careful when caching/pooling consumers in the CachingConnectionFactory because of the prefetch. If multiple consumers get created and each consumer has a couple of messages prefetched, it is possible that consumers remain idle in the cache while holding prefetched messages that never get processed. This shows up in the broker's JMX statistics as a number of messages in flight and an equal number of messages on the queue that never seem to get processed.

If you want to use the CachingConnectionFactory with consumer caching enabled, you need to either
1) configure each Camel route with its own CachingConnectionFactory instance, i.e. do not share a CachingConnectionFactory between two Camel routes. In that case you can work with prefetches > 1 and get even more performance from messages being prefetched eagerly to consumers. If you use concurrentConsumers=x on the camel-jms endpoint configuration, then set sessionCacheSize=x on the CachingConnectionFactory as well (see the sketch below)!
or alternatively
2) use prefetch=0 on the ActiveMQConnectionFactory that is wrapped inside the CachingConnectionFactory. This turns on polling mode and the broker will never eagerly prefetch messages in advance. Such a configuration will probably wipe out the performance improvements of consumer caching, as the consumer needs to actively ask the broker for each message and will never have any messages prefetched.
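
Here is a small sketch of option 1): a per-route CachingConnectionFactory with consumer caching enabled and a session cache size matching the number of concurrent consumers. Class and parameter names are illustrative:

import javax.jms.ConnectionFactory;

import org.springframework.jms.connection.CachingConnectionFactory;

public class PerRouteCachingConnectionFactory {

    // One CachingConnectionFactory per Camel route; sessionCacheSize matches concurrentConsumers.
    public static CachingConnectionFactory create(ConnectionFactory amqConnectionFactory,
                                                  int concurrentConsumers) {
        CachingConnectionFactory ccf = new CachingConnectionFactory(amqConnectionFactory);
        ccf.setCacheConsumers(true);                  // avoid re-creating a consumer per message
        ccf.setSessionCacheSize(concurrentConsumers); // one cached session per concurrent consumer
        return ccf;
    }
}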

Bottom line: you have to be very careful when caching consumers because of the prefetch, otherwise you may experience stuck messages. On the other hand, you can achieve good performance improvements in certain high-throughput scenarios by caching the consumer.

I highly suggest testing your configuration properly before deploying it to production.


Additional updates added later:


7) Even with transacted=false set on the JmsComponent configuration, transactions are used if a transaction manager is configured.

So a configuration like


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
  <property name="transactionManager" ref="SpringJMSTransactionManager" />  
  <property name="transacted" value="false" />
   <property name="concurrentConsumers" value="10" />
   <property name="cacheLevelName" value="CACHE_CONSUMER" />
   <property name="acknowledgementMode" value="1"/>
</bean>

which explicitly sets transacted=false but also configures a transaction manager will use local JMS transactions.
The reason is again in Spring's code, which checks whether a transaction manager is configured and, if so, immediately creates a transaction that is later committed:


/* AbstractPollingMessageListenerContainer.java */
protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
  throws JMSException {

  if (this.transactionManager != null) {
    // Execute receive within transaction.
    TransactionStatus status =     
      this.transactionManager.getTransaction(this.transactionDefinition);
    boolean messageReceived;
    try {
      messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
    }
    ...
 
So referencing the transaction manager in the JmsComponent configuration is enough to turn on transactions. Conversely, setting transacted=false is not enough to switch transactions off; you also need to remove the transaction manager from the Camel JmsComponent configuration if you want to turn off transactions.
Enabling debug logging for org.springframework.transaction is a good way to check whether transactions are used in a Camel route or not.
If they are used, the logging output should periodically contain messages like


DEBUG JmsTransactionManager - Creating new transaction with name [JmsConsumer[test.in]]: 
PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG JmsTransactionManager - Created JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:6:1,started=false} }] from 
Connection [PooledConnection { ConnectionPool[ActiveMQConnection {id=ID:mac.fritz.box-49502-1364202453607-1:6,
clientId=ID:mac.fritz.box-49502-1364202453607-0:6,started=false}] }]
DEBUG JmsTransactionManager - Committing JMS transaction on Session [PooledSession 
{ ActiveMQSession {id=ID:mac.fritz.box-49502-1364202453607-1:5:1,started=false} }]

even on an idle route that does not process any messages.
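
To make this concrete, a truly non-transacted configuration would look roughly like the sketch below: transacted=false and, just as importantly, no transactionManager referenced at all. Class and variable names are illustrative:

import javax.jms.ConnectionFactory;

import org.apache.camel.component.jms.JmsConfiguration;

public class NonTransactedJmsConfig {

    public static JmsConfiguration create(ConnectionFactory pooledConnectionFactory) {
        JmsConfiguration config = new JmsConfiguration();
        config.setConnectionFactory(pooledConnectionFactory);
        config.setTransacted(false);
        // note: no setTransactionManager(...) call - referencing one would turn transactions back on
        config.setConcurrentConsumers(10);
        config.setCacheLevelName("CACHE_CONSUMER");
        config.setAcknowledgementMode(1); // 1 = AUTO_ACKNOWLEDGE, as in the XML above
        return config;
    }
}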

8) Configuring a cacheLevelName >= CACHE_CONNECTION will only use one JMS connection into the broker regardless of how many concurrentConsumers are configured.

Consider this Camel and JMS configuration:


<bean id="AMQListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
   <property name="connectionFactory" ref="AMQPooledConnectionFactory" />
   <property name="transacted" value="false" />
   <property name="concurrentConsumers" value="20" />
   <property name="cacheLevelName" value="CACHE_CONSUMER" />
   <property name="acknowledgementMode" value="1"/>
</bean>


<bean id="AMQPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
   <property name="maxConnections" value="20" />
   <property name="maximumActive" value="20" />
   <property name="connectionFactory" ref="AMQJMSConnectionFactory" />
</bean>
 
which configures 20 concurrent JMS consumers in the Camel route and a connection pool of 20 connections. This sounds reasonable at first glance, as you would think that each concurrent consumer can then use its own JMS connection.
But with a cacheLevelName > CACHE_NONE this is not the case. Spring internally asks the PooledConnectionFactory for only one connection (i.e. it calls PooledConnectionFactory.createConnection() only once) and then creates all JMS sessions and consumers on top of this one connection. This in itself is not a problem, as the JMS specification explicitly allows multiple JMS sessions to share the same connection. However, it then does not make much sense to configure maxConnections=20 on the PooledConnectionFactory if this connection factory is not shared by multiple Camel routes.
You can confirm that only one connection is created in the above configuration by checking the log file for the presence of 

INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

(assuming the failover transport is used which is highly recommended).
There should only be one such line in the log.

Now, to make things more complicated, this changes when you use JMS transactions. Recall point 1) of this post: the transaction manager creates its own connection into the broker and does not cache it. Instead it calls ConnectionFactory.createConnection() when creating a new transaction (which happens for every new message to be consumed) and calls Connection.close() as part of the cleanup after each transaction got committed or rolled back.
In addition, the PooledConnectionFactory eagerly fills the pool with another connection on every call to createConnection() until it reaches the configured maxConnections limit. So with the configuration above, where maxConnections=20, the transaction manager will cause 20 connections to be created in the pool as it repeatedly calls PooledConnectionFactory.createConnection(). 20 active connections get created in the pool, but only two of them are used at any time: one connection by the transaction manager and another by the Spring consumer. The Spring consumer uses the same connection over its lifetime (due to a cache level > CACHE_NONE), but the tx manager requests a new connection from the pool for every new message.
The effect is that 20 connections are made into the broker and only two of them are ever used at the same time. The Camel log should contain 20 lines of
INFO  FailoverTransport - Successfully connected to tcp://localhost:61616

Therefore the recommendation is to reduce the pool size to 1 or at most 2 connections. Using just one connection is fine, as the transaction manager and the Spring consumer can share the same connection: they both get called in sequence and never use the connection in parallel.
By reconfiguring the PooledConnectionFactory to only 1 or 2 connections you save heap memory in the Camel route and you also save resources in the ActiveMQ broker (heap memory and threads, as by default the broker creates a thread per connection).
Again, I am assuming that the PooledConnectionFactory instance is used by only one Camel route and not shared by multiple routes (a scenario I have seen quite often with customers). If you deploy multiple Camel routes it is indeed recommended to share the same PooledConnectionFactory, but then resize the pool's maxConnections to 1 connection times the number of Camel routes, as sketched below.
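
As a sketch (class and parameter names are illustrative), sizing a shared pool by the number of routes could look like this:

import javax.jms.ConnectionFactory;

import org.apache.activemq.pool.PooledConnectionFactory;

public class SharedRoutePool {

    // A pool shared by several Camel routes: roughly one connection per route is enough.
    public static PooledConnectionFactory create(ConnectionFactory amqConnectionFactory,
                                                 int numberOfCamelRoutes) {
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(amqConnectionFactory);
        pool.setMaxConnections(numberOfCamelRoutes); // 1 connection * number of Camel routes
        pool.start();
        return pool;
    }
}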

Summary: For each Camel route you only need to configure a PooledConnectionFactory pool size of 1 connection.