2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>:
> Hello!
>
> I am facing some strange issue with acknowledge mode in ActiveMQ. After
> reading the specs, I realized the broker will never deliver a message to the
> consumer unless the consumer acknowledges the previous message. So I thought
> it is enough to not send the acknowledgement message to the server in the
> onMessage method of the consumer and sent it later from another thread, which
> does it's job. This means I spawn a thread when onMessage is called and forget
> about it in this method.


You should only be using a JMS session from one thread at once - so
you should call message.acknowlege() from the consumer thread.



>
> However I've found this is not true. Below is my simple test case - I am
> starting the single consumer and several producers.
>
> ===================================================================================================================
> import javax.jms.Connection;
>
> public class TestAcknowledgeMode {
>
>    private static final String DATA = "data";
>
>    private static final String SERVER_QUEUE = "server_queue";
>
>    private static final String CONNECTION_URL = 
> "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>
>    private static final int CLIENT_THREADS = 5;
>
>    static ActiveMQConnectionFactory factory;
>
>    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>
>    AtomicInteger successCount = new AtomicInteger(0);
>
>    @BeforeClass
>    public static void initFactory() throws Exception {
>        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>        Logger.getRootLogger().setLevel(Level.DEBUG);
>    }
>
>    @Test(timeout = 10000)
>    public void testAckDelivery() throws Exception {
>        final Connection clientConnection = factory.createConnection();
>        clientConnection.start();
>        final Connection serverConnection = factory.createConnection();
>        serverConnection.start();
>
>        final Session serverSession = serverConnection.createSession(false,
>                Session.CLIENT_ACKNOWLEDGE);
>        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>        final MessageConsumer consumer = serverSession
>                .createConsumer(serverQueue);
>        consumer.setMessageListener(new MessageListener() {
>
>            public void onMessage(final Message message) {
>                System.out.println("Got message");
>                try {
>                    System.out.println(message.getStringProperty(DATA));
>                    // message.acknowledge();
>                    successCount.incrementAndGet();
>                } catch (final Exception e) {
>                    e.printStackTrace();
>                }
>            }
>
>        });
>        for (int i = 0; i < CLIENT_THREADS; i++) {
>            final Thread t = new Thread(new Runnable() {
>
>                public void run() {
>                    try {
>                        final Session clientSession = clientConnection
>                                .createSession(false,
>                                        Session.CLIENT_ACKNOWLEDGE);
>                        final Queue clientQueue = clientSession
>                                .createQueue(SERVER_QUEUE);
>                        final MessageProducer producer = clientSession
>                                .createProducer(clientQueue);
>                        final Message msg = clientSession.createMessage();
>                        msg.setStringProperty(DATA, Thread.currentThread()
>                                .getName());
>                        System.err.println("Sending data "
>                                + Thread.currentThread().getName());
>                        producer.send(msg);
>                    } catch (final Exception e) {
>                        e.printStackTrace();
>                    }
>                }
>            });
>            t.start();
>        }
>        for (;;) {
>            if (successCount.intValue() == CLIENT_THREADS)
>                break;
>            Thread.sleep(1000);
>        }
>        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>    }
> }
> ===================================================================================================================
>
> I expected the only one message will be processed by the consumer, so the rest
> messages will never be delivered to it, but this test shows the messages are
> delivered to the consumer and onMessage method is called exactly 
> CLIENT_THREADS times,
> which seems to be wrong.
>
> Does it mean the onMessage method will be executed with no matter of
> acknowledgement is sent back, as soon as previous execution finishes?
>
> And what is the purpose of message.acknowledge() method?

If you don't want the message broker to send another message to the
consumer until it is acknowledged, set prefetch to 1
http://activemq.apache.org/what-is-the-prefetch-limit-for.html

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/

Reply via email to