Yes, I'm running 5.1.0 and using the default activemq.xml. I've found the cause! It is a bug in the subscriber code: it closes the connection in 'onMessage' upon receiving a 'SHUTDOWN' message while using 'Session.AUTO_ACKNOWLEDGE', so the 'SHUTDOWN' message never gets acknowledged, which results in everything behaving strangely across subscriber/publisher/broker restarts. Changing to 'Session.CLIENT_ACKNOWLEDGE' solved the problem.
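To make the ordering concrete, here is a toy Java model of what is described above. It is only a sketch: `AckOrderSketch`, `FakeConnection`, `deliverAutoAck` and `deliverClientAck` are hypothetical names invented for illustration, not ActiveMQ or JMS API, and the model assumes that an acknowledgement attempted after the connection is closed is simply lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model (not ActiveMQ API) of acknowledgement order around a close in the listener. */
final class AckOrderSketch {

    /** Hypothetical stand-in for a connection that remembers acknowledged messages. */
    static final class FakeConnection {
        private final List<String> acked = new ArrayList<>();
        private boolean closed;

        /** Records the acknowledgement unless the connection is already closed. */
        boolean acknowledge(String messageId) {
            if (closed) {
                return false; // assumption of this model: a late acknowledgement is lost
            }
            acked.add(messageId);
            return true;
        }

        void close() {
            closed = true;
        }

        boolean isAcked(String messageId) {
            return acked.contains(messageId);
        }
    }

    /** Auto-acknowledge style: the acknowledgement happens only after the listener returns. */
    static boolean deliverAutoAck(FakeConnection c, String id, Consumer<FakeConnection> listener) {
        listener.accept(c);   // the listener may close the connection in here
        c.acknowledge(id);    // attempted afterwards, possibly on a closed connection
        return c.isAcked(id);
    }

    /** Client-acknowledge style: acknowledge explicitly first, then close. */
    static boolean deliverClientAck(FakeConnection c, String id) {
        c.acknowledge(id);
        c.close();
        return c.isAcked(id);
    }

    public static void main(String[] args) {
        boolean auto = deliverAutoAck(new FakeConnection(), "SHUTDOWN", FakeConnection::close);
        boolean client = deliverClientAck(new FakeConnection(), "SHUTDOWN");
        System.out.println("auto-ack, close in listener: acked=" + auto);
        System.out.println("client-ack, ack then close:  acked=" + client);
    }
}
```

In real JMS code the explicit step would be a call to `Message.acknowledge()` on the received message before the connection is closed.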
Thanks for the help!

Dave Stanley wrote:
>
> Not that it helps much, I tried it with a 5.x release and your code works
> fine. I see the subscription is durable across consumer and broker
> restarts.
> Have you modified your activemq.xml, are you running with 5.1.0?
>
> /Dave
>
> On Fri, Jun 13, 2008 at 1:05 PM, eaglepointe <[EMAIL PROTECTED]> wrote:
>
>> Forgot to mention that I also restarted message broker before start
>> durable subscriber again.
>>
>> eaglepointe wrote:
>> >
>> > Ok, here is what I did,
>> >
>> > 1) start message broker
>> > 2) start a durable subscriber which subscribes "topictest.messages" as
>> >    the attached code
>> > 3) start publisher, which sends some messages to "topictest.messages"
>> >    in 'persistent' mode, and the subscriber receives the messages
>> > 4) stop the subscriber and publisher
>> > 5) start publisher again, which sends some messages to
>> >    "topictest.messages"
>> > 6) stop publisher
>> > 7) start subscriber again, according to my understanding of durable
>> >    subscription, I'd expect subscriber receives the messages sent by
>> >    publisher, but I got the exceptions instead
>> >
>> > Following is a more complete version of subscriber code, which is
>> > really the sample code comes with ActiveMQ installation, except for
>> > the durable subscription.
>> >
>> > The same code works fine, meaning no exceptions, if I don't use durable
>> > subscription.
>> > It seems that durable subscription requires some context in message
>> > broker which somehow got lost.
>> >
>> > --------------------------------------------------------------------------------------------
>> >
>> > public class TopicListener implements MessageListener {
>> >
>> >     private Connection connection;
>> >     private MessageProducer producer;
>> >     private Session session;
>> >     private int count;
>> >     private long start;
>> >     private Topic topic;
>> >     private Topic control;
>> >     //private TopicSubscriber consumer;
>> >
>> >     private String url = "tcp://192.168.30.60:61616";
>> >
>> >     public static void main(String[] argv) throws Exception {
>> >         TopicListener l = new TopicListener();
>> >         String[] unknown = CommandLineSupport.setOptions(l, argv);
>> >         if (unknown.length > 0) {
>> >             System.out.println("Unknown options: " + Arrays.toString(unknown));
>> >             System.exit(-1);
>> >         }
>> >         l.run();
>> >     }
>> >
>> >     public void run() throws JMSException {
>> >         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
>> >         connection = factory.createConnection();
>> >         connection.setClientID("myClient");
>> >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> >         topic = session.createTopic("topictest.messages");
>> >         control = session.createTopic("topictest.control");
>> >
>> >         TopicSubscriber consumer = session.createDurableSubscriber(topic,"myDurable1");
>> >         //MessageConsumer consumer = session.createConsumer(topic);
>> >         consumer.setMessageListener(this);
>> >
>> >         producer = session.createProducer(control);
>> >
>> >         System.out.println("Waiting for messages...");
>> >         connection.start();
>> >     }
>> >
>> > ---------------------------------------------------------------------------------
>> >
>> > your grateful,
>> > -Jeff
>> >
>> > Dave Stanley wrote:
>> >>
>> >> Your not running this from within a thread by any chance? I don't
>> >> believe connection.start() is going to block, so try and create the
>> >> connection outside the thread and pass it in by reference to the
>> >> thread instance.
>> >>
>> >> HTH
>> >> /Dave
>> >>
>> >> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <[EMAIL PROTECTED]> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> I'm new to ActiveMQ and having problem with durable subscription,
>> >>> following code based on example code,
>> >>>
>> >>> ===================================================================
>> >>> public void run() throws JMSException {
>> >>>     ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
>> >>>     connection = factory.createConnection();
>> >>>     connection.setClientID("myClient");
>> >>>     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> >>>     topic = session.createTopic("topictest.messages");
>> >>>     control = session.createTopic("topictest.control");
>> >>>
>> >>>     TopicSubscriber consumer = session.createDurableSubscriber(topic,"myDurable1");
>> >>>     //MessageConsumer consumer = session.createConsumer(topic);
>> >>>     consumer.setMessageListener(this);
>> >>>
>> >>>     producer = session.createProducer(control);
>> >>>
>> >>>     System.out.println("Waiting for messages...");
>> >>>     connection.start();
>> >>> }
>> >>> =================================================================
>> >>>
>> >>> is causing exceptions on broker side and client side, below are the
>> >>> exceptions
>> >>>
>> >>> broker side:
>> >>>
>> >>> ERROR Service - Async error occurred: java.lang.NullPointerException
>> >>> java.lang.NullPointerException
>> >>>     at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
>> >>>     at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
>> >>>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>> >>>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>> >>>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>> >>>     at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>> >>>     at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>> >>>     at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>> >>>     at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>> >>>     at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>> >>>     at java.lang.Thread.run(Thread.java:619)
>> >>>
>> >>> client side:
>> >>>
>> >>> Exception in thread "ActiveMQ Session Task" java.util.concurrent.RejectedExecutionException
>> >>>     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
>> >>>     at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
>> >>>     at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
>> >>>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
>> >>>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
>> >>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> >>>     at java.lang.Thread.run(Unknown Source)
>> >>>
>> >>> Any idea? Thanks in advance!
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
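A note on the client-side trace quoted above: a `RejectedExecutionException` thrown from `ThreadPoolExecutor$AbortPolicy` is what the JDK raises when a task is handed to an executor that has already been shut down (or is saturated). That would be consistent with the connection being closed while the session task was still running, though this reading is an assumption and is not confirmed anywhere in the thread. The sketch below reproduces only the JDK behaviour, with no ActiveMQ involved; `RejectedAfterShutdownSketch` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/** JDK-only illustration: a shut-down executor rejects new tasks. */
final class RejectedAfterShutdownSketch {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean rejectedAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // stands in for the executor being torn down on close
        try {
            pool.execute(() -> { });
            return false; // not expected: the task was accepted
        } catch (RejectedExecutionException expected) {
            return true;  // the default AbortPolicy threw, as in the trace
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```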