How big are your messages? They might be reclaimed if you hit a memory limit in the broker.
On 29 Mar 2008, at 07:22, Andrew M wrote:

My Retroactive Consumer is only receiving the last 789 out of 5000 msgs
sent.  Any suggestions?

Thanks.





import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;





public class CommandLinePublisher {



   static final String HOST = "tupolev";

   static final String PORT = "61616";

   static final String URL = "tcp://"+HOST+":"+PORT;

   static final String MSG = "1"; // 1 character msg

   static final String topic = "test";

   static final int MSGS_TO_SEND = 5000;



   static MessageProducer producer;

   static Session session;

   static Connection connection;



   public static void main(String[] args) throws Exception {

       new CommandLinePublisher();

   }



   public CommandLinePublisher() {



       try {

           connect();

           TextMessage message;

           message = session.createTextMessage(MSG);

System.out.println("Sent message: " + message.hashCode() + " : "
+ Thread.currentThread().getName());



           for (int x = 0; x < MSGS_TO_SEND; x++) {

               System.out.println("sending msg " + x);

               producer.send(message);

           }



           disconnect();



           new MonitorApp();



       } catch (JMSException e) {

           e.printStackTrace();

       }



   }





   class MonitorApp implements MessageListener {



       public MonitorApp() {



           // connect CLient to ActiveMQ server.

           ActiveMqClient c = new ActiveMqClient(this);

           Thread brokerThread = new Thread(c);

           brokerThread.setDaemon(true);

           brokerThread.start();



           System.out.println("Waiting for connection to Active MQ
server...");

           synchronized (this) {

               try {

                   wait();

               } catch (InterruptedException e) {

                   e.printStackTrace();

               }

           }

           System.out.println("...Connected to Active MQ server.");





           try {

               c.subscribe("test", this);

           } catch (JMSException e) {

               e.printStackTrace();

           }





       }



       int counter = 0;



       public void onMessage(Message message) {

           System.out.println("received=" + counter++);

       }

   }





   class ActiveMqClient implements Runnable, ExceptionListener {



       Session session;

       Connection connection;



       Object o;





       public ActiveMqClient(Object o) {

           this.o = o;



       }



       public void run() {



           try {

               String url =

                   "failover:(tcp://" + HOST + ":" + PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";



               ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);

               // Create a Connection

               connection = connectionFactory.createConnection();

               connection.start();

               connection.setExceptionListener(this);

               // Create a Session



               session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);



               System.out.println("activeMQ client waiting for msgs");

               synchronized (o) {

                   o.notifyAll();

               }





           } catch (Exception e) {

               System.out.println("Caught: " + e);

               e.printStackTrace();

           }



       }



       public void close() throws JMSException {

           session.close();

           connection.close();

       }



public void subscribe(String destName, MessageListener l) throws
JMSException {

           char c = destName.contains("?") ? '&' : '?';

           destName = destName + c + "consumer.retroactive=true";

           System.out.println("ActiveMqClient subscribe " + destName);

           MessageConsumer mc =
session.createConsumer(session.createTopic(destName));

           mc.setMessageListener(l);

       }



       int messageCounter;



       public synchronized void onException(JMSException ex) {

           System.out.println("JMS Exception occured.  Shutting down
client.");

       }

   }





   private static void connect() throws JMSException {

       ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(URL);

       connectionFactory.setUseAsyncSend(true);



       // Create a Connection

       connection = connectionFactory.createConnection();

       connection.start();



       // Create a Session

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);



       Destination destination = session.createTopic(topic);



// Create a MessageProducer from the Session to the Topic or Queue

       producer = session.createProducer(destination);

       producer.setDeliveryMode(DeliveryMode.PERSISTENT);

   }



   static void disconnect() {

       // Clean up

       try {

           session.close();

       } catch (JMSException e) {

           e.printStackTrace();

       }



       try {

           connection.close();

       } catch (JMSException e) {

           e.printStackTrace();

       }

   }





}


Reply via email to