Alright, I think I’ve figured it out. All it takes is to acknowledge accepted
messages individually, commit ASAP, and finally roll back all used sessions at
the very end of every poll request. That final rollback returns all the messages
that couldn’t be handled to the broker at once, and only then do their
redelivery timers start.
// A snippet of Groovy script code to try it out:
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.utils.RandomUtil
import java.util.concurrent.CountDownLatch

final log = { text ->
    println "[${Thread.currentThread().name}] $text"
}

final url = '…'
final user = '…'
final password = '…'
final messageCount = 20
final threadCount = 3
final queue = 'test'
final filter = 'id IS NOT NULL'
final rejectMessageId = RandomUtil.randomInterval(0, messageCount)
log 'Will reject message #' + rejectMessageId

// consumerWindowSize=0 turns off client-side message buffering.
ActiveMQClient.createServerLocator(url + "?consumerWindowSize=0").withCloseable { locator ->
    locator.createSessionFactory().withCloseable { sf ->
        // Transacted session: no XA, no auto-commit of sends or acks, no pre-acknowledge.
        final createSession = {
            sf.createSession(user, password, false, false, false, false, 0).start()
        }

        // Produce the test messages.
        createSession().withCloseable { session ->
            session.createProducer(queue).withCloseable { producer ->
                messageCount.times { id ->
                    final m = session.createMessage(true)
                    m.writeBodyBufferString('message #' + id)
                    m.putIntProperty('id', id)
                    producer.send(m)
                }
            }
            session.commit(true)
        }

        // Poll in parallel, one session and one consumer per thread.
        final latch = new CountDownLatch(threadCount)
        threadCount.times {
            Thread.start {
                log "Started."
                createSession().withCloseable { receivingSession ->
                    final consumer = receivingSession.createConsumer(queue, filter)
                    while (true) {
                        final m = consumer.receive(1000)
                        if (m == null) {
                            break
                        }
                        final id = m.getIntProperty('id')
                        if (id == rejectMessageId) {
                            // Rejected: neither acknowledged nor rolled back yet.
                            log "Rejecting message #$id"
                        } else {
                            m.individualAcknowledge()
                            receivingSession.commit(true)
                            log "Accepted message #$id"
                        }
                        Thread.sleep(RandomUtil.randomInterval(10, 1000))
                    }
                    latch.countDown()
                    log 'Waiting for the other threads to finish ...'
                    latch.await()
                    // Roll back only after every thread has finished polling.
                    receivingSession.rollback()
                }
                log 'Stopped.'
            }
        }
        latch.await()

        log "Now attempting to receive the previously rejected message #$rejectMessageId ..."
        createSession().withCloseable { session ->
            session.createConsumer(queue, filter).withCloseable { consumer ->
                final m = consumer.receive()
                final id = m.getIntProperty('id')
                m.individualAcknowledge()
                log "Accepted message #$id"
            }
            session.commit(true)
        }
    }
}
System.exit(0)
And some logs:
[main] Will reject message #4
[Thread-0] Started.
[Thread-1] Started.
[Thread-2] Started.
[Thread-2] Accepted message #2
[Thread-1] Accepted message #1
[Thread-0] Accepted message #0
[Thread-2] Accepted message #3
[Thread-1] Rejecting message #4
[Thread-0] Accepted message #5
[Thread-1] Accepted message #6
[Thread-2] Accepted message #7
[Thread-0] Accepted message #8
[Thread-1] Accepted message #9
[Thread-0] Accepted message #10
[Thread-0] Accepted message #11
[Thread-2] Accepted message #12
[Thread-1] Accepted message #13
[Thread-0] Accepted message #14
[Thread-2] Accepted message #15
[Thread-1] Accepted message #16
[Thread-0] Accepted message #17
[Thread-2] Accepted message #18
[Thread-1] Accepted message #19
[Thread-0] Waiting for the other threads to finish ...
[Thread-2] Waiting for the other threads to finish ...
[Thread-1] Waiting for the other threads to finish ...
[main] Now attempting to receive the previously rejected message #4 ...
[Thread-2] Stopped.
[Thread-0] Stopped.
[Thread-1] Stopped.
[main] Accepted message #4
Jan
From: Jan Šmucr<mailto:[email protected]>
Sent: Thursday, July 4, 2024 13:46
To: [email protected]<mailto:[email protected]>
Subject: Using multiple sessions in a single poll request
Hello.
We’re using Artemis to distribute files between various systems. The workflow
is almost always the same:
1. Shell scripts pass files to client applications.
2. These client applications deliver the files to an Artemis broker using
the core protocol.
3. Other core protocol clients poll the broker repeatedly, receive
messages, store them on file systems, run shell script handlers (one instance
per file), and then accept or refuse each message individually based on its
handler return code.
Clients are massively parallelized in order to achieve good performance, so
each client has multiple sessions open.
Now let’s focus on pollers:
* Each poll request asks for at most N files, and based on that, at most S
sessions are assigned jobs, where each job is meant to handle at most one file
at a time. If N > S, then some sessions are used repeatedly during that request.
* Each session uses exactly one consumer at a time.
* When a file arrives, it is passed to a handler.
* If the handler ends with RC 0, message.individualAcknowledge() and
session.commit() calls are performed.
* Otherwise, session.rollback() is called.
* Handlers can run for tens of seconds, or even minutes before it’s clear
whether to accept or refuse a file.
* When no file arrives, then that session’s work is done.
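For illustration, the accept/refuse step from the list above could be sketched roughly like this. The helper name settle is hypothetical (it is not part of the Artemis API), and the arguments are left untyped on purpose, so the same code would presumably work with real ClientSession and ClientMessage objects as well as with simple stand-ins:

```groovy
// Hypothetical helper, not an Artemis API: settles one received message
// according to the handler's return code. `session` only needs commit() and
// rollback(); `message` only needs individualAcknowledge().
def settle(session, message, int returnCode) {
    if (returnCode == 0) {
        // Accept: acknowledge just this one message and commit right away.
        message.individualAcknowledge()
        session.commit()
        return true
    }
    // Refuse: roll the session back. This is the step that lets the broker
    // schedule the message for redelivery while other sessions still poll.
    session.rollback()
    return false
}
```

In a poller this would be called once per received message, e.g. settle(session, m, rc), where rc is the handler's return code.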
Unfortunately, handling failure followed by session.rollback() is where things
get complicated. As soon as a file is refused, the broker schedules it for
repeated delivery, and another session working on the very same request can
receive it again. So far, this hasn’t been much of an issue and I’ve been using
a workaround of closing the consumer, instead of proceeding with just another
session.rollback() which would increase the number of unsuccessful deliveries
for that message.
But the time has come, and now I have to implement message grouping support,
which rules out my workaround, because I need to keep my consumers open for as
long as possible.
How can I group my sessions together so that they can work on a single request
in parallel without any of them receiving files I’ve already rejected?
Thanks for your ideas.
Jan