I just ran the test a couple dozen times (against 5.2) and it always passes,
with the following output:

receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
receiver1: got 100-B
sender: got back: 100-B

Can somebody else give it a try?
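
For anyone retrying on 5.1: the note quoted below asks for the queue to be purged over JMX between runs. Here is a minimal sketch of that step using only the JDK's javax.management classes. The class name QueuePurger is hypothetical. The ObjectName layout (BrokerName/Type/Destination keys) is an assumption about how 5.x brokers of this vintage register queue MBeans, so please check the actual name in jconsole before relying on it.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not part of ActiveMQ or of the test case below.
final class QueuePurger {

    // Builds the MBean name for a queue.
    // ASSUMPTION: the key layout matches what the broker registers; verify in jconsole.
    static ObjectName queueName(String broker, String queue) throws Exception {
        return new ObjectName("org.apache.activemq:BrokerName=" + broker
                + ",Type=Queue,Destination=" + queue);
    }

    // Connects to the given JMX service URL and invokes the queue MBean's
    // no-argument "purge" operation, then closes the connection.
    static void purge(String jmxUrl, String broker, String queue) throws Exception {
        JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl));
        try {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            mbeans.invoke(queueName(broker, queue), "purge", null, null);
        } finally {
            connector.close();
        }
    }
}
```

Calling QueuePurger.purge("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi", "localhost", "TEST_ONLY") before each run should empty the queue, assuming the broker has JMX enabled with a connector on that URL and the broker name is "localhost". Both values are assumptions about the local setup.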

Cheers
--
Dejan Bosanac

Open Source Integration - http://fusesource.com/
ActiveMQ in Action - http://www.manning.com/snyder/
Blog - http://www.nighttale.net


On Sat, Feb 21, 2009 at 9:03 PM, hackingbear <hackingb...@gmail.com> wrote:

>
> The latest code in SVN still seems to receive from two receivers. Maybe I am
> missing something?
>
> Anyway, I have written a test case that depends on nothing but AMQ. I was
> using 5.1, and this bug shows up in 5.1; I tried running it in 5.2 several
> times, and it always works. [What prevents me from upgrading to 5.2 is a
> strange performance hit I have not solved yet.]
>
> (Note: in 5.1, this can be reproduced quite reliably but not 100% of the
> time, so you may need to run it a few times to reproduce it; also make sure
> to purge the queue using JMX.)
>
> The results:
>
> // AMQ 5.2
> receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> sender: sent 100-A
> sender: sent 100-B
> receiver1: got 100-A
> sender: got back: 100-A
> receiver1: got 100-B
> sender: got back: 100-B
>
> // AMQ 5.1
> receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
> sender: sent 100-A
> sender: sent 100-B
> receiver1: got 100-A
> sender: got back: 100-A
> sender: got back: NOTHING
>
>
> The code:
>
> import java.util.Properties;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.naming.Context;
> import javax.naming.InitialContext;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.command.ActiveMQQueue;
>
> public class TestPrefetching extends TestCase {
>    private class Receiver {
>        private final String m_receiverID;
>        private Session m_session;
>        private MessageConsumer m_receiver;
>        private Destination m_queue;
>
>        public Receiver(String receiverID) {
>            try {
>                m_receiverID = receiverID;
>                m_session = s_connection.createSession(false,
>                        Session.AUTO_ACKNOWLEDGE);
>                m_queue = new ActiveMQQueue(qp);
>                print(m_receiverID, "Start receiver to " + qp);
>                m_receiver = m_session.createConsumer(m_queue);
>            } catch (JMSException e) {
>                throw new RuntimeException(e);
>            }
>        }
>
>        public synchronized String getMsg(long waitFor) {
>            try {
>                ObjectMessage objMsg;
>                objMsg = waitFor == 0 ? (ObjectMessage) m_receiver.receive()
>                        : (ObjectMessage) m_receiver.receive(waitFor);
>                if (objMsg != null) {
>                    String msg = (String) objMsg.getObject();
>                    print(m_receiverID, "got " + msg);
>                    return msg;
>                }
>            } catch (JMSException e) {
>                throw new RuntimeException(e);
>            }
>            return "NOTHING";
>        }
>    }
>
>    static final String qp = "TEST_ONLY?consumer.prefetchSize=0";
>
>    static Connection s_connection;
>    static {
>        try {
>            System.setProperty(
>                    "org.apache.activemq.UseDedicatedTaskRunner", "false");
>            Properties props = new Properties();
>            props.put(Context.PROVIDER_URL, "tcp://localhost:61616");
>            InitialContext ctx = new InitialContext(props);
>            ConnectionFactory cf =
>                    (ConnectionFactory) ctx.lookup("ConnectionFactory");
>            s_connection = cf.createConnection();
>            s_connection.start();
>        } catch (Exception e) {
>            e.printStackTrace();
>            throw new RuntimeException(e);
>        }
>    }
>
>    private static void print(String who, String msg) {
>        System.out.println(who + ": " + msg);
>        System.out.flush();
>    }
>
>    private Receiver m_receiver1;
>    private Receiver m_receiver2;
>
>    @Override
>    protected void setUp() throws Exception {
>        super.setUp();
>        m_receiver1 = new Receiver("receiver1");
>        m_receiver2 = new Receiver("receiver2");
>    }
>
>    public void testReplies() throws Exception {
>        Session session = s_connection.createSession(false,
>                Session.AUTO_ACKNOWLEDGE);
>        Queue queue = new ActiveMQQueue(qp);
>        MessageProducer sender = session.createProducer(queue);
>
>        String payload = "100-A";
>        ObjectMessage msg = session.createObjectMessage();
>        msg.setObject(payload);
>        sender.send(msg);
>        print("sender", "sent " + payload);
>
>        payload = "100-B";
>        msg = session.createObjectMessage();
>        msg.setObject(payload);
>        sender.send(msg);
>        print("sender", "sent " + payload);
>
>        Object waiter = "WAITER";
>        synchronized (waiter) {
>            waiter.wait(2000); // wait
>        }
>        String reply = m_receiver1.getMsg(0);
>        print("sender", "got back: " + reply);
>        synchronized (waiter) {
>            waiter.wait(2000); // wait
>        }
>        reply = m_receiver1.getMsg(5000);
>        print("sender", "got back: " + reply);
>    }
> }
>
>
> --
> View this message in context:
> http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22139980.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
