Hi, we are testing the Pure Master/Slave component with the following configuration:
- The brokers run on VMware ESX, each in a separate VM. Linux 5.4. ActiveMQ 5.3.0

master:

<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost"
        waitForSlave="true"
        shutdownOnSlaveFailure="true"
        persistent="true"
        useJmx="true"
        dataDirectory="${activemq.base}/data">
  <managementContext>
    <managementContext createConnector="false"/>
  </managementContext>
  <destinations>
    <queue physicalName="TEST.FOO"/>
  </destinations>
  <persistenceAdapter>
    <amqPersistenceAdapter directory="${activemq.base}/data/amqstore"
                           syncOnWrite="true"
                           maxFileLength="128mb"
                           directoryArchive="${activemq.base}/archive/amqstore"
                           archiveDataLogs="true"/>
  </persistenceAdapter>
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
          <pendingSubscriberPolicy>
            <fileCursor/>
          </pendingSubscriberPolicy>
        </policyEntry>
        <policyEntry queue=">" producerFlowControl="true" memoryLimit="64mb">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
          </deadLetterStrategy>
          <pendingQueuePolicy>
            <fileQueueCursor/>
          </pendingQueuePolicy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <systemUsage>
    <systemUsage>
      <memoryUsage>
        <memoryUsage limit="128mb"/>
      </memoryUsage>
      <storeUsage>
        <storeUsage limit="1 gb" name="foo"/>
      </storeUsage>
      <tempUsage>
        <tempUsage limit="100 mb"/>
      </tempUsage>
    </systemUsage>
  </systemUsage>
  <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
  </transportConnectors>
</broker>

slave:

<broker xmlns="http://activemq.apache.org/schema/core"
        masterConnectorURI="tcp://mombrks02.ina.fr:61616"
        brokerName="localhost"
        shutdownOnMasterFailure="true"
        useJmx="true"
        dataDirectory="${activemq.base}/data">
  <destinations>
    <queue physicalName="TEST.FOO"/>
  </destinations>
  <managementContext>
    <managementContext createConnector="false"/>
  </managementContext>
  <persistenceAdapter>
    <amqPersistenceAdapter directory="${activemq.base}/data/amqstore"
                           maxFileLength="128mb"
                           directoryArchive="${activemq.base}/archive/amqstore"
                           archiveDataLogs="true"/>
  </persistenceAdapter>

STOMP producers post persistent messages until they are suspended by the flow control mechanism.
-> On a stand-alone broker, when consumers fetch messages, the producers are unblocked as usual.
-> But on pure master/slave, STOMP consumers cannot get any messages and, as a consequence, all producers and consumers are frozen.
Restarting the broker is the only way to get back to normal behavior (no messages are lost).

producer code:

<...>
# Connect to the broker.
my $stomp = Net::Stomp->new( { hostname => "$_", port => '61613' } );
print "connect to broker $_ \n";
$stomp->connect( { login => "$login", passcode => "$pass", clientid => 'CLIENT' } );
my $beg_time = time;
print "send messages $_ at $beg_time\n";
my $i = 0;
while ($i < 10000) {
    # Send a persistent test message: "test $i" repeated 32768 times.
    $stomp->send( { destination => "$queue", body => "test $i"x32768, persistent => "true" } );
    print "message $i ok\n";
    $i++;
}
<...>

and get_message:

# Connect to the broker.
my $stomp = Net::Stomp->new( { hostname => "$_", port => '61613' } );
$stomp->connect( { login => "$login", passcode => "$pass", clientid => 'CLIENT_GET' } );
print "connected to broker $_\n";
# Subscribe to messages from the $queue.
$stomp->subscribe( { destination => "$queue", ack => 'client', prefetchSize => 1, persistent => "true" } );
my $i = 0;
my $can_read = 1;
my $framebody;
print "subscribed to queue $queue\n";
while ($i >= 0 && $can_read) {
    # Wait max 5 seconds for message to appear.
    # Note: this "my" declares a new $can_read scoped to the loop body,
    # so the loop condition above keeps seeing the outer value (1).
    my $can_read = $stomp->can_read({ timeout => "5" });
    print "$i:$can_read\n";
    if ( $can_read ) {
        # There is a message to collect.
        my $frame = $stomp->receive_frame;
        $stomp->ack( { frame => $frame } );
        $framebody = $frame->body;
        print "length=".length($framebody);
    }
    else {
        # There's still no message to collect.
        print "CRITICAL: Timed out while trying to collect the message\n";
        $exitcode = "critical";
    }
    # Whatever the outcome, we have managed to connect to given broker.
    # There's no need to try the other.
    $i++;
}
print "nb lus : $i";    # "nb lus" = number of messages read
$stomp->disconnect;
exit $error{"$exitcode"};
}
print "CRITICAL: No connection to ActiveMQ; tried $evalcount out of " . @hosts . " brokers\n";
exit $error{"critical"};

On the producer side:

...
message 340 ok
message 341 ok
message 342 ok
<freeze>

On the consumer side, we start the consumer only once the producers are suspended by flow control:

[r...@momcltls01 client_perl]# perl ./get_massif.pl
connected to broker mombrks02.ina.fr
subscribed to queue /queue/TEST.FOO
0:0
CRITICAL: Timed out while trying to collect the message
1:0
CRITICAL: Timed out while trying to collect the message

What's going wrong?

--
View this message in context: http://old.nabble.com/Pure-Master-Slave-freeze-once-flowcontrol-tp27466380p27466380.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.