How big are your messages? They might be reclaimed if you hit a memory limit in the broker.
On 29 Mar 2008, at 07:22, Andrew M wrote:
My Retroactive Consumer is only receiving the last 789 out of 5000 msgs sent. Any suggestions?
Thanks.
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class CommandLinePublisher {
// Host name of the broker; spliced into both the publisher and the consumer URLs.
static final String HOST = "tupolev";
// Broker port, kept as a string because it is only ever concatenated into URLs.
static final String PORT = "61616";
// Plain TCP transport URL used by connect() for the publishing connection.
static final String URL = "tcp://"+HOST+":"+PORT;
// Body of the text message that is sent repeatedly.
static final String MSG = "1"; // 1 character msg
// Name of the topic that connect() creates the producer for.
static final String topic = "test";
// How many times the constructor sends the message.
static final int MSGS_TO_SEND = 5000;
// Publishing-side JMS objects, assigned by connect(); disconnect() closes the
// session and the connection.
static MessageProducer producer;
static Session session;
static Connection connection;
/**
 * Program entry point. All work (publishing, then starting the monitor) is done by the
 * {@code CommandLinePublisher} constructor.
 *
 * @param args command-line arguments; not read
 * @throws Exception declared by the signature; the constructor itself catches
 *         {@code JMSException}
 */
public static void main(String[] args) throws Exception {
new CommandLinePublisher();
}
/**
 * Publishes {@code MSGS_TO_SEND} copies of one text message to the topic, closes the
 * publishing connection, and then starts a {@code MonitorApp}.
 *
 * <p>A {@code JMSException} raised while connecting or sending is printed, and the monitor is
 * not started in that case. Once {@code connect()} has succeeded, {@code disconnect()} is
 * always called, even when a send fails, so the session and connection are not leaked.
 */
public CommandLinePublisher() {
    try {
        connect();
        try {
            TextMessage message = session.createTextMessage(MSG);
            // Printed once, before the send loop starts; the same message object is re-sent.
            System.out.println("Sent message: " + message.hashCode()
                    + " : "
                    + Thread.currentThread().getName());
            for (int sent = 0; sent < MSGS_TO_SEND; sent++) {
                System.out.println("sending msg " + sent);
                producer.send(message);
            }
        } finally {
            // Runs on success and on failure; only reached after connect() returned normally.
            disconnect();
        }
        new MonitorApp();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
class MonitorApp implements MessageListener {
public MonitorApp() {
// connect CLient to ActiveMQ server.
ActiveMqClient c = new ActiveMqClient(this);
Thread brokerThread = new Thread(c);
brokerThread.setDaemon(true);
brokerThread.start();
System.out.println("Waiting for connection to Active MQ
server...");
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("...Connected to Active MQ server.");
try {
c.subscribe("test", this);
} catch (JMSException e) {
e.printStackTrace();
}
}
int counter = 0;
public void onMessage(Message message) {
System.out.println("received=" + counter++);
}
}
class ActiveMqClient implements Runnable, ExceptionListener {
Session session;
Connection connection;
Object o;
public ActiveMqClient(Object o) {
this.o = o;
}
public void run() {
try {
String url =
"failover:(tcp://" + HOST + ":" + PORT +
"?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
System.out.println("activeMQ client waiting for msgs");
synchronized (o) {
o.notifyAll();
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void close() throws JMSException {
session.close();
connection.close();
}
public void subscribe(String destName, MessageListener l)
throws
JMSException {
char c = destName.contains("?") ? '&' : '?';
destName = destName + c + "consumer.retroactive=true";
System.out.println("ActiveMqClient subscribe " + destName);
MessageConsumer mc =
session.createConsumer(session.createTopic(destName));
mc.setMessageListener(l);
}
int messageCounter;
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down
client.");
}
}
/**
 * Opens the publishing connection to {@code URL} with asynchronous sends enabled, creates an
 * auto-acknowledge session and a persistent-delivery producer for the topic, and stores all
 * three in the static fields.
 *
 * <p>The static fields are only assigned once every step has succeeded. If a step after the
 * connection was opened fails, the connection is closed again before the exception leaves
 * this method.
 *
 * @throws JMSException if the connection, session, topic or producer cannot be created
 */
private static void connect() throws JMSException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    connectionFactory.setUseAsyncSend(true);
    // Create a Connection
    Connection opened = connectionFactory.createConnection();
    boolean ready = false;
    try {
        opened.start();
        // Create a Session
        Session created = opened.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = created.createTopic(topic);
        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer createdProducer = created.createProducer(destination);
        createdProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection = opened;
        session = created;
        producer = createdProducer;
        ready = true;
    } finally {
        if (!ready) {
            // Setup failed part-way: release the connection; the original exception propagates.
            try {
                opened.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}
/**
 * Closes the publishing session and then the publishing connection. A failure in either close
 * is printed and does not prevent the other. Whatever was never opened is skipped, so this is
 * safe to call even if {@code connect()} did not succeed, and safe to call more than once.
 */
static void disconnect() {
    // Take local copies and clear the statics first so no closed object stays reachable.
    Session openSession = session;
    Connection openConnection = connection;
    producer = null;
    session = null;
    connection = null;
    if (openSession != null) {
        try {
            openSession.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    if (openConnection != null) {
        try {
            openConnection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
}