alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112



##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
##########
@@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean 
populateMetaFields) throws Exception {
 
       assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> 
fileIdToSize.get(entry.getKey()) < entry.getValue()));
 
-      List<String> dataFiles = 
roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-      List<GenericRecord> recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+      List<String> inputPaths = roView.getLatestBaseFiles()
+          .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
+          .collect(Collectors.toList());

Review comment:
      Yes, these are correct -- previously these only appeared to work b/c we did the double file-listing (w/in `getRealtimeSplits`).
   
   I have to pass _partition paths_, not base-file paths.
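   
   For illustration, a minimal sketch of the path conversion the new code performs (the table path below is made up):
   
   ```java
   import org.apache.hadoop.fs.Path;
   
   public class PartitionPathSketch {
     public static void main(String[] args) {
       // Hypothetical base-file path inside a partitioned Hudi table
       Path baseFilePath = new Path("/tmp/hudi_trips/2016/03/15/abc123_1-0-1_20220120.parquet");
       // The input format lists partition directories, not individual base files,
       // so each latest base file is mapped to its parent directory instead
       String partitionPath = baseFilePath.getParent().toString();
       System.out.println(partitionPath); // prints /tmp/hudi_trips/2016/03/15
     }
   }
   ```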

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean 
rollbackUsingMarkers) thro
       JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
       JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, 
newCommitTime);
-      client.commit(newCommitTime, writeStatusJavaRDD);
+
       List<WriteStatus> statuses = writeStatusJavaRDD.collect();
       assertNoWriteErrors(statuses);
 
+      client.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
      These tests are actually written incorrectly -- they dereference the write-status RDD twice: once w/in `commit` and once when collecting the statuses themselves. This leads to the same base-files being double-written, which in turn fails the assertion I currently have in place to make sure that the legacy flow and the new one yield identical results.
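   
   A condensed sketch of the failure mode and the fix (this just mirrors the test code above -- `client`, `writeRecords`, `newCommitTime`, and `jsc()` come from the surrounding test harness, so it's a fragment rather than standalone code):
   
   ```java
   // BEFORE (broken): the write-status RDD returned by upsert() is lazy, so
   // dereferencing it twice -- once inside commit(), once via collect() --
   // re-runs the write and double-writes the same base files
   JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
   client.commit(newCommitTime, writeStatusJavaRDD);           // 1st dereference
   List<WriteStatus> statuses = writeStatusJavaRDD.collect();  // 2nd dereference, re-computes
   
   // AFTER (fixed): materialize the statuses exactly once, assert on them,
   // then commit an already-computed RDD
   List<WriteStatus> collected = client.upsert(writeRecords, newCommitTime).collect();
   assertNoWriteErrors(collected);
   client.commit(newCommitTime, jsc().parallelize(collected));
   ```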

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean 
rollbackUsingMarkers) thro
         copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-        List<String> dataFiles = 
tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        List<GenericRecord> recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        List<String> inputPaths = tableView.getLatestBaseFiles()
+            .map(baseFile -> new 
Path(baseFile.getPath()).getParent().toString())
+            .collect(Collectors.toList());

Review comment:
       Preserving existing behavior

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean 
rollbackUsingMarkers) thro
         copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
 
-        List<String> dataFiles = 
tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
-        List<GenericRecord> recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        List<String> inputPaths = tableView.getLatestBaseFiles()
+            .map(baseFile -> new 
Path(baseFile.getPath()).getParent().toString())
+            .collect(Collectors.toList());

Review comment:
       Preserving existing behavior (hence keeping lists instead of Sets)

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean 
rollbackUsingMarkers) thro
         thirdClient.startCommitWithTime(newCommitTime);
 
         writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
+
         statuses = writeStatusJavaRDD.collect();
-        thirdClient.commit(newCommitTime, writeStatusJavaRDD);
         // Verify there are no errors
         assertNoWriteErrors(statuses);
 
+        thirdClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
       Replied above

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() 
throws Exception {
         copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
         copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
 
-        List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> 
hf.getPath()).collect(Collectors.toList());
+        List<String> dataFiles = tableView.getLatestBaseFiles()
+            .map(baseFile -> new 
Path(baseFile.getPath()).getParent().toString())
+            .collect(Collectors.toList());
         List<GenericRecord> recordsRead = 
HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
             basePath());
         assertEquals(200, recordsRead.size());
 
         statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), 
newCommitTime).collect();
         // Verify there are no errors
         assertNoWriteErrors(statuses);
-        nClient.commit(newCommitTime, writeStatusJavaRDD);
+
+        nClient.commit(newCommitTime, jsc().parallelize(statuses));
+

Review comment:
      These are not cosmetic -- they are fixes for real test failures (they fix the double-writing of base-files)

##########
File path: 
hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##########
@@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: 
HoodieEngineContext,
 
   refresh0()
 
+  /**
+   * Returns latest completed instant as seen by this instance of the 
file-index
+   */
+  def latestCompletedInstant(): Option[HoodieInstant] =

Review comment:
      I think this is the approach we do take in Java, but in Scala this seems to be off (since you don't add parentheses at the end of the method name, a `get` prefix feels off).

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java
##########
@@ -32,10 +32,6 @@
 
   private FileSplit bootstrapFileSplit;
 
-  public BootstrapBaseFileSplit() {
-    super();
-  }

Review comment:
       Yeah, this isn't used outside of our codebase

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {

Review comment:
       The moved methods aren't actually changed here

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, 
legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} 
operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to 
the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. 
Instead of listing all
+   * partitions and then filtering based on the commits of interest, this 
logic first extracts the
+   * partitions touched by the desired commits and then lists only those 
partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          
HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> 
inputPaths,
+                                                          String 
incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = 
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, 
timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = 
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), 
tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by 
incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, 
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+                                                                      
Stream<HoodieLogFile> logFiles,
+                                                                      
Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      
HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = 
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBaseFilePath(baseFile.getPath());
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());
+
+        rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+      }
+
+      if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || 
baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+        rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+      }
+
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Failed to init %s", 
RealtimeFileStatus.class.getSimpleName()), e);
+    }
+  }
+
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, 
Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> 
snapshotPaths) throws IOException {
+    return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, 
tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+  }
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
+                                                                      
Stream<HoodieLogFile> logFiles,
+                                                                      
Option<HoodieInstant> latestCompletedInstantOpt,
+                                                                      
HoodieTableMetaClient tableMetaClient) {
+    List<HoodieLogFile> sortedLogFiles = 
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    try {
+      RealtimeFileStatus rtFileStatus = new 
RealtimeFileStatus(latestLogFile.getFileStatus());
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+      if (latestCompletedInstantOpt.isPresent()) {
+        HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+        checkState(latestCompletedInstant.isCompleted());

Review comment:
      I think we should control all assertions centrally (i.e. be able to toggle all of the assertions on/off holistically), and I would prefer to keep this assertion (since it doesn't bring any performance penalty along)
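   
   For illustration only, a rough sketch of what such a holistic toggle could look like (the class, flag, and system property below are hypothetical, not existing Hudi API):
   
   ```java
   // Hypothetical helper: internal sanity checks go through one entry point,
   // gated by a single flag, so all assertions can be switched on/off together
   public final class InternalAssertions {
   
     // in a real implementation this could come from a table/write config instead
     private static final boolean ENABLED =
         Boolean.parseBoolean(System.getProperty("hoodie.internal.assertions.enabled", "true"));
   
     private InternalAssertions() {
     }
   
     public static void checkState(boolean condition, String message) {
       if (ENABLED && !condition) {
         throw new IllegalStateException(message);
       }
     }
   }
   ```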

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {
+    List<FileStatus> diff = CollectionUtils.diff(targetFiles, 
legacyFileStatuses);
+    checkState(diff.isEmpty(), "Should be empty");
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFile);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to get file-status", ioe);
+    }
+  }
+
+  /**
+   * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} 
operation to subclasses that
+   * lists files (returning an array of {@link FileStatus}) corresponding to 
the input paths specified
+   * as part of provided {@link JobConf}
+   */
+  protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+    return super.listStatus(job);
+  }
+
+  /**
+   * Achieves listStatus functionality for an incrementally queried table. 
Instead of listing all
+   * partitions and then filtering based on the commits of interest, this 
logic first extracts the
+   * partitions touched by the desired commits and then lists only those 
partitions.
+   */
+  protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+                                                          
HoodieTableMetaClient tableMetaClient,
+                                                          List<Path> 
inputPaths,
+                                                          String 
incrementalTable) throws IOException {
+    String tableName = tableMetaClient.getTableConfig().getTableName();
+    Job jobContext = Job.getInstance(job);
+    Option<HoodieTimeline> timeline = 
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+    if (!timeline.isPresent()) {
+      return null;
+    }
+    Option<List<HoodieInstant>> commitsToCheck = 
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, 
timeline.get());
+    if (!commitsToCheck.isPresent()) {
+      return null;
+    }
+    Option<String> incrementalInputPaths = 
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), 
tableMetaClient, timeline.get(), inputPaths);
+    // Mutate the JobConf to set the input paths to only partitions touched by 
incremental pull.
+    if (!incrementalInputPaths.isPresent()) {
+      return null;
+    }
+    setInputPaths(job, incrementalInputPaths.get());
+    FileStatus[] fileStatuses = doListStatus(job);
+    return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, 
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+  }
+
+  protected abstract boolean includeLogFilesForSnapshotView();
+
+  @Nonnull
+  private static RealtimeFileStatus 
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,

Review comment:
      These are simple ctor-like methods which are private in scope. What would you like to see as a description for them?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##########
@@ -31,7 +31,7 @@
  */
 public class PathWithLogFilePath extends Path {
   // a flag to mark this split is produced by incremental query or not.
-  private boolean belongToIncrementalPath = false;
+  private boolean belongsToIncrementalPath = false;

Review comment:
      That would unfortunately double the number of stacked PRs -- I'm not taking up any major renamings, only ones that touch a few files at most (this one in particular brings consistency in how the corresponding fields are named across classes)

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
     return returns.toArray(new FileStatus[0]);
   }
 
+  private void validate(List<FileStatus> targetFiles, List<FileStatus> 
legacyFileStatuses) {

Review comment:
      The reason for re-ordering is to maintain the invariant that methods are ordered from loosest to strictest access (static methods are kept in the same relative order, after all non-static methods). This makes it easier to analyze what's present in which part of the hierarchy and to make sure the classes are structured correctly.
   
   Apologies for the jitter in the review.
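   
   As a self-contained illustration of that ordering (the class and methods below are made up for the example):
   
   ```java
   // Methods ordered from loosest to strictest access, with static methods
   // kept after all non-static ones
   public abstract class ExampleOrdering {
   
     // public API first
     public final String describe() {
       return render(prefix());
     }
   
     // then protected extension points
     protected abstract String prefix();
   
     // then private instance helpers
     private String render(String prefix) {
       return normalize(prefix) + ":" + getClass().getSimpleName();
     }
   
     // and finally static methods, after all non-static ones
     private static String normalize(String s) {
       return s == null ? "" : s.trim();
     }
   }
   ```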

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, 
Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, 
List<FileSplit> fileSplits) throws IOException {

Review comment:
      Simply limiting the scope of changes here to avoid changing too many things at the same time

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
##########
@@ -43,17 +43,20 @@
 
   private String basePath;
 
-  public RealtimeBootstrapBaseFileSplit() {
-    super();
-  }

Review comment:
       We should certainly clean up such unnecessary clauses

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +286,28 @@ private static FileStatus 
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
               .map(fileSlice -> {
                 Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
                 Option<HoodieLogFile> latestLogFileOpt = 
fileSlice.getLatestLogFile();
-                if (baseFileOpt.isPresent()) {
-                  return getFileStatusUnchecked(baseFileOpt);
-                } else if (includeLogFilesForSnapShotView() && 
latestLogFileOpt.isPresent()) {
-                  return 
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), 
fileSlice.getLogFiles());
+                Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+                Option<HoodieInstant> latestCompletedInstantOpt =
+                    fromScala(fileIndex.latestCompletedInstant());
+
+                // Check if we're reading a MOR table
+                if (includeLogFilesForSnapshotView()) {
+                  if (baseFileOpt.isPresent()) {
+                    return 
createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, 
latestCompletedInstantOpt, tableMetaClient);
+                  } else if (latestLogFileOpt.isPresent()) {
+                    return 
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, 
latestCompletedInstantOpt, tableMetaClient);
+                  } else {
+                    throw new IllegalStateException("Invalid state: either 
base-file or log-file has to be present");
+                  }
                 } else {
-                  throw new IllegalStateException("Invalid state: either 
base-file or log-file should be present");
+                  if (baseFileOpt.isPresent()) {
+                    return getFileStatusUnchecked(baseFileOpt.get());
+                  } else {
+                    throw new IllegalStateException("Invalid state: base-file 
has to be present");
+                  }
                 }

Review comment:
      `listStatusForSnapshotMode()` is used for both COW and MOR, and whether we're handling COW or MOR is controlled by `includeLogFilesForSnapshotView` (will clean this up in a follow-up, along with some of the assertions, after the majority of the PRs land)
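   
   A self-contained sketch of that pattern (class names are made up, not the actual Hudi input-format classes): one shared snapshot-listing path, with the COW/MOR difference reduced to a single hook:
   
   ```java
   // Shared listing logic; only the hook below differs per table type
   abstract class SnapshotListingSketch {
   
     final String describeSnapshotView() {
       return includeLogFilesForSnapshotView()
           ? "base files + delta log files (MOR)"
           : "base files only (COW)";
     }
   
     protected abstract boolean includeLogFilesForSnapshotView();
   }
   
   class CowListingSketch extends SnapshotListingSketch {
     @Override
     protected boolean includeLogFilesForSnapshotView() {
       return false;
     }
   }
   
   class MorListingSketch extends SnapshotListingSketch {
     @Override
     protected boolean includeLogFilesForSnapshotView() {
       return true;
     }
   }
   ```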

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
     return rtSplits.toArray(new InputSplit[0]);
   }
 
+  /**
+   * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, 
List)}
+   */
   // get IncrementalRealtimeSplits
-  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, 
Stream<FileSplit> fileSplits) throws IOException {
+  public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, 
List<FileSplit> fileSplits) throws IOException {
+    
checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+        "All splits have to belong to incremental query");
+
     List<InputSplit> rtSplits = new ArrayList<>();
-    List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
-    Set<Path> partitionSet = fileSplitList.stream().map(f -> 
f.getPath().getParent()).collect(Collectors.toSet());
+    Set<Path> partitionSet = fileSplits.stream().map(f -> 
f.getPath().getParent()).collect(Collectors.toSet());
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = 
getTableMetaClientByPartitionPath(conf, partitionSet);
     // Pre process tableConfig from first partition to fetch virtual key info
     Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
     if (partitionSet.size() > 0) {
       hoodieVirtualKeyInfo = 
getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
     }
     Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = 
hoodieVirtualKeyInfo;
-    fileSplitList.stream().forEach(s -> {
+    fileSplits.stream().forEach(s -> {
       // deal with incremental query.
       try {
         if (s instanceof BaseFileWithLogsSplit) {
-          BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
-          if (bs.getBelongToIncrementalSplit()) {
-            rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), 
bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
-          }
+          BaseFileWithLogsSplit bs = unsafeCast(s);
+          rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), 
bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
         } else if (s instanceof RealtimeBootstrapBaseFileSplit) {
-          rtSplits.add(s);
+          RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);

Review comment:
       This is Java, sir 
   
   :-)

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
 
-  public static InputSplit[] getRealtimeSplits(Configuration conf, 
Stream<FileSplit> fileSplits) {
+  public static InputSplit[] getRealtimeSplits(Configuration conf, 
List<FileSplit> fileSplits) throws IOException {
+    if (fileSplits.isEmpty()) {
+      return new InputSplit[0];
+    }
+
+    FileSplit fileSplit = fileSplits.get(0);
+
+    // Pre-process table-config to fetch virtual key info
+    Path partitionPath = fileSplit.getPath().getParent();
+    HoodieTableMetaClient metaClient = 
getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+    Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = 
getHoodieVirtualKeyInfo(metaClient);
+
+    // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+    HoodieInstant latestCommitInstant =
+        
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+    InputSplit[] finalSplits = fileSplits.stream()
+      .map(split -> {
+        // There are 4 types of splits could we have to handle here
+        //    - {@code BootstrapBaseFileSplit}: in case base file does have 
associated bootstrap file,
+        //      but does NOT have any log files appended (convert it to {@code 
RealtimeBootstrapBaseFileSplit})
+        //    - {@code RealtimeBootstrapBaseFileSplit}: in case base file does 
have associated bootstrap file
+        //      and does have log files appended
+        //    - {@code BaseFileWithLogsSplit}: in case base file does NOT have 
associated bootstrap file

Review comment:
       This will be handled by `else` block

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
 
   private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
 
-  public HoodieRealtimeFileSplit() {
-    super();

Review comment:
       Superfluous. It's done by default

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -440,6 +437,9 @@ public static HoodieMetadataConfig 
buildMetadataConfig(Configuration conf) {
         .build();
   }
 
+  /**
+   * @deprecated
+   */

Review comment:
       Method will be cleaned up altogether by HUDI-3280

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction) 
throws Exception {
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, 
newCommitTime);
     client.commit(newCommitTime, writeStatusJavaRDD);
-    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
-    assertNoWriteErrors(statuses);

Review comment:
      We can't actually keep this check, since we can't dereference the RDD twice




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

