Sorry, I think I got it wrong. If the message remains intact when the
consumer disconnects, then CLIENT_ACKNOWLEDGE is honored.
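
On the threading point James makes in the quoted reply below, here is a
minimal sketch of one way to hand the work to another thread while still
acknowledging from the consumer thread. It is plain JDK code, not ActiveMQ
code: Acknowledgeable is a hypothetical stand-in for javax.jms.Message, and
handle() is a hypothetical body for onMessage(). The 5 second timeout is an
arbitrary assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AckOnConsumerThread {

    /** Hypothetical stand-in for javax.jms.Message; only acknowledge() is modelled. */
    public interface Acknowledgeable {
        void acknowledge() throws Exception;
    }

    // Single worker thread that does the actual processing.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /**
     * What an onMessage() body could look like: the work runs on the worker
     * thread, but the calling (consumer) thread waits for it to finish and
     * then acknowledges the message itself.
     */
    public <T> T handle(Acknowledgeable message, Callable<T> work) throws Exception {
        Future<T> pending = worker.submit(work);
        // Block the consumer thread until the worker is done (assumed timeout).
        T result = pending.get(5, TimeUnit.SECONDS);
        // Acknowledge on the same thread that received the message.
        message.acknowledge();
        return result;
    }

    public void shutdown() {
        worker.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AckOnConsumerThread handler = new AckOnConsumerThread();
        try {
            String result = handler.handle(
                    () -> System.out.println("ack on " + Thread.currentThread().getName()),
                    () -> "processed on " + Thread.currentThread().getName());
            System.out.println(result);
        } finally {
            handler.shutdown();
        }
    }
}
```

The trade-off is that the consumer thread blocks while the worker runs, so
messages on that session are processed one at a time. If an exception is
thrown or the timeout expires, acknowledge() is never called.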


On Tue, Oct 28, 2008 at 7:13 PM, Vadim Chekan <[EMAIL PROTECTED]> wrote:
> James,
>
> Are you saying that the CLIENT_ACKNOWLEDGE option cannot be trusted?
>
> Vadim.
>
> On Tue, Oct 28, 2008 at 10:44 AM, James Strachan
> <[EMAIL PROTECTED]> wrote:
>> 2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>:
>>> Hello!
>>>
>>> I am facing a strange issue with the acknowledge mode in ActiveMQ. After
>>> reading the specs, I understood that the broker will never deliver a
>>> message to the consumer until the consumer acknowledges the previous
>>> message. So I thought it would be enough not to send the acknowledgement
>>> to the server in the onMessage method of the consumer, and to send it
>>> later from another thread, which does its job. This means I spawn a
>>> thread when onMessage is called and forget about it in this method.
>>
>>
>> You should only be using a JMS session from one thread at a time - so
>> you should call message.acknowledge() from the consumer thread.
>>
>>
>>
>>>
>>> However, I've found this is not true. Below is my simple test case: I
>>> start a single consumer and several producers.
>>>
>>> ===================================================================================================================
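>>> import java.util.concurrent.Semaphore;
>>> import java.util.concurrent.atomic.AtomicInteger;
>>>
>>> import javax.jms.Connection;
>>> import javax.jms.Message;
>>> import javax.jms.MessageConsumer;
>>> import javax.jms.MessageListener;
>>> import javax.jms.MessageProducer;
>>> import javax.jms.Queue;
>>> import javax.jms.Session;
>>>
>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>> import org.apache.log4j.BasicConfigurator;
>>> import org.apache.log4j.ConsoleAppender;
>>> import org.apache.log4j.Level;
>>> import org.apache.log4j.Logger;
>>> import org.apache.log4j.PatternLayout;
>>> import org.junit.Assert;
>>> import org.junit.BeforeClass;
>>> import org.junit.Test;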
>>>
>>> public class TestAcknowledgeMode {
>>>
>>>    private static final String DATA = "data";
>>>
>>>    private static final String SERVER_QUEUE = "server_queue";
>>>
>>>    private static final String CONNECTION_URL = 
>>> "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>>>
>>>    private static final int CLIENT_THREADS = 5;
>>>
>>>    static ActiveMQConnectionFactory factory;
>>>
>>>    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>>>
>>>    AtomicInteger successCount = new AtomicInteger(0);
>>>
>>>    @BeforeClass
>>>    public static void initFactory() throws Exception {
>>>        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>>>        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>>>                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>>>        Logger.getRootLogger().setLevel(Level.DEBUG);
>>>    }
>>>
>>>    @Test(timeout = 10000)
>>>    public void testAckDelivery() throws Exception {
>>>        final Connection clientConnection = factory.createConnection();
>>>        clientConnection.start();
>>>        final Connection serverConnection = factory.createConnection();
>>>        serverConnection.start();
>>>
>>>        final Session serverSession = serverConnection.createSession(false,
>>>                Session.CLIENT_ACKNOWLEDGE);
>>>        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>>>        final MessageConsumer consumer = serverSession
>>>                .createConsumer(serverQueue);
>>>        consumer.setMessageListener(new MessageListener() {
>>>
>>>            public void onMessage(final Message message) {
>>>                System.out.println("Got message");
>>>                try {
>>>                    System.out.println(message.getStringProperty(DATA));
>>>                    // message.acknowledge();
>>>                    successCount.incrementAndGet();
>>>                } catch (final Exception e) {
>>>                    e.printStackTrace();
>>>                }
>>>            }
>>>
>>>        });
>>>        for (int i = 0; i < CLIENT_THREADS; i++) {
>>>            final Thread t = new Thread(new Runnable() {
>>>
>>>                public void run() {
>>>                    try {
>>>                        final Session clientSession = clientConnection
>>>                                .createSession(false,
>>>                                        Session.CLIENT_ACKNOWLEDGE);
>>>                        final Queue clientQueue = clientSession
>>>                                .createQueue(SERVER_QUEUE);
>>>                        final MessageProducer producer = clientSession
>>>                                .createProducer(clientQueue);
>>>                        final Message msg = clientSession.createMessage();
>>>                        msg.setStringProperty(DATA, Thread.currentThread()
>>>                                .getName());
>>>                        System.err.println("Sending data "
>>>                                + Thread.currentThread().getName());
>>>                        producer.send(msg);
>>>                    } catch (final Exception e) {
>>>                        e.printStackTrace();
>>>                    }
>>>                }
>>>            });
>>>            t.start();
>>>        }
>>>        for (;;) {
>>>            if (successCount.intValue() == CLIENT_THREADS)
>>>                break;
>>>            Thread.sleep(1000);
>>>        }
>>>        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>>>    }
>>> }
>>> ===================================================================================================================
>>>
>>> I expected only one message to be processed by the consumer, so that the
>>> remaining messages would never be delivered to it, but this test shows
>>> that the messages are delivered to the consumer and the onMessage method
>>> is called exactly CLIENT_THREADS times, which seems to be wrong.
>>>
>>> Does this mean the onMessage method will be executed regardless of
>>> whether an acknowledgement is sent back, as soon as the previous
>>> execution finishes?
>>>
>>> And what is the purpose of the message.acknowledge() method?
>>
>> If you don't want the message broker to send another message to the
>> consumer until it is acknowledged, set prefetch to 1
>> http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>>
>> --
>> James
>> -------
>> http://macstrac.blogspot.com/
>>
>> Open Source Integration
>> http://fusesource.com/
>>
>
>
>
> --
> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
> is explicitly specified
>



-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
is explicitly specified
