In the code below I'm sending 5000 of these:

        static final String MSG = "1"; // 1 character msg

so it shouldn't be a memory issue. The Java source file for the code below is here:

        http://66.17.204.68:8765/~andrew/CommandLinePublisher.java

Thanks,
Andrew


-----Original Message-----
From: Rob Davies [mailto:[EMAIL PROTECTED] 
Sent: Monday, March 31, 2008 2:58 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumers loose msgs...

How big are your messages? They might be reclaimed if you hit a
memory limit in the broker.
On 29 Mar 2008, at 07:22, Andrew M wrote:

> My Retroactive Consumer is only receiving the last 789 out of 5000  
> msgs
> sent.  Any suggestions?
>
> Thanks.
>
>
>
>
>
> import javax.jms.Connection;
>
> import javax.jms.DeliveryMode;
>
> import javax.jms.Destination;
>
> import javax.jms.ExceptionListener;
>
> import javax.jms.JMSException;
>
> import javax.jms.Message;
>
> import javax.jms.MessageConsumer;
>
> import javax.jms.MessageListener;
>
> import javax.jms.MessageProducer;
>
> import javax.jms.Session;
>
> import javax.jms.TextMessage;
>
>
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
>
>
>
>
> public class CommandLinePublisher {
>
>
>
>    static final String HOST = "tupolev";
>
>    static final String PORT = "61616";
>
>    static final String URL = "tcp://"+HOST+":"+PORT;
>
>    static final String MSG = "1"; // 1 character msg
>
>    static final String topic = "test";
>
>    static final int MSGS_TO_SEND = 5000;
>
>
>
>    static MessageProducer producer;
>
>    static Session session;
>
>    static Connection connection;
>
>
>
>    public static void main(String[] args) throws Exception {
>
>        new CommandLinePublisher();
>
>    }
>
>
>
>    public CommandLinePublisher() {
>
>
>
>        try {
>
>            connect();
>
>            TextMessage message;
>
>            message = session.createTextMessage(MSG);
>
>            System.out.println("Sent message: " + message.hashCode()  
> + " : "
> + Thread.currentThread().getName());
>
>
>
>            for (int x = 0; x < MSGS_TO_SEND; x++) {
>
>                System.out.println("sending msg " + x);
>
>                producer.send(message);
>
>            }
>
>
>
>            disconnect();
>
>
>
>            new MonitorApp();
>
>
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>    }
>
>
>
>
>
>    class MonitorApp implements MessageListener {
>
>
>
>        public MonitorApp() {
>
>
>
>            // connect CLient to ActiveMQ server.
>
>            ActiveMqClient c = new ActiveMqClient(this);
>
>            Thread brokerThread = new Thread(c);
>
>            brokerThread.setDaemon(true);
>
>            brokerThread.start();
>
>
>
>            System.out.println("Waiting for connection to Active MQ
> server...");
>
>            synchronized (this) {
>
>                try {
>
>                    wait();
>
>                } catch (InterruptedException e) {
>
>                    e.printStackTrace();
>
>                }
>
>            }
>
>            System.out.println("...Connected to Active MQ server.");
>
>
>
>
>
>            try {
>
>                c.subscribe("test", this);
>
>            } catch (JMSException e) {
>
>                e.printStackTrace();
>
>            }
>
>
>
>
>
>        }
>
>
>
>        int counter = 0;
>
>
>
>        public void onMessage(Message message) {
>
>            System.out.println("received=" + counter++);
>
>        }
>
>    }
>
>
>
>
>
>    class ActiveMqClient implements Runnable, ExceptionListener {
>
>
>
>        Session session;
>
>        Connection connection;
>
>
>
>        Object o;
>
>
>
>
>
>        public ActiveMqClient(Object o) {
>
>            this.o = o;
>
>
>
>        }
>
>
>
>        public void run() {
>
>
>
>            try {
>
>                String url =
>
>                    "failover:(tcp://" + HOST + ":" + PORT +
> "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>
>
>
>                ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(url);
>
>                // Create a Connection
>
>                connection = connectionFactory.createConnection();
>
>                connection.start();
>
>                connection.setExceptionListener(this);
>
>                // Create a Session
>
>
>
>                session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>                System.out.println("activeMQ client waiting for msgs");
>
>                synchronized (o) {
>
>                    o.notifyAll();
>
>                }
>
>
>
>
>
>            } catch (Exception e) {
>
>                System.out.println("Caught: " + e);
>
>                e.printStackTrace();
>
>            }
>
>
>
>        }
>
>
>
>        public void close() throws JMSException {
>
>            session.close();
>
>            connection.close();
>
>        }
>
>
>
>        public void subscribe(String destName, MessageListener l)  
> throws
> JMSException {
>
>            char c = destName.contains("?") ? '&' : '?';
>
>            destName = destName + c + "consumer.retroactive=true";
>
>            System.out.println("ActiveMqClient subscribe " + destName);
>
>            MessageConsumer mc =
> session.createConsumer(session.createTopic(destName));
>
>            mc.setMessageListener(l);
>
>        }
>
>
>
>        int messageCounter;
>
>
>
>        public synchronized void onException(JMSException ex) {
>
>            System.out.println("JMS Exception occured.  Shutting down
> client.");
>
>        }
>
>    }
>
>
>
>
>
>    private static void connect() throws JMSException {
>
>        ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(URL);
>
>        connectionFactory.setUseAsyncSend(true);
>
>
>
>        // Create a Connection
>
>        connection = connectionFactory.createConnection();
>
>        connection.start();
>
>
>
>        // Create a Session
>
>        session = connection.createSession(false,  
> Session.AUTO_ACKNOWLEDGE);
>
>
>
>        Destination destination = session.createTopic(topic);
>
>
>
>        // Create a MessageProducer from the Session to the Topic or  
> Queue
>
>        producer = session.createProducer(destination);
>
>        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>
>    }
>
>
>
>    static void disconnect() {
>
>        // Clean up
>
>        try {
>
>            session.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>
>
>        try {
>
>            connection.close();
>
>        } catch (JMSException e) {
>
>            e.printStackTrace();
>
>        }
>
>    }
>
>
>
>
>
> }
>


Reply via email to