or use the JMS API instead.. that's another possibility. but if you want to use the core API, use getbodyBuffer instead.
On Tue, Feb 22, 2022 at 1:17 PM Clebert Suconic <clebert.suco...@gmail.com> wrote: > > Found the reason: > > > You should use getBoddyBuffer instead. > > > getbodybuffer() will perform a call to checkBuffer() before returning > you the large message. > > > This should fix your test / problem. > > On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic > <clebert.suco...@gmail.com> wrote: > > > > this is something done on the core API. > > > > I can't pinpoint the exact reason, but the JMS facade works fine with > > a similar sender and producer: > > > > https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257 > > > > > > > > I created a branch with a test comparing Core and JMS API on my fork: > > > > https://github.com/clebertsuconic/activemq-artemis/tree/withCore > > > > > > > > I'm pasting the test here for future reference if the gist is ever > > gone (or the branch is gone): > > > > > > /** > > * Licensed to the Apache Software Foundation (ASF) under one or more > > * contributor license agreements. See the NOTICE file distributed with > > * this work for additional information regarding copyright ownership. > > * The ASF licenses this file to You under the Apache License, Version 2.0 > > * (the "License"); you may not use this file except in compliance with > > * the License. You may obtain a copy of the License at > > * <p> > > * http://www.apache.org/licenses/LICENSE-2.0 > > * <p> > > * Unless required by applicable law or agreed to in writing, software > > * distributed under the License is distributed on an "AS IS" BASIS, > > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > > * See the License for the specific language governing permissions and > > * limitations under the License. > > */ > > > > package org.apache.activemq.artemis.tests.integration.paging; > > > > import javax.jms.Connection; > > import javax.jms.ConnectionFactory; > > import javax.jms.MessageConsumer; > > import javax.jms.MessageProducer; > > import javax.jms.Queue; > > import javax.jms.Session; > > import javax.jms.TextMessage; > > import java.util.concurrent.atomic.AtomicInteger; > > > > import org.apache.activemq.artemis.api.core.QueueConfiguration; > > import org.apache.activemq.artemis.api.core.RoutingType; > > import org.apache.activemq.artemis.api.core.SimpleString; > > import org.apache.activemq.artemis.api.core.client.ActiveMQClient; > > import org.apache.activemq.artemis.api.core.client.ClientMessage; > > import org.apache.activemq.artemis.api.core.client.ClientProducer; > > import org.apache.activemq.artemis.api.core.client.ClientSession; > > import org.apache.activemq.artemis.core.config.Configuration; > > import org.apache.activemq.artemis.core.server.ActiveMQServer; > > import org.apache.activemq.artemis.core.server.impl.AddressInfo; > > import > > org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; > > import org.apache.activemq.artemis.core.settings.impl.AddressSettings; > > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; > > import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; > > import org.apache.activemq.artemis.tests.util.Wait; > > import org.checkerframework.checker.units.qual.A; > > import org.junit.Assert; > > import org.junit.Before; > > import org.junit.Test; > > > > public class Main extends ActiveMQTestBase { > > > > private final static String QUEUE = > > "service.images.dev::service.images.dev"; > > > > ActiveMQServer server; > > > > > > @Before > > @Override > > public void setUp() throws Exception { > > super.setUp(); > > > > Configuration config = createDefaultConfig(0, > > true).setJournalSyncNonTransactional(false); > > > > config.setMessageExpiryScanPeriod(-1); > > server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * > > 1024); > > > > server.getAddressSettingsRepository().clear(); > > > > AddressSettings defaultSetting = new > > AddressSettings().setPageSizeBytes(100 * 1024 * > > 1024).setMaxSizeBytes(10 * 1024 * > > 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false); > > > > server.getAddressSettingsRepository().addMatch("#", defaultSetting); > > > > > > server.start(); > > > > > > server.addAddressInfo(new > > AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST)); > > server.createQueue(new > > QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST)); > > > > } > > > > @Test > > public void testSending() throws Exception { > > > > final String username = null; > > final String password = null; > > > > var serverLocator = > > ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(true).setBlockOnNonDurableSend(true).setMinLargeMessageSize(1024); > > > > final var sessionFactory = serverLocator.createSessionFactory(); > > > > final var xa = false; > > final var autoCommitSends = true; > > final var autoCommitAcks = true; > > final var ackBatchSize = serverLocator.getAckBatchSize(); > > final var preAcknowledge = serverLocator.isPreAcknowledge(); > > final var clientSession = sessionFactory.createSession(username, > > password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, > > ackBatchSize); > > > > var queueQueryResult = > > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); > > if (!queueQueryResult.isExists()) { > > clientSession.createQueue(_ServiceQueueConfiguration(new > > SimpleString(QUEUE))); > > } > > > > final var consumer = clientSession.createConsumer(QUEUE); > > > > clientSession.start(); > > > > AtomicInteger errors = new AtomicInteger(0); > > AtomicInteger received = new AtomicInteger(0); > > > > consumer.setMessageHandler((msg) -> { > > try { > > msg.getDataBuffer(); > > received.incrementAndGet(); > > } catch (Throwable e) { > > e.printStackTrace(); > > errors.incrementAndGet(); > > } > > }); > > > > > > try (ClientSession producerSession = sessionFactory.createSession()) { > > ClientProducer producer = producerSession.createProducer(QUEUE); > > for (int i = 0; i < 100; i++) { > > ClientMessage message = producerSession.createMessage(true); > > message.getBodyBuffer().writeBytes(new byte[1024 * 1024]); > > producer.send(message); > > } > > } > > > > Wait.assertEquals(100, received::get); > > Assert.assertEquals(0, errors.get()); > > } > > > > @Test > > public void testWithJMSListener() throws Exception { > > > > final String username = null; > > final String password = null; > > > > ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); > > factory.setMinLargeMessageSize(1024); > > Connection connection = factory.createConnection(); > > Session sessionProducer = connection.createSession(true, > > Session.SESSION_TRANSACTED); > > > > Session sessionConsumer = connection.createSession(false, > > Session.AUTO_ACKNOWLEDGE); > > > > Queue jmsQueue = sessionProducer.createQueue(QUEUE); > > > > MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue); > > > > > > connection.start(); > > > > AtomicInteger errors = new AtomicInteger(0); > > AtomicInteger received = new AtomicInteger(0); > > > > consumer.setMessageListener((msg) -> { > > try { > > System.out.println("Received: " + > > ((TextMessage)msg).getText().length()); > > received.incrementAndGet(); > > } catch (Throwable e) { > > e.printStackTrace(); > > errors.incrementAndGet(); > > } > > > > }); > > > > MessageProducer producer = sessionProducer.createProducer(jmsQueue); > > > > StringBuffer buffer = new StringBuffer(); > > while (buffer.length() < 100 * 1024) { > > buffer.append("*****"); > > } > > > > for (int i = 0; i < 100; i++) { > > TextMessage message = > > sessionProducer.createTextMessage(buffer.toString()); > > producer.send(message); > > } > > sessionProducer.commit(); > > > > Wait.assertEquals(100, received::get); > > Assert.assertEquals(0, errors.get()); > > } > > > > private static QueueConfiguration > > _ServiceQueueConfiguration(SimpleString queueName) { > > final var config = new QueueConfiguration(queueName); > > config.setMaxConsumers(1); > > config.setPurgeOnNoConsumers(false); > > config.setDurable(false); > > config.setAutoDelete(false); > > config.setRoutingType(RoutingType.MULTICAST); > > return config; > > } > > } > > > > On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jbert...@apache.org> wrote: > > > > > > No. The test was super-simple. Just send one large message and then > > > consume > > > it. > > > > > > > > > Justin > > > > > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic > > > <clebert.suco...@gmail.com> > > > wrote: > > > > > > > It did not involved paging ? > > > > > > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbert...@apache.org> > > > > wrote: > > > > > > > > > I recreated this exception with a very simple test-case. I took the > > > > > consumer code pasted earlier in the thread and just added a producer > > > > > sending a large message. I lowered the minLargeMessageSize to make it > > > > > faster. I thought I still had that code laying around somewhere, but I > > > > > can't find it at the moment. > > > > > > > > > > > > > > > Justin > > > > > > > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic < > > > > clebert.suco...@gmail.com > > > > > > > > > > > wrote: > > > > > > > > > > > I have seen (and fixed) cases where the large message file is gone. > > > > > > I > > > > > > would need a reproducer creating the issue from scratch (send and > > > > > consume) > > > > > > > > > > > > Typically it could be associated with paging ? Did you have the > > > > > > destination in page mode ? > > > > > > > > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <t...@abcwxy.com> wrote: > > > > > > > > > > > > > The client code below the stack trace - will reproduce it - the > > > > > > > msg.getDataBuffer() inside the handler of that code will trigger > > > > > > > it > > > > > when > > > > > > a > > > > > > > large message is sent to the address. The exception in question > > > > > > > (IndexOutOfBoundsException) is being caught in line 249 of the > > > > > > > apache > > > > > > > CoreMessage code... and turned into a logged warning on line 250. > > > > The > > > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer() > > > > > (which > > > > > > > appears to be fine from a quick survey - data also seems to be > > > > > preserved > > > > > > > and accessible from client code) - it is just not clear what the > > > > intent > > > > > > is > > > > > > > if the exception code is executed... is it a "don't worry about > > > > > > > it" - > > > > > or > > > > > > a > > > > > > > "something is wrong here" and I should concern myself with > > > > > > > something? > > > > > > (the > > > > > > > warning level is making me believe that it is more than a "don't > > > > worry > > > > > > > about it" - but the fact that the exception was caught and a valid > > > > > buffer > > > > > > > is returned makes me think it is just a fallback for the other > > > > choices > > > > > of > > > > > > > buffers - and I should not worry about it?). > > > > > > > > > > > > > > Thanks for any insight you may have.... > > > > > > > > > > > > > > logged warning from the caught exception is: > > > > > > > > > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage - > > > > > > readerIndex(4) > > > > > > > + length(270740) exceeds writerIndex(4): > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > 4, > > > > > > > widx: 4, cap: 270740) > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + > > > > > > > length(270740) > > > > > > > exceeds writerIndex(4): > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > 4, > > > > > > > widx: 4, cap: 270740) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > > > at > > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > > > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - > > > > > Sending > > > > > > > 65962 from flow-control > > > > > > > > > > > > > > Simple Client Code below: > > > > > > > > > > > > > > > > > > > > > public class Main { > > > > > > > > > > > > > > private final static String ACCEPTOR = "tcp://localhost:9322"; > > > > > > > private final static String QUEUE="service.images.dev:: > > > > > > > service.images.dev"; > > > > > > > > > > > > > > public static void main(String[] args) throws Exception { > > > > > > > > > > > > > > final String username = null; > > > > > > > final String password = null; > > > > > > > > > > > > > > var serverLocator = ActiveMQClient > > > > > > > .createServerLocator(ACCEPTOR) > > > > > > > .setBlockOnDurableSend(true) > > > > > > > .setBlockOnNonDurableSend(true); > > > > > > > > > > > > > > final var sessionFactory = > > > > > serverLocator.createSessionFactory(); > > > > > > > > > > > > > > final var xa = false; > > > > > > > final var autoCommitSends = true; > > > > > > > final var autoCommitAcks = true; > > > > > > > final var ackBatchSize = serverLocator.getAckBatchSize(); > > > > > > > final var preAcknowledge = > > > > > > > serverLocator.isPreAcknowledge(); > > > > > > > final var clientSession = sessionFactory.createSession( > > > > > > > username, > > > > > > > password, > > > > > > > xa, > > > > > > > autoCommitSends, > > > > > > > autoCommitAcks, > > > > > > > preAcknowledge, > > > > > > > ackBatchSize > > > > > > > ); > > > > > > > > > > > > > > var queueQueryResult = > > > > > > > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); > > > > > > > if (!queueQueryResult.isExists()) { > > > > > > > > > > > > > > clientSession.createQueue(_ServiceQueueConfiguration(new > > > > > > > SimpleString(QUEUE))); > > > > > > > } > > > > > > > > > > > > > > final var consumer = clientSession.createConsumer(QUEUE); > > > > > > > > > > > > > > clientSession.start(); > > > > > > > > > > > > > > consumer.setMessageHandler((msg) -> { > > > > > > > > > > > > > > System.out.println("Received: "+msg.getBodySize()); > > > > > > > msg.getDataBuffer(); > > > > > > > > > > > > > > }); > > > > > > > > > > > > > > while(true) { > > > > > > > Thread.sleep(1000); > > > > > > > } > > > > > > > > > > > > > > } > > > > > > > > > > > > > > private static QueueConfiguration > > > > > > > _ServiceQueueConfiguration(SimpleString queueName) { > > > > > > > final var config = new QueueConfiguration(queueName); > > > > > > > config.setMaxConsumers(1); > > > > > > > config.setPurgeOnNoConsumers(false); > > > > > > > config.setDurable(false); > > > > > > > config.setAutoDelete(false); > > > > > > > config.setRoutingType(RoutingType.MULTICAST); > > > > > > > return config; > > > > > > > } > > > > > > > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram > > > > > > > <jbert...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you > > > > have a > > > > > > way > > > > > > > > to reproduce this? > > > > > > > > > > > > > > > > > > > > > > > > Justin > > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <t...@abcwxy.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > This seems to appear on larger messages only - I am getting a > > > > > warning > > > > > > > > when > > > > > > > > > calling getDataBuffer (2.20.0 Artemis Client). Curious if > > > > > > > > > there > > > > is > > > > > > > > > something I may be missing - or if this is completely > > > > > > > > > ignorable? > > > > > > > Thanks - > > > > > > > > > Tim > > > > > > > > > > > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 > > > > > > > (ActiveMQ-client-global-threads)] > > > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 > > > > > > > > (ActiveMQ-client-global-threads)] > > > > > > > > > readerIndex(270740) + length(270740) exceeds > > > > > > > > > writerIndex(271572): > > > > > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + > > > > > > > length(270740) > > > > > > > > > exceeds writerIndex(271572): > > > > > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > > > > > at > > > > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Clebert Suconic > > > > > > > > > > > > > > > -- > > > > Clebert Suconic > > > > > > > > > > > > -- > > Clebert Suconic > > > > -- > Clebert Suconic -- Clebert Suconic