We've got a cluster of two brokers (5.7.0) running (same configuration--puppet deployed--on both boxes except for the brokerName) in our production environment and most of the VirtualTopics and queues appear to be fine, however one broker (amq00) is having problems.

We have the VirtualTopic.nonBlockingMessageFile which has a few Consumer.*.VirtualTopic.nonBlockingMessageFile queues consuming from it. If I use the web admin and "Send To" the VirtualTopic or a consuming queue that message is delivered to the queue(s) and processed as expected. However when our producers try to produce to the topic VirtualTopic.nonBlockingMessageFile the topic will be created, however nothing is ever enqueued. This is all on amq00. On amq01 the queue is created and messages are enqueued, then delivered to the consuming queues. Our producers are using the failover protocol and do switch between the brokers as we start and stop them.

The other issue is that if we pin some consumers to a Consumer.foo.VirtualTopic.nonBlockingMessageFile queue on amq00 and turn off any consumers on the same queue on amq01 I can see messages being delivered to the networkConnector, however I never see them enqueued or processed through amq00 (we have a number of networkConnectors configured based on the consumer process priority so we actually exclude the VirtualTopic and explicitly include the queues).

I've let all the queues drain, deleted all the queues (even trashed the whole kahadb directory), and bounced the broker only to have this problem persist. Any ideas what's going on? As mentioned before amq01 is working fine, and we can't reproduce this in our test environment.

Here's our activemq.xml:

   <beans
      xmlns="http://www.springframework.org/schema/beans";
      xmlns:amq="http://activemq.apache.org/schema/core";
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
      xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core.xsd";>

        <!-- Allows us to use system properties as variables in this
   configuration file -->
        <bean
   
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
   <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>

        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core";
   brokerName="amq00.lan" dataDirectory="${activemq.data}">

            <!--
                For better performances use VM cursor and small memory
   limit.
                For more information, see:

                http://activemq.apache.org/message-cursors.html

                Also, if your producer is "hanging", it's probably due
   to producer flow control.
                For more information, see:
                http://activemq.apache.org/producer-flow-control.html
            -->

            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" producerFlowControl="false"
   memoryLimit="128mb">
                      <pendingSubscriberPolicy>
                        <vmCursor />
                      </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false"
   memoryLimit="128mb">
                      <!-- Use VM cursor for better latency
                           For more information, see:

   http://activemq.apache.org/message-cursors.html

                      <pendingQueuePolicy>
                        <vmQueueCursor />
                      </pendingQueuePolicy>
                      -->
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>


            <!--
                The managementContext is used to configure how ActiveMQ
   is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is
   started by
                the JVM. For more information, see:

                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="true" />
            </managementContext>

            <!--
                Configure message persistence for the broker. The
   default persistence
                mechanism is the KahaDB store (identified by the kahaDB
   tag).
                For more information, see:

                http://activemq.apache.org/persistence.html
            -->
            <persistenceAdapter>
                           <!--
                <kahaDB directory="${activemq.data}/kahadb"
                        ignoreMissingJournalfiles="true"
                        checkForCorruptJournalFiles="false"
                        checksumJournalFiles="false" />
                           -->
                           <mKahaDB
   directory="${activemq.base}/data/kahadb">
                                   <filteredPersistenceAdapters>
                                           <!-- kahaDB per destinations -->
                                           <filteredKahaDB
   perDestination="true" >
   <persistenceAdapter>
                                                           <kahaDB
   journalMaxFileLength="32mb" />
   </persistenceAdapter>
                                           </filteredKahaDB>
   </filteredPersistenceAdapters>
                           </mKahaDB>
            </persistenceAdapter>


              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage limit="2 gb" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="65 gb" />
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="25 gb" />
                    </tempUsage>
                </systemUsage>
            </systemUsage>

            <!--
                networkConnectors added for clustering
            -->
            <networkConnectors>
                <networkConnector
                    uri="multicast://default?group=nonblocking"
                    dynamicOnly="true"
                    networkTTL="3"
                    prefetchSize="1000"
                    decreaseNetworkConsumerPriority="true">

                    <excludedDestinations>
                        <!-- If you have the netcwork connector on both the
                             VirtualTopic and the underlying queues
   then your
                             consumers are likely to get duplicates. -->
                        <topic physicalName="VirtualTopic.>"/>
                        <!-- This queue has its own dedicated
   networkConnector -->
                        <queue
   physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
                        <queue
   physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
                    </excludedDestinations>
                </networkConnector>
                <networkConnector
                    name="NC_Sorter_nonBlockingMessageFile"
                    uri="multicast://default?group=nonblocking"
                    dynamicOnly="true"
                    networkTTL="3"
                    prefetchSize="10"
                    decreaseNetworkConsumerPriority="true">

                    <dynamicallyIncludedDestinations>
                        <queue
   physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
                    </dynamicallyIncludedDestinations>
                </networkConnector>
                <networkConnector
                    name="NC_Aggregator_Buckets"
                    uri="multicast://default?group=nonblocking"
                    dynamicOnly="true"
                    networkTTL="3"
                    prefetchSize="1000"
                    decreaseNetworkConsumerPriority="true">

                    <dynamicallyIncludedDestinations>
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
                        <queue
   physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
                    </dynamicallyIncludedDestinations>
                </networkConnector>
                <networkConnector
                    name="NC_HbaseLoader"
                    uri="multicast://default?group=nonblocking"
                    dynamicOnly="true"
                    networkTTL="3"
                    prefetchSize="10000"
                    decreaseNetworkConsumerPriority="true">

                    <dynamicallyIncludedDestinations>
                        <queue
   physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
                    </dynamicallyIncludedDestinations>
                </networkConnector>
            </networkConnectors>


            <!--
                The transport connectors expose ActiveMQ over a given
   protocol to
                clients and other brokers. For more information, see:

                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <transportConnector
                    name="openwire"
                    uri="tcp://0.0.0.0:61616"
   discoveryUri="multicast://default?group=nonblocking"
                    updateClusterClients="true"
                    rebalanceClusterClients="true"
                    updateClusterClientsOnRemove="true" />
                <transportConnector
                    name="stomp"
   uri="stomp://0.0.0.0:61613?transport.closeAsync=false"
   discoveryUri="multicast://default?group=nonblocking"
                    updateClusterClients="true"
                    rebalanceClusterClients="true"
                    updateClusterClientsOnRemove="true" />
            </transportConnectors>

            <plugins>
                <discardingDLQBrokerPlugin dropAll="true"
   dropTemporaryTopics="true" dropTemporaryQueues="true" />
            </plugins>

        </broker>

        <!--
            Enable web consoles, REST and Ajax APIs and demos

            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml" />

   </beans>


Any insight or suggestions as to what might be going on, or how to resolve it, would be greatly appreciated.

~/Matt

Reply via email to