This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f641a9c1a7a Fix EmbeddedKafkaCluster startup/teardown ordering in integration tests (#17855)
f641a9c1a7a is described below

commit f641a9c1a7a20a23d69b1b79d607fc5ba85ef2a8
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Mar 11 22:00:02 2026 -0700

    Fix EmbeddedKafkaCluster startup/teardown ordering in integration tests (#17855)
    
    * Fix Kafka startup/teardown ordering in integration tests
    
    - Add getKafkaExtraProperties() hook in BaseClusterIntegrationTest for
      subclasses to pass custom Kafka broker config
    - Update EmbeddedKafkaCluster to forward extra config properties to
      KafkaClusterTestKit builder
    - Set log.flush.interval.messages=1 in ExactlyOnceKafka test to ensure
      transactional data is flushed to disk immediately
    - Raise the committed-record visibility deadline from 60s to 120s and update
      the timeout messages to match
    - Add retry logic for realtime table creation when Kafka topic metadata
      is not yet available
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * Address review comments: try/finally teardown, SLF4J logging, clear extra props
    
    - Wrap all @AfterClass tearDown methods in try/finally with
      FileUtils.deleteQuietly for reliable temp directory cleanup
    - Fix BrokerQueryLimitTest duplicate deleteDirectory and wrong ordering
    - Replace System.err.println with SLF4J LOGGER in ExactlyOnce test
    - Clear _extraConfigProps at start of init() in EmbeddedKafkaCluster
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../sink/PinotSinkUpsertTableIntegrationTest.java  | 23 ++++++++++--
 .../tests/BaseClusterIntegrationTest.java          |  8 +++++
 .../tests/BasePauselessRealtimeIngestionTest.java  |  2 +-
 .../tests/BaseRealtimeClusterIntegrationTest.java  |  5 ++-
 .../integration/tests/BrokerQueryLimitTest.java    | 22 ++++++------
 .../tests/CLPEncodingRealtimeIntegrationTest.java  | 20 +++++++++--
 ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 42 +++++++++++++---------
 .../tests/NullHandlingIntegrationTest.java         |  5 ++-
 ...PartialUpsertTableRebalanceIntegrationTest.java |  5 ++-
 ...sRealtimeIngestionSegmentCommitFailureTest.java |  3 +-
 .../tests/QueryWorkloadIntegrationTest.java        | 20 ++++++++++-
 .../tests/RetentionManagerIntegrationTest.java     | 19 +++++++++-
 .../tests/StaleSegmentCheckIntegrationTest.java    |  5 +--
 .../TableRebalancePauselessIntegrationTest.java    |  1 +
 .../CustomDataQueryClusterIntegrationTest.java     |  6 ++--
 .../BaseLogicalTableIntegrationTest.java           |  6 ++--
 .../kafka30/server/EmbeddedKafkaCluster.java       | 19 ++++++++--
 17 files changed, 153 insertions(+), 58 deletions(-)

diff --git 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
index 42d7405d3de..642b54dc7d1 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkUpsertTableIntegrationTest.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -52,6 +53,7 @@ import 
org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -77,15 +79,14 @@ public class PinotSinkUpsertTableIntegrationTest extends 
BaseClusterIntegrationT
 
     // Start the Pinot cluster
     startZk();
+    // Start Kafka and push data into Kafka
+    startKafka();
     // Start a customized controller with more frequent realtime segment 
validation
     startController();
     startBroker();
     startServers(2);
     startMinion();
 
-    // Start Kafka and push data into Kafka
-    startKafka();
-
     // Push data to Kafka and set up table
     setupTable(getTableName(), getKafkaTopic(), "gameScores_csv.tar.gz", null);
 
@@ -223,6 +224,22 @@ public class PinotSinkUpsertTableIntegrationTest extends 
BaseClusterIntegrationT
     return tableConfig;
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    try {
+      dropRealtimeTable(getTableName());
+      stopMinion();
+      stopServer();
+      stopBroker();
+      stopController();
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+
   @Override
   protected String getSchemaFileName() {
     return "upsert_table_test.schema";
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index c1784b43eea..d27eb3e094d 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -189,6 +189,13 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
     return useKafkaTransaction() ? DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS : 
DEFAULT_LLC_NUM_KAFKA_BROKERS;
   }
 
+  /**
+   * Override to pass extra Kafka broker config properties when starting the 
embedded Kafka cluster.
+   */
+  protected Properties getKafkaExtraProperties() {
+    return new Properties();
+  }
+
   protected int getKafkaPort() {
     int idx = RANDOM.nextInt(_kafkaStarters.size());
     return _kafkaStarters.get(idx).getPort();
@@ -817,6 +824,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
     int requestedBrokers = getNumKafkaBrokers();
     Properties props = new Properties();
     props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP, 
Integer.toString(requestedBrokers));
+    props.putAll(getKafkaExtraProperties());
     EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
     cluster.init(props);
     cluster.start();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index bb91cf3338b..9c3a933c27e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -100,6 +100,7 @@ public abstract class BasePauselessRealtimeIngestionTest 
extends BaseClusterInte
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
     startZk();
+    startKafka();
     startController();
     startBroker();
     startServer();
@@ -113,7 +114,6 @@ public abstract class BasePauselessRealtimeIngestionTest 
extends BaseClusterInte
   protected void setupNonPauselessTable()
       throws Exception {
     _avroFiles = unpackAvroData(_tempDir);
-    startKafka();
     pushAvroIntoKafka(_avroFiles);
 
     Schema schema = createSchema();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index f7492846fc8..aa6ff7e59c3 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -48,6 +48,8 @@ public abstract class BaseRealtimeClusterIntegrationTest 
extends BaseClusterInte
 
     // Start the Pinot cluster
     startZk();
+    // Start Kafka
+    startKafka();
     startController();
 
     HelixConfigScope scope =
@@ -63,9 +65,6 @@ public abstract class BaseRealtimeClusterIntegrationTest 
extends BaseClusterInte
     startBroker();
     startServer();
 
-    // Start Kafka
-    startKafka();
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
index 068b8c1cf00..59f816140eb 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BrokerQueryLimitTest.java
@@ -167,18 +167,16 @@ public class BrokerQueryLimitTest extends 
BaseClusterIntegrationTest {
   public void tearDown()
       throws Exception {
     LOGGER.warn("Tearing down integration test class: {}", 
getClass().getSimpleName());
-    dropOfflineTable(getTableName());
-    FileUtils.deleteDirectory(_tempDir);
-
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test class");
-    stopKafka();
-    // Shutdown the Pinot cluster
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
+    try {
+      dropOfflineTable(getTableName());
+      stopServer();
+      stopBroker();
+      stopController();
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
     LOGGER.warn("Finished tearing down integration test class: {}", 
getClass().getSimpleName());
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
index dab36f2c1a9..e9f409bfe17 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -30,6 +31,7 @@ import 
org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -50,12 +52,11 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
 
     // Start the Pinot cluster
     startZk();
+    startKafka();
     // Start a customized controller with more frequent realtime segment 
validation
     startController();
     startBroker();
     startServer();
-
-    startKafka();
     pushAvroIntoKafka(_avroFiles);
 
     Schema schema = createSchema();
@@ -103,6 +104,21 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
         .getLong(0), 53);
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    try {
+      dropRealtimeTable(getTableName());
+      stopServer();
+      stopBroker();
+      stopController();
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+
   protected int getRealtimeSegmentFlushSize() {
     return 30;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index b5c4fe12979..9aff311ed91 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -45,9 +45,12 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ExactlyOnceKafkaRealtimeClusterIntegrationTest.class);
   private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
   private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
   private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
@@ -80,6 +83,13 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
     return true;
   }
 
+  @Override
+  protected Properties getKafkaExtraProperties() {
+    Properties props = new Properties();
+    props.setProperty("log.flush.interval.messages", "1");
+    return props;
+  }
+
   @Override
   protected int getNumKafkaBrokers() {
     return DEFAULT_TRANSACTION_NUM_KAFKA_BROKERS;
@@ -94,9 +104,8 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
     String kafkaBrokerList = getKafkaBrokerList();
-    // Use System.err for diagnostics - log4j2 console appender is filtered to 
ERROR in CI
-    System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at: 
" + kafkaBrokerList);
-    System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+    LOGGER.info("Pushing transactional data to Kafka at: {}", kafkaBrokerList);
+    LOGGER.info("Avro files count: {}", avroFiles.size());
 
     Properties producerProps = new Properties();
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaBrokerList);
@@ -115,19 +124,19 @@ public class 
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
     // transaction operations until the abort is fully done (markers written).
     try (KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps)) {
       producer.initTransactions();
-      System.err.println("[ExactlyOnce] initTransactions() succeeded");
+      LOGGER.info("initTransactions() succeeded");
 
       // Transaction 1: aborted batch
       long abortedCount = pushAvroRecords(producer, avroFiles, false);
-      System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + " 
records");
+      LOGGER.info("Aborted batch: {} records", abortedCount);
 
       // Transaction 2: committed batch
       long committedCount = pushAvroRecords(producer, avroFiles, true);
-      System.err.println("[ExactlyOnce] Committed batch: " + committedCount + 
" records");
+      LOGGER.info("Committed batch: {} records", committedCount);
     }
 
     // After producer is closed, verify data visibility with independent 
consumers
-    System.err.println("[ExactlyOnce] Producer closed. Verifying data 
visibility...");
+    LOGGER.info("Producer closed. Verifying data visibility...");
     waitForCommittedRecordsVisible(kafkaBrokerList);
   }
 
@@ -136,7 +145,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
    * This ensures transaction markers have been fully propagated before 
returning.
    */
   private void waitForCommittedRecordsVisible(String brokerList) {
-    long deadline = System.currentTimeMillis() + 60_000L;
+    long deadline = System.currentTimeMillis() + 120_000L;
     int lastCommitted = 0;
     int lastUncommitted = 0;
     int iteration = 0;
@@ -145,15 +154,14 @@ public class 
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
       iteration++;
       lastCommitted = countRecords(brokerList, "read_committed");
       if (lastCommitted > 0) {
-        System.err.println("[ExactlyOnce] Verification OK: read_committed=" + 
lastCommitted
-            + " after " + iteration + " iterations");
+        LOGGER.info("Verification OK: read_committed={} after {} iterations", 
lastCommitted, iteration);
         return;
       }
       // Check if data reached Kafka at all
       if (iteration == 1 || iteration % 5 == 0) {
         lastUncommitted = countRecords(brokerList, "read_uncommitted");
-        System.err.println("[ExactlyOnce] Verification iteration " + iteration
-            + ": read_committed=" + lastCommitted + ", read_uncommitted=" + 
lastUncommitted);
+        LOGGER.info("Verification iteration {}: read_committed={}, 
read_uncommitted={}", iteration, lastCommitted,
+            lastUncommitted);
       }
       try {
         Thread.sleep(2_000L);
@@ -165,9 +173,9 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
 
     // Final diagnostic dump
     lastUncommitted = countRecords(brokerList, "read_uncommitted");
-    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s: 
read_committed=" + lastCommitted
-        + ", read_uncommitted=" + lastUncommitted);
-    throw new AssertionError("[ExactlyOnce] Transaction markers were not 
propagated within 60s; "
+    LOGGER.error("VERIFICATION FAILED after 120s: read_committed={}, 
read_uncommitted={}", lastCommitted,
+        lastUncommitted);
+    throw new AssertionError("[ExactlyOnce] Transaction markers were not 
propagated within 120s; "
         + "committed records are not visible to read_committed consumers. "
         + "read_committed=" + lastCommitted + ", read_uncommitted=" + 
lastUncommitted);
   }
@@ -251,7 +259,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
       List<PartitionInfo> partitions = consumer.partitionsFor(getKafkaTopic(), 
Duration.ofSeconds(10));
       if (partitions == null || partitions.isEmpty()) {
-        System.err.println("[ExactlyOnce] No partitions found for topic " + 
getKafkaTopic());
+        LOGGER.warn("No partitions found for topic {}", getKafkaTopic());
         return 0;
       }
       for (PartitionInfo pi : partitions) {
@@ -270,7 +278,7 @@ public class ExactlyOnceKafkaRealtimeClusterIntegrationTest 
extends BaseRealtime
         totalRecords += partitionRecords;
       }
     } catch (Exception e) {
-      System.err.println("[ExactlyOnce] Error counting records with " + 
isolationLevel + ": " + e.getMessage());
+      LOGGER.error("Error counting records with {}: {}", isolationLevel, 
e.getMessage());
     }
     return totalRecords;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index c25af339add..f0375a74ca0 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -51,13 +51,12 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     // Start the Pinot cluster
     startZk();
+    // Start Kafka
+    startKafka();
     startController();
     startBroker();
     startServer();
 
-    // Start Kafka
-    startKafka();
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 70aa705dff2..e3bbde37258 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -83,13 +83,12 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
 
     startZk();
+    // Start Kafka
+    startKafka();
     startController();
     startBroker();
     startServers(NUM_SERVERS);
 
-    // Start Kafka and push data into Kafka
-    startKafka();
-
     _resourceManager = _controllerStarter.getHelixResourceManager();
     _tableRebalancer = new 
TableRebalancer(_resourceManager.getHelixZkManager());
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 1cd5842fbd9..23c9bcc086d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -92,6 +92,8 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
 
     // Start the Pinot cluster
     startZk();
+    // Start Kafka
+    startKafka();
     // Start a customized controller with more frequent realtime segment 
validation
     startController();
     startBroker();
@@ -99,7 +101,6 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
 
     // load data in kafka
     List<File> avroFiles = unpackAvroData(_tempDir);
-    startKafka();
     pushAvroIntoKafka(avroFiles);
 
     setMaxSegmentCompletionTimeMillis();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
index ce175b81e8e..00db1afc4fd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryWorkloadIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -43,6 +44,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -72,10 +74,10 @@ public class QueryWorkloadIntegrationTest extends 
BaseClusterIntegrationTest {
 
     // Start Zk, Kafka and Pinot
     startZk();
+    startKafka();
     startController();
     startBroker();
     startServer();
-    startKafka();
 
     List<File> avroFiles = getAllAvroFiles();
     List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, 
NUM_OFFLINE_SEGMENTS);
@@ -115,6 +117,22 @@ public class QueryWorkloadIntegrationTest extends 
BaseClusterIntegrationTest {
     waitForAllDocsLoaded(100_000L);
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    try {
+      dropOfflineTable(getTableName());
+      dropRealtimeTable(getTableName());
+      stopServer();
+      stopBroker();
+      stopController();
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+
   // TODO: Expand tests to cover more scenarios for workload enforcement
   @Test
   public void testQueryWorkloadConfig() throws Exception {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
index d1b229dbb15..cc14afc1b5a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RetentionManagerIntegrationTest.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
@@ -36,6 +37,7 @@ import org.apache.pinot.util.TestUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -71,6 +73,7 @@ public class RetentionManagerIntegrationTest extends 
BaseClusterIntegrationTest
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
     startZk();
+    startKafka();
     startController();
     startBroker();
     startServer();
@@ -81,7 +84,6 @@ public class RetentionManagerIntegrationTest extends 
BaseClusterIntegrationTest
   protected void setupTable()
       throws Exception {
     _avroFiles = unpackAvroData(_tempDir);
-    startKafka();
     pushAvroIntoKafka(_avroFiles);
 
     Schema schema = createSchema();
@@ -95,6 +97,21 @@ public class RetentionManagerIntegrationTest extends 
BaseClusterIntegrationTest
     waitForDocsLoaded(600_000L, true, tableConfig.getTableName());
   }
 
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    try {
+      dropRealtimeTable(getTableName());
+      stopServer();
+      stopBroker();
+      stopController();
+      stopKafka();
+      stopZk();
+    } finally {
+      FileUtils.deleteQuietly(_tempDir);
+    }
+  }
+
   @Test
   public void testClusterConfigChangeListener()
       throws IOException, URISyntaxException {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
index c2fbd83cb94..c1045bae7b4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StaleSegmentCheckIntegrationTest.java
@@ -73,12 +73,12 @@ public class StaleSegmentCheckIntegrationTest extends 
BaseClusterIntegrationTest
 
     // Start the Pinot cluster
     startZk();
+    // Start Kafka
+    startKafka();
     startController();
     startBroker();
     startServer();
     startMinion();
-    // Start Kafka
-    startKafka();
 
     _taskManager = _controllerStarter.getTaskManager();
     _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
@@ -193,6 +193,7 @@ public class StaleSegmentCheckIntegrationTest extends 
BaseClusterIntegrationTest
       stopServer();
       stopBroker();
       stopController();
+      stopKafka();
       stopZk();
     } finally {
       FileUtils.deleteQuietly(_tempDir);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
index 945fac33506..5a72a3c1e22 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java
@@ -73,6 +73,7 @@ public class TableRebalancePauselessIntegrationTest extends 
BasePauselessRealtim
       throws Exception {
     TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
     startZk();
+    startKafka();
     startController();
     startBroker();
     startServer();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
index 5714057a7a8..739214cfa5c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CustomDataQueryClusterIntegrationTest.java
@@ -85,13 +85,13 @@ public abstract class CustomDataQueryClusterIntegrationTest 
extends BaseClusterI
   public void tearDownSuite()
       throws Exception {
     LOGGER.warn("Tearing down integration test suite");
-    // Stop Kafka
-    LOGGER.warn("Stop Kafka in the integration test suite");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();
     stopController();
+    // Stop Kafka
+    LOGGER.warn("Stop Kafka in the integration test suite");
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
     LOGGER.warn("Finished tearing down integration test suite");
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
index b06a6d1481b..877e1b25489 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
@@ -94,13 +94,13 @@ public abstract class BaseLogicalTableIntegrationTest 
extends BaseClusterIntegra
   public void tearDownSuite()
       throws Exception {
     LOGGER.info("Tearing down integration test suite");
-    // Stop Kafka
-    LOGGER.info("Stop Kafka in the integration test suite");
-    stopKafka();
     // Shutdown the Pinot cluster
     stopServer();
     stopBroker();
     stopController();
+    // Stop Kafka
+    LOGGER.info("Stop Kafka in the integration test suite");
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
     LOGGER.info("Finished tearing down integration test suite");
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
index 56b3fe8eef2..a8296c172ba 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
@@ -46,12 +46,20 @@ public class EmbeddedKafkaCluster implements 
StreamDataServerStartable {
   private static final int TOPIC_MUTATION_RETRIES = 5;
 
   private int _brokerCount = 1;
+  private final Properties _extraConfigProps = new Properties();
   private KafkaClusterTestKit _cluster;
   private String _bootstrapServers;
 
   @Override
   public void init(Properties props) {
     _brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+    _extraConfigProps.clear();
+    // Forward any additional properties (excluding our internal ones) as 
Kafka broker config
+    for (String key : props.stringPropertyNames()) {
+      if (!key.equals(BROKER_COUNT_PROP)) {
+        _extraConfigProps.setProperty(key, props.getProperty(key));
+      }
+    }
   }
 
   @Override
@@ -67,15 +75,20 @@ public class EmbeddedKafkaCluster implements 
StreamDataServerStartable {
           .setBootstrapMetadataVersion(MetadataVersion.latestProduction())
           .build();
 
-      _cluster = new KafkaClusterTestKit.Builder(nodes)
+      KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(nodes)
           .setConfigProp("offsets.topic.replication.factor", 
String.valueOf(replicationFactor))
           .setConfigProp("offsets.topic.num.partitions", "1")
           .setConfigProp("transaction.state.log.replication.factor", 
String.valueOf(replicationFactor))
           .setConfigProp("transaction.state.log.min.isr", "1")
           .setConfigProp("transaction.state.log.num.partitions", "1")
-          .setConfigProp("group.initial.rebalance.delay.ms", "0")
+          .setConfigProp("group.initial.rebalance.delay.ms", "0");
 
-          .build();
+      // Apply any extra config properties passed via init()
+      for (String key : _extraConfigProps.stringPropertyNames()) {
+        builder.setConfigProp(key, _extraConfigProps.getProperty(key));
+      }
+
+      _cluster = builder.build();
 
       _cluster.format();
       _cluster.startup();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to