I just ran the test a couple dozen times (against 5.2) and it always passes (green), printing the following messages:
receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0 receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0 sender: sent 100-A sender: sent 100-B receiver1: got 100-A sender: got back: 100-A receiver1: got 100-B sender: got back: 100-B Can somebody else give it a try? Cheers -- Dejan Bosanac Open Source Integration - http://fusesource.com/ ActiveMQ in Action - http://www.manning.com/snyder/ Blog - http://www.nighttale.net On Sat, Feb 21, 2009 at 9:03 PM, hackingbear <hackingb...@gmail.com> wrote: > > The latest code in SVN still seems to receive from two receivers. > Maybe > I am missing something? > > Anyway, I have written a test case which depends on nothing but AMQ. I was > using 5.1 and this bug shows up in 5.1; I tried running it in 5.2 > several times, and it always works. [What prevents me from upgrading to 5.2 > is a strange performance hit I have not solved yet.] > > (Note: in 5.1, this can be reproduced quite reliably but not 100%; so you > may need to run it a few times to reproduce it; also make sure to purge the queue > using JMX.) 
> > The results: > > // AMQ 5.2 > receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0 > receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0 > sender: sent 100-A > sender: sent 100-B > receiver1: got 100-A > sender: got back: 100-A > receiver1: got 100-B > sender: got back: 100-B > > // AMQ 5.1 > receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0 > receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0 > sender: sent 100-A > sender: sent 100-B > receiver1: got 100-A > sender: got back: 100-A > sender: got back: NOTHING > > > The codes: > > import java.util.Properties; > > import javax.jms.Connection; > import javax.jms.ConnectionFactory; > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.MessageConsumer; > import javax.jms.MessageProducer; > import javax.jms.ObjectMessage; > import javax.jms.Queue; > import javax.jms.Session; > import javax.naming.Context; > import javax.naming.InitialContext; > > import junit.framework.TestCase; > > import org.apache.activemq.command.ActiveMQQueue; > > public class TestPrefetching extends TestCase { > private class Receiver { > private final String m_receiverID; > private Session m_session; > private MessageConsumer m_receiver; > private Destination m_queue; > > public Receiver(String receiverID) { > try { > m_receiverID = receiverID; > m_session = s_connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > m_queue = new ActiveMQQueue(qp); > print(m_receiverID, "Start receiver to " + qp); > m_receiver = m_session.createConsumer(m_queue); > } catch (JMSException e) { > throw new RuntimeException(e); > } > } > > public synchronized String getMsg(long waitFor) { > try { > ObjectMessage objMsg; > objMsg = waitFor == 0 ? 
(ObjectMessage) m_receiver.receive() > : (ObjectMessage) m_receiver.receive(waitFor); > if (objMsg != null) { > String msg = (String) objMsg.getObject(); > print(m_receiverID, "got " + msg); > return msg; > } > } catch (JMSException e) { > throw new RuntimeException(e); > } > return "NOTHING"; > } > } > > static final String qp = "TEST_ONLY?consumer.prefetchSize=0"; > > static Connection s_connection; > static { > try { > System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", > "false"); > Properties props = new Properties(); > props.put(Context.PROVIDER_URL, "tcp://localhost:61616"); > InitialContext ctx = new InitialContext(props); > ConnectionFactory cf = (ConnectionFactory) > ctx.lookup("ConnectionFactory"); > s_connection = cf.createConnection(); > s_connection.start(); > } catch (Exception e) { > e.printStackTrace(); > throw new RuntimeException(e); > } > } > > private static void print(String who, String msg) { > System.out.println(who + ": " + msg); > System.out.flush(); > } > > private Receiver m_receiver1; > private Receiver m_receiver2; > > @Override > protected void setUp() throws Exception { > super.setUp(); > m_receiver1 = new Receiver("receiver1"); > m_receiver2 = new Receiver("receiver2"); > } > > public void testReplies() throws Exception { > Session session = s_connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > Queue queue = new ActiveMQQueue(qp); > MessageProducer sender = session.createProducer(queue); > > String payload = "100-A"; > ObjectMessage msg = session.createObjectMessage(); > msg.setObject(payload); > sender.send(msg); > print("sender", "sent " + payload); > > payload = "100-B"; > msg = session.createObjectMessage(); > msg.setObject(payload); > sender.send(msg); > print("sender", "sent " + payload); > > Object waiter = "WAITER"; > synchronized (waiter) { > waiter.wait(2000); // wait > } > String reply = m_receiver1.getMsg(0); > print("sender", "got back: " + reply); > synchronized (waiter) { > waiter.wait(2000); 
// wait > } > reply = m_receiver1.getMsg(5000); > print("sender", "got back: " + reply); > } > } > > > -- > View this message in context: > http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22139980.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. >