fapaul commented on code in PR #195:
URL:
https://github.com/apache/flink-connector-kafka/pull/195#discussion_r2523565815
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -51,16 +51,43 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.flink.util.CollectionUtil.newLinkedHashMapWithExpectedSize;
+import static org.apache.flink.util.Preconditions.checkNotNull;
-/** The enumerator class for Kafka source. */
+/**
+ * The enumerator class for Kafka source.
+ *
+ * <p>A core part of the enumerator is handling discovered splits. The
following lifecycle applies
+ * to splits:
+ *
+ * <ol>
+ * <li>{@link #getSubscribedTopicPartitions()} initially or periodically
retrieves a list of topic
+ * partitions in the worker thread.
+ * <li>Partitions are consolidated in {@link #checkPartitionChanges(Set,
Throwable)} in the main
+ * thread.
+ * <li>New partitions will result in new splits, which are initialized
through {@link
+ * #initializePartitionSplits(PartitionChange)} where start/offsets are
resolved in the worker
+ * thread.
+ * <li>The new, initialized splits are put into {@link #unassignedSplits}
and {@link
+ * #pendingPartitionSplitAssignment} in {@link
+ * #handlePartitionSplitChanges(PartitionSplitChange, Throwable)} in the
main thread.
+ * <li>{@link #assignPendingPartitionSplits(Set)} eventually assigns the
pending splits to readers
Review Comment:
Let's also mention that this is still happening in the main thread
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -51,16 +51,43 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import static org.apache.flink.util.Preconditions.checkState;
+import static
org.apache.flink.util.CollectionUtil.newLinkedHashMapWithExpectedSize;
+import static org.apache.flink.util.Preconditions.checkNotNull;
-/** The enumerator class for Kafka source. */
+/**
+ * The enumerator class for Kafka source.
+ *
+ * <p>A core part of the enumerator is handling discovered splits. The
following lifecycle applies
+ * to splits:
+ *
+ * <ol>
+ * <li>{@link #getSubscribedTopicPartitions()} initially or periodically
retrieves a list of topic
+ * partitions in the worker thread.
+ * <li>Partitions are consolidated in {@link #checkPartitionChanges(Set,
Throwable)} in the main
+ * thread.
+ * <li>New partitions will result in new splits, which are initialized
through {@link
+ * #initializePartitionSplits(PartitionChange)} where start/offsets are
resolved in the worker
Review Comment:
Nit: I find it helpful to describe that `where start/offsets are resolved`
actually talks to the broker to find the offsets to make clear why it's in the
worker thread
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -134,117 +161,59 @@ public KafkaSourceEnumerator(
this.context = context;
this.boundedness = boundedness;
- Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
- initializeMigratedSplits(kafkaSourceEnumState.splits());
- this.assignedSplits =
indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
- this.unassignedSplits =
indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
- this.pendingPartitionSplitAssignment = new HashMap<>();
this.partitionDiscoveryIntervalMs =
KafkaSourceOptions.getOption(
properties,
KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
Long::parseLong);
this.consumerGroupId =
properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
this.initialDiscoveryFinished =
kafkaSourceEnumState.initialDiscoveryFinished();
+ this.assignedSplits =
indexByPartition(kafkaSourceEnumState.assignedSplits());
+ this.unassignedSplits =
indexByPartition(kafkaSourceEnumState.unassignedSplits());
}
- /**
- * Initialize migrated splits to splits with concrete starting offsets.
This method ensures that
- * the costly offset resolution is performed only when there are splits
that have been
- * checkpointed with previous enumerator versions.
- *
- * <p>Note that this method is deliberately performed in the main thread
to avoid a checkpoint
- * of the splits without starting offset.
- */
- private Map<AssignmentStatus, List<KafkaPartitionSplit>>
initializeMigratedSplits(
- Set<SplitAndAssignmentStatus> splits) {
- final Set<TopicPartition> migratedPartitions =
- splits.stream()
- .filter(
- splitStatus ->
- splitStatus.split().getStartingOffset()
- ==
KafkaPartitionSplit.MIGRATED)
- .map(splitStatus ->
splitStatus.split().getTopicPartition())
- .collect(Collectors.toSet());
-
- if (migratedPartitions.isEmpty()) {
- return splitByAssignmentStatus(splits.stream());
- }
-
- final Map<TopicPartition, Long> startOffsets =
- startingOffsetInitializer.getPartitionOffsets(
- migratedPartitions, getOffsetsRetriever());
- return splitByAssignmentStatus(
- splits.stream()
- .map(splitStatus -> resolveMigratedSplit(splitStatus,
startOffsets)));
- }
-
- private static Map<AssignmentStatus, List<KafkaPartitionSplit>>
splitByAssignmentStatus(
- Stream<SplitAndAssignmentStatus> stream) {
- return stream.collect(
- Collectors.groupingBy(
- SplitAndAssignmentStatus::assignmentStatus,
- Collectors.mapping(SplitAndAssignmentStatus::split,
Collectors.toList())));
- }
-
- private static SplitAndAssignmentStatus resolveMigratedSplit(
- SplitAndAssignmentStatus splitStatus, Map<TopicPartition, Long>
startOffsets) {
- final KafkaPartitionSplit split = splitStatus.split();
- if (split.getStartingOffset() != KafkaPartitionSplit.MIGRATED) {
- return splitStatus;
- }
- final Long startOffset = startOffsets.get(split.getTopicPartition());
- checkState(
- startOffset != null,
- "Cannot find starting offset for migrated partition %s",
- split.getTopicPartition());
- return new SplitAndAssignmentStatus(
- new KafkaPartitionSplit(split.getTopicPartition(),
startOffset),
- splitStatus.assignmentStatus());
- }
-
- private Map<TopicPartition, KafkaPartitionSplit> indexByPartition(
- List<KafkaPartitionSplit> splits) {
- if (splits == null) {
- return new HashMap<>();
- }
+ private static Map<TopicPartition, KafkaPartitionSplit> indexByPartition(
+ Collection<KafkaPartitionSplit> splits) {
return splits.stream()
-
.collect(Collectors.toMap(KafkaPartitionSplit::getTopicPartition, split ->
split));
+
.collect(Collectors.toMap(KafkaPartitionSplit::getTopicPartition, e -> e));
}
/**
* Start the enumerator.
*
* <p>Depending on {@link #partitionDiscoveryIntervalMs}, the enumerator
will trigger a one-time
* partition discovery, or schedule a callable for discover partitions
periodically.
- *
- * <p>The invoking chain of partition discovery would be:
- *
- * <ol>
- * <li>{@link #findNewPartitionSplits} in worker thread
- * <li>{@link #handlePartitionSplitChanges} in coordinator thread
- * </ol>
*/
@Override
public void start() {
adminClient = getKafkaAdminClient();
+
+ // find splits where the start offset has been initialized but not yet
assigned to readers
+ final List<KafkaPartitionSplit> preinitializedSplits =
+ unassignedSplits.values().stream()
+ .filter(split -> !split.isStartOffsetMigrated())
Review Comment:
Why is it important to check whether they are migrated, in any case they
should be unassigned correct?
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for creation savepoint for migration tests for the Kafka Sink. */
+@Testcontainers
+public class KafkaSourceMigrationITCase extends TestLogger {
+ public static final String KAFKA_SOURCE_UID = "kafka-source-operator-uid";
+ // Directory to store the savepoints in src/test/resources
+ private static final Path KAFKA_SOURCE_SAVEPOINT_PATH =
+
Path.of("src/test/resources/kafka-source-savepoint").toAbsolutePath();
+
+ @RegisterExtension
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(3)
+ .build());
+
+ public static final int NUM_RECORDS =
+ KafkaSourceTestEnv.NUM_PARTITIONS *
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+ private static final String TOPIC = "topic";
+
+ @BeforeEach
+ void setupEnv() throws Throwable {
+ KafkaSourceTestEnv.setup();
+ }
+
+ @AfterEach
+ void removeEnv() throws Exception {
+ KafkaSourceTestEnv.tearDown();
+ }
+
+ static Stream<Arguments> getKafkaSourceSavepoint() throws IOException {
+ return Files.walk(KAFKA_SOURCE_SAVEPOINT_PATH)
+ .filter(
+ f ->
+ Files.isDirectory(f)
+ &&
f.getFileName().toString().startsWith("savepoint"))
+ // allow
+ .map(KAFKA_SOURCE_SAVEPOINT_PATH::relativize)
+ .map(Arguments::arguments);
+ }
+
+ @Disabled("Enable if you want to create savepoint of KafkaSource")
+ @Test
+ void createAndStoreSavepoint(
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> clusterClient)
+ throws Throwable {
+
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> writtenRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(writtenRecords);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ final KafkaSource<TimestampedRecord> source = createSource();
+ final int enumVersion =
source.getEnumeratorCheckpointSerializer().getVersion();
+ final int splitVersion = source.getSplitSerializer().getVersion();
+ String testCase = String.format("enum%s-split%s", enumVersion,
splitVersion);
+
+ Path savepointPath = KAFKA_SOURCE_SAVEPOINT_PATH.resolve(testCase);
+ Files.createDirectories(savepointPath);
+
+ final CloseableIterator<TimestampedRecord> recordIterator =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestDataSource")
+ .uid(KAFKA_SOURCE_UID)
+ .collectAsync();
+
+ final JobClient jobClient = env.executeAsync();
+ final JobID jobID = jobClient.getJobID();
+
+ final Queue<TimestampedRecord> readRecords = new
ConcurrentLinkedQueue<>();
+ while (readRecords.size() < writtenRecords.size() &&
recordIterator.hasNext()) {
+ readRecords.add(recordIterator.next());
+ }
+
+ try {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.stopWithSavepoint(
+ jobID, false, savepointPath.toString(),
SavepointFormatType.NATIVE);
+ savepointFuture.get(2, TimeUnit.MINUTES);
+
+ final long maxTS = getMaxTS(writtenRecords);
+ assertThat(readRecords).hasSize(NUM_RECORDS).allMatch(r ->
r.getTimestamp() <= maxTS);
+ } finally {
+ SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(clusterClient.requestJobResult(jobID).get())
+ .returns(ApplicationStatus.SUCCEEDED,
JobResult::getApplicationStatus)
+ .extracting(
+ JobResult::getSerializedThrowable,
+
InstanceOfAssertFactories.optional(Exception.class))
+ .isEmpty();
+ softly.assertAll();
+ }
+ }
+
+ private static long getMaxTS(List<ProducerRecord<String, Integer>>
writtenRecords) {
+ return
writtenRecords.stream().mapToLong(ProducerRecord::timestamp).max().orElseThrow();
+ }
+
+ private static KafkaSource<TimestampedRecord> createSource() {
+ return KafkaSource.<TimestampedRecord>builder()
+
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(TOPIC)
+ .setDeserializer(new TestDeserializer())
+ .build();
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("getKafkaSourceSavepoint")
+ void testRestoreFromSavepointWithCurrentVersion(Path savepointPath) throws
Throwable {
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> existingRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(existingRecords);
+ // the new data supposed to be read after resuming from the savepoint
+ final List<ProducerRecord<String, Integer>> writtenRecords =
+ KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(writtenRecords);
+
+ final Configuration configuration = new Configuration();
+ configuration.set(
+ SAVEPOINT_PATH,
KAFKA_SOURCE_SAVEPOINT_PATH.resolve(savepointPath).toString());
+ configuration.set(SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
Review Comment:
What does this configuration do?
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for creation savepoint for migration tests for the Kafka Sink. */
+@Testcontainers
+public class KafkaSourceMigrationITCase extends TestLogger {
+ public static final String KAFKA_SOURCE_UID = "kafka-source-operator-uid";
+ // Directory to store the savepoints in src/test/resources
+ private static final Path KAFKA_SOURCE_SAVEPOINT_PATH =
+
Path.of("src/test/resources/kafka-source-savepoint").toAbsolutePath();
+
+ @RegisterExtension
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(3)
+ .build());
+
+ public static final int NUM_RECORDS =
+ KafkaSourceTestEnv.NUM_PARTITIONS *
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+ private static final String TOPIC = "topic";
+
+ @BeforeEach
+ void setupEnv() throws Throwable {
+ KafkaSourceTestEnv.setup();
+ }
+
+ @AfterEach
+ void removeEnv() throws Exception {
+ KafkaSourceTestEnv.tearDown();
+ }
+
+ static Stream<Arguments> getKafkaSourceSavepoint() throws IOException {
+ return Files.walk(KAFKA_SOURCE_SAVEPOINT_PATH)
+ .filter(
+ f ->
+ Files.isDirectory(f)
+ &&
f.getFileName().toString().startsWith("savepoint"))
+ // allow
+ .map(KAFKA_SOURCE_SAVEPOINT_PATH::relativize)
+ .map(Arguments::arguments);
+ }
+
+ @Disabled("Enable if you want to create savepoint of KafkaSource")
+ @Test
+ void createAndStoreSavepoint(
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> clusterClient)
+ throws Throwable {
+
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> writtenRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(writtenRecords);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ final KafkaSource<TimestampedRecord> source = createSource();
+ final int enumVersion =
source.getEnumeratorCheckpointSerializer().getVersion();
+ final int splitVersion = source.getSplitSerializer().getVersion();
+ String testCase = String.format("enum%s-split%s", enumVersion,
splitVersion);
+
+ Path savepointPath = KAFKA_SOURCE_SAVEPOINT_PATH.resolve(testCase);
+ Files.createDirectories(savepointPath);
+
+ final CloseableIterator<TimestampedRecord> recordIterator =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestDataSource")
+ .uid(KAFKA_SOURCE_UID)
+ .collectAsync();
+
+ final JobClient jobClient = env.executeAsync();
+ final JobID jobID = jobClient.getJobID();
+
+ final Queue<TimestampedRecord> readRecords = new
ConcurrentLinkedQueue<>();
+ while (readRecords.size() < writtenRecords.size() &&
recordIterator.hasNext()) {
+ readRecords.add(recordIterator.next());
+ }
+
+ try {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.stopWithSavepoint(
+ jobID, false, savepointPath.toString(),
SavepointFormatType.NATIVE);
+ savepointFuture.get(2, TimeUnit.MINUTES);
+
+ final long maxTS = getMaxTS(writtenRecords);
+ assertThat(readRecords).hasSize(NUM_RECORDS).allMatch(r ->
r.getTimestamp() <= maxTS);
+ } finally {
+ SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(clusterClient.requestJobResult(jobID).get())
+ .returns(ApplicationStatus.SUCCEEDED,
JobResult::getApplicationStatus)
+ .extracting(
+ JobResult::getSerializedThrowable,
+
InstanceOfAssertFactories.optional(Exception.class))
+ .isEmpty();
+ softly.assertAll();
+ }
+ }
+
+ private static long getMaxTS(List<ProducerRecord<String, Integer>>
writtenRecords) {
+ return
writtenRecords.stream().mapToLong(ProducerRecord::timestamp).max().orElseThrow();
+ }
+
+ private static KafkaSource<TimestampedRecord> createSource() {
+ return KafkaSource.<TimestampedRecord>builder()
+
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(TOPIC)
+ .setDeserializer(new TestDeserializer())
+ .build();
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("getKafkaSourceSavepoint")
+ void testRestoreFromSavepointWithCurrentVersion(Path savepointPath) throws
Throwable {
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> existingRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(existingRecords);
+ // the new data supposed to be read after resuming from the savepoint
+ final List<ProducerRecord<String, Integer>> writtenRecords =
+ KafkaSourceTestEnv.getRecordsForTopicWithoutTimestamp(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(writtenRecords);
+
+ final Configuration configuration = new Configuration();
+ configuration.set(
+ SAVEPOINT_PATH,
KAFKA_SOURCE_SAVEPOINT_PATH.resolve(savepointPath).toString());
+ configuration.set(SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
+ StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(2);
+
+ final KafkaSource<TimestampedRecord> source = createSource();
+
+ final CloseableIterator<TimestampedRecord> recordIterator =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestDataSource")
+ .uid(KAFKA_SOURCE_UID)
+ .collectAsync();
Review Comment:
Are you sure collectAsync works with parallelism 2?
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -331,27 +319,36 @@ private PartitionSplitChange findNewPartitionSplits() {
* partitions
*/
private PartitionSplitChange initializePartitionSplits(PartitionChange
partitionChange) {
- Set<TopicPartition> newPartitions =
-
Collections.unmodifiableSet(partitionChange.getNewPartitions());
+ Set<TopicPartition> newPartitions = partitionChange.getNewPartitions();
+ Set<TopicPartition> initialPartitions =
partitionChange.getInitialPartitions();
OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever =
getOffsetsRetriever();
// initial partitions use OffsetsInitializer specified by the user
while new partitions use
// EARLIEST
- final OffsetsInitializer initializer;
- if (!initialDiscoveryFinished) {
- initializer = startingOffsetInitializer;
- } else {
- initializer = newDiscoveryOffsetsInitializer;
- }
Map<TopicPartition, Long> startingOffsets =
- initializer.getPartitionOffsets(newPartitions,
offsetsRetriever);
-
+ newLinkedHashMapWithExpectedSize(newPartitions.size() +
initialPartitions.size());
Map<TopicPartition, Long> stoppingOffsets =
- stoppingOffsetInitializer.getPartitionOffsets(newPartitions,
offsetsRetriever);
+ newLinkedHashMapWithExpectedSize(newPartitions.size() +
initialPartitions.size());
+ if (!newPartitions.isEmpty()) {
+ startingOffsets.putAll(
+ newDiscoveryOffsetsInitializer.getPartitionOffsets(
+ newPartitions, offsetsRetriever));
+ stoppingOffsets.putAll(
+
stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever));
+ }
Review Comment:
Nit: the whole block could be a method that you reuse for the
`initialPartitions`
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java:
##########
@@ -41,14 +41,20 @@ public class KafkaPartitionSplit implements SourceSplit {
public static final long EARLIEST_OFFSET = -2;
// Indicating the split should consume from the last committed offset.
public static final long COMMITTED_OFFSET = -3;
- // Used to indicate the split has been migrated from an earlier enumerator
state; offset needs
- // to be initialized on recovery
+ // Used to indicate the offset has not been initialized yet in the
enumerator state; offset
+ // needs to be initialized on recovery
public static final long MIGRATED = Long.MIN_VALUE;
+ public static final long MIGRATED_INITIAL = MIGRATED + 1;
Review Comment:
Can you add a comment about why we need `MIGRATED` and `MIGRATED_INITIAL`?
In my understanding `MIGRATED` can either be splits that are newly discovered
or part of the initial fetch or added back by the source reader (assigned). If
initial here refers to initial fetch partitions I do not understand the
difference fully.
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
##########
@@ -123,13 +123,13 @@ public void deleteTestTopic(String topic) {
private void tryDelete(AdminClient adminClient, String topic) throws
Exception {
try {
- adminClient.deleteTopics(Collections.singleton(topic)).all().get();
CommonTestUtils.waitUtil(
() -> {
try {
- return
adminClient.listTopics().listings().get().stream()
- .map(TopicListing::name)
- .noneMatch((name) -> name.equals(topic));
+
adminClient.deleteTopics(Collections.singleton(topic)).all().get();
Review Comment:
I think this is not a safe refactoring. Due to kafka's eventual consistency
it's better to try the listing until they are removed. Wdyt?
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceMigrationITCase.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.InjectMiniCluster;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for creation savepoint for migration tests for the Kafka Sink. */
+@Testcontainers
+public class KafkaSourceMigrationITCase extends TestLogger {
+ public static final String KAFKA_SOURCE_UID = "kafka-source-operator-uid";
+ // Directory to store the savepoints in src/test/resources
+ private static final Path KAFKA_SOURCE_SAVEPOINT_PATH =
+
Path.of("src/test/resources/kafka-source-savepoint").toAbsolutePath();
+
+ @RegisterExtension
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(3)
+ .build());
+
+ public static final int NUM_RECORDS =
+ KafkaSourceTestEnv.NUM_PARTITIONS *
KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION;
+ private static final String TOPIC = "topic";
+
+ @BeforeEach
+ void setupEnv() throws Throwable {
+ KafkaSourceTestEnv.setup();
+ }
+
+ @AfterEach
+ void removeEnv() throws Exception {
+ KafkaSourceTestEnv.tearDown();
+ }
+
+ static Stream<Arguments> getKafkaSourceSavepoint() throws IOException {
+ return Files.walk(KAFKA_SOURCE_SAVEPOINT_PATH)
+ .filter(
+ f ->
+ Files.isDirectory(f)
+ &&
f.getFileName().toString().startsWith("savepoint"))
+ // allow
+ .map(KAFKA_SOURCE_SAVEPOINT_PATH::relativize)
+ .map(Arguments::arguments);
+ }
+
+ @Disabled("Enable if you want to create savepoint of KafkaSource")
+ @Test
+ void createAndStoreSavepoint(
+ @InjectMiniCluster MiniCluster miniCluster,
+ @InjectClusterClient ClusterClient<?> clusterClient)
+ throws Throwable {
+
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> writtenRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(writtenRecords);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(3);
+
+ final KafkaSource<TimestampedRecord> source = createSource();
+ final int enumVersion =
source.getEnumeratorCheckpointSerializer().getVersion();
+ final int splitVersion = source.getSplitSerializer().getVersion();
+ String testCase = String.format("enum%s-split%s", enumVersion,
splitVersion);
+
+ Path savepointPath = KAFKA_SOURCE_SAVEPOINT_PATH.resolve(testCase);
+ Files.createDirectories(savepointPath);
+
+ final CloseableIterator<TimestampedRecord> recordIterator =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"TestDataSource")
+ .uid(KAFKA_SOURCE_UID)
+ .collectAsync();
+
+ final JobClient jobClient = env.executeAsync();
+ final JobID jobID = jobClient.getJobID();
+
+ final Queue<TimestampedRecord> readRecords = new
ConcurrentLinkedQueue<>();
+ while (readRecords.size() < writtenRecords.size() &&
recordIterator.hasNext()) {
+ readRecords.add(recordIterator.next());
+ }
+
+ try {
+ CompletableFuture<String> savepointFuture =
+ clusterClient.stopWithSavepoint(
+ jobID, false, savepointPath.toString(),
SavepointFormatType.NATIVE);
+ savepointFuture.get(2, TimeUnit.MINUTES);
+
+ final long maxTS = getMaxTS(writtenRecords);
+ assertThat(readRecords).hasSize(NUM_RECORDS).allMatch(r ->
r.getTimestamp() <= maxTS);
+ } finally {
+ SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(clusterClient.requestJobResult(jobID).get())
+ .returns(ApplicationStatus.SUCCEEDED,
JobResult::getApplicationStatus)
+ .extracting(
+ JobResult::getSerializedThrowable,
+
InstanceOfAssertFactories.optional(Exception.class))
+ .isEmpty();
+ softly.assertAll();
+ }
+ }
+
+ private static long getMaxTS(List<ProducerRecord<String, Integer>>
writtenRecords) {
+ return
writtenRecords.stream().mapToLong(ProducerRecord::timestamp).max().orElseThrow();
+ }
+
+ private static KafkaSource<TimestampedRecord> createSource() {
+ return KafkaSource.<TimestampedRecord>builder()
+
.setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ .setTopics(TOPIC)
+ .setDeserializer(new TestDeserializer())
+ .build();
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("getKafkaSourceSavepoint")
+ void testRestoreFromSavepointWithCurrentVersion(Path savepointPath) throws
Throwable {
+ // this is the part that has been read already in the savepoint
+ KafkaSourceTestEnv.createTestTopic(TOPIC);
+ final List<ProducerRecord<String, Integer>> existingRecords =
+ KafkaSourceTestEnv.getRecordsForTopic(TOPIC);
+ KafkaSourceTestEnv.produceToKafka(existingRecords);
Review Comment:
Let's extract a method to make sure this part is in sync between the setup
and the test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]