14 Nov 2011

Consuming Topic messages in Spring and Camel


When using Spring to consume topic messages, make sure the JMS consumer is cached somewhere. Otherwise your consumer may not receive all messages.

Consider the following Camel route definition that consumes messages from a JMS topic.

<!-- Can be any JMS broker really, e.g. ActiveMQ -->
<bean id="JmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="SingleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory" >
  <property name="targetConnectionFactory" ref="JmsConnectionFactory" />
  <property name="reconnectOnException" value="true" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="connectionFactory" ref="SingleConnectionFactory" />
  <property name="cacheLevelName" value="CACHE_SESSION" />
  <property name="acknowledgementMode" value="1" />
</bean>

<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true" >
  <route id="Consume-From-JMS">
    <from uri="jms:topic:Test.Topic?deliveryPersistent=true" />
    <to uri="whatever"/>
    ...
  </route>
</camelContext>

The camel-jms component is configured to use Spring's SingleConnectionFactory and a cache level of CACHE_SESSION. This combination will cause problems!

Spring's SingleConnectionFactory does not cache the consumer. This ConnectionFactory only reuses the same connection and does not cache any other JMS resources on top of the connection (i.e. session and consumer).
At the same time, Spring's DefaultMessageListenerContainer (DMLC), which Camel uses to consume messages from a JMS broker, does not cache the consumer either, because its cache level is set to CACHE_SESSION. As a result, the DMLC creates a new JMS consumer instance before requesting the next message from the JMS broker. After the message has been dispatched and processed, the consumer is destroyed (and unregistered from the broker).
In addition, a JMS topic differs from a JMS queue in that the broker discards a topic message if no subscriber is registered. Because the above configuration makes Spring recreate the JMS consumer (and re-register it with the broker) for every message, there are time windows in which the broker has no topic subscriber registered, and it discards any messages it receives during those windows. The result is that your Spring JMS topic consumer will not receive all of the messages from the broker.

When consuming topic messages in Spring, it is necessary to cache the JMS consumer, either by using a connection factory that supports consumer caching, such as Spring's CachingConnectionFactory, or by setting the cache level of the DMLC to CACHE_CONSUMER.
Please note that the ActiveMQ PooledConnectionFactory does not cache consumers!
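
As a rough sketch of the two options, the configuration could be changed along the following lines. This is not a tested configuration: it assumes the bean ids from the route definition above, the bean id "CachingConnectionFactory" and the sessionCacheSize value are made up for illustration, and only one of the two options is needed.

```xml
<!-- Option 1: keep the SingleConnectionFactory, but let the DMLC hold on to the consumer -->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="connectionFactory" ref="SingleConnectionFactory" />
  <!-- CACHE_CONSUMER instead of CACHE_SESSION: the consumer stays registered between messages -->
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="acknowledgementMode" value="1" />
</bean>

<!-- Option 2: use a connection factory that caches consumers itself -->
<!-- (bean id and sessionCacheSize are illustrative assumptions) -->
<bean id="CachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory" >
  <property name="targetConnectionFactory" ref="JmsConnectionFactory" />
  <property name="sessionCacheSize" value="10" />
  <!-- cacheConsumers defaults to true; it is spelled out here only to make the intent visible -->
  <property name="cacheConsumers" value="true" />
</bean>
```

With option 2, the connectionFactory property of the jms bean would need to reference CachingConnectionFactory instead of SingleConnectionFactory.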

This only applies to topic consumers in Spring. Topic producers will not use these cache level settings and hence don't have this problem.

2 comments:

Tung said...

Interesting post. Would durable subscriptions be another alternative for not losing messages from the topic?

Torsten Mielke said...

@Tung: Yes that would help as well.