You could create your connection and consumer in the initialization of the recovery policy rather than in the recover method; that may help.
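A minimal sketch of that shape, using only the JDK so it runs on its own. `EagerReplayPolicy` and its `opener` are hypothetical stand-ins, not ActiveMQ classes: in a real policy the opener would wrap the connection, session, temporary topic and consumer creation from the quoted message below, and `start()` would correspond to whatever initialization hook the policy gets from the broker. Whether that hook runs outside the locks that `recover` hits is an assumption you would have to verify.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical stand-in for a recovery policy (not the ActiveMQ interface).
 * The expensive, potentially blocking setup runs once in start(); recover()
 * only reuses what is already open.
 */
final class EagerReplayPolicy<R extends AutoCloseable> {
    // Assumption: in a real policy this would create the connection,
    // session, temporary topic and consumer.
    private final Callable<R> opener;
    private R resource;

    EagerReplayPolicy(Callable<R> opener) {
        this.opener = opener;
    }

    /** Initialization: the only place the opener is ever invoked. */
    synchronized void start() throws Exception {
        if (resource == null) {
            resource = opener.call();
        }
    }

    /** Recovery path: never opens anything, fails fast if start() was skipped. */
    synchronized R recover() {
        if (resource == null) {
            throw new IllegalStateException("start() must run before recover()");
        }
        return resource;
    }

    /** Shutdown: release whatever start() opened. */
    synchronized void stop() throws Exception {
        R toClose = resource;
        resource = null;
        if (toClose != null) {
            toClose.close();
        }
    }
}
```

The point of the split is that `recover` does no connection or consumer creation of its own, so it cannot block on a nested consumer registration the way the stack trace below does.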
On 2 December 2010 08:35, adam <adam.suss...@gmail.com> wrote:
>
> I am writing a custom SubscriptionRecoveryPolicy and one of the things I need
> to do while recovering is create a temporary topic and consume messages from
> it.
>
> What is the right way to do this?
>
> I don't see a straightforward API for it (such as the one for sending
> messages: BrokerFilter.send).
>
> I have tried going the standard JMS route with a ActiveMQConnectionFactory
> to the vm://localhost
> transport, but I keep running into all kinds of locking problems and
> deadlocks that not only hold
> up what I am trying to do, but stop ANY connections from succeeding from
> then on. It seems that
> creating a connection and trying to create a consumer inside another attempt
> to create a consumer
> doesn't work.
>
> For the record, here is what I am trying. This is inside
> SubscriptionRecoveryPolicy.recover:
>
>
>     ActiveMQConnectionFactory factory = new
>         ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
>     factory.setUseAsyncSend(true);
>     factory.setWatchTopicAdvisories(false);
>
>     Connection connection = factory.createConnection();
>     connection.start();
>
>     Session session = connection.createSession(false,
>         Session.AUTO_ACKNOWLEDGE);
>     TemporaryTopic replaySource = session.createTemporaryTopic();
>     MessageConsumer replayConsumer =
>         session.createConsumer(replaySource);
>
> This hangs on createConsumer:
>
> "ActiveMQ Transport: ssl:///10.1.210.140:59574" - Thread t...@53
>    java.lang.Thread.State: WAITING on
> java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@3fd48679
>     at sun.misc.Unsafe.park(Native Method)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>     at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
>     at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
>     at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
>     at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1874)
>     at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:254)
>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1116)
>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1060)
>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:973)
>     at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:946)
>
> Note that if I don't set watchTopicAdvisories to false on the connection
> factory, then
> this hangs during Connection.start() in a different area, but one also
> related to creating
> a consumer.
>
> Also, this hang seems to prevent any other new connections to activemq from
> completing.
>
> Anyway, I'm not attached to this methodology. I just want to create a temp
> topic and
> consume from it. I don't care how :)
>
> So what's the right way to do this? There is an easy way to send messages
> via the
> broker object, but nothing for consumers that I can find. Any help is much
> appreciated!
>
> -adam
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/How-to-safely-consume-a-topic-from-INSIDE-the-broker-tp3068674p3068674.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

--
http://blog.garytully.com
http://fusesource.com