This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 28e1abba16c52b0b8d234ccc92c18110ca96e04a Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Thu Aug 4 09:12:38 2022 +0200 (chores) camel-kafka: ensure tests run a little bit faster This ensures that the polls happen multiple times within the test execution window and also that the session timeout does not exceed it. --- .../processor/resume/kafka/MultiNodeKafkaResumeStrategy.java | 11 ----------- .../kafka/integration/KafkaConsumerAsyncManualCommitIT.java | 2 +- .../kafka/integration/KafkaConsumerBadPortHealthCheckIT.java | 2 +- .../KafkaConsumerBadPortSupervisingHealthCheckIT.java | 2 +- .../component/kafka/integration/KafkaConsumerFullIT.java | 2 +- .../kafka/integration/KafkaConsumerHealthCheckIT.java | 2 +- .../kafka/integration/KafkaConsumerIdempotentIT.java | 2 +- .../KafkaConsumerIdempotentWithCustomSerializerIT.java | 2 +- .../integration/KafkaConsumerIdempotentWithProcessorIT.java | 2 +- .../kafka/integration/KafkaConsumerTopicIsPatternIT.java | 5 +++-- .../integration/KafkaConsumerUnresolvableHealthCheckIT.java | 2 +- .../kafka/integration/commit/KafkaConsumerNoopCommitIT.java | 2 +- .../kafka/integration/commit/KafkaConsumerSyncCommitIT.java | 2 +- .../pause/KafkaPausableConsumerCircuitBreakerIT.java | 2 +- .../kafka/integration/pause/KafkaPausableConsumerIT.java | 2 +- 15 files changed, 16 insertions(+), 26 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java index 82580200f03..fd76c7a6046 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java @@ -17,20 +17,10 @@ package org.apache.camel.processor.resume.kafka; -import java.nio.ByteBuffer; -import 
java.util.Collections; -import java.util.Properties; -import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.camel.resume.Deserializable; import org.apache.camel.resume.Resumable; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,5 +57,4 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod super(resumeStrategyConfiguration, executorService); } - } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java index bfda5a5a5e8..6b94fb6454a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java @@ -55,7 +55,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class); @EndpointInject("kafka:" + TOPIC - + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false" + + "?groupId=KafkaConsumerAsyncManualCommitIT&pollTimeoutMs=1000&autoCommitEnable=false" + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory") private Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java index 0ca75d0b2b4..c9a8402cf66 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java @@ -72,7 +72,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport { @EndpointInject("kafka:" + TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("mock:result") private MockEndpoint to; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java index 31727cae3fd..d210fe298ad 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java @@ -74,7 +74,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo @EndpointInject("kafka:" + TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + 
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("mock:result") private MockEndpoint to; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java index 46e304d0613..9b857342181 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java @@ -59,7 +59,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport { @EndpointInject("kafka:" + TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("mock:result") diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java index bfac711c265..74fbc8fdbf6 100644 --- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java @@ -79,7 +79,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport { @EndpointInject("kafka:" + TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("mock:result") private MockEndpoint to; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java index 6ad1dba93ce..36de458fcfe 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java @@ -45,7 +45,7 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor + "?groupId=group2&autoOffsetReset=earliest" + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private 
Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java index 2e80a8d23b7..2095aafdc0f 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java @@ -41,7 +41,7 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java index 4a5f9ff685f..d68eddfa296 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java @@ -40,7 +40,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot + "?groupId=group2&autoOffsetReset=earliest" + 
"&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true" + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java index a178095e280..bbae0f74910 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java @@ -38,8 +38,9 @@ public class KafkaConsumerTopicIsPatternIT extends BaseEmbeddedKafkaTestSupport public static final String TOPIC = "vess123d"; public static final String TOPIC_PATTERN = "v.*d"; - @EndpointInject("kafka:" + TOPIC_PATTERN + "?topicIsPattern=true&groupId=group1&autoOffsetReset=earliest" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000") + @EndpointInject("kafka:" + TOPIC_PATTERN + + "?topicIsPattern=true&groupId=KafkaConsumerTopicIsPatternIT&autoOffsetReset=earliest" + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000") private Endpoint from; @EndpointInject("mock:result") diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java index c0395ec7f6c..5f34c34997c 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java @@ -72,7 +72,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport { @EndpointInject("kafka:" + TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("mock:result") private MockEndpoint to; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java index a1daccb40df..d69387a6460 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java @@ -35,7 +35,7 @@ public class KafkaConsumerNoopCommitIT extends BaseManualCommitTestSupport { public static final String TOPIC = "testManualNoopCommitTest"; @EndpointInject("kafka:" + TOPIC - + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false" + + 
"?groupId=KafkaConsumerNoopCommitIT&pollTimeoutMs=1000&autoCommitEnable=false" + "&allowManualCommit=true&autoOffsetReset=earliest&metadataMaxAgeMs=1000") private Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java index a6e4cddb173..e9967641009 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java @@ -34,7 +34,7 @@ public class KafkaConsumerSyncCommitIT extends BaseManualCommitTestSupport { public static final String TOPIC = "testManualCommitSyncTest"; @EndpointInject("kafka:" + TOPIC - + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false" + + "?groupId=KafkaConsumerSyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false" + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory") private Endpoint from; diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java index 89501aa7c5e..1b813178602 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java @@ -63,7 +63,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest @EndpointInject("kafka:" + SOURCE_TOPIC + 
"?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("direct:intermediate") diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java index 369bd194e9f..a30f21a252b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java @@ -81,7 +81,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport { @EndpointInject("kafka:" + SOURCE_TOPIC + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" - + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") + + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; @EndpointInject("direct:intermediate")
