Alright, I think I’ve figured it out. All it takes is to acknowledge accepted 
messages individually, commit ASAP, and finally roll back all used sessions at 
the very end of every poll request. That final rollback returns all the messages 
that couldn’t be handled to the broker at once, and only then do their 
redelivery timers start.


// A snippet of Groovy script code to try it out:

import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.utils.RandomUtil

import java.util.concurrent.CountDownLatch

final log = { text ->
    println "[${Thread.currentThread().name}] $text"
}
final url = '…'
final user = '…'
final password = '…'
final messageCount = 20
final threadCount = 3
final queue = 'test'
final filter = 'id IS NOT NULL'

final rejectMessageId = RandomUtil.randomInterval(0, messageCount)
log 'Will reject message #' + rejectMessageId

ActiveMQClient.createServerLocator(url + "?consumerWindowSize=0").withCloseable { locator ->

    locator.createSessionFactory().withCloseable { sf ->

        final createSession = {
            sf.createSession(user, password, false, false, false, false, 0).start()
        }

        createSession().withCloseable { session ->
            session.createProducer(queue).withCloseable { producer ->
                messageCount.times { id ->
                    final m = session.createMessage(true)
                    m.writeBodyBufferString('message #' + id)
                    m.putIntProperty('id', id)
                    producer.send(m)
                }
            }
            session.commit(true)
        }

        final latch = new CountDownLatch(threadCount)

        threadCount.times {

            Thread.start {

                log "Started."

                createSession().withCloseable { receivingSession ->
                    final consumer = receivingSession.createConsumer(queue, filter)
                    while (true) {
                        final m = consumer.receive(1000)
                        if (m == null) {
                            break
                        }
                        final id = m.getIntProperty('id')
                        if (id == rejectMessageId) {
                            log "Rejecting message #$id"
                        } else {
                            m.individualAcknowledge()
                            receivingSession.commit(true)
                            log "Accepted message #$id"
                        }
                        Thread.sleep(RandomUtil.randomInterval(10, 1000))
                    }
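                    // Signal that this thread has drained the queue, then wait for
                    // the others so that all sessions roll back together at the end.
                    latch.countDown()
                    log 'Waiting for the other threads to finish ...'
                    latch.await()
                    // Only now hand the unacknowledged (rejected) message back to the broker.
                    receivingSession.rollback()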
                }

                log 'Stopped.'
            }
        }

        latch.await()

        log "Now attempting to receive the previously rejected message #$rejectMessageId ..."

        createSession().withCloseable { session ->
            session.createConsumer(queue, filter).withCloseable { consumer ->
                final m = consumer.receive()
                final id = m.getIntProperty('id')
                m.individualAcknowledge()
                log "Accepted message #$id"
            }
            session.commit(true)
        }
    }
}

System.exit(0)


And some logs:

[main] Will reject message #4
[Thread-0] Started.
[Thread-1] Started.
[Thread-2] Started.
[Thread-2] Accepted message #2
[Thread-1] Accepted message #1
[Thread-0] Accepted message #0
[Thread-2] Accepted message #3
[Thread-1] Rejecting message #4
[Thread-0] Accepted message #5
[Thread-1] Accepted message #6
[Thread-2] Accepted message #7
[Thread-0] Accepted message #8
[Thread-1] Accepted message #9
[Thread-0] Accepted message #10
[Thread-0] Accepted message #11
[Thread-2] Accepted message #12
[Thread-1] Accepted message #13
[Thread-0] Accepted message #14
[Thread-2] Accepted message #15
[Thread-1] Accepted message #16
[Thread-0] Accepted message #17
[Thread-2] Accepted message #18
[Thread-1] Accepted message #19
[Thread-0] Waiting for the other threads to finish ...
[Thread-2] Waiting for the other threads to finish ...
[Thread-1] Waiting for the other threads to finish ...
[main] Now attempting to receive the previously rejected message #4 ...
[Thread-2] Stopped.
[Thread-0] Stopped.
[Thread-1] Stopped.
[main] Accepted message #4


Jan


From: Jan Šmucr<mailto:jan.sm...@aimtecglobal.com>
Sent: Thursday, July 4, 2024 13:46
To: users@activemq.apache.org<mailto:users@activemq.apache.org>
Subject: Using multiple sessions in a single poll request

Hello.

We’re using Artemis to distribute files between various systems. The workflow 
is almost always the same:

  1.  Shell scripts pass files to client applications.
  2.  These client applications deliver the files to an Artemis broker using 
the core protocol.
  3.  Other core protocol clients poll the broker repeatedly, receive 
messages, store them on file systems, run shell script handlers (one instance 
per file), and then accept or refuse each message individually based on its 
handler return code.

Clients are massively parallelized in order to achieve good performance, so 
each client has multiple sessions open.

Now let’s focus on pollers:

  *   Each poll request asks for at most N files, and based on that, at most S 
sessions are assigned jobs, where each job is meant to handle at most one file 
at a time. If N > S, then some sessions are used repeatedly during that request.
  *   Each session uses exactly one consumer at a time.
  *   When a file arrives, it is passed to a handler.
     *   If the handler ends with RC 0, message.individualAcknowledge() and 
session.commit() calls are performed.
     *   Otherwise, session.rollback() is called.
  *   Handlers can run for tens of seconds, or even minutes before it’s clear 
whether to accept or refuse a file.
  *   When no file arrives, then that session’s work is done.
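
The per-job flow in the list above could be sketched roughly as follows. This is 
only an illustration under assumptions: runJob, handler, and the returned status 
strings are hypothetical names, not part of the actual client. The session, 
consumer, and message arguments are expected to behave like the Artemis core 
ClientSession, ClientConsumer, and ClientMessage.

```groovy
// Hypothetical helper: one job on one session, as described in the list above.
// `handler` is an assumed closure that stores the file, runs the shell script
// handler, and returns its exit code.
def runJob(session, consumer, Closure handler, long timeoutMs = 1000) {
    final message = consumer.receive(timeoutMs)
    if (message == null) {
        return 'idle'                   // no file arrived: this session's work is done
    }
    if (handler(message) == 0) {
        message.individualAcknowledge() // accept just this one message
        session.commit()
        return 'accepted'
    }
    session.rollback()                  // refuse: the broker may redeliver it right away
    return 'refused'
}
```

In this form a refused message becomes eligible for redelivery as soon as the 
rollback is done, which is what allows another session working on the same 
request to receive it again.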

Unfortunately, handling failure followed by session.rollback() is where things 
get complicated. As soon as a file is refused, the broker schedules it for 
repeated delivery, and another session working on the very same request can 
receive it again. So far, this hasn’t been much of an issue and I’ve been using 
a workaround of closing the consumer, instead of proceeding with just another 
session.rollback() which would increase the number of unsuccessful deliveries 
for that message.

But the time has come, and now I have to implement message grouping support, 
which rules out my workaround, because I need to keep my consumers open for as 
long as possible.

How can I group my sessions together so that they can work on a single request 
in parallel in a way that none of them receive files I’ve already rejected?

Thanks for your ideas.
Jan
