Hi guys, I am facing a strange problem related to slow consumers that couldn’t been aborted and I couldn’t solve the issue by myself. We have one queue where we process quite heavy operations (report generation). The number of messages per day isn’t too big: usually less than 100. We have to messageListeners, each on it’s own application node. If the message couldn’t be processed (e.g. due to db transaction timeout) we throw the message into the DLQ (after one redelivery).
*Questions: (copied from the bottom so it's easier to see them)* 1. Why AbortSlowAckConsumerStrategy couldn't abort the consumer? 2. Is this configuration reasonable for our use-case (few but heavy messages to process)? 3. Is there a better way of making sure that queue processing wouldn't stop? (maybe prefetch set to 0, some DMLC functions which are not good documented, etc.) 4. Could redelivery policy (consumer based or broker based) somehow related to this problem? 5. Is using “consumer caching” in DMLC and PooledConnectionFactory a reasonable choice here? Or would it be better just to use ActiveMQConnectionFactory along with DMLC? 6. Is there something about the whole configuration which tingles yours spider-senses? *First issue:* Firstly, we had problems with stopped processing after some heavy operations were performed in the DMLC. Back then we had the following configuration: - prefetch size: default 1000 - none of “abortSlow*ConsumerStrategy” was defined - redelivery policy defined on the jms connection (not on jms factory) - one redelivery Several times processing on this message has been stopped (usually after attempts to generate some gigantic report) and couldn’t resume by itself without restarting Tomcat nodes. And then I have started to check AbortSlow*ConsumerStrategy. I have ended up with the following changes: *Second issue:* - Prefetch size: 1 - I have defined abortSlowAckConsumerStrategy as follows: <slowConsumerStrategy> <abortSlowAckConsumerStrategy ignoreIdleConsumers="false" checkPeriod="60000" maxTimeSinceLastAck="600000" abortConnection="false"/> </slowConsumerStrategy> - Redelivery policy was defined in the Broker insted of on a connection: <redeliveryPolicyEntries> <redeliveryPolicy queue="generateReportQueue" maximumRedeliveries="1" initialRedeliveryDelay="100000" redeliveryDelay="100000"/> </redeliveryPolicyEntries> With this configuration I hoped to abort a consumer after every 10 minutes (or if after the consumer has acked back), so that the messages wouldn't hang in the queue indefinitely (first issue). Unfortunately it looked like that the AbortSlowAckConsumerStrategy couldn’t abort the slow consumer. The logs from AMQ server are here: …. 2014-10-25 00:00:11,455 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for destination:queue://generateReportQueue ... 2014-10-25 01:12:11,641 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for destination:queue://generateReportQueue ... 2014-10-25 18:14:14,557 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for destination:queue://generateReportQueue … After restarting Tomcat nodes the AbortSlowAckConsumerStrategy started to successfully aborting the messages: 2014-10-25 21:18:15,111 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app01.osl.basefarm.net-50652-1414258093487-1:1:40:6 for destination:queue://generateReportQueue 2014-10-25 21:26:15,112 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app02.osl.basefarm.net-36743-1414257416757-1:1:50:1 for destination:queue://generateReportQueue 2014-10-25 21:29:15,113 [host] Scheduler] INFO AbortSlowConsumerStrategy - aborting slow consumer: ID:min-p-app01.osl.basefarm.net-50652-1414258093487-1:1:43:1 for destination:queue://generateReportQueue ... *Questions:* 1. Why AbortSlowAckConsumerStrategy couldn't abort the consumer? 2. Is this configuration reasonable for our use-case (few but heavy messages to process)? 3. Is there a better way of making sure that queue processing wouldn't stop? (maybe prefetch set to 0, some DMLC functions which are not good documented, etc.) 4. Could redelivery policy (consumer based or broker based) somehow related to this problem? 5. Is using “consumer caching” in DMLC and PooledConnectionFactory a reasonable choice here? Or would it be better just to use ActiveMQConnectionFactory along with DMLC? 6. Is there something about the whole configuration which tingles yours spider-senses? * The context:* *Environment:* - We have a clustered Spring based application (running on 2 Tomcat nodes) - We have ActiveMQ standalone working on two nodes (for failover) - We run the application on Linux boxes (Red Hat Enterprise) - Versions: Spring 4.1.0.RC1, ActiveMQ 5.9.1, Java 7 -Memory: - ActiveMQ on each node: -Xmx2048m -Xms512m -XX:MaxPermSize=128m - Tomcat on each node: 12Gb of memory *Configuration of JMS: * *AMQ factory and jmsTemplate on client side:* <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>${activemq.broker.url}</value> </property> <property name="nonBlockingRedelivery"> <value>false</value> </property> </bean> </property> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="receiveTimeout" value="2000"/> <property name="sessionTransacted" value="true"/> </bean> *Spring DMLC* <bean id="defaultMessageListenerContainerParent" abstract="true" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="sessionTransacted" value="true"/> <property name="cacheLevelName" value="CACHE_CONSUMER"/> </bean> <bean parent="defaultMessageListenerContainerParent"> <property name="destination" ref="generateReportQueue"/> <property name="messageListener" ref="sendReportToMailMessageListener"/> </bean> *ActiveMQ broker:* <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useJmx="true" schedulerSupport="true"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="false" memoryLimit="64mb"> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> <policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> <policyEntry queue="generateReportQueue" producerFlowControl="false"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> <slowConsumerStrategy> <abortSlowAckConsumerStrategy ignoreIdleConsumers="false" checkPeriod="60000" maxTimeSinceLastAck="600000" abortConnection="false"/> </slowConsumerStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext connectorPort="1099"/> </managementContext> <persistenceAdapter> <kahaDB directory="XXXX"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="70"/> </memoryUsage> <storeUsage> <storeUsage limit="5 gb"/> </storeUsage> <tempUsage> <tempUsage limit="2 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/> </shutdownHooks> <plugins> <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true"> <redeliveryPolicyMap> <redeliveryPolicyMap> <redeliveryPolicyEntries> <redeliveryPolicy queue="generateReportQueue" maximumRedeliveries="1" initialRedeliveryDelay="100000" redeliveryDelay="100000"/> </redeliveryPolicyEntries> </redeliveryPolicyMap> </redeliveryPolicyMap> </redeliveryPlugin> </plugins> </broker> Any ideas / help is appreciated. Thank you in the advance. Regards Marek Dominiak -- View this message in context: http://activemq.2283324.n4.nabble.com/Not-abortable-slow-consumers-stopped-processing-of-messages-in-a-queue-tp4686721.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.