This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b2806197e54 Replace Docker-based Kafka with embedded KRaft broker for integration tests (#17790)
b2806197e54 is described below

commit b2806197e54234a25826eb640ef2233041682508
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Mar 7 23:21:58 2026 -0800

    Replace Docker-based Kafka with embedded KRaft broker for integration tests (#17790)
    
    - Replace KafkaServerStartable (Docker CLI-based) with EmbeddedKafkaCluster
      using Kafka's KafkaClusterTestKit in KRaft mode for integration tests
    - Eliminate the Docker dependency, improving startup speed and removing a
      source of CI flakiness
    - Add essential embedded Kafka broker configs for transactions
    - Rewrite ExactlyOnce test to use single KafkaProducer for both abort and
      commit transactions, avoiding transaction marker race conditions
    - Fix PartialUpsertTableRebalanceIntegrationTest with retry logic for
      embedded Kafka topic metadata propagation delays
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 pinot-integration-test-base/pom.xml                |  36 ++++
 .../tests/BaseClusterIntegrationTest.java          | 222 ++++-----------------
 pinot-integration-tests/pom.xml                    |  38 ++++
 ...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 203 ++++++++++++++++++-
 ...PartialUpsertTableRebalanceIntegrationTest.java |  44 ++--
 .../pinot-stream-ingestion/pinot-kafka-3.0/pom.xml |  32 +++
 .../kafka30/KafkaPartitionLevelConsumerTest.java   |  22 +-
 .../kafka30/server/EmbeddedKafkaCluster.java       | 220 ++++++++++++++++++++
 pom.xml                                            |  32 +++
 9 files changed, 618 insertions(+), 231 deletions(-)

diff --git a/pinot-integration-test-base/pom.xml b/pinot-integration-test-base/pom.xml
index b785d421101..1bc17243f51 100644
--- a/pinot-integration-test-base/pom.xml
+++ b/pinot-integration-test-base/pom.xml
@@ -90,6 +90,42 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kafka-3.0</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.compat.version}</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server-common</artifactId>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ca59d7f33dd..c1784b43eea 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.ServerSocket;
 import java.net.URL;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -41,7 +40,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
@@ -69,7 +67,7 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.plugin.stream.kafka30.server.KafkaServerStartable;
+import org.apache.pinot.plugin.stream.kafka30.server.EmbeddedKafkaCluster;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.DedupConfig;
@@ -125,8 +123,6 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
   protected static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
   protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
-  private static final int KAFKA_START_MAX_ATTEMPTS = 3;
-  private static final long KAFKA_START_RETRY_WAIT_MS = 2_000L;
   private static final long KAFKA_CLUSTER_READY_TIMEOUT_MS = 120_000L;
   private static final long KAFKA_TOPIC_READY_TIMEOUT_MS = 120_000L;
   protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
@@ -142,7 +138,6 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
   protected List<StreamDataServerStartable> _kafkaStarters;
-  private List<KafkaBrokerConfig> _kafkaBrokerConfigs;
 
   protected org.apache.pinot.client.Connection _pinotConnection;
   protected org.apache.pinot.client.Connection _pinotConnectionV2;
@@ -427,6 +422,10 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected String getKafkaBrokerList() {
+    StreamDataServerStartable firstStarter = _kafkaStarters.get(0);
+    if (firstStarter instanceof EmbeddedKafkaCluster) {
+      return ((EmbeddedKafkaCluster) firstStarter).bootstrapServers();
+    }
     StringBuilder builder = new StringBuilder();
     for (int i = 0; i < _kafkaStarters.size(); i++) {
       if (i > 0) {
@@ -816,107 +815,22 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
 
   protected void startKafkaWithoutTopic() {
     int requestedBrokers = getNumKafkaBrokers();
-    List<KafkaBrokerConfig> brokerConfigs = 
getOrCreateKafkaBrokerConfigs(requestedBrokers);
-    Throwable lastFailure = null;
-    for (int attempt = 1; attempt <= KAFKA_START_MAX_ATTEMPTS; attempt++) {
-      String clusterId = UUID.randomUUID().toString().replace("-", "");
-      String networkName = "pinot-it-kafka-" + 
UUID.randomUUID().toString().replace("-", "");
-      String quorumVoters = brokerConfigs.stream()
-          .map(config -> config._brokerId + "@" + config._containerName + 
":9093")
-          .collect(Collectors.joining(","));
-
-      List<StreamDataServerStartable> kafkaStarters = new 
ArrayList<>(requestedBrokers);
+    Properties props = new Properties();
+    props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP, 
Integer.toString(requestedBrokers));
+    EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
+    cluster.init(props);
+    cluster.start();
+    _kafkaStarters = Collections.singletonList(cluster);
+    try {
+      waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers, 
useKafkaTransaction());
+    } catch (RuntimeException e) {
       try {
-        for (KafkaBrokerConfig brokerConfig : brokerConfigs) {
-          StreamDataServerStartable kafkaStarter =
-              createKafkaServerStarter(brokerConfig, clusterId, networkName, 
quorumVoters, requestedBrokers);
-          kafkaStarter.start();
-          kafkaStarters.add(kafkaStarter);
-        }
-        _kafkaStarters = kafkaStarters;
-        waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers, 
useKafkaTransaction());
-        if (attempt > 1) {
-          LOGGER.info("Kafka startup succeeded on retry attempt {}/{}", 
attempt, KAFKA_START_MAX_ATTEMPTS);
-        }
-        return;
-      } catch (Throwable t) {
-        if (t instanceof Error && !(t instanceof AssertionError)) {
-          throw (Error) t;
-        }
-
-        lastFailure = t;
-        LOGGER.warn("Kafka startup attempt {}/{} failed; stopping started 
brokers before retry", attempt,
-            KAFKA_START_MAX_ATTEMPTS, t);
-        _kafkaStarters = kafkaStarters;
-        try {
-          stopKafka();
-        } catch (RuntimeException stopException) {
-          LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}", 
attempt, KAFKA_START_MAX_ATTEMPTS,
-              stopException);
-          t.addSuppressed(stopException);
-        }
-
-        if (attempt < KAFKA_START_MAX_ATTEMPTS) {
-          try {
-            Thread.sleep(KAFKA_START_RETRY_WAIT_MS);
-          } catch (InterruptedException interruptedException) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Interrupted while waiting to retry 
Kafka startup", interruptedException);
-          }
-        }
+        cluster.stop();
+      } catch (Exception suppressed) {
+        e.addSuppressed(suppressed);
       }
-    }
-
-    _kafkaBrokerConfigs = null;
-    throw new RuntimeException("Failed to start Kafka cluster after " + 
KAFKA_START_MAX_ATTEMPTS + " attempts",
-        lastFailure);
-  }
-
-  private List<KafkaBrokerConfig> getOrCreateKafkaBrokerConfigs(int 
brokerCount) {
-    if (_kafkaBrokerConfigs != null && _kafkaBrokerConfigs.size() == 
brokerCount) {
-      return _kafkaBrokerConfigs;
-    }
-    _kafkaBrokerConfigs = createKafkaBrokerConfigs(brokerCount);
-    return _kafkaBrokerConfigs;
-  }
-
-  private StreamDataServerStartable createKafkaServerStarter(KafkaBrokerConfig 
brokerConfig, String clusterId,
-      String networkName, String quorumVoters, int clusterSize) {
-    Properties serverProperties = new Properties();
-    serverProperties.put("kafka.server.owner.name", 
getClass().getSimpleName());
-    serverProperties.put("kafka.server.bootstrap.servers", "localhost:" + 
brokerConfig._port);
-    serverProperties.put("kafka.server.port", 
Integer.toString(brokerConfig._port));
-    serverProperties.put("kafka.server.broker.id", 
Integer.toString(brokerConfig._brokerId));
-    serverProperties.put("kafka.server.allow.managed.for.configured.broker", 
"true");
-    serverProperties.put("kafka.server.container.name", 
brokerConfig._containerName);
-    serverProperties.put("kafka.server.network.name", networkName);
-    serverProperties.put("kafka.server.cluster.id", clusterId);
-    serverProperties.put("kafka.server.cluster.size", 
Integer.toString(clusterSize));
-    serverProperties.put("kafka.server.controller.quorum.voters", 
quorumVoters);
-    serverProperties.put("kafka.server.internal.host", 
brokerConfig._containerName);
-    serverProperties.put("kafka.server.skip.readiness.check", "true");
-    KafkaServerStartable kafkaServerStartable = new KafkaServerStartable();
-    kafkaServerStartable.init(serverProperties);
-    return kafkaServerStartable;
-  }
-
-  private List<KafkaBrokerConfig> createKafkaBrokerConfigs(int brokerCount) {
-    String containerPrefix = "pinot-it-kafka-" + 
UUID.randomUUID().toString().replace("-", "");
-    List<KafkaBrokerConfig> brokerConfigs = new ArrayList<>(brokerCount);
-    for (int i = 0; i < brokerCount; i++) {
-      int brokerId = i + 1;
-      int port = getAvailablePort();
-      String containerName = containerPrefix + "-" + brokerId;
-      brokerConfigs.add(new KafkaBrokerConfig(brokerId, port, containerName));
-    }
-    return brokerConfigs;
-  }
-
-  private static int getAvailablePort() {
-    try (ServerSocket socket = new ServerSocket(0)) {
-      return socket.getLocalPort();
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to find an available port for Kafka", 
e);
+      _kafkaStarters = null;
+      throw e;
     }
   }
 
