Hello Timothy,

I've just looked at the svn trunk and I don't see the fix...

The last commit on ConnectionStateTracker.cpp was made on 21 April 2011, and ActiveMQ-CPP was released on 29 April 2011. In the fix for AMQ-2877, the method processMessagePull() is overridden in the ConnectionStateTracker class. This is not the case in the trunk of ActiveMQ-CPP.

Daniel.

-----Original Message-----
From: Timothy Bish [mailto:tabish...@gmail.com]
Sent: 08 November 2011 16:33
To: users@activemq.apache.org
Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung consumers if the MessagePull command is lost

On Tue, 2011-11-08 at 16:25 +0100, Daniel Laugt wrote:
> For ActiveMQ-CPP, I'm using version 3.4.0.
>
> Daniel.

I believe these fixes were already made in trunk; I'd recommend you try out that code.

Regards

>
> -----Original Message-----
> From: Timothy Bish [mailto:tabish...@gmail.com]
> Sent: 08 November 2011 16:21
> To: users@activemq.apache.org
> Subject: RE: ActiveMQCPP - Failover and prefetch=0 can result in hung
> consumers if the MessagePull command is lost
>
> On Tue, 2011-11-08 at 16:13 +0100, Daniel Laugt wrote:
> > Hello,
> >
> > I'm using a synchronous consumer. If I understand correctly, the configuration
> > prefetch size = 0 makes the consumers synchronous.
> >
>
> What version of ActiveMQ-CPP are you using?
>
> > Regards,
> > Daniel.
> >
> > -----Original Message-----
> > From: Oscar Pernas [mailto:os...@pernas.es]
> > Sent: 08 November 2011 15:45
> > To: users@activemq.apache.org
> > Subject: Re: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > consumers if the MessagePull command is lost
> >
> > Hi Laugt,
> >
> > Are you using a synchronous or an asynchronous consumer? When I was using
> > synchronous consuming I used to have problems with failover reconnection.
> >
> > regards
> >
> > 2011/11/8 Daniel Laugt <daniel.la...@wallstreetsystems.com>
> >
> > > It seems that email attachments are not allowed... I have put the diff
> > > directly below...
> > >
> > > Daniel Laügt.
> > >
> > > --- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:02:31 146083
> > > +++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h 2011/10/19 10:17:24 146084
> > > @@ -54,9 +54,11 @@
> > >          // Either we need to implement something similar to LinkedHashMap or find
> > >          // some other way of tracking the eldest entry into the map and removing it
> > >          // if the cache size is exceeded.
> > > -        ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
> > > +        ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
> > >                            MessageId::COMPARATOR > messageCache;
> > > +        ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;
> > > +
> > >          bool trackTransactions;
> > >          bool restoreSessions;
> > >          bool restoreConsumers;
> > > @@ -122,6 +124,8 @@
> > >          virtual Pointer<Command> processEndTransaction( TransactionInfo* info );
> > > +        virtual Pointer<Command> processMessagePull( MessagePull* pull );
> > > +
> > >          bool isRestoreConsumers() const {
> > >              return this->restoreConsumers;
> > >          }
> > >
> > > --- soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:02:31 146083
> > > +++ soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp 2011/10/19 10:17:24 146084
> > > @@ -108,11 +108,21 @@
> > >  void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
> > >      try{
> > > -        if( trackMessages && command != NULL && command->isMessage() ) {
> > > -            Pointer<Message> message =
> > > +        if( command != NULL ) {
> > > +            if( trackMessages && command->isMessage() ) {
> > > +                Pointer<Message> message =
> > >                  command.dynamicCast<Message>();
> > > -            if( message->getTransactionId() == NULL ) {
> > > +                if( message->getTransactionId() == NULL ) {
> > >                  currentCacheSize = currentCacheSize + message->getSize();
> > > +                }
> > > +            }
> > > +            else {
> > > +                Pointer<MessagePull> messagePull =
> > > +                    command.dynamicCast<MessagePull>();
> > > +                if( messagePull != NULL ) {
> > > +                    // just needs to be a rough estimate of size, ~4 identifiers
> > > +                    currentCacheSize += 400;
> > > +                }
> > >              }
> > >          }
> > >      }
> > > @@ -148,12 +158,19 @@
> > >          }
> > >          // Now we flush messages
> > > -        std::vector< Pointer<Message> > messages = messageCache.values();
> > > -        std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin();
> > > +        std::vector< Pointer<Command> > messages = messageCache.values();
> > > +        std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin();
> > >          for( ; messageIter != messages.end(); ++messageIter ) {
> > >              transport->oneway( *messageIter );
> > >          }
> > > +
> > > +        std::vector< Pointer<Command> > messagePulls = messagePullCache.values();
> > > +        std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
> > > +
> > > +        for( ; messagePullIter != messagePulls.end(); ++messagePullIter ) {
> > > +            transport->oneway( *messagePullIter );
> > > +        }
> > >      }
> > >      AMQ_CATCH_RETHROW( IOException )
> > >      AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
> > > @@ -790,6 +807,19 @@
> > >  }
> > >
> > >  ////////////////////////////////////////////////////////////////////////////////
> > > +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull* pull ) {
> > > +    if( pull != NULL
> > > +        && pull->getDestination() != NULL
> > > +        && pull->getConsumerId() != NULL) {
> > > +        std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
> > > +        messagePullCache.put( id,
> > > +                              Pointer<Command>( pull->cloneDataStructure() ) );
> > > +    }
> > > +
> > > +    return Pointer<Command>();
> > > +}
> > > +
> > > +////////////////////////////////////////////////////////////////////////////////
> > >  void ConnectionStateTracker::connectionInterruptProcessingComplete(
> > >      transport::Transport* transport, const Pointer<ConnectionId>& connectionId ) {
> > >
> > >
> > > From: Daniel Laugt [mailto:daniel.la...@wallstreetsystems.com]
> > > Sent: 08 November 2011 13:16
> > > To: users@activemq.apache.org
> > > Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> > > consumers if the MessagePull command is lost
> > >
> > > Hello,
> > >
> > > I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as the client. With the
> > > configuration prefetch size = 0, a polling consumer fails to reconnect
> > > during failover.
> > >
> > > This issue has been fixed by the item AMQ-2877:
> > >
> > > https://issues.apache.org/jira/browse/AMQ-2877
> > >
> > > AMQ-2877 fixes the problem on the Java client side but not on the C++
> > > client side. Is it possible to merge this fix into ActiveMQ-CPP?
> > >
> > > Attached to this email is a diff of what I've merged from AMQ-2877 to
> > > resolve the problem in my copy of ActiveMQ-CPP. This diff can probably be
> > > used as a suggestion...
> > >
> > > Regards,
> > >
> > > Daniel Laügt.
> > >
> >
>

--
Tim Bish
------------
FuseSource
Email: tim.b...@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/
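
For anyone reading this thread later, the idea behind the diff (remember each outstanding MessagePull under a "destination::consumerId" key, then resend the cached pulls once the transport is restored) can be tried outside ActiveMQ-CPP. The following is only a minimal standalone sketch under assumptions: PullRequest, RecordingTransport, PullTracker and replayAfterFailover are hypothetical names made up for illustration, not ActiveMQ-CPP classes, and std::map stands in for ConcurrentStlMap without any locking.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a MessagePull command (not the ActiveMQ-CPP class).
struct PullRequest {
    std::string destination;
    std::string consumerId;
    long long timeoutMs;
};

// Hypothetical transport stub that only records what was sent.
struct RecordingTransport {
    std::vector<PullRequest> sent;
    void oneway(const PullRequest& pull) { sent.push_back(pull); }
};

// Keeps the most recent pull per destination/consumer pair and
// resends all of them on request.
class PullTracker {
public:
    // Store a copy, overwriting any earlier pull with the same key.
    void track(const PullRequest& pull) {
        cache[pull.destination + "::" + pull.consumerId] = pull;
    }

    // Resend every cached pull over the given transport.
    void restore(RecordingTransport& transport) const {
        for (const auto& entry : cache) {
            transport.oneway(entry.second);
        }
    }

    std::size_t size() const { return cache.size(); }

private:
    std::map<std::string, PullRequest> cache;
};

// Simulates a failover: pulls sent on the old connection are tracked,
// then replayed on a fresh transport. Returns the number of replayed pulls.
std::size_t replayAfterFailover() {
    PullTracker tracker;
    RecordingTransport oldTransport;

    const PullRequest pulls[] = {
        {"queue://A", "consumer-1", 1000},
        {"queue://A", "consumer-1", 2000},  // same key, replaces the first
        {"queue://B", "consumer-2", 1000},
    };
    for (const PullRequest& pull : pulls) {
        tracker.track(pull);
        oldTransport.oneway(pull);
    }

    RecordingTransport newTransport;  // connection after failover
    tracker.restore(newTransport);
    return newTransport.sent.size();
}
```

Because the key is built from the destination and the consumer id, a newer pull from the same consumer overwrites the older one, so at most one pull per consumer is replayed. The sketch does no size accounting, whereas the diff adds a rough 400 to currentCacheSize for each tracked pull.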