2008/10/28 Eugeny N Dzhurinsky <[EMAIL PROTECTED]>:
> Hello!
>
> I am facing some strange issue with acknowledge mode in ActiveMQ. After
> reading the specs, I realized the broker will never deliver a message to the
> consumer unless the consumer acknowledges the previous message. So I thought
> it is enough to not send the acknowledgement message to the server in the
> onMessage method of the consumer and send it later from another thread, which
> does its job. This means I spawn a thread when onMessage is called and forget
> about it in this method.
You should only be using a JMS session from one thread at once - so
you should call message.acknowledge() from the consumer thread.

>
> However I've found this is not true. Below is my simple test case - I am
> starting the single consumer and several producers.
>
> ===================================================================================================================
> import java.util.concurrent.Semaphore;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.log4j.BasicConfigurator;
> import org.apache.log4j.ConsoleAppender;
> import org.apache.log4j.Level;
> import org.apache.log4j.Logger;
> import org.apache.log4j.PatternLayout;
> import org.junit.Assert;
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> public class TestAcknowledgeMode {
>
>     private static final String DATA = "data";
>
>     private static final String SERVER_QUEUE = "server_queue";
>
>     private static final String CONNECTION_URL =
>         "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";
>
>     private static final int CLIENT_THREADS = 5;
>
>     static ActiveMQConnectionFactory factory;
>
>     Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);
>
>     AtomicInteger successCount = new AtomicInteger(0);
>
>     @BeforeClass
>     public static void initFactory() throws Exception {
>         factory = new ActiveMQConnectionFactory(CONNECTION_URL);
>         BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
>                 "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
>         Logger.getRootLogger().setLevel(Level.DEBUG);
>     }
>
>     @Test(timeout = 10000)
>     public void testAckDelivery() throws Exception {
>         final Connection clientConnection = factory.createConnection();
>         clientConnection.start();
>         final Connection serverConnection = factory.createConnection();
>         serverConnection.start();
>
>         final Session serverSession = serverConnection.createSession(false,
>                 Session.CLIENT_ACKNOWLEDGE);
>         final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
>         final MessageConsumer consumer = serverSession
>                 .createConsumer(serverQueue);
>         consumer.setMessageListener(new MessageListener() {
>
>             public void onMessage(final Message message) {
>                 System.out.println("Got message");
>                 try {
>                     System.out.println(message.getStringProperty(DATA));
>                     // message.acknowledge();
>                     successCount.incrementAndGet();
>                 } catch (final Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>
>         });
>         for (int i = 0; i < CLIENT_THREADS; i++) {
>             final Thread t = new Thread(new Runnable() {
>
>                 public void run() {
>                     try {
>                         final Session clientSession = clientConnection
>                                 .createSession(false,
>                                         Session.CLIENT_ACKNOWLEDGE);
>                         final Queue clientQueue = clientSession
>                                 .createQueue(SERVER_QUEUE);
>                         final MessageProducer producer = clientSession
>                                 .createProducer(clientQueue);
>                         final Message msg = clientSession.createMessage();
>                         msg.setStringProperty(DATA, Thread.currentThread()
>                                 .getName());
>                         System.err.println("Sending data "
>                                 + Thread.currentThread().getName());
>                         producer.send(msg);
>                     } catch (final Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             });
>             t.start();
>         }
>         for (;;) {
>             if (successCount.intValue() == CLIENT_THREADS)
>                 break;
>             Thread.sleep(1000);
>         }
>         Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
>     }
> }
> ===================================================================================================================
>
> I expected that only one message would be processed by the consumer, so the
> remaining messages would never be delivered to it, but this test shows the
> messages are delivered to the consumer and the onMessage method is called
> exactly CLIENT_THREADS times, which seems to be wrong.
>
> Does it mean the onMessage method will be executed regardless of whether an
> acknowledgement is sent back, as soon as the previous execution finishes?
>
> And what is the purpose of the message.acknowledge() method?

If you don't want the message broker to send another message to the
consumer until it is acknowledged, set prefetch to 1

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

--
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://fusesource.com/
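
To illustrate the point about calling message.acknowledge() from the consumer thread, here is a minimal sketch of one way to offload the work to another thread while keeping the acknowledgement on the thread that received the message. It is only an illustration under stated assumptions: AckableMessage and OffloadingListener are hypothetical stand-ins (not JMS or ActiveMQ types) so that the example compiles without a broker on the classpath. In real code the listener would implement javax.jms.MessageListener and receive a javax.jms.Message.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AckOnConsumerThread {

    /** Hypothetical stand-in for javax.jms.Message (assumption, not a JMS type). */
    interface AckableMessage {
        String body();
        void acknowledge();
    }

    /** Hypothetical listener: the work runs elsewhere, the ack stays on the caller's thread. */
    static final class OffloadingListener {
        private final ExecutorService workers;

        OffloadingListener(ExecutorService workers) {
            this.workers = workers;
        }

        void onMessage(AckableMessage message) throws Exception {
            // Hand the actual processing to a worker thread.
            Future<String> result = workers.submit(() -> message.body().toUpperCase());
            // Block until the worker is done; the listener does not return early.
            result.get();
            // Acknowledge on the same thread that received the message.
            message.acknowledge();
        }
    }

    /** Returns true if acknowledge() ran on the thread that called onMessage(). */
    public static boolean demo() throws Exception {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        final Thread[] ackThread = new Thread[1];
        AckableMessage message = new AckableMessage() {
            public String body() {
                return "data";
            }

            public void acknowledge() {
                // Record which thread performed the acknowledgement.
                ackThread[0] = Thread.currentThread();
            }
        };
        try {
            new OffloadingListener(workers).onMessage(message);
        } finally {
            workers.shutdown();
        }
        return ackThread[0] == Thread.currentThread();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("acknowledged on consumer thread: " + demo());
    }
}
```

The trade-off in this sketch is that onMessage blocks until the worker finishes, so the listener is not truly "fire and forget". That is deliberate here: the session is only ever touched from the thread that delivered the message.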