Hi Laugt,

Are you using synchronous or asynchronous consumer? When I was using
synchronous consuming I used to have problems with failover reconnection.


regards

2011/11/8 Daniel Laugt <daniel.la...@wallstreetsystems.com>

> It seems that email attachment is not allowed... I put the diff directly
> below...
>
>
>
> Daniel Laügt.
>
>
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
>           2011/10/19 10:02:31                146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.h
>        2011/10/19 10:17:24                146084
>
> @@ -54,9 +54,11 @@
>
>         //        Either we need to implement something similar to
> LinkedHashMap or find
>
>         //        some other way of tracking the eldest entry into the map
> and removing it
>
>         //        if the cache size is exceeded.
>
> -        ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
>
> +        ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
>
>                           MessageId::COMPARATOR > messageCache;
>
> +        ConcurrentStlMap< std::string, Pointer<Command> >
> messagePullCache;
>
> +
>
>         bool trackTransactions;
>
>         bool restoreSessions;
>
>         bool restoreConsumers;
>
> @@ -122,6 +124,8 @@
>
>         virtual Pointer<Command> processEndTransaction( TransactionInfo*
> info );
>
> +        virtual Pointer<Command> processMessagePull( MessagePull* pull );
>
> +
>
>         bool isRestoreConsumers() const {
>
>             return this->restoreConsumers;
>
>         }
>
>
>
> ---
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
>      2011/10/19 10:02:31                146083
>
> +++
> soseco/7.4/7.4.stable/nirvana/tools/amq/src/main/activemq/state/ConnectionStateTracker.cpp
>   2011/10/19 10:17:24                146084
>
> @@ -108,11 +108,21 @@
>
> void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
>
>     try{
>
> -        if( trackMessages && command != NULL && command->isMessage() ) {
>
> -            Pointer<Message> message =
>
> +        if( command != NULL ) {
>
> +            if( trackMessages && command->isMessage() ) {
>
> +              Pointer<Message> message =
>
>                 command.dynamicCast<Message>();
>
> -            if( message->getTransactionId() == NULL ) {
>
> +              if( message->getTransactionId() == NULL ) {
>
>                 currentCacheSize = currentCacheSize + message->getSize();
>
> +              }
>
> +            }
>
> +            else {
>
> +              Pointer<MessagePull> messagePull =
>
> +                command.dynamicCast<MessagePull>();
>
> +              if( messagePull != NULL ) {
>
> +                // just needs to be a rough estimate of size, ~4
> identifiers
>
> +                currentCacheSize += 400;
>
> +              }
>
>             }
>
>         }
>
>     }
>
> @@ -148,12 +158,19 @@
>
>         }
>
>         // Now we flush messages
>
> -        std::vector< Pointer<Message> > messages = messageCache.values();
>
> -        std::vector< Pointer<Message> >::const_iterator messageIter =
> messages.begin();
>
> +        std::vector< Pointer<Command> > messages = messageCache.values();
>
> +        std::vector< Pointer<Command> >::const_iterator messageIter =
> messages.begin();
>
>         for( ; messageIter != messages.end(); ++messageIter ) {
>
>             transport->oneway( *messageIter );
>
>         }
>
> +
>
> +        std::vector< Pointer<Command> > messagePulls =
> messagePullCache.values();
>
> +        std::vector< Pointer<Command> >::const_iterator messagePullIter =
> messagePulls.begin();
>
> +
>
> +        for( ; messagePullIter != messagePulls.end(); ++messagePullIter )
> {
>
> +          transport->oneway( *messagePullIter );
>
> +        }
>
>     }
>
>     AMQ_CATCH_RETHROW( IOException )
>
>     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
>
> @@ -790,6 +807,19 @@
>
> }
>
>
>  
> ////////////////////////////////////////////////////////////////////////////////
>
> +Pointer<Command> ConnectionStateTracker::processMessagePull( MessagePull*
> pull ) {
>
> +  if( pull != NULL
>
> +    && pull->getDestination() != NULL
>
> +    && pull->getConsumerId() != NULL) {
>
> +      std::string id = pull->getDestination()->toString() + "::" +
> pull->getConsumerId()->toString();
>
> +      messagePullCache.put( id,
>
> +        Pointer<Command>( pull->cloneDataStructure() ) );
>
> +  }
>
> +
>
> +  return Pointer<Command>();
>
> +}
>
> +
>
>
> +////////////////////////////////////////////////////////////////////////////////
>
> void ConnectionStateTracker::connectionInterruptProcessingComplete(
>
>     transport::Transport* transport, const Pointer<ConnectionId>&
> connectionId ) {
>
>
>
>
>
> From: Daniel Laugt [mailto:daniel.la...@wallstreetsystems.com]
> Sent: 08 November 2011 13:16
> To: users@activemq.apache.org
> Subject: ActiveMQCPP - Failover and prefetch=0 can result in hung
> consumers if the MessagePull command is lost
>
>
>
> Hello,
>
>
>
> I'm using ActiveMQ 5.5.0 with ActiveMQ-CPP as client. With configuration
> prefetch size = 0, polling consumer fails to reconnect during the failover.
>
>
>
> This issue has been fixed by the item AMQ-2877:
>
> https://issues.apache.org/jira/browse/AMQ-2877
>
>
>
> AMQ-2877 fixes the problem in the java client side but not in the c++
> client side. Is it possible to merge this fix to ActiveMQ-CPP?
>
>
>
> Attached on this email a diff of what I've merged from AMQ-2877 to resolve
> the problem on my ActiveMQ-CPP. This diff can be used probably as a
> suggestion...
>
>
>
> Regards,
>
> Daniel Laügt.
>
>
>
>


-- 
Óscar Pernas Plaza.

Reply via email to