It seems that email attachment is not allowed... I put the diff directly below...
Daniel Laügt. --- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:02:31 146083 +++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:17:24 146084 @@ -54,9 +54,11 @@ // Either we need to implement something similar to LinkedHashMap or find // some other way of tracking the eldest entry into the map and removing it // if the cache size is exceeded. - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>, + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>, MessageId::COMPARATOR > messageCache; + ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache; + bool trackTransactions; bool restoreSessions; bool restoreConsumers; @@ -122,6 +124,8 @@ virtual Pointer<Command> processEndTransaction( TransactionInfo* info ); + virtual Pointer<Command> processMessagePull( MessagePull* pull ); + bool isRestoreConsumers() const { return this->restoreConsumers; } --- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:02:31 146083 +++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:17:24 146084 @@ -108,11 +108,21 @@ void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) { try{ - if( trackMessages && command != NULL && command->isMessage() ) { - Pointer<Message> message = + if( command != NULL ) { + if( trackMessages && command->isMessage() ) { + Pointer<Message> message = command.dynamicCast<Message>(); - if( message->getTransactionId() == NULL ) { + if( message->getTransactionId() == NULL ) { currentCacheSize = currentCacheSize + message->getSize(); + } + } + else { + Pointer<MessagePull> messagePull = + command.dynamicCast<MessagePull>(); + if( messagePull != NULL ) { + // just needs to be a rough estimate of size, ~4 identifiers + currentCacheSize += 400; + } } } } @@ -148,12 +158,19 @@ } // Now we flush messages - std::vector< Pointer<Message> > messages = messageCache.values(); - std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin(); + std::vector< Pointer<Command> > messages = messageCache.values(); + std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin(); for( ; messageIter != messages.end(); ++messageIter ) { transport->oneway( *messageIter ); } + + std::vector< Pointer<Command> > messagePulls = messagePullCache.values(); + std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin(); + + for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) { + transport->oneway( *messagePullIter ); + } } AMQ_CATCH_RETHROW( IOException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException ) @@ -790,6 +807,19 @@ } //////////////////////////////////////////////////////////////////////////////// +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* pull ) { + if( pull != NULL + && pull->getDestination() != NULL + && pull->getConsumerId() != NULL) { + std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString(); + messagePullCache.put( id, + Pointer<Command>( pull->cloneDataStructure() ) ); + } + + return Pointer<Command>(); +} + +//////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::connectionInterruptProcessingComplete( transport::Transport* transport, const Pointer<ConnectionId>& connectionId ) { From: Daniel Laugt [mailto:daniel.la...@wallstreetsystems.com] Sent: 08 November 2011 13:16 To: users@activemq.apache.org Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost Hello, I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration prefetch size = 0, polling consumer fails to reconnect during the failover. This issue has been fixed by the item AMQ-2877: https://issues.apache.org/jira/browse/AMQ-2877 AMQ-2877 fixes the problem in the java client side but not in the c++ client side. Is it possible to merge this fix to ActiveMQ-CPP? Attached on this email a diff of what I've merged from AMQ-2877 to resolve the problem on my ActiveMQ-CPP. This diff can be used probably as a suggestion... Regards, Daniel Laügt.