this is something done on the core API. I can't pinpoint the exact reason, but the JMS facade works fine with a similar sender and producer:
https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257 I created a branch with a test comparing Core and JMS API on my fork: https://github.com/clebertsuconic/activemq-artemis/tree/withCore I'm pasting the test here for future reference if the gist is ever gone (or the branch is gone): /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * <p> * http://www.apache.org/licenses/LICENSE-2.0 * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.activemq.artemis.tests.integration.paging; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.checkerframework.checker.units.qual.A; import org.junit.Assert; import org.junit.Before; import org.junit.Test; public class Main extends ActiveMQTestBase { private final static String QUEUE = "service.images.dev::service.images.dev"; ActiveMQServer server; @Before @Override public void setUp() throws Exception { super.setUp(); Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false); config.setMessageExpiryScanPeriod(-1); server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 1024); server.getAddressSettingsRepository().clear(); AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(100 * 1024 * 1024).setMaxSizeBytes(10 * 1024 * 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false); server.getAddressSettingsRepository().addMatch("#", defaultSetting); server.start(); server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST)); server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST)); } @Test public void testSending() throws Exception { final String username = null; final String password = null; var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(true).setBlockOnNonDurableSend(true).setMinLargeMessageSize(1024); final var sessionFactory = serverLocator.createSessionFactory(); final var xa = false; final var autoCommitSends = true; final var autoCommitAcks = true; final var ackBatchSize = serverLocator.getAckBatchSize(); final var preAcknowledge = serverLocator.isPreAcknowledge(); final var clientSession = sessionFactory.createSession(username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize); var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); if (!queueQueryResult.isExists()) { clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE))); } final var consumer = clientSession.createConsumer(QUEUE); clientSession.start(); AtomicInteger errors = new AtomicInteger(0); AtomicInteger received = new AtomicInteger(0); consumer.setMessageHandler((msg) -> { try { msg.getDataBuffer(); received.incrementAndGet(); } catch (Throwable e) { e.printStackTrace(); errors.incrementAndGet(); } }); try (ClientSession producerSession = sessionFactory.createSession()) { ClientProducer producer = producerSession.createProducer(QUEUE); for (int i = 0; i < 100; i++) { ClientMessage message = producerSession.createMessage(true); message.getBodyBuffer().writeBytes(new byte[1024 * 1024]); producer.send(message); } } Wait.assertEquals(100, received::get); Assert.assertEquals(0, errors.get()); } @Test public void testWithJMSListener() throws Exception { final String username = null; final String password = null; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setMinLargeMessageSize(1024); Connection connection = factory.createConnection(); Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED); Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue jmsQueue = sessionProducer.createQueue(QUEUE); MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue); connection.start(); AtomicInteger errors = new AtomicInteger(0); AtomicInteger received = new AtomicInteger(0); consumer.setMessageListener((msg) -> { try { System.out.println("Received: " + ((TextMessage)msg).getText().length()); received.incrementAndGet(); } catch (Throwable e) { e.printStackTrace(); errors.incrementAndGet(); } }); MessageProducer producer = sessionProducer.createProducer(jmsQueue); StringBuffer buffer = new StringBuffer(); while (buffer.length() < 100 * 1024) { buffer.append("*****"); } for (int i = 0; i < 100; i++) { TextMessage message = sessionProducer.createTextMessage(buffer.toString()); producer.send(message); } sessionProducer.commit(); Wait.assertEquals(100, received::get); Assert.assertEquals(0, errors.get()); } private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) { final var config = new QueueConfiguration(queueName); config.setMaxConsumers(1); config.setPurgeOnNoConsumers(false); config.setDurable(false); config.setAutoDelete(false); config.setRoutingType(RoutingType.MULTICAST); return config; } } On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jbert...@apache.org> wrote: > > No. The test was super-simple. Just send one large message and then consume > it. > > > Justin > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <clebert.suco...@gmail.com> > wrote: > > > It did not involved paging ? > > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbert...@apache.org> > > wrote: > > > > > I recreated this exception with a very simple test-case. I took the > > > consumer code pasted earlier in the thread and just added a producer > > > sending a large message. I lowered the minLargeMessageSize to make it > > > faster. I thought I still had that code laying around somewhere, but I > > > can't find it at the moment. > > > > > > > > > Justin > > > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic < > > clebert.suco...@gmail.com > > > > > > > wrote: > > > > > > > I have seen (and fixed) cases where the large message file is gone. I > > > > would need a reproducer creating the issue from scratch (send and > > > consume) > > > > > > > > Typically it could be associated with paging ? Did you have the > > > > destination in page mode ? > > > > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <t...@abcwxy.com> wrote: > > > > > > > > > The client code below the stack trace - will reproduce it - the > > > > > msg.getDataBuffer() inside the handler of that code will trigger it > > > when > > > > a > > > > > large message is sent to the address. The exception in question > > > > > (IndexOutOfBoundsException) is being caught in line 249 of the apache > > > > > CoreMessage code... and turned into a logged warning on line 250. > > The > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer() > > > (which > > > > > appears to be fine from a quick survey - data also seems to be > > > preserved > > > > > and accessible from client code) - it is just not clear what the > > intent > > > > is > > > > > if the exception code is executed... is it a "don't worry about it" - > > > or > > > > a > > > > > "something is wrong here" and I should concern myself with something? > > > > (the > > > > > warning level is making me believe that it is more than a "don't > > worry > > > > > about it" - but the fact that the exception was caught and a valid > > > buffer > > > > > is returned makes me think it is just a fallback for the other > > choices > > > of > > > > > buffers - and I should not worry about it?). > > > > > > > > > > Thanks for any insight you may have.... > > > > > > > > > > logged warning from the caught exception is: > > > > > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage - > > > > readerIndex(4) > > > > > + length(270740) exceeds writerIndex(4): > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > 4, > > > > > widx: 4, cap: 270740) > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) > > > > > exceeds writerIndex(4): > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > 4, > > > > > widx: 4, cap: 270740) > > > > > at > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > at > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > at > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) > > > > > at > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > > > at > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > > > > at > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - > > > Sending > > > > > 65962 from flow-control > > > > > > > > > > Simple Client Code below: > > > > > > > > > > > > > > > public class Main { > > > > > > > > > > private final static String ACCEPTOR = "tcp://localhost:9322"; > > > > > private final static String QUEUE="service.images.dev:: > > > > > service.images.dev"; > > > > > > > > > > public static void main(String[] args) throws Exception { > > > > > > > > > > final String username = null; > > > > > final String password = null; > > > > > > > > > > var serverLocator = ActiveMQClient > > > > > .createServerLocator(ACCEPTOR) > > > > > .setBlockOnDurableSend(true) > > > > > .setBlockOnNonDurableSend(true); > > > > > > > > > > final var sessionFactory = > > > serverLocator.createSessionFactory(); > > > > > > > > > > final var xa = false; > > > > > final var autoCommitSends = true; > > > > > final var autoCommitAcks = true; > > > > > final var ackBatchSize = serverLocator.getAckBatchSize(); > > > > > final var preAcknowledge = serverLocator.isPreAcknowledge(); > > > > > final var clientSession = sessionFactory.createSession( > > > > > username, > > > > > password, > > > > > xa, > > > > > autoCommitSends, > > > > > autoCommitAcks, > > > > > preAcknowledge, > > > > > ackBatchSize > > > > > ); > > > > > > > > > > var queueQueryResult = > > > > > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); > > > > > if (!queueQueryResult.isExists()) { > > > > > clientSession.createQueue(_ServiceQueueConfiguration(new > > > > > SimpleString(QUEUE))); > > > > > } > > > > > > > > > > final var consumer = clientSession.createConsumer(QUEUE); > > > > > > > > > > clientSession.start(); > > > > > > > > > > consumer.setMessageHandler((msg) -> { > > > > > > > > > > System.out.println("Received: "+msg.getBodySize()); > > > > > msg.getDataBuffer(); > > > > > > > > > > }); > > > > > > > > > > while(true) { > > > > > Thread.sleep(1000); > > > > > } > > > > > > > > > > } > > > > > > > > > > private static QueueConfiguration > > > > > _ServiceQueueConfiguration(SimpleString queueName) { > > > > > final var config = new QueueConfiguration(queueName); > > > > > config.setMaxConsumers(1); > > > > > config.setPurgeOnNoConsumers(false); > > > > > config.setDurable(false); > > > > > config.setAutoDelete(false); > > > > > config.setRoutingType(RoutingType.MULTICAST); > > > > > return config; > > > > > } > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jbert...@apache.org> > > > > > wrote: > > > > > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you > > have a > > > > way > > > > > > to reproduce this? > > > > > > > > > > > > > > > > > > Justin > > > > > > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <t...@abcwxy.com> wrote: > > > > > > > > > > > > > This seems to appear on larger messages only - I am getting a > > > warning > > > > > > when > > > > > > > calling getDataBuffer (2.20.0 Artemis Client). Curious if there > > is > > > > > > > something I may be missing - or if this is completely ignorable? > > > > > Thanks - > > > > > > > Tim > > > > > > > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 > > > > > (ActiveMQ-client-global-threads)] > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 > > > > > > (ActiveMQ-client-global-threads)] > > > > > > > readerIndex(270740) + length(270740) exceeds writerIndex(271572): > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + > > > > > length(270740) > > > > > > > exceeds writerIndex(271572): > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > > > at > > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > > > > > > > > > > > > > > > > > > -- > > > > Clebert Suconic > > > > > > > > > -- > > Clebert Suconic > > -- Clebert Suconic