James,

Are you saying that the CLIENT_ACKNOWLEDGE option cannot be trusted?
Vadim.

On Tue, Oct 28, 2008 at 10:44 AM, James Strachan <[EMAIL PROTECTED]> wrote:
> 2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>:
>> Hello!
>>
>> I am facing a strange issue with acknowledge mode in ActiveMQ. After
>> reading the specs, I understood that the broker will never deliver a message
>> to the consumer unless the consumer acknowledges the previous message. So I
>> thought it would be enough to not send the acknowledgement to the server in
>> the onMessage method of the consumer and to send it later from another
>> thread, which does its job. This means I spawn a thread when onMessage is
>> called and forget about it in this method.
>
> You should only be using a JMS session from one thread at once - so
> you should call message.acknowledge() from the consumer thread.
>
>>
>> However I've found this is not true. Below is my simple test case - I am
>> starting a single consumer and several producers.
>>
>> ===================================================================================================================
>> import java.util.concurrent.Semaphore;
>> import java.util.concurrent.atomic.AtomicInteger;
>>
>> import javax.jms.Connection;
>> import javax.jms.Message;
>> import javax.jms.MessageConsumer;
>> import javax.jms.MessageListener;
>> import javax.jms.MessageProducer;
>> import javax.jms.Queue;
>> import javax.jms.Session;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.log4j.BasicConfigurator;
>> import org.apache.log4j.ConsoleAppender;
>> import org.apache.log4j.Level;
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.PatternLayout;
>> import org.junit.Assert;
>> import org.junit.BeforeClass;
>> import org.junit.Test;
>>
>> public class TestAcknowledgeMode {
>>
>>     private static final String DATA = "data";
>>
>>     private static final String SERVER_QUEUE = "server_queue";
>>
>>     private static final String CONNECTION_URL =
>>         "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>>
>>     private static final int CLIENT_THREADS = 5;
>>
>>     static ActiveMQConnectionFactory factory;
>>
>>     Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>>
>>     AtomicInteger successCount = new AtomicInteger(0);
>>
>>     @BeforeClass
>>     public static void initFactory() throws Exception {
>>         factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>>         BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>>                 "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>>         Logger.getRootLogger().setLevel(Level.DEBUG);
>>     }
>>
>>     @Test(timeout = 10000)
>>     public void testAckDelivery() throws Exception {
>>         final Connection clientConnection = factory.createConnection();
>>         clientConnection.start();
>>         final Connection serverConnection = factory.createConnection();
>>         serverConnection.start();
>>
>>         final Session serverSession = serverConnection.createSession(false,
>>                 Session.CLIENT_ACKNOWLEDGE);
>>         final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>>         final MessageConsumer consumer = serverSession
>>                 .createConsumer(serverQueue);
>>         consumer.setMessageListener(new MessageListener() {
>>
>>             public void onMessage(final Message message) {
>>                 System.out.println("Got message");
>>                 try {
>>                     System.out.println(message.getStringProperty(DATA));
>>                     // message.acknowledge();
>>                     successCount.incrementAndGet();
>>                 } catch (final Exception e) {
>>                     e.printStackTrace();
>>                 }
>>             }
>>
>>         });
>>         for (int i = 0; i < CLIENT_THREADS; i++) {
>>             final Thread t = new Thread(new Runnable() {
>>
>>                 public void run() {
>>                     try {
>>                         final Session clientSession = clientConnection
>>                                 .createSession(false,
>>                                         Session.CLIENT_ACKNOWLEDGE);
>>                         final Queue clientQueue = clientSession
>>                                 .createQueue(SERVER_QUEUE);
>>                         final MessageProducer producer = clientSession
>>                                 .createProducer(clientQueue);
>>                         final Message msg = clientSession.createMessage();
>>                         msg.setStringProperty(DATA, Thread.currentThread()
>>                                 .getName());
>>                         System.err.println("Sending data "
>>                                 + Thread.currentThread().getName());
>>                         producer.send(msg);
>>                     } catch (final Exception e) {
>>                         e.printStackTrace();
>>                     }
>>                 }
>>             });
>>             t.start();
>>         }
>>         for (;;) {
>>             if (successCount.intValue() == CLIENT_THREADS)
>>                 break;
>>             Thread.sleep(1000);
>>         }
>>         Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>>     }
>> }
>> ===================================================================================================================
>>
>> I expected that only one message would be processed by the consumer, and
>> that the remaining messages would never be delivered to it, but this test
>> shows the messages are delivered to the consumer and the onMessage method is
>> called exactly CLIENT_THREADS times, which seems to be wrong.
>>
>> Does it mean the onMessage method will be executed regardless of whether an
>> acknowledgement is sent back, as soon as the previous execution finishes?
>>
>> And what is the purpose of the message.acknowledge() method?
>
> If you don't want the message broker to send another message to the
> consumer until it is acknowledged, set prefetch to 1
> http://activemq.apache.org/what-is-the-prefetch-limit-for.html
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source Integration
> http://fusesource.com/
>

--
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
is explicitly specified
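
For reference, James's advice to call message.acknowledge() from the consumer thread can be sketched roughly as below. This is only an illustration of the threading pattern, not ActiveMQ code: StubMessage is a hypothetical stand-in for javax.jms.Message, the BlockingQueue stands in for the broker and the receive call, and nothing here models prefetch or redelivery. The point is simply that the thread that receives a message is also the thread that acknowledges it, while producers may run on any thread.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class AckOnConsumerThread {

    // Hypothetical stand-in for javax.jms.Message: it only records
    // which thread called acknowledge().
    static final class StubMessage {
        final String data;
        volatile Thread acknowledgedBy;

        StubMessage(String data) {
            this.data = data;
        }

        void acknowledge() {
            acknowledgedBy = Thread.currentThread();
        }
    }

    // Receives 'expected' messages and acknowledges each one on the receiving
    // thread. Returns true if every message was acknowledged by that thread.
    static boolean runDemo(int expected) throws InterruptedException {
        BlockingQueue<StubMessage> queue = new LinkedBlockingQueue<>();
        List<StubMessage> handled = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    StubMessage m = queue.take(); // plays the role of a blocking receive
                    handled.add(m);               // do the work first ...
                    m.acknowledge();              // ... then acknowledge on this same thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        // Producers are free to run on their own threads.
        Thread[] producers = new Thread[expected];
        for (int i = 0; i < expected; i++) {
            producers[i] = new Thread(
                    () -> queue.add(new StubMessage(Thread.currentThread().getName())));
            producers[i].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        consumer.join();

        if (handled.size() != expected) {
            return false;
        }
        for (StubMessage m : handled) {
            if (m.acknowledgedBy != consumer) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all acknowledged on consumer thread: " + runDemo(5));
    }
}
```

If the work really has to happen on another thread, one option under the same assumption is to have the worker hand its result back to the consuming thread and let that thread call acknowledge(), rather than touching the message or session from the worker.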