This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 3b649727ef [Improve][E2E] improve kafka e2e (#8295) 3b649727ef is described below commit 3b649727efa5f6ff305f4463fa78ca21c57313b8 Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com> AuthorDate: Sun Dec 15 10:30:35 2024 +0800 [Improve][E2E] improve kafka e2e (#8295) --- .../org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index f9483fd65f..b199a2848a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -75,7 +75,6 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.DockerLoggerFactory; @@ -104,7 +103,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.awaitility.Awaitility.await; +import static org.testcontainers.shaded.org.awaitility.Awaitility.given; @Slf4j public class KafkaIT extends TestSuiteBase implements TestResource { @@ -132,8 +131,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource { DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); Startables.deepStart(Stream.of(kafkaContainer)).join(); log.info("Kafka container started"); - Awaitility.given() - .ignoreExceptions() + given().ignoreExceptions() .atLeast(100, TimeUnit.MILLISECONDS) .pollInterval(500, TimeUnit.MILLISECONDS) .atMost(180, TimeUnit.SECONDS) @@ -789,11 +787,12 @@ public class KafkaIT extends TestSuiteBase implements TestResource { } return null; }); - TimeUnit.MINUTES.sleep(5); // wait for data written to kafka Long finalEndOffset = endOffset; - await().atMost(5, TimeUnit.MINUTES) - .pollInterval(5000, TimeUnit.MILLISECONDS) + given().pollDelay(30, TimeUnit.SECONDS) + .pollInterval(5, TimeUnit.SECONDS) + .await() + .atMost(5, TimeUnit.MINUTES) .untilAsserted( () -> Assertions.assertTrue(