I am writing a custom SubscriptionRecoveryPolicy and one of the things I need to do while recovering is create a temporary topic and consume messages from it.
What is the right way to do this? I don't see a straightforward API for it (such as the one for sending messages: BrokerFilter.send). I have tried going the standard JMS route with a ActiveMQConnectionFactory to the vm://localhost transport, but I keep running into all kinds of locking problems and deadlocks that not only hold up what I am trying to do, but stop ANY connections from succeeding from then on. It seems that creating a connection and trying to create a consumer inside another attempt to create a consumer doesn't work. For the record, here is what I am trying. This is inside SubscriptionRecoveryPolicy.recover: ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI()); factory.setUseAsyncSend(true); factory.setWatchTopicAdvisories(false); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); TemporaryTopic replaySource = session.createTemporaryTopic(); MessageConsumer replayConsumer = session.createConsumer(replaySource); This hangs on createConsumer: "ActiveMQ Transport: ssl:///10.1.210.140:59574" - Thread t...@53 java.lang.Thread.State: WAITING on java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@3fd48679 at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317) at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40) at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87) at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276) at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1874) at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:254) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1116) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1060) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:973) at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:946) Note that if I don't set watchTopicAdvisories to false on the connection factory, then this hangs during Connection.start() in a different area, but one also related to creating a consumer. Also, this hang seems to prevent any other new connections to activemq from completing. Anyway, I'm not attached to this methodology. I just want to create a temp topic and consume from it. I don't care how :) So what's the right way to do this? There is an easy way to send messages via the broker object, but nothing for consumers that I can find. Any help is much appreciated! -adam -- View this message in context: http://activemq.2283324.n4.nabble.com/How-to-safely-consume-a-topic-from-INSIDE-the-broker-tp3068674p3068674.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.