Thanks for reply!

issue opened:
https://issues.apache.org/jira/browse/AMQCPP-354

Regards
Henrique

2011/3/1 Timothy Bish <tabish...@gmail.com>:
> On Tue, 2011-03-01 at 20:22 -0300, Henrique Magarotto wrote:
>> Hi everybody,
>>
>> First, sorry for long mail and thanks for great job in ActiveMQ and
>> ActiveMQ-CPP.
>>
>> I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP.
>>
>> In my test, an unexpected exception is thrown if broker is stopped
>> while consumer is in transaction.
>>
>> Environment:
>> ActiveMQ 5.4.2
>> ActiveMQ-CPP 3.2.4
>> Ubuntu 10.04
>> JRE 1.6.0_20-b02
>>
>
>> I get the following message:
>>
>> terminate called after throwing an instance of 'cms::CMSException'
>>   what():  Failover timeout of 1000 ms reached.
>> Aborted (core dumped)
>>
>
> Sounds like a bug, can you open a new issue in are Jira bug tracker and
> attach your test case along with instructions for reproducing the error?
>
> https://issues.apache.org/jira/browse/AMQCPP
>
> Regards
> Tim.
>
>> core backtrace:
>> #0  0x001ef422 in __kernel_vsyscall ()
>> #1  0x00c5a651 in *__GI_raise (sig=6) at
>> ../nptl/sysdeps/unix/sysv/linux/raise.c:64
>> #2  0x00c5da82 in *__GI_abort () at abort.c:92
>> #3  0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from
>> /usr/lib/libstdc++.so.6
>> #4  0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6
>> #5  0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6
>> #6  0x00bf74c5 in ?? () from /usr/lib/libstdc++.so.6
>> #7  0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6
>> #8  0x0052f8ae in
>> activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548)
>> at activemq/core/ActiveMQConsumer.cpp:84
>> #9  0x00550588 in
>> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48)
>> at activemq/core/ActiveMQTransactionContext.cpp:192
>> #10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit
>> (this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127
>> #11 0x0053e460 in activemq::core::ActiveMQSession::commit
>> (this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189
>> #12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit
>> (this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87
>> #13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30,
>> message=0x93b49b0) at main.cpp:455
>> #14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415
>> #15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381
>> #16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback
>> (properties=0x93abff0) at decaf/lang/Thread.cpp:135
>> #17 0x00822847 in threadWorker (arg=0x93abff0) at decaf/lang/Thread.cpp:188
>> #18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300
>> #19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
>>
>>
>> Exception specification for method
>> 'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw(
>> exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge'
>> throws 'cms::CMSException', so 'std::unexpected' is called and 'abort'
>> occurs. 
>> (http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil)
>>
>>
>> When 'MessageListener' is used, the commit freezes, probability, in thread 5:
>>
>> Thread 1:
>> #0  0x00cfd422 in __kernel_vsyscall ()
>> #1  0x00ddcb5d in pthread_join (threadid=3061869424,
>> thread_return=0xbf964bdc) at pthread_join.c:89
>> #2  0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at
>> decaf/lang/Thread.cpp:421
>> #3  0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586
>> #4  0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555
>> #5  0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633
>>
>> Thread 2:
>> #0  0x00cfd422 in __kernel_vsyscall ()
>> #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
>> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
>> #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38,
>> mutex=0x8eb0e08) at forward.c:139
>> #3  0x00a606a9 in
>> decaf::internal::util::concurrent::ConditionImpl::wait
>> (condition=0x8eb0e38) at
>> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
>> #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
>> (this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95
>> #5  0x007fc9c5 in activemq::threads::CompositeTaskRunner::run
>> (this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118
>> #6  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
>> (properties=0x8eb0e90) at decaf/lang/Thread.cpp:135
>> #7  0x00a92847 in threadWorker (arg=0x8eb0e90) at decaf/lang/Thread.cpp:188
>> #8  0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300
>> #9  0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
>>
>> Thread 3:
>> #0  0x00cfd422 in __kernel_vsyscall ()
>> #1  0x00de2af9 in __lll_lock_wait () at
>> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
>> #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
>> #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at
>> pthread_mutex_lock.c:61
>> #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182
>> #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
>> (handle=0x8ebb3a8) at
>> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
>> #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
>> (this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75
>> #7  0x007c5770 in
>> decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch,
>> decaf::util::concurrent::atomic::AtomicRefCounter> >::lock
>> (this=0x8ebb2a8)
>>     at ./decaf/util/StlQueue.h:253
>> #8  activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at
>> activemq/core/MessageDispatchChannel.h:153
>> #9  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
>> (this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54
>> #10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8,
>> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
>> #11 0x0078e102 in
>> activemq::core::ActiveMQConsumer::clearMessagesInProgress
>> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112
>> #12 0x007af15c in
>> activemq::core::ActiveMQSession::clearMessagesInProgress
>> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239
>> #13 0x00778543 in
>> activemq::core::ActiveMQConnection::transportInterrupted
>> (this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704
>> #14 0x00803b34 in
>> activemq::transport::TransportFilter::transportInterrupted
>> (this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67
>> #15 0x008187ae in
>> activemq::transport::failover::FailoverTransport::handleTransportFailure
>> (this=0x8eb0a38, error=...) at
>> activemq/transport/failover/FailoverTransport.cpp:476
>> #16 0x0082406c in
>> activemq::transport::failover::FailoverTransportListener::onException
>> (this=0x8eb0878, ex=...) at
>> activemq/transport/failover/FailoverTransportListener.cpp:97
>> #17 0x00803c0b in activemq::transport::TransportFilter::fire
>> (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:49
>> #18 0x00803c64 in activemq::transport::TransportFilter::onException
>> (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41
>> #19 0x00803c0b in activemq::transport::TransportFilter::fire
>> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49
>> #20 0x00803c64 in activemq::transport::TransportFilter::onException
>> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41
>> #21 0x00801b13 in activemq::transport::IOTransport::fire
>> (this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73
>> #22 0x008023bf in activemq::transport::IOTransport::run
>> (this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246
>> #23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
>> (properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135
>> #24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at decaf/lang/Thread.cpp:188
>> #25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300
>> #26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
>>
>> Thread 4:
>> #0  0x00cfd422 in __kernel_vsyscall ()
>> #1  0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at
>> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122
>> #2  0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8,
>> mutex=0x8eba698) at forward.c:139
>> #3  0x00a606a9 in
>> decaf::internal::util::concurrent::ConditionImpl::wait
>> (condition=0x8eba6c8) at
>> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94
>> #4  0x00ad1353 in decaf::util::concurrent::Mutex::wait
>> (this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95
>> #5  0x00ad029a in decaf::util::concurrent::CountDownLatch::await
>> (this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56
>> #6  0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574,
>> timeOut=-1) at main.cpp:226
>> #7  0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379
>> #8  0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
>> (properties=0x8eba5b0) at decaf/lang/Thread.cpp:135
>> #9  0x00a92847 in threadWorker (arg=0x8eba5b0) at decaf/lang/Thread.cpp:188
>> #10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300
>> #11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
>>
>> Thread 5:
>> #0  0x00cfd422 in __kernel_vsyscall ()
>> #1  0x00de2af9 in __lll_lock_wait () at
>> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142
>> #2  0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0
>> #3  0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at
>> pthread_mutex_lock.c:61
>> #4  0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182
>> #5  0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock
>> (handle=0x8eb05e0) at
>> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71
>> #6  0x00ad1412 in decaf::util::concurrent::Mutex::lock
>> (this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75
>> #7  0x00ad0ab5 in decaf::util::concurrent::Lock::lock
>> (this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54
>> #8  0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0,
>> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32
>> #9  0x0081b8fd in
>> activemq::transport::failover::FailoverTransport::oneway
>> (this=0x8eb0a38, command=...) at
>> activemq/transport/failover/FailoverTransport.cpp:186
>> #10 0x00807f9f in
>> activemq::transport::correlator::ResponseCorrelator::oneway
>> (this=0x8eb11c0, command=...) at
>> activemq/transport/correlator/ResponseCorrelator.cpp:82
>> #11 0x007713be in activemq::core::ActiveMQConnection::oneway
>> (this=0x8eb1210, command=...) at
>> activemq/core/ActiveMQConnection.cpp:741
>> #12 0x007b0a4f in activemq::core::ActiveMQSession::oneway
>> (this=0x8ebab90, command=...) at activemq/core/ActiveMQSession.cpp:903
>> #13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge
>> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860
>> #14 0x0079f885 in
>> activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8)
>> at activemq/core/ActiveMQConsumer.cpp:85
>> #15 0x007c0588 in
>> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8)
>> at activemq/core/ActiveMQTransactionContext.cpp:192
>> #16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit
>> (this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127
>> #17 0x007ae460 in activemq::core::ActiveMQSession::commit
>> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189
>> #18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit
>> (this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87
>> #19 0x0804c0fb in Consumer::onMessage (this=0x8eba570,
>> message=0x8ebbec0) at main.cpp:455
>> #20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch
>> (this=0x8ebb270, dispatch=...) at
>> activemq/core/ActiveMQConsumer.cpp:1018
>> #21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch
>> (this=0x8ebae78, dispatch=...) at
>> activemq/core/ActiveMQSessionExecutor.cpp:129
>> #22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate
>> (this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166
>> #23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run
>> (this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111
>> #24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback
>> (properties=0x8ebaad8) at decaf/lang/Thread.cpp:135
>> #25 0x00a92847 in threadWorker (arg=0x8ebaad8) at decaf/lang/Thread.cpp:188
>> #26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300
>> #27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130
>>
>>
>>
>> I tried to resolve this issue changing configurations and code, but i
>> always get an error or client freezes.
>> Is it a bug? Any suggestions?
>>
>> Thanks.
>> Henrique
>>
>> How to reproduce:
>> 1. Start ActiveMQ
>> 2. Send a massage './activemqTest -p1 -n1'
>> 3. Run consumer './activemqTest -c1 -d10000'
>> 4. Stop activemq when 'Starting delay...' message appears.
>> 5. After activemq is down and delay is finish, consumer try commit
>> (message 'Try commit') and unexpected exception occurs.
>>
>> Following test code, based on ActiveMQ-CPP sample
>> 'activemq-cpp-library-3.2.4/src/examples/main.cpp':
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one or more
>>  * contributor license agreements.  See the NOTICE file distributed with
>>  * this work for additional information regarding copyright ownership.
>>  * The ASF licenses this file to You under the Apache License, Version 2.0
>>  * (the "License"); you may not use this file except in compliance with
>>  * the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> // START SNIPPET: demo
>>
>> #include <activemq/library/ActiveMQCPP.h>
>> #include <activemq/cmsutil/SessionPool.h>
>> #include <activemq/cmsutil/PooledSession.h>
>> #include <activemq/cmsutil/ResourceLifecycleManager.h>
>> #include <activemq/exceptions/ActiveMQException.h>
>> #include <decaf/lang/Thread.h>
>> #include <decaf/lang/Runnable.h>
>> #include <decaf/util/concurrent/CountDownLatch.h>
>> #include <decaf/util/concurrent/Mutex.h>
>> #include <decaf/lang/Integer.h>
>> #include <decaf/lang/Long.h>
>> #include <decaf/lang/System.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Config.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/BytesMessage.h>
>> #include <cms/MapMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <iostream>
>> #include <memory>
>> #include <map>
>> #include <vector>
>> #include <unistd.h>
>>
>> using namespace activemq::core;
>> using namespace activemq::cmsutil;
>> using namespace decaf::util::concurrent;
>> using namespace decaf::util;
>> using namespace decaf::lang;
>> using namespace cms;
>>
>>
>> class ConnPoolManager : protected decaf::util::concurrent::Mutex,
>>                         public ExceptionListener {
>> private:
>>     typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap;
>>
>>     std::string _brokerURI;
>>     ConnectionFactory* _connectionFactory;
>>     Connection* _connection;
>>     ResourceLifecycleManager* _lifecycleManager;
>>     tSessionPoolMap _pools;
>>     bool _started;
>>
>> public:
>>     ConnPoolManager() :
>>             _brokerURI("tcp://localhost:61616"),
>>             _connectionFactory(NULL),
>>             _connection(NULL),
>>             _lifecycleManager(NULL),
>>             _started(false)
>>     {
>>     }
>>
>>     virtual ~ConnPoolManager() {
>>         cleanup();
>>     }
>>
>>     void setBrokerURI(const std::string& uri) {
>>         _brokerURI = uri;
>>     }
>>
>>     bool startPools() {
>>         synchronized(this) {
>>             if(_started) return true;
>>             try {
>>                 createConnection();
>>                 createPools();
>>                 _started=true;
>>                 return true;
>>             } catch(CMSException& e) {
>>                 std::cout<<"(ConnPoolManager::startPools)
>> CMSException: " << e.getMessage() << std::endl;
>>             } catch(...) {
>>                 std::cout<<"(ConnPoolManager::startPools) Unknown
>> exception!!!" << std::endl;
>>             }
>>         }
>>         cleanup();
>>         return false;
>>     }
>>
>>     PooledSession* getSession(cms::Session::AcknowledgeMode ack) {
>>         if(_started) {
>>             return _pools[ack]->takeSession();
>>         } else if(startPools()) {
>>             return _pools[ack]->takeSession();
>>         } else {
>>             return NULL;
>>         }
>>     }
>>
>> private:
>>
>>     void createConnection(){
>>         // Create a ConnectionFactory
>>         _connectionFactory =
>> ConnectionFactory::createCMSConnectionFactory( _brokerURI );
>>
>>         // Create a Connection
>>         _connection = _connectionFactory->createConnection();
>>         _connection->start();
>>         _connection->setExceptionListener(this);
>>     }
>>
>>     void createPools(){
>>         // Create lifecycle manager
>>         _lifecycleManager = new 
>> activemq::cmsutil::ResourceLifecycleManager();
>>
>>         // Create pools, one for each acknowledge type
>>         for(    int ack = cms::Session::AUTO_ACKNOWLEDGE;
>>                 ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE;
>>                 ack++)
>>         {
>>             _pools[ack] =  new activemq::cmsutil::SessionPool(
>>                     _connection,
>>                     static_cast<cms::Session::AcknowledgeMode>(ack),
>>                     _lifecycleManager);
>>         }
>>     }
>>
>>     void cleanup() {
>>         synchronized(this) {
>>             releasePools();
>>             if(_lifecycleManager!=NULL) {
>>                 try { delete _lifecycleManager;
>>                 } catch(...) {}
>>             } _lifecycleManager = NULL;
>>
>>             if(_connection!=NULL) {
>>                 try { delete _connection;
>>                 } catch(...) {}
>>             } _connection = NULL;
>>             if(_connectionFactory!=NULL) {
>>                 try { delete _connectionFactory;
>>                 } catch(...) {}
>>             } _connectionFactory = NULL;
>>             _started=false;
>>         }
>>     }
>>
>>     void releasePools() {
>>         for(tSessionPoolMap::iterator it=_pools.begin();
>> it!=_pools.end(); it++) {
>>             try {
>>                 delete it->second;
>>             } catch(...) { }
>>         }
>>         _pools.clear();
>>     }
>>
>>     // If something bad happens you see it here as this class is also been
>>     // registered as an ExceptionListener with the connection.
>>     virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>>         std::cout<<"(ConnPoolManager::onException) CMS Exception
>> occurred.  Shutting down client."<<std::endl;
>>         ex.printStackTrace();
>>         exit(1);
>>     }
>>
>>
>>
>> };
>>
>> class JMSEndPointThread  : public Thread {
>> private:
>>     CountDownLatch _stopNow;
>>     ConnPoolManager* _pools;
>>     std::string _destinationString;
>>     PooledSession* _session;
>>     Destination* _destination;
>>     bool _sessionTransacted;
>>
>> protected:
>>
>>     JMSEndPointThread(  ConnPoolManager* pools,
>>                         const std::string& dest,
>>                         bool sessionTransacted) :
>>             _stopNow(1),
>>             _pools(pools),
>>             _destinationString(dest),
>>             _session(NULL),
>>             _destination(NULL),
>>             _sessionTransacted(sessionTransacted)
>>     {
>>     }
>>
>>     virtual bool prepareSession() {
>>         cleanup();
>>         // Create a Session
>>         if( _sessionTransacted ) {
>>             _session = _pools->getSession(Session::SESSION_TRANSACTED);
>>         } else {
>>             _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE);
>>         }
>>         if(_session==NULL) return false;
>>
>>         // Create the Queue destination
>>         _destination = _session->createQueue( _destinationString );
>>         return true;
>>     }
>>
>>     PooledSession* getSession() { return _session; }
>>     Destination* getDestination() { return _destination; }
>>     bool getTransacted() { return _sessionTransacted; }
>>
>>     bool stopCheck(int timeOut=0) {
>>         if(timeOut>=0) return _stopNow.await(timeOut);
>>         else _stopNow.await();
>>         return true;
>>     }
>>
>>     virtual void cleanup() {
>>         // Destroy resources.
>>         try{
>>             if( _destination != NULL ) delete _destination;
>>         } catch(...) { }
>>         _destination = NULL;
>>
>>         // Back session to pool
>>         try{
>>             if( _session != NULL ) _session->close();
>>         } catch(...) { }
>>         _session = NULL;
>>     }
>>
>> public:
>>
>>     typedef std::vector<JMSEndPointThread*> List;
>>
>>     virtual ~JMSEndPointThread() {
>>         cleanup();
>>     }
>>
>>     void stopNow() {
>>         _stopNow.countDown();
>>     }
>>
>> };
>>
>> class Producer : public JMSEndPointThread {
>> private:
>>
>>     MessageProducer* _producer;
>>     int _delay;
>>     int _numMessages;
>>
>> public:
>>
>>     Producer(   ConnPoolManager* pools,
>>                 const std::string& dest,
>>                 int delay,
>>                 int numMessages,
>>                 bool sessionTransacted = false ) :
>>         JMSEndPointThread(pools,dest,sessionTransacted)
>>     {
>>         _producer = NULL;
>>         _delay = delay;
>>         _numMessages = numMessages;
>>     }
>>
>>     virtual ~Producer() {
>>         cleanup();
>>     }
>>
>>     virtual void run() {
>>         std::cout<<"Producer started!!!! Thread: 
>> "<<Thread::getId()<<std::endl;
>>         while(!stopCheck() && _numMessages) {
>>             try {
>>                 if(prepareProducer()) {
>>                     sendMessages();
>>                     continue;
>>                 }
>>             } catch(CMSException& e) {
>>                 std::cout<<"(Producer::run) CMSException: " <<
>> e.getMessage() << std::endl;
>>             } catch(...) {
>>                 std::cout<<"(Producer::run) Unknown exception!!!" << 
>> std::endl;
>>             }
>>             cleanup();
>>             stopCheck(5000); // reconnect delay
>>         }
>>         std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl;
>>         cleanup();
>>     }
>>
>> private:
>>
>>     bool prepareProducer() {
>>         // Get Session
>>         if(!prepareSession()) return false;
>>         // Create a MessageProducer from the Session to the Queue
>>         _producer = getSession()->createProducer( getDestination() );
>>         _producer->setDeliveryMode( DeliveryMode::PERSISTENT );
>>         return true;
>>     }
>>     void sendMessages() {
>>         // Create a messages
>>         std::stringstream text;
>>         text<<"Hello world! from thread " << Thread::getId();
>>
>>         while(!stopCheck() && _numMessages) {
>>
>>             TextMessage* message = getSession()->createTextMessage(
>> text.str() );
>>             message->setIntProperty( "Integer", _numMessages );
>>
>>             // Tell the producer to send the message
>>             std::cout<<"Sent message #"<<_numMessages<<" from thread
>> "<<Thread::getId()<<std::endl;
>>             _producer->send( message );
>>             delete message;
>>             --_numMessages;
>>
>>             stopCheck(_delay);
>>             if(getTransacted()) getSession()->commit();
>>         }
>>     }
>>
>>     virtual void cleanup() {
>>         try{
>>             if( _producer != NULL ) delete _producer;
>>         } catch(...) { }
>>         _producer = NULL;
>>
>>         JMSEndPointThread::cleanup();
>>     }
>>
>> };
>>
>>
>> class Consumer :    public MessageListener,
>>                     public JMSEndPointThread {
>>
>> private:
>>
>>     MessageConsumer* _consumer;
>>     int _delay;
>>     bool _useListener;
>>
>> public:
>>
>>     Consumer(   ConnPoolManager* pools,
>>                 const std::string& dest,
>>                 int delay,
>>                 bool useListener = true,
>>                 bool sessionTransacted = true ) :
>>         JMSEndPointThread(pools,dest,sessionTransacted)
>>     {
>>         _consumer = NULL;
>>         _delay = delay;
>>         _useListener = useListener;
>>     }
>>     virtual ~Consumer(){
>>         cleanup();
>>     }
>>
>>     virtual void run() {
>>         std::cout<<"Consumer started!!!! Thread: 
>> "<<Thread::getId()<<std::endl;
>>         while(!stopCheck()) {
>>             try {
>>                 if(prepareConsumer()) {
>>                     // Wait while asynchronous messages come in.
>>                     if(_useListener) {
>>                         stopCheck(-1);
>>                     } else {
>>                         consumeLoop();
>>                     }
>>                     continue;
>>                 }
>>             } catch (CMSException& e) {
>>                 std::cout<<"(Consumer::run) CMSException: " <<
>> e.getMessage() << std::endl;
>>             } catch(...) {
>>                 std::cout<<"(Consumer::run) Unknown exception!!!" << 
>> std::endl;
>>             }
>>             cleanup();
>>             stopCheck(5000); // reconnect delay
>>         }
>>         std::cout<<"Consumer end!!!! Thread: "<<Thread::getId()<<std::endl;
>>         cleanup();
>>     }
>>
>>
>> private:
>>
>>     bool prepareConsumer() {
>>         // Get Session
>>         if(!prepareSession()) return false;
>>         // Create a MessageConsumer from the Session to the Queue
>>         _consumer = getSession()->createConsumer( getDestination() );
>>         if(_useListener)
>>             _consumer->setMessageListener( this );
>>
>>         return true;
>>     }
>>
>>     void consumeLoop() {
>>         while(!stopCheck()) {
>>             Message *msg = _consumer->receive(500);
>>             if(msg!=NULL) {
>>                 onMessage(msg);
>>                 delete msg;
>>             }
>>         }
>>     }
>>
>>     // Called from the consumer since this class is a registered
>> MessageListener.
>>     virtual void onMessage( const Message* message ) {
>>
>>         static int count = 0;
>>
>>         try
>>         {
>>             count++;
>>             const TextMessage* textMessage =
>>                 dynamic_cast< const TextMessage* >( message );
>>             std::string text = "";
>>
>>             if( textMessage != NULL ) {
>>                 text = textMessage->getText();
>>             } else {
>>                 text = "NOT A TEXTMESSAGE!";
>>             }
>>
>>             std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl;
>>             if(_delay) {
>>                 std::cout<<"Starting delay..."<<std::endl;
>>                 stopCheck(_delay);
>>             }
>>
>>         } catch (CMSException& e) {
>>             std::cout<<"(Consumer::onMessage) CMSException: " <<
>> e.getMessage() << std::endl;
>>         } catch(...) {
>>             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
>> std::endl;
>>         }
>>
>>         try
>>         {   // Commit all messages.
>>             if( getTransacted() ) {
>>                 std::cout<<"Try commit....."<<std::endl;
>>                 getSession()->commit();
>>                 std::cout<<"Commit OK!!!"<<std::endl;
>>             }
>>         } catch (CMSException& e) {
>>             std::cout<<"(Consumer::onMessage) CMSException: " <<
>> e.getMessage() << std::endl;
>>         } catch(...) {
>>             std::cout<<"(Consumer::onMessage) Unknown exception!!!" <<
>> std::endl;
>>         }
>>
>>     }
>>
>>     virtual void cleanup() {
>>         try{
>>             if( _consumer != NULL ) delete _consumer;
>>         } catch (...) { }
>>         _consumer = NULL;
>>         JMSEndPointThread::cleanup();
>>     }
>> };
>>
>>
>> class ScopedActiveMQLibrary {
>> public:
>>     ScopedActiveMQLibrary() {
>>         activemq::library::ActiveMQCPP::initializeLibrary();
>>     }
>>     virtual ~ScopedActiveMQLibrary() {
>>         activemq::library::ActiveMQCPP::shutdownLibrary();
>>     }
>> };
>>
>> class AppTest : private ScopedActiveMQLibrary {
>> private:
>>     int _consumer;
>>     int _producer;
>>     int _numMessages;
>>     bool _useListener;
>>     int _delay;
>>     ConnPoolManager _poolManager;
>>     JMSEndPointThread::List _endPointList;
>>
>>     static AppTest* _app;
>>     static CountDownLatch _terminationRequest;
>>
>> public:
>>     AppTest(int argc, char** argv) :
>>             _consumer(0),
>>             _producer(0),
>>             _numMessages(1),
>>             _useListener(false),
>>             _delay(0)
>>     {
>>         _app=this;
>>         int c;
>>         while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) {
>>             switch(c) {
>>             case 'l': _useListener=true; break;
>>             case 'c': _consumer=atoi(optarg); break;
>>             case 'p': _producer=atoi(optarg); break;
>>             case 'n': _numMessages=atoi(optarg); break;
>>             case 'd': _delay=atoi(optarg); break;
>>             }
>>         }
>>         if(_consumer<0) _consumer=0;
>>         if(_producer<0) _producer=0;
>>         _poolManager.setBrokerURI(
>>                 "failover://("
>>                 "tcp://localhost:61616"
>>                 "?transport.useInactivityMonitor=false"
>>                 ")?timeout=1000"
>>                 "&cms.RedeliveryPolicy.maximumRedeliveries=-1"
>>                 );
>>         _poolManager.startPools();
>>     }
>>
>>     virtual ~AppTest() {
>>     }
>>
>>     int run() {
>>
>>         // Install signal handler
>>         if(!installSigAction())
>>             return 1;
>>
>>         // Create consumer/producer objects
>>         loadEndPoins();
>>
>>         // Start the producer/consumer thread.
>>         startEndPoins();
>>
>>         // start another tasks
>>         // ...
>>
>>         // Wait for termination request
>>         /*if(_consumer) {
>>             _terminationRequest.await();
>>             stopEndPoins();
>>         }*/
>>
>>         // join threads
>>         joinEndPoins();
>>
>>         return 0;
>>     }
>>
>> private:
>>
>>     void loadEndPoins() {
>>         for(int i=0; i<_consumer;i++) {
>>             _endPointList.push_back( new
>> Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) );
>>         }
>>         for(int i=0; i<_producer;i++) {
>>             _endPointList.push_back( new
>> Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) );
>>         }
>>     }
>>
>>     void startEndPoins() {
>>         for(JMSEndPointThread::List::iterator it=_endPointList.begin();
>>             it!=_endPointList.end(); it++ )
>>             (**it).start();
>>     }
>>
>>     void stopEndPoins() {
>>         for(JMSEndPointThread::List::reverse_iterator 
>> it=_endPointList.rbegin();
>>             it!=_endPointList.rend(); it++ )
>>             (**it).stopNow();
>>     }
>>
>>     void joinEndPoins() {
>>         for(JMSEndPointThread::List::reverse_iterator 
>> it=_endPointList.rbegin();
>>             it!=_endPointList.rend(); it++ ) {
>>             (**it).join();
>>             delete *it;
>>             *it = NULL;
>>         }
>>         _endPointList.clear();
>>     }
>>
>>     static void signalHandler(int sig) {
>>         _app->_terminationRequest.countDown();
>>         _app->stopEndPoins();
>>     }
>>
>>     bool installSigAction() {
>>         struct sigaction action;
>>         memset(&action, 0, sizeof(action));
>>         action.sa_handler = signalHandler;
>>         if( sigaction(SIGTERM, &action, NULL)<0 ||
>>             sigaction(SIGQUIT, &action, NULL)<0 ||
>>             sigaction(SIGINT,  &action, NULL)<0 )
>>         { return false; }
>>         return true;
>>     }
>>
>>     int termWait() {
>>         sigset_t sset;
>>         sigemptyset(&sset);
>>         sigaddset(&sset, SIGINT);
>>         sigaddset(&sset, SIGQUIT);
>>         sigaddset(&sset, SIGTERM);
>>         sigprocmask(SIG_BLOCK, &sset, NULL);
>>         int sig;
>>         sigwait(&sset, &sig);
>>         return sig;
>>     }
>>
>> };
>> AppTest* AppTest::_app = NULL;
>> CountDownLatch AppTest::_terminationRequest(1);
>>
>>
>> void my_unexpected() {
>>     throw activemq::exceptions::ActiveMQException();
>> }
>>
>> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
>>     //std::set_unexpected(my_unexpected);
>>     AppTest test(argc, argv);
>>     return test.run();
>> }
>>
>> // END SNIPPET: demo
>
> --
> Tim Bish
> ------------
> FuseSource
> Email: tim.b...@fusesource.com
> Web: http://fusesource.com
> Twitter: tabish121
> Blog: http://timbish.blogspot.com/
>
>
>

Reply via email to