This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b2806197e54 Replace Docker-based Kafka with embedded KRaft broker for
integration tests (#17790)
b2806197e54 is described below
commit b2806197e54234a25826eb640ef2233041682508
Author: Xiang Fu <[email protected]>
AuthorDate: Sat Mar 7 23:21:58 2026 -0800
Replace Docker-based Kafka with embedded KRaft broker for integration tests
(#17790)
- Replace KafkaServerStartable (Docker CLI-based) with EmbeddedKafkaCluster
using Kafka's KafkaClusterTestKit in KRaft mode for integration tests
- Eliminate Docker dependency, improving startup speed and removing CI
flakiness
- Add essential embedded Kafka broker configs for transactions
- Rewrite ExactlyOnce test to use a single KafkaProducer for both abort and
commit transactions, avoiding transaction marker race conditions
- Fix PartialUpsertTableRebalanceIntegrationTest with retry logic for
embedded Kafka topic metadata propagation delays
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
pinot-integration-test-base/pom.xml | 36 ++++
.../tests/BaseClusterIntegrationTest.java | 222 ++++-----------------
pinot-integration-tests/pom.xml | 38 ++++
...tlyOnceKafkaRealtimeClusterIntegrationTest.java | 203 ++++++++++++++++++-
...PartialUpsertTableRebalanceIntegrationTest.java | 44 ++--
.../pinot-stream-ingestion/pinot-kafka-3.0/pom.xml | 32 +++
.../kafka30/KafkaPartitionLevelConsumerTest.java | 22 +-
.../kafka30/server/EmbeddedKafkaCluster.java | 220 ++++++++++++++++++++
pom.xml | 32 +++
9 files changed, 618 insertions(+), 231 deletions(-)
diff --git a/pinot-integration-test-base/pom.xml
b/pinot-integration-test-base/pom.xml
index b785d421101..1bc17243f51 100644
--- a/pinot-integration-test-base/pom.xml
+++ b/pinot-integration-test-base/pom.xml
@@ -90,6 +90,42 @@
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-3.0</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.compat.version}</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server-common</artifactId>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ca59d7f33dd..c1784b43eea 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.ServerSocket;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -41,7 +40,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
@@ -69,7 +67,7 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.plugin.stream.kafka30.server.KafkaServerStartable;
+import org.apache.pinot.plugin.stream.kafka30.server.EmbeddedKafkaCluster;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
@@ -125,8 +123,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected static final int DEFAULT_LLC_NUM_KAFKA_BROKERS = 2;
protected static final int DEFAULT_LLC_NUM_KAFKA_PARTITIONS = 2;
protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
- private static final int KAFKA_START_MAX_ATTEMPTS = 3;
- private static final long KAFKA_START_RETRY_WAIT_MS = 2_000L;
private static final long KAFKA_CLUSTER_READY_TIMEOUT_MS = 120_000L;
private static final long KAFKA_TOPIC_READY_TIMEOUT_MS = 120_000L;
protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
@@ -142,7 +138,6 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
protected List<StreamDataServerStartable> _kafkaStarters;
- private List<KafkaBrokerConfig> _kafkaBrokerConfigs;
protected org.apache.pinot.client.Connection _pinotConnection;
protected org.apache.pinot.client.Connection _pinotConnectionV2;
@@ -427,6 +422,10 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
}
protected String getKafkaBrokerList() {
+ StreamDataServerStartable firstStarter = _kafkaStarters.get(0);
+ if (firstStarter instanceof EmbeddedKafkaCluster) {
+ return ((EmbeddedKafkaCluster) firstStarter).bootstrapServers();
+ }
StringBuilder builder = new StringBuilder();
for (int i = 0; i < _kafkaStarters.size(); i++) {
if (i > 0) {
@@ -816,107 +815,22 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected void startKafkaWithoutTopic() {
int requestedBrokers = getNumKafkaBrokers();
- List<KafkaBrokerConfig> brokerConfigs =
getOrCreateKafkaBrokerConfigs(requestedBrokers);
- Throwable lastFailure = null;
- for (int attempt = 1; attempt <= KAFKA_START_MAX_ATTEMPTS; attempt++) {
- String clusterId = UUID.randomUUID().toString().replace("-", "");
- String networkName = "pinot-it-kafka-" +
UUID.randomUUID().toString().replace("-", "");
- String quorumVoters = brokerConfigs.stream()
- .map(config -> config._brokerId + "@" + config._containerName +
":9093")
- .collect(Collectors.joining(","));
-
- List<StreamDataServerStartable> kafkaStarters = new
ArrayList<>(requestedBrokers);
+ Properties props = new Properties();
+ props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP,
Integer.toString(requestedBrokers));
+ EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
+ cluster.init(props);
+ cluster.start();
+ _kafkaStarters = Collections.singletonList(cluster);
+ try {
+ waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers,
useKafkaTransaction());
+ } catch (RuntimeException e) {
try {
- for (KafkaBrokerConfig brokerConfig : brokerConfigs) {
- StreamDataServerStartable kafkaStarter =
- createKafkaServerStarter(brokerConfig, clusterId, networkName,
quorumVoters, requestedBrokers);
- kafkaStarter.start();
- kafkaStarters.add(kafkaStarter);
- }
- _kafkaStarters = kafkaStarters;
- waitForKafkaClusterReady(getKafkaBrokerList(), requestedBrokers,
useKafkaTransaction());
- if (attempt > 1) {
- LOGGER.info("Kafka startup succeeded on retry attempt {}/{}",
attempt, KAFKA_START_MAX_ATTEMPTS);
- }
- return;
- } catch (Throwable t) {
- if (t instanceof Error && !(t instanceof AssertionError)) {
- throw (Error) t;
- }
-
- lastFailure = t;
- LOGGER.warn("Kafka startup attempt {}/{} failed; stopping started
brokers before retry", attempt,
- KAFKA_START_MAX_ATTEMPTS, t);
- _kafkaStarters = kafkaStarters;
- try {
- stopKafka();
- } catch (RuntimeException stopException) {
- LOGGER.warn("Kafka cleanup failed after startup attempt {}/{}",
attempt, KAFKA_START_MAX_ATTEMPTS,
- stopException);
- t.addSuppressed(stopException);
- }
-
- if (attempt < KAFKA_START_MAX_ATTEMPTS) {
- try {
- Thread.sleep(KAFKA_START_RETRY_WAIT_MS);
- } catch (InterruptedException interruptedException) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted while waiting to retry
Kafka startup", interruptedException);
- }
- }
+ cluster.stop();
+ } catch (Exception suppressed) {
+ e.addSuppressed(suppressed);
}
- }
-
- _kafkaBrokerConfigs = null;
- throw new RuntimeException("Failed to start Kafka cluster after " +
KAFKA_START_MAX_ATTEMPTS + " attempts",
- lastFailure);
- }
-
- private List<KafkaBrokerConfig> getOrCreateKafkaBrokerConfigs(int
brokerCount) {
- if (_kafkaBrokerConfigs != null && _kafkaBrokerConfigs.size() ==
brokerCount) {
- return _kafkaBrokerConfigs;
- }
- _kafkaBrokerConfigs = createKafkaBrokerConfigs(brokerCount);
- return _kafkaBrokerConfigs;
- }
-
- private StreamDataServerStartable createKafkaServerStarter(KafkaBrokerConfig
brokerConfig, String clusterId,
- String networkName, String quorumVoters, int clusterSize) {
- Properties serverProperties = new Properties();
- serverProperties.put("kafka.server.owner.name",
getClass().getSimpleName());
- serverProperties.put("kafka.server.bootstrap.servers", "localhost:" +
brokerConfig._port);
- serverProperties.put("kafka.server.port",
Integer.toString(brokerConfig._port));
- serverProperties.put("kafka.server.broker.id",
Integer.toString(brokerConfig._brokerId));
- serverProperties.put("kafka.server.allow.managed.for.configured.broker",
"true");
- serverProperties.put("kafka.server.container.name",
brokerConfig._containerName);
- serverProperties.put("kafka.server.network.name", networkName);
- serverProperties.put("kafka.server.cluster.id", clusterId);
- serverProperties.put("kafka.server.cluster.size",
Integer.toString(clusterSize));
- serverProperties.put("kafka.server.controller.quorum.voters",
quorumVoters);
- serverProperties.put("kafka.server.internal.host",
brokerConfig._containerName);
- serverProperties.put("kafka.server.skip.readiness.check", "true");
- KafkaServerStartable kafkaServerStartable = new KafkaServerStartable();
- kafkaServerStartable.init(serverProperties);
- return kafkaServerStartable;
- }
-
- private List<KafkaBrokerConfig> createKafkaBrokerConfigs(int brokerCount) {
- String containerPrefix = "pinot-it-kafka-" +
UUID.randomUUID().toString().replace("-", "");
- List<KafkaBrokerConfig> brokerConfigs = new ArrayList<>(brokerCount);
- for (int i = 0; i < brokerCount; i++) {
- int brokerId = i + 1;
- int port = getAvailablePort();
- String containerName = containerPrefix + "-" + brokerId;
- brokerConfigs.add(new KafkaBrokerConfig(brokerId, port, containerName));
- }
- return brokerConfigs;
- }
-
- private static int getAvailablePort() {
- try (ServerSocket socket = new ServerSocket(0)) {
- return socket.getLocalPort();
- } catch (IOException e) {
- throw new RuntimeException("Failed to find an available port for Kafka",
e);
+ _kafkaStarters = null;
+ throw e;
}
}
@@ -961,20 +875,19 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
}
protected void createKafkaTopic(String topic) {
- int replicationFactor = Math.max(1, Math.min(getNumKafkaBrokers(),
_kafkaStarters.size()));
- createKafkaTopic(topic, getNumKafkaPartitions(), replicationFactor);
+ createKafkaTopic(topic, getNumKafkaPartitions(), getNumKafkaBrokers());
}
protected void createKafkaTopic(String topic, int numPartitions) {
- createKafkaTopic(topic, numPartitions, Math.max(1,
Math.min(getNumKafkaBrokers(), _kafkaStarters.size())));
+ createKafkaTopic(topic, numPartitions, getNumKafkaBrokers());
}
protected void createKafkaTopic(String topic, int numPartitions, int
replicationFactor) {
- int expectedReplicationFactor = Math.max(1, Math.min(replicationFactor,
_kafkaStarters.size()));
+ int effectiveReplicationFactor = Math.max(1, Math.min(replicationFactor,
getNumKafkaBrokers()));
_kafkaStarters.get(0).createTopic(
topic,
- KafkaStarterUtils.getTopicCreationProps(numPartitions,
expectedReplicationFactor));
- waitForKafkaTopicReady(topic, numPartitions, expectedReplicationFactor);
+ KafkaStarterUtils.getTopicCreationProps(numPartitions,
effectiveReplicationFactor));
+ waitForKafkaTopicReady(topic, numPartitions, effectiveReplicationFactor);
waitForKafkaTopicMetadataReadyForConsumer(topic, numPartitions);
}
@@ -1016,13 +929,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
if (_kafkaStarters == null || _kafkaStarters.isEmpty()) {
return false;
}
- for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
- if (!isKafkaTopicReadyOnBroker("localhost:" + kafkaStarter.getPort(),
topic, expectedPartitions,
- expectedReplicationFactor)) {
- return false;
- }
- }
- return true;
+ return isKafkaTopicReadyOnBroker(getKafkaBrokerList(), topic,
expectedPartitions, expectedReplicationFactor);
}
private boolean isKafkaTopicMetadataReadyForConsumer(String topic, int
expectedPartitions) {
@@ -1069,74 +976,27 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
}
protected void stopKafka() {
- if (_kafkaStarters == null || _kafkaStarters.isEmpty()) {
- return;
- }
- List<StreamDataServerStartable> kafkaStarters = _kafkaStarters;
- _kafkaStarters = null;
-
- RuntimeException stopException = null;
- for (int i = kafkaStarters.size() - 1; i >= 0; i--) {
- StreamDataServerStartable kafkaStarter = kafkaStarters.get(i);
+ if (_kafkaStarters != null) {
+ Exception firstException = null;
try {
- kafkaStarter.stop();
- } catch (Exception e) {
- RuntimeException wrapped = new RuntimeException(
- "Failed to stop Kafka broker on port: " + kafkaStarter.getPort(),
e);
- if (stopException == null) {
- stopException = wrapped;
- } else {
- stopException.addSuppressed(wrapped);
+ for (StreamDataServerStartable starter : _kafkaStarters) {
+ try {
+ starter.stop();
+ } catch (Exception e) {
+ if (firstException == null) {
+ firstException = e;
+ } else {
+ firstException.addSuppressed(e);
+ }
+ }
}
+ } finally {
+ _kafkaStarters = null;
}
- }
-
- for (StreamDataServerStartable kafkaStarter : kafkaStarters) {
- try {
- waitForKafkaBrokerStopped(kafkaStarter.getPort());
- } catch (RuntimeException e) {
- if (stopException == null) {
- stopException = e;
- } else {
- stopException.addSuppressed(e);
- }
+ if (firstException != null) {
+ throw new RuntimeException("Failed to stop Kafka starters cleanly",
firstException);
}
}
-
- if (stopException != null) {
- throw stopException;
- }
- }
-
- private void waitForKafkaBrokerStopped(int brokerPort) {
- String brokerList = "localhost:" + brokerPort;
- TestUtils.waitForCondition(aVoid -> !isKafkaBrokerAvailable(brokerList),
200L, 60_000L,
- "Kafka broker is still reachable after stop: " + brokerList);
- }
-
- private boolean isKafkaBrokerAvailable(String brokerList) {
- Properties adminProps = new Properties();
- adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
- adminProps.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000");
- try (AdminClient adminClient = AdminClient.create(adminProps)) {
- adminClient.describeCluster().nodes().get(2, TimeUnit.SECONDS);
- return true;
- } catch (Exception e) {
- return false;
- }
- }
-
- private static final class KafkaBrokerConfig {
- private final int _brokerId;
- private final int _port;
- private final String _containerName;
-
- private KafkaBrokerConfig(int brokerId, int port, String containerName) {
- _brokerId = brokerId;
- _port = port;
- _containerName = containerName;
- }
}
/**
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index d934ff7058a..d304beb91f8 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -222,6 +222,44 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-3.0</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.compat.version}</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server-common</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-udf-test</artifactId>
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
index d11ad4ac456..b5c4fe12979 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java
@@ -18,22 +18,36 @@
*/
package org.apache.pinot.integration.tests;
+import com.google.common.primitives.Longs;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.util.TestUtils;
public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
-
private static final int REALTIME_TABLE_CONFIG_RETRY_COUNT = 5;
private static final long REALTIME_TABLE_CONFIG_RETRY_WAIT_MS = 1_000L;
private static final long KAFKA_TOPIC_METADATA_READY_TIMEOUT_MS = 30_000L;
@@ -80,14 +94,185 @@ public class
ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtime
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
String kafkaBrokerList = getKafkaBrokerList();
- // the first transaction of kafka messages are aborted
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), false);
- // the second transaction of kafka messages are committed
- ClusterIntegrationTestUtils
- .pushAvroIntoKafkaWithTransaction(avroFiles, kafkaBrokerList,
getKafkaTopic(),
- getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(),
getPartitionColumn(), true);
+ // Use System.err for diagnostics - log4j2 console appender is filtered to
ERROR in CI
+ System.err.println("[ExactlyOnce] Pushing transactional data to Kafka at:
" + kafkaBrokerList);
+ System.err.println("[ExactlyOnce] Avro files count: " + avroFiles.size());
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBrokerList);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+ producerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE));
+ producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
"5");
+ producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");
+ producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"test-transaction-" + UUID.randomUUID());
+
+ // Use a SINGLE producer for both abort and commit transactions.
+ // With a single producer, the coordinator's state machine ensures that
after
+ // abortTransaction() returns, it returns CONCURRENT_TRANSACTIONS for any
new
+ // transaction operations until the abort is fully done (markers written).
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps)) {
+ producer.initTransactions();
+ System.err.println("[ExactlyOnce] initTransactions() succeeded");
+
+ // Transaction 1: aborted batch
+ long abortedCount = pushAvroRecords(producer, avroFiles, false);
+ System.err.println("[ExactlyOnce] Aborted batch: " + abortedCount + "
records");
+
+ // Transaction 2: committed batch
+ long committedCount = pushAvroRecords(producer, avroFiles, true);
+ System.err.println("[ExactlyOnce] Committed batch: " + committedCount +
" records");
+ }
+
+ // After producer is closed, verify data visibility with independent
consumers
+ System.err.println("[ExactlyOnce] Producer closed. Verifying data
visibility...");
+ waitForCommittedRecordsVisible(kafkaBrokerList);
+ }
+
+ /**
+ * Wait for committed records to be visible to a read_committed consumer.
+ * This ensures transaction markers have been fully propagated before
returning.
+ */
+ private void waitForCommittedRecordsVisible(String brokerList) {
+ long deadline = System.currentTimeMillis() + 60_000L;
+ int lastCommitted = 0;
+ int lastUncommitted = 0;
+ int iteration = 0;
+
+ while (System.currentTimeMillis() < deadline) {
+ iteration++;
+ lastCommitted = countRecords(brokerList, "read_committed");
+ if (lastCommitted > 0) {
+ System.err.println("[ExactlyOnce] Verification OK: read_committed=" +
lastCommitted
+ + " after " + iteration + " iterations");
+ return;
+ }
+ // Check if data reached Kafka at all
+ if (iteration == 1 || iteration % 5 == 0) {
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] Verification iteration " + iteration
+ + ": read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ }
+ try {
+ Thread.sleep(2_000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ // Final diagnostic dump
+ lastUncommitted = countRecords(brokerList, "read_uncommitted");
+ System.err.println("[ExactlyOnce] VERIFICATION FAILED after 60s:
read_committed=" + lastCommitted
+ + ", read_uncommitted=" + lastUncommitted);
+ throw new AssertionError("[ExactlyOnce] Transaction markers were not
propagated within 60s; "
+ + "committed records are not visible to read_committed consumers. "
+ + "read_committed=" + lastCommitted + ", read_uncommitted=" +
lastUncommitted);
+ }
+
+ /**
+ * Push Avro records to Kafka within a transaction. Does NOT call
initTransactions().
+ * Returns the number of records sent.
+ */
+ private long pushAvroRecords(KafkaProducer<byte[], byte[]> producer,
List<File> avroFiles, boolean commit)
+ throws Exception {
+ int maxMessagesPerTransaction =
+ getMaxNumKafkaMessagesPerBatch() > 0 ?
getMaxNumKafkaMessagesPerBatch() : Integer.MAX_VALUE;
+ long counter = 0;
+ int recordsInTransaction = 0;
+ boolean hasOpenTransaction = false;
+ byte[] header = getKafkaMessageHeader();
+ String partitionColumn = getPartitionColumn();
+
+ try (ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(65536)) {
+ for (File avroFile : avroFiles) {
+ try (DataFileStream<GenericRecord> reader =
AvroUtils.getAvroReader(avroFile)) {
+ BinaryEncoder binaryEncoder = new
EncoderFactory().directBinaryEncoder(outputStream, null);
+ GenericDatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<>(reader.getSchema());
+ for (GenericRecord genericRecord : reader) {
+ if (!hasOpenTransaction) {
+ producer.beginTransaction();
+ hasOpenTransaction = true;
+ recordsInTransaction = 0;
+ }
+
+ outputStream.reset();
+ if (header != null && header.length > 0) {
+ outputStream.write(header);
+ }
+ datumWriter.write(genericRecord, binaryEncoder);
+ binaryEncoder.flush();
+
+ byte[] keyBytes = (partitionColumn == null) ?
Longs.toByteArray(counter)
+ :
genericRecord.get(partitionColumn).toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ byte[] bytes = outputStream.toByteArray();
+ producer.send(new ProducerRecord<>(getKafkaTopic(), keyBytes,
bytes));
+ counter++;
+
+ recordsInTransaction++;
+ if (recordsInTransaction >= maxMessagesPerTransaction) {
+ if (commit) {
+ producer.commitTransaction();
+ } else {
+ producer.abortTransaction();
+ }
+ hasOpenTransaction = false;
+ }
+ }
+ }
+ }
+ }
+ if (hasOpenTransaction) {
+ if (commit) {
+ producer.commitTransaction();
+ } else {
+ producer.abortTransaction();
+ }
+ }
+ return counter;
+ }
+
+ /**
+ * Count records visible in the topic with the given isolation level.
+ */
+ private int countRecords(String brokerList, String isolationLevel) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-diag-" + isolationLevel +
"-" + UUID.randomUUID());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
Integer.toString(10 * 1024 * 1024));
+
+ int totalRecords = 0;
+ try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+ List<PartitionInfo> partitions = consumer.partitionsFor(getKafkaTopic(),
Duration.ofSeconds(10));
+ if (partitions == null || partitions.isEmpty()) {
+ System.err.println("[ExactlyOnce] No partitions found for topic " +
getKafkaTopic());
+ return 0;
+ }
+ for (PartitionInfo pi : partitions) {
+ TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
+ consumer.assign(Collections.singletonList(tp));
+ consumer.seekToBeginning(Collections.singletonList(tp));
+ long deadline = System.currentTimeMillis() + 30_000L;
+ int partitionRecords = 0;
+ while (System.currentTimeMillis() < deadline) {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofSeconds(5));
+ if (records.isEmpty()) {
+ break;
+ }
+ partitionRecords += records.count();
+ }
+ totalRecords += partitionRecords;
+ }
+ } catch (Exception e) {
+ System.err.println("[ExactlyOnce] Error counting records with " +
isolationLevel + ": " + e.getMessage());
+ }
+ return totalRecords;
}
private boolean isRetryableRealtimePartitionMetadataError(Throwable
throwable) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
index 6a3097c4c1a..70aa705dff2 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java
@@ -37,12 +37,12 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.http.HttpClient;
import org.apache.pinot.controller.api.dto.PinotTableReloadStatusResponse;
-import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -154,6 +154,8 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
serverStarter1.stop();
serverStarter2.stop();
+ // Re-init the static executor because stopping servers shuts it down;
required for subsequent operations.
+ SegmentBuildTimeLeaseExtender.initExecutor();
TestUtils.waitForCondition(aVoid ->
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
&&
_resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(),
60_000L,
"Failed to drop servers");
@@ -218,6 +220,8 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
serverStarter1.stop();
serverStarter2.stop();
+ // Re-init the static executor because stopping servers shuts it down;
required for subsequent operations.
+ SegmentBuildTimeLeaseExtender.initExecutor();
TestUtils.waitForCondition(aVoid ->
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful()
&&
_resourceManager.dropInstance(serverStarter2.getInstanceId()).isSuccessful(),
60_000L,
"Failed to drop servers");
@@ -249,35 +253,20 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
public void afterMethod()
throws Exception {
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
- getControllerRequestClient().pauseConsumption(realtimeTableName);
- TestUtils.waitForCondition((aVoid) -> {
- try {
- PauseStatusDetails pauseStatusDetails =
getControllerRequestClient().getPauseStatusDetails(realtimeTableName);
- return pauseStatusDetails.getConsumingSegments().isEmpty();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 60_000L, "Failed to drop the segments");
- // Test dropping all segments one by one
- List<String> segments = listSegments(realtimeTableName);
- for (String segment : segments) {
- dropSegment(realtimeTableName, segment);
- }
+ // Drop the table entirely to clean up all segments and server-side upsert
state.
+ // This is more reliable than the pause/drop-segments/restart cycle
because it uses
+ // the standard table lifecycle and avoids issues with stale
controller/server state.
+ dropRealtimeTable(getTableName());
+ waitForTableDataManagerRemoved(realtimeTableName);
+ waitForEVToDisappear(realtimeTableName);
- // NOTE: There is a delay to remove the segment from property store
- TestUtils.waitForCondition((aVoid) -> {
- try {
- return listSegments(realtimeTableName).isEmpty();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }, 60_000L, "Failed to drop the segments");
+ // Delete and recreate the Kafka topic for a clean stream
+ deleteKafkaTopic(getKafkaTopic());
+ createKafkaTopic(getKafkaTopic());
- stopKafka(); // to clean up the topic
- restartServers();
- startKafka();
- getControllerRequestClient().resumeConsumption(realtimeTableName);
+ // Recreate the table — this triggers fresh consuming segment creation
+ addTableConfig(_tableConfig);
}
protected void verifySegmentAssignment(Map<String, Map<String, String>>
segmentAssignment, int numSegmentsExpected,
@@ -363,6 +352,7 @@ public class PartialUpsertTableRebalanceIntegrationTest
extends BaseClusterInteg
@AfterClass
public void tearDown()
throws IOException {
+ dropRealtimeTable(getTableName());
stopServer();
stopBroker();
stopController();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
index 771442bf691..af7c1ad6f30 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/pom.xml
@@ -78,6 +78,38 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.compat.version}</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server-common</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index d382537c6c6..3f554b91f07 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -43,7 +43,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
-import org.apache.pinot.plugin.stream.kafka30.server.KafkaServerStartable;
+import org.apache.pinot.plugin.stream.kafka30.server.EmbeddedKafkaCluster;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -56,7 +56,6 @@ import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.spi.utils.retry.ExponentialBackoffRetryPolicy;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -84,24 +83,19 @@ public class KafkaPartitionLevelConsumerTest {
private static final long TIMESTAMP = Instant.now().toEpochMilli();
private static final Random RANDOM = new Random();
- private KafkaServerStartable _kafkaCluster;
+ private EmbeddedKafkaCluster _kafkaCluster;
private String _kafkaBrokerAddress;
@BeforeClass
public void setUp()
throws Exception {
- int kafkaServerPort = NetUtils.findOpenPort();
- Properties serverProperties = new Properties();
- serverProperties.put("kafka.server.bootstrap.servers", "localhost:" +
kafkaServerPort);
- serverProperties.put("kafka.server.port",
Integer.toString(kafkaServerPort));
- serverProperties.put("kafka.server.broker.id", "0");
- serverProperties.put("kafka.server.owner.name",
getClass().getSimpleName());
- serverProperties.put("kafka.server.allow.managed.for.configured.broker",
"true");
-
- _kafkaCluster = new KafkaServerStartable();
- _kafkaCluster.init(serverProperties);
+ Properties props = new Properties();
+ props.setProperty(EmbeddedKafkaCluster.BROKER_COUNT_PROP, "1");
+
+ _kafkaCluster = new EmbeddedKafkaCluster();
+ _kafkaCluster.init(props);
_kafkaCluster.start();
- _kafkaBrokerAddress = "localhost:" + _kafkaCluster.getPort();
+ _kafkaBrokerAddress = _kafkaCluster.bootstrapServers();
_kafkaCluster.createTopic(TEST_TOPIC_1, 1);
_kafkaCluster.createTopic(TEST_TOPIC_2, 2);
_kafkaCluster.createTopic(TEST_TOPIC_3, 1);
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
new file mode 100644
index 00000000000..a196353c4a8
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/server/EmbeddedKafkaCluster.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka30.server;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * In-process embedded Kafka cluster using KRaft mode for integration tests.
+ * Eliminates Docker dependency and provides fast, reliable Kafka for testing.
+ *
+ * <p>Lifecycle: {@link #init(Properties)} reads the broker count,
+ * {@link #start()} builds and boots the cluster, and {@link #stop()} closes
+ * it. Every topic operation opens its own short-lived {@link AdminClient}
+ * and is retried when the admin call fails with a Kafka timeout.
+ */
+public class EmbeddedKafkaCluster implements StreamDataServerStartable {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+  /** Property key holding the number of brokers to start (default 1). */
+  public static final String BROKER_COUNT_PROP = "embedded.kafka.broker.count";
+
+  /** Maximum number of attempts for a single admin operation. */
+  private static final int TOPIC_MUTATION_RETRIES = 5;
+  /** Pause between two attempts of the same admin operation. */
+  private static final long RETRY_BACKOFF_MS = 1000L;
+
+  private int _brokerCount = 1;
+  private KafkaClusterTestKit _cluster;
+  private String _bootstrapServers;
+
+  /**
+   * Reads the broker count from {@link #BROKER_COUNT_PROP}.
+   *
+   * @param props configuration; a missing key falls back to one broker
+   * @throws NumberFormatException if the value is not an integer
+   * @throws IllegalArgumentException if the value is smaller than 1
+   */
+  @Override
+  public void init(Properties props) {
+    int brokerCount =
+        Integer.parseInt(props.getProperty(BROKER_COUNT_PROP, "1"));
+    if (brokerCount < 1) {
+      throw new IllegalArgumentException(
+          BROKER_COUNT_PROP + " must be >= 1, got: " + brokerCount);
+    }
+    _brokerCount = brokerCount;
+  }
+
+  /**
+   * Builds, formats and starts the cluster, then waits for the brokers to
+   * be ready and records the bootstrap servers string.
+   *
+   * @throws RuntimeException if any step fails; a cluster that was built
+   *     but did not finish starting is closed before the exception is thrown
+   */
+  @Override
+  public void start() {
+    KafkaClusterTestKit cluster = null;
+    try {
+      // Internal topics cannot have more replicas than there are brokers.
+      String replicationFactor =
+          String.valueOf(Math.min(3, _brokerCount));
+
+      TestKitNodes nodes = new TestKitNodes.Builder()
+          .setCombined(true)
+          .setNumBrokerNodes(_brokerCount)
+          .setNumControllerNodes(1)
+          .setPerServerProperties(Collections.emptyMap())
+          .setBootstrapMetadataVersion(MetadataVersion.latestProduction())
+          .build();
+
+      cluster = new KafkaClusterTestKit.Builder(nodes)
+          .setConfigProp("offsets.topic.replication.factor",
+              replicationFactor)
+          .setConfigProp("offsets.topic.num.partitions", "1")
+          .setConfigProp("transaction.state.log.replication.factor",
+              replicationFactor)
+          .setConfigProp("transaction.state.log.min.isr", "1")
+          .setConfigProp("transaction.state.log.num.partitions", "1")
+          .setConfigProp("group.initial.rebalance.delay.ms", "0")
+          .setConfigProp("log.flush.interval.messages", "1")
+          .build();
+
+      cluster.format();
+      cluster.startup();
+      cluster.waitForReadyBrokers();
+
+      // Publish the cluster only once it is fully usable.
+      _bootstrapServers = cluster.bootstrapServers();
+      _cluster = cluster;
+      LOGGER.info("Embedded Kafka cluster started with {} broker(s) at {}",
+          _brokerCount, _bootstrapServers);
+    } catch (Exception e) {
+      // Do not leak a half-started cluster when startup fails midway.
+      if (cluster != null) {
+        try {
+          cluster.close();
+        } catch (Exception closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+      }
+      throw new RuntimeException("Failed to start embedded Kafka cluster", e);
+    }
+  }
+
+  /**
+   * Closes the cluster if one is running. A failure while closing is logged
+   * and not rethrown; the cluster reference and bootstrap servers string are
+   * cleared in either case.
+   */
+  @Override
+  public void stop() {
+    if (_cluster != null) {
+      try {
+        _cluster.close();
+        LOGGER.info("Embedded Kafka cluster stopped");
+      } catch (Exception e) {
+        LOGGER.warn("Failed to stop embedded Kafka cluster cleanly", e);
+      } finally {
+        _cluster = null;
+        _bootstrapServers = null;
+      }
+    }
+  }
+
+  /**
+   * Returns the full bootstrap servers string
+   * (e.g. "localhost:12345,localhost:12346"), or {@code null} when the
+   * cluster is not started.
+   */
+  public String bootstrapServers() {
+    return _bootstrapServers;
+  }
+
+  /**
+   * Returns the port of the first broker listed in the bootstrap servers.
+   *
+   * @throws IllegalStateException if the cluster is not started
+   */
+  @Override
+  public int getPort() {
+    if (_bootstrapServers == null) {
+      throw new IllegalStateException("Embedded Kafka cluster is not started");
+    }
+    // Parse the port from the first broker in the bootstrap servers string
+    String firstBroker = _bootstrapServers.split(",")[0];
+    int portStart = firstBroker.lastIndexOf(':') + 1;
+    return Integer.parseInt(firstBroker.substring(portStart));
+  }
+
+  /**
+   * Creates a topic; an already existing topic is treated as success.
+   *
+   * @param topic topic name
+   * @param topicProps optional "partition" and "replicationFactor" entries,
+   *     both defaulting to 1; the replication factor is clamped to the range
+   *     [1, broker count]
+   * @throws RuntimeException if the topic cannot be created
+   */
+  @Override
+  public void createTopic(String topic, Properties topicProps) {
+    int numPartitions = Integer.parseInt(
+        String.valueOf(topicProps.getOrDefault("partition", "1")));
+    int requestedReplicationFactor = Integer.parseInt(
+        String.valueOf(topicProps.getOrDefault("replicationFactor", "1")));
+    short replicationFactor = (short) Math.max(1,
+        Math.min(_brokerCount, requestedReplicationFactor));
+    try (AdminClient adminClient = createAdminClient()) {
+      NewTopic newTopic =
+          new NewTopic(topic, numPartitions, replicationFactor);
+      runAdminWithRetry(
+          () -> adminClient
+              .createTopics(Collections.singletonList(newTopic))
+              .all()
+              .get(),
+          "create topic: " + topic);
+    } catch (Exception e) {
+      if (isCausedBy(e,
+          org.apache.kafka.common.errors.TopicExistsException.class)) {
+        return;
+      }
+      throw new RuntimeException("Failed to create topic: " + topic, e);
+    }
+  }
+
+  /**
+   * Deletes a topic; a topic that does not exist is treated as success.
+   * This keeps the call idempotent when an earlier attempt timed out on the
+   * client but was still applied by the broker.
+   *
+   * @param topic topic name
+   * @throws RuntimeException if the topic cannot be deleted
+   */
+  @Override
+  public void deleteTopic(String topic) {
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(
+          () -> adminClient
+              .deleteTopics(Collections.singletonList(topic))
+              .all()
+              .get(),
+          "delete topic: " + topic);
+    } catch (Exception e) {
+      if (isCausedBy(e, org.apache.kafka.common.errors
+          .UnknownTopicOrPartitionException.class)) {
+        return;
+      }
+      throw new RuntimeException("Failed to delete topic: " + topic, e);
+    }
+  }
+
+  /**
+   * Increases the partition count of a topic to {@code numPartitions}.
+   *
+   * @throws RuntimeException if the partitions cannot be created
+   */
+  @Override
+  public void createPartitions(String topic, int numPartitions) {
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(() -> {
+        adminClient
+            .createPartitions(Collections.singletonMap(
+                topic, NewPartitions.increaseTo(numPartitions)))
+            .all()
+            .get();
+        return null;
+      }, "create partitions for topic: " + topic);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to create partitions for topic: " + topic, e);
+    }
+  }
+
+  /**
+   * Deletes the records of one partition that precede {@code offset}.
+   *
+   * @throws RuntimeException if the records cannot be deleted
+   */
+  @Override
+  public void deleteRecordsBeforeOffset(String topic, int partition,
+      long offset) {
+    TopicPartition topicPartition = new TopicPartition(topic, partition);
+    try (AdminClient adminClient = createAdminClient()) {
+      runAdminWithRetry(() -> {
+        adminClient
+            .deleteRecords(Collections.singletonMap(
+                topicPartition, RecordsToDelete.beforeOffset(offset)))
+            .all()
+            .get();
+        return null;
+      }, "delete records before offset for topic: " + topic
+          + ", partition: " + partition);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to delete records before offset for topic: " + topic
+              + ", partition: " + partition + ", offset: " + offset, e);
+    }
+  }
+
+  /**
+   * Creates an admin client bound to the running cluster. The caller is
+   * responsible for closing it.
+   *
+   * @throws IllegalStateException if the cluster is not started
+   */
+  private AdminClient createAdminClient() {
+    if (_bootstrapServers == null) {
+      throw new IllegalStateException("Embedded Kafka cluster is not started");
+    }
+    Properties props = new Properties();
+    props.put("bootstrap.servers", _bootstrapServers);
+    return AdminClient.create(props);
+  }
+
+  /**
+   * Returns whether {@code e} is an {@link ExecutionException} whose cause
+   * is an instance of {@code causeType}.
+   */
+  private static boolean isCausedBy(Exception e, Class<?> causeType) {
+    return e instanceof ExecutionException
+        && causeType.isInstance(e.getCause());
+  }
+
+  /**
+   * Runs an admin operation, retrying up to {@link #TOPIC_MUTATION_RETRIES}
+   * times when it fails with an {@link ExecutionException} caused by a Kafka
+   * timeout. Any other failure is rethrown immediately.
+   *
+   * @param operation the admin call to run
+   * @param action description used in log and error messages
+   * @return the value produced by {@code operation}
+   * @throws IllegalStateException if every attempt timed out
+   * @throws InterruptedException if interrupted while waiting to retry; the
+   *     interrupt flag is restored first
+   */
+  private <T> T runAdminWithRetry(AdminOperation<T> operation, String action)
+      throws Exception {
+    ExecutionException lastTimeout = null;
+    for (int attempt = 1; attempt <= TOPIC_MUTATION_RETRIES; attempt++) {
+      try {
+        return operation.execute();
+      } catch (ExecutionException e) {
+        boolean timedOut = e.getCause()
+            instanceof org.apache.kafka.common.errors.TimeoutException;
+        if (!timedOut) {
+          throw e;
+        }
+        lastTimeout = e;
+        LOGGER.warn("Timed out trying to {} (attempt {}/{})",
+            action, attempt, TOPIC_MUTATION_RETRIES);
+      }
+      // Back off before the next attempt, but not after the final one.
+      if (attempt < TOPIC_MUTATION_RETRIES) {
+        try {
+          Thread.sleep(RETRY_BACKOFF_MS);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          throw ie;
+        }
+      }
+    }
+    throw new IllegalStateException(
+        "Failed to " + action + " after retries", lastTimeout);
+  }
+
+  /** An admin call that may throw a checked exception. */
+  @FunctionalInterface
+  private interface AdminOperation<T> {
+    T execute()
+        throws Exception;
+  }
+}
diff --git a/pom.xml b/pom.xml
index aa0ce9639b8..a1bc9b7c799 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,6 +304,7 @@
<!-- Test Libraries -->
<testng.version>7.12.0</testng.version>
+ <junit-jupiter-api.version>5.10.2</junit-jupiter-api.version>
<mockito-core.version>5.22.0</mockito-core.version>
<equalsverifier.version>3.19.4</equalsverifier.version>
<testcontainers.version>2.0.3</testcontainers.version>
@@ -678,6 +679,12 @@
<artifactId>pinot-kafka-3.0</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kafka-3.0</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-kafka-4.0</artifactId>
@@ -1758,6 +1765,24 @@
<artifactId>kafka_${scala.compat.version}</artifactId>
<version>${kafka3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.compat.version}</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server-common</artifactId>
+ <version>${kafka3.version}</version>
+ <classifier>test</classifier>
+ </dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -2054,6 +2079,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>${junit-jupiter-api.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- mockito bom -->
<dependency>
<groupId>org.mockito</groupId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]