You could create your connection and consumer when the recovery policy
is initialized rather than in the recover method; that may help.
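
As a rough illustration of why moving that work out of recover might
help, here is a toy model in plain Java. It is only an analogy:
ToyBroker, request, recoverWithNestedRequest and
recoverWithPreparedConsumer are made-up names, not ActiveMQ APIs. It
assumes the hang comes from a synchronous request issued on a thread
that must itself be free before the reply can be produced, which is a
guess from the stack trace and not something confirmed in this thread.
The model uses a timeout so that it terminates; the real call waits
indefinitely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: none of these names are ActiveMQ classes.
final class ToyBroker {
    // One worker stands in for the thread that must process broker commands.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    // Stands in for a consumer that was created up front, at initialization.
    private final BlockingQueue<String> replayQueue = new ArrayBlockingQueue<>(16);

    ToyBroker() {
        replayQueue.add("replayed-message");
    }

    // Synchronous request: submit a command and block until its reply arrives.
    String request(String command, long timeoutMs) throws Exception {
        Future<String> reply = dispatcher.submit(() -> "ok:" + command);
        return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // "recover" runs on the dispatcher thread and issues a nested synchronous
    // request. The reply can only be produced by the thread that is waiting
    // for it, so the wait ends in a timeout and this returns false.
    boolean recoverWithNestedRequest(long timeoutMs) throws Exception {
        Future<Boolean> outer = dispatcher.submit(() -> {
            try {
                request("createConsumer", timeoutMs);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        });
        return outer.get();
    }

    // "recover" only reads from a consumer prepared earlier, so it never
    // needs the dispatcher to process anything else while it is running.
    String recoverWithPreparedConsumer(long timeoutMs) throws Exception {
        Future<String> outer = dispatcher.submit(
                () -> replayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS));
        return outer.get();
    }

    void shutdown() {
        dispatcher.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        ToyBroker broker = new ToyBroker();
        System.out.println("nested request completed: "
                + broker.recoverWithNestedRequest(200));
        System.out.println("prepared consumer received: "
                + broker.recoverWithPreparedConsumer(200));
        broker.shutdown();
    }
}
```

In the model the nested request never completes while the outer call
is still running, whereas the prepared queue can be read straight away.
Whether the broker is ready to accept a vm:// connection at the point
where your policy is initialized is something you would need to check.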

On 2 December 2010 08:35, adam <adam.suss...@gmail.com> wrote:
>
> I am writing a custom SubscriptionRecoveryPolicy and one of the things I need
> to do while recovering is create a temporary topic and consume messages from
> it.
>
> What is the right way to do this?
>
> I don't see a straightforward API for it (such as the one for sending
> messages: BrokerFilter.send).
>
> I have tried going the standard JMS route with an ActiveMQConnectionFactory
> to the vm://localhost transport, but I keep running into all kinds of
> locking problems and deadlocks that not only hold up what I am trying to
> do, but stop ANY connections from succeeding from then on.  It seems that
> creating a connection and trying to create a consumer inside another
> attempt to create a consumer doesn't work.
>
> For the record, here is what I am trying.  This is inside
> SubscriptionRecoveryPolicy.recover:
>
>
>        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.broker.getVmConnectorURI());
>        factory.setUseAsyncSend(true);
>        factory.setWatchTopicAdvisories(false);
>
>        Connection connection = factory.createConnection();
>        connection.start();
>
>        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>        TemporaryTopic replaySource = session.createTemporaryTopic();
>        MessageConsumer replayConsumer = session.createConsumer(replaySource);
>
> This hangs on createConsumer:
>
> "ActiveMQ Transport: ssl:///10.1.210.140:59574" - Thread t...@53
>   java.lang.Thread.State: WAITING on java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobj...@3fd48679
>        at sun.misc.Unsafe.park(Native Method)
>        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>        at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
>        at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
>        at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
>        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1276)
>        at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1874)
>        at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:254)
>        at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1116)
>        at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:1060)
>        at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:973)
>        at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:946)
>
> Note that if I don't set watchTopicAdvisories to false on the connection
> factory, then this hangs during Connection.start() in a different area,
> but one also related to creating a consumer.
>
> Also, this hang seems to prevent any other new connections to activemq
> from completing.
>
> Anyway, I'm not attached to this methodology.  I just want to create a
> temp topic and consume from it.  I don't care how :)
>
> So what's the right way to do this?  There is an easy way to send messages
> via the broker object, but nothing for consumers that I can find.  Any
> help is much appreciated!
>
> -adam
> --
> View this message in context: 
> http://activemq.2283324.n4.nabble.com/How-to-safely-consume-a-topic-from-INSIDE-the-broker-tp3068674p3068674.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://blog.garytully.com
http://fusesource.com
