I agree with Erwin. It seems like you'd be better served using the
synchronous
org.apache.activemq.artemis.api.core.client.ClientConsumer#receive(long)
[1] rather than using an asynchronous MessageHandler. If the call to
receive(long) returns null then you know the consumer did not receive a
message within the allotted timeout.


Justin

[1]
http://activemq.apache.org/components/artemis/documentation/javadocs/javadoc-latest/org/apache/activemq/artemis/api/core/client/ClientConsumer.html#receive-long-

On Mon, Feb 8, 2021 at 8:45 AM Arno Schatz <a...@xerai.biz> wrote:

> Hi,
>
> This is a beginners question. I am having a queue, with a bunch of work
> orders and try to have aynchoun listeners to do
> the workpackages. In the main thread I need to know when the workers are
> actually done with the work. How do I know all
> messages have been worked on?
>
> Using Artemis 2.1.16, I wrote the little Hello World like below. I am
> using a Semaphore to check if the workers (aka
> MessageListeners) are done with their workload. Obviously the semaphoer
> construct has some downsides, is here a build
> -in way to see if the messageListeners all completed (or timed out)?
>
> thanks in advance.
>
> regards,
>    Arno
>
>
>
>
>     @Test
>     public void mqSampleWorking() {
>         try {
>             Semaphore p = new Semaphore(0);
>             //
> https://activemq.apache.org/components/artemis/documentation/latest/embedding-activemq.html
>             Configuration config = new ConfigurationImpl();
>             config.addAcceptorConfiguration("in-vm", "vm://0");
>             config.setPersistenceEnabled(false);
>             config.setSecurityEnabled(false);
>
>             EmbeddedActiveMQ server = new EmbeddedActiveMQ();
>             server.setConfiguration(config);
>             server.start();
>
>             ServerLocator serverLocator =
> ActiveMQClient.createServerLocator("vm://0");
>
>             ClientSessionFactory factory =
> serverLocator.createSessionFactory();
>             ClientSession session = factory.createSession();
>             session.createQueue(new QueueConfiguration("example"));
>
>             ClientProducer producer = session.createProducer("example");
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello World");
>             producer.send(message);
>             System.out.println(session.queueQuery(new
> SimpleString("example")).getMessageCount());
>
>             session.start();
>             ClientConsumer consumer = session.createConsumer("example");
>             consumer.setMessageHandler(msg -> {
>                 System.out.println("message = " +
> msg.getBodyBuffer().readString());
>                 p.release();
>             });
>             System.out.println("Waiting for message delivery.");
>             p.acquire();
>             session.close();
>             server.stop();
>         }
>         catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>

Reply via email to