I agree with Erwin. It seems like you'd be better served using the synchronous org.apache.activemq.artemis.api.core.client.ClientConsumer#receive(long) [1] rather than using an asynchronous MessageHandler. If the call to receive(long) returns null then you know the consumer did not receive a message within the allotted timeout.
Justin [1] http://activemq.apache.org/components/artemis/documentation/javadocs/javadoc-latest/org/apache/activemq/artemis/api/core/client/ClientConsumer.html#receive-long- On Mon, Feb 8, 2021 at 8:45 AM Arno Schatz <a...@xerai.biz> wrote: > Hi, > > This is a beginners question. I am having a queue, with a bunch of work > orders and try to have aynchoun listeners to do > the workpackages. In the main thread I need to know when the workers are > actually done with the work. How do I know all > messages have been worked on? > > Using Artemis 2.1.16, I wrote the little Hello World like below. I am > using a Semaphore to check if the workers (aka > MessageListeners) are done with their workload. Obviously the semaphoer > construct has some downsides, is here a build > -in way to see if the messageListeners all completed (or timed out)? > > thanks in advance. > > regards, > Arno > > > > > @Test > public void mqSampleWorking() { > try { > Semaphore p = new Semaphore(0); > // > https://activemq.apache.org/components/artemis/documentation/latest/embedding-activemq.html > Configuration config = new ConfigurationImpl(); > config.addAcceptorConfiguration("in-vm", "vm://0"); > config.setPersistenceEnabled(false); > config.setSecurityEnabled(false); > > EmbeddedActiveMQ server = new EmbeddedActiveMQ(); > server.setConfiguration(config); > server.start(); > > ServerLocator serverLocator = > ActiveMQClient.createServerLocator("vm://0"); > > ClientSessionFactory factory = > serverLocator.createSessionFactory(); > ClientSession session = factory.createSession(); > session.createQueue(new QueueConfiguration("example")); > > ClientProducer producer = session.createProducer("example"); > ClientMessage message = session.createMessage(true); > message.getBodyBuffer().writeString("Hello World"); > producer.send(message); > System.out.println(session.queueQuery(new > SimpleString("example")).getMessageCount()); > > session.start(); > ClientConsumer consumer = session.createConsumer("example"); > consumer.setMessageHandler(msg -> { > System.out.println("message = " + > msg.getBodyBuffer().readString()); > p.release(); > }); > System.out.println("Waiting for message delivery."); > p.acquire(); > session.close(); > server.stop(); > } > catch (Exception e) { > throw new RuntimeException(e); > } > } > >