In the code below I'm sending 5000 copies of this one-character message: static final String MSG = "1"; // 1 character msg
so it shouldn't be a memory issue. The java file of the code below is here: http://66.17.204.68:8765/~andrew/CommandLinePublisher.java Thanks, Andrew -----Original Message----- From: Rob Davies [mailto:[EMAIL PROTECTED] Sent: Monday, March 31, 2008 2:58 AM To: users@activemq.apache.org Subject: Re: Retroactive consumers loose msgs... How big are your messages ? - they might be reclaimed if you hit a memory limit in the broker On 29 Mar 2008, at 07:22, Andrew M wrote: > My Retroactive Consumer is only receiving the last 789 out of 5000 > msgs > sent. Any suggestions? > > Thanks. > > > > > > import javax.jms.Connection; > > import javax.jms.DeliveryMode; > > import javax.jms.Destination; > > import javax.jms.ExceptionListener; > > import javax.jms.JMSException; > > import javax.jms.Message; > > import javax.jms.MessageConsumer; > > import javax.jms.MessageListener; > > import javax.jms.MessageProducer; > > import javax.jms.Session; > > import javax.jms.TextMessage; > > > > import org.apache.activemq.ActiveMQConnectionFactory; > > > > > > public class CommandLinePublisher { > > > > static final String HOST = "tupolev"; > > static final String PORT = "61616"; > > static final String URL = "tcp://"+HOST+":"+PORT; > > static final String MSG = "1"; // 1 character msg > > static final String topic = "test"; > > static final int MSGS_TO_SEND = 5000; > > > > static MessageProducer producer; > > static Session session; > > static Connection connection; > > > > public static void main(String[] args) throws Exception { > > new CommandLinePublisher(); > > } > > > > public CommandLinePublisher() { > > > > try { > > connect(); > > TextMessage message; > > message = session.createTextMessage(MSG); > > System.out.println("Sent message: " + message.hashCode() > + " : " > + Thread.currentThread().getName()); > > > > for (int x = 0; x < MSGS_TO_SEND; x++) { > > System.out.println("sending msg " + x); > > producer.send(message); > > } > > > > disconnect(); > > > > new MonitorApp(); > > 
> > } catch (JMSException e) { > > e.printStackTrace(); > > } > > > > } > > > > > > class MonitorApp implements MessageListener { > > > > public MonitorApp() { > > > > // connect CLient to ActiveMQ server. > > ActiveMqClient c = new ActiveMqClient(this); > > Thread brokerThread = new Thread(c); > > brokerThread.setDaemon(true); > > brokerThread.start(); > > > > System.out.println("Waiting for connection to Active MQ > server..."); > > synchronized (this) { > > try { > > wait(); > > } catch (InterruptedException e) { > > e.printStackTrace(); > > } > > } > > System.out.println("...Connected to Active MQ server."); > > > > > > try { > > c.subscribe("test", this); > > } catch (JMSException e) { > > e.printStackTrace(); > > } > > > > > > } > > > > int counter = 0; > > > > public void onMessage(Message message) { > > System.out.println("received=" + counter++); > > } > > } > > > > > > class ActiveMqClient implements Runnable, ExceptionListener { > > > > Session session; > > Connection connection; > > > > Object o; > > > > > > public ActiveMqClient(Object o) { > > this.o = o; > > > > } > > > > public void run() { > > > > try { > > String url = > > "failover:(tcp://" + HOST + ":" + PORT + > "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0"; > > > > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(url); > > // Create a Connection > > connection = connectionFactory.createConnection(); > > connection.start(); > > connection.setExceptionListener(this); > > // Create a Session > > > > session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > > > > System.out.println("activeMQ client waiting for msgs"); > > synchronized (o) { > > o.notifyAll(); > > } > > > > > > } catch (Exception e) { > > System.out.println("Caught: " + e); > > e.printStackTrace(); > > } > > > > } > > > > public void close() throws JMSException { > > session.close(); > > connection.close(); > > } > > > > public void subscribe(String destName, 
MessageListener l) > throws > JMSException { > > char c = destName.contains("?") ? '&' : '?'; > > destName = destName + c + "consumer.retroactive=true"; > > System.out.println("ActiveMqClient subscribe " + destName); > > MessageConsumer mc = > session.createConsumer(session.createTopic(destName)); > > mc.setMessageListener(l); > > } > > > > int messageCounter; > > > > public synchronized void onException(JMSException ex) { > > System.out.println("JMS Exception occured. Shutting down > client."); > > } > > } > > > > > > private static void connect() throws JMSException { > > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(URL); > > connectionFactory.setUseAsyncSend(true); > > > > // Create a Connection > > connection = connectionFactory.createConnection(); > > connection.start(); > > > > // Create a Session > > session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > > > > Destination destination = session.createTopic(topic); > > > > // Create a MessageProducer from the Session to the Topic or > Queue > > producer = session.createProducer(destination); > > producer.setDeliveryMode(DeliveryMode.PERSISTENT); > > } > > > > static void disconnect() { > > // Clean up > > try { > > session.close(); > > } catch (JMSException e) { > > e.printStackTrace(); > > } > > > > try { > > connection.close(); > > } catch (JMSException e) { > > e.printStackTrace(); > > } > > } > > > > > > } >