This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7357fbb9a16c feat: add flink continuous split enumerator (#17562)
7357fbb9a16c is described below

commit 7357fbb9a16cbd433c5813d9485cd5bdac8e1cd3
Author: Peter Huang <[email protected]>
AuthorDate: Wed Dec 17 19:02:04 2025 -0800

    feat: add flink continuous split enumerator (#17562)
---
 .../apache/hudi/source/IncrementalInputSplits.java |  25 ++
 .../java/org/apache/hudi/source/ScanContext.java   | 212 ++++++++++
 .../enumerator/AbstractHoodieSplitEnumerator.java  |   2 +-
 .../HoodieContinuousSplitEnumerator.java           | 126 ++++++
 .../enumerator/HoodieEnumeratorPosition.java       |  93 +++++
 .../enumerator/HoodieSplitEnumeratorState.java     |  18 +-
 .../source/split/DefaultHoodieSplitDiscover.java   |  57 +++
 .../source/split/HoodieContinuousSplitBatch.java   |  79 ++++
 .../HoodieContinuousSplitDiscover.java}            |  22 +-
 .../hudi/source/split/HoodieSourceSplit.java       |   5 +-
 .../hudi/source/TestIncrementalInputSplits.java    | 136 +++++++
 .../org/apache/hudi/source/TestScanContext.java    | 257 ++++++++++++
 .../TestHoodieContinuousSplitEnumerator.java       | 448 +++++++++++++++++++++
 .../split/TestDefaultHoodieSplitDiscover.java      | 234 +++++++++++
 14 files changed, 1695 insertions(+), 19 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 4258a49a36ed..052f0c94cac4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -39,6 +39,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.format.cdc.CdcInputSplit;
@@ -245,6 +246,11 @@ public class IncrementalInputSplits implements Serializable {
       HoodieTableMetaClient metaClient,
       @Nullable String issuedOffset,
       boolean cdcEnabled) {
+
+    if (metaClient == null) {
+      throw new IllegalArgumentException("metaClient must not be null");
+    }
+
     metaClient.reloadActiveTimeline();
     IncrementalQueryAnalyzer analyzer = IncrementalQueryAnalyzer.builder()
         .metaClient(metaClient)
@@ -303,6 +309,25 @@ public class IncrementalInputSplits implements Serializable {
     }
   }
 
+  /**
+   * Returns the incremental Hoodie source split batch.
+   *
+   * @param metaClient    The meta client
+   * @param startInstant  The start instant of the splits
+   * @param cdcEnabled    Whether CDC is enabled
+   *
+   * @return The batch of incremental input splits, empty if there are no new instants
+   */
+  public HoodieContinuousSplitBatch inputHoodieSourceSplits(
+      HoodieTableMetaClient metaClient,
+      @Nullable String startInstant,
+      boolean cdcEnabled) {
+    Result result = inputSplits(metaClient, startInstant, cdcEnabled);
+
+
+    return HoodieContinuousSplitBatch.fromResult(result);
+  }
+
   /**
    * Returns the input splits for streaming incremental read.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
new file mode 100644
index 000000000000..c0b8eca81acb
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ScanContext.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * Hudi source scan context.
+ */
+@Internal
+public class ScanContext implements Serializable {
+  private final Configuration conf;
+  private final Path path;
+  private final RowType rowType;
+  private final String startCommit;
+  private final String endCommit;
+  private final long maxCompactionMemoryInBytes;
+  // max pending splits that are not assigned in split provider
+  private final long maxPendingSplits;
+  // skip compaction
+  private final boolean skipCompaction;
+  // skip clustering
+  private final boolean skipClustering;
+  // skip insert overwrite
+  private final boolean skipInsertOverwrite;
+  // cdc enabled
+  private final boolean cdcEnabled;
+
+  public ScanContext(
+      Configuration conf,
+      Path path,
+      RowType rowType,
+      String startCommit,
+      String endCommit,
+      long maxCompactionMemoryInBytes,
+      long maxPendingSplits,
+      boolean skipCompaction,
+      boolean skipClustering,
+      boolean skipInsertOverwrite,
+      boolean cdcEnabled) {
+    this.conf = conf;
+    this.path = path;
+    this.rowType = rowType;
+    this.startCommit = startCommit;
+    this.endCommit = endCommit;
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+    this.maxPendingSplits = maxPendingSplits;
+    this.skipCompaction = skipCompaction;
+    this.skipClustering = skipClustering;
+    this.skipInsertOverwrite = skipInsertOverwrite;
+    this.cdcEnabled = cdcEnabled;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  public RowType getRowType() {
+    return rowType;
+  }
+
+  public String getStartCommit() {
+    return startCommit;
+  }
+
+  public String getEndCommit() {
+    return endCommit;
+  }
+
+  public long getMaxCompactionMemoryInBytes() {
+    return maxCompactionMemoryInBytes;
+  }
+
+  public long getMaxPendingSplits() {
+    return maxPendingSplits;
+  }
+
+  public boolean skipCompaction() {
+    return skipCompaction;
+  }
+
+  public boolean skipClustering() {
+    return skipClustering;
+  }
+
+  public boolean skipInsertOverwrite() {
+    return skipInsertOverwrite;
+  }
+
+  public boolean cdcEnabled() {
+    return cdcEnabled;
+  }
+
+  public Duration getScanInterval() {
+    return Duration.ofSeconds(conf.get(FlinkOptions.READ_STREAMING_CHECK_INTERVAL));
+  }
+
+  /**
+   * Builder for {@link ScanContext}.
+   */
+  public static class Builder {
+    private Configuration conf;
+    private Path path;
+    private RowType rowType;
+    private String startInstant;
+    private String endInstant;
+    private long maxCompactionMemoryInBytes;
+    private long maxPendingSplits;
+    private boolean skipCompaction;
+    private boolean skipClustering;
+    private boolean skipInsertOverwrite;
+    private boolean cdcEnabled;
+
+    public Builder conf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder path(Path path) {
+      this.path = path;
+      return this;
+    }
+
+    public Builder rowType(RowType rowType) {
+      this.rowType = rowType;
+      return this;
+    }
+
+    public Builder startInstant(String startInstant) {
+      this.startInstant = startInstant;
+      return this;
+    }
+
+    public Builder endInstant(String endInstant) {
+      this.endInstant = endInstant;
+      return this;
+    }
+
+    public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes) {
+      this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+      return this;
+    }
+
+    public Builder maxPendingSplits(long maxPendingSplits) {
+      this.maxPendingSplits = maxPendingSplits;
+      return this;
+    }
+
+    public Builder skipCompaction(boolean skipCompaction) {
+      this.skipCompaction = skipCompaction;
+      return this;
+    }
+
+    public Builder skipClustering(boolean skipClustering) {
+      this.skipClustering = skipClustering;
+      return this;
+    }
+
+    public Builder skipInsertOverwrite(boolean skipInsertOverwrite) {
+      this.skipInsertOverwrite = skipInsertOverwrite;
+      return this;
+    }
+
+    public Builder cdcEnabled(boolean cdcEnabled) {
+      this.cdcEnabled = cdcEnabled;
+      return this;
+    }
+
+    public ScanContext build() {
+      return new ScanContext(
+          conf,
+          path,
+          rowType,
+          startInstant,
+          endInstant,
+          maxCompactionMemoryInBytes,
+          maxPendingSplits,
+          skipCompaction,
+          skipClustering,
+          skipInsertOverwrite,
+          cdcEnabled);
+    }
+  }
+}
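
For orientation, here is a minimal usage sketch of the builder defined above. The
configuration values are illustrative only, and rowType is assumed to come from the
table schema; imports are the same as in the file above:

    Configuration conf = new Configuration();
    ScanContext scanContext = new ScanContext.Builder()
        .conf(conf)
        .path(new Path("/tmp/hudi/table"))   // illustrative table path
        .rowType(rowType)                    // RowType of the table schema (assumed)
        .startInstant("20231201000000000")
        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
        .maxPendingSplits(1000)
        .skipCompaction(false)
        .skipClustering(false)
        .skipInsertOverwrite(false)
        .cdcEnabled(false)
        .build();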
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
index 331ee0d7e22d..6693ea20da92 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/AbstractHoodieSplitEnumerator.java
@@ -116,7 +116,7 @@ abstract class AbstractHoodieSplitEnumerator
 
   @Override
  public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
-    return new HoodieSplitEnumeratorState(splitProvider.state());
+    return new HoodieSplitEnumeratorState(splitProvider.state(), Option.empty(), Option.empty());
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
new file mode 100644
index 000000000000..b45acd60374c
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.source.split.HoodieSplitProvider;
+
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Continuous Hoodie enumerator that discovers splits from new commits of the upstream Hoodie table.
+ */
+public class HoodieContinuousSplitEnumerator extends AbstractHoodieSplitEnumerator {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieContinuousSplitEnumerator.class);
+
+  private final SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext;
+  private final HoodieSplitProvider splitProvider;
+  private final HoodieContinuousSplitDiscover splitDiscover;
+  private final ScanContext scanContext;
+
+  /**
+   * Instant for the last enumerated commit. Next incremental enumeration should be based off
+   * this as the starting position.
+   */
+  private final AtomicReference<HoodieEnumeratorPosition> position;
+
+  public HoodieContinuousSplitEnumerator(
+      SplitEnumeratorContext<HoodieSourceSplit> enumeratorContext,
+      HoodieSplitProvider splitProvider,
+      HoodieContinuousSplitDiscover splitDiscover,
+      ScanContext scanContext,
+      Option<HoodieSplitEnumeratorState> enumStateOpt) {
+    super(enumeratorContext, splitProvider);
+    this.enumeratorContext = enumeratorContext;
+    this.splitProvider = splitProvider;
+    this.splitDiscover = splitDiscover;
+    this.scanContext = scanContext;
+    this.position = new AtomicReference<>();
+
+    if (enumStateOpt.isPresent()) {
+      this.position.set(HoodieEnumeratorPosition.of(enumStateOpt.get().getLastEnumeratedInstant(), enumStateOpt.get().getLastEnumeratedInstantOffset()));
+    } else {
+      // We need to set the instantOffset as null for the first read. For the first read, the start instant is inclusive,
+      // while for continuous incremental read, the start instant is exclusive.
+      this.position.set(HoodieEnumeratorPosition.of(Option.empty(), Option.empty()));
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    enumeratorContext.callAsync(
+        this::discoverSplits,
+        this::processDiscoveredSplits,
+        0L,
+        scanContext.getScanInterval().toMillis());
+  }
+
+  @Override
+  boolean shouldWaitForMoreSplits() {
+    return true;
+  }
+
+  @Override
+  public HoodieSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+    return new HoodieSplitEnumeratorState(splitProvider.state(), position.get().issuedInstant(), position.get().issuedOffset());
+  }
+
+  private HoodieContinuousSplitBatch discoverSplits() {
+    int pendingSplitNumber = splitProvider.pendingSplitCount();
+    if (pendingSplitNumber > scanContext.getMaxPendingSplits()) {
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitNumber);
+      return HoodieContinuousSplitBatch.EMPTY;
+    }
+    return splitDiscover.discoverSplits(position.get().issuedOffset().isPresent() ? position.get().issuedOffset().get() : null);
+  }
+
+  private void processDiscoveredSplits(HoodieContinuousSplitBatch result, Throwable throwable) {
+    if (throwable != null) {
+      throw new RuntimeException("Failed to discover new splits", throwable);
+    }
+
+    if (!result.getSplits().isEmpty()) {
+      splitProvider.onDiscoveredSplits(result.getSplits());
+      LOG.debug(
+          "Added {} splits discovered between ({}, {}] to the assigner",
+          result.getSplits().size(),
+          position.get().issuedOffset(),
+          result.getOffset());
+    } else {
+      LOG.debug(
+          "No new splits discovered between ({}, {}]",
+          position.get().issuedOffset(),
+          result.getOffset());
+    }
+    position.set(HoodieEnumeratorPosition.of(result.getEndInstant(), result.getOffset()));
+    LOG.info("Update enumerator position to {}", position.get());
+  }
+
+}
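
To make the lifecycle concrete, here is a hedged wiring sketch showing construction and
start of the enumerator. The enumerator context, split provider, scan context, and meta
client are assumed to be supplied by the enclosing Flink source; only names visible in
this commit are used:

    // Fresh start: no checkpointed state, so the first read begins at the configured start instant.
    HoodieContinuousSplitEnumerator enumerator = new HoodieContinuousSplitEnumerator(
        enumeratorContext,                                        // SplitEnumeratorContext<HoodieSourceSplit>
        splitProvider,                                            // pending-split bookkeeping
        new DefaultHoodieSplitDiscover(scanContext, metaClient),  // discovers new splits per poll
        scanContext,
        Option.empty());                                          // Option.of(state) when restoring
    // start() schedules discoverSplits()/processDiscoveredSplits() every scanContext.getScanInterval().
    enumerator.start();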
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java
new file mode 100644
index 000000000000..05eb45609830
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieEnumeratorPosition.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The position of {@link HoodieContinuousSplitEnumerator}. The completionTime notion is introduced for incremental
+ * reads mainly for scenarios of "hollow" instants: instants that start very early but finish long after. The
+ * completion-time based incremental query gives clear semantics for the consumption boundaries. The instantTime
+ * comes from legacy readers and is now used only for logging purposes.
+ */
+public class HoodieEnumeratorPosition implements Serializable {
+  // Max instant time consumed
+  private final Option<String> issuedInstant;
+  // Max completion time
+  private final Option<String> issuedOffset;
+
+  static HoodieEnumeratorPosition empty() {
+    return new HoodieEnumeratorPosition(Option.empty(), Option.empty());
+  }
+
+  static HoodieEnumeratorPosition of(String lastInstant, String lastInstantCompletionTime) {
+    return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+  }
+
+  static HoodieEnumeratorPosition of(Option<String> lastInstant, Option<String> lastInstantCompletionTime) {
+    return new HoodieEnumeratorPosition(lastInstant, lastInstantCompletionTime);
+  }
+
+  private HoodieEnumeratorPosition(String issuedInstant, String issuedOffset) {
+    this.issuedInstant = StringUtils.isNullOrEmpty(issuedInstant) ? Option.empty() : Option.of(issuedInstant);
+    this.issuedOffset = StringUtils.isNullOrEmpty(issuedOffset) ? Option.empty() : Option.of(issuedOffset);
+  }
+
+  private HoodieEnumeratorPosition(Option<String> issuedInstant, Option<String> issuedOffset) {
+    this.issuedInstant = issuedInstant;
+    this.issuedOffset = issuedOffset;
+  }
+
+  public Option<String> issuedInstant() {
+    return issuedInstant;
+  }
+
+  public Option<String> issuedOffset() {
+    return issuedOffset;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HoodieEnumeratorPosition that = (HoodieEnumeratorPosition) o;
+    return Objects.equals(issuedInstant, that.issuedInstant) && Objects.equals(issuedOffset, that.issuedOffset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(issuedInstant, issuedOffset);
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieEnumeratorPosition{"
+        + "issuedInstant=" + issuedInstant
+        + ", issuedOffset=" + issuedOffset
+        + '}';
+  }
+}
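
A small sketch of the null-handling contract above; the factories are package-private,
so this assumes code living in org.apache.hudi.source.enumerator, and the timestamps
are illustrative:

    // Null or empty strings normalize to Option.empty(), so this equals the empty position.
    HoodieEnumeratorPosition fresh = HoodieEnumeratorPosition.of((String) null, "");
    assert fresh.equals(HoodieEnumeratorPosition.empty());

    // A restored position carries the last enumerated instant and its completion time.
    HoodieEnumeratorPosition restored =
        HoodieEnumeratorPosition.of("20231201120000000", "20231201120001000");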
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
index de41d1d0e0f4..42c4563a51a6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source.enumerator;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.source.split.HoodieSourceSplitState;
 
 import java.io.Serializable;
@@ -27,13 +28,28 @@ import java.util.Collection;
  * State of Hoodie split enumerator. Mainly include the states of pending splits of split provider.
  */
 public class HoodieSplitEnumeratorState implements Serializable {
+  private final Option<String> lastEnumeratedInstant;
+  private final Option<String> lastEnumeratedInstantOffset;
   private final Collection<HoodieSourceSplitState> pendingSplitStates;
 
-  public HoodieSplitEnumeratorState(Collection<HoodieSourceSplitState> pendingSplitStates) {
+  public HoodieSplitEnumeratorState(
+      Collection<HoodieSourceSplitState> pendingSplitStates,
+      Option<String> lastEnumeratedInstant,
+      Option<String> lastEnumeratedInstantOffset) {
     this.pendingSplitStates = pendingSplitStates;
+    this.lastEnumeratedInstant = lastEnumeratedInstant;
+    this.lastEnumeratedInstantOffset = lastEnumeratedInstantOffset;
   }
 
   public Collection<HoodieSourceSplitState> getPendingSplitStates() {
     return pendingSplitStates;
   }
+
+  public Option<String> getLastEnumeratedInstant() {
+    return lastEnumeratedInstant;
+  }
+
+  public Option<String> getLastEnumeratedInstantOffset() {
+    return lastEnumeratedInstantOffset;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
new file mode 100644
index 000000000000..718b78fe966e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/DefaultHoodieSplitDiscover.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.source.IncrementalInputSplits;
+import org.apache.hudi.source.ScanContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of HoodieContinuousSplitDiscover.
+ */
+public class DefaultHoodieSplitDiscover implements HoodieContinuousSplitDiscover {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultHoodieSplitDiscover.class);
+
+  private final HoodieTableMetaClient metaClient;
+  private final ScanContext scanContext;
+  private final IncrementalInputSplits incrementalInputSplits;
+
+  public DefaultHoodieSplitDiscover(
+      ScanContext scanContext,
+      HoodieTableMetaClient metaClient) {
+    this.scanContext = scanContext;
+    this.metaClient = metaClient;
+    this.incrementalInputSplits = IncrementalInputSplits.builder()
+        .conf(scanContext.getConf())
+        .path(scanContext.getPath())
+        .rowType(scanContext.getRowType())
+        .maxCompactionMemoryInBytes(scanContext.getMaxCompactionMemoryInBytes())
+        .skipCompaction(scanContext.skipCompaction())
+        .skipClustering(scanContext.skipClustering())
+        .skipInsertOverwrite(scanContext.skipInsertOverwrite()).build();
+  }
+
+  @Override
+  public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+    return incrementalInputSplits.inputHoodieSourceSplits(metaClient, lastInstant, scanContext.cdcEnabled());
+  }
+}
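
A hedged usage sketch of the discover above. The meta client construction is an
assumption: HoodieTableMetaClient.builder() exists in Hudi, but the exact configuration
calls vary by version and deployment:

    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(scanContext.getPath().toString())  // table base path from the scan context
        .setConf(storageConf)                           // assumed storage configuration
        .build();

    HoodieContinuousSplitDiscover discover = new DefaultHoodieSplitDiscover(scanContext, metaClient);
    // The first poll passes null (read from the configured start); later polls pass the last issued offset.
    HoodieContinuousSplitBatch batch = discover.discoverSplits(null);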
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
new file mode 100644
index 000000000000..3c1c7c090d85
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitBatch.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.source.IncrementalInputSplits;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Result from the continuous enumerator. It has the same semantics as {@link org.apache.hudi.source.IncrementalInputSplits.Result}.
+ */
+public class HoodieContinuousSplitBatch {
+  public static final HoodieContinuousSplitBatch EMPTY = new HoodieContinuousSplitBatch(Collections.emptyList(), "", "");
+  private final Collection<HoodieSourceSplit> splits;
+  private final String endInstant; // end instant to consume to
+  private final String offset;     // monotonic increasing consumption offset
+
+  /**
+   * @param splits should never be null. But it can be an empty collection
+   * @param endInstant should never be null, end instant to consume to
+   * @param offset could be null. monotonic increasing consumption offset
+   */
+  public HoodieContinuousSplitBatch(
+      Collection<HoodieSourceSplit> splits,
+      String endInstant,
+      String offset) {
+
+    ValidationUtils.checkArgument(splits != null, "Invalid splits collection: null");
+    ValidationUtils.checkArgument(endInstant != null, "Invalid end instant: null");
+    this.splits = splits;
+    this.endInstant = endInstant;
+    this.offset = offset;
+  }
+
+  public static HoodieContinuousSplitBatch fromResult(IncrementalInputSplits.Result result) {
+    List<HoodieSourceSplit> splits = result.getInputSplits().stream().map(split ->
+        new HoodieSourceSplit(
+            HoodieSourceSplit.SPLIT_COUNTER.incrementAndGet(),
+            split.getBasePath().orElse(null),
+            split.getLogPaths(), split.getTablePath(),
+            split.getMergeType(), split.getFileId()
+        )
+    ).collect(Collectors.toList());
+
+    return new HoodieContinuousSplitBatch(splits, result.getEndInstant(), result.getOffset());
+  }
+
+  public Collection<HoodieSourceSplit> getSplits() {
+    return splits;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public String getOffset() {
+    return offset;
+  }
+}
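
A short sketch of the contract documented above: splits and endInstant must be
non-null while offset may be null, and EMPTY models a poll that found nothing new
(the timestamp is illustrative):

    HoodieContinuousSplitBatch batch =
        new HoodieContinuousSplitBatch(Collections.emptyList(), "20231201000000000", null);
    assert batch.getSplits().isEmpty() && batch.getOffset() == null;

    // This would fail fast with "Invalid end instant: null":
    // new HoodieContinuousSplitBatch(Collections.emptyList(), null, null);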
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
similarity index 55%
copy from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
copy to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
index de41d1d0e0f4..ec09fae0fa76 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieSplitEnumeratorState.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieContinuousSplitDiscover.java
@@ -16,24 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.source.enumerator;
+package org.apache.hudi.source.split;
 
-import org.apache.hudi.source.split.HoodieSourceSplitState;
-
-import java.io.Serializable;
-import java.util.Collection;
+import org.apache.flink.annotation.Internal;
 
 /**
- * State of Hoodie split enumerator. Mainly include the states of pending splits of split provider.
+ * Interface for continuously discovering new splits from the active timeline.
  */
-public class HoodieSplitEnumeratorState implements Serializable {
-  private final Collection<HoodieSourceSplitState> pendingSplitStates;
-
-  public HoodieSplitEnumeratorState(Collection<HoodieSourceSplitState> pendingSplitStates) {
-    this.pendingSplitStates = pendingSplitStates;
-  }
+@Internal
+public interface HoodieContinuousSplitDiscover {
 
-  public Collection<HoodieSourceSplitState> getPendingSplitStates() {
-    return pendingSplitStates;
-  }
+  /** Discovers the files appended between {@code lastInstant} and the last commit of the current table. */
+  HoodieContinuousSplitBatch discoverSplits(String lastInstant);
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
index ee05fe0e8ba7..45260b2a1edd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/split/HoodieSourceSplit.java
@@ -26,13 +26,14 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Hoodie SourceSplit implementation for source V2.
+ * Hoodie SourceSplit implementation for source V2. It has the same semantics as {@link org.apache.hudi.table.format.mor.MergeOnReadInputSplit}.
  */
 public class HoodieSourceSplit implements SourceSplit, Serializable {
+  public static AtomicInteger SPLIT_COUNTER = new AtomicInteger(0);
   private static final long serialVersionUID = 1L;
-
   private static final long NUM_NO_CONSUMPTION = 0L;
 
   // the split number
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index 31343c8e3af2..391daaf1e9e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -77,6 +77,7 @@ import static org.apache.hudi.common.table.timeline.InstantComparison.compareTim
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -560,4 +561,139 @@ public class TestIncrementalInputSplits extends HoodieCommonTestHarness {
             .findFirst()
             .orElse(-1);
   }
+
+  @Test
+  void testInputHoodieSourceSplits() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    // Test inputHoodieSourceSplits method
+    org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+        iis.inputHoodieSourceSplits(metaClient, firstInstant.getCompletionTime(), false);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+    assertNotNull(result.getOffset(), "Offset should not be null");
+  }
+
+  @Test
+  void testInputHoodieSourceSplitsWithNullStartInstant() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    // Test with null start instant (should read from earliest)
+    org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+        iis.inputHoodieSourceSplits(metaClient, null, false);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
+  @Test
+  void testInputHoodieSourceSplitsWithNoNewInstants() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    String lastInstant = commitsTimeline.lastInstant().get().getCompletionTime();
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    // Query with the last instant - should return empty since there's nothing new
+    org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+        iis.inputHoodieSourceSplits(metaClient, lastInstant, false);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
+  @Test
+  void testInputHoodieSourceSplitsConvertToHoodieSourceSplit() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+        iis.inputHoodieSourceSplits(metaClient, null, false);
+
+    // Verify that splits are converted to HoodieSourceSplit type
+    result.getSplits().forEach(split -> {
+      assertTrue(split instanceof org.apache.hudi.source.split.HoodieSourceSplit,
+          "Split should be of type HoodieSourceSplit");
+      assertNotNull(split.splitId(), "Split ID should not be null");
+    });
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testInputHoodieSourceSplitsWithDifferentTableTypes(HoodieTableType tableType) throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, tableType);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+    conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .build();
+
+    org.apache.hudi.source.split.HoodieContinuousSplitBatch result =
+        iis.inputHoodieSourceSplits(metaClient, null, false);
+
+    assertNotNull(result, "Result should not be null for table type: " + tableType);
+    assertNotNull(result.getSplits(), "Splits should not be null for table type: " + tableType);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
new file mode 100644
index 000000000000..8fbcad14997b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestScanContext.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link ScanContext}.
+ */
+public class TestScanContext {
+
+  @Test
+  public void testGetConf() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+
+    assertNotNull(scanContext.getConf(), "Configuration should not be null");
+    assertEquals("/tmp/test", scanContext.getConf().get(FlinkOptions.PATH),
+        "Configuration should match");
+  }
+
+  @Test
+  public void testGetPath() throws Exception {
+    Configuration conf = new Configuration();
+    Path expectedPath = new Path("/tmp/test/table");
+
+    ScanContext scanContext = createTestScanContext(conf, expectedPath,
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+
+    assertEquals(expectedPath, scanContext.getPath(), "Path should match");
+  }
+
+  @Test
+  public void testGetRowType() throws Exception {
+    Configuration conf = new Configuration();
+    RowType rowType = TestConfigurations.ROW_TYPE;
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        rowType, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+
+    assertNotNull(scanContext.getRowType(), "RowType should not be null");
+    assertEquals(rowType, scanContext.getRowType(), "RowType should match");
+  }
+
+  @Test
+  public void testGetStartInstant() throws Exception {
+    Configuration conf = new Configuration();
+    String expectedInstant = "20231201000000000";
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, expectedInstant, 100 * 1024 * 1024,
+        1000, false, false, false, false);
+
+    assertEquals(expectedInstant, scanContext.getStartCommit(),
+        "Start instant should match");
+  }
+
+  @Test
+  public void testGetMaxCompactionMemoryInBytes() throws Exception {
+    Configuration conf = new Configuration();
+    long expectedMemory = 1024L * 1024L * 1024L; // 1GB
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", expectedMemory,
+        1000, false, false, false, false);
+
+    assertEquals(expectedMemory, scanContext.getMaxCompactionMemoryInBytes(),
+        "Max compaction memory should match");
+  }
+
+  @Test
+  public void testGetMaxPendingSplits() throws Exception {
+    Configuration conf = new Configuration();
+    long expectedMaxPendingSplits = 5000L;
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        expectedMaxPendingSplits, false, false, false, false);
+
+    assertEquals(expectedMaxPendingSplits, scanContext.getMaxPendingSplits(),
+        "Max pending splits should match");
+  }
+
+  @Test
+  public void testSkipCompaction() throws Exception {
+    Configuration conf = new Configuration();
+
+    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, true, false, false, false);
+    assertTrue(scanContextTrue.skipCompaction(), "Skip compaction should be true");
+
+    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    assertFalse(scanContextFalse.skipCompaction(), "Skip compaction should be false");
+  }
+
+  @Test
+  public void testSkipClustering() throws Exception {
+    Configuration conf = new Configuration();
+
+    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, true, false, false);
+    assertTrue(scanContextTrue.skipClustering(), "Skip clustering should be true");
+
+    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    assertFalse(scanContextFalse.skipClustering(), "Skip clustering should be false");
+  }
+
+  @Test
+  public void testSkipInsertOverwrite() throws Exception {
+    Configuration conf = new Configuration();
+
+    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, true, false);
+    assertTrue(scanContextTrue.skipInsertOverwrite(), "Skip insert overwrite should be true");
+
+    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    assertFalse(scanContextFalse.skipInsertOverwrite(), "Skip insert overwrite should be false");
+  }
+
+  @Test
+  public void testCdcEnabled() throws Exception {
+    Configuration conf = new Configuration();
+
+    ScanContext scanContextTrue = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, true);
+    assertTrue(scanContextTrue.cdcEnabled(), "CDC should be enabled");
+
+    ScanContext scanContextFalse = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    assertFalse(scanContextFalse.cdcEnabled(), "CDC should be disabled");
+  }
+
+  @Test
+  public void testGetScanInterval() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 5);
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    Duration scanInterval = scanContext.getScanInterval();
+
+    assertNotNull(scanInterval, "Scan interval should not be null");
+    assertEquals(Duration.ofSeconds(5), scanInterval, "Scan interval should be 5 seconds");
+  }
+
+  @Test
+  public void testGetScanIntervalDefaultValue() throws Exception {
+    Configuration conf = new Configuration();
+    // Not setting READ_STREAMING_CHECK_INTERVAL to use default
+
+    ScanContext scanContext = createTestScanContext(conf, new Path("/tmp/test"),
+        TestConfigurations.ROW_TYPE, "20231201000000000", 100 * 1024 * 1024,
+        1000, false, false, false, false);
+    Duration scanInterval = scanContext.getScanInterval();
+
+    assertNotNull(scanInterval, "Scan interval should not be null");
+    assertEquals(Duration.ofSeconds(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.defaultValue()),
+        scanInterval, "Scan interval should use default value");
+  }
+
+  @Test
+  public void testAllFieldsInitialized() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 10);
+    Path path = new Path("/tmp/hoodie/table");
+    String startInstant = "20231215120000000";
+    long maxCompactionMemory = 2L * 1024L * 1024L * 1024L; // 2GB
+    long maxPendingSplits = 10000L;
+
+    ScanContext scanContext = createTestScanContext(conf, path,
+        TestConfigurations.ROW_TYPE, startInstant, maxCompactionMemory,
+        maxPendingSplits, true, true, true, true);
+
+    assertEquals(path, scanContext.getPath());
+    assertEquals(startInstant, scanContext.getStartCommit());
+    assertEquals(maxCompactionMemory, scanContext.getMaxCompactionMemoryInBytes());
+    assertEquals(maxPendingSplits, scanContext.getMaxPendingSplits());
+    assertTrue(scanContext.skipCompaction());
+    assertTrue(scanContext.skipClustering());
+    assertTrue(scanContext.skipInsertOverwrite());
+    assertTrue(scanContext.cdcEnabled());
+    assertEquals(Duration.ofSeconds(10), scanContext.getScanInterval());
+  }
+
+  // Helper method to create ScanContext using the Builder
+  private ScanContext createTestScanContext(
+      Configuration conf,
+      Path path,
+      RowType rowType,
+      String startInstant,
+      long maxCompactionMemoryInBytes,
+      long maxPendingSplits,
+      boolean skipCompaction,
+      boolean skipClustering,
+      boolean skipInsertOverwrite,
+      boolean cdcEnabled) throws Exception {
+    return new ScanContext.Builder()
+        .conf(conf)
+        .path(path)
+        .rowType(rowType)
+        .startInstant(startInstant)
+        .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+        .maxPendingSplits(maxPendingSplits)
+        .skipCompaction(skipCompaction)
+        .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
+        .cdcEnabled(cdcEnabled)
+        .build();
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
new file mode 100644
index 000000000000..0559afd88aaa
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/enumerator/TestHoodieContinuousSplitEnumerator.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.enumerator;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.source.split.DefaultHoodieSplitProvider;
+import org.apache.hudi.source.split.HoodieContinuousSplitBatch;
+import org.apache.hudi.source.split.HoodieContinuousSplitDiscover;
+import org.apache.hudi.source.split.HoodieSourceSplit;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieContinuousSplitEnumerator}.
+ */
+public class TestHoodieContinuousSplitEnumerator {
+  private MockSplitEnumeratorContext context;
+  private DefaultHoodieSplitProvider splitProvider;
+  private MockContinuousSplitDiscover splitDiscover;
+  private ScanContext scanContext;
+  private HoodieContinuousSplitEnumerator enumerator;
+  private HoodieSourceSplit split1;
+  private HoodieSourceSplit split2;
+  private HoodieSourceSplit split3;
+
+  @BeforeEach
+  public void setUp() {
+    context = new MockSplitEnumeratorContext();
+    splitProvider = new DefaultHoodieSplitProvider();
+
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.PATH, "/tmp/test");
+    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
+
+    scanContext = new ScanContext.Builder()
+        .conf(conf)
+        .path(new Path("/tmp/test"))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant("20231201000000000")
+        .maxPendingSplits(1000)
+        .build();
+
+    splitDiscover = new MockContinuousSplitDiscover();
+
+    split1 = createTestSplit(1, "file1");
+    split2 = createTestSplit(2, "file2");
+    split3 = createTestSplit(3, "file3");
+  }
+
+  @Test
+  public void testStartEnumerator() {
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
+    // Verify that async task was scheduled
+    assertEquals(1, context.getAsyncCallCount(), "Should schedule async split discovery");
+  }
+
+  @Test
+  public void testEnumeratorWithInitialState() {
+    HoodieSplitEnumeratorState state = new HoodieSplitEnumeratorState(
+        Collections.emptyList(), Option.of("20231201120000000"), Option.of("20231201120001000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.of(state));
+
+    assertNotNull(enumerator, "Enumerator should be created with state");
+  }
+
+  @Test
+  public void testEnumeratorWithoutInitialState() {
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+
+    assertNotNull(enumerator, "Enumerator should be created without state");
+  }
+
+  @Test
+  public void testShouldWaitForMoreSplits() {
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+
+    assertTrue(enumerator.shouldWaitForMoreSplits(),
+        "Continuous enumerator should always wait for more splits");
+  }
+
+  @Test
+  public void testSnapshotState() throws Exception {
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+
+    HoodieSplitEnumeratorState state = enumerator.snapshotState(1L);
+
+    assertNotNull(state, "Snapshot state should not be null");
+    assertNotNull(state.getPendingSplitStates(), "Split state should not be null");
+  }
+
+  @Test
+  public void testDiscoverSplitsWhenBelowThreshold() {
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
+    // Trigger async callback manually
+    context.executeAsyncCallbacks();
+
+    // Verify splits were discovered and added
+    assertEquals(2, splitProvider.pendingSplitCount(),
+        "Should discover and add splits when below threshold");
+  }
+
+  @Test
+  public void testDiscoverSplitsPausedWhenAboveThreshold() {
+    // Fill provider with many splits to exceed threshold
+    List<HoodieSourceSplit> manySplits = new ArrayList<>();
+    for (int i = 0; i < 1100; i++) {
+      manySplits.add(createTestSplit(i, "file" + i));
+    }
+    splitProvider.onDiscoveredSplits(manySplits);
+
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
+    int initialCount = splitProvider.pendingSplitCount();
+    context.executeAsyncCallbacks();
+
+    // Verify no new splits were added due to threshold
+    assertEquals(initialCount, splitProvider.pendingSplitCount(),
+        "Should not discover splits when above threshold");
+  }
+
+  @Test
+  public void testProcessDiscoveredSplitsWithMatchingInstant() {
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Arrays.asList(split1, split2), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    assertEquals(2, splitProvider.pendingSplitCount(),
+        "Should add splits when instant matches");
+  }
+
+  @Test
+  public void testProcessDiscoveredSplitsWithEmptyResult() {
+    splitDiscover.setNextBatch(new HoodieContinuousSplitBatch(
+        Collections.emptyList(), "20231201000000000", "20231201120000000"));
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+    context.executeAsyncCallbacks();
+
+    assertEquals(0, splitProvider.pendingSplitCount(),
+        "Should handle empty split batch");
+  }
+
+  @Test
+  public void testProcessDiscoveredSplitsWithException() {
+    splitDiscover.setThrowException(true);
+
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
+    // executeAsyncCallbacks must surface the discovery failure; passing silently
+    // would make this test vacuous.
+    try {
+      context.executeAsyncCallbacks();
+      org.junit.jupiter.api.Assertions.fail("Expected split discovery failure to surface as a RuntimeException");
+    } catch (RuntimeException e) {
+      assertTrue(e.getMessage().contains("Failed to discover new splits"),
+          "Should throw exception with correct message");
+    }
+  }
+
+  @Test
+  public void testHandleSplitRequest() {
+    splitProvider.onDiscoveredSplits(Arrays.asList(split1, split2));
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
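+    // Register a reader so the enumerator has a subtask to assign splits to.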
+    context.registerReader(new ReaderInfo(0, "localhost"));
+    enumerator.handleSplitRequest(0, "localhost");
+
+    assertTrue(context.getAssignedSplits().containsKey(0),
+        "Should assign splits to requesting reader");
+  }
+
+  @Test
+  public void testAddSplitsBack() {
+    enumerator = new HoodieContinuousSplitEnumerator(
+        context, splitProvider, splitDiscover, scanContext, Option.empty());
+    enumerator.start();
+
+    enumerator.addSplitsBack(Arrays.asList(split1), 0);
+
+    assertEquals(1, splitProvider.pendingSplitCount(),
+        "Split should be added back to provider");
+  }
+
+  private HoodieSourceSplit createTestSplit(int splitNum, String fileId) {
+    return new HoodieSourceSplit(
+        splitNum,
+        "basePath_" + splitNum,
+        Option.empty(),
+        "/table/path",
+        "read_optimized",
+        fileId
+    );
+  }
+
+  /**
+   * Mock implementation of HoodieContinuousSplitDiscover for testing.
+   */
+  private static class MockContinuousSplitDiscover implements HoodieContinuousSplitDiscover {
+    private HoodieContinuousSplitBatch nextBatch = HoodieContinuousSplitBatch.EMPTY;
+    private boolean throwException = false;
+
+    public void setNextBatch(HoodieContinuousSplitBatch batch) {
+      this.nextBatch = batch;
+    }
+
+    public void setThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    @Override
+    public HoodieContinuousSplitBatch discoverSplits(String lastInstant) {
+      if (throwException) {
+        throw new RuntimeException("Mock exception during split discovery");
+      }
+      return nextBatch;
+    }
+  }
+
+  /**
+   * Mock implementation of SplitEnumeratorContext for testing.
+   */
+  private static class MockSplitEnumeratorContext implements SplitEnumeratorContext<HoodieSourceSplit> {
+    private final Map<Integer, ReaderInfo> registeredReaders = new HashMap<>();
+    private final Map<Integer, List<HoodieSourceSplit>> assignedSplits = new HashMap<>();
+    private final List<Integer> noMoreSplitsSignaled = new ArrayList<>();
+    private final List<AsyncTask<?>> asyncTasks = new ArrayList<>();
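+    // Counts callAsync invocations so tests can assert that split discovery was scheduled.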
+    private final AtomicInteger asyncCallCount = new AtomicInteger(0);
+
+    public void registerReader(ReaderInfo readerInfo) {
+      registeredReaders.put(readerInfo.getSubtaskId(), readerInfo);
+    }
+
+    public void unregisterReader(int subtaskId) {
+      registeredReaders.remove(subtaskId);
+    }
+
+    public Map<Integer, List<HoodieSourceSplit>> getAssignedSplits() {
+      return assignedSplits;
+    }
+
+    public List<Integer> getNoMoreSplitsSignaled() {
+      return noMoreSplitsSignaled;
+    }
+
+    public int getAsyncCallCount() {
+      return asyncCallCount.get();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void executeAsyncCallbacks() {
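+      // Synchronously invoke each recorded callable and pass its result (or the
+      // thrown exception) to the registered handler, emulating the coordinator's
+      // async executor in a deterministic way.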
+      for (AsyncTask<?> task : asyncTasks) {
+        try {
+          Object result = task.callable.call();
+          ((BiConsumer<Object, Throwable>) task.handler).accept(result, null);
+        } catch (Exception e) {
+          ((BiConsumer<Object, Throwable>) task.handler).accept(null, e);
+        }
+      }
+    }
+
+    @Override
+    public SplitEnumeratorMetricGroup metricGroup() {
+      return UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup();
+    }
+
+    @Override
+    public void sendEventToSourceReader(int subtaskId, org.apache.flink.api.connector.source.SourceEvent event) {
+      // No-op for testing
+    }
+
+    @Override
+    public int currentParallelism() {
+      return registeredReaders.size();
+    }
+
+    @Override
+    public Map<Integer, ReaderInfo> registeredReaders() {
+      return new HashMap<>(registeredReaders);
+    }
+
+    @Override
+    public void assignSplits(SplitsAssignment<HoodieSourceSplit> newSplitAssignments) {
+      newSplitAssignments.assignment().forEach((subtask, splits) -> {
+        assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits);
+      });
+    }
+
+    @Override
+    public void assignSplit(HoodieSourceSplit split, int subtask) {
+      assignedSplits.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
+    }
+
+    @Override
+    public void signalNoMoreSplits(int subtask) {
+      noMoreSplitsSignaled.add(subtask);
+    }
+
+    @Override
+    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+      asyncCallCount.incrementAndGet();
+      asyncTasks.add(new AsyncTask<>(callable, handler));
+    }
+
+    @Override
+    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
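+      // The mock ignores initialDelay and period; recorded tasks run only via executeAsyncCallbacks().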
+      asyncCallCount.incrementAndGet();
+      asyncTasks.add(new AsyncTask<>(callable, handler));
+    }
+
+    @Override
+    public void runInCoordinatorThread(Runnable runnable) {
+      runnable.run();
+    }
+
+    private static class AsyncTask<T> {
+      final Callable<T> callable;
+      final BiConsumer<T, Throwable> handler;
+
+      AsyncTask(Callable<T> callable, BiConsumer<T, Throwable> handler) {
+        this.callable = callable;
+        this.handler = handler;
+      }
+    }
+  }
+
+  /**
+   * Test-only implementation of ScanContext with a simplified builder.
+   */
+  private static class TestScanContext extends ScanContext {
+    private TestScanContext(
+        Configuration conf,
+        Path path,
+        org.apache.flink.table.types.logical.RowType rowType,
+        String startInstant,
+        long maxPendingSplits) {
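+      // Pins the remaining ScanContext fields (end instant, compaction memory, skip/cdc flags) to test defaults.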
+      super(conf, path, rowType, startInstant, "", 0L, maxPendingSplits, false, false, false, false);
+    }
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    public static class Builder {
+      private Configuration conf;
+      private Path path;
+      private org.apache.flink.table.types.logical.RowType rowType;
+      private String startInstant;
+      private long maxPendingSplits = 1000;
+
+      public Builder conf(Configuration conf) {
+        this.conf = conf;
+        return this;
+      }
+
+      public Builder path(Path path) {
+        this.path = path;
+        return this;
+      }
+
+      public Builder rowType(org.apache.flink.table.types.logical.RowType rowType) {
+        this.rowType = rowType;
+        return this;
+      }
+
+      public Builder startInstant(String startInstant) {
+        this.startInstant = startInstant;
+        return this;
+      }
+
+      public Builder maxPendingSplits(long maxPendingSplits) {
+        this.maxPendingSplits = maxPendingSplits;
+        return this;
+      }
+
+      public ScanContext build() {
+        return new TestScanContext(conf, path, rowType, startInstant, maxPendingSplits);
+      }
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
new file mode 100644
index 000000000000..3c437e0e7e35
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/split/TestDefaultHoodieSplitDiscover.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.split;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.source.ScanContext;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Test cases for {@link DefaultHoodieSplitDiscover}.
+ */
+public class TestDefaultHoodieSplitDiscover extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  void init() {
+    initPath();
+  }
+
+  @Test
+  void testDiscoverSplitsWithNoNewInstants() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    String lastInstant = commitsTimeline.lastInstant().get().getCompletionTime();
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    // Query with the last instant - should return empty or minimal splits
+    HoodieContinuousSplitBatch result = discover.discoverSplits(lastInstant);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
+  @Test
+  void testDiscoverSplitsWithNewInstants() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    // Insert initial data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+    // Insert more data to create new instants
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    // Discover splits after the first instant
+    HoodieContinuousSplitBatch result = discover.discoverSplits(firstInstant.getCompletionTime());
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+    assertNotNull(result.getOffset(), "Offset (end instant) should not be null");
+  }
+
+  @Test
+  void testDiscoverSplitsFromEarliest() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    // Discover splits from null (earliest)
+    HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testDiscoverSplitsWithDifferentTableTypes(HoodieTableType tableType) throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, tableType);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    HoodieContinuousSplitBatch result = discover.discoverSplits(null);
+
+    assertNotNull(result, "Result should not be null for table type: " + tableType);
+    assertNotNull(result.getSplits(), "Splits should not be null for table type: " + tableType);
+  }
+
+  @Test
+  void testDiscoverSplitsReturnsCorrectInstantRange() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+
+    // Insert multiple commits
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitsTimeline = metaClient.getActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+    String firstCompletionTime = firstInstant.getCompletionTime();
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    HoodieContinuousSplitBatch result = discover.discoverSplits(firstCompletionTime);
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getOffset(), "Offset (end instant) should not be null");
+  }
+
+  @Test
+  void testDiscoverSplitsWithSkipOptions() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+
+    // Insert test data
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    ScanContext scanContext = createScanContextWithSkipOptions(conf, true, true, false);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    HoodieContinuousSplitBatch result = discover.discoverSplits(scanContext.getStartCommit());
+
+    assertNotNull(result, "Result should not be null");
+    assertNotNull(result.getSplits(), "Splits should not be null");
+  }
+
+  @Test
+  void testDiscoverSplitsConstructor() throws Exception {
+    metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+
+    ScanContext scanContext = createScanContext(conf);
+    DefaultHoodieSplitDiscover discover = new DefaultHoodieSplitDiscover(
+        scanContext, metaClient);
+
+    assertNotNull(discover, "Discover instance should not be null");
+  }
+
+  // Helper methods
+
+  private ScanContext createScanContext(Configuration conf) throws Exception {
+    return createScanContextWithSkipOptions(conf, false, false, false);
+  }
+
+  private ScanContext createScanContextWithSkipOptions(
+      Configuration conf,
+      boolean skipCompaction,
+      boolean skipClustering,
+      boolean skipInsertOverwrite) throws Exception {
+    return new ScanContext.Builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .startInstant(conf.get(FlinkOptions.READ_START_COMMIT))
+        .endInstant("")
+        .maxCompactionMemoryInBytes(100 * 1024 * 1024)
+        .maxPendingSplits(1000)
+        .skipCompaction(skipCompaction)
+        .skipClustering(skipClustering)
+        .skipInsertOverwrite(skipInsertOverwrite)
+        .cdcEnabled(false)
+        .build();
+  }
+}
