Hi Laugt, Are you using a synchronous or an asynchronous consumer? When I was using synchronous consumption, I used to have problems with failover reconnection.
regards 2011/11/8 Daniel Laugt <daniel.la...@wallstreetsystems.com> > It seems that email attachment is not allowed... I put the diff directly > below... > > > > Daniel Laügt. > > > > > > --- > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h > 2011/10/19 10:02:31 146083 > > +++ > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h > 2011/10/19 10:17:24 146084 > > @@ -54,9 +54,11 @@ > > // Either we need to implement something similar to > LinkedHashMap or find > > // some other way of tracking the eldest entry into the map > and removing it > > // if the cache size is exceeded. > > - ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>, > > + ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>, > > MessageId::COMPARATOR > messageCache; > > + ConcurrentStlMap< std::string, Pointer<Command> > > messagePullCache; > > + > > bool trackTransactions; > > bool restoreSessions; > > bool restoreConsumers; > > @@ -122,6 +124,8 @@ > > virtual Pointer<Command> processEndTransaction( TransactionInfo* > info ); > > + virtual Pointer<Command> processMessagePull( MessagePull* pull ); > > + > > bool isRestoreConsumers() const { > > return this->restoreConsumers; > > } > > > > --- > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp > 2011/10/19 10:02:31 146083 > > +++ > soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp > 2011/10/19 10:17:24 146084 > > @@ -108,11 +108,21 @@ > > void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) { > > try{ > > - if( trackMessages && command != NULL && command->isMessage() ) { > > - Pointer<Message> message = > > + if( command != NULL ) { > > + if( trackMessages && command->isMessage() ) { > > + Pointer<Message> message = > > command.dynamicCast<Message>(); > > - if( message->getTransactionId() == NULL ) { > > + if( message->getTransactionId() == NULL ) { > > 
currentCacheSize = currentCacheSize + message->getSize(); > > + } > > + } > > + else { > > + Pointer<MessagePull> messagePull = > > + command.dynamicCast<MessagePull>(); > > + if( messagePull != NULL ) { > > + // just needs to be a rough estimate of size, ~4 > identifiers > > + currentCacheSize += 400; > > + } > > } > > } > > } > > @@ -148,12 +158,19 @@ > > } > > // Now we flush messages > > - std::vector< Pointer<Message> > messages = messageCache.values(); > > - std::vector< Pointer<Message> >::const_iterator messageIter = > messages.begin(); > > + std::vector< Pointer<Command> > messages = messageCache.values(); > > + std::vector< Pointer<Command> >::const_iterator messageIter = > messages.begin(); > > for( ; messageIter != messages.end(); ++messageIter ) { > > transport->oneway( *messageIter ); > > } > > + > > + std::vector< Pointer<Command> > messagePulls = > messagePullCache.values(); > > + std::vector< Pointer<Command> >::const_iterator messagePullIter = > messagePulls.begin(); > > + > > + for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) > { > > + transport->oneway( *messagePullIter ); > > + } > > } > > AMQ_CATCH_RETHROW( IOException ) > > AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException ) > > @@ -790,6 +807,19 @@ > > } > > > > //////////////////////////////////////////////////////////////////////////////// > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* > pull ) { > > + if( pull != NULL > > + && pull->getDestination() != NULL > > + && pull->getConsumerId() != NULL) { > > + std::string id = pull->getDestination()->toString() + "::" + > pull->getConsumerId()->toString(); > > + messagePullCache.put( id, > > + Pointer<Command>( pull->cloneDataStructure() ) ); > > + } > > + > > + return Pointer<Command>(); > > +} > > + > > > +//////////////////////////////////////////////////////////////////////////////// > > void ConnectionStateTracker::connectionInterruptProcessingComplete( > > transport::Transport* transport, 
const Pointer<ConnectionId>& > connectionId ) { > > > > > > From: Daniel Laugt [mailto:daniel.la...@wallstreetsystems.com] > Sent: 08 November 2011 13:16 > To: users@activemq.apache.org > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung > consumers if the MessagePull command is lost > > > > Hello, > > > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as the client. With the configuration > prefetch size = 0, a polling consumer fails to reconnect during failover. > > > > This issue has been fixed by the item AMQ-2877: > > https://issues.apache.org/jira/browse/AMQ-2877 > > > > AMQ-2877 fixes the problem on the Java client side but not on the C++ > client side. Is it possible to merge this fix into ActiveMQ-CPP? > > > > Attached to this email is a diff of what I've merged from AMQ-2877 to resolve > the problem in my ActiveMQ-CPP. This diff can probably be used as a > suggestion... > > > > Regards, > > Daniel Laügt. > > > > -- Óscar Pernas Plaza.