My retroactive consumer is receiving only the last 789 of the 5000 messages
sent.  Any suggestions?

Thanks.

 

 

import javax.jms.Connection;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

 

public class CommandLinePublisher {

 

    /** Host name of the ActiveMQ broker. */
    static final String HOST = "tupolev";
    /** Broker port, kept as text because it is only ever spliced into URLs. */
    static final String PORT = "61616";
    /** Broker URL used by the publishing connection. */
    static final String URL = "tcp://" + HOST + ":" + PORT;
    /** Payload of every published message: a single character. */
    static final String MSG = "1";
    /** Name of the topic that is published to. */
    static final String topic = "test";
    /** Number of messages published by one run. */
    static final int MSGS_TO_SEND = 5000;

    // Publisher-side JMS handles. All three are assigned by connect();
    // disconnect() closes the session and the connection.
    static MessageProducer producer;
    static Session session;
    static Connection connection;

 

    /**
     * Program entry point. All work is done by the {@link CommandLinePublisher}
     * constructor, so this method only creates an instance.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for callers; nothing is thrown explicitly here
     */
    public static void main(String[] args) throws Exception {
        new CommandLinePublisher();
    }

 

    /**
     * Publishes {@link #MSGS_TO_SEND} copies of one text message to the topic,
     * closes the publishing session and connection, and then creates a
     * {@link MonitorApp} to consume from the same topic.
     *
     * <p>Any {@link JMSException} raised while connecting or publishing is printed
     * and the monitor is not started.
     */
    public CommandLinePublisher() {
        try {
            connect();
            try {
                TextMessage message = session.createTextMessage(MSG);
                System.out.println("Sent message: " + message.hashCode() + " : "
                        + Thread.currentThread().getName());

                for (int x = 0; x < MSGS_TO_SEND; x++) {
                    System.out.println("sending msg " + x);
                    producer.send(message);
                }
            } finally {
                // Release the publishing session/connection even when creating or
                // sending a message fails; previously a failure skipped the cleanup.
                disconnect();
            }

            new MonitorApp();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

 

 

    class MonitorApp implements MessageListener {

 

        public MonitorApp() {

 

            // connect CLient to ActiveMQ server.

            ActiveMqClient c = new ActiveMqClient(this);

            Thread brokerThread = new Thread(c);

            brokerThread.setDaemon(true);

            brokerThread.start();

 

            System.out.println("Waiting for connection to Active MQ
server...");

            synchronized (this) {

                try {

                    wait();

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

            System.out.println("...Connected to Active MQ server.");

 

 

            try {

                c.subscribe("test", this);

            } catch (JMSException e) {

                e.printStackTrace();

            }

 

 

        }

 

        int counter = 0;

 

        public void onMessage(Message message) {

            System.out.println("received=" + counter++);

        }

    }

 

 

    class ActiveMqClient implements Runnable, ExceptionListener {

 

        Session session;

        Connection connection;

 

        Object o;

 

 

        public ActiveMqClient(Object o) {

            this.o = o;

 

        }

 

        public void run() {

 

            try {

                String url = 

                    "failover:(tcp://" + HOST + ":" + PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";

 

                ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);

                // Create a Connection

                connection = connectionFactory.createConnection();

                connection.start();

                connection.setExceptionListener(this);

                // Create a Session

 

                session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

 

                System.out.println("activeMQ client waiting for msgs");

                synchronized (o) {

                    o.notifyAll();

                }

 

 

            } catch (Exception e) {

                System.out.println("Caught: " + e);

                e.printStackTrace();

            }

 

        }

 

        public void close() throws JMSException {

            session.close();

            connection.close();

        }

 

        public void subscribe(String destName, MessageListener l) throws
JMSException {

            char c = destName.contains("?") ? '&' : '?';

            destName = destName + c + "consumer.retroactive=true";

            System.out.println("ActiveMqClient subscribe " + destName);

            MessageConsumer mc =
session.createConsumer(session.createTopic(destName));

            mc.setMessageListener(l);

        }

 

        int messageCounter;

 

        public synchronized void onException(JMSException ex) {

            System.out.println("JMS Exception occured.  Shutting down
client.");

        }

    }

 

 

    /**
     * Opens the publishing connection to {@link #URL} with asynchronous sends
     * enabled, creates an auto-acknowledge session and a persistent-delivery
     * producer for the topic, and stores all three in the static fields.
     *
     * <p>If any step after opening the connection fails, the connection is closed
     * again and the static fields are left untouched.
     *
     * @throws JMSException if the connection, session or producer cannot be created
     */
    private static void connect() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        connectionFactory.setUseAsyncSend(true);

        Connection newConnection = connectionFactory.createConnection();
        try {
            newConnection.start();

            Session newSession = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = newSession.createTopic(topic);

            MessageProducer newProducer = newSession.createProducer(destination);
            newProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Publish the handles only once everything has been set up.
            connection = newConnection;
            session = newSession;
            producer = newProducer;
        } catch (JMSException e) {
            // Do not leak the connection when a later step fails.
            try {
                newConnection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
            throw e;
        }
    }

 

    /**
     * Closes the publishing session and then the connection. Each close is
     * attempted independently; a failure is printed and does not prevent the
     * other. Handles that were never opened are skipped, so this is safe to call
     * before a successful {@code connect()}.
     */
    static void disconnect() {
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

 

 

}

Reply via email to