All:
I have done quite a bit of reading: the ActiveMQ documentation, the ActiveMQ in 
Action book (MEAP), the ActiveMQ forum, and many other internet postings.
We are using ActiveMQ 5.3.1 in production and recently experienced a case where 
849 messages were duplicated within a 70-second period.
My environment:
Linux; JDK 1.6; ActiveMQ 5.3.1; Camel 2.4
My current configuration is shared-nothing master/slave. Here is the master 
config:
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at


    http://www.apache.org/licenses/LICENSE-2.0


    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>


    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
                        brokerName="emsp-primary"
                        dataDirectory="${activemq.base}/data"
                        destroyApplicationContextOnStop="true" >


        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <!-- Use VM cursor for better latency
                       For more information, see:


                       http://activemq.apache.org/message-cursors.html


                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


                <networkConnectors>
                </networkConnectors>


                <persistenceAdapter>
                        <kahaDB directory="${activemq.base}/data/kahadb"
                                        indexWriteBatchSize="100"
                                        journalMaxFileLength="33554432"
                                        enableIndexWriteAsync="true"
                                        enableJournalDiskSyncs="false" />
                </persistenceAdapter>


                <systemUsage>
                        <systemUsage>
                                <memoryUsage>
                                        <memoryUsage limit="512 mb"/>
                                </memoryUsage>
                                <storeUsage>
                                        <storeUsage limit="2 gb" name="data-store"/>
                                </storeUsage>
                                <tempUsage>
                                        <tempUsage limit="100 mb"/>
                                </tempUsage>
                        </systemUsage>
                </systemUsage>


        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>


    </broker>


    <import resource="jetty.xml"/>


</beans>
And the Failover config:
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at


    http://www.apache.org/licenses/LICENSE-2.0


    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">


    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>


    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core"
                        brokerName="emsp-failover"
                        dataDirectory="${activemq.base}/data"
                        destroyApplicationContextOnStop="true" >


        <!--
            For better performance, use a VM cursor and a small memory limit.
            For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your producer is "hanging", it's probably due to producer flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->


        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="5mb" useCache="false">
                  <!-- Use VM cursor for better latency
                       For more information, see:


                       http://activemq.apache.org/message-cursors.html


                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:


            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


                <!-- The store and forward broker networks ActiveMQ will listen to -->
                <networkConnectors>
                        <networkConnector name="bridge-to-primary"
                                                   uri="static:(tcp://10.9.8.7:61616)"
                                                   dynamicOnly="false"
                                                   duplex="true"
                                                   prefetchSize="1">
                                <excludedDestinations>
                                        <queue physicalName="Consumer.*.VirtualTopic.>"/>
                                </excludedDestinations>
                                <staticallyIncludedDestinations>
                                        <topic physicalName="VirtualTopic.MyCompany.SharedServices.DirectoryServices"/>
                                </staticallyIncludedDestinations>
                        </networkConnector>
                </networkConnectors>


                <persistenceAdapter>
                        <kahaDB directory="${activemq.base}/data/kahadb"
                                        indexWriteBatchSize="100"
                                        journalMaxFileLength="33554432"
                                        enableIndexWriteAsync="true"
                                        enableJournalDiskSyncs="false" />
                </persistenceAdapter>


                <systemUsage>
                        <systemUsage>
                                <memoryUsage>
                                        <memoryUsage limit="512 mb"/>
                                </memoryUsage>
                                <storeUsage>
                                        <storeUsage limit="2 gb" name="data-store"/>
                                </storeUsage>
                                <tempUsage>
                                        <tempUsage limit="100 mb"/>
                                </tempUsage>
                        </systemUsage>
                </systemUsage>


        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>


    </broker>


    <import resource="jetty.xml"/>


</beans>
Finally, here are portions of the ActiveMQ log files:
Log Messages from Primary
2011-02-16 05:18:39,197 | WARN  | Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: MessageAck {commandId = 1166, responseRequired = false, ackType = 2, consumerId = ID:prod-interop-mq2.mycompany.com-52485-1296046376681-0:4:3:1, firstMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, lastMessageId = ID:prod-interop-mq1.mycompany.com-48185-1296047391209-0:2:3:1540:1, destination = queue://MyCompany.CAM.Import, transactionId = null, messageCount = 1} | org.apache.activemq.broker.region.PrefetchSubscription | ActiveMQ Transport: tcp:///10.9.8.7:37151
2011-02-16 05:18:45,861 | INFO  | Network connection between vm://emsp-failover#0 and tcp:///10.9.8.7:61616 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.7:61616 | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO  | Connector vm://emsp-failover Stopped | org.apache.activemq.broker.TransportConnector | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,905 | INFO  | emsp-failover bridge to emsp-primary stopped | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@e75737
2011-02-16 05:18:45,908 | INFO  | Establishing network connection from vm://emsp-failover to tcp://10.9.8.7:61616 | org.apache.activemq.network.DiscoveryNetworkConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:18:45,911 | INFO  | Connector vm://emsp-failover Started | org.apache.activemq.broker.TransportConnector | Simple Discovery Agent: java.util.concurrent.ThreadPoolExecutor$Worker@504fe4
2011-02-16 05:19:30,738 | INFO  | Usage Manager Memory Limit reached on queue://MyCompany.CAM.Import. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.9.8.8:44634
2011-02-16 05:19:38,877 | INFO  | Network connection between vm://emsp-failover#6 and tcp:///10.9.8.7:61616(emsp-primary) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-failover#6

Log Messages from Failover
2011-02-16 05:19:30,361 | WARN  | Caught an exception processing local command | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:235)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:702)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:158)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:118)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1205)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:790)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:826)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
2011-02-16 05:19:30,334 | WARN  | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a remote error: java.net.SocketException: Broken pipe | org.apache.activemq.network.DemandForwardingBridge | InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@9ec332
2011-02-16 05:19:28,418 | WARN  | KahaDB PageFile flush: 6 queued writes, latch wait took 732 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:27,385 | INFO  | Slow KahaDB access: Journal append took: 411 ms, Index update took 2368 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.7:60899
2011-02-16 05:19:30,915 | INFO  | Network connection between vm://emsp-primary#6 and tcp:///10.9.8.8:49724 shutdown due to a local error: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: /10.9.8.8:49724 | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Connection Dispatcher: vm://emsp-primary#6
2011-02-16 05:19:32,153 | INFO  | Slow KahaDB access: cleanup took 9200 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:33,470 | INFO  | Slow KahaDB access: Journal append took: 0 ms, Index update took 7002 ms | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Transport: tcp:///10.9.8.8:47708
2011-02-16 05:19:33,887 | WARN  | KahaDB PageFile flush: 4 queued writes, latch wait took 410 | org.apache.kahadb.page.PageFile | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:38,048 | INFO  | Connector vm://emsp-primary Stopped | org.apache.activemq.broker.TransportConnector | ActiveMQ Task
2011-02-16 05:19:38,068 | INFO  | emsp-primary bridge to emsp-failover stopped | org.apache.activemq.network.DemandForwardingBridge | ActiveMQ Task
2011-02-16 05:19:38,048 | INFO  | The connection to '/10.9.8.8:49724' is taking a long time to shutdown. | org.apache.activemq.broker.TransportConnection | NetworkBridge
2011-02-16 05:19:52,288 | INFO  | Connector vm://emsp-primary Started | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,082 | INFO  | Slow KahaDB access: cleanup took 1839 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
2011-02-16 05:19:55,554 | INFO  | Created Duplex Bridge back to emsp-failover | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///10.9.8.8:38626
2011-02-16 05:19:55,641 | INFO  | Network connection between vm://emsp-primary#8 and tcp:///10.9.8.8:38626(emsp-failover) has been established. | org.apache.activemq.network.DemandForwardingBridge | StartLocalBridge: localBroker=vm://emsp-primary#8
2011-02-16 05:19:59,225 | INFO  | Slow KahaDB access: cleanup took 610 | org.apache.activemq.store.kahadb.MessageDatabase | ActiveMQ Journal Checkpoint Worker
I should point out that all clients connect using the failover transport with 
randomize=false. In other words, our second broker is there only to provide 
failover, not load balancing.
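For reference, a client connection of this kind can be sketched as a Spring bean. This is only an illustration: the bean id is hypothetical, and the two addresses are inferred from the broker configs and logs above (10.9.8.7 as primary, 10.9.8.8 as failover) rather than copied from the real client configuration.

```xml
<!-- Hypothetical client connection factory; the bean id and host order are assumptions -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- randomize=false: try the first URI (primary) before falling back to the second -->
    <property name="brokerURL"
              value="failover:(tcp://10.9.8.7:61616,tcp://10.9.8.8:61616)?randomize=false"/>
</bean>
```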
The reliability and availability (i.e. failover) of ActiveMQ have been 
outstanding. However, producing (and then processing) 849 duplicate messages is 
unacceptable. 
Is the (occasional) production of duplicate messages known and expected 
behavior?
Is there something in my configuration that can be changed to prevent 
duplicate messages?
Would using a "shared database" strategy prevent this? (Only one broker can be 
active at a time. A further benefit would be a much simpler config, i.e. 
all brokers could use the same configuration file.)
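As an illustration of the "shared database" idea, here is a minimal, untested sketch based on ActiveMQ's JDBC persistence adapter. The data source bean id, driver, JDBC URL, host name and credentials are all placeholders, and whether this arrangement avoids the duplicates described above is exactly the open question.

```xml
<!-- Inside <broker>: replaces the kahaDB adapter; every broker points at the same database -->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#shared-ds"/>
</persistenceAdapter>

<!-- Outside <broker>, at the <beans> level: placeholder data source (all values are assumptions) -->
<bean id="shared-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://dbhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
</bean>
```

In this style of setup the brokers compete for a lock in the database, so only the lock holder accepts client connections while the others wait.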
Any other suggestions/advice would be quite welcome!
TIA,
Mike L. (aka patzerbud)
                                          
