Alright, I think I’ve figured it out. All it takes is to acknowledge accepted messages individually, commit ASAP, and finally – rollback all used sessions at very the end of every poll request. This bit returns all the messages that couldn’t be handled to the broker at once, and only after that their redelivery timer sets off.
// A snippet of Groovy script code to try it out: import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.activemq.artemis.utils.RandomUtil import java.util.concurrent.CountDownLatch final log = { text -> println "[${Thread.currentThread().name}] $text" } final url = '…' final user = '…' final password = '…' final messageCount = 20 final threadCount = 3 final queue = 'test' final filter = 'id IS NOT NULL' final rejectMessageId = RandomUtil.randomInterval(0, messageCount) log 'Will reject message #' + rejectMessageId ActiveMQClient.createServerLocator(url + "?consumerWindowSize=0").withCloseable { locator -> locator.createSessionFactory().withCloseable { sf -> final createSession = { sf.createSession(user, password, false, false, false, false, 0).start() } createSession().withCloseable { session -> session.createProducer(queue).withCloseable { producer -> messageCount.times { id -> final m = session.createMessage(true) m.writeBodyBufferString('message #' + id) m.putIntProperty('id', id) producer.send(m) } } session.commit(true) } final latch = new CountDownLatch(threadCount) threadCount.times { Thread.start { log "Started." createSession().withCloseable { receivingSession -> final consumer = receivingSession.createConsumer(queue, filter) while (true) { final m = consumer.receive(1000) if (m == null) { break } final id = m.getIntProperty('id') if (id == rejectMessageId) { log "Rejecting message #$id" } else { m.individualAcknowledge() receivingSession.commit(true) log "Accepted message #$id" } Thread.sleep(RandomUtil.randomInterval(10, 1000)) } latch.countDown() log 'Waiting for the other threads to finish ...' latch.await() receivingSession.rollback() } log 'Stopped.' } } latch.await() log "Now attempting to receive the previously rejected message #$rejectMessageId ..." createSession().withCloseable { session -> session.createConsumer(queue, filter).withCloseable { consumer -> final m = consumer.receive() final id = m.getIntProperty('id') m.individualAcknowledge() log "Accepted message #$id" } session.commit(true) } } } System.exit(0) And some logs: [main] Will reject message #4 [Thread-0] Started. [Thread-1] Started. [Thread-2] Started. [Thread-2] Accepted message #2 [Thread-1] Accepted message #1 [Thread-0] Accepted message #0 [Thread-2] Accepted message #3 [Thread-1] Rejecting message #4 [Thread-0] Accepted message #5 [Thread-1] Accepted message #6 [Thread-2] Accepted message #7 [Thread-0] Accepted message #8 [Thread-1] Accepted message #9 [Thread-0] Accepted message #10 [Thread-0] Accepted message #11 [Thread-2] Accepted message #12 [Thread-1] Accepted message #13 [Thread-0] Accepted message #14 [Thread-2] Accepted message #15 [Thread-1] Accepted message #16 [Thread-0] Accepted message #17 [Thread-2] Accepted message #18 [Thread-1] Accepted message #19 [Thread-0] Waiting for the other threads to finish ... [Thread-2] Waiting for the other threads to finish ... [Thread-1] Waiting for the other threads to finish ... [main] Now attempting to receive the previously rejected message #4 ... [Thread-2] Stopped. [Thread-0] Stopped. [Thread-1] Stopped. [main] Accepted message #4 Jan From: Jan Šmucr<mailto:jan.sm...@aimtecglobal.com> Sent: čtvrtek 4. července 2024 13:46 To: users@activemq.apache.org<mailto:users@activemq.apache.org> Subject: Using multiple sessions in a single poll request Hello. We’re using Artemis to distribute files between various systems. The workflow is almost always the same: 1. Shell scripts pass files to client applications. 2. These client applications deliver the files to an Artemis broker using the core protocol. 3. Another core protocol clients poll the broker repeatedly, receive messages, store them on file systems, run shell script handlers (one instance per file), and then accept or refuse each message individually based on its handler return code. Clients are massively parallelized in order to achieve good performance, so each client has multiple sessions opened. Now let’s focus on pollers: * Each poll request asks for at most N files, and based on that, at most S sessions are assigned jobs, where each job is meant do handle at most one file at a time. If N > S, then some sessions are used repeatedly during that request. * Each session uses exactly one consumer at a time. * When a file arrives, it is passed to a handler. * If the handler ends with RC 0, message.individualAcknowledge() and session.commit() calls are performed. * Otherwise, session.rollback() is called. * Handlers can run for tens of seconds, or even minutes before it’s clear whether to accept or refuse a file. * When no file arrives, then that session’s work is done. Unfortunately, handling failure followed by session.rollback() is where things get complicated. As soon as a file is refused, the broker schedules it for repeated delivery, and another session working on the very same request can receive it again. So far, this hasn’t been much of an issue and I’ve been using a workaround of closing the consumer, instead of proceeding with just another session.rollback() which would increase the number of unsuccessful deliveries for that message. But the time has come, and now I have to implement message grouping support, which rules out my workaround, because I need to keep my consumers open for as long as possible. How can I group my sessions together so that they can work on a single request in parallel in a way that none of them receive files I’ve already rejected? Thanks for your ideas. Jan