It seems that I'm getting the last messages, but I'm not getting all of them for the day. I probably need to set some Recovery Policy options:

http://activemq.apache.org/subscription-recovery-policy.html

Can I do that in my Java code instead of the XML? What would the URL look like? I want to configure everything through Java. Are the available Recovery Policies mutually exclusive, or can multiple policies be applied simultaneously?

Also, when using a retroactive consumer it would make sense to register the listener before I call session.createConsumer but until the consumer is instantiated, how can I set the listener? A Catch 22?

This is my consumer's method... any suggestions to improve it?

    public void subscribe(String destName, MessageListener l) throws JMSException {
        char c = destName.contains("?") ? '&' : '?';
        destName = destName + c + "consumer.retroactive=true";
        System.out.println("ActiveMqClient subscribe " + destName);
        MessageConsumer mc = session.createConsumer(session.createTopic(destName));
        mc.setMessageListener(l);
    }

Thanks..
Andrew

-----Original Message-----
From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron Mulder
Sent: Tuesday, March 18, 2008 12:54 PM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Are you sure destName doesn't already have some "?options" in it?

Are you sure no other consumer gets the messages off the queue before your retroactive consumer?

Are you sure you shouldn't set the message listener on the message consumer before registering it as a consumer?

Thanks,
Aaron

On Tue, Mar 18, 2008 at 1:19 PM, Andrew M <[EMAIL PROTECTED]> wrote:
> Aaron,
> My original producer and consumer code are at the bottom of the msg. Note
> the subscribe method appends retroactive=true.
> Thanks for any suggestions you may have..
> Andrew
>
> -----Original Message-----
> From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron Mulder
> Sent: Tuesday, March 18, 2008 10:07 AM
> To: users@activemq.apache.org
> Subject: Re: Retroactive consumer...yes, no, maybe so?
>
> Do you want to post your example that's *not* working? I last used
> retroactive consumers probably 18 months ago, and they worked fine at
> that time. I was doing a network of brokers with fail-over, and if I
> took one broker down and caused a consumer to fail over, it missed
> messages during the fail-over operation.
> With retroactive consumer enabled, it didn't miss any messages (but got
> some duplicates) once it failed over. I don't have that
> code/configuration at hand, though -- just this from my notes:
>
>     topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
> And I used this to set the retroactive consumers to receive the last
> 30s worth of messages, instead of the default (which I think at the
> time was last 100):
>
>     <broker>
>       <destinationPolicy>
>         <policyMap>
>           <defaultEntry>
>             <policyEntry topic="*">
>               <subscriptionRecoveryPolicy>
>                 <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>               </subscriptionRecoveryPolicy>
>             </policyEntry>
>           </defaultEntry>
>         </policyMap>
>       </destinationPolicy>
>     </broker>
>
> http://www.activemq.org/site/retroactive-consumer.html
> http://www.activemq.org/site/subscription-recovery-policy.html
>
> Thanks,
> Aaron
>
> On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <[EMAIL PROTECTED]> wrote:
> >
> > -----Original Message-----
> > From: Andrew [mailto:[EMAIL PROTECTED]
> > Sent: Wednesday, March 05, 2008 2:40 PM
> > To: users@activemq.apache.org
> > Subject: Retroactive consumer not working...
> >
> > My broker is not feeding my consumer the messages from the retroactive queue
> > when the consumer connects. The producer puts in a 10 min (600000ms) TTL so
> > I would think when my consumer reconnects it should receive the last 10 mins
> > of msgs. Otherwise things appear fine, new msgs are received, etc... any
> > ideas?
> >
> > On the producer.......
> >
> >     private Session getActiveMqSession() throws JMSException {
> >         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" + ACTIVE_MQ_PORT +
> >             "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
> >         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
> >         connection = connectionFactory.createConnection();
> >         ((ActiveMQConnection)connection).setUseAsyncSend(false);
> >         connection.start();
> >         return connection.createSession(false, Session.SESSION_TRANSACTED);
> >     }
> >
> >     Session session = getActiveMqSession();
> >
> >     void send(Object a) throws blah blah blah {
> >         Destination destination = session.createQueue(consumerName);
> >         producer = session.createProducer(destination);
> >         ObjectMessage m = session.createObjectMessage();
> >         m.setObject(a);
> >         //10 min TTL
> >         ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
> >             Message.DEFAULT_PRIORITY, 600000L);
> >     }
> >
> > ...and on the Consumer...
> >
> >     Session session;
> >
> >     public void run() {
> >         String url =
> >             "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
> >         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
> >         Connection connection = connectionFactory.createConnection();
> >         connection.start();
> >         connection.setExceptionListener(this);
> >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >     }
> >
> >     public void subscribe(String destName, MessageListener l) throws JMSException {
> >         destName = destName + "?consumer.retroactive=true";
> >         MessageConsumer mc = session.createConsumer(session.createQueue(destName));
> >         mc.setMessageListener(l);
> >     }
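
On Aaron's question about whether destName already carries some "?options": the original subscribe method always appends "?consumer.retroactive=true", which would produce a second "?" if the name already had options. Below is a rough sketch of how the appending could be made safer in plain Java, with no ActiveMQ classes involved. The class name DestinationOptions and the method withOption are made up for illustration and are not part of any ActiveMQ API; consumer.prefetchSize is used only as an example of an option that might already be present.

```java
// Hypothetical helper, not part of ActiveMQ: builds the destination string only.
class DestinationOptions {

    // Returns destName with name=value appended as a destination option.
    // Uses '?' for the first option and '&' for later ones, and leaves the
    // string untouched if an option with the same name is already there.
    static String withOption(String destName, String name, String value) {
        String option = name + "=" + value;
        int q = destName.indexOf('?');
        if (q < 0) {
            return destName + "?" + option;      // no options yet
        }
        String query = destName.substring(q + 1);
        for (String pair : query.split("&")) {
            if (pair.startsWith(name + "=")) {
                return destName;                 // keep the existing setting
            }
        }
        // Avoid "?&" or "&&" when the name ends in a dangling separator.
        boolean needsSeparator = !query.isEmpty() && !query.endsWith("&");
        return destName + (needsSeparator ? "&" : "") + option;
    }

    public static void main(String[] args) {
        // prints com.example.MyTopic?consumer.retroactive=true
        System.out.println(withOption("com.example.MyTopic",
                "consumer.retroactive", "true"));
        // prints com.example.MyTopic?consumer.prefetchSize=10&consumer.retroactive=true
        System.out.println(withOption("com.example.MyTopic?consumer.prefetchSize=10",
                "consumer.retroactive", "true"));
    }
}
```

In subscribe, the result would then be passed to session.createTopic(...) in place of the hand-built string. This only tidies up the destination name; it does not by itself change which messages the broker replays.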