When using Spring to consume topic messages, make sure to
cache the JMS consumer somewhere. Otherwise your consumer may not receive all messages.
Consider the following Camel route definition that consumes
messages from a JMS topic.
<!-- Can be any JMS broker really, e.g. ActiveMQ -->
<bean id="JmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="SingleConnectionFactory"
      class="org.springframework.jms.connection.SingleConnectionFactory">
  <property name="targetConnectionFactory" ref="JmsConnectionFactory" />
  <property name="reconnectOnException" value="true" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="SingleConnectionFactory" />
  <property name="cacheLevelName" value="CACHE_SESSION" />
  <property name="acknowledgementMode" value="1" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring" trace="true">
  <route id="Consume-From-JMS">
    <from uri="jms:topic:Test.Topic?deliveryPersistent=true" />
    <to uri="whatever"/>
    ...
  </route>
</camelContext>
The camel-jms component is configured to use Spring's
SingleConnectionFactory and a cache level of CACHE_SESSION. This combination
will cause problems!
Spring's SingleConnectionFactory does not cache
the consumer. This ConnectionFactory only reuses the same connection and does
not cache any other JMS resources on top of the connection (i.e. session and
consumer).
At the same time, Spring's DefaultMessageListenerContainer (DMLC),
which Camel uses to consume messages from a JMS broker, does not cache
the consumer either, because its cache level is set to CACHE_SESSION. The result of
this configuration is that Spring's DMLC creates a new JMS consumer instance
prior to requesting the next message from the JMS broker. After the
message has been dispatched and processed, the consumer is destroyed (and
unregistered from the broker).
In addition, a JMS topic works differently from a JMS queue: if there is no subscriber registered, the broker will discard the topic message. Because the above configuration makes Spring recreate the JMS consumer (and register it with the JMS broker again) for every message, there are time windows in which the broker has no topic subscriber registered, and so it discards the messages it receives. The result is that your Spring JMS topic consumer will not receive all of the messages from the broker.
When consuming topic messages in Spring, it is necessary to cache
the JMS consumer, either by using a connection factory that supports consumer
caching, like the Spring CachingConnectionFactory, or by configuring the cache
level of the DMLC to use CACHE_CONSUMER.
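As a rough sketch of these two options, reusing the bean ids from the configuration above: the bean id CachingConnectionFactory is just a name picked for this example, and only one of the two changes should be needed. Treat it as a starting point rather than a tested configuration.
<!-- Option 1: keep the SingleConnectionFactory, but let the DMLC hold on to
     its consumer between messages by raising the cache level -->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="SingleConnectionFactory" />
  <property name="cacheLevelName" value="CACHE_CONSUMER" />
  <property name="acknowledgementMode" value="1" />
</bean>
<!-- Option 2: replace the SingleConnectionFactory with a connection factory
     that caches sessions and consumers (example bean id, an assumption) -->
<bean id="CachingConnectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="JmsConnectionFactory" />
  <!-- cacheConsumers is already true by default; spelled out here for clarity -->
  <property name="cacheConsumers" value="true" />
</bean>
With option 2, the connectionFactory property of the jms bean would have to reference CachingConnectionFactory instead of SingleConnectionFactory.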
Please note that the ActiveMQ PooledConnectionFactory does
not cache consumers!
This only applies to topic consumers in Spring. Topic
producers will not use these cache level settings and hence don't have this
problem.
2 comments:
Interesting post. Would durable subscriptions be another alternative for not losing messages from the topic?
@Tung: Yes that would help as well.