Do messages get stuck in both queues, or only one of them?

Tim
On Thu, Jan 23, 2020, 5:42 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> By the way, what's the broker URL in the connection factory? failover?
> Are you using rebalance client?
>
> What's the prefetch in the client-side configuration?
>
> Regards
> JB
>
> On 23/01/2020 11:19, Jérôme Barotin wrote:
> > Hi,
> >
> > I've got two AMQ brokers installed in cluster mode with KahaDB, in order
> > to have an HA configuration. I have configured 2 queues (no topics):
> >
> >   * one that exchanges 200 000 messages per day, with message groups
> >     enabled; processing a message takes between 10 and 60 seconds, and
> >     it is done by 4 jobs with 15 threads each.
> >   * another that deals with 10 000 messages per day, with no groups; it
> >     is consumed by 4 jobs with 5 threads each.
> >
> > Consumers are multithreaded with PooledConnectionFactory.
> >
> > All is working well, but sometimes messages get stuck. My
> > consumers are paused and are doing nothing. I can see that, on the
> > ActiveMQ monitoring GUI:
> >
> >   * The number of pending messages is higher than zero
> >   * For all consumers, the dispatched queue is at zero
> >
> > Messages can stay stuck for 1 or 2 hours and then get unblocked by
> > themselves without any apparent reason. If I restart the brokers, it
> > unblocks the messages.
> >
> > I checked the AMQ documentation and a lot of parameters and haven't found a
> > solution. That's why I'm posting a description of my issue here. Does
> > anybody have an idea of something to change or something to monitor,
> > in order to help me?
> >
> > The AMQ configuration is the following:
> >
> > <beans
> >   xmlns="http://www.springframework.org/schema/beans"
> >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >   xsi:schemaLocation="http://www.springframework.org/schema/beans
> >   http://www.springframework.org/schema/beans/spring-beans.xsd
> >   http://activemq.apache.org/schema/core
> >   http://activemq.apache.org/schema/core/activemq-core.xsd">
> >
> >     <!-- Allows us to use system properties as variables in this
> >          configuration file -->
> >     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
> >         <property name="locations">
> >             <value>file:${activemq.conf}/credentials.properties</value>
> >         </property>
> >     </bean>
> >
> >     <!-- Allows accessing the server log -->
> >     <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> >           lazy-init="false" scope="singleton"
> >           init-method="start" destroy-method="stop">
> >     </bean>
> >
> >     <!--
> >         The <broker> element is used to configure the ActiveMQ broker.
> >     -->
> >     <broker xmlns="http://activemq.apache.org/schema/core"
> >             brokerName="energysoft" dataDirectory="${activemq.data}" useJmx="true">
> >
> >         <networkConnectors>
> >             <networkConnector uri="static:(tcp://myOtherBrokerIP:61616)"
> >                 userName="mylogin"
> >                 password="mypassword"
> >                 prefetchSize="100"
> >                 duplex="true"
> >             />
> >         </networkConnectors>
> >
> >         <destinationPolicy>
> >             <policyMap>
> >                 <policyEntries>
> >                     <policyEntry queue=">" maxPageSize="10000"
> >                                  maxBrowsePageSize="0" enableAudit="false" >
> >                         <networkBridgeFilterFactory>
> >                             <conditionalNetworkBridgeFilterFactory
> >                                 replayWhenNoConsumers="true"/>
> >                         </networkBridgeFilterFactory>
> >                         <messageGroupMapFactory>
> >                             <simpleMessageGroupMapFactory/>
> >                         </messageGroupMapFactory>
> >                     </policyEntry>
> >                     <policyEntry topic=">" >
> >                         <pendingMessageLimitStrategy>
> >                             <constantPendingMessageLimitStrategy limit="1000"/>
> >                         </pendingMessageLimitStrategy>
> >                     </policyEntry>
> >                 </policyEntries>
> >             </policyMap>
> >         </destinationPolicy>
> >
> >         <plugins>
> >             <simpleAuthenticationPlugin>
> >                 <users>
> >                     <authenticationUser username="mylogin"
> >                         password="mypassword" groups="users,admins"/>
> >                 </users>
> >             </simpleAuthenticationPlugin>
> >         </plugins>
> >
> >         <managementContext>
> >             <managementContext createConnector="false"/>
> >         </managementContext>
> >
> >         <persistenceAdapter>
> >             <kahaDB directory="${activemq.data}/kahadb"
> >                     journalMaxFileLength="32mb"/>
> >         </persistenceAdapter>
> >
> >         <systemUsage>
> >             <systemUsage>
> >                 <memoryUsage>
> >                     <memoryUsage percentOfJvmHeap="70" />
> >                 </memoryUsage>
> >                 <storeUsage>
> >                     <storeUsage limit="100 gb"/>
> >                 </storeUsage>
> >                 <tempUsage>
> >                     <tempUsage limit="50 gb"/>
> >                 </tempUsage>
> >             </systemUsage>
> >         </systemUsage>
> >
> >         <transportConnectors>
> >             <!-- DOS protection, limit concurrent connections to 1000
> >                  and frame size to 100MB -->
> >             <transportConnector name="openwire"
> >                 uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> >             <transportConnector name="amqp"
> >                 uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> >             <transportConnector name="stomp"
> >                 uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> >             <transportConnector name="mqtt"
> >                 uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> >             <transportConnector name="ws"
> >                 uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> >
> >             <!-- cluster port -->
> >             <!-- <transportConnector uri="tcp://localhost:62002"/> -->
> >         </transportConnectors>
> >
> >         <!-- destroy the spring context on shutdown to stop jetty -->
> >         <shutdownHooks>
> >             <bean xmlns="http://www.springframework.org/schema/beans"
> >                   class="org.apache.activemq.hooks.SpringContextHook" />
> >         </shutdownHooks>
> >     </broker>
> >
> >     <import resource="jetty.xml"/>
> >
> > </beans>
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
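
For anyone following the thread: the broker URL and client-side prefetch that JB asks about are settings on the consumer's connection factory, not in the broker file quoted above. A rough sketch of where they would typically appear in a Spring definition is below. The bean id `jmsFactory`, the host names `brokerA` and `brokerB`, and the prefetch value of 1 are placeholders chosen for illustration only; none of them comes from the original poster's setup, and this is not a recommended fix.

```xml
<!-- Hypothetical client-side definition; hosts, bean id and values are placeholders -->
<bean id="jmsFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- failover: lists both brokers so the client can reconnect to the other one;
                 jms.prefetchPolicy.queuePrefetch sets how many messages the broker
                 may dispatch to each queue consumer ahead of acknowledgement -->
            <property name="brokerURL"
                      value="failover:(tcp://brokerA:61616,tcp://brokerB:61616)?jms.prefetchPolicy.queuePrefetch=1"/>
        </bean>
    </property>
</bean>
```

Posting the equivalent values from the real client configuration would answer both of JB's questions at once.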