I am writing a custom SubscriptionRecoveryPolicy and one of the things I need
to do while recovering is create a temporary topic and consume messages from
it.

What is the right way to do this?

I don't see a straightforward API for it (such as the one for sending
messages: BrokerFilter.send).

I have tried going the standard JMS route with a ActiveMQConnectionFactory
to the vm://localhost
transport, but I keep running into all kinds of locking problems and
deadlocks that not only hold
up what I am trying to do, but stop ANY connections from succeeding from
then on.  It seems that
creating a connection and trying to create a consumer inside another attempt
to create a consumer
doesn't work.

For the record, here is what I am trying.  This is inside
SubscriptionRecoveryPolicy.recover:


        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
        factory.setUseAsyncSend(true);
        factory.setWatchTopicAdvisories(false);

        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        TemporaryTopic replaySource = session.createTemporaryTopic();
        MessageConsumer replayConsumer =
session.createConsumer(replaySource);

This hangs on createConsumer:

"ActiveMQ Transport: ssl:///10.1.210.140:59574" - Thread t...@53
   java.lang.Thread.State: WAITING on
java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@3fd48679
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
        at
org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
        at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
        at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
        at
org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1874)
        at
org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:254)
        at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1116)
        at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1060)
        at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:973)
        at
org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:946)

Note that if I don't set watchTopicAdvisories to false on the connection
factory, then
this hangs during Connection.start() in a different area, but one also
related to creating
a consumer.

Also, this hang seems to prevent any other new connections to activemq from
completing.

Anyway, I'm not attached to this methodology.  I just want to create a temp
topic and
consume from it.  I don't care how :)

So what's the right way to do this?  There is an easy way to send messages
via the
broker object, but nothing for consumers that I can find.  Any help is much
appreciated!

-adam
-- 
View this message in context: 
http://activemq.2283324.n4.nabble.com/How-to-safely-consume-a-topic-from-INSIDE-the-broker-tp3068674p3068674.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to