Arno,

It is a message-queue, the work may never be done as it is always possible that 
new work is added later.
But otherwise, you would need to use a timeout while reading the message.
e.g. if no new message arrives within 5 seconds, then assume the work is done.

Instead of using "setMessageHandler", you can use your own loop and use 
"Message msg = consumer.receive(5000);"
When after 5000 milliseconds no message has been received (so msg=null) just 
exit the loop.

Erwin

-----Oorspronkelijk bericht-----
Van: Arno Schatz <a...@xerai.biz> 
Verzonden: maandag 8 februari 2021 15:45
Aan: users@activemq.apache.org
Onderwerp: How to see


EXTERNAL SENDER:   Do not click any links or open any attachments unless you 
trust the sender and know the content is safe.
EXPÉDITEUR EXTERNE:    Ne cliquez sur aucun lien et n’ouvrez aucune pièce 
jointe à moins qu’ils ne proviennent d’un expéditeur fiable, ou que vous ayez 
l'assurance que le contenu provient d'une source sûre.

Hi,

This is a beginners question. I am having a queue, with a bunch of work orders 
and try to have aynchoun listeners to do the workpackages. In the main thread I 
need to know when the workers are actually done with the work. How do I know 
all messages have been worked on?

Using Artemis 2.1.16, I wrote the little Hello World like below. I am using a 
Semaphore to check if the workers (aka
MessageListeners) are done with their workload. Obviously the semaphoer 
construct has some downsides, is here a build -in way to see if the 
messageListeners all completed (or timed out)?

thanks in advance.

regards,
   Arno




    @Test
    public void mqSampleWorking() {
        try {
            Semaphore p = new Semaphore(0);
            // 
https://urldefense.com/v3/__https://activemq.apache.org/components/artemis/documentation/latest/embedding-activemq.html__;!!AaIhyw!_gWR4dud8jjPxDc1MbLBxHSzWpEXGpnaEgTPdyz0Q7qSbpdKNjaGXKUCDWpcHiuX$
            Configuration config = new ConfigurationImpl();
            config.addAcceptorConfiguration("in-vm", "vm://0");
            config.setPersistenceEnabled(false);
            config.setSecurityEnabled(false);

            EmbeddedActiveMQ server = new EmbeddedActiveMQ();
            server.setConfiguration(config);
            server.start();

            ServerLocator serverLocator = 
ActiveMQClient.createServerLocator("vm://0");

            ClientSessionFactory factory = serverLocator.createSessionFactory();
            ClientSession session = factory.createSession();
            session.createQueue(new QueueConfiguration("example"));

            ClientProducer producer = session.createProducer("example");
            ClientMessage message = session.createMessage(true);
            message.getBodyBuffer().writeString("Hello World");
            producer.send(message);
            System.out.println(session.queueQuery(new 
SimpleString("example")).getMessageCount());

            session.start();
            ClientConsumer consumer = session.createConsumer("example");
            consumer.setMessageHandler(msg -> {
                System.out.println("message = " + 
msg.getBodyBuffer().readString());
                p.release();
            });
            System.out.println("Waiting for message delivery.");
            p.acquire();
            session.close();
            server.stop();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

Reply via email to