@@ -961,20 +875,19 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected void createKafkaTopic(String topic) {
-    int replicationFactor = Math.max(1, Math.min(getNumKafkaBrokers(), 
_kafkaStarters.size()));
-    createKafkaTopic(topic, getNumKafkaPartitions(), replicationFactor);
+    createKafkaTopic(topic, getNumKafkaPartitions(), getNumKafkaBrokers());
   }
 
   protected void createKafkaTopic(String topic, int numPartitions) {
-    createKafkaTopic(topic, numPartitions, Math.max(1, 
Math.min(getNumKafkaBrokers(), _kafkaStarters.size())));
+    createKafkaTopic(topic, numPartitions, getNumKafkaBrokers());
   }
 
   protected void createKafkaTopic(String topic, int numPartitions, int 
replicationFactor) {
-    int expectedReplicationFactor = Math.max(1, Math.min(replicationFactor, 
_kafkaStarters.size()));
+    int effectiveReplicationFactor = Math.max(1, Math.min(replicationFactor, 
getNumKafkaBrokers()));
     _kafkaStarters.get(0).createTopic(
         topic,
-        KafkaStarterUtils.getTopicCreationProps(numPartitions, 
expectedReplicationFactor));
-    waitForKafkaTopicReady(topic, numPartitions, expectedReplicationFactor);
+        KafkaStarterUtils.getTopicCreationProps(numPartitions, 
effectiveReplicationFactor));
+    waitForKafkaTopicReady(topic, numPartitions, effectiveReplicationFactor);
     waitForKafkaTopicMetadataReadyForConsumer(topic, numPartitions);
   }
 
