Thanks for the reply! Issue opened: https://issues.apache.org/jira/browse/AMQCPP-354
Regards Henrique 2011/3/1 Timothy Bish <tabish...@gmail.com>: > On Tue, 2011-03-01 at 20:22 -0300, Henrique Magarotto wrote: >> Hi everybody, >> >> First, sorry for long mail and thanks for great job in ActiveMQ and >> ActiveMQ-CPP. >> >> I'm testing some failure cases with ActiveMQ and ActiveMQ-CPP. >> >> In my test, an unexpected exception is thrown if broker is stopped >> while consumer is in transaction. >> >> Environment: >> ActiveMQ 5.4.2 >> ActiveMQ-CPP 3.2.4 >> Ubuntu 10.04 >> JRE 1.6.0_20-b02 >> > >> I get the following message: >> >> terminate called after throwing an instance of 'cms::CMSException' >> what(): Failover timeout of 1000 ms reached. >> Aborted (core dumped) >> > > Sounds like a bug, can you open a new issue in are Jira bug tracker and > attach your test case along with instructions for reproducing the error? > > https://issues.apache.org/jira/browse/AMQCPP > > Regards > Tim. > >> core backtrace: >> #0 0x001ef422 in __kernel_vsyscall () >> #1 0x00c5a651 in *__GI_raise (sig=6) at >> ../nptl/sysdeps/unix/sysv/linux/raise.c:64 >> #2 0x00c5da82 in *__GI_abort () at abort.c:92 >> #3 0x00bf952f in __gnu_cxx::__verbose_terminate_handler() () from >> /usr/lib/libstdc++.so.6 >> #4 0x00bf7465 in ?? () from /usr/lib/libstdc++.so.6 >> #5 0x00bf74a2 in std::terminate() () from /usr/lib/libstdc++.so.6 >> #6 0x00bf74c5 in ?? 
() from /usr/lib/libstdc++.so.6 >> #7 0x00bf6915 in __cxa_call_unexpected () from /usr/lib/libstdc++.so.6 >> #8 0x0052f8ae in >> activemq::core::TransactionSynhcronization::beforeEnd (this=0x93ac548) >> at activemq/core/ActiveMQConsumer.cpp:84 >> #9 0x00550588 in >> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x93b3e48) >> at activemq/core/ActiveMQTransactionContext.cpp:192 >> #10 0x00550d63 in activemq::core::ActiveMQTransactionContext::commit >> (this=0x93b3e48) at activemq/core/ActiveMQTransactionContext.cpp:127 >> #11 0x0053e460 in activemq::core::ActiveMQSession::commit >> (this=0x93b3c10) at activemq/core/ActiveMQSession.cpp:189 >> #12 0x0042c1a0 in activemq::cmsutil::PooledSession::commit >> (this=0x93b3fd8) at activemq/cmsutil/PooledSession.h:87 >> #13 0x0804c0fb in Consumer::onMessage (this=0x93a8c30, >> message=0x93b49b0) at main.cpp:455 >> #14 0x0804bdc8 in Consumer::consumeLoop (this=0x93a8c30) at main.cpp:415 >> #15 0x0804bb56 in Consumer::run (this=0x93a8c30) at main.cpp:381 >> #16 0x00823cf1 in decaf::lang::ThreadProperties::runCallback >> (properties=0x93abff0) at decaf/lang/Thread.cpp:135 >> #17 0x00822847 in threadWorker (arg=0x93abff0) at decaf/lang/Thread.cpp:188 >> #18 0x0015a96e in start_thread (arg=0xb6f8bb70) at pthread_create.c:300 >> #19 0x00cfda4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 >> >> >> Exception specification for method >> 'activemq::core::TransactionSynhcronization::beforeEnd' says: 'throw( >> exceptions::ActiveMQException )' but 'ActiveMQConsumer::acknowledge' >> throws 'cms::CMSException', so 'std::unexpected' is called and 'abort' >> occurs. 
>> (http://www.linuxprogrammingblog.com/cpp-exception-specifications-are-evil) >> >> >> When 'MessageListener' is used, the commit freezes, probability, in thread 5: >> >> Thread 1: >> #0 0x00cfd422 in __kernel_vsyscall () >> #1 0x00ddcb5d in pthread_join (threadid=3061869424, >> thread_return=0xbf964bdc) at pthread_join.c:89 >> #2 0x00a923c0 in decaf::lang::Thread::join (this=0x8eba574) at >> decaf/lang/Thread.cpp:421 >> #3 0x0804c9f1 in AppTest::joinEndPoins (this=0xbf964c64) at main.cpp:586 >> #4 0x0804c678 in AppTest::run (this=0xbf964c64) at main.cpp:555 >> #5 0x0804a33c in main (argc=4, argv=0xbf964d84) at main.cpp:633 >> >> Thread 2: >> #0 0x00cfd422 in __kernel_vsyscall () >> #1 0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at >> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122 >> #2 0x0034f9dd in __pthread_cond_wait (cond=0x8eb0e38, >> mutex=0x8eb0e08) at forward.c:139 >> #3 0x00a606a9 in >> decaf::internal::util::concurrent::ConditionImpl::wait >> (condition=0x8eb0e38) at >> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94 >> #4 0x00ad1353 in decaf::util::concurrent::Mutex::wait >> (this=0x8eb0d6c) at decaf/util/concurrent/Mutex.cpp:95 >> #5 0x007fc9c5 in activemq::threads::CompositeTaskRunner::run >> (this=0x8eb0d48) at activemq/threads/CompositeTaskRunner.cpp:118 >> #6 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback >> (properties=0x8eb0e90) at decaf/lang/Thread.cpp:135 >> #7 0x00a92847 in threadWorker (arg=0x8eb0e90) at decaf/lang/Thread.cpp:188 >> #8 0x00ddb96e in start_thread (arg=0xb7808b70) at pthread_create.c:300 >> #9 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 >> >> Thread 3: >> #0 0x00cfd422 in __kernel_vsyscall () >> #1 0x00de2af9 in __lll_lock_wait () at >> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142 >> #2 0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0 >> #3 0x00dddf61 in __pthread_mutex_lock (mutex=0x8ebb3a8) at >> 
pthread_mutex_lock.c:61 >> #4 0x0034fba6 in pthread_mutex_lock (mutex=0x8ebb3a8) at forward.c:182 >> #5 0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock >> (handle=0x8ebb3a8) at >> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71 >> #6 0x00ad1412 in decaf::util::concurrent::Mutex::lock >> (this=0x8ebb2bc) at decaf/util/concurrent/Mutex.cpp:75 >> #7 0x007c5770 in >> decaf::util::StlQueue<decaf::lang::Pointer<activemq::commands::MessageDispatch, >> decaf::util::concurrent::atomic::AtomicRefCounter> >::lock >> (this=0x8ebb2a8) >> at ./decaf/util/StlQueue.h:253 >> #8 activemq::core::MessageDispatchChannel::lock (this=0x8ebb2a8) at >> activemq/core/MessageDispatchChannel.h:153 >> #9 0x00ad0ab5 in decaf::util::concurrent::Lock::lock >> (this=0xb7006fe4) at decaf/util/concurrent/Lock.cpp:54 >> #10 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8ebb3a8, >> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32 >> #11 0x0078e102 in >> activemq::core::ActiveMQConsumer::clearMessagesInProgress >> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:1112 >> #12 0x007af15c in >> activemq::core::ActiveMQSession::clearMessagesInProgress >> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:239 >> #13 0x00778543 in >> activemq::core::ActiveMQConnection::transportInterrupted >> (this=0x8eb1210) at activemq/core/ActiveMQConnection.cpp:704 >> #14 0x00803b34 in >> activemq::transport::TransportFilter::transportInterrupted >> (this=0x8eb11c0) at activemq/transport/TransportFilter.cpp:67 >> #15 0x008187ae in >> activemq::transport::failover::FailoverTransport::handleTransportFailure >> (this=0x8eb0a38, error=...) at >> activemq/transport/failover/FailoverTransport.cpp:476 >> #16 0x0082406c in >> activemq::transport::failover::FailoverTransportListener::onException >> (this=0x8eb0878, ex=...) at >> activemq/transport/failover/FailoverTransportListener.cpp:97 >> #17 0x00803c0b in activemq::transport::TransportFilter::fire >> (this=0x8eb2d80, ex=...) 
at activemq/transport/TransportFilter.cpp:49 >> #18 0x00803c64 in activemq::transport::TransportFilter::onException >> (this=0x8eb2d80, ex=...) at activemq/transport/TransportFilter.cpp:41 >> #19 0x00803c0b in activemq::transport::TransportFilter::fire >> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:49 >> #20 0x00803c64 in activemq::transport::TransportFilter::onException >> (this=0x8eb27f0, ex=...) at activemq/transport/TransportFilter.cpp:41 >> #21 0x00801b13 in activemq::transport::IOTransport::fire >> (this=0x8eb27a0, ex=...) at activemq/transport/IOTransport.cpp:73 >> #22 0x008023bf in activemq::transport::IOTransport::run >> (this=0x8eb27a0) at activemq/transport/IOTransport.cpp:246 >> #23 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback >> (properties=0x8eb2ea0) at decaf/lang/Thread.cpp:135 >> #24 0x00a92847 in threadWorker (arg=0x8eb2ea0) at decaf/lang/Thread.cpp:188 >> #25 0x00ddb96e in start_thread (arg=0xb7007b70) at pthread_create.c:300 >> #26 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 >> >> Thread 4: >> #0 0x00cfd422 in __kernel_vsyscall () >> #1 0x00de0015 in pthread_cond_wait@@GLIBC_2.3.2 () at >> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/pthread_cond_wait.S:122 >> #2 0x0034f9dd in __pthread_cond_wait (cond=0x8eba6c8, >> mutex=0x8eba698) at forward.c:139 >> #3 0x00a606a9 in >> decaf::internal::util::concurrent::ConditionImpl::wait >> (condition=0x8eba6c8) at >> decaf/internal/util/concurrent/unix/ConditionImpl.cpp:94 >> #4 0x00ad1353 in decaf::util::concurrent::Mutex::wait >> (this=0x8eba584) at decaf/util/concurrent/Mutex.cpp:95 >> #5 0x00ad029a in decaf::util::concurrent::CountDownLatch::await >> (this=0x8eba57c) at decaf/util/concurrent/CountDownLatch.cpp:56 >> #6 0x0804af84 in JMSEndPointThread::stopCheck (this=0x8eba574, >> timeOut=-1) at main.cpp:226 >> #7 0x0804bb46 in Consumer::run (this=0x8eba570) at main.cpp:379 >> #8 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback >> 
(properties=0x8eba5b0) at decaf/lang/Thread.cpp:135 >> #9 0x00a92847 in threadWorker (arg=0x8eba5b0) at decaf/lang/Thread.cpp:188 >> #10 0x00ddb96e in start_thread (arg=0xb6806b70) at pthread_create.c:300 >> #11 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 >> >> Thread 5: >> #0 0x00cfd422 in __kernel_vsyscall () >> #1 0x00de2af9 in __lll_lock_wait () at >> ../nptl/sysdeps/unix/sysv/linux/i386/i686/../i486/lowlevellock.S:142 >> #2 0x00dde13b in _L_lock_748 () from /lib/tls/i686/cmov/libpthread.so.0 >> #3 0x00dddf61 in __pthread_mutex_lock (mutex=0x8eb05e0) at >> pthread_mutex_lock.c:61 >> #4 0x0034fba6 in pthread_mutex_lock (mutex=0x8eb05e0) at forward.c:182 >> #5 0x00a60aa1 in decaf::internal::util::concurrent::MutexImpl::lock >> (handle=0x8eb05e0) at >> decaf/internal/util/concurrent/unix/MutexImpl.cpp:71 >> #6 0x00ad1412 in decaf::util::concurrent::Mutex::lock >> (this=0x8eb0a88) at decaf/util/concurrent/Mutex.cpp:75 >> #7 0x00ad0ab5 in decaf::util::concurrent::Lock::lock >> (this=0xb6004ce0) at decaf/util/concurrent/Lock.cpp:54 >> #8 0x00ad0c08 in Lock (this=0xfffffe00, object=0x8eb05e0, >> intiallyLocked=true) at decaf/util/concurrent/Lock.cpp:32 >> #9 0x0081b8fd in >> activemq::transport::failover::FailoverTransport::oneway >> (this=0x8eb0a38, command=...) at >> activemq/transport/failover/FailoverTransport.cpp:186 >> #10 0x00807f9f in >> activemq::transport::correlator::ResponseCorrelator::oneway >> (this=0x8eb11c0, command=...) at >> activemq/transport/correlator/ResponseCorrelator.cpp:82 >> #11 0x007713be in activemq::core::ActiveMQConnection::oneway >> (this=0x8eb1210, command=...) at >> activemq/core/ActiveMQConnection.cpp:741 >> #12 0x007b0a4f in activemq::core::ActiveMQSession::oneway >> (this=0x8ebab90, command=...) 
at activemq/core/ActiveMQSession.cpp:903 >> #13 0x00795a1c in activemq::core::ActiveMQConsumer::acknowledge >> (this=0x8ebb270) at activemq/core/ActiveMQConsumer.cpp:860 >> #14 0x0079f885 in >> activemq::core::TransactionSynhcronization::beforeEnd (this=0x8ebb9e8) >> at activemq/core/ActiveMQConsumer.cpp:85 >> #15 0x007c0588 in >> activemq::core::ActiveMQTransactionContext::beforeEnd (this=0x8ebadc8) >> at activemq/core/ActiveMQTransactionContext.cpp:192 >> #16 0x007c0d63 in activemq::core::ActiveMQTransactionContext::commit >> (this=0x8ebadc8) at activemq/core/ActiveMQTransactionContext.cpp:127 >> #17 0x007ae460 in activemq::core::ActiveMQSession::commit >> (this=0x8ebab90) at activemq/core/ActiveMQSession.cpp:189 >> #18 0x0069c1a0 in activemq::cmsutil::PooledSession::commit >> (this=0x8ebaf38) at activemq/cmsutil/PooledSession.h:87 >> #19 0x0804c0fb in Consumer::onMessage (this=0x8eba570, >> message=0x8ebbec0) at main.cpp:455 >> #20 0x0079972f in activemq::core::ActiveMQConsumer::dispatch >> (this=0x8ebb270, dispatch=...) at >> activemq/core/ActiveMQConsumer.cpp:1018 >> #21 0x007bd5c3 in activemq::core::ActiveMQSessionExecutor::dispatch >> (this=0x8ebae78, dispatch=...) at >> activemq/core/ActiveMQSessionExecutor.cpp:129 >> #22 0x007bd993 in activemq::core::ActiveMQSessionExecutor::iterate >> (this=0x8ebae78) at activemq/core/ActiveMQSessionExecutor.cpp:166 >> #23 0x008007f3 in activemq::threads::DedicatedTaskRunner::run >> (this=0x8ebb550) at activemq/threads/DedicatedTaskRunner.cpp:111 >> #24 0x00a93cf1 in decaf::lang::ThreadProperties::runCallback >> (properties=0x8ebaad8) at decaf/lang/Thread.cpp:135 >> #25 0x00a92847 in threadWorker (arg=0x8ebaad8) at decaf/lang/Thread.cpp:188 >> #26 0x00ddb96e in start_thread (arg=0xb6005b70) at pthread_create.c:300 >> #27 0x00342a4e in clone () at ../sysdeps/unix/sysv/linux/i386/clone.S:130 >> >> >> >> I tried to resolve this issue changing configurations and code, but i >> always get an error or client freezes. 
>> Is it a bug? Any suggestions? >> >> Thanks. >> Henrique >> >> How to reproduce: >> 1. Start ActiveMQ >> 2. Send a message './activemqTest -p1 -n1' >> 3. Run consumer './activemqTest -c1 -d10000' >> 4. Stop activemq when 'Starting delay...' message appears. >> 5. After activemq is down and the delay has finished, the consumer tries to commit >> (message 'Try commit') and the unexpected exception occurs. >> >> The test code follows, based on the ActiveMQ-CPP sample >> 'activemq-cpp-library-3.2.4/src/examples/main.cpp': >> >> >> /* >> * Licensed to the Apache Software Foundation (ASF) under one or more >> * contributor license agreements. See the NOTICE file distributed with >> * this work for additional information regarding copyright ownership. >> * The ASF licenses this file to You under the Apache License, Version 2.0 >> * (the "License"); you may not use this file except in compliance with >> * the License. You may obtain a copy of the License at >> * >> * http://www.apache.org/licenses/LICENSE-2.0 >> * >> * Unless required by applicable law or agreed to in writing, software >> * distributed under the License is distributed on an "AS IS" BASIS, >> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> * See the License for the specific language governing permissions and >> * limitations under the License. 
>> */ >> >> // START SNIPPET: demo >> >> #include <activemq/library/ActiveMQCPP.h> >> #include <activemq/cmsutil/SessionPool.h> >> #include <activemq/cmsutil/PooledSession.h> >> #include <activemq/cmsutil/ResourceLifecycleManager.h> >> #include <activemq/exceptions/ActiveMQException.h> >> #include <decaf/lang/Thread.h> >> #include <decaf/lang/Runnable.h> >> #include <decaf/util/concurrent/CountDownLatch.h> >> #include <decaf/util/concurrent/Mutex.h> >> #include <decaf/lang/Integer.h> >> #include <decaf/lang/Long.h> >> #include <decaf/lang/System.h> >> #include <activemq/core/ActiveMQConnectionFactory.h> >> #include <activemq/util/Config.h> >> #include <cms/Connection.h> >> #include <cms/Session.h> >> #include <cms/TextMessage.h> >> #include <cms/BytesMessage.h> >> #include <cms/MapMessage.h> >> #include <cms/ExceptionListener.h> >> #include <cms/MessageListener.h> >> #include <iostream> >> #include <memory> >> #include <map> >> #include <vector> >> #include <unistd.h> >> >> using namespace activemq::core; >> using namespace activemq::cmsutil; >> using namespace decaf::util::concurrent; >> using namespace decaf::util; >> using namespace decaf::lang; >> using namespace cms; >> >> >> class ConnPoolManager : protected decaf::util::concurrent::Mutex, >> public ExceptionListener { >> private: >> typedef std::map< int, activemq::cmsutil::SessionPool* > tSessionPoolMap; >> >> std::string _brokerURI; >> ConnectionFactory* _connectionFactory; >> Connection* _connection; >> ResourceLifecycleManager* _lifecycleManager; >> tSessionPoolMap _pools; >> bool _started; >> >> public: >> ConnPoolManager() : >> _brokerURI("tcp://localhost:61616"), >> _connectionFactory(NULL), >> _connection(NULL), >> _lifecycleManager(NULL), >> _started(false) >> { >> } >> >> virtual ~ConnPoolManager() { >> cleanup(); >> } >> >> void setBrokerURI(const std::string& uri) { >> _brokerURI = uri; >> } >> >> bool startPools() { >> synchronized(this) { >> if(_started) return true; >> try { >> 
createConnection(); >> createPools(); >> _started=true; >> return true; >> } catch(CMSException& e) { >> std::cout<<"(ConnPoolManager::startPools) >> CMSException: " << e.getMessage() << std::endl; >> } catch(...) { >> std::cout<<"(ConnPoolManager::startPools) Unknown >> exception!!!" << std::endl; >> } >> } >> cleanup(); >> return false; >> } >> >> PooledSession* getSession(cms::Session::AcknowledgeMode ack) { >> if(_started) { >> return _pools[ack]->takeSession(); >> } else if(startPools()) { >> return _pools[ack]->takeSession(); >> } else { >> return NULL; >> } >> } >> >> private: >> >> void createConnection(){ >> // Create a ConnectionFactory >> _connectionFactory = >> ConnectionFactory::createCMSConnectionFactory( _brokerURI ); >> >> // Create a Connection >> _connection = _connectionFactory->createConnection(); >> _connection->start(); >> _connection->setExceptionListener(this); >> } >> >> void createPools(){ >> // Create lifecycle manager >> _lifecycleManager = new >> activemq::cmsutil::ResourceLifecycleManager(); >> >> // Create pools, one for each acknowledge type >> for( int ack = cms::Session::AUTO_ACKNOWLEDGE; >> ack <= cms::Session::INDIVIDUAL_ACKNOWLEDGE; >> ack++) >> { >> _pools[ack] = new activemq::cmsutil::SessionPool( >> _connection, >> static_cast<cms::Session::AcknowledgeMode>(ack), >> _lifecycleManager); >> } >> } >> >> void cleanup() { >> synchronized(this) { >> releasePools(); >> if(_lifecycleManager!=NULL) { >> try { delete _lifecycleManager; >> } catch(...) {} >> } _lifecycleManager = NULL; >> >> if(_connection!=NULL) { >> try { delete _connection; >> } catch(...) {} >> } _connection = NULL; >> if(_connectionFactory!=NULL) { >> try { delete _connectionFactory; >> } catch(...) {} >> } _connectionFactory = NULL; >> _started=false; >> } >> } >> >> void releasePools() { >> for(tSessionPoolMap::iterator it=_pools.begin(); >> it!=_pools.end(); it++) { >> try { >> delete it->second; >> } catch(...) 
{ } >> } >> _pools.clear(); >> } >> >> // If something bad happens you see it here as this class is also been >> // registered as an ExceptionListener with the connection. >> virtual void onException( const CMSException& ex AMQCPP_UNUSED) { >> std::cout<<"(ConnPoolManager::onException) CMS Exception >> occurred. Shutting down client."<<std::endl; >> ex.printStackTrace(); >> exit(1); >> } >> >> >> >> }; >> >> class JMSEndPointThread : public Thread { >> private: >> CountDownLatch _stopNow; >> ConnPoolManager* _pools; >> std::string _destinationString; >> PooledSession* _session; >> Destination* _destination; >> bool _sessionTransacted; >> >> protected: >> >> JMSEndPointThread( ConnPoolManager* pools, >> const std::string& dest, >> bool sessionTransacted) : >> _stopNow(1), >> _pools(pools), >> _destinationString(dest), >> _session(NULL), >> _destination(NULL), >> _sessionTransacted(sessionTransacted) >> { >> } >> >> virtual bool prepareSession() { >> cleanup(); >> // Create a Session >> if( _sessionTransacted ) { >> _session = _pools->getSession(Session::SESSION_TRANSACTED); >> } else { >> _session = _pools->getSession(Session::AUTO_ACKNOWLEDGE); >> } >> if(_session==NULL) return false; >> >> // Create the Queue destination >> _destination = _session->createQueue( _destinationString ); >> return true; >> } >> >> PooledSession* getSession() { return _session; } >> Destination* getDestination() { return _destination; } >> bool getTransacted() { return _sessionTransacted; } >> >> bool stopCheck(int timeOut=0) { >> if(timeOut>=0) return _stopNow.await(timeOut); >> else _stopNow.await(); >> return true; >> } >> >> virtual void cleanup() { >> // Destroy resources. >> try{ >> if( _destination != NULL ) delete _destination; >> } catch(...) { } >> _destination = NULL; >> >> // Back session to pool >> try{ >> if( _session != NULL ) _session->close(); >> } catch(...) 
{ } >> _session = NULL; >> } >> >> public: >> >> typedef std::vector<JMSEndPointThread*> List; >> >> virtual ~JMSEndPointThread() { >> cleanup(); >> } >> >> void stopNow() { >> _stopNow.countDown(); >> } >> >> }; >> >> class Producer : public JMSEndPointThread { >> private: >> >> MessageProducer* _producer; >> int _delay; >> int _numMessages; >> >> public: >> >> Producer( ConnPoolManager* pools, >> const std::string& dest, >> int delay, >> int numMessages, >> bool sessionTransacted = false ) : >> JMSEndPointThread(pools,dest,sessionTransacted) >> { >> _producer = NULL; >> _delay = delay; >> _numMessages = numMessages; >> } >> >> virtual ~Producer() { >> cleanup(); >> } >> >> virtual void run() { >> std::cout<<"Producer started!!!! Thread: >> "<<Thread::getId()<<std::endl; >> while(!stopCheck() && _numMessages) { >> try { >> if(prepareProducer()) { >> sendMessages(); >> continue; >> } >> } catch(CMSException& e) { >> std::cout<<"(Producer::run) CMSException: " << >> e.getMessage() << std::endl; >> } catch(...) { >> std::cout<<"(Producer::run) Unknown exception!!!" << >> std::endl; >> } >> cleanup(); >> stopCheck(5000); // reconnect delay >> } >> std::cout<<"Producer end!!!! Thread: "<<Thread::getId()<<std::endl; >> cleanup(); >> } >> >> private: >> >> bool prepareProducer() { >> // Get Session >> if(!prepareSession()) return false; >> // Create a MessageProducer from the Session to the Queue >> _producer = getSession()->createProducer( getDestination() ); >> _producer->setDeliveryMode( DeliveryMode::PERSISTENT ); >> return true; >> } >> void sendMessages() { >> // Create a messages >> std::stringstream text; >> text<<"Hello world! 
from thread " << Thread::getId(); >> >> while(!stopCheck() && _numMessages) { >> >> TextMessage* message = getSession()->createTextMessage( >> text.str() ); >> message->setIntProperty( "Integer", _numMessages ); >> >> // Tell the producer to send the message >> std::cout<<"Sent message #"<<_numMessages<<" from thread >> "<<Thread::getId()<<std::endl; >> _producer->send( message ); >> delete message; >> --_numMessages; >> >> stopCheck(_delay); >> if(getTransacted()) getSession()->commit(); >> } >> } >> >> virtual void cleanup() { >> try{ >> if( _producer != NULL ) delete _producer; >> } catch(...) { } >> _producer = NULL; >> >> JMSEndPointThread::cleanup(); >> } >> >> }; >> >> >> class Consumer : public MessageListener, >> public JMSEndPointThread { >> >> private: >> >> MessageConsumer* _consumer; >> int _delay; >> bool _useListener; >> >> public: >> >> Consumer( ConnPoolManager* pools, >> const std::string& dest, >> int delay, >> bool useListener = true, >> bool sessionTransacted = true ) : >> JMSEndPointThread(pools,dest,sessionTransacted) >> { >> _consumer = NULL; >> _delay = delay; >> _useListener = useListener; >> } >> virtual ~Consumer(){ >> cleanup(); >> } >> >> virtual void run() { >> std::cout<<"Consumer started!!!! Thread: >> "<<Thread::getId()<<std::endl; >> while(!stopCheck()) { >> try { >> if(prepareConsumer()) { >> // Wait while asynchronous messages come in. >> if(_useListener) { >> stopCheck(-1); >> } else { >> consumeLoop(); >> } >> continue; >> } >> } catch (CMSException& e) { >> std::cout<<"(Consumer::run) CMSException: " << >> e.getMessage() << std::endl; >> } catch(...) { >> std::cout<<"(Consumer::run) Unknown exception!!!" << >> std::endl; >> } >> cleanup(); >> stopCheck(5000); // reconnect delay >> } >> std::cout<<"Consumer end!!!! 
Thread: "<<Thread::getId()<<std::endl; >> cleanup(); >> } >> >> >> private: >> >> bool prepareConsumer() { >> // Get Session >> if(!prepareSession()) return false; >> // Create a MessageConsumer from the Session to the Queue >> _consumer = getSession()->createConsumer( getDestination() ); >> if(_useListener) >> _consumer->setMessageListener( this ); >> >> return true; >> } >> >> void consumeLoop() { >> while(!stopCheck()) { >> Message *msg = _consumer->receive(500); >> if(msg!=NULL) { >> onMessage(msg); >> delete msg; >> } >> } >> } >> >> // Called from the consumer since this class is a registered >> MessageListener. >> virtual void onMessage( const Message* message ) { >> >> static int count = 0; >> >> try >> { >> count++; >> const TextMessage* textMessage = >> dynamic_cast< const TextMessage* >( message ); >> std::string text = ""; >> >> if( textMessage != NULL ) { >> text = textMessage->getText(); >> } else { >> text = "NOT A TEXTMESSAGE!"; >> } >> >> std::cout<<"Message #"<<count<<" Received: "<<text<<std::endl; >> if(_delay) { >> std::cout<<"Starting delay..."<<std::endl; >> stopCheck(_delay); >> } >> >> } catch (CMSException& e) { >> std::cout<<"(Consumer::onMessage) CMSException: " << >> e.getMessage() << std::endl; >> } catch(...) { >> std::cout<<"(Consumer::onMessage) Unknown exception!!!" << >> std::endl; >> } >> >> try >> { // Commit all messages. >> if( getTransacted() ) { >> std::cout<<"Try commit....."<<std::endl; >> getSession()->commit(); >> std::cout<<"Commit OK!!!"<<std::endl; >> } >> } catch (CMSException& e) { >> std::cout<<"(Consumer::onMessage) CMSException: " << >> e.getMessage() << std::endl; >> } catch(...) { >> std::cout<<"(Consumer::onMessage) Unknown exception!!!" << >> std::endl; >> } >> >> } >> >> virtual void cleanup() { >> try{ >> if( _consumer != NULL ) delete _consumer; >> } catch (...) 
{ } >> _consumer = NULL; >> JMSEndPointThread::cleanup(); >> } >> }; >> >> >> class ScopedActiveMQLibrary { >> public: >> ScopedActiveMQLibrary() { >> activemq::library::ActiveMQCPP::initializeLibrary(); >> } >> virtual ~ScopedActiveMQLibrary() { >> activemq::library::ActiveMQCPP::shutdownLibrary(); >> } >> }; >> >> class AppTest : private ScopedActiveMQLibrary { >> private: >> int _consumer; >> int _producer; >> int _numMessages; >> bool _useListener; >> int _delay; >> ConnPoolManager _poolManager; >> JMSEndPointThread::List _endPointList; >> >> static AppTest* _app; >> static CountDownLatch _terminationRequest; >> >> public: >> AppTest(int argc, char** argv) : >> _consumer(0), >> _producer(0), >> _numMessages(1), >> _useListener(false), >> _delay(0) >> { >> _app=this; >> int c; >> while ((c = getopt (argc, argv, "lc:p:n:d:")) != -1) { >> switch(c) { >> case 'l': _useListener=true; break; >> case 'c': _consumer=atoi(optarg); break; >> case 'p': _producer=atoi(optarg); break; >> case 'n': _numMessages=atoi(optarg); break; >> case 'd': _delay=atoi(optarg); break; >> } >> } >> if(_consumer<0) _consumer=0; >> if(_producer<0) _producer=0; >> _poolManager.setBrokerURI( >> "failover://(" >> "tcp://localhost:61616" >> "?transport.useInactivityMonitor=false" >> ")?timeout=1000" >> "&cms.RedeliveryPolicy.maximumRedeliveries=-1" >> ); >> _poolManager.startPools(); >> } >> >> virtual ~AppTest() { >> } >> >> int run() { >> >> // Install signal handler >> if(!installSigAction()) >> return 1; >> >> // Create consumer/producer objects >> loadEndPoins(); >> >> // Start the producer/consumer thread. >> startEndPoins(); >> >> // start another tasks >> // ... 
>> >> // Wait for termination request >> /*if(_consumer) { >> _terminationRequest.await(); >> stopEndPoins(); >> }*/ >> >> // join threads >> joinEndPoins(); >> >> return 0; >> } >> >> private: >> >> void loadEndPoins() { >> for(int i=0; i<_consumer;i++) { >> _endPointList.push_back( new >> Consumer(&_poolManager,"TEST.FOO",_delay,_useListener) ); >> } >> for(int i=0; i<_producer;i++) { >> _endPointList.push_back( new >> Producer(&_poolManager,"TEST.FOO",_delay,_numMessages) ); >> } >> } >> >> void startEndPoins() { >> for(JMSEndPointThread::List::iterator it=_endPointList.begin(); >> it!=_endPointList.end(); it++ ) >> (**it).start(); >> } >> >> void stopEndPoins() { >> for(JMSEndPointThread::List::reverse_iterator >> it=_endPointList.rbegin(); >> it!=_endPointList.rend(); it++ ) >> (**it).stopNow(); >> } >> >> void joinEndPoins() { >> for(JMSEndPointThread::List::reverse_iterator >> it=_endPointList.rbegin(); >> it!=_endPointList.rend(); it++ ) { >> (**it).join(); >> delete *it; >> *it = NULL; >> } >> _endPointList.clear(); >> } >> >> static void signalHandler(int sig) { >> _app->_terminationRequest.countDown(); >> _app->stopEndPoins(); >> } >> >> bool installSigAction() { >> struct sigaction action; >> memset(&action, 0, sizeof(action)); >> action.sa_handler = signalHandler; >> if( sigaction(SIGTERM, &action, NULL)<0 || >> sigaction(SIGQUIT, &action, NULL)<0 || >> sigaction(SIGINT, &action, NULL)<0 ) >> { return false; } >> return true; >> } >> >> int termWait() { >> sigset_t sset; >> sigemptyset(&sset); >> sigaddset(&sset, SIGINT); >> sigaddset(&sset, SIGQUIT); >> sigaddset(&sset, SIGTERM); >> sigprocmask(SIG_BLOCK, &sset, NULL); >> int sig; >> sigwait(&sset, &sig); >> return sig; >> } >> >> }; >> AppTest* AppTest::_app = NULL; >> CountDownLatch AppTest::_terminationRequest(1); >> >> >> void my_unexpected() { >> throw activemq::exceptions::ActiveMQException(); >> } >> >> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) { >> 
//std::set_unexpected(my_unexpected); >> AppTest test(argc, argv); >> return test.run(); >> } >> >> // END SNIPPET: demo > > -- > Tim Bish > ------------ > FuseSource > Email: tim.b...@fusesource.com > Web: http://fusesource.com > Twitter: tabish121 > Blog: http://timbish.blogspot.com/ > > >