Hi, I'm having a problem using virtual topics in a cluster of 3 brokers, and I hope the ActiveMQ experts can help. The configuration is attached at the end. I used most of the networkConnector and transportConnector options, hoping I got them right. Please help. Thank you.

Environment:
  OS: Red Hat Linux
  Java: 1.6
  ActiveMQ: 5.4.2
  Brokers: 3-broker cluster (A, B, C)
  Producer: 1, sending to topic VirtualTopic.testtopic
  Queue consumers: 3, subscribing to queue Consumer.group1.VirtualTopic.testtopic
  Topic consumer: 1, subscribing to topic VirtualTopic.testtopic

The consumers connect to the brokers using failover://, so they connect to any one broker at random. The transport connector has the updateClusterClients and rebalanceClusterClients options set.

The consumer on the topic gets the correct number of messages (all of them). The queue consumers (load-balanced), however, only on a few occasions received the correct number of messages in the first batch (2000 msgs) after the initial connect, and the numbers never add up in subsequent batches.

Problems observed:

1. The 3 queue consumers are not properly load-balanced. The number of messages received does not always add up to the number sent. Sometimes the total is below it, sometimes they receive far more than what was sent. Sometimes a queue consumer does not get any messages at all.

2. The producer test sends batches of 2000 msgs. In subsequent batches, the 3 load-balanced consumers received progressively more than 2000 msgs each time. It is as if msgs already received were sent again by the broker.

3. I checked the JMS destination name. The queue consumers sometimes got messages from the queue only, sometimes from the topic only (which I assume should not happen), and sometimes from both the queue AND the topic: queue://Consumer.group1.VirtualTopic.testtopic, topic://VirtualTopic.testtopic

4. The log file shows many rejected messages: "Duplicate message add attempt rejected" (on the queue destination). I specified excludedDestinations and suppressDuplicateQueueSubscriptions, to no avail.

5. Sometimes a queue consumer got repeated messages after a disconnect/connect. The disconnect happened after no message had been received.

At one point I used a separate transport connector for the network connector. Still had problems.
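The separate-connector variant can be sketched roughly as follows. This is an illustration only, not the exact file that was tested: the connector name "network" and port 61617 are assumed placeholders, while the remaining attributes are copied from the configuration in this post. It is a fragment meant to sit inside the <broker> element, not a complete file.

```xml
<!-- Sketch only: the name "network" and port 61617 are assumed placeholders -->
<networkConnectors>
    <!-- Broker-to-broker links point at the dedicated port, not the client port -->
    <networkConnector uri="static:(tcp://host1:61617,tcp://host2:61617,tcp://host3:61617)"
                      conduitSubscriptions="false"
                      dynamicOnly="true"
                      networkTTL="3"
                      suppressDuplicateQueueSubscriptions="true"/>
</networkConnectors>

<transportConnectors>
    <!-- Client-facing connector with the cluster update/rebalance options -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
                        updateClusterClients="true"
                        rebalanceClusterClients="true"
                        updateClusterClientsOnRemove="true"/>
    <!-- Dedicated connector for the network of brokers (hypothetical name and port) -->
    <transportConnector name="network" uri="tcp://0.0.0.0:61617"/>
</transportConnectors>
```

The idea behind the split is that clients only ever need to know about port 61616, while the brokers talk to each other on the second port.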
In addition, the consumer connections sometimes failed over to the network connector port. I'm not sure whether that is the correct behavior.

Any help would be much appreciated. Thanks again.

Configuration applied:

1. Virtual destinations

    <destinationInterceptors>
        <virtualDestinationInterceptor>
            <virtualDestinations>
                <virtualTopic name="VirtualTopic.>" />
            </virtualDestinations>
        </virtualDestinationInterceptor>
    </destinationInterceptors>

   I tried both with and without the prefix attribute. No difference.

2. Network connector

    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                      conduitSubscriptions="false"
                      dynamicOnly="true"
                      networkTTL="3"
                      suppressDuplicateQueueSubscriptions="true" >
        <excludedDestinations>
            <queue physicalName="Consumer.*.VirtualTopic.>"/>
        </excludedDestinations>
    </networkConnector>

   I tried various combinations of values.

3. Transport connector

    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
                            updateClusterClients="true"
                            rebalanceClusterClients="true"
                            updateClusterClientsOnRemove="true"/>
    </transportConnectors>

Broker XML config below:

<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
            useJmx="true"
            brokerName="d01-fus-arch01.stage.root"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">

        <!--
        <virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
        <virtualTopic name="VirtualTopic.>" />
        -->
        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name="VirtualTopic.>" />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>

        <!--
            For better performance use VM cursor and small memory limit.
            For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
                        <!-- Use VM cursor for better latency
                             For more information, see:

                             http://activemq.apache.org/message-cursors.html

                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                        -->
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <networkConnectors>
            <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                              conduitSubscriptions="false"
                              dynamicOnly="true"
                              networkTTL="3"
                              suppressDuplicateQueueSubscriptions="true" >
                <excludedDestinations>
                    <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
            </networkConnector>
        </networkConnectors>

        <!--
            Configure message persistence for the broker.
            The default persistence mechanism is the KahaDB store (identified by
            the kahaDB tag). For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before slowing down producers. For more information, see:

            http://activemq.apache.org/producer-flow-control.html

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        -->

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
                                updateClusterClients="true"
                                rebalanceClusterClients="true"
                                updateClusterClientsOnRemove="true"/>
        </transportConnectors>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>