James,

Are you saying that the CLIENT_ACKNOWLEDGE option cannot be trusted?

Vadim.

On Tue, Oct 28, 2008 at 10:44 AM, James Strachan
<[EMAIL PROTECTED]> wrote:
> 2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>:
>> Hello!
>>
>> I am facing a strange issue with acknowledge mode in ActiveMQ. After
>> reading the specs, I understood that the broker will never deliver a message
>> to the consumer unless the consumer acknowledges the previous message. So I
>> thought it would be enough to skip the acknowledgement in the onMessage
>> method of the consumer and send it later from another thread, which does
>> its job. This means I spawn a thread when onMessage is called and forget
>> about it in this method.
>
>
> You should only be using a JMS session from one thread at a time - so
> you should call message.acknowledge() from the consumer thread.
>
>
>
>>
>> However, I've found this is not true. Below is my simple test case: I
>> start a single consumer and several producers.
>>
>> ===================================================================================================================
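>> import java.util.concurrent.Semaphore;
>> import java.util.concurrent.atomic.AtomicInteger;
>>
>> import javax.jms.Connection;
>> import javax.jms.Message;
>> import javax.jms.MessageConsumer;
>> import javax.jms.MessageListener;
>> import javax.jms.MessageProducer;
>> import javax.jms.Queue;
>> import javax.jms.Session;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.log4j.BasicConfigurator;
>> import org.apache.log4j.ConsoleAppender;
>> import org.apache.log4j.Level;
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.PatternLayout;
>> import org.junit.Assert;
>> import org.junit.BeforeClass;
>> import org.junit.Test;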
>>
>> public class TestAcknowledgeMode {
>>
>>    private static final String DATA = "data";
>>
>>    private static final String SERVER_QUEUE = "server_queue";
>>
>>    private static final String CONNECTION_URL =
>>            "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>>
>>    private static final int CLIENT_THREADS = 5;
>>
>>    static ActiveMQConnectionFactory factory;
>>
>>    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>>
>>    AtomicInteger successCount = new AtomicInteger(0);
>>
>>    @BeforeClass
>>    public static void initFactory() throws Exception {
>>        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>>        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>>                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>>        Logger.getRootLogger().setLevel(Level.DEBUG);
>>    }
>>
>>    @Test(timeout = 10000)
>>    public void testAckDelivery() throws Exception {
>>        final Connection clientConnection = factory.createConnection();
>>        clientConnection.start();
>>        final Connection serverConnection = factory.createConnection();
>>        serverConnection.start();
>>
>>        final Session serverSession = serverConnection.createSession(false,
>>                Session.CLIENT_ACKNOWLEDGE);
>>        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>>        final MessageConsumer consumer = serverSession
>>                .createConsumer(serverQueue);
>>        consumer.setMessageListener(new MessageListener() {
>>
>>            public void onMessage(final Message message) {
>>                System.out.println("Got message");
>>                try {
>>                    System.out.println(message.getStringProperty(DATA));
>>                    // message.acknowledge();
>>                    successCount.incrementAndGet();
>>                } catch (final Exception e) {
>>                    e.printStackTrace();
>>                }
>>            }
>>
>>        });
>>        for (int i = 0; i < CLIENT_THREADS; i++) {
>>            final Thread t = new Thread(new Runnable() {
>>
>>                public void run() {
>>                    try {
>>                        final Session clientSession = clientConnection
>>                                .createSession(false,
>>                                        Session.CLIENT_ACKNOWLEDGE);
>>                        final Queue clientQueue = clientSession
>>                                .createQueue(SERVER_QUEUE);
>>                        final MessageProducer producer = clientSession
>>                                .createProducer(clientQueue);
>>                        final Message msg = clientSession.createMessage();
>>                        msg.setStringProperty(DATA, Thread.currentThread()
>>                                .getName());
>>                        System.err.println("Sending data "
>>                                + Thread.currentThread().getName());
>>                        producer.send(msg);
>>                    } catch (final Exception e) {
>>                        e.printStackTrace();
>>                    }
>>                }
>>            });
>>            t.start();
>>        }
>>        for (;;) {
>>            if (successCount.intValue() == CLIENT_THREADS)
>>                break;
>>            Thread.sleep(1000);
>>        }
>>        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>>    }
>> }
>> ===================================================================================================================
>>
>> I expected that only one message would be processed by the consumer and that
>> the remaining messages would never be delivered to it. However, this test
>> shows that the messages are delivered to the consumer and the onMessage
>> method is called exactly CLIENT_THREADS times, which seems to be wrong.
>>
>> Does this mean that the onMessage method will be executed as soon as the
>> previous execution finishes, regardless of whether an acknowledgement has
>> been sent back?
>>
>> And what is the purpose of the message.acknowledge() method?
>
> If you don't want the message broker to send another message to the
> consumer until it is acknowledged, set prefetch to 1
> http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source Integration
> http://fusesource.com/
>
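
The threading advice quoted above (use a session from one thread at a time,
and call message.acknowledge() from the consumer thread) can be sketched
without a broker. What follows is only a rough illustration under stated
assumptions, not ActiveMQ code: Ackable is a hypothetical stand-in for
javax.jms.Message, and the class and method names are made up for this
example. Worker threads do the processing and hand each message back through
a queue, so that acknowledge() is only ever called on the consumer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class AckOnConsumerThread {

    /** Hypothetical stand-in for javax.jms.Message; only acknowledge() is modelled. */
    interface Ackable {
        void acknowledge();
    }

    /**
     * Processes count messages on worker threads, then acknowledges each one
     * on the calling thread. Returns the number of acknowledgements that ran
     * on the calling thread.
     */
    static int run(final int count) {
        final Thread consumerThread = Thread.currentThread();
        // Incremented only inside acknowledge() when it runs on the consumer thread.
        final int[] ackedOnConsumerThread = {0};
        final BlockingQueue<Ackable> finished = new LinkedBlockingQueue<>();
        final ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < count; i++) {
                final Ackable message = () -> {
                    if (Thread.currentThread() == consumerThread) {
                        ackedOnConsumerThread[0]++;
                    }
                };
                // The worker would do the slow processing here, then hand the
                // message back instead of acknowledging it itself.
                workers.execute(() -> finished.add(message));
            }
            // Back on the consumer thread: acknowledge as results arrive.
            for (int i = 0; i < count; i++) {
                finished.take().acknowledge();
            }
            return ackedOnConsumerThread[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("acks on consumer thread: " + run(5));
    }
}
```

With a real session, the loop that drains the finished queue would presumably
live on whichever thread owns the session, for example one that also calls
consumer.receive(); that part is an assumption and is not shown here.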



-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
is explicitly specified
