It seems that I'm getting the last messages but I'm not getting all of them
for the day.  I probably need to set some Recovery Policy options...
  http://activemq.apache.org/subscription-recovery-policy.html 
Can I do that in my Java code instead of the XML?  What would the URL look
like?  I want to configure everything through Java.  Are the available
Recovery Policies mutually exclusive, or can multiple policies be applied
simultaneously?
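
For reference, a minimal sketch of what the programmatic equivalent of the XML could look like. It assumes the broker is embedded in the same JVM and built through BrokerService; the class name EmbeddedBrokerSketch, the connector address, and the 10 minute window are illustrative choices, not taken from this thread. As far as I know the recovery policy is a broker-side setting, so for a standalone broker it would still live in the broker's own configuration rather than in the client connection URL. Note that PolicyEntry exposes a single setSubscriptionRecoveryPolicy setter, so one policy applies per policy entry.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy;

// Hypothetical class name; sketch of an embedded broker configured in code.
public class EmbeddedBrokerSketch {
    public static void main(String[] args) throws Exception {
        // Keep the last 10 minutes of topic messages for retroactive consumers
        // (assumed window, chosen to match the 10 min TTL mentioned in the thread).
        TimedSubscriptionRecoveryPolicy recovery = new TimedSubscriptionRecoveryPolicy();
        recovery.setRecoverDuration(10 * 60 * 1000L);

        // One recovery policy per policy entry.
        PolicyEntry entry = new PolicyEntry();
        entry.setSubscriptionRecoveryPolicy(recovery);

        // Use the entry as the default for all destinations.
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(entry);

        BrokerService broker = new BrokerService();
        broker.setPersistent(false);                    // assumption: no message store needed
        broker.setDestinationPolicy(policyMap);
        broker.addConnector("tcp://localhost:61616");   // assumed address
        broker.start();
        broker.waitUntilStopped();
    }
}
```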

Also, when using a retroactive consumer it would make sense to register the
listener before I call session.createConsumer, but until the consumer is
instantiated, how can I set the listener?  A Catch-22?  This is my
consumer's method...  any suggestions to improve it?

    public void subscribe(String destName, MessageListener l) throws JMSException {
        // Use '&' if the name already carries options, '?' otherwise.
        char c = destName.contains("?") ? '&' : '?';
        destName = destName + c + "consumer.retroactive=true";
        System.out.println("ActiveMqClient subscribe " + destName);
        MessageConsumer mc = session.createConsumer(session.createTopic(destName));
        mc.setMessageListener(l);
    }
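
The separator logic in the method above could be pulled into a small helper that also avoids appending an option twice. This is only a sketch using plain string handling; the class DestinationOptions and the method withOption are hypothetical names, not part of ActiveMQ.

```java
// Hypothetical helper, not an ActiveMQ class: builds destination names with options.
final class DestinationOptions {
    private DestinationOptions() {}

    /**
     * Appends key=value to a destination name, using '?' for the first
     * option and '&' for later ones. Returns the name unchanged if an
     * option with the same key is already present.
     */
    static String withOption(String destName, String key, String value) {
        String option = key + "=" + value;
        int queryStart = destName.indexOf('?');
        if (queryStart < 0) {
            return destName + "?" + option;
        }
        String query = destName.substring(queryStart + 1);
        if (query.isEmpty()) {
            // Name ends with a bare '?': no separator needed.
            return destName + option;
        }
        for (String existing : query.split("&")) {
            if (existing.startsWith(key + "=")) {
                return destName;
            }
        }
        return destName + "&" + option;
    }
}
```

With this in place, the subscribe method would call DestinationOptions.withOption(destName, "consumer.retroactive", "true") before creating the topic.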

Thanks..
Andrew


-----Original Message-----
From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron Mulder
Sent: Tuesday, March 18, 2008 12:54 PM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Are you sure destName doesn't already have some "?options" in it?  Are
you sure no other consumer gets the messages off the queue before your
retroactive consumer?  Are you sure you shouldn't set the message
listener on the message consumer before registering it as a consumer?

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 1:19 PM, Andrew M <[EMAIL PROTECTED]> wrote:
> Aaron,
>  My original producer and consumer code are at the bottom of the msg.  Note
>  the subscribe method appends retroactive=true.
>  Thanks for any suggestions you may have..
>  Andrew
>
>
>
>  -----Original Message-----
>  From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron Mulder
>  Sent: Tuesday, March 18, 2008 10:07 AM
>  To: users@activemq.apache.org
>  Subject: Re: Retroactive consumer...yes, no, maybe so?
>
>  Do you want to post your example that's *not* working?  I last used
>  retroactive consumers probably 18 months ago, and they worked fine at
>  that time.  I was doing a network of brokers with fail-over, and if I
>  took one broker down and caused a consumer to fail over, it missed
>  messages during the fail-over operation.  With retroactive consumer
>  enabled, it didn't miss any messages (but got some duplicates) once it
>  failed over.  I don't have that code/configuration at hand, though --
>  just this from my notes:
>
>  topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
>  And I used this to set the retroactive consumers to receive the last
>  30s worth of messages, instead of the default (which I think at the
>  time was last 100):
>
>  <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
>  </broker>
>
>  http://www.activemq.org/site/retroactive-consumer.html
>  http://www.activemq.org/site/subscription-recovery-policy.html
>
>  Thanks,
>        Aaron
>
>  On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <[EMAIL PROTECTED]> wrote:
>  >
>  >  -----Original Message-----
>  >  From: Andrew [mailto:[EMAIL PROTECTED]
>  >  Sent: Wednesday, March 05, 2008 2:40 PM
>  >  To: users@activemq.apache.org
>  >  Subject: Retroactive consumer not working...
>  >
>  >  My broker is not feeding my consumer the messages from the retroactive
>  >  queue when the consumer connects.  The producer puts in a 10 min
>  >  (600000ms) TTL so I would think when my consumer reconnects it should
>  >  receive the last 10 mins of msgs.  Otherwise things appear fine, new
>  >  msgs are received, etc... any ideas?
>  >
>  >
>  >  On the producer.......
>  >
>  >     private Session getActiveMqSession() throws JMSException {
>  >         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" + ACTIVE_MQ_PORT
>  >             + "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>  >         connection = connectionFactory.createConnection();
>  >         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>  >         connection.start();
>  >         return connection.createSession(false, Session.SESSION_TRANSACTED);
>  >     }
>  >
>  >     Session session = getActiveMqSession();
>  >
>  >     void send(Object a) throws blah blah blah {
>  >         Destination destination = session.createQueue(consumerName);
>  >         producer = session.createProducer(destination);
>  >         ObjectMessage m = session.createObjectMessage();
>  >         m.setObject(a);
>  >         //10 min TTL
>  >         ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
>  >             Message.DEFAULT_PRIORITY, 600000L);
>  >     }
>  >
>  >
>  >  ...and on the Consumer...
>  >
>  >     Session session;
>  >
>  >     public void run() {
>  >         String url = "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>  >         Connection connection = connectionFactory.createConnection();
>  >         connection.start();
>  >         connection.setExceptionListener(this);
>  >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>  >     }
>  >
>  >     public void subscribe(String destName, MessageListener l) throws JMSException {
>  >         destName = destName + "?consumer.retroactive=true";
>  >         MessageConsumer mc = session.createConsumer(session.createQueue(destName));
>  >         mc.setMessageListener(l);
>  >     }
>  >
>  >
>
>
>
>