@@ -1016,13 +929,7 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
     if (_kafkaStarters == null || _kafkaStarters.isEmpty()) {
       return false;
     }
-    for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
-      if (!isKafkaTopicReadyOnBroker("localhost:" + kafkaStarter.getPort(), 
topic, expectedPartitions,
-          expectedReplicationFactor)) {
-        return false;
-      }
-    }
-    return true;
+    return isKafkaTopicReadyOnBroker(getKafkaBrokerList(), topic, 
expectedPartitions, expectedReplicationFactor);
   }
 
   private boolean isKafkaTopicMetadataReadyForConsumer(String topic, int 
expectedPartitions) {
@@ -1069,74 +976,27 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
   }
 
   protected void stopKafka() {
-    if (_kafkaStarters == null || _kafkaStarters.isEmpty()) {
-      return;
-    }
-    List<StreamDataServerStartable> kafkaStarters = _kafkaStarters;
-    _kafkaStarters = null;
-
-    RuntimeException stopException = null;
-    for (int i = kafkaStarters.size() - 1; i >= 0; i--) {
-      StreamDataServerStartable kafkaStarter = kafkaStarters.get(i);
+    if (_kafkaStarters != null) {
+      Exception firstException = null;
       try {
-        kafkaStarter.stop();
-      } catch (Exception e) {
-        RuntimeException wrapped = new RuntimeException(
-            "Failed to stop Kafka broker on port: " + kafkaStarter.getPort(), 
e);
-        if (stopException == null) {
-          stopException = wrapped;
-        } else {
-          stopException.addSuppressed(wrapped);
+        for (StreamDataServerStartable starter : _kafkaStarters) {
+          try {
+            starter.stop();
+          } catch (Exception e) {
+            if (firstException == null) {
+              firstException = e;
+            } else {
+              firstException.addSuppressed(e);
+            }
+          }
         }
+      } finally {
+        _kafkaStarters = null;
       }
-    }
-
-    for (StreamDataServerStartable kafkaStarter : kafkaStarters) {
-      try {
-        waitForKafkaBrokerStopped(kafkaStarter.getPort());
-      } catch (RuntimeException e) {
-        if (stopException == null) {
-          stopException = e;
-        } else {
-          stopException.addSuppressed(e);
-        }
+      if (firstException != null) {
+        throw new RuntimeException("Failed to stop Kafka starters cleanly", 
firstException);
       }
     }
-
-    if (stopException != null) {
-      throw stopException;
-    }
-  }
-
-  private void waitForKafkaBrokerStopped(int brokerPort) {
-    String brokerList = "localhost:" + brokerPort;
-    TestUtils.waitForCondition(aVoid -> !isKafkaBrokerAvailable(brokerList), 
200L, 60_000L,
-        "Kafka broker is still reachable after stop: " + brokerList);
-  }
-
-  private boolean isKafkaBrokerAvailable(String brokerList) {
-    Properties adminProps = new Properties();
-    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-    adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
-    adminProps.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000");
-    try (AdminClient adminClient = AdminClient.create(adminProps)) {
-      adminClient.describeCluster().nodes().get(2, TimeUnit.SECONDS);
-      return true;
-    } catch (Exception e) {
-      return false;
-    }
-  }
-
-  private static final class KafkaBrokerConfig {
-    private final int _brokerId;
-    private final int _port;
-    private final String _containerName;
-
-    private KafkaBrokerConfig(int brokerId, int port, String containerName) {
-      _brokerId = brokerId;
-      _port = port;
-      _containerName = containerName;
-    }
   }
 
   /**
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index d934ff7058a..d304beb91f8 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -222,6 +222,44 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-kafka-3.0</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.compat.version}</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server-common</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-udf-test</artifactId>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index d11ad4ac456..b5c4fe12979 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -18,22 +18,36 @@
  */
 package org.apache.pinot.integration.tests;
 
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.util.TestUtils;
 
 
 public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends 
BaseRealtimeClusterIntegrationTest {
-
   private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
   private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
   private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
@@ -80,14 +94,185 @@ public class 
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
     String kafkaBrokerList = getKafkaBrokerList();
-    // the first transaction of kafka messages are aborted
-    ClusterIntegrationTestUtils
-        .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList, 
getKafkaTopic(),
-            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn(), false);
-    // the second transaction of kafka messages are committed
-    ClusterIntegrationTestUtils
-        .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList, 
getKafkaTopic(),
-            getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), 
getPartitionColumn(), true);
+    // Use System.err for diagnostics - log4j2 console appender is filtered to 
ERROR in CI
+    System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at: 
" + kafkaBrokerList);
+    System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+
+    Properties producerProps = new Properties();
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaBrokerList);
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+    producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+    producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+    producerProps.put(ProducerConfig.RETRIES_CONFIG, 
Integer.toString(Integer.MAX_VALUE));
+    producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"5");
+    producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
+    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"test-transaction-" + UUID.randomUUID());
+
+    // Use a SINGLE producer for both abort and commit transactions.
+    // With a single producer, the coordinator's state machine ensures that 
after
+    // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any 
new
+    // transaction operations until the abort is fully done (markers written).
+    try (KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps)) {
+      producer.initTransactions();
+      System.err.println("[ExactlyOnce] initTransactions() succeeded");
+
+      // Transaction 1: aborted batch
+      long abortedCount = pushAvroRecords(producer, avroFiles, false);
+      System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + " 
records");
+
+      // Transaction 2: committed batch
+      long committedCount = pushAvroRecords(producer, avroFiles, true);
+      System.err.println("[ExactlyOnce] Committed batch: " + committedCount + 
" records");
+    }
+
+    // After producer is closed, verify data visibility with independent 
consumers
+    System.err.println("[ExactlyOnce] Producer closed. Verifying data 
visibility...");
+    waitForCommittedRecordsVisible(kafkaBrokerList);
+  }
+
+  /**
+   * Wait for committed records to be visible to a read_committed consumer.
+   * This ensures transaction markers have been fully propagated before 
returning.
+   */
+  private void waitForCommittedRecordsVisible(String brokerList) {
+    long deadline = System.currentTimeMillis() + 60_000L;
+    int lastCommitted = 0;
+    int lastUncommitted = 0;
+    int iteration = 0;
+
+    while (System.currentTimeMillis() < deadline) {
+      iteration++;
+      lastCommitted = countRecords(brokerList, "read_committed");
+      if (lastCommitted > 0) {
+        System.err.println("[ExactlyOnce] Verification OK: read_committed=" + 
lastCommitted
+            + " after " + iteration + " iterations");
+        return;
+      }
+      // Check if data reached Kafka at all
+      if (iteration == 1 || iteration % 5 == 0) {
+        lastUncommitted = countRecords(brokerList, "read_uncommitted");
+        System.err.println("[ExactlyOnce] Verification iteration " + iteration
+            + ": read_committed=" + lastCommitted + ", read_uncommitted=" + 
lastUncommitted);
+      }
+      try {
+        Thread.sleep(2_000L);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    // Final diagnostic dump
+    lastUncommitted = countRecords(brokerList, "read_uncommitted");
+    System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s: 
read_committed=" + lastCommitted
+        + ", read_uncommitted=" + lastUncommitted);
+    throw new AssertionError("[ExactlyOnce] Transaction markers were not 
propagated within 60s; "
+        + "committed records are not visible to read_committed consumers. "
+        + "read_committed=" + lastCommitted + ", read_uncommitted=" + 
lastUncommitted);
+  }
+
+  /**
+   * Push Avro records to Kafka within a transaction. Does NOT call 
initTransactions().
+   * Returns the number of records sent.
+   */
+  private long pushAvroRecords(KafkaProducer<byte[], byte[]> producer, 
List<File> avroFiles, boolean commit)
+      throws Exception {
+    int maxMessagesPerTransaction =
+        getMaxNumKafkaMessagesPerBatch() > 0 ? 
getMaxNumKafkaMessagesPerBatch() : Integer.MAX_VALUE;
+    long counter = 0;
+    int recordsInTransaction = 0;
+    boolean hasOpenTransaction = false;
+    byte[] header = getKafkaMessageHeader();
+    String partitionColumn = getPartitionColumn();
+
+    try (ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(65536)) {
+      for (File avroFile : avroFiles) {
+        try (DataFileStream<GenericRecord> reader = 
AvroUtils.getAvroReader(avroFile)) {
+          BinaryEncoder binaryEncoder = new 
EncoderFactory().directBinaryEncoder(outputStream, null);
+          GenericDatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(reader.getSchema());
+          for (GenericRecord genericRecord : reader) {
+            if (!hasOpenTransaction) {
+              producer.beginTransaction();
+              hasOpenTransaction = true;
+              recordsInTransaction = 0;
+            }
+
+            outputStream.reset();
+            if (header != null && header.length > 0) {
+              outputStream.write(header);
+            }
+            datumWriter.write(genericRecord, binaryEncoder);
+            binaryEncoder.flush();
+
+            byte[] keyBytes = (partitionColumn == null) ? 
Longs.toByteArray(counter)
+                : 
genericRecord.get(partitionColumn).toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            byte[] bytes = outputStream.toByteArray();
+            producer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes, 
bytes));
+            counter++;
+
+            recordsInTransaction++;
+            if (recordsInTransaction >= maxMessagesPerTransaction) {
+              if (commit) {
+                producer.commitTransaction();
+              } else {
+                producer.abortTransaction();
+              }
+              hasOpenTransaction = false;
+            }
+          }
+        }
+      }
+    }
+    if (hasOpenTransaction) {
+      if (commit) {
+        producer.commitTransaction();
+      } else {
+        producer.abortTransaction();
+      }
+    }
+    return counter;
+  }
+
+  /**
+   * Count records visible in the topic with the given isolation level.
+   */
+  private int countRecords(String brokerList, String isolationLevel) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-diag-" + isolationLevel + 
"-" + UUID.randomUUID());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
Integer.toString(10 * 1024 * 1024));
+
+    int totalRecords = 0;
+    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+      List<PartitionInfo> partitions = consumer.partitionsFor(getKafkaTopic(), 
Duration.ofSeconds(10));
+      if (partitions == null || partitions.isEmpty()) {
+        System.err.println("[ExactlyOnce] No partitions found for topic " + 
getKafkaTopic());
+        return 0;
+      }
+      for (PartitionInfo pi : partitions) {
+        TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
+        consumer.assign(Collections.singletonList(tp));
+        consumer.seekToBeginning(Collections.singletonList(tp));
+        long deadline = System.currentTimeMillis() + 30_000L;
+        int partitionRecords = 0;
+        while (System.currentTimeMillis() < deadline) {
+          ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofSeconds(5));
+          if (records.isEmpty()) {
+            break;
+          }
+          partitionRecords += records.count();
+        }
+        totalRecords += partitionRecords;
+      }
+    } catch (Exception e) {
+      System.err.println("[ExactlyOnce] Error counting records with " + 
isolationLevel + ": " + e.getMessage());
+    }
+    return totalRecords;
   }
 
   private boolean isRetryableRealtimePartitionMetadataError(Throwable 
throwable) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 6a3097c4c1a..70aa705dff2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -37,12 +37,12 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.http.HttpClient;
 import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
-import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import 
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -154,6 +154,8 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
 
     serverStarter1.stop();
     serverStarter2.stop();
+    // Re-init the static executor because stopping servers shuts it down; 
required for subsequent operations.
+    SegmentBuildTimeLeaseExtender.initExecutor();
     TestUtils.waitForCondition(aVoid -> 
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
             && 
_resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(), 
60_000L,
         "Failed to drop servers");
@@ -218,6 +220,8 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
 
     serverStarter1.stop();
     serverStarter2.stop();
+    // Re-init the static executor because stopping servers shuts it down; 
required for subsequent operations.
+    SegmentBuildTimeLeaseExtender.initExecutor();
     TestUtils.waitForCondition(aVoid -> 
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
             && 
_resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(), 
60_000L,
         "Failed to drop servers");
@@ -249,35 +253,20 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
   public void afterMethod()
       throws Exception {
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
-    getControllerRequestClient().pauseConsumption(realtimeTableName);
-    TestUtils.waitForCondition((aVoid) -> {
-      try {
-        PauseStatusDetails pauseStatusDetails = 
getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
-        return pauseStatusDetails.getConsumingSegments().isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 60_000L, "Failed to drop the segments");
 
-    // Test dropping all segments one by one
-    List<String> segments = listSegments(realtimeTableName);
-    for (String segment : segments) {
-      dropSegment(realtimeTableName, segment);
-    }
+    // Drop the table entirely to clean up all segments and server-side upsert 
state.
+    // This is more reliable than the pause/drop-segments/restart cycle 
because it uses
+    // the standard table lifecycle and avoids issues with stale 
controller/server state.
+    dropRealtimeTable(getTableName());
+    waitForTableDataManagerRemoved(realtimeTableName);
+    waitForEVToDisappear(realtimeTableName);
 
-    // NOTE: There is a delay to remove the segment from property store
-    TestUtils.waitForCondition((aVoid) -> {
-      try {
-        return listSegments(realtimeTableName).isEmpty();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }, 60_000L, "Failed to drop the segments");
+    // Delete and recreate the Kafka topic for a clean stream
+    deleteKafkaTopic(getKafkaTopic());
+    createKafkaTopic(getKafkaTopic());
 
-    stopKafka(); // to clean up the topic
-    restartServers();
-    startKafka();
-    getControllerRequestClient().resumeConsumption(realtimeTableName);
+    // Recreate the table — this triggers fresh consuming segment creation
+    addTableConfig(_tableConfig);
   }
 
   protected void verifySegmentAssignment(Map<String, Map<String, String>> 
segmentAssignment, int numSegmentsExpected,
@@ -363,6 +352,7 @@ public class PartialUpsertTableRebalanceIntegrationTest 
extends BaseClusterInteg
   @AfterClass
   public void tearDown()
       throws IOException {
+    dropRealtimeTable(getTableName());
     stopServer();
     stopBroker();
     stopController();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
index 771442bf691..af7c1ad6f30 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
@@ -78,6 +78,38 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.compat.version}</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server-common</artifactId>
+      <version>${kafka3.version}</version>
+      <classifier>test</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index d382537c6c6..3f554b91f07 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -43,7 +43,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
 import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.plugin.stream.kafka30.server.KafkaServerStartable;
+import org.apache.pinot.plugin.stream.kafka30.server.EmbeddedKafkaCluster;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -56,7 +56,6 @@ import org.apache.pinot.spi.stream.StreamMessage;
 import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.NetUtils;
 import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -84,24 +83,19 @@ public class KafkaPartitionLevelConsumerTest {
   private static final long TIMESTAMP = Instant.now().toEpochMilli();
   private static final Random RANDOM = new Random();
 
-  private KafkaServerStartable _kafkaCluster;
+  private EmbeddedKafkaCluster _kafkaCluster;
   private String _kafkaBrokerAddress;
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    int kafkaServerPort = NetUtils.findOpenPort();
-    Properties serverProperties = new Properties();
-    serverProperties.put("kafka.server.bootstrap.servers", "localhost:" + 
kafkaServerPort);
-    serverProperties.put("kafka.server.port", 
Integer.toString(kafkaServerPort));
-    serverProperties.put("kafka.server.broker.id", "0");
-    serverProperties.put("kafka.server.owner.name", 
getClass().getSimpleName());
-    serverProperties.put("kafka.server.allow.managed.for.configured.broker", 
"true");
-
-    _kafkaCluster = new KafkaServerStartable();
-    _kafkaCluster.init(serverProperties);
+    Properties props = new Properties();
+    props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP, "1");
+
+    _kafkaCluster = new EmbeddedKafkaCluster();
+    _kafkaCluster.init(props);
     _kafkaCluster.start();
-    _kafkaBrokerAddress = "localhost:" + _kafkaCluster.getPort();
+    _kafkaBrokerAddress = _kafkaCluster.bootstrapServers();
     _kafkaCluster.createTopic(TEST_TOPIC_1, 1);
     _kafkaCluster.createTopic(TEST_TOPIC_2, 2);
     _kafkaCluster.createTopic(TEST_TOPIC_3, 1);
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
new file mode 100644
index 00000000000..a196353c4a8
--- /dev/null
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka30.server;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * In-process embedded Kafka cluster using KRaft mode for integration tests.
+ * Eliminates Docker dependency and provides fast, reliable Kafka for testing.
+ *
+ * <p>Call {@link #init(Properties)} to configure the broker count, {@link #start()} to format and boot the
+ * cluster, and {@link #stop()} to shut it down. Each admin operation (topic, partition or record mutation)
+ * opens a short-lived {@link AdminClient} and is retried when it fails with a Kafka timeout.
+ */
+public class EmbeddedKafkaCluster implements StreamDataServerStartable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+  /** Property key holding the number of broker nodes to start; defaults to {@code 1}. */
+  public static final String BROKER_COUNT_PROP = "embedded.kafka.broker.count";
+
+  /** Maximum number of attempts for an admin operation that fails with a Kafka {@code TimeoutException}. */
+  private static final int TOPIC_MUTATION_RETRIES = 5;
+  /** Pause between retried admin operations. */
+  private static final long RETRY_BACKOFF_MS = 1000L;
+
+  private int _brokerCount = 1;
+  private KafkaClusterTestKit _cluster;
+  private String _bootstrapServers;
+
+  /**
+   * Reads {@link #BROKER_COUNT_PROP} (default {@code 1}) from {@code props}.
+   *
+   * @throws IllegalArgumentException if the broker count is not a positive integer
+   */
+  @Override
+  public void init(Properties props) {
+    int brokerCount = Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+    if (brokerCount < 1) {
+      throw new IllegalArgumentException(
+          BROKER_COUNT_PROP + " must be a positive integer, got: " + brokerCount);
+    }
+    _brokerCount = brokerCount;
+  }
+
+  /**
+   * Formats and starts the KRaft cluster, then blocks until the brokers report ready.
+   *
+   * <p>If any step fails, the partially started cluster is closed before the failure is rethrown, so a failed
+   * test setup does not leak broker threads, ports or log directories.
+   *
+   * @throws RuntimeException wrapping the underlying startup failure
+   */
+  @Override
+  public void start() {
+    KafkaClusterTestKit cluster = null;
+    try {
+      // Internal topics cannot be created with a replication factor larger than the broker count.
+      int replicationFactor = Math.min(3, _brokerCount);
+
+      TestKitNodes nodes = new TestKitNodes.Builder()
+          .setCombined(true)
+          .setNumBrokerNodes(_brokerCount)
+          .setNumControllerNodes(1)
+          .setPerServerProperties(Collections.emptyMap())
+          .setBootstrapMetadataVersion(MetadataVersion.latestProduction())
+          .build();
+
+      cluster = new KafkaClusterTestKit.Builder(nodes)
+          .setConfigProp("offsets.topic.replication.factor", String.valueOf(replicationFactor))
+          .setConfigProp("offsets.topic.num.partitions", "1")
+          .setConfigProp("transaction.state.log.replication.factor", String.valueOf(replicationFactor))
+          .setConfigProp("transaction.state.log.min.isr", "1")
+          .setConfigProp("transaction.state.log.num.partitions", "1")
+          .setConfigProp("group.initial.rebalance.delay.ms", "0")
+          .setConfigProp("log.flush.interval.messages", "1")
+          .build();
+
+      cluster.format();
+      cluster.startup();
+      cluster.waitForReadyBrokers();
+      String bootstrapServers = cluster.bootstrapServers();
+      LOGGER.info("Embedded Kafka cluster started with {} broker(s) at {}", _brokerCount, bootstrapServers);
+      // Publish the cluster only once it is fully up; on failure these fields are left untouched.
+      _bootstrapServers = bootstrapServers;
+      _cluster = cluster;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt status for the caller; the exception itself is wrapped below.
+        Thread.currentThread().interrupt();
+      }
+      closeQuietly(cluster);
+      throw new RuntimeException("Failed to start embedded Kafka cluster", e);
+    }
+  }
+
+  /**
+   * Closes the cluster if it is running and clears the stored bootstrap servers. Close failures are logged,
+   * not thrown.
+   */
+  @Override
+  public void stop() {
+    if (_cluster != null) {
+      try {
+        _cluster.close();
+        LOGGER.info("Embedded Kafka cluster stopped");
+      } catch (Exception e) {
+        LOGGER.warn("Failed to stop embedded Kafka cluster cleanly", e);
+      } finally {
+        _cluster = null;
+        _bootstrapServers = null;
+      }
+    }
+  }
+
+  /**
+   * Returns the full bootstrap servers string (e.g. "localhost:12345,localhost:12346"), or {@code null} if the
+   * cluster is not started.
+   */
+  public String bootstrapServers() {
+    return _bootstrapServers;
+  }
+
+  /**
+   * Returns the port of the first broker listed in the bootstrap servers string.
+   *
+   * @throws IllegalStateException if the cluster is not started
+   */
+  @Override
+  public int getPort() {
+    if (_bootstrapServers == null) {
+      throw new IllegalStateException("Embedded Kafka cluster is not started");
+    }
+    // Parse the port from the first broker in the bootstrap servers string
+    String firstBroker = _bootstrapServers.split(",")[0];
+    return Integer.parseInt(firstBroker.substring(firstBroker.lastIndexOf(':') + 1));
+  }
+
+  /**
+   * Creates {@code topic} using the {@code partition} (default 1) and {@code replicationFactor} (default 1,
+   * clamped to {@code [1, brokerCount]}) entries of {@code topicProps}. A topic that already exists is left
+   * as-is.
+   */
+  @Override
+  public void createTopic(String topic, Properties topicProps) {
+    int numPartitions = Integer.parseInt(String.valueOf(topicProps.getOrDefault("partition", "1")));
+    int requestedReplicationFactor = Integer.parseInt(
+        String.valueOf(topicProps.getOrDefault("replicationFactor", "1")));
+    short replicationFactor = (short) Math.max(1, Math.min(_brokerCount, requestedReplicationFactor));
+    try (AdminClient adminClient = createAdminClient()) {
+      NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
+      runAdminWithRetry(() -> adminClient.createTopics(Collections.singletonList(newTopic)).all().get(),
+          "create topic: " + topic);
+    } catch (Exception e) {
+      // Also covers a retry after a timed-out attempt that actually succeeded on the broker.
+      if (e instanceof ExecutionException
+          && e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
+        return;
+      }
+      throw new RuntimeException("Failed to create topic: " + topic, e);
+    }
+  }
+
+  /** Deletes {@code topic}; any failure (including a missing topic) is rethrown as a RuntimeException. */
+  @Override
+  public void deleteTopic(String topic) {
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(() -> adminClient.deleteTopics(Collections.singletonList(topic)).all().get(),
+          "delete topic: " + topic);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete topic: " + topic, e);
+    }
+  }
+
+  /** Increases the partition count of {@code topic} to {@code numPartitions}. */
+  @Override
+  public void createPartitions(String topic, int numPartitions) {
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(() -> {
+        adminClient.createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(numPartitions)))
+            .all().get();
+        return null;
+      }, "create partitions for topic: " + topic);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create partitions for topic: " + topic, e);
+    }
+  }
+
+  /** Deletes all records of {@code topic}/{@code partition} whose offset is below {@code offset}. */
+  @Override
+  public void deleteRecordsBeforeOffset(String topic, int partition, long offset) {
+    TopicPartition topicPartition = new TopicPartition(topic, partition);
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(() -> {
+        adminClient.deleteRecords(Collections.singletonMap(topicPartition, RecordsToDelete.beforeOffset(offset)))
+            .all().get();
+        return null;
+      }, "delete records before offset for topic: " + topic + ", partition: " + partition);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete records before offset for topic: " + topic
+          + ", partition: " + partition + ", offset: " + offset, e);
+    }
+  }
+
+  /**
+   * Creates an admin client bound to this cluster; the caller owns and must close it.
+   *
+   * @throws IllegalStateException if the cluster is not started
+   */
+  private AdminClient createAdminClient() {
+    if (_bootstrapServers == null) {
+      throw new IllegalStateException("Embedded Kafka cluster is not started");
+    }
+    Properties props = new Properties();
+    props.put("bootstrap.servers", _bootstrapServers);
+    return AdminClient.create(props);
+  }
+
+  /**
+   * Runs {@code operation}, retrying up to {@link #TOPIC_MUTATION_RETRIES} times when it fails with an
+   * {@link ExecutionException} caused by a Kafka {@code TimeoutException}. Any other failure is rethrown
+   * immediately.
+   *
+   * @throws IllegalStateException if every attempt timed out (the last timeout is attached as the cause)
+   */
+  private <T> T runAdminWithRetry(AdminOperation<T> operation, String action)
+      throws Exception {
+    ExecutionException lastTimeout = null;
+    for (int attempt = 1; attempt <= TOPIC_MUTATION_RETRIES; attempt++) {
+      try {
+        return operation.execute();
+      } catch (ExecutionException e) {
+        if (!(e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException)) {
+          throw e;
+        }
+        lastTimeout = e;
+        if (attempt < TOPIC_MUTATION_RETRIES) {
+          try {
+            Thread.sleep(RETRY_BACKOFF_MS);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw ie;
+          }
+        }
+      }
+    }
+    throw new IllegalStateException("Failed to " + action + " after retries", lastTimeout);
+  }
+
+  /** Closes {@code cluster} if non-null, logging rather than propagating any failure. */
+  private static void closeQuietly(KafkaClusterTestKit cluster) {
+    if (cluster == null) {
+      return;
+    }
+    try {
+      cluster.close();
+    } catch (Exception e) {
+      LOGGER.warn("Failed to stop embedded Kafka cluster cleanly", e);
+    }
+  }
+
+  /** An admin call that may throw checked exceptions, notably {@link ExecutionException} from a future. */
+  @FunctionalInterface
+  private interface AdminOperation<T> {
+    T execute()
+        throws Exception;
+  }
+}
diff --git a/pom.xml b/pom.xml
index aa0ce9639b8..a1bc9b7c799 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,6 +304,7 @@
 
     <!-- Test Libraries -->
     <testng.version>7.12.0</testng.version>
+    <junit-jupiter-api.version>5.10.2</junit-jupiter-api.version>
     <mockito-core.version>5.22.0</mockito-core.version>
     <equalsverifier.version>3.19.4</equalsverifier.version>
     <testcontainers.version>2.0.3</testcontainers.version>
@@ -678,6 +679,12 @@
         <artifactId>pinot-kafka-3.0</artifactId>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-kafka-3.0</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
       <dependency>
         <groupId>org.apache.pinot</groupId>
         <artifactId>pinot-kafka-4.0</artifactId>
@@ -1758,6 +1765,24 @@
         <artifactId>kafka_${scala.compat.version}</artifactId>
         <version>${kafka3.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_${scala.compat.version}</artifactId>
+        <version>${kafka3.version}</version>
+        <classifier>test</classifier>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka3.version}</version>
+        <classifier>test</classifier>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-server-common</artifactId>
+        <version>${kafka3.version}</version>
+        <classifier>test</classifier>
+      </dependency>
 
       <dependency>
         <groupId>io.confluent</groupId>
@@ -2054,6 +2079,13 @@
         <scope>test</scope>
       </dependency>
 
+      <dependency>
+        <groupId>org.junit.jupiter</groupId>
+        <artifactId>junit-jupiter-api</artifactId>
+        <version>${junit-jupiter-api.version}</version>
+        <scope>test</scope>
+      </dependency>
+
       <!-- mockito bom -->
       <dependency>
         <groupId>org.mockito</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to