This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.8.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
new 83cb3dc3199 CAMEL-21659: introduce deadline concept to TimeoutExtender
(#16945) (#16948)
83cb3dc3199 is described below
commit 83cb3dc31996916f3314148599f667509a30caee
Author: Simon Oxenvad Rasmussen <[email protected]>
AuthorDate: Tue Jan 28 14:30:59 2025 +0100
CAMEL-21659: introduce deadline concept to TimeoutExtender (#16945) (#16948)
* CAMEL-21659: introduce deadline concept to TimeoutExtender
* Split component test to separate test and enable again
---
.../camel/component/aws2/sqs/Sqs2Consumer.java | 196 +++++++++++++--------
.../component/aws2/sqs/AmazonSQSClientMock.java | 29 ++-
.../sqs/SqsBatchConsumerConcurrentConsumersIT.java | 14 +-
.../SqsConsumerExtendMessageVisibilityTest.java | 11 +-
.../sqs/integration/SqsComponentLocalstackIT.java | 34 +---
...ava => SqsComponentSendInOnlyLocalstackIT.java} | 37 +---
.../SqsProducerSendByteArrayLocalstackIT.java | 11 +-
7 files changed, 174 insertions(+), 158 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 6fd18faa640..b2603198499 100644
---
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.sqs;
import java.io.Closeable;
import java.io.IOException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -66,7 +67,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
-import software.amazon.awssdk.services.sqs.model.*;
+import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
+import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
@@ -98,7 +107,8 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
pendingExchanges = 0;
List<software.amazon.awssdk.services.sqs.model.Message> messages =
pollingTask.call();
- // okay we have some response from aws so lets mark the consumer as
ready
+ // okay we have some response from aws so lets mark the consumer as
+ // ready
forceConsumerAsReady();
Queue<Exchange> exchanges = createExchanges(messages);
@@ -301,18 +311,16 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
Integer visibilityTimeout =
getConfiguration().getVisibilityTimeout();
if (visibilityTimeout != null && visibilityTimeout > 0) {
- int initialDelay = visibilityTimeout / 2;
- int period = visibilityTimeout;
- int repeatSeconds = (int) (visibilityTimeout.doubleValue() *
1.5);
- this.timeoutExtender = new TimeoutExtender(repeatSeconds);
+ int delay = Math.max(1, visibilityTimeout / 2);
+ this.timeoutExtender = new TimeoutExtender(visibilityTimeout,
delay);
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Scheduled TimeoutExtender task to start after {}
delay, and run with {}/{} period/repeat (seconds)",
- initialDelay, period, repeatSeconds);
+ "Scheduled TimeoutExtender task to start after {}
delay, and run with {}/{} delay/repeat (seconds)",
+ delay, delay, visibilityTimeout);
}
this.scheduledFuture
- =
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay,
period, TimeUnit.SECONDS);
+ =
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay,
TimeUnit.SECONDS);
}
}
@@ -345,13 +353,16 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
private class TimeoutExtender implements Runnable {
+ private static final String RECEIPT_HANDLE_IS_INVALID =
"ReceiptHandleIsInvalid";
private static final int MAX_REQUESTS = 10;
- private final int repeatSeconds;
+ private final int visibilityTimeout;
+ private final int delayBetweenExecutions;
private final AtomicBoolean run = new AtomicBoolean(true);
- private final Map<String, ChangeMessageVisibilityBatchRequestEntry>
entries = new ConcurrentHashMap<>();
+ private final Map<String, TimeoutExtenderEntry> entries = new
ConcurrentHashMap<>();
- TimeoutExtender(int repeatSeconds) {
- this.repeatSeconds = repeatSeconds;
+ TimeoutExtender(int visibilityTimeout, int delayBetweenExecutions) {
+ this.visibilityTimeout = visibilityTimeout;
+ this.delayBetweenExecutions = delayBetweenExecutions;
}
public void add(Exchange exchange) {
@@ -367,19 +378,16 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
}
private void remove(Exchange exchange) {
- LOG.trace("Removing exchangeId {} from the
TimeoutExtender, processing done",
- exchange.getExchangeId());
+ LOG.trace("Removing exchangeId {} from the
TimeoutExtender, processing done", exchange.getExchangeId());
entries.remove(exchange.getExchangeId());
}
});
- ChangeMessageVisibilityBatchRequestEntry entry
- = ChangeMessageVisibilityBatchRequestEntry.builder()
-
.id(exchange.getExchangeId()).visibilityTimeout(repeatSeconds)
-
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE,
String.class))
- .build();
+ ChangeMessageVisibilityBatchRequestEntry entry =
ChangeMessageVisibilityBatchRequestEntry.builder()
+
.id(exchange.getExchangeId()).visibilityTimeout(visibilityTimeout)
+
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE,
String.class)).build();
- entries.put(exchange.getExchangeId(), entry);
+ entries.put(exchange.getExchangeId(), new
TimeoutExtenderEntry(entry));
}
public void cancel() {
@@ -390,60 +398,106 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
@Override
public void run() {
if (run.get()) {
- Queue<ChangeMessageVisibilityBatchRequestEntry> entryQueue =
new LinkedList<>(entries.values());
+ final Instant nextExpectedExecution =
Instant.now().plusSeconds(Math.max(1, delayBetweenExecutions));
+
+ final Queue<TimeoutExtenderEntry> entryQueue = new
LinkedList<>(entries.values());
while (!entryQueue.isEmpty()) {
- List<ChangeMessageVisibilityBatchRequestEntry>
batchEntries = new LinkedList<>();
- // up to 10 requests can be sent with each
ChangeMessageVisibilityBatch action
+ List<ChangeMessageVisibilityBatchRequestEntry>
batchEntries = new ArrayList<>();
+ // up to 10 requests can be sent with each
+ // ChangeMessageVisibilityBatch action
while (!entryQueue.isEmpty() && batchEntries.size() <
MAX_REQUESTS) {
- batchEntries.add(entryQueue.poll());
+ TimeoutExtenderEntry nextEntry = entryQueue.poll();
+ if
(nextEntry.isDeadlineReachedAt(nextExpectedExecution)) {
+ batchEntries.add(nextEntry.extendRequest);
+ }
}
-
- ChangeMessageVisibilityBatchRequest request
- =
ChangeMessageVisibilityBatchRequest.builder().queueUrl(getQueueUrl()).entries(batchEntries)
- .build();
-
- try {
- LOG.trace("Extending visibility window by {} seconds
for request entries: {}", repeatSeconds,
- batchEntries);
- ChangeMessageVisibilityBatchResponse br
- =
getEndpoint().getClient().changeMessageVisibilityBatch(request);
- if (br.hasFailed()) {
- LOG.warn("Extended visibility window for request
entries failed: {}", br.failed());
- } else {
- LOG.debug("Extended visibility window for request
entries successful: {}", br.successful());
+ if (!batchEntries.isEmpty()) {
+ ChangeMessageVisibilityBatchRequest request =
ChangeMessageVisibilityBatchRequest.builder()
+
.queueUrl(getQueueUrl()).entries(batchEntries).build();
+
+ try {
+ LOG.trace("Extending visibility window by {}
seconds for request entries: {}", visibilityTimeout,
+ batchEntries);
+ ChangeMessageVisibilityBatchResponse br
+ =
getEndpoint().getClient().changeMessageVisibilityBatch(request);
+ if (br.hasFailed()) {
+ br.failed().forEach(failedEntry -> {
+ if
(failedEntry.code().equals(RECEIPT_HANDLE_IS_INVALID)) {
+ LOG.debug("Extended visibility window
for request entry failed with invalid handle.",
+ br.failed());
+ } else {
+ LOG.warn("Extended visibility window
for request entry failed: {}", br.failed());
+ }
+ });
+ }
+ if (br.hasSuccessful()) {
+ br.successful().forEach(successEntry -> {
+ LOG.debug("Extended visibility window for
request entry: {}", successEntry.id());
+
entries.computeIfPresent(successEntry.id(), (t, u) -> u.extendDeadline());
+ });
+ }
+ } catch (SdkException e) {
+ logException(e, batchEntries);
}
- } catch (SdkException e) {
- logException(e, batchEntries);
}
}
}
}
private void logException(Exception e,
List<ChangeMessageVisibilityBatchRequestEntry> entries) {
- LOG.warn("Extending visibility window failed for entries {}"
- + ". Will not attempt to extend visibility further. This
exception will be ignored.",
+ LOG.warn(
+ "Extending visibility window failed for entries {}. Will
not attempt to extend visibility further. This exception will be ignored.",
entries, e);
}
+
+ private final class TimeoutExtenderEntry {
+
+ /**
+ * Should be extended before this deadline is reached
+ */
+ private final Instant deadline;
+
+ /**
+ * The entry send to AWS for extending the message visibility
+ */
+ private final ChangeMessageVisibilityBatchRequestEntry
extendRequest;
+
+ TimeoutExtenderEntry(ChangeMessageVisibilityBatchRequestEntry
extendRequest) {
+
+ // setting deadline to 80% of now until expected visibility
+ // timeout, this is for taking into account processing time
+ this.deadline =
Instant.now().plusMillis(extendRequest.visibilityTimeout() * 800);
+ this.extendRequest = extendRequest;
+ }
+
+ TimeoutExtenderEntry extendDeadline() {
+ return new TimeoutExtenderEntry(extendRequest);
+ }
+
+ boolean isDeadlineReachedAt(Instant time) {
+ return deadline.isBefore(time);
+ }
+ }
}
/**
* Task responsible for polling the messages from Amazon SQS server.
- *
+ * <p />
* Depending on the configuration, the polling may involve sending one or
more receive requests in a single task
* call. The number of send requests depends on the {@link
Sqs2Endpoint#getMaxMessagesPerPoll()} configuration. The
* Amazon SQS receive API has upper limit of maximum 10 messages that can
be fetched with a single request. To
* enable handling greater number of messages fetched per poll, multiple
requests are being send asynchronously and
* then joined together.
- *
+ * <p />
* To preserver the ordering, an optional {@link
Sqs2Configuration#getSortAttributeName()} can be configured. When
* specified, all messages collected from the concurrent requests are
being sorted using this attribute.
- *
+ * <p />
* In addition to that, the task is also responsible for handling
auto-creation of the SQS queue, when its missing.
* The queue is created when receive request returns an error about the
missing queue and the
* {@link Sqs2Configuration#isAutoCreateQueue()} is enabled. In such case,
the queue will be created and the task
* will return empty list of messages.
- *
+ * <p />
* If the queue creation fails with an error related to recently deleted
queue, the queue creation will be postponed
* for at least 30 seconds. To prevent task from blocking the consumer
thread, the 30 second timeout is being
* checked in each task call. If the scheduled time for queue
auto-creation was not reached yet, the task will
@@ -536,7 +590,8 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
}
throw new IOException(
("Error while polling - all %s requests resulted in an
error, "
- + "please check the logs for more
details").formatted(numberOfRequestsPerPoll));
+ + "please check the logs for more details")
+ .formatted(numberOfRequestsPerPoll));
}
return messages;
}
@@ -568,8 +623,10 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
private List<software.amazon.awssdk.services.sqs.model.Message>
poll(int maxNumberOfMessages, PollingContext context) {
if (context.isQueueMissing()) {
- // if one of the request encountered a missing queue error the
remaining requests
- // should be ignored, even if the queue is automatically
created it will be empty
+ // if one of the request encountered a missing queue error the
+ // remaining requests
+ // should be ignored, even if the queue is automatically
created
+ // it will be empty
// so there is no reason for immediate polling after creation
return emptyList();
}
@@ -603,11 +660,9 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
}
private ReceiveMessageRequest createReceiveRequest(int
maxNumberOfMessages) {
- ReceiveMessageRequest.Builder requestBuilder =
ReceiveMessageRequest.builder()
- .queueUrl(queueUrl)
- .maxNumberOfMessages(maxNumberOfMessages)
- .visibilityTimeout(visibilityTimeout)
- .waitTimeSeconds(waitTimeSeconds);
+ ReceiveMessageRequest.Builder requestBuilder
+ =
ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(maxNumberOfMessages)
+
.visibilityTimeout(visibilityTimeout).waitTimeSeconds(waitTimeSeconds);
if (!attributeNames.isEmpty()) {
requestBuilder.messageSystemAttributeNames(attributeNames);
}
@@ -623,9 +678,11 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
try {
if (isClosed() ||
context.isMissingQueueHandledInAnotherRequest(requestId)) {
// the missing queue error can be thrown by multiple
threads
- // the first thread that is handling the error should
prevent other threads
+ // the first thread that is handling the error should
+ // prevent other threads
// from repeating the logic
- // as the operation is synchronized, the other threads
should wait and then
+ // as the operation is synchronized, the other threads
+ // should wait and then
// check if it wasn't handled already
return;
}
@@ -696,25 +753,21 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
}
private static MessageSystemAttributeName
getSortAttributeName(Sqs2Configuration configuration) {
- return
parseMessageSystemAttributeName(configuration.getSortAttributeName())
- .filter(attribute -> {
- if (attribute == MessageSystemAttributeName.ALL) {
- LOG.warn("The {} attribute cannot be used for
sorting the received messages",
- MessageSystemAttributeName.ALL);
- return false;
- }
- return true;
- })
- .orElse(null);
+ return
parseMessageSystemAttributeName(configuration.getSortAttributeName()).filter(attribute
-> {
+ if (attribute == MessageSystemAttributeName.ALL) {
+ LOG.warn("The {} attribute cannot be used for sorting the
received messages",
+ MessageSystemAttributeName.ALL);
+ return false;
+ }
+ return true;
+ }).orElse(null);
}
private static List<MessageSystemAttributeName> getAttributeNames(
Sqs2Configuration configuration, MessageSystemAttributeName
sortAttributeName) {
List<MessageSystemAttributeName> result = new ArrayList<>();
for (String attributeName :
splitCommaSeparatedValues(configuration.getAttributeNames())) {
- parseMessageSystemAttributeName(attributeName)
- .filter(it -> !result.contains(it))
- .ifPresent(result::add);
+ parseMessageSystemAttributeName(attributeName).filter(it ->
!result.contains(it)).ifPresent(result::add);
}
if (sortAttributeName != null &&
!result.contains(MessageSystemAttributeName.ALL)
&& !result.contains(sortAttributeName)) {
@@ -742,8 +795,7 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
LOG.trace("Received {} messages in {} requests",
messages.size(), numberOfRequestsPerPoll);
}
if (sortAttributeName != null) {
- return messages.stream()
- .sorted(comparing(message ->
message.attributes().getOrDefault(sortAttributeName, "")))
+ return messages.stream().sorted(comparing(message ->
message.attributes().getOrDefault(sortAttributeName, "")))
.toList();
}
return messages;
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index 9e8707b9735..e9b796f5e7f 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -36,7 +36,9 @@ import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsServiceClientConfiguration;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
+import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
+import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
@@ -111,9 +113,8 @@ public class AmazonSQSClientMock implements SqsClient {
listQueuesRequests.offer(request);
ListQueuesResponse.Builder result = ListQueuesResponse.builder();
- result.queueUrls(Optional.ofNullable(queueName)
- .map(it -> List.of("/" + it))
- .orElseGet(() -> List.of("/queue1", "/queue2")));
+ result.queueUrls(
+ Optional.ofNullable(queueName).map(it -> List.of("/" +
it)).orElseGet(() -> List.of("/queue1", "/queue2")));
return result.build();
}
@@ -246,7 +247,21 @@ public class AmazonSQSClientMock implements SqsClient {
ChangeMessageVisibilityBatchRequest
changeMessageVisibilityBatchRequest) {
this.changeMessageVisibilityBatchRequests.offer(changeMessageVisibilityBatchRequest);
- return ChangeMessageVisibilityBatchResponse.builder().build();
+ // mark all as success
+ List<ChangeMessageVisibilityBatchResultEntry> successful
+ =
changeMessageVisibilityBatchRequest.entries().stream().map(this::successVisibilityExtension).toList();
+
+ // setting empty collections to null to support hasSuccessful which
+ // perform null check rather than isEmpty checks
+ if (successful.isEmpty()) {
+ successful = null;
+ }
+
+ return
ChangeMessageVisibilityBatchResponse.builder().successful(successful).build();
+ }
+
+ private ChangeMessageVisibilityBatchResultEntry
successVisibilityExtension(ChangeMessageVisibilityBatchRequestEntry r) {
+ return
ChangeMessageVisibilityBatchResultEntry.builder().id(r.id()).build();
}
@Override
@@ -264,7 +279,7 @@ public class AmazonSQSClientMock implements SqsClient {
Collection<BatchResultErrorEntry> entriesFail = new ArrayList<>();
BatchResultErrorEntry.Builder entry3 = BatchResultErrorEntry.builder();
BatchResultErrorEntry.Builder entry4 = BatchResultErrorEntry.builder();
- entry3.id("team1");
+ entry3.id("team3");
entry4.id("team4");
entriesFail.add(entry3.build());
entriesFail.add(entry4.build());
@@ -312,9 +327,7 @@ public class AmazonSQSClientMock implements SqsClient {
if (queueUrl == null) {
throw QueueDoesNotExistException.builder().build();
}
- return GetQueueUrlResponse.builder()
- .queueUrl(queueUrl)
- .build();
+ return GetQueueUrlResponse.builder().queueUrl(queueUrl).build();
}
ScheduledExecutorService getScheduler() {
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
index d8115e2eb90..2b2d1ab929f 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws2.sqs;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.BindToRegistry;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
@@ -31,21 +33,21 @@ public class SqsBatchConsumerConcurrentConsumersIT extends
CamelTestSupport {
@Test
public void receiveBatch() throws Exception {
- mock.expectedMinimumMessageCount(5);
- MockEndpoint.assertIsSatisfied(context);
+ mock.expectedMessageCount(6);
+ MockEndpoint.assertIsSatisfied(context, 3, TimeUnit.SECONDS);
}
@BindToRegistry("amazonSQSClient")
public AmazonSQSClientMock addClient() {
AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
- // add 6 messages, one more we will poll
+ // add 6 messages, one more than we will poll
for (int counter = 0; counter < 6; counter++) {
Message.Builder message = Message.builder();
message.body("Message " + counter);
- message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
- message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
- message.receiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+ message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee" + counter);
+ message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458" +
counter);
+ message.receiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5" +
counter);
clientMock.addMessage(message.build());
}
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index 1c2d2aea695..8c105dd7adc 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -26,11 +26,11 @@ import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.model.Message;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
- private static final int TIMEOUT = 2; // 2 seconds.
+ private static final int TIMEOUT_IN_SECONDS = 2; // 2 seconds.
private static final String RECEIPT_HANDLE =
"0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5";
@EndpointInject("mock:result")
@@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends
CamelTestSupport {
@Override
public void process(Exchange exchange) throws Exception {
// Simulate message that takes a while to receive.
- Thread.sleep(TIMEOUT * 3000L); // 150% of TIMEOUT.
+ Thread.sleep(TIMEOUT_IN_SECONDS * 1500L); // 150% of TIMEOUT.
}
});
@@ -60,8 +60,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends
CamelTestSupport {
// Wait for message to arrive.
MockEndpoint.assertIsSatisfied(context);
-
assertTrue(this.client.getChangeMessageVisibilityBatchRequests().size() >= 1);
-
assertTrue(this.client.getChangeMessageVisibilityBatchRequests().size() <= 3);
+
assertThat(this.client.getChangeMessageVisibilityBatchRequests()).hasSizeBetween(1,
3);
}
@Override
@@ -69,7 +68,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends
CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() {
-
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" +
TIMEOUT
+
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" +
TIMEOUT_IN_SECONDS
+ "&extendMessageVisibility=true").to("mock:result");
}
};
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
index 5e5d8534d46..4566d60cf45 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
@@ -37,35 +37,12 @@ public class SqsComponentLocalstackIT extends
Aws2SQSBaseTest {
@EndpointInject("mock:result")
private MockEndpoint result;
- @Test
- public void sendInOnly() throws Exception {
- result.expectedMessageCount(1);
-
- Exchange exchange = template.send("direct:start",
ExchangePattern.InOnly, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("This is my message text.");
- }
- });
-
- MockEndpoint.assertIsSatisfied(context);
-
- Exchange resultExchange = result.getExchanges().get(0);
- assertEquals("This is my message text.",
resultExchange.getIn().getBody());
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
- assertEquals("6a1559560f67c5e7a7d5d838bf0272ee",
resultExchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
-
- assertNotNull(exchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
- assertEquals("6a1559560f67c5e7a7d5d838bf0272ee",
exchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
- }
-
@Test
public void sendInOut() throws Exception {
result.expectedMessageCount(1);
Exchange exchange = template.send("direct:start",
ExchangePattern.InOut, new Processor() {
+ @Override
public void process(Exchange exchange) {
exchange.getIn().setBody("This is my message text.");
}
@@ -87,11 +64,10 @@ public class SqsComponentLocalstackIT extends
Aws2SQSBaseTest {
@Override
protected RouteBuilder createRouteBuilder() {
- final String sqsEndpointUri = String
-
.format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
- sharedNameGenerator.getName(),
- "1209600", "65536", "60",
-
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+ final String sqsEndpointUri = String.format(
+
"aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+ sharedNameGenerator.getName(), "1209600", "65536", "60",
+
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
return new RouteBuilder() {
@Override
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
similarity index 64%
copy from
components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
copy to
components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
index 5e5d8534d46..d5d7d865d92 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentLocalstackIT.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsComponentSendInOnlyLocalstackIT.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-public class SqsComponentLocalstackIT extends Aws2SQSBaseTest {
+public class SqsComponentSendInOnlyLocalstackIT extends Aws2SQSBaseTest {
@EndpointInject("direct:start")
private ProducerTemplate template;
@@ -42,11 +42,11 @@ public class SqsComponentLocalstackIT extends
Aws2SQSBaseTest {
result.expectedMessageCount(1);
Exchange exchange = template.send("direct:start",
ExchangePattern.InOnly, new Processor() {
+ @Override
public void process(Exchange exchange) {
exchange.getIn().setBody("This is my message text.");
}
});
-
MockEndpoint.assertIsSatisfied(context);
Exchange resultExchange = result.getExchanges().get(0);
@@ -61,37 +61,12 @@ public class SqsComponentLocalstackIT extends
Aws2SQSBaseTest {
assertEquals("6a1559560f67c5e7a7d5d838bf0272ee",
exchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
}
- @Test
- public void sendInOut() throws Exception {
- result.expectedMessageCount(1);
-
- Exchange exchange = template.send("direct:start",
ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("This is my message text.");
- }
- });
-
- MockEndpoint.assertIsSatisfied(context);
-
- Exchange resultExchange = result.getExchanges().get(0);
- assertEquals("This is my message text.",
resultExchange.getIn().getBody());
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ID));
- assertEquals("6a1559560f67c5e7a7d5d838bf0272ee",
resultExchange.getIn().getHeader(Sqs2Constants.MD5_OF_BODY));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.ATTRIBUTES));
-
assertNotNull(resultExchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES));
-
-
assertNotNull(exchange.getMessage().getHeader(Sqs2Constants.MESSAGE_ID));
- assertEquals("6a1559560f67c5e7a7d5d838bf0272ee",
exchange.getMessage().getHeader(Sqs2Constants.MD5_OF_BODY));
- }
-
@Override
protected RouteBuilder createRouteBuilder() {
- final String sqsEndpointUri = String
-
.format("aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
- sharedNameGenerator.getName(),
- "1209600", "65536", "60",
-
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
+ final String sqsEndpointUri = String.format(
+
"aws2-sqs://%s?messageRetentionPeriod=%s&maximumMessageSize=%s&visibilityTimeout=%s&policy=%s&autoCreateQueue=true",
+ sharedNameGenerator.getName(), "1209600", "65536", "60",
+
"file:src/test/resources/org/apache/camel/component/aws2/sqs/policy.txt");
return new RouteBuilder() {
@Override
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
index 8e10866644d..93d9fda2d7e 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/integration/SqsProducerSendByteArrayLocalstackIT.java
@@ -24,11 +24,9 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Assert;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.bouncycastle.util.Strings;
-@Disabled("Not working due localstack update (Incorrect padding error), it is
working against real SQS")
public class SqsProducerSendByteArrayLocalstackIT extends Aws2SQSBaseTest {
@EndpointInject("direct:start")
@@ -42,6 +40,7 @@ public class SqsProducerSendByteArrayLocalstackIT extends
Aws2SQSBaseTest {
result.expectedMessageCount(1);
Exchange exchange = template.send("direct:start",
ExchangePattern.InOnly, new Processor() {
+ @Override
public void process(Exchange exchange) {
byte[] headerValue = "HeaderTest".getBytes();
exchange.getIn().setHeader("value1", headerValue);
@@ -61,11 +60,11 @@ public class SqsProducerSendByteArrayLocalstackIT extends
Aws2SQSBaseTest {
return new RouteBuilder() {
@Override
public void configure() {
- from("direct:start").startupOrder(2)
- .toF("aws2-sqs://%s?autoCreateQueue=true",
sharedNameGenerator.getName()).to("mock:result");
+
from("direct:start").startupOrder(2).toF("aws2-sqs://%s?autoCreateQueue=true",
sharedNameGenerator.getName())
+ .to("mock:result");
-
fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true",
sharedNameGenerator.getName())
- .startupOrder(1).log("${body}");
+
fromF("aws2-sqs://%s?deleteAfterRead=true&autoCreateQueue=true",
sharedNameGenerator.getName()).startupOrder(1)
+ .log("${body}");
}
};
}