This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 58966a9af30 CAMEL-19358 - changes to address flakiness of
KafkaConsumerFullIT test (#14423)
58966a9af30 is described below
commit 58966a9af30239549c702c880ceaa86f5547d0d0
Author: Jang-Vijay Singh <[email protected]>
AuthorDate: Mon Jun 10 05:56:49 2024 +0100
CAMEL-19358 - changes to address flakiness of KafkaConsumerFullIT test
(#14423)
changes to address flakiness of KafkaConsumerFullIT test:
- unique topic & route names
- ensure topic is actually deleted in @AfterEach
---
.../kafka/integration/KafkaConsumerFullIT.java | 31 +++++++++++++++-------
1 file changed, 21 insertions(+), 10 deletions(-)
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 318ecc59e22..0957733834c 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.apache.camel.BindToRegistry;
@@ -33,7 +34,9 @@ import
org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.core.annotations.RouteFixture;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -54,7 +57,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaConsumerFullIT extends BaseKafkaTestSupport {
- public static final String TOPIC = "test-full-KafkaConsumerFullIT";
//CAMEL-20722: try a more unique name to avoid clash
+ public static final String TOPIC = "test-full-" + Uuid.randomUuid();
//CAMEL-20722: a more unique name to avoid clash
+ public static final String ROUTE = "full-it-" + Uuid.randomUuid();
//CAMEL-20722: a more unique name to avoid clash
private static final Logger LOG =
LoggerFactory.getLogger(KafkaConsumerFullIT.class);
@@ -80,7 +84,14 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
producer.close();
}
// clean all test topics
- kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+ DeleteTopicsResult r =
kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+
+ // wait necessary to ensure the topic is actually deleted, and avoid
chance of clash in unrelated tests
+ Awaitility.await()
+ .timeout(60, TimeUnit.SECONDS)
+ .pollDelay(3, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertTrue(r.all().isDone()));
+
}
@RouteFixture
@@ -94,7 +105,7 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
public void configure() {
from(FROM_URI)
.process(exchange -> LOG.trace("Captured on the
processor: {}", exchange.getMessage().getBody()))
- .routeId("full-it").to(KafkaTestUtil.MOCK_RESULT);
+ .routeId(ROUTE).to(KafkaTestUtil.MOCK_RESULT);
}
};
}
@@ -174,12 +185,12 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
// Restart endpoint
CamelContext context = contextExtension.getContext();
- context.getRouteController().stopRoute("full-it");
+ context.getRouteController().stopRoute(ROUTE);
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint)
context.getEndpoint(FROM_URI);
kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
- context.getRouteController().startRoute("full-it");
+ context.getRouteController().startRoute(ROUTE);
// As we set seek to beginning we should re-consume all messages
to.assertIsSatisfied(3000);
@@ -205,12 +216,12 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
// Restart endpoint
CamelContext context = contextExtension.getContext();
- context.getRouteController().stopRoute("full-it");
+ context.getRouteController().stopRoute(ROUTE);
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint)
context.getEndpoint(FROM_URI);
kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
- context.getRouteController().startRoute("full-it");
+ context.getRouteController().startRoute(ROUTE);
to.assertIsSatisfied(3000);
}
@@ -246,17 +257,17 @@ public class KafkaConsumerFullIT extends
BaseKafkaTestSupport {
// suspend route
CamelContext context = contextExtension.getContext();
- context.getRouteController().suspendRoute("full-it");
+ context.getRouteController().suspendRoute(ROUTE);
// wait until the kafka client is really paused
- KafkaConsumer kc = (KafkaConsumer)
context.getRoute("full-it").getConsumer();
+ KafkaConsumer kc = (KafkaConsumer)
context.getRoute(ROUTE).getConsumer();
Awaitility.await().until(() -> {
boolean paused = kc.isKafkaPaused();
LOG.info("Waiting for kafka client to be paused: {}", paused);
return paused;
});
- context.getRouteController().resumeRoute("full-it");
+ context.getRouteController().resumeRoute(ROUTE);
to.reset();