This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new 83cb3dc3199 CAMEL-21659: introduce deadline concept to TimeoutExtender 
(#16945) (#16948)
83cb3dc3199 is described below

commit 83cb3dc31996916f3314148599f667509a30caee
Author: Simon Oxenvad Rasmussen <[email protected]>
AuthorDate: Tue Jan 28 14:30:59 2025 +0100

    CAMEL-21659: introduce deadline concept to TimeoutExtender (#16945) (#16948)
    
    * CAMEL-21659: introduce deadline concept to TimeoutExtender
    
    * Split component test to separate test and enable again
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 196 +++++++++++++--------
 .../component/aws2/sqs/AmazonSQSClientMock.java    |  29 ++-
 .../sqs/SqsBatchConsumerConcurrentConsumersIT.java |  14 +-
 .../SqsConsumerExtendMessageVisibilityTest.java    |  11 +-
 .../sqs/integration/SqsComponentLocalstackIT.java  |  34 +---
 ...ava => SqsComponentSendInOnlyLocalstackIT.java} |  37 +---
 .../SqsProducerSendByteArrayLocalstackIT.java      |  11 +-
 7 files changed, 174 insertions(+), 158 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 6fd18faa640..b2603198499 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.sqs;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -66,7 +67,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.*;
+import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
+import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.unmodifiableList;
@@ -98,7 +107,8 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         pendingExchanges = 0;
 
         List<software.amazon.awssdk.services.sqs.model.Message> messages = 
pollingTask.call();
-        // okay we have some response from aws so lets mark the consumer as 
ready
+        // okay we have some response from aws so lets mark the consumer as
+        // ready
         forceConsumerAsReady();
 
         Queue<Exchange> exchanges = createExchanges(messages);
@@ -301,18 +311,16 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
             Integer visibilityTimeout = 
getConfiguration().getVisibilityTimeout();
 
             if (visibilityTimeout != null && visibilityTimeout > 0) {
-                int initialDelay = visibilityTimeout / 2;
-                int period = visibilityTimeout;
-                int repeatSeconds = (int) (visibilityTimeout.doubleValue() * 
1.5);
-                this.timeoutExtender = new TimeoutExtender(repeatSeconds);
+                int delay = Math.max(1, visibilityTimeout / 2);
+                this.timeoutExtender = new TimeoutExtender(visibilityTimeout, 
delay);
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            "Scheduled TimeoutExtender task to start after {} 
delay, and run with {}/{} period/repeat (seconds)",
-                            initialDelay, period, repeatSeconds);
+                            "Scheduled TimeoutExtender task to start after {} 
delay, and run with {}/{} delay/repeat (seconds)",
+                            delay, delay, visibilityTimeout);
                 }
                 this.scheduledFuture
-                        = 
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay, 
period, TimeUnit.SECONDS);
+                        = 
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay, 
TimeUnit.SECONDS);
             }
         }
 
@@ -345,13 +353,16 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
     private class TimeoutExtender implements Runnable {
 
+        private static final String RECEIPT_HANDLE_IS_INVALID = 
"ReceiptHandleIsInvalid";
         private static final int MAX_REQUESTS = 10;
-        private final int repeatSeconds;
+        private final int visibilityTimeout;
+        private final int delayBetweenExecutions;
         private final AtomicBoolean run = new AtomicBoolean(true);
-        private final Map<String, ChangeMessageVisibilityBatchRequestEntry> 
entries = new ConcurrentHashMap<>();
+        private final Map<String, TimeoutExtenderEntry> entries = new 
ConcurrentHashMap<>();
 
-        TimeoutExtender(int repeatSeconds) {
-            this.repeatSeconds = repeatSeconds;
+        TimeoutExtender(int visibilityTimeout, int delayBetweenExecutions) {
+            this.visibilityTimeout = visibilityTimeout;
+            this.delayBetweenExecutions = delayBetweenExecutions;
         }
 
         public void add(Exchange exchange) {
@@ -367,19 +378,16 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
                 }
 
                 private void remove(Exchange exchange) {
-                    LOG.trace("Removing exchangeId {} from the 
TimeoutExtender, processing done",
-                            exchange.getExchangeId());
+                    LOG.trace("Removing exchangeId {} from the 
TimeoutExtender, processing done", exchange.getExchangeId());
                     entries.remove(exchange.getExchangeId());
                 }
             });
 
-            ChangeMessageVisibilityBatchRequestEntry entry
-                    = ChangeMessageVisibilityBatchRequestEntry.builder()
-                            
.id(exchange.getExchangeId()).visibilityTimeout(repeatSeconds)
-                            
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, 
String.class))
-                            .build();
+            ChangeMessageVisibilityBatchRequestEntry entry = 
ChangeMessageVisibilityBatchRequestEntry.builder()
+                    
.id(exchange.getExchangeId()).visibilityTimeout(visibilityTimeout)
+                    
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, 
String.class)).build();
 
-            entries.put(exchange.getExchangeId(), entry);
+            entries.put(exchange.getExchangeId(), new 
TimeoutExtenderEntry(entry));
         }
 
         public void cancel() {
@@ -390,60 +398,106 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         @Override
         public void run() {
             if (run.get()) {
-                Queue<ChangeMessageVisibilityBatchRequestEntry> entryQueue = 
new LinkedList<>(entries.values());
+                final Instant nextExpectedExecution = 
Instant.now().plusSeconds(Math.max(1, delayBetweenExecutions));
+
+                final Queue<TimeoutExtenderEntry> entryQueue = new 
LinkedList<>(entries.values());
 
                 while (!entryQueue.isEmpty()) {
-                    List<ChangeMessageVisibilityBatchRequestEntry> 
batchEntries = new LinkedList<>();
-                    // up to 10 requests can be sent with each 
ChangeMessageVisibilityBatch action
+                    List<ChangeMessageVisibilityBatchRequestEntry> 
batchEntries = new ArrayList<>();
+                    // up to 10 requests can be sent with each
+                    // ChangeMessageVisibilityBatch action
                     while (!entryQueue.isEmpty() && batchEntries.size() < 
MAX_REQUESTS) {
-                        batchEntries.add(entryQueue.poll());
+                        TimeoutExtenderEntry nextEntry = entryQueue.poll();
+                        if 
(nextEntry.isDeadlineReachedAt(nextExpectedExecution)) {
+                            batchEntries.add(nextEntry.extendRequest);
+                        }
                     }
-
-                    ChangeMessageVisibilityBatchRequest request
-                            = 
ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(batchEntries)
-                                    .build();
-
-                    try {
-                        LOG.trace("Extending visibility window by {} seconds 
for request entries: {}", repeatSeconds,
-                                batchEntries);
-                        ChangeMessageVisibilityBatchResponse br
-                                = 
getEndpoint().getClient().changeMessageVisibilityBatch(request);
-                        if (br.hasFailed()) {
-                            LOG.warn("Extended visibility window for request 
entries failed: {}", br.failed());
-                        } else {
-                            LOG.debug("Extended visibility window for request 
entries successful: {}", br.successful());
+                    if (!batchEntries.isEmpty()) {
+                        ChangeMessageVisibilityBatchRequest request = 
ChangeMessageVisibilityBatchRequest.builder()
+                                
.queueUrl(getQueueUrl()).entries(batchEntries).build();
+
+                        try {
+                            LOG.trace("Extending visibility window by {} 
seconds for request entries: {}", visibilityTimeout,
+                                    batchEntries);
+                            ChangeMessageVisibilityBatchResponse br
+                                    = 
getEndpoint().getClient().changeMessageVisibilityBatch(request);
+                            if (br.hasFailed()) {
+                                br.failed().forEach(failedEntry -> {
+                                    if 
(failedEntry.code().equals(RECEIPT_HANDLE_IS_INVALID)) {
+                                        LOG.debug("Extended visibility window 
for request entry failed with invalid handle.",
+                                                br.failed());
+                                    } else {
+                                        LOG.warn("Extended visibility window 
for request entry failed: {}", br.failed());
+                                    }
+                                });
+                            }
+                            if (br.hasSuccessful()) {
+                                br.successful().forEach(successEntry -> {
+                                    LOG.debug("Extended visibility window for 
request entry: {}", successEntry.id());
+                                    
entries.computeIfPresent(successEntry.id(), (t, u) -> u.extendDeadline());
+                                });
+                            }
+                        } catch (SdkException e) {
+                            logException(e, batchEntries);
                         }
-                    } catch (SdkException e) {
-                        logException(e, batchEntries);
                     }
                 }
             }
         }
 
         private void logException(Exception e, 
List<ChangeMessageVisibilityBatchRequestEntry> entries) {
-            LOG.warn("Extending visibility window failed for entries {}"
-                     + ". Will not attempt to extend visibility further. This 
exception will be ignored.",
+            LOG.warn(
+                    "Extending visibility window failed for entries {}. Will 
not attempt to extend visibility further. This exception will be ignored.",
                     entries, e);
         }
+
+        private final class TimeoutExtenderEntry {
+
+            /**
+             * Should be extended before this deadline is reached
+             */
+            private final Instant deadline;
+
+            /**
+             * The entry send to AWS for extending the message visibility
+             */
+            private final ChangeMessageVisibilityBatchRequestEntry 
extendRequest;
+
+            TimeoutExtenderEntry(ChangeMessageVisibilityBatchRequestEntry 
extendRequest) {
+
+                // setting deadline to 80% of now until expected visibility
+                // timeout, this is for taking into account processing time
+                this.deadline = 
Instant.now().plusMillis(extendRequest.visibilityTimeout() * 800);
+                this.extendRequest = extendRequest;
+            }
+
+            TimeoutExtenderEntry extendDeadline() {
+                return new TimeoutExtenderEntry(extendRequest);
+            }
+
+            boolean isDeadlineReachedAt(Instant time) {
+                return deadline.isBefore(time);
+            }
+        }
     }
 
     /**
      * Task responsible for polling the messages from Amazon SQS server.
-     *
+     * <p />
      * Depending on the configuration, the polling may involve sending one or 
more receive requests in a single task
      * call. The number of send requests depends on the {@link 
Sqs2Endpoint#getMaxMessagesPerPoll()} configuration. The
      * Amazon SQS receive API has upper limit of maximum 10 messages that can 
be fetched with a single request. To
      * enable handling greater number of messages fetched per poll, multiple 
requests are being send asynchronously and
      * then joined together.
-     *
+     * <p />
      * To preserver the ordering, an optional {@link 
Sqs2Configuration#getSortAttributeName()} can be configured. When
      * specified, all messages collected from the concurrent requests are 
being sorted using this attribute.
-     *
+     * <p />
      * In addition to that, the task is also responsible for handling 
auto-creation of the SQS queue, when its missing.
      * The queue is created when receive request returns an error about the 
missing queue and the
      * {@link Sqs2Configuration#isAutoCreateQueue()} is enabled. In such case, 
the queue will be created and the task
      * will return empty list of messages.
-     *
+     * <p />
      * If the queue creation fails with an error related to recently deleted 
queue, the queue creation will be postponed
      * for at least 30 seconds. To prevent task from blocking the consumer 
thread, the 30 second timeout is being
      * checked in each task call. If the scheduled time for queue 
auto-creation was not reached yet, the task will
@@ -536,7 +590,8 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
                 }
                 throw new IOException(
                         ("Error while polling - all %s requests resulted in an 
error, "
-                         + "please check the logs for more 
details").formatted(numberOfRequestsPerPoll));
+                         + "please check the logs for more details")
+                                .formatted(numberOfRequestsPerPoll));
             }
             return messages;
         }
@@ -568,8 +623,10 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
         private List<software.amazon.awssdk.services.sqs.model.Message> 
poll(int maxNumberOfMessages, PollingContext context) {
             if (context.isQueueMissing()) {
-                // if one of the request encountered a missing queue error the 
remaining requests
-                // should be ignored, even if the queue is automatically 
created it will be empty
+                // if one of the request encountered a missing queue error the
+                // remaining requests
+                // should be ignored, even if the queue is automatically 
created
+                // it will be empty
                 // so there is no reason for immediate polling after creation
                 return emptyList();
             }
@@ -603,11 +660,9 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         }
 
         private ReceiveMessageRequest createReceiveRequest(int 
maxNumberOfMessages) {
-            ReceiveMessageRequest.Builder requestBuilder = 
ReceiveMessageRequest.builder()
-                    .queueUrl(queueUrl)
-                    .maxNumberOfMessages(maxNumberOfMessages)
-                    .visibilityTimeout(visibilityTimeout)
-                    .waitTimeSeconds(waitTimeSeconds);
+            ReceiveMessageRequest.Builder requestBuilder
+                    = 
ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(maxNumberOfMessages)
+                            
.visibilityTimeout(visibilityTimeout).waitTimeSeconds(waitTimeSeconds);
             if (!attributeNames.isEmpty()) {
                 requestBuilder.messageSystemAttributeNames(attributeNames);
             }
@@ -623,9 +678,11 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
             try {
                 if (isClosed() || 
context.isMissingQueueHandledInAnotherRequest(requestId)) {
                     // the missing queue error can be thrown by multiple 
threads
-                    // the first thread that is handling the error should 
prevent other threads
+                    // the first thread that is handling the error should
+                    // prevent other threads
                     // from repeating the logic
-                    // as the operation is synchronized, the other threads 
should wait and then
+                    // as the operation is synchronized, the other threads
+                    // should wait and then
                     // check if it wasn't handled already
                     return;
                 }
@@ -696,25 +753,21 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         }
 
         private static MessageSystemAttributeName 
getSortAttributeName(Sqs2Configuration configuration) {
-            return 
parseMessageSystemAttributeName(configuration.getSortAttributeName())
-                    .filter(attribute -> {
-                        if (attribute == MessageSystemAttributeName.ALL) {
-                            LOG.warn("The {} attribute cannot be used for 
sorting the received messages",
-                                    MessageSystemAttributeName.ALL);
-                            return false;
-                        }
-                        return true;
-                    })
-                    .orElse(null);
+            return 
parseMessageSystemAttributeName(configuration.getSortAttributeName()).filter(attribute
 -> {
+                if (attribute == MessageSystemAttributeName.ALL) {
+                    LOG.warn("The {} attribute cannot be used for sorting the 
received messages",
+                            MessageSystemAttributeName.ALL);
+                    return false;
+                }
+                return true;
+            }).orElse(null);
         }
 
         private static List<MessageSystemAttributeName> getAttributeNames(
                 Sqs2Configuration configuration, MessageSystemAttributeName 
sortAttributeName) {
             List<MessageSystemAttributeName> result = new ArrayList<>();
             for (String attributeName : 
splitCommaSeparatedValues(configuration.getAttributeNames())) {
-                parseMessageSystemAttributeName(attributeName)
-                        .filter(it -> !result.contains(it))
-                        .ifPresent(result::add);
+                parseMessageSystemAttributeName(attributeName).filter(it -> 
!result.contains(it)).ifPresent(result::add);
             }
             if (sortAttributeName != null && 
!result.contains(MessageSystemAttributeName.ALL)
                     && !result.contains(sortAttributeName)) {
@@ -742,8 +795,7 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
                 LOG.trace("Received {} messages in {} requests", 
messages.size(), numberOfRequestsPerPoll);
             }
             if (sortAttributeName != null) {
-                return messages.stream()
-                        .sorted(comparing(message -> 
message.attributes().getOrDefault(sortAttributeName, "")))
+                return messages.stream().sorted(comparing(message -> 
message.attributes().getOrDefault(sortAttributeName, "")))
                         .toList();
             }
             return messages;
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index 9e8707b9735..e9b796f5e7f 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -36,7 +36,9 @@ import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.SqsServiceClientConfiguration;
 import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
 import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
 import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResultEntry;
 import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
 import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
@@ -111,9 +113,8 @@ public class AmazonSQSClientMock implements SqsClient {
         listQueuesRequests.offer(request);
 
         ListQueuesResponse.Builder result = ListQueuesResponse.builder();
-        result.queueUrls(Optional.ofNullable(queueName)
-                .map(it -> List.of("/" + it))
-                .orElseGet(() -> List.of("/queue1", "/queue2")));
+        result.queueUrls(
+                Optional.ofNullable(queueName).map(it -> List.of("/" + 
it)).orElseGet(() -> List.of("/queue1", "/queue2")));
         return result.build();
     }
 
@@ -246,7 +247,21 @@ public class AmazonSQSClientMock implements SqsClient {
             ChangeMessageVisibilityBatchRequest 
changeMessageVisibilityBatchRequest) {
         
this.changeMessageVisibilityBatchRequests.offer(changeMessageVisibilityBatchRequest);
 
-        return ChangeMessageVisibilityBatchResponse.builder().build();
+        // mark all as success
+        List<ChangeMessageVisibilityBatchResultEntry> successful
+                = 
changeMessageVisibilityBatchRequest.entries().stream().map(this::successVisibilityExtension).toList();
+
+        // setting empty collections to null to support hasSuccessful which
+        // perform null check rather than isEmpty checks
+        if (successful.isEmpty()) {
+            successful = null;
+        }
+
+        return 
ChangeMessageVisibilityBatchResponse.builder().successful(successful).build();
+    }
+
+    private ChangeMessageVisibilityBatchResultEntry 
successVisibilityExtension(ChangeMessageVisibilityBatchRequestEntry r) {
+        return 
ChangeMessageVisibilityBatchResultEntry.builder().id(r.id()).build();
     }
 
     @Override
@@ -264,7 +279,7 @@ public class AmazonSQSClientMock implements SqsClient {
         Collection<BatchResultErrorEntry> entriesFail = new ArrayList<>();
         BatchResultErrorEntry.Builder entry3 = BatchResultErrorEntry.builder();
         BatchResultErrorEntry.Builder entry4 = BatchResultErrorEntry.builder();
-        entry3.id("team1");
+        entry3.id("team3");
         entry4.id("team4");
         entriesFail.add(entry3.build());
         entriesFail.add(entry4.build());
@@ -312,9 +327,7 @@ public class AmazonSQSClientMock implements SqsClient {
         if (queueUrl == null) {
             throw QueueDoesNotExistException.builder().build();
         }
-        return GetQueueUrlResponse.builder()
-                .queueUrl(queueUrl)
-                .build();
+        return GetQueueUrlResponse.builder().queueUrl(queueUrl).build();
     }
 
     ScheduledExecutorService getScheduler() {
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
index d8115e2eb90..2b2d1ab929f 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws2.sqs;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
@@ -31,21 +33,21 @@ public class SqsBatchConsumerConcurrentConsumersIT extends 
CamelTestSupport {
 
     @Test
     public void receiveBatch() throws Exception {
-        mock.expectedMinimumMessageCount(5);
-        MockEndpoint.assertIsSatisfied(context);
+        mock.expectedMessageCount(6);
+        MockEndpoint.assertIsSatisfied(context, 3, TimeUnit.SECONDS);
     }
 
     @BindToRegistry("amazonSQSClient")
     public AmazonSQSClientMock addClient() {
 
         AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
-        // add 6 messages, one more we will poll
+        // add 6 messages, one more than we will poll
         for (int counter = 0; counter < 6; counter++) {
             Message.Builder message = Message.builder();
             message.body("Message " + counter);
-            message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
-            message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
-            message.receiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+            message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee" + counter);
+            message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458" + 
counter);
+            message.receiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5" + 
counter);
 
             clientMock.addMessage(message.build());
         }
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index 1c2d2aea695..8c105dd7adc 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -26,11 +26,11 @@ import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.sqs.model.Message;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
 
-    private static final int TIMEOUT = 2; // 2 seconds.
+    private static final int TIMEOUT_IN_SECONDS = 2; // 2 seconds.
     private static final String RECEIPT_HANDLE = 
"0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5";
 
     @EndpointInject("mock:result")
@@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends 
CamelTestSupport {
             @Override
             public void process(Exchange exchange) throws Exception {
                 // Simulate message that takes a while to receive.
-                Thread.sleep(TIMEOUT * 3000L); // 150% of TIMEOUT.
+                Thread.sleep(TIMEOUT_IN_SECONDS * 1500L); // 150% of TIMEOUT.
             }
         });
 
@@ -60,8 +60,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends 
CamelTestSupport {
         // Wait for message to arrive.
         MockEndpoint.assertIsSatisfied(context);
 
-        
assertTrue(this.client.getChangeMessageVisibilityBatchRequests().size() >= 1);
-        
assertTrue(this.client.getChangeMessageVisibilityBatchRequests().size() <= 3);
+        
assertThat(this.client.getChangeMessageVisibilityBatchRequests()).hasSizeBetween(1,
 3);
     }
 
     @Override
@@ -69,7 +68,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends 
CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" + 
TIMEOUT
+                
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" + 
TIMEOUT_IN_SECONDS
                      + "&extendMessageVisibility=true").to("mock:result");
             }
         };
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
index 5e5d8534d46..4566d60cf45 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
@@ -37,35 +37,12 @@ public class SqsComponentLocalstackIT extends 
Aws2SQSBaseTest {
     @EndpointInject("mock:result")
     private MockEndpoint result;
 
-    @Test
-    public void sendInOnly() throws Exception {
-        result.expectedMessageCount(1);
-
-        Exchange exchange = template.send("direct:start", 
ExchangePattern.InOnly, new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("This is my message text.");
-            }
-        });
-
-        MockEndpoint.assertIsSatisfied(context);
-
-        Exchange resultExchange = result.getExchanges().get(0);
-        assertEquals("This is my message text.", 
resultExchange.getIn().getBody());
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
-        assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", 
resultExchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
-
-        assertNotNull(exchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
-        assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", 
exchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
-    }
-
     @Test
     public void sendInOut() throws Exception {
         result.expectedMessageCount(1);
 
         Exchange exchange = template.send("direct:start", 
ExchangePattern.InOut, new Processor() {
+            @Override
             public void process(Exchange exchange) {
                 exchange.getIn().setBody("This is my message text.");
             }
@@ -87,11 +64,10 @@ public class SqsComponentLocalstackIT extends 
Aws2SQSBaseTest {
 
     @Override
     protected RouteBuilder createRouteBuilder() {
-        final String sqsEndpointUri = String
-                
.format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
-                        sharedNameGenerator.getName(),
-                        "1209600", "65536", "60",
-                        
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+        final String sqsEndpointUri = String.format(
+                
"aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                sharedNameGenerator.getName(), "1209600", "65536", "60",
+                
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
 
         return new RouteBuilder() {
             @Override
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
similarity index 64%
copy from 
components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
copy to 
components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
index 5e5d8534d46..d5d7d865d92 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-public class SqsComponentLocalstackIT extends Aws2SQSBaseTest {
+public class SqsComponentSendInOnlyLocalstackIT extends Aws2SQSBaseTest {
 
     @EndpointInject("direct:start")
     private ProducerTemplate template;
@@ -42,11 +42,11 @@ public class SqsComponentLocalstackIT extends 
Aws2SQSBaseTest {
         result.expectedMessageCount(1);
 
         Exchange exchange = template.send("direct:start", 
ExchangePattern.InOnly, new Processor() {
+            @Override
             public void process(Exchange exchange) {
                 exchange.getIn().setBody("This is my message text.");
             }
         });
-
         MockEndpoint.assertIsSatisfied(context);
 
         Exchange resultExchange = result.getExchanges().get(0);
@@ -61,37 +61,12 @@ public class SqsComponentLocalstackIT extends 
Aws2SQSBaseTest {
         assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", 
exchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
     }
 
-    @Test
-    public void sendInOut() throws Exception {
-        result.expectedMessageCount(1);
-
-        Exchange exchange = template.send("direct:start", 
ExchangePattern.InOut, new Processor() {
-            public void process(Exchange exchange) {
-                exchange.getIn().setBody("This is my message text.");
-            }
-        });
-
-        MockEndpoint.assertIsSatisfied(context);
-
-        Exchange resultExchange = result.getExchanges().get(0);
-        assertEquals("This is my message text.", 
resultExchange.getIn().getBody());
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
-        assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", 
resultExchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
-        
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
-
-        
assertNotNull(exchange.getMessage().getHeader(Sqs2Constants.MESSAGE_ID));
-        assertEquals("6a1559560f67c5e7a7d5d838bf0272ee", 
exchange.getMessage().getHeader(Sqs2Constants.MD5_OF_BODY));
-    }
-
     @Override
     protected RouteBuilder createRouteBuilder() {
-        final String sqsEndpointUri = String
-                
.format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
-                        sharedNameGenerator.getName(),
-                        "1209600", "65536", "60",
-                        
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+        final String sqsEndpointUri = String.format(
+                
"aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+                sharedNameGenerator.getName(), "1209600", "65536", "60",
+                
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
 
         return new RouteBuilder() {
             @Override
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
index 8e10866644d..93d9fda2d7e 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
@@ -24,11 +24,9 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Assert;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.shaded.org.bouncycastle.util.Strings;
 
-@Disabled("Not working due localstack update (Incorrect padding error), it is 
working against real SQS")
 public class SqsProducerSendByteArrayLocalstackIT extends Aws2SQSBaseTest {
 
     @EndpointInject("direct:start")
@@ -42,6 +40,7 @@ public class SqsProducerSendByteArrayLocalstackIT extends 
Aws2SQSBaseTest {
         result.expectedMessageCount(1);
 
         Exchange exchange = template.send("direct:start", 
ExchangePattern.InOnly, new Processor() {
+            @Override
             public void process(Exchange exchange) {
                 byte[] headerValue = "HeaderTest".getBytes();
                 exchange.getIn().setHeader("value1", headerValue);
@@ -61,11 +60,11 @@ public class SqsProducerSendByteArrayLocalstackIT extends 
Aws2SQSBaseTest {
         return new RouteBuilder() {
             @Override
             public void configure() {
-                from("direct:start").startupOrder(2)
-                        .toF("aws2-sqs://%s?autoCreateQueue=true", 
sharedNameGenerator.getName()).to("mock:result");
+                
from("direct:start").startupOrder(2).toF("aws2-sqs://%s?autoCreateQueue=true", 
sharedNameGenerator.getName())
+                        .to("mock:result");
 
-                
fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", 
sharedNameGenerator.getName())
-                        .startupOrder(1).log("${body}");
+                
fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true", 
sharedNameGenerator.getName()).startupOrder(1)
+                        .log("${body}");
             }
         };
     }


Reply via email to