The latest code in SVN still seems to receive from two receivers. Maybe
I am missing something?

Anyway, I have written a test case which depends on nothing but AMQ. I was
using 5.1 and this bug shows up in 5.1; I tried running it in 5.2 several
times, and it always works. [What prevents me from upgrading to 5.2 is a
strange performance hit I have not solved yet.]

(Note: in 5.1, this can be reproduced quite reliably but not 100% of the
time, so you may need to run it a few times to reproduce it; also make sure
to purge the queue using JMX before each run.)

The results:

// AMQ 5.2
receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
receiver1: got 100-B
sender: got back: 100-B

// AMQ 5.1
receiver1: Start receiver to TEST_ONLY?consumer.prefetchSize=0
receiver2: Start receiver to TEST_ONLY?consumer.prefetchSize=0
sender: sent 100-A
sender: sent 100-B
receiver1: got 100-A
sender: got back: 100-A
sender: got back: NOTHING


The code:

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

import junit.framework.TestCase;

import org.apache.activemq.command.ActiveMQQueue;

/**
 * Exercises two consumers attached to the same queue with
 * {@code consumer.prefetchSize=0}.
 *
 * <p>Two messages are sent while both consumers are idle; only the first
 * consumer then calls {@code receive}, so it is expected to obtain both
 * messages. The test fails if either message does not arrive at that
 * consumer.
 *
 * <p>Requires a broker listening on {@code tcp://localhost:61616}. The queue
 * should be empty before the test runs, otherwise stale messages are
 * received instead of the ones sent here.
 */
public class TestPrefetching extends TestCase {

    /** Queue name, including the consumer option that sets the prefetch size to zero. */
    static final String qp = "TEST_ONLY?consumer.prefetchSize=0";

    /** Pause, in milliseconds, inserted before each receive attempt. */
    private static final long PAUSE_MILLIS = 2000;

    /** Value returned by {@link Receiver#getMsg(long)} when no message arrived. */
    private static final String NOTHING = "NOTHING";

    /**
     * Connection shared by every session in this class. It is opened once when
     * the class is loaded and is not closed by the test itself.
     */
    static Connection s_connection;

    static {
        try {
            System.setProperty("org.apache.activemq.UseDedicatedTaskRunner", "false");
            Properties props = new Properties();
            props.put(Context.PROVIDER_URL, "tcp://localhost:61616");
            InitialContext ctx = new InitialContext(props);
            ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
            s_connection = cf.createConnection();
            s_connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * A queue consumer with its own session on the shared connection.
     *
     * <p>Declared static because it only uses static members of the enclosing
     * class and therefore needs no reference to a test instance.
     */
    private static final class Receiver {
        private final String m_receiverID;
        private final Session m_session;
        private final MessageConsumer m_receiver;
        private final Destination m_queue;

        /**
         * Creates a session and a consumer on {@link TestPrefetching#qp}.
         *
         * @param receiverID label used as the prefix of this receiver's log lines
         * @throws RuntimeException wrapping any {@link JMSException} raised while
         *         creating the session or the consumer
         */
        public Receiver(String receiverID) {
            m_receiverID = receiverID;
            try {
                m_session = s_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                m_queue = new ActiveMQQueue(qp);
                print(m_receiverID, "Start receiver to " + qp);
                m_receiver = m_session.createConsumer(m_queue);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Receives one message and returns its payload.
         *
         * @param waitFor timeout in milliseconds; {@code 0} blocks until a message
         *        arrives
         * @return the message payload, or {@code "NOTHING"} if the timeout elapsed
         *         without a message
         * @throws RuntimeException wrapping any {@link JMSException} raised by the
         *         consumer
         */
        public synchronized String getMsg(long waitFor) {
            try {
                ObjectMessage objMsg = waitFor == 0
                        ? (ObjectMessage) m_receiver.receive()
                        : (ObjectMessage) m_receiver.receive(waitFor);
                if (objMsg != null) {
                    String msg = (String) objMsg.getObject();
                    print(m_receiverID, "got " + msg);
                    return msg;
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
            return NOTHING;
        }

        /**
         * Closes this receiver's session; closing the session also releases the
         * consumer created from it.
         *
         * @throws JMSException if the provider fails to close the session
         */
        public void close() throws JMSException {
            m_session.close();
        }
    }

    /** Writes {@code who: msg} to standard output and flushes it. */
    private static void print(String who, String msg) {
        System.out.println(who + ": " + msg);
        System.out.flush();
    }

    private Receiver m_receiver1;
    private Receiver m_receiver2;

    /** Attaches both consumers to the queue before any message is sent. */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        m_receiver1 = new Receiver("receiver1");
        m_receiver2 = new Receiver("receiver2");
    }

    /**
     * Releases both receivers so that repeated runs do not leave idle consumers
     * registered on the queue.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            closeQuietlyChained(m_receiver1, m_receiver2);
        } finally {
            m_receiver1 = null;
            m_receiver2 = null;
            super.tearDown();
        }
    }

    /** Closes both receivers, attempting the second even if the first fails. */
    private static void closeQuietlyChained(Receiver first, Receiver second) throws JMSException {
        try {
            if (first != null) {
                first.close();
            }
        } finally {
            if (second != null) {
                second.close();
            }
        }
    }

    /** Sends {@code payload} as an {@link ObjectMessage} and logs it. */
    private static void send(Session session, MessageProducer sender, String payload)
            throws JMSException {
        ObjectMessage msg = session.createObjectMessage();
        msg.setObject(payload);
        sender.send(msg);
        print("sender", "sent " + payload);
    }

    /**
     * Sends two messages, then reads twice from the first receiver only and
     * checks that it obtains both messages in the order they were sent.
     */
    public void testReplies() throws Exception {
        Session session = s_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Queue queue = new ActiveMQQueue(qp);
            MessageProducer sender = session.createProducer(queue);

            send(session, sender, "100-A");
            send(session, sender, "100-B");

            // Plain sleep instead of wait() on a String literal: the literal is an
            // interned, JVM-wide object, and wait() may also wake up spuriously.
            Thread.sleep(PAUSE_MILLIS);
            String reply = m_receiver1.getMsg(0);
            print("sender", "got back: " + reply);
            assertEquals("100-A", reply);

            Thread.sleep(PAUSE_MILLIS);
            reply = m_receiver1.getMsg(5000);
            print("sender", "got back: " + reply);
            assertEquals("100-B", reply);
        } finally {
            // Closing the session also closes the producer created from it.
            session.close();
        }
    }
}


-- 
View this message in context: 
http://www.nabble.com/Prefetch%3D0-how-to--tp22018602p22139980.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to