My Retroactive Consumer is only receiving the last 789 out of 5000 msgs sent. Any suggestions?
Thanks. import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class CommandLinePublisher { static final String HOST = "tupolev"; static final String PORT = "61616"; static final String URL = "tcp://"+HOST+":"+PORT; static final String MSG = "1"; // 1 character msg static final String topic = "test"; static final int MSGS_TO_SEND = 5000; static MessageProducer producer; static Session session; static Connection connection; public static void main(String[] args) throws Exception { new CommandLinePublisher(); } public CommandLinePublisher() { try { connect(); TextMessage message; message = session.createTextMessage(MSG); System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); for (int x = 0; x < MSGS_TO_SEND; x++) { System.out.println("sending msg " + x); producer.send(message); } disconnect(); new MonitorApp(); } catch (JMSException e) { e.printStackTrace(); } } class MonitorApp implements MessageListener { public MonitorApp() { // connect CLient to ActiveMQ server. ActiveMqClient c = new ActiveMqClient(this); Thread brokerThread = new Thread(c); brokerThread.setDaemon(true); brokerThread.start(); System.out.println("Waiting for connection to Active MQ server..."); synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("...Connected to Active MQ server."); try { c.subscribe("test", this); } catch (JMSException e) { e.printStackTrace(); } } int counter = 0; public void onMessage(Message message) { System.out.println("received=" + counter++); } } class ActiveMqClient implements Runnable, ExceptionListener { Session session; Connection connection; Object o; public ActiveMqClient(Object o) { this.o = o; } public void run() { try { String url = "failover:(tcp://" + HOST + ":" + PORT + "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // Create a Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); // Create a Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println("activeMQ client waiting for msgs"); synchronized (o) { o.notifyAll(); } } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } public void close() throws JMSException { session.close(); connection.close(); } public void subscribe(String destName, MessageListener l) throws JMSException { char c = destName.contains("?") ? '&' : '?'; destName = destName + c + "consumer.retroactive=true"; System.out.println("ActiveMqClient subscribe " + destName); MessageConsumer mc = session.createConsumer(session.createTopic(destName)); mc.setMessageListener(l); } int messageCounter; public synchronized void onException(JMSException ex) { System.out.println("JMS Exception occured. Shutting down client."); } } private static void connect() throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); connectionFactory.setUseAsyncSend(true); // Create a Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(topic); // Create a MessageProducer from the Session to the Topic or Queue producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); } static void disconnect() { // Clean up try { session.close(); } catch (JMSException e) { e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }