Not that it helps much, I tried it with a 5.x release and your code works fine. I see the subscription is durable across consumer and broker restarts. Have you modified your activemq.xml, are you running with 5.1.0?
/Dave On Fri, Jun 13, 2008 at 1:05 PM, eaglepointe <[EMAIL PROTECTED]> wrote: > > Forgot to mention that I also restarted message broker before start durable > subscriber again. > > > eaglepointe wrote: > > > > Ok, here is what I did, > > > > 1) start message broker > > 2) start a durable subscriber which subscribes "topictest.messages" as > the > > attached code > > 3) start publisher, which sends some messages to "topictest.messages" in > > 'persistent' mode, and the subscriber receives the messages > > 4) stop the subscriber and publisher > > 5) start publisher again, which sends some messages to > > "topictest.messages" > > 6) stop publisher > > 7) start subscriber again, according to my understanding of durable > > subscription, I'd expect subscriber receives the messages sent by > > publisher, but I got the exceptions instead > > > > Following is a more complete version of subscriber code, which is really > > the sample code comes with ActiveMQ installation, except for the durable > > subscription. > > > > The same code works fine, meaning no exceptions, if I don't use durable > > subscription. > > It seems that durable subscription requires some context in message > broker > > which somehow got lost. > > > > > > > -------------------------------------------------------------------------------------------- > > > > public class TopicListener implements MessageListener { > > > > private Connection connection; > > private MessageProducer producer; > > private Session session; > > private int count; > > private long start; > > private Topic topic; > > private Topic control; > > //private TopicSubscriber consumer; > > > > private String url = "tcp://192.168.30.60:61616"; > > > > public static void main(String[] argv) throws Exception { > > TopicListener l = new TopicListener(); > > String[] unknown = CommandLineSupport.setOptions(l, argv); > > if (unknown.length > 0) { > > System.out.println("Unknown options: " + > > Arrays.toString(unknown)); > > System.exit(-1); > > } > > l.run(); > > } > > > > public void run() throws JMSException { > > ActiveMQConnectionFactory factory = new > > ActiveMQConnectionFactory(url); > > connection = factory.createConnection(); > > connection.setClientID("myClient"); > > session = connection.createSession(false, > > Session.AUTO_ACKNOWLEDGE); > > topic = session.createTopic("topictest.messages"); > > control = session.createTopic("topictest.control"); > > > > TopicSubscriber consumer = > > session.createDurableSubscriber(topic,"myDurable1"); > > //MessageConsumer consumer = session.createConsumer(topic); > > consumer.setMessageListener(this); > > > > producer = session.createProducer(control); > > > > System.out.println("Waiting for messages..."); > > connection.start(); > > } > > > --------------------------------------------------------------------------------- > > > > your grateful, > > -Jeff > > > > > > > > Dave Stanley wrote: > >> > >> Your not running this from within a thread by any chance? I don't > >> believe connection.start() is going to block, so try and create the > >> connection outside the thread and pass it in by reference to the thread > >> instance. > >> > >> HTH > >> /Dave > >> > >> > >> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <[EMAIL PROTECTED]> > >> wrote: > >> > >>> > >>> Hi, > >>> > >>> I'm new to ActiveMQ and having problem with durable subscription, > >>> following > >>> code based on example code, > >>> > >>> =================================================================== > >>> public void run() throws JMSException { > >>> ActiveMQConnectionFactory factory = new > >>> ActiveMQConnectionFactory(url); > >>> connection = factory.createConnection(); > >>> connection.setClientID("myClient"); > >>> session = connection.createSession(false, > >>> Session.AUTO_ACKNOWLEDGE); > >>> topic = session.createTopic("topictest.messages"); > >>> control = session.createTopic("topictest.control"); > >>> > >>> TopicSubscriber consumer = > >>> session.createDurableSubscriber(topic,"myDurable1"); > >>> //MessageConsumer consumer = session.createConsumer(topic); > >>> consumer.setMessageListener(this); > >>> > >>> producer = session.createProducer(control); > >>> > >>> System.out.println("Waiting for messages..."); > >>> connection.start(); > >>> } > >>> ================================================================= > >>> > >>> is causing exceptions on broker side and client side, below are the > >>> exceptions > >>> > >>> broker side: > >>> > >>> ERROR Service - Async error occurred: > >>> java.lang.NullPointerException > >>> java.lang.NullPointerException > >>> at > >>> > >>> > org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479) > >>> at > >>> org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105) > >>> at > >>> > >>> > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292) > >>> at > >>> > >>> > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180) > >>> at > >>> > >>> > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) > >>> at > >>> > >>> > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143) > >>> at > >>> > >>> > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206) > >>> at > >>> > >>> > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84) > >>> at > >>> > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196) > >>> at > >>> > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183) > >>> at java.lang.Thread.run(Thread.java:619) > >>> > >>> client side: > >>> > >>> Exception in thread "ActiveMQ Session Task" > >>> java.util.concurrent.RejectedExecutionException > >>> at > >>> > >>> > java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown > >>> Source) > >>> at java.util.concurrent.ThreadPoolExecutor.reject(Unknown > Source) > >>> at java.util.concurrent.ThreadPoolExecutor.execute(Unknown > >>> Source) > >>> at > >>> > >>> > org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144) > >>> at > >>> > org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43) > >>> at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown > >>> Source) > >>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > >>> Source) > >>> at java.lang.Thread.run(Unknown Source) > >>> > >>> > >>> Any idea? Thanks in advance! > >>> > >>> -- > >>> View this message in context: > >>> > http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html > >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. > >>> > >>> > >> > >> > > > > > > -- > View this message in context: > http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17828072.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > >