Hi guys,

I am facing a strange problem with slow consumers that could not be
aborted, and I have not been able to solve it myself.
We have one queue on which we process quite heavy operations (report
generation). The number of messages per day isn't large: usually fewer than
100. We have two message listeners, each on its own application node. If a
message can't be processed (e.g. due to a DB transaction timeout), we send
it to the DLQ (after one redelivery).

*Questions:*
1. Why couldn't AbortSlowAckConsumerStrategy abort the consumer?
2. Is this configuration reasonable for our use case (few but heavy messages
to process)?
3. Is there a better way of making sure that queue processing doesn't stop?
(maybe prefetch set to 0, some DMLC features that are not well documented,
etc.)
4. Could the redelivery policy (consumer-based or broker-based) somehow be
related to this problem?
5. Is using “consumer caching” in DMLC together with PooledConnectionFactory
a reasonable choice here? Or would it be better just to use
ActiveMQConnectionFactory along with DMLC?
6. Is there anything about the whole configuration that makes your
spider-senses tingle?
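
To make the "prefetch set to 0" idea in question 3 concrete, here is a
minimal sketch of one way it might be configured: as an ActiveMQ destination
option on the queue bean. The bean definition itself is an assumption (the
real generateReportQueue bean is not shown in this post); only the
consumer.prefetchSize destination option is the point.

```xml
<!-- Hypothetical definition of the generateReportQueue bean; the real one is not shown in this post. -->
<bean id="generateReportQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- consumer.prefetchSize is an ActiveMQ destination option.
         With 0 the consumer pulls each message from the broker on demand
         instead of having messages pushed to it in advance. -->
    <constructor-arg value="generateReportQueue?consumer.prefetchSize=0"/>
</bean>
```

DMLC polls with synchronous receive() calls, which is the consumption style
a zero prefetch is meant for. Whether this actually helps with the stalled
queue is exactly what the question asks.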


*First issue:*
At first, processing would stop after some heavy operations had been
performed in the DMLC. Back then we had the following configuration:
- prefetch size: default 1000
- no “abortSlow*ConsumerStrategy” defined
- redelivery policy defined on the JMS connection (not on the JMS factory):
one redelivery

Several times, processing of this queue stopped (usually after an attempt
to generate some gigantic report) and could not resume by itself without
a restart of the Tomcat nodes. That is when I started looking into
AbortSlow*ConsumerStrategy.

*Second issue:*
I ended up with the following changes:

- Prefetch size: 1
- I defined abortSlowAckConsumerStrategy as follows:
<slowConsumerStrategy>
<abortSlowAckConsumerStrategy ignoreIdleConsumers="false"
checkPeriod="60000" maxTimeSinceLastAck="600000" abortConnection="false"/>
</slowConsumerStrategy>

- The redelivery policy was defined in the broker instead of on the connection:
<redeliveryPolicyEntries> 
<redeliveryPolicy queue="generateReportQueue" maximumRedeliveries="1"
initialRedeliveryDelay="100000" redeliveryDelay="100000"/>    
</redeliveryPolicyEntries>

With this configuration I hoped that a consumer would be aborted once it had
gone 10 minutes without acknowledging a message, so that messages wouldn't
hang in the queue indefinitely (the first issue).
Unfortunately, it looked as if AbortSlowAckConsumerStrategy couldn't
abort the slow consumer.
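
For reference, a queue prefetch of 1 like the one listed above is usually
set on the client side. The fragment below is only a sketch of how that
could look in Spring XML, assuming the prefetchPolicy property of
ActiveMQConnectionFactory is used; it is not copied from the actual
configuration, which does not show where the prefetch is set.

```xml
<!-- Sketch only: assumes the prefetch is set through the connection factory's prefetch policy. -->
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${activemq.broker.url}"/>
    <property name="prefetchPolicy">
        <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
            <!-- At most one unacknowledged message is dispatched to each queue consumer. -->
            <property name="queuePrefetch" value="1"/>
        </bean>
    </property>
</bean>
```

The same value can also be passed on the broker URL as
jms.prefetchPolicy.queuePrefetch=1, which avoids touching the bean
definition.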


The logs from AMQ server are here: 
….
2014-10-25 00:00:11,455 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for
destination:queue://generateReportQueue
...
2014-10-25 01:12:11,641 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for
destination:queue://generateReportQueue
...
2014-10-25 18:14:14,557 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app02.osl.basefarm.net-36433-1414153506788-1:1:17:7 for
destination:queue://generateReportQueue
…

After the Tomcat nodes were restarted, AbortSlowAckConsumerStrategy started
to abort the slow consumers successfully:
2014-10-25 21:18:15,111 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app01.osl.basefarm.net-50652-1414258093487-1:1:40:6 for
destination:queue://generateReportQueue
2014-10-25 21:26:15,112 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app02.osl.basefarm.net-36743-1414257416757-1:1:50:1 for
destination:queue://generateReportQueue
2014-10-25 21:29:15,113 [host] Scheduler] INFO  AbortSlowConsumerStrategy     
- aborting slow consumer:
ID:min-p-app01.osl.basefarm.net-50652-1414258093487-1:1:43:1 for
destination:queue://generateReportQueue
...

*The context:*

*Environment:*
- We have a clustered Spring-based application (running on 2 Tomcat nodes)
- We have standalone ActiveMQ running on two nodes (for failover)
- We run the application on Linux boxes (Red Hat Enterprise)
- Versions: Spring 4.1.0.RC1, ActiveMQ 5.9.1, Java 7
- Memory:
  - ActiveMQ on each node: -Xmx2048m -Xms512m -XX:MaxPermSize=128m
  - Tomcat on each node: 12 GB of memory

*Configuration of JMS:*
*AMQ factory and jmsTemplate on client side:*

<bean id="connectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL">
                <value>${activemq.broker.url}</value>
            </property>
            <property name="nonBlockingRedelivery">
                <value>false</value>
            </property>
        </bean>
    </property>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="receiveTimeout" value="2000"/>
    <property name="sessionTransacted" value="true"/>
</bean>

*Spring DMLC*

<bean id="defaultMessageListenerContainerParent" abstract="true"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="sessionTransacted" value="true"/>
    <property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>

<bean parent="defaultMessageListenerContainerParent">
    <property name="destination" ref="generateReportQueue"/>
    <property name="messageListener" ref="sendReportToMailMessageListener"/>
</bean>

*ActiveMQ broker:*

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost" dataDirectory="${activemq.data}" useJmx="true"
        schedulerSupport="true">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="false" memoryLimit="64mb">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false" memoryLimit="64mb">
                        <deadLetterStrategy>
                            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                        </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry queue="generateReportQueue" producerFlowControl="false">
                        <deadLetterStrategy>
                            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
                        </deadLetterStrategy>
                        <slowConsumerStrategy>
                            <abortSlowAckConsumerStrategy ignoreIdleConsumers="false" checkPeriod="60000"
                                                          maxTimeSinceLastAck="600000" abortConnection="false"/>
                        </slowConsumerStrategy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext connectorPort="1099"/>
        </managementContext>

        <persistenceAdapter>
          <kahaDB directory="XXXX"/>
        </persistenceAdapter>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="5 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="2 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire"
                                uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="amqp"
                                uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="stomp"
                                uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt"
                                uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws"
                                uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"
                  class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>

        <plugins>
            <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
                <redeliveryPolicyMap>
                    <redeliveryPolicyMap>
                        <redeliveryPolicyEntries>
                            <redeliveryPolicy queue="generateReportQueue" maximumRedeliveries="1"
                                              initialRedeliveryDelay="100000" redeliveryDelay="100000"/>
                        </redeliveryPolicyEntries>
                    </redeliveryPolicyMap>
                </redeliveryPolicyMap>
            </redeliveryPlugin>
        </plugins>

    </broker>


Any ideas or help would be appreciated.
Thank you in advance.

Regards
Marek Dominiak


