It seems that email attachment is not allowed... I put the diff directly 
below...

 

Daniel Laügt.

 

 

--- 
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
           2011/10/19 10:02:31                146083

+++ 
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
        2011/10/19 10:17:24                146084

@@ -54,9 +54,11 @@

         //        Either we need to implement something similar to 
LinkedHashMap or find

         //        some other way of tracking the eldest entry into the map and 
removing it

         //        if the cache size is exceeded.

-        ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,

+        ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,

                           MessageId::COMPARATOR > messageCache;

+        ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;

+

         bool trackTransactions;

         bool restoreSessions;

         bool restoreConsumers;

@@ -122,6 +124,8 @@

         virtual Pointer<Command> processEndTransaction( TransactionInfo* info 
);

+        virtual Pointer<Command> processMessagePull( MessagePull* pull );

+

         bool isRestoreConsumers() const {

             return this->restoreConsumers;

         }

 

--- 
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
      2011/10/19 10:02:31                146083

+++ 
soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
   2011/10/19 10:17:24                146084

@@ -108,11 +108,21 @@

void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {

     try{

-        if( trackMessages && command != NULL && command->isMessage() ) {

-            Pointer<Message> message =

+        if( command != NULL ) {

+            if( trackMessages && command->isMessage() ) {

+              Pointer<Message> message =

                 command.dynamicCast<Message>();

-            if( message->getTransactionId() == NULL ) {

+              if( message->getTransactionId() == NULL ) {

                 currentCacheSize = currentCacheSize + message->getSize();

+              }

+            }

+            else {

+              Pointer<MessagePull> messagePull =

+                command.dynamicCast<MessagePull>();

+              if( messagePull != NULL ) {

+                // just needs to be a rough estimate of size, ~4 identifiers

+                currentCacheSize += 400;

+              }

             }

         }

     }

@@ -148,12 +158,19 @@

         }

         // Now we flush messages

-        std::vector< Pointer<Message> > messages = messageCache.values();

-        std::vector< Pointer<Message> >::const_iterator messageIter = 
messages.begin();

+        std::vector< Pointer<Command> > messages = messageCache.values();

+        std::vector< Pointer<Command> >::const_iterator messageIter = 
messages.begin();

         for( ; messageIter != messages.end(); ++messageIter ) {

             transport->oneway( *messageIter );

         }

+

+        std::vector< Pointer<Command> > messagePulls = 
messagePullCache.values();

+        std::vector< Pointer<Command> >::const_iterator messagePullIter = 
messagePulls.begin();

+

+        for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) {

+          transport->oneway( *messagePullIter );

+        }

     }

     AMQ_CATCH_RETHROW( IOException )

     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )

@@ -790,6 +807,19 @@

}

 
////////////////////////////////////////////////////////////////////////////////

+Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* pull 
) {

+  if( pull != NULL 

+    && pull->getDestination() != NULL

+    && pull->getConsumerId() != NULL) {

+      std::string id = pull->getDestination()->toString() + "::" + 
pull->getConsumerId()->toString();

+      messagePullCache.put( id,

+        Pointer<Command>( pull->cloneDataStructure() ) );

+  }

+

+  return Pointer<Command>();

+}

+

+////////////////////////////////////////////////////////////////////////////////

void ConnectionStateTracker::connectionInterruptProcessingComplete(

     transport::Transport* transport, const Pointer<ConnectionId>& connectionId 
) {

 

 

From: Daniel Laugt [mailto:daniel.la...@wallstreetsystems.com] 
Sent: 08 November 2011 13:16
To: users@activemq.apache.org
Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if 
the MessagePull command is lost

 

Hello,

 

I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration 
prefetch size = 0, polling consumer fails to reconnect during the failover.

 

This issue has been fixed by the item AMQ-2877:

https://issues.apache.org/jira/browse/AMQ-2877

 

AMQ-2877 fixes the problem in the java client side but not in the c++ client 
side. Is it possible to merge this fix to ActiveMQ-CPP?

 

Attached on this email a diff of what I've merged from AMQ-2877 to resolve the 
problem on my ActiveMQ-CPP. This diff can be used probably as a suggestion...

 

Regards,

Daniel Laügt.

 

Reply via email to