----- Original Message ---- > From: Jose Luna <j-l...@rocketmail.com> > To: users@activemq.apache.org > Sent: Thursday, April 30, 2009 3:04:33 PM > Subject: Durable subscription with subscription recovery policy > > > Hello, > > I haven't received a response after a few days, so I'd like to try again. > I'll > try to be brief this time: > > Our consumers disconnect/reconnect frequently. We need durable subscriptions > to > work with the subscription recovery policy. Currently (Activemq 5.2), > durable > subscriptions with the "activemq.retroactive" header (STOMP) will receive all > retroactive messages every time the durable subscriber reconnects. > > A bug report can be found at > https://issues.apache.org/activemq/browse/AMQ-1549 > . The bug is open and assigned, with fixed > version marked as 5.3 (although no work has been done yet). > > However, in this thread > http://www.nabble.com/Durable-subscriptions-to19688854.html, Bruce and James > seems to say that subscription recovery policy is > only for non-durable subscriptions. > > So, are durable subscriptions meant to work with subscription recovery > policies? It used to work this way in 4.x, but I'm not sure if it was an > intentional change for 5.x. > > I'm looking for two kinds of advice: > 1.) How we might patch this ourselves. We're a small development team on a > tight deadline, but we might be able to take a crack at it. > > 2.) Any advice on how to otherwise circumvent this problem. The obvious > solution is to use the retroactive header only on the very first connection, > but > how can we know when the last retroactive message is received? > > I hope reposting is ok, I couldn't find any mailing list rules. Here's the > more > verbose version that I originally wrote: > http://www.nabble.com/Durable-subscription-with-retroactive-subscription-recovery-policy-td23258698ef2356.html > > Thanks for your time, > > JLuna
Just in case anyone has a similar problem finds this post, we ended up modifying ActiveMQ to send an advisory message after all past messages have been recovered for the subscription. (The Advisory message goes only to the consumer, not to the topic for all consumers to see.) This allows the consumer to know that it has received all past messages and shouldn't use the retroactive header in future. This is undoubtedly somewhat "hackish" way to do it, but it's the quickest thing we could come up with given a short deadline. The diff is attached. JLuna
diff -rupN apache-activemq-5.3.0.1-fuse/src/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java apache-activemq-5.3.0.1-fuse-modified/src/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java --- apache-activemq-5.3.0.1-fuse/src/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java 2009-04-22 17:35:56.000000000 -0400 +++ apache-activemq-5.3.0.1-fuse-modified/src/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java 2009-05-06 11:27:05.000000000 -0400 @@ -27,6 +27,20 @@ import org.apache.activemq.command.Activ import org.apache.activemq.command.Message; import org.apache.activemq.filter.DestinationFilter; +//Imports for USBMIS modification +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.JMSException; + +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.util.IdGenerator; + + /** * This implementation of {...@link SubscriptionRecoveryPolicy} will keep a fixed * count of last messages. @@ -39,6 +53,10 @@ public class FixedCountSubscriptionRecov private int maximumSize = 100; private int tail; + private AtomicLong messageSequence = new AtomicLong(0); + private IdGenerator idGenerator = new IdGenerator(); + private ProducerId producerId = createProducerId(); + public SubscriptionRecoveryPolicy copy() { FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy(); rc.setMaximumSize(maximumSize); @@ -54,6 +72,14 @@ public class FixedCountSubscriptionRecov } public synchronized void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception { + //Prepare advisory message that will be sent when recovery is complete. + ActiveMQMessage advisoryMessage= new ActiveMQMessage(); + advisoryMessage.setType("Advisory.SubscriptionRecoveryComplete"); + advisoryMessage.setRegionDestination((Destination)topic); + advisoryMessage.setResponseRequired(false); + advisoryMessage.setPersistent(false); + configureMessage(advisoryMessage); + // Re-dispatch the last message seen. int t = tail; // The buffer may not have rolled over yet..., start from the front @@ -62,6 +88,7 @@ public class FixedCountSubscriptionRecov } // Well the buffer is really empty then. if (messages[t] == null) { + sub.addRecoveredMessage(context, (MessageReference)advisoryMessage); return; } // Keep dispatching until t hit's tail again. @@ -73,8 +100,25 @@ public class FixedCountSubscriptionRecov t = 0; } } while (t != tail); + + sub.addRecoveredMessage(context, (MessageReference)advisoryMessage); + } + protected void configureMessage(ActiveMQMessage msg) throws JMSException { + long sequenceNumber = messageSequence.incrementAndGet(); + msg.setMessageId(new MessageId(producerId, sequenceNumber)); + msg.onSend(); + msg.setProducerId(producerId); + } + + protected ProducerId createProducerId() { + String id = idGenerator.generateId(); + ConnectionId connectionId = new ConnectionId(id); + SessionId sessionId = new SessionId(connectionId, 1); + return new ProducerId(sessionId, 1); + } + public void start() throws Exception { messages = new MessageReference[maximumSize]; }