This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7357fbb9a16c feat: add flink continuous split enumerator (#17562)
7357fbb9a16c is described below
commit 7357fbb9a16cbd433c5813d9485cd5bdac8e1cd3
Author: Peter Huang <[email protected]>
AuthorDate: Wed Dec 17 19:02:04 2025 -0800
feat: add flink continuous split enumerator (#17562)
---
.../apache/hudi/source/IncrementalInputSplits.java | 25 ++
.../java/org/apache/hudi/source/ScanContext.java | 212 ++++++++++
.../enumerator/AbstractHoodieSplitEnumerator.java | 2 +-
.../HoodieContinuousSplitEnumerator.java | 126 ++++++
.../enumerator/HoodieEnumeratorPosition.java | 93 +++++
.../enumerator/HoodieSplitEnumeratorState.java | 18 +-
.../source/split/DefaultHoodieSplitDiscover.java | 57 +++
.../source/split/HoodieContinuousSplitBatch.java | 79 ++++
.../HoodieContinuousSplitDiscover.java} | 22 +-
.../hudi/source/split/HoodieSourceSplit.java | 5 +-
.../hudi/source/TestIncrementalInputSplits.java | 136 +++++++
.../org/apache/hudi/source/TestScanContext.java | 257 ++++++++++++
.../TestHoodieContinuousSplitEnumerator.java | 448 +++++++++++++++++++++
.../split/TestDefaultHoodieSplitDiscover.java | 234 +++++++++++
14 files changed, 1695 insertions(+), 19 deletions(-)
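For orientation, a minimal wiring sketch (Java) of the pieces added by this patch, based only on the builder methods and constructors visible in the diff below; the conf, basePath, rowType, enumeratorContext, splitProvider, and metaClient values are assumed to be supplied by the surrounding Flink source implementation:

    // Describe what to scan and how often (ScanContext.Builder is added below).
    ScanContext scanContext = new ScanContext.Builder()
        .conf(conf)                 // Flink Configuration carrying options such as READ_STREAMING_CHECK_INTERVAL
        .path(new Path(basePath))
        .rowType(rowType)
        .startInstant(null)         // null start instant: read from the earliest commit
        .maxPendingSplits(1000)
        .build();

    // Discovers new splits incrementally from the table's active timeline.
    HoodieContinuousSplitDiscover splitDiscover =
        new DefaultHoodieSplitDiscover(scanContext, metaClient);

    // Option.empty() means there is no restored checkpoint state.
    HoodieContinuousSplitEnumerator enumerator = new HoodieContinuousSplitEnumerator(
        enumeratorContext, splitProvider, splitDiscover, scanContext, Option.empty());
    enumerator.start();             // schedules periodic discovery at scanContext.getScanInterval()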
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 4258a49a36ed..052f0c94cac4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -39,6 +39,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.cdc.CdcInputSplit;
@@ -245,6 +246,11 @@ public class IncrementalInputSplits implements Serializable {
HoodieTableMetaClient metaClient,
@Nullable String issuedOffset,
boolean cdcEnabled) {
+
+ if (metaClient == null) {
+ throw new IllegalArgumentException("metaClient must not be null");
+ }
+
metaClient.reloadActiveTimeline();
IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
.metaClient(metaClient)
@@ -303,6 +309,25 @@ public class IncrementalInputSplits implements Serializable {
}
}
+ /**
+ * Returns the incremental Hoodie source split batch.
+ *
+ * @param metaClient The meta client
+ * @param startInstant The start instant of the splits
+ * @param cdcEnabled Whether CDC is enabled
+ *
+ * @return The batch of incremental source splits, or an empty batch if there are no new instants
+ */
+ public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+ HoodieTableMetaClient metaClient,
+ @Nullable String startInstant,
+ boolean cdcEnabled) {
+ Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+
+ return HoodieContinuousSplitBatch.fromResult(result);
+ }
+
/**
* Returns the input splits for streaming incremental read.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
new file mode 100644
index 000000000000..c0b8eca81acb
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * Hudi source scan context.
+ */
+@Internal
+public class ScanContext implements Serializable {
+ private final Configuration conf;
+ private final Path path;
+ private final RowType rowType;
+ private final String startCommit;
+ private final String endCommit;
+ private final long maxCompactionMemoryInBytes;
+ // max pending splits that are not assigned in split provider
+ private final long maxPendingSplits;
+ // skip compaction
+ private final boolean skipCompaction;
+ // skip clustering
+ private final boolean skipClustering;
+ // skip insert overwrite
+ private final boolean skipInsertOverwrite;
+ // cdc enabled
+ private final boolean cdcEnabled;
+
+ public ScanContext(
+ Configuration conf,
+ Path path,
+ RowType rowType,
+ String startCommit,
+ String endCommit,
+ long maxCompactionMemoryInBytes,
+ long maxPendingSplits,
+ boolean skipCompaction,
+ boolean skipClustering,
+ boolean skipInsertOverwrite,
+ boolean cdcEnabled) {
+ this.conf = conf;
+ this.path = path;
+ this.rowType = rowType;
+ this.startCommit = startCommit;
+ this.endCommit = endCommit;
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.maxPendingSplits = maxPendingSplits;
+ this.skipCompaction = skipCompaction;
+ this.skipClustering = skipClustering;
+ this.skipInsertOverwrite = skipInsertOverwrite;
+ this.cdcEnabled = cdcEnabled;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public String getStartCommit() {
+ return startCommit;
+ }
+
+ public String getEndCommit() {
+ return endCommit;
+ }
+
+ public long getMaxCompactionMemoryInBytes() {
+ return maxCompactionMemoryInBytes;
+ }
+
+ public long getMaxPendingSplits() {
+ return maxPendingSplits;
+ }
+
+ public boolean skipCompaction() {
+ return skipCompaction;
+ }
+
+ public boolean skipClustering() {
+ return skipClustering;
+ }
+
+ public boolean skipInsertOverwrite() {
+ return skipInsertOverwrite;
+ }
+
+ public boolean cdcEnabled() {
+ return cdcEnabled;
+ }
+
+ public Duration getScanInterval() {
+ return Duration.ofSeconds(conf.get(FlinkOptions.READ_STREAMING_CHECK_INTERVAL));
+ }
+
+ /**
+ * Builder for {@link ScanContext}.
+ */
+ public static class Builder {
+ private Configuration conf;
+ private Path path;
+ private RowType rowType;
+ private String startInstant;
+ private String endInstant;
+ private long maxCompactionMemoryInBytes;
+ private long maxPendingSplits;
+ private boolean skipCompaction;
+ private boolean skipClustering;
+ private boolean skipInsertOverwrite;
+ private boolean cdcEnabled;
+
+ public Builder conf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder path(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ public Builder rowType(RowType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
+ public Builder startInstant(String startInstant) {
+ this.startInstant = startInstant;
+ return this;
+ }
+
+ public Builder endInstant(String endInstant) {
+ this.endInstant = endInstant;
+ return this;
+ }
+
+ public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ return this;
+ }
+
+ public Builder maxPendingSplits(long maxPendingSplits) {
+ this.maxPendingSplits = maxPendingSplits;
+ return this;
+ }
+
+ public Builder skipCompaction(boolean skipCompaction) {
+ this.skipCompaction = skipCompaction;
+ return this;
+ }
+
+ public Builder skipClustering(boolean skipClustering) {
+ this.skipClustering = skipClustering;
+ return this;
+ }
+
+ public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+ this.skipInsertOverwrite = skipInsertOverwrite;
+ return this;
+ }
+
+ public Builder cdcEnabled(boolean cdcEnabled) {
+ this.cdcEnabled = cdcEnabled;
+ return this;
+ }
+
+ public ScanContext build() {
+ return new ScanContext(
+ conf,
+ path,
+ rowType,
+ startInstant,
+ endInstant,
+ maxCompactionMemoryInBytes,
+ maxPendingSplits,
+ skipCompaction,
+ skipClustering,
+ skipInsertOverwrite,
+ cdcEnabled);
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
index 331ee0d7e22d..6693ea20da92 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
@@ -116,7 +116,7 @@ abstract class AbstractHoodieSplitEnumerator
@Override
public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
- return new HoodieSplitEnumeratorState(splitProvider.state());
+ return new HoodieSplitEnumeratorState(splitProvider.state(), Option.empty(), Option.empty());
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
new file mode 100644
index 000000000000..b45acd60374c
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.HoodieSplitProvider;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Continuous Hoodie enumerator that discovers Hoodie splits from new commits of the upstream Hoodie table.
+ */
+public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator {
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class);
+
+ private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
+ private final HoodieSplitProvider splitProvider;
+ private final HoodieContinuousSplitDiscover splitDiscover;
+ private final ScanContext scanContext;
+
+ /**
+ * Instant for the last enumerated commit. Next incremental enumeration should be based off
+ * this as the starting position.
+ */
+ private final AtomicReference<HoodieEnumeratorPosition> position;
+
+ public HoodieContinuousSplitEnumerator(
+ SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
+ HoodieSplitProvider splitProvider,
+ HoodieContinuousSplitDiscover splitDiscover,
+ ScanContext scanContext,
+ Option<HoodieSplitEnumeratorState> enumStateOpt) {
+ super(enumeratorContext, splitProvider);
+ this.enumeratorContext = enumeratorContext;
+ this.splitProvider = splitProvider;
+ this.splitDiscover = splitDiscover;
+ this.scanContext = scanContext;
+ this.position = new AtomicReference<>();
+
+ if (enumStateOpt.isPresent()) {
+ this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset()));
+ } else {
+ // We need to set the instant offset to null for the first read: for the first read, the start instant is inclusive,
+ // while for continuous incremental reads, the start instant is exclusive.
+ this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty()));
+ }
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ enumeratorContext.callAsync(
+ this::discoverSplits,
+ this::processDiscoveredSplits,
+ 0L,
+ scanContext.getScanInterval().toMillis());
+ }
+
+ @Override
+ boolean shouldWaitForMoreSplits() {
+ return true;
+ }
+
+ @Override
+ public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+ return new HoodieSplitEnumeratorState(splitProvider.state(), position.get().issuedInstant(), position.get().issuedOffset());
+ }
+
+ private HoodieContinuousSplitBatch discoverSplits() {
+ int pendingSplitNumber = splitProvider.pendingSplitCount();
+ if (pendingSplitNumber > scanContext.getMaxPendingSplits()) {
+ LOG.info(
+ "Pause split discovery as the assigner already has too many pending splits: {}",
+ pendingSplitNumber);
+ return HoodieContinuousSplitBatch.EMPTY;
+ }
+ return splitDiscover.discoverSplits(position.get().issuedOffset().isPresent() ? position.get().issuedOffset().get() : null);
+ }
+
+ private void processDiscoveredSplits(HoodieContinuousSplitBatch result, Throwable throwable) {
+ if (throwable != null) {
+ throw new RuntimeException("Failed to discover new splits", throwable);
+ }
+
+ if (!result.getSplits().isEmpty()) {
+ splitProvider.onDiscoveredSplits(result.getSplits());
+ LOG.debug(
+ "Added {} splits discovered between ({}, {}] to the assigner",
+ result.getSplits().size(),
+ position.get().issuedOffset(),
+ result.getOffset());
+ } else {
+ LOG.debug(
+ "No new splits discovered between ({}, {}]",
+ position.get().issuedOffset(),
+ result.getOffset());
+ }
+ position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
+ LOG.info("Update enumerator position to {}", position.get());
+ }
+
+}
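As a side note on the enumerator above: the last enumerated instant and offset round-trip through HoodieSplitEnumeratorState on checkpoints. A hedged sketch of that flow, where checkpointId and the constructor arguments are assumed to come from the Flink runtime:

    // The snapshot carries the provider's pending split states plus the last enumerated instant and offset.
    HoodieSplitEnumeratorState state = enumerator.snapshotState(checkpointId);

    // On restore, passing the saved state makes the next discovery resume after the
    // last enumerated offset instead of re-reading from the start of the timeline.
    HoodieContinuousSplitEnumerator restored = new HoodieContinuousSplitEnumerator(
        enumeratorContext, splitProvider, splitDiscover, scanContext, Option.of(state));
    restored.start();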
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java
new file mode 100644
index 000000000000..05eb45609830
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The position of {@link HoodieContinuousSplitEnumerator}. The completionTime notion is introduced for incremental
+ * reads mainly to handle "hollow" instants: instants that start very early but finish long after. The completion-time
+ * based incremental query gives clear semantics for the consumption boundaries. The instantTime comes from legacy
+ * readers and is now used only for logging purposes.
+ */
+public class HoodieEnumeratorPosition implements Serializable {
+ // Max instant time consumed
+ private final Option<String> issuedInstant;
+ // Max completion time
+ private final Option<String> issuedOffset;
+
+ static HoodieEnumeratorPosition empty() {
+ return new HoodieEnumeratorPosition(Option.empty(), Option.empty());
+ }
+
+ static HoodieEnumeratorPosition of(String lastInstant, String lastInstantCompletionTime) {
+ return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+ }
+
+ static HoodieEnumeratorPosition of(Option<String> lastInstant, Option<String> lastInstantCompletionTime) {
+ return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+ }
+
+ private HoodieEnumeratorPosition(String issuedInstant, String issuedOffset) {
+ this.issuedInstant = StringUtils.isNullOrEmpty(issuedInstant) ? Option.empty() : Option.of(issuedInstant);
+ this.issuedOffset = StringUtils.isNullOrEmpty(issuedOffset) ? Option.empty() : Option.of(issuedOffset);
+ }
+
+ private HoodieEnumeratorPosition(Option<String> issuedInstant, Option<String> issuedOffset) {
+ this.issuedInstant = issuedInstant;
+ this.issuedOffset = issuedOffset;
+ }
+
+ public Option<String> issuedInstant() {
+ return issuedInstant;
+ }
+
+ public Option<String> issuedOffset() {
+ return issuedOffset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieEnumeratorPosition that = (HoodieEnumeratorPosition) o;
+ return Objects.equals(issuedInstant, that.issuedInstant) && Objects.equals(issuedOffset, that.issuedOffset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(issuedInstant, issuedOffset);
+ }
+
+ @Override
+ public String toString() {
+ return "HoodieEnumeratorPosition{"
+ + "issuedInstant=" + issuedInstant
+ + ", issuedOffset=" + issuedOffset
+ + '}';
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
index de41d1d0e0f4..42c4563a51a6 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source.enumerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.source.split.HoodieSourceSplitState;
import java.io.Serializable;
@@ -27,13 +28,28 @@ import java.util.Collection;
* State of Hoodie split enumerator. Mainly include the states of pending splits of split provider.
*/
public class HoodieSplitEnumeratorState implements Serializable {
+ private final Option<String> lastEnumeratedInstant;
+ private final Option<String> lastEnumeratedInstantOffset;
private final Collection<HoodieSourceSplitState> pendingSplitStates;
- public HoodieSplitEnumeratorState(Collection<HoodieSourceSplitState> pendingSplitStates) {
+ public HoodieSplitEnumeratorState(
+ Collection<HoodieSourceSplitState> pendingSplitStates,
+ Option<String> lastEnumeratedInstant,
+ Option<String> lastEnumeratedInstantOffset) {
this.pendingSplitStates = pendingSplitStates;
+ this.lastEnumeratedInstant = lastEnumeratedInstant;
+ this.lastEnumeratedInstantOffset = lastEnumeratedInstantOffset;
}
public Collection<HoodieSourceSplitState> getPendingSplitStates() {
return pendingSplitStates;
}
+
+ public Option<String> getLastEnumeratedInstant() {
+ return lastEnumeratedInstant;
+ }
+
+ public Option<String> getLastEnumeratedInstantOffset() {
+ return lastEnumeratedInstantOffset;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
new file mode 100644
index 000000000000..718b78fe966e
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.source.IncrementalInputSplits;
+import org.apache.hudi.source.ScanContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of HoodieContinuousSplitDiscover.
+ */
+public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
+
+ private final HoodieTableMetaClient metaClient;
+ private final ScanContext scanContext;
+ private final IncrementalInputSplits incrementalInputSplits;
+
+ public DefaultHoodieSplitDiscover(
+ ScanContext scanContext,
+ HoodieTableMetaClient metaClient) {
+ this.scanContext = scanContext;
+ this.metaClient = metaClient;
+ this.incrementalInputSplits = IncrementalInputSplits.builder()
+ .conf(scanContext.getConf())
+ .path(scanContext.getPath())
+ .rowType(scanContext.getRowType())
+ .maxCompactionMemoryInBytes(scanContext.getMaxCompactionMemoryInBytes())
+ .skipCompaction(scanContext.skipCompaction())
+ .skipClustering(scanContext.skipClustering())
+ .skipInsertOverwrite(scanContext.skipInsertOverwrite()).build();
+ }
+
+ @Override
+ public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+ return incrementalInputSplits.inputHoodieSourceSplits(metaClient, lastInstant, scanContext.cdcEnabled());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
new file mode 100644
index 000000000000..3c1c7c090d85
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.IncrementalInputSplits;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Result batch from the continuous enumerator. It has the same semantics as {@link org.apache.hudi.source.IncrementalInputSplits.Result}.
+ */
+public class HoodieContinuousSplitBatch {
+ public static final HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", "");
+ private final Collection<HoodieSourceSplit> splits;
+ private final String endInstant; // end instant to consume to
+ private final String offset; // monotonically increasing consumption offset
+
+ /**
+ * @param splits should never be null, but may be an empty collection
+ * @param endInstant should never be null; the end instant to consume to
+ * @param offset may be null; the monotonically increasing consumption offset
+ */
+ public HoodieContinuousSplitBatch(
+ Collection<HoodieSourceSplit> splits,
+ String endInstant,
+ String offset) {
+
+ ValidationUtils.checkArgument(splits != null, "Invalid splits collection: null");
+ ValidationUtils.checkArgument(endInstant != null, "Invalid end instant: null");
+ this.splits = splits;
+ this.endInstant = endInstant;
+ this.offset = offset;
+ }
+
+ public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) {
+ List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split ->
+ new HoodieSourceSplit(
+ HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),
+ split.getBasePath().orElse(null),
+ split.getLogPaths(), split.getTablePath(),
+ split.getMergeType(), split.getFileId()
+ )
+ ).collect(Collectors.toList());
+
+ return new HoodieContinuousSplitBatch(splits, result.getEndInstant(), result.getOffset());
+ }
+
+ public Collection<HoodieSourceSplit> getSplits() {
+ return splits;
+ }
+
+ public String getEndInstant() {
+ return endInstant;
+ }
+
+ public String getOffset() {
+ return offset;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
similarity index 55%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
copy to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
index de41d1d0e0f4..ec09fae0fa76 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
@@ -16,24 +16,16 @@
* limitations under the License.
*/
-package org.apache.hudi.source.enumerator;
+package org.apache.hudi.source.split;
-import org.apache.hudi.source.split.HoodieSourceSplitState;
-
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.annotation.Internal;
/**
- * State of Hoodie split enumerator. Mainly include the states of pending splits of split provider.
+ * Interface for continuously discovering new splits from the active timeline.
*/
-public class HoodieSplitEnumeratorState implements Serializable {
- private final Collection<HoodieSourceSplitState> pendingSplitStates;
-
- public HoodieSplitEnumeratorState(Collection<HoodieSourceSplitState> pendingSplitStates) {
- this.pendingSplitStates = pendingSplitStates;
- }
+@Internal
+public interface HoodieContinuousSplitDiscover {
- public Collection<HoodieSourceSplitState> getPendingSplitStates() {
- return pendingSplitStates;
- }
+ /** Discovers the files appended between {@code lastInstant} and the latest commit of the current table. */
+ HoodieContinuousSplitBatch discoverSplits(String lastInstant);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index ee05fe0e8ba7..45260b2a1edd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -26,13 +26,14 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * Hoodie SourceSplit implementation for source V2.
+ * Hoodie SourceSplit implementation for source V2. It has the same semantics as {@link org.apache.hudi.table.format.mor.MergeOnReadInputSplit}.
*/
public class HoodieSourceSplit implements SourceSplit, Serializable {
+ public static AtomicInteger SPLIT_COUNTER = new AtomicInteger(0);
private static final long serialVersionUID = 1L;
-
private static final long NUM_NO_CONSUMPTION = 0L;
// the split number
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 31343c8e3af2..391daaf1e9e9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -77,6 +77,7 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.compareTim
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -560,4 +561,139 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
.findFirst()
.orElse(-1);
}
+
+ @Test
+ void testInputHoodieSourceSplits() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+ .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .build();
+
+ // Test inputHoodieSourceSplits method
+ org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+ iis.inputHoodieSourceSplits(metaClient, firstInstant.getCompletionTime(), false);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ assertNotNull(result.getOffset(), "Offset should not be null");
+ }
+
+ @Test
+ void testInputHoodieSourceSplitsWithNullStartInstant() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .build();
+
+ // Test with null start instant (should read from earliest)
+ org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+ iis.inputHoodieSourceSplits(metaClient, null, false);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
+ @Test
+ void testInputHoodieSourceSplitsWithNoNewInstants() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+ .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ String lastInstant = commitsTimeline.lastInstant().get().getCompletionTime();
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .build();
+
+ // Query with the last instant - should return empty since there's nothing new
+ org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+ iis.inputHoodieSourceSplits(metaClient, lastInstant, false);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
+ @Test
+ void testInputHoodieSourceSplitsConvertToHoodieSourceSplit() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .build();
+
+ org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+ iis.inputHoodieSourceSplits(metaClient, null, false);
+
+ // Verify that splits are converted to HoodieSourceSplit type
+ result.getSplits().forEach(split -> {
+ assertTrue(split instanceof org.apache.hudi.source.split.HoodieSourceSplit,
+ "Split should be of type HoodieSourceSplit");
+ assertNotNull(split.splitId(), "Split ID should not be null");
+ });
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testInputHoodieSourceSplitsWithDifferentTableTypes(HoodieTableType tableType) throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, tableType);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+ conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .build();
+
+ org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+ iis.inputHoodieSourceSplits(metaClient, null, false);
+
+ assertNotNull(result, "Result should not be null for table type: " +
tableType);
+ assertNotNull(result.getSplits(), "Splits should not be null for table
type: " + tableType);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
new file mode 100644
index 000000000000..8fbcad14997b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link ScanContext}.
+ */
+public class TestScanContext {
+
+ @Test
+ public void testGetConf() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.PATH, "/tmp/test");
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+
+ assertNotNull(scanContext.getConf(), "Configuration should not be null");
+ assertEquals("/tmp/test", scanContext.getConf().get(FlinkOptions.PATH),
+ "Configuration should match");
+ }
+
+ @Test
+ public void testGetPath() throws Exception {
+ Configuration conf = new Configuration();
+ Path expectedPath = new Path("/tmp/test/table");
+
+ ScanContext scanContext = createTestScanContext(conf, expectedPath,
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+
+ assertEquals(expectedPath, scanContext.getPath(), "Path should match");
+ }
+
+ @Test
+ public void testGetRowType() throws Exception {
+ Configuration conf = new Configuration();
+ RowType rowType = TestConfigurations.ROW_TYPE;
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ rowType, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+
+ assertNotNull(scanContext.getRowType(), "RowType should not be null");
+ assertEquals(rowType, scanContext.getRowType(), "RowType should match");
+ }
+
+ @Test
+ public void testGetStartInstant() throws Exception {
+ Configuration conf = new Configuration();
+ String expectedInstant = "20231201000000000";
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, expectedInstant, 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+
+ assertEquals(expectedInstant, scanContext.getStartCommit(),
+ "Start instant should match");
+ }
+
+ @Test
+ public void testGetMaxCompactionMemoryInBytes() throws Exception {
+ Configuration conf = new Configuration();
+ long expectedMemory = 1024L * 1024L * 1024L; // 1GB
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", expectedMemory,
+ 1000, false, false, false, false);
+
+ assertEquals(expectedMemory, scanContext.getMaxCompactionMemoryInBytes(),
+ "Max compaction memory should match");
+ }
+
+ @Test
+ public void testGetMaxPendingSplits() throws Exception {
+ Configuration conf = new Configuration();
+ long expectedMaxPendingSplits = 5000L;
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ expectedMaxPendingSplits, false, false, false, false);
+
+ assertEquals(expectedMaxPendingSplits, scanContext.getMaxPendingSplits(),
+ "Max pending splits should match");
+ }
+
+ @Test
+ public void testSkipCompaction() throws Exception {
+ Configuration conf = new Configuration();
+
+ ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, true, false, false, false);
+ assertTrue(scanContextTrue.skipCompaction(), "Skip compaction should be true");
+
+ ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ assertFalse(scanContextFalse.skipCompaction(), "Skip compaction should be false");
+ }
+
+ @Test
+ public void testSkipClustering() throws Exception {
+ Configuration conf = new Configuration();
+
+ ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, true, false, false);
+ assertTrue(scanContextTrue.skipClustering(), "Skip clustering should be true");
+
+ ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ assertFalse(scanContextFalse.skipClustering(), "Skip clustering should be false");
+ }
+
+ @Test
+ public void testSkipInsertOverwrite() throws Exception {
+ Configuration conf = new Configuration();
+
+ ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, true, false);
+ assertTrue(scanContextTrue.skipInsertOverwrite(), "Skip insert overwrite should be true");
+
+ ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ assertFalse(scanContextFalse.skipInsertOverwrite(), "Skip insert overwrite should be false");
+ }
+
+ @Test
+ public void testCdcEnabled() throws Exception {
+ Configuration conf = new Configuration();
+
+ ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, true);
+ assertTrue(scanContextTrue.cdcEnabled(), "CDC should be enabled");
+
+ ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ assertFalse(scanContextFalse.cdcEnabled(), "CDC should be disabled");
+ }
+
+ @Test
+ public void testGetScanInterval() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 5);
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ Duration scanInterval = scanContext.getScanInterval();
+
+ assertNotNull(scanInterval, "Scan interval should not be null");
+ assertEquals(Duration.ofSeconds(5), scanInterval, "Scan interval should be 5 seconds");
+ }
+
+ @Test
+ public void testGetScanIntervalDefaultValue() throws Exception {
+ Configuration conf = new Configuration();
+ // Not setting READ_STREAMING_CHECK_INTERVAL to use default
+
+ ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+ TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+ 1000, false, false, false, false);
+ Duration scanInterval = scanContext.getScanInterval();
+
+ assertNotNull(scanInterval, "Scan interval should not be null");
+ assertEquals(Duration.ofSeconds(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.defaultValue()),
+ scanInterval, "Scan interval should use default value");
+ }
+
+ @Test
+ public void testAllFieldsInitialized() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 10);
+ Path path = new Path("/tmp/hoodie/table");
+ String startInstant = "20231215120000000";
+ long maxCompactionMemory = 2L * 1024L * 1024L * 1024L; // 2GB
+ long maxPendingSplits = 10000L;
+
+ ScanContext scanContext = createTestScanContext(conf, path,
+ TestConfigurations.ROW_TYPE, startInstant, maxCompactionMemory,
+ maxPendingSplits, true, true, true, true);
+
+ assertEquals(path, scanContext.getPath());
+ assertEquals(startInstant, scanContext.getStartCommit());
+ assertEquals(maxCompactionMemory, scanContext.getMaxCompactionMemoryInBytes());
+ assertEquals(maxPendingSplits, scanContext.getMaxPendingSplits());
+ assertTrue(scanContext.skipCompaction());
+ assertTrue(scanContext.skipClustering());
+ assertTrue(scanContext.skipInsertOverwrite());
+ assertTrue(scanContext.cdcEnabled());
+ assertEquals(Duration.ofSeconds(10), scanContext.getScanInterval());
+ }
+
+ // Helper method to create ScanContext using the Builder
+ private ScanContext createTestScanContext(
+ Configuration conf,
+ Path path,
+ RowType rowType,
+ String startInstant,
+ long maxCompactionMemoryInBytes,
+ long maxPendingSplits,
+ boolean skipCompaction,
+ boolean skipClustering,
+ boolean skipInsertOverwrite,
+ boolean cdcEnabled) throws Exception {
+ return new ScanContext.Builder()
+ .conf(conf)
+ .path(path)
+ .rowType(rowType)
+ .startInstant(startInstant)
+ .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+ .maxPendingSplits(maxPendingSplits)
+ .skipCompaction(skipCompaction)
+ .skipClustering(skipClustering)
+ .skipInsertOverwrite(skipInsertOverwrite)
+ .cdcEnabled(cdcEnabled)
+ .build();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
new file mode 100644
index 000000000000..0559afd88aaa
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.DefaultHoodieSplitProvider;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieContinuousSplitEnumerator}.
+ */
+public class TestHoodieContinuousSplitEnumerator {
+ private MockSplitEnumeratorContext context;
+ private DefaultHoodieSplitProvider splitProvider;
+ private MockContinuousSplitDiscover splitDiscover;
+ private ScanContext scanContext;
+ private HoodieContinuousSplitEnumerator enumerator;
+ private HoodieSourceSplit split1;
+ private HoodieSourceSplit split2;
+ private HoodieSourceSplit split3;
+
+ @BeforeEach
+ public void setUp() {
+ context = new MockSplitEnumeratorContext();
+ splitProvider = new DefaultHoodieSplitProvider();
+
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.PATH, "/tmp/test");
+ conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+
+ scanContext = TestScanContext.builder()
+ .conf(conf)
+ .path(new Path("/tmp/test"))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant("20231201000000000")
+ .maxPendingSplits(1000)
+ .build();
+
+ splitDiscover = new MockContinuousSplitDiscover();
+
+ split1 = createTestSplit(1, "file1");
+ split2 = createTestSplit(2, "file2");
+ split3 = createTestSplit(3, "file3");
+ }
+
+ @Test
+ public void testStartEnumerator() {
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ // Verify that async task was scheduled
+ assertEquals(1, context.getAsyncCallCount(), "Should schedule async split discovery");
+ }
+
+ @Test
+ public void testEnumeratorWithInitialState() {
+ HoodieSplitEnumeratorState state = new HoodieSplitEnumeratorState(
+ Collections.emptyList(), Option.of("20231201120000000"), Option.of("20231201120001000"));
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.of(state));
+
+ assertNotNull(enumerator, "Enumerator should be created with state");
+ }
+
+ @Test
+ public void testEnumeratorWithoutInitialState() {
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+
+ assertNotNull(enumerator, "Enumerator should be created without state");
+ }
+
+ @Test
+ public void testShouldWaitForMoreSplits() {
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+
+ assertTrue(enumerator.shouldWaitForMoreSplits(),
+ "Continuous enumerator should always wait for more splits");
+ }
+
+ @Test
+ public void testSnapshotState() throws Exception {
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+
+ HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+
+ assertNotNull(state, "Snapshot state should not be null");
+ assertNotNull(state.getPendingSplitStates(), "Split state should not be null");
+ }
+
+ @Test
+ public void testDiscoverSplitsWhenBelowThreshold() {
+ splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+ Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ // Trigger async callback manually
+ context.executeAsyncCallbacks();
+
+ // Verify splits were discovered and added
+ assertEquals(2, splitProvider.pendingSplitCount(),
+ "Should discover and add splits when below threshold");
+ }
+
+ @Test
+ public void testDiscoverSplitsPausedWhenAboveThreshold() {
+ // Fill provider with many splits to exceed threshold
+ List<HoodieSourceSplit> manySplits = new ArrayList<>();
+ for (int i = 0; i < 1100; i++) {
+ manySplits.add(createTestSplit(i, "file" + i));
+ }
+ splitProvider.onDiscoveredSplits(manySplits);
+
+ splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+ Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ int initialCount = splitProvider.pendingSplitCount();
+ context.executeAsyncCallbacks();
+
+ // Verify no new splits were added due to threshold
+ assertEquals(initialCount, splitProvider.pendingSplitCount(),
+ "Should not discover splits when above threshold");
+ }
+
+ @Test
+ public void testProcessDiscoveredSplitsWithMatchingInstant() {
+ splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+ Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+ context.executeAsyncCallbacks();
+
+ assertEquals(2, splitProvider.pendingSplitCount(),
+ "Should add splits when instant matches");
+ }
+
+ @Test
+ public void testProcessDiscoveredSplitsWithEmptyResult() {
+ splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+ Collections.emptyList(), "20231201000000000", "20231201120000000"));
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+ context.executeAsyncCallbacks();
+
+ assertEquals(0, splitProvider.pendingSplitCount(),
+ "Should handle empty split batch");
+ }
+
+ @Test
+ public void testProcessDiscoveredSplitsWithException() {
+ splitDiscover.setThrowException(true);
+
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ try {
+ context.executeAsyncCallbacks();
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("Failed to discover new splits"),
+ "Should throw exception with correct message");
+ }
+ }
+
+ @Test
+ public void testHandleSplitRequest() {
+ splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ context.registerReader(new ReaderInfo(0, "localhost"));
+ enumerator.handleSplitRequest(0, "localhost");
+
+ assertTrue(context.getAssignedSplits().containsKey(0),
+ "Should assign splits to requesting reader");
+ }
+
+ @Test
+ public void testAddSplitsBack() {
+ enumerator = new HoodieContinuousSplitEnumerator(
+ context, splitProvider, splitDiscover, scanContext, Option.empty());
+ enumerator.start();
+
+ enumerator.addSplitsBack(Arrays.asList(split1), 0);
+
+ assertEquals(1, splitProvider.pendingSplitCount(),
+ "Split should be added back to provider");
+ }
+
+ private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+ return new HoodieSourceSplit(
+ splitNum,
+ "basePath_" + splitNum,
+ Option.empty(),
+ "/table/path",
+ "read_optimized",
+ fileId
+ );
+ }
+
+ /**
+ * Mock implementation of HoodieContinuousSplitDiscover for testing.
+ */
+ private static class MockContinuousSplitDiscover implements HoodieContinuousSplitDiscover {
+ private HoodieContinuousSplitBatch nextBatch = HoodieContinuousSplitBatch.EMPTY;
+ private boolean throwException = false;
+
+ public void setNextBatch(HoodieContinuousSplitBatch batch) {
+ this.nextBatch = batch;
+ }
+
+ public void setThrowException(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ @Override
+ public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+ if (throwException) {
+ throw new RuntimeException("Mock exception during split discovery");
+ }
+ return nextBatch;
+ }
+ }
+
+ /**
+ * Mock implementation of SplitEnumeratorContext for testing.
+ */
+ private static class MockSplitEnumeratorContext implements SplitEnumeratorContext<HoodieSourceSplit> {
+ private final Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
+ private final Map<Integer, List<HoodieSourceSplit>> assignedSplits = new HashMap<>();
+ private final List<Integer> noMoreSplitsSignaled = new ArrayList<>();
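+ // Tasks captured by callAsync(); executed on demand through executeAsyncCallbacks().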
+ private final List<AsyncTask<?>> asyncTasks = new ArrayList<>();
+ private final AtomicInteger asyncCallCount = new AtomicInteger(0);
+
+ public void registerReader(ReaderInfo readerInfo) {
+ registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+ }
+
+ public void unregisterReader(int subtaskId) {
+ registeredReaders.remove(subtaskId);
+ }
+
+ public Map<Integer, List<HoodieSourceSplit>> getAssignedSplits() {
+ return assignedSplits;
+ }
+
+ public List<Integer> getNoMoreSplitsSignaled() {
+ return noMoreSplitsSignaled;
+ }
+
+ public int getAsyncCallCount() {
+ return asyncCallCount.get();
+ }
+
+ @SuppressWarnings("unchecked")
+ public void executeAsyncCallbacks() {
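+ // Drain the queued async tasks synchronously: each callable runs inline and its
+ // result (or exception) is handed to the registered handler, mimicking the
+ // coordinator thread of a real SplitEnumeratorContext.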
+ for (AsyncTask<?> task : asyncTasks) {
+ try {
+ Object result = task.callable.call();
+ ((BiConsumer<Object, Throwable>) task.handler).accept(result, null);
+ } catch (Exception e) {
+ ((BiConsumer<Object, Throwable>) task.handler).accept(null, e);
+ }
+ }
+ }
+
+ @Override
+ public SplitEnumeratorMetricGroup metricGroup() {
+ return UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup();
+ }
+
+ @Override
+ public void sendEventToSourceReader(int subtaskId, org.apache.flink.api.connector.source.SourceEvent event) {
+ // No-op for testing
+ }
+
+ @Override
+ public int currentParallelism() {
+ return registeredReaders.size();
+ }
+
+ @Override
+ public Map<Integer, ReaderInfo> registeredReaders() {
+ return new HashMap<>(registeredReaders);
+ }
+
+ @Override
+ public void assignSplits(SplitsAssignment<HoodieSourceSplit> newSplitAssignments) {
+ newSplitAssignments.assignment().forEach((subtask, splits) -> {
+ assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits);
+ });
+ }
+
+ @Override
+ public void assignSplit(HoodieSourceSplit split, int subtask) {
+ assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
+ }
+
+ @Override
+ public void signalNoMoreSplits(int subtask) {
+ noMoreSplitsSignaled.add(subtask);
+ }
+
+ @Override
+ public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+ asyncCallCount.incrementAndGet();
+ asyncTasks.add(new AsyncTask<>(callable, handler));
+ }
+
+ @Override
+ public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
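+ // initialDelay and period are ignored in this mock; the periodic task is queued
+ // once and executed on demand via executeAsyncCallbacks().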
+ asyncCallCount.incrementAndGet();
+ asyncTasks.add(new AsyncTask<>(callable, handler));
+ }
+
+ @Override
+ public void runInCoordinatorThread(Runnable runnable) {
+ runnable.run();
+ }
+
+ private static class AsyncTask<T> {
+ final Callable<T> callable;
+ final BiConsumer<T, Throwable> handler;
+
+ AsyncTask(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+ this.callable = callable;
+ this.handler = handler;
+ }
+ }
+ }
+
+ /**
+ * ScanContext implementation with a minimal builder, used by these tests.
+ */
+ private static class TestScanContext extends ScanContext {
+ private TestScanContext(
+ Configuration conf,
+ Path path,
+ org.apache.flink.table.types.logical.RowType rowType,
+ String startInstant,
+ long maxPendingSplits) {
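+ // Positional args (assumed to mirror the ScanContext.Builder order below):
+ // endInstant = "", maxCompactionMemoryInBytes = 0L, then all skip flags and cdcEnabled = false.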
+ super(conf, path, rowType, startInstant, "", 0L, maxPendingSplits, false, false, false, false);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Configuration conf;
+ private Path path;
+ private org.apache.flink.table.types.logical.RowType rowType;
+ private String startInstant;
+ private long maxPendingSplits = 1000;
+
+ public Builder conf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder path(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ public Builder rowType(org.apache.flink.table.types.logical.RowType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
+ public Builder startInstant(String startInstant) {
+ this.startInstant = startInstant;
+ return this;
+ }
+
+ public Builder maxPendingSplits(long maxPendingSplits) {
+ this.maxPendingSplits = maxPendingSplits;
+ return this;
+ }
+
+ public ScanContext build() {
+ return new TestScanContext(conf, path, rowType, startInstant, maxPendingSplits);
+ }
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
new file mode 100644
index 000000000000..3c437e0e7e35
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Test cases for {@link DefaultHoodieSplitDiscover}.
+ */
+public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
+
+ @BeforeEach
+ void init() {
+ initPath();
+ }
+
+ @Test
+ void testDiscoverSplitsWithNoNewInstants() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+ .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ String lastInstant = commitsTimeline.lastInstant().get().getCompletionTime();
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ // Query with the last instant - should return empty or minimal splits
+ HoodieContinuousSplitBatch result = discover.discoverSplits(lastInstant);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
+ @Test
+ void testDiscoverSplitsWithNewInstants() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ // Insert initial data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+ .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+ // Insert more data to create new instants
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ // Discover splits after the first instant
+ HoodieContinuousSplitBatch result = discover.discoverSplits(firstInstant.getCompletionTime());
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ assertNotNull(result.getOffset(), "Batch offset (end instant) should not be null");
+ }
+
+ @Test
+ void testDiscoverSplitsFromEarliest() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ // Discover splits from null (earliest)
+ HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testDiscoverSplitsWithDifferentTableTypes(HoodieTableType tableType) throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, tableType);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+ assertNotNull(result, "Result should not be null for table type: " + tableType);
+ assertNotNull(result.getSplits(), "Splits should not be null for table type: " + tableType);
+ }
+
+ @Test
+ void testDiscoverSplitsReturnsCorrectInstantRange() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+ // Insert multiple commits
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ metaClient.reloadActiveTimeline();
+ HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+ .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+ String firstCompletionTime = firstInstant.getCompletionTime();
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ HoodieContinuousSplitBatch result = discover.discoverSplits(firstCompletionTime);
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getOffset(), "Batch offset (end instant) should not be null");
+ }
+
+ @Test
+ void testDiscoverSplitsWithSkipOptions() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+
+ // Insert test data
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ ScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ HoodieContinuousSplitBatch result = discover.discoverSplits(scanContext.getStartCommit());
+
+ assertNotNull(result, "Result should not be null");
+ assertNotNull(result.getSplits(), "Splits should not be null");
+ }
+
+ @Test
+ void testDiscoverSplitsConstructor() throws Exception {
+ metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+
+ ScanContext scanContext = createScanContext(conf);
+ DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+ scanContext, metaClient);
+
+ assertNotNull(discover, "Discover instance should not be null");
+ }
+
+ // Helper methods
+
+ private ScanContext createScanContext(Configuration conf) throws Exception {
+ return createScanContextWithSkipOptions(conf, false, false, false);
+ }
+
+ private ScanContext createScanContextWithSkipOptions(
+ Configuration conf,
+ boolean skipCompaction,
+ boolean skipClustering,
+ boolean skipInsertOverwrite) throws Exception {
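+ // 100 MB of compaction memory and a 1000-split pending cap are arbitrary test defaults.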
+ return new ScanContext.Builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .startInstant(conf.get(FlinkOptions.READ_START_COMMIT))
+ .endInstant("")
+ .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+ .maxPendingSplits(1000)
+ .skipCompaction(skipCompaction)
+ .skipClustering(skipClustering)
+ .skipInsertOverwrite(skipInsertOverwrite)
+ .cdcEnabled(false)
+ .build();
+ }
+}