14 Nov 2011

Consuming Topic messages in Spring and Camel


When using Spring to consume topic messages, make sure to cache the JMS consumer somewhere. Otherwise your consumer may not receive all messages.

Consider the following Camel route definition that consumes messages from a JMS topic.

<!-- Can be any JMS broker really, e.g ActiveMQ -->
<bean id="JmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
  <property name="serverUrl" value="tcp://localhost:61616” />
</bean>
<bean id="SingleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory” >
  <property name="targetConnectionFactory" ref="JmsConnectionFactory" />
  <property name="reconnectOnException" value="true" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="connectionFactory" ref="SingleConnectionFactory" />
  <property name="cacheLevelName" value="CACHE_SESSION" />
  <property name="acknowledgementMode" value="1" />
</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true" >
  <route id="Consume-From-JMS">
    <from uri="jms:topic:Test.Topic?deliveryPersistent=true" />
    <to uri="whatever"/>
    ...
  </route>
</camelContext>

The camel-jms component is configured to use Spring's SingleConnectionFactory and a cache level of CACHE_SESSION. This combination will cause problems!

Spring's SingleConnectionFactory does not cache the consumer. This ConnectionFactory only reuses the same connection; it does not cache any other JMS resources on top of the connection (i.e. sessions and consumers).
At the same time, Spring's DefaultMessageListenerContainer (which Camel uses to consume messages from a JMS broker) does not cache the consumer either, as its cache level is set to CACHE_SESSION. The result of this configuration is that a new JMS consumer instance gets created by Spring's DMLC before each receive call to the JMS broker. After the message has been dispatched and processed, the consumer is destroyed (and unregistered from the broker).
In addition, a JMS topic behaves differently from a JMS queue: if no subscriber is registered, the broker discards the topic message. Since with the above configuration Spring recreates the JMS consumer for every message (and re-registers it with the broker), there are time windows in which the broker has no topic subscriber registered and therefore discards the messages it receives. The result is that your Spring JMS topic consumer will not receive all of the messages from the broker.

When consuming topic messages in Spring, it is necessary to cache the JMS consumer: either use a connection factory that supports consumer caching, such as Spring's CachingConnectionFactory, or configure the DMLC cache level to CACHE_CONSUMER.
Please note that the ActiveMQ PooledConnectionFactory does not cache consumers!
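For illustration, here is a rough sketch of that change expressed in Java configuration code (the class name, broker URL and the decision to show both options together are illustrative; in XML you would simply swap SingleConnectionFactory for CachingConnectionFactory and/or set cacheLevelName to CACHE_CONSUMER):

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.springframework.jms.connection.CachingConnectionFactory;

public class CachedConsumerSetup {

    // Registers a "jms" component whose consumers are cached, so the topic
    // subscription stays registered with the broker between messages.
    public static void configure(CamelContext camelContext) {
        ActiveMQConnectionFactory target =
            new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Option 1: wrap the target factory in a connection factory that
        // caches sessions and consumers (cacheConsumers defaults to true)
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(target);
        cachingFactory.setReconnectOnException(true);

        JmsComponent jms = new JmsComponent();
        jms.setConnectionFactory(cachingFactory);

        // Option 2: let Spring's DMLC cache the consumer itself
        jms.setCacheLevelName("CACHE_CONSUMER");

        camelContext.addComponent("jms", jms);
    }
}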

This only applies to topic consumers in Spring. Topic producers will not use these cache level settings and hence don't have this problem.

7 Nov 2011

Losing messages despite sending within a transaction


Last week I made an interesting observation: I was able to lose persistent queue messages that were sent in a transaction to an ActiveMQ broker.
I believe it's best to share the findings.

SETTING THE SCENE


Suppose you have a MessageProducer that sends messages to an ActiveMQ broker inside a transaction. This producer could be anything: a plain Java JMS producer client, a Spring JMS producer or, as in my case, a Camel route. It also does not matter whether the producer sends one or more messages within the same transaction.

Next, the ActiveMQ broker instance is configured with specific usage limits. In my test those limits were fairly low in order to reproduce the problem quickly. In addition I set the property sendFailIfNoSpace so that the broker raises an exception back to the producer once any of its global limits has been reached. The configuration I used reads

<systemUsage>
  <systemUsage sendFailIfNoSpace="true">
    <memoryUsage>
      <memoryUsage limit="10 mb"/>
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="64 mb"/>
    </storeUsage>
    <tempUsage>
      <tempUsage limit="10 mb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>

Without sendFailIfNoSpace="true" ActiveMQ would block the producer until additional space is available on the broker. This would most likely not reproduce the problem I am talking about.

I ran the ActiveMQ broker with these settings, kicked off my Camel route and let it do what it does best: route messages (which were finally sent to a queue on my ActiveMQ broker). By the way, I did not have any consumers attached to the broker; only the Camel route producer was connected. Still, the same problem can occur with slow consumers.
Finally, my producer did not register an exception listener. In my tests the producer was a Camel route, and Camel currently does not register an exception listener.

THE PROBLEM


Now, the problem starts when the broker reaches its configured limits. Because of the property sendFailIfNoSpace="true", it will not block the producer indefinitely but throw an exception back to the producer. In my test I was hitting the storeUsage limit of 64 MB.

On the other hand, when sending messages in a transaction, the actual send is done asynchronously. That means the thread sending the message does not wait for the ack from the broker; it is the transport thread that receives the broker's ack and deals with it. Using an async send when running inside a transaction is done for performance reasons: by not waiting for the broker's ack, you can send messages to the broker more quickly. Until the transaction finally commits, the messages received by the broker will not be placed onto the broker's queue.

Putting this together, the following happened in my test:
- The producer started a new transaction. This information was sent to the broker, and the broker accepted the new transaction.
- The producer then sent a message to the broker. As this send happened within an existing transaction, it was done asynchronously. The call to send the message returned immediately; it did not wait for the broker's ack.
- The broker did not accept this message but raised a "javax.jms.ResourceAllocationException: Persistent store is Full, 100% of 67108864" back to the producer.
- The producer's transport thread received the exception but did not know how to handle it. Instead it printed the following debug log message:
DEBUG - ActiveMQConnection  - Async exception with no exception listener: javax.jms.ResourceAllocationException: Persistent store is Full, 100% of 67108864. Stopping producer (ID:Mac.local-51375-1320685288867-2:1:1:1) to prevent flooding queue://TEST.IN.
It flagged the fact that there was an error, but it did not mark the current transaction for rollback. That would have been the job of an exception listener, but none was registered.
- The producer then committed the transaction. From the producer's point of view no error had occurred (it never became aware of the JMSException), so it asked the broker to commit the transaction.
- The broker committed the transaction just fine, although the message had not been stored on the broker's queue. From the broker's point of view all went well: the producer started a new transaction, then sent some messages, which the broker rejected, and then asked to commit the transaction. It is the producer's task to deal with the JMSException and to decide whether or not to roll back.
- The message(s) sent inside the transaction is (are) lost!
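In plain JMS terms, the producer side of this sequence boils down to only a few calls. The following sketch illustrates it (class name, broker URL and queue name are placeholders):

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TransactedSend {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        // true = transacted session; sends from this session are done asynchronously
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("TEST.IN");
        MessageProducer producer = session.createProducer(queue);

        // Returns immediately; a ResourceAllocationException raised by the broker
        // is delivered on the transport thread, not here.
        producer.send(session.createTextMessage("some payload"));

        // Succeeds from the producer's point of view even if the send above was
        // rejected by the broker; without an ExceptionListener the message is lost.
        session.commit();

        connection.close();
    }
}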

It does not necessarily require an ActiveMQ broker that has reached its configured limits to run into this problem. If the transport for any reason fails to send the message to the broker but works again at transaction commit, the same problem could arise. This is less likely to occur though: transport-related problems (e.g. connection loss) will most of the time affect the ability to commit the transaction as well (and not only the individual message send). When an exception occurs during the commit, the entire transaction is correctly rolled back.

You would think that using JMS transactions would guard you against message loss. In general that is correct; however, you need to make sure that any errors that occur within the transaction are dealt with. In this scenario there was no exception listener registered in the producer to deal with the JMSException, so I lost messages because I did not handle the exception returned from the async send.
When using Camel as a message producer, you currently cannot easily register an exception listener that would ensure the transaction gets rolled back in case of any problems.

Luckily there are some simple solutions to this problem:

THE SOLUTION


... is to either

1) Question whether you really need to send your messages inside a transaction. When sending only one persistent message at a time inside a transaction, there is not much benefit over sending the message without a transaction. Persistent messages are sent synchronously by default, so the producer waits for the broker's ack before proceeding and can deal directly with any error returned from the broker. Transactions are very useful, though, when sending multiple messages that all need to succeed or fail as one atomic operation.

2) If you need to use transactions for sending your messages to the broker, then configure the ConnectionFactory to always send messages synchronously rather than using the default async mode. Changing to a sync send will have a small performance impact, particularly when sending a batch of messages within one transaction. This performance impact, however, is probably negligible in most scenarios.
You can switch to a sync send by setting the alwaysSyncSend=true property on the ActiveMQConnectionFactory, e.g. for a Spring configuration:


<bean id="AMQJMSConnectionFactory3" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="failover:(tcp://localhost:61620)" />
  <property name="alwaysSyncSend" value="true"/>
</bean>

Alternatively, set jms.alwaysSyncSend=true on the broker URL, e.g.
  tcp://localhost:61616?jms.alwaysSyncSend=true, or
  failover:(tcp://localhost:61616)?jms.alwaysSyncSend=true


3) Register a JMS exception listener programmatically so that it reacts to any exception raised by the broker when the message is sent asynchronously. For a plain Java JMS client you can use:
ActiveMQConnectionFactory connectionFactory =
  new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setExceptionListener(new ExceptionListener() {

  public void onException(JMSException ex) {
    // handle the exception
  }
});

You can also set this exception listener on an existing JMS Connection object.
The somewhat tricky part may be to figure out which transaction needs to be rolled back when multiple threads send messages to the broker concurrently.

I already mentioned that this option is not easily done when the producer is a Camel route. We hope to improve Camel to register an exception listener out of the box in the future. Until then, CAMEL-4616 captures this request.


If you want to be on the safe side, go with option #2 and ensure a sync send of your messages.