Greetings!
I have recently started enumerating messages in a queue from time to time,
because we’re trying to figure out if a unit of work is still pending. But I’m
getting this error occasionally:
2023-07-27T21:38:30.007 [CanvasState-Abandoned]
RpdmQueueUtils.browseStartMessages:159 [] ERROR - Error enumerating messages
for pod
{"message_type":"pod_specifier","pod_type":"project_execution","memory_mb":2048,"jvm_memory_mb":250}
java.lang.RuntimeException: AMQ219023: The large message lost connection with
its session, either because of a rollback or a closed session
at
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:91)
at
org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage.readBytes(ActiveMQBytesMessage.java:217)
at
net.redpoint.rpdm.services.RpdmQueueUtils.browseStartMessages(RpdmQueueUtils.java:155)
at
net.redpoint.rpdm.services.RpdmQueueUtils.getAllQueuedCanvasIds(RpdmQueueUtils.java:76)
at
net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl.doCanvasAbandoned(CanvasStateQueryServerImpl.java:282)
at
net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl$Abandoned.action(CanvasStateQueryServerImpl.java:265)
at
net.redpoint.rpdm.services.LazyStartPeriodicThread.run(LazyStartPeriodicThread.java:91)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: ActiveMQIllegalStateException[errorType=ILLEGAL_STATE
message=AMQ219023: The large message lost connection with its session, either
because of a rollback or a closed session]
at
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:265)
at
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.checkBuffer(ClientLargeMessageImpl.java:157)
at
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:89)
This happens in two parts using JMS. The first enumerates the queue using
browseMessages() to collect a List<Message>, and the second processes the
Messages, reading their bodies and examining them. The problem occurs on this
line in browseStartMessages when we read the body bytes
bm.readBytes(requestBytes);
Despite the error text, the session isn’t closed, or at least this error
doesn’t always happen despite my not doing anything to re-create the session.
I suspect that, given the ClientLargeMessageImpl.getBodyBuffer on the stack,
something is going wrong that is specific to “large” messages. IIRC the large
messages use a coat-check pattern to store the message on disk and bypass the
in-memory queue.
Can you suggest a solution? I’ve though of two potential changes:
- Raise minLargeMessageSize to something big enough that our messages are never
“large”
- Read the body bytes immediately, while enumerating the queue, instead of
later when looping over the List<Message>
I was also trying to see if there is a way to attach “metadata” to a message
and only read that? These messages are large, and presumably it is expensive
to scan them, and I’m really only looking for a UUID embedded in all of that.
JDK: Temurin 17
AMQ broker version: 2.28.0
AMQ JMS client dependency:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client-all</artifactId>
<version>2.28.0</version>
</dependency>
Thanks
John
public List<Message> browseMessages(String queueName) {
// This helps avoid a known issue with queue message enumeration (in ActiveMQ
"classic"), where after getting the final message
// it hangs for a significant time, up to 20 seconds, before the Enumeration
returns false.
int count = getQueueEntryCount(queueName);
List<Message> result = new ArrayList<>();
synchronized (lock) {
try {
Queue queue = getSession().createQueue(queueName);
try (QueueBrowser browser = getSession().createBrowser(queue)) {
Enumeration e = browser.getEnumeration();
while (count-- > 0 && e.hasMoreElements()) {
try {
result.add((Message) e.nextElement());
} catch (Exception ex) {
LOG.warn("Error browsing queue '" + queueName + "'", ex);
}
}
}
} catch (Exception ex) {
LOG.warn("Error browsing queue '" + queueName + "'", ex);
}
}
return result;
}
The second processes the Messages:
public List<RpcRequestPacket> browseStartMessages(PodSpecifier podSpecifier) {
List<RpcRequestPacket> result = new ArrayList<>();
for (var message : browseMessages(getStartQueueName(podSpecifier))) {
var bm = (BytesMessage)message;
try {
var requestBytes = new byte[(int) bm.getBodyLength()];
bm.readBytes(requestBytes);
result.add(new RpcRequestPacket(requestBytes, new
JmsRpcMessageMetadata(message.getJMSMessageID(), "")));
}
catch (Exception ex) {
LOG.error("Error enumerating messages for pod " + podSpecifier, ex);
}
}
return result;
}
[rg] <https://www.redpointglobal.com/>
John Lilley
Data Management Chief Architect, Redpoint Global Inc.
888 Worcester Street, Suite 200 Wellesley, MA 02482
M: +1 7209385761<tel:+1%207209385761> |
[email protected]<mailto:[email protected]>
PLEASE NOTE: This e-mail from Redpoint Global Inc. (“Redpoint”) is confidential
and is intended solely for the use of the individual(s) to whom it is
addressed. If you believe you received this e-mail in error, please notify the
sender immediately, delete the e-mail from your computer and do not copy, print
or disclose it to anyone else. If you properly received this e-mail as a
customer, partner or vendor of Redpoint, you should maintain its contents in
confidence subject to the terms and conditions of your agreement(s) with
Redpoint.