Sorry, I think I got it wrong. If message remain intact when consumer disconnect then CLIENT_ACKNOWLEDGE is honored.
On Tue, Oct 28, 2008 at 7:13 PM, Vadim Chekan <[EMAIL PROTECTED]> wrote: > James, > > Are you saying that CLIENT_ACKNOWLEDGE option can not be trusted? > > Vadim. > > On Tue, Oct 28, 2008 at 10:44 AM, James Strachan > <[EMAIL PROTECTED]> wrote: >> 2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>: >>> Hello! >>> >>> I am facing some strange issue with acknowledge mode in ActiveMQ. After >>> reading the specs, I realized the broker will never deliver a message to the >>> consumer unless the consumer acknowledges the previous message. So I thought >>> it is enough to not send the acknowledgement message to the server in the >>> onMessage method of the consumer and sent it later from another thread, >>> which >>> does it's job. This means I spawn a thread when onMessage is called and >>> forget >>> about it in this method. >> >> >> You should only be using a JMS session from one thread at once - so >> you should call message.acknowlege() from the consumer thread. >> >> >> >>> >>> However I've found this is not true. Below is my simple test case - I am >>> starting the single consumer and several producers. >>> >>> =================================================================================================================== >>> import javax.jms.Connection; >>> >>> public class TestAcknowledgeMode { >>> >>> private static final String DATA = "data"; >>> >>> private static final String SERVER_QUEUE = "server_queue"; >>> >>> private static final String CONNECTION_URL = >>> "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1"; >>> >>> private static final int CLIENT_THREADS = 5; >>> >>> static ActiveMQConnectionFactory factory; >>> >>> Semaphore semaphore = new Semaphore(CLIENT_THREADS, true); >>> >>> AtomicInteger successCount = new AtomicInteger(0); >>> >>> @BeforeClass >>> public static void initFactory() throws Exception { >>> factory = new ActiveMQConnectionFactory(CONNECTION_URL); >>> BasicConfigurator.configure(new ConsoleAppender(new PatternLayout( >>> "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n"))); >>> Logger.getRootLogger().setLevel(Level.DEBUG); >>> } >>> >>> @Test(timeout = 10000) >>> public void testAckDelivery() throws Exception { >>> final Connection clientConnection = factory.createConnection(); >>> clientConnection.start(); >>> final Connection serverConnection = factory.createConnection(); >>> serverConnection.start(); >>> >>> final Session serverSession = serverConnection.createSession(false, >>> Session.CLIENT_ACKNOWLEDGE); >>> final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE); >>> final MessageConsumer consumer = serverSession >>> .createConsumer(serverQueue); >>> consumer.setMessageListener(new MessageListener() { >>> >>> public void onMessage(final Message message) { >>> System.out.println("Got message"); >>> try { >>> System.out.println(message.getStringProperty(DATA)); >>> // message.acknowledge(); >>> successCount.incrementAndGet(); >>> } catch (final Exception e) { >>> e.printStackTrace(); >>> } >>> } >>> >>> }); >>> for (int i = 0; i < CLIENT_THREADS; i++) { >>> final Thread t = new Thread(new Runnable() { >>> >>> public void run() { >>> try { >>> final Session clientSession = clientConnection >>> .createSession(false, >>> Session.CLIENT_ACKNOWLEDGE); >>> final Queue clientQueue = clientSession >>> .createQueue(SERVER_QUEUE); >>> final MessageProducer producer = clientSession >>> .createProducer(clientQueue); >>> final Message msg = clientSession.createMessage(); >>> msg.setStringProperty(DATA, Thread.currentThread() >>> .getName()); >>> System.err.println("Sending data " >>> + Thread.currentThread().getName()); >>> producer.send(msg); >>> } catch (final Exception e) { >>> e.printStackTrace(); >>> } >>> } >>> }); >>> t.start(); >>> } >>> for (;;) { >>> if (successCount.intValue() == CLIENT_THREADS) >>> break; >>> Thread.sleep(1000); >>> } >>> Assert.assertEquals(CLIENT_THREADS, successCount.intValue()); >>> } >>> } >>> =================================================================================================================== >>> >>> I expected the only one message will be processed by the consumer, so the >>> rest >>> messages will never be delivered to it, but this test shows the messages are >>> delivered to the consumer and onMessage method is called exactly >>> CLIENT_THREADS times, >>> which seems to be wrong. >>> >>> Does it mean the onMessage method will be executed with no matter of >>> acknowledgement is sent back, as soon as previous execution finishes? >>> >>> And what is the purpose of message.acknowledge() method? >> >> If you don't want the message broker to send another message to the >> consumer until it is acknowledged, set prefetch to 1 >> http://activemq.apache.org/what-is-the-prefetch-limit-for.html >> >> -- >> James >> ------- >> http://macstrac.blogspot.com/ >> >> Open Source Integration >> http://fusesource.com/ >> > > > > -- > From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT > is explicitly specified > -- >From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified