alexeykudinkin commented on a change in pull request #4556: URL: https://github.com/apache/hudi/pull/4556#discussion_r791350112
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java ########## @@ -189,8 +190,10 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception { assertTrue(fileIdToNewSize.entrySet().stream().anyMatch(entry -> fileIdToSize.get(entry.getKey()) < entry.getValue())); - List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List<String> inputPaths = roView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); Review comment: Yes, these are correct -- previously they only worked b/c we did the double file-listing (w/in `getRealtimeSplits`). I have to pass _partition paths_ here, not base-file paths. ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -166,10 +169,12 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1); JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); - client.commit(newCommitTime, writeStatusJavaRDD); + List<WriteStatus> statuses = writeStatusJavaRDD.collect(); assertNoWriteErrors(statuses); + client.commit(newCommitTime, jsc().parallelize(statuses)); + Review comment: These tests were actually written incorrectly -- they dereference the RDD twice: once w/in `commit` and once when collecting the statuses themselves. This leads to the same base-files being double-written, which in turn fails the assertion I put in place to make sure that the legacy flow and the new one yield identical results.
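To make the partition-path point from the first comment above concrete, here is a small standalone sketch of the mapping (the class name and sample file paths are made up; only `org.apache.hadoop.fs.Path` from hadoop-common is assumed):

```java
import org.apache.hadoop.fs.Path;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionPathsSketch {
  public static void main(String[] args) {
    // Hypothetical base-file paths; the actual tests obtain these from the file-system view
    // via getLatestBaseFiles().
    List<String> baseFilePaths = Arrays.asList(
        "/tmp/hoodie_table/2016/03/15/abc-123_0-1-1_20220101000000.parquet",
        "/tmp/hoodie_table/2016/03/16/def-456_0-1-2_20220101000000.parquet");

    // The input format expects partition directories, so map each base-file path to its parent.
    // Lists (rather than Sets) are kept here to mirror the existing test behavior.
    List<String> inputPaths = baseFilePaths.stream()
        .map(p -> new Path(p).getParent().toString())
        .collect(Collectors.toList());

    inputPaths.forEach(System.out::println);
    // Prints:
    //   /tmp/hoodie_table/2016/03/15
    //   /tmp/hoodie_table/2016/03/16
  }
}
```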
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); - List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List<String> inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); Review comment: Preserving existing behavior ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -201,8 +206,10 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200)); - List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList()); - List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, + List<String> inputPaths = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); Review comment: Preserving existing behavior (hence keeping lists instead of Sets) ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -268,11 +281,13 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro thirdClient.startCommitWithTime(newCommitTime); writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); + statuses = writeStatusJavaRDD.collect(); - thirdClient.commit(newCommitTime, writeStatusJavaRDD); // Verify there are no errors assertNoWriteErrors(statuses); + thirdClient.commit(newCommitTime, jsc().parallelize(statuses)); + Review comment: Replied above ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -361,15 +386,19 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords); copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200)); - List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList()); + List<String> dataFiles = tableView.getLatestBaseFiles() + .map(baseFile -> new Path(baseFile.getPath()).getParent().toString()) + .collect(Collectors.toList()); List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles, basePath()); assertEquals(200, recordsRead.size()); statuses = nClient.upsert(jsc().parallelize(copyOfRecords, 1), newCommitTime).collect(); // Verify there are no errors assertNoWriteErrors(statuses); - nClient.commit(newCommitTime, writeStatusJavaRDD); + + nClient.commit(newCommitTime, jsc().parallelize(statuses)); + Review comment: These are not cosmetic -- these are fixes for real test failures (they fix the base-file double-writing). ##########
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala ########## @@ -87,6 +89,12 @@ abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext, refresh0() + /** + * Returns latest completed instant as seen by this instance of the file-index + */ + def latestCompletedInstant(): Option[HoodieInstant] = Review comment: I think this is the approach we do take in Java, but in Scala this seems off: since you don't add parentheses at the end of the method name, the `get` prefix feels off. ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/BootstrapBaseFileSplit.java ########## @@ -32,10 +32,6 @@ private FileSplit bootstrapFileSplit; - public BootstrapBaseFileSplit() { - super(); - } Review comment: Yeah, this isn't used outside of our codebase ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java ########## @@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile return returns.toArray(new FileStatus[0]); } + private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) { Review comment: The moved methods aren't actually changing here ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java ########## @@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile return returns.toArray(new FileStatus[0]); } + private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) { + List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + + @Nonnull + private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { + try { + return HoodieInputFormatUtils.getFileStatus(baseFile); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to get file-status", ioe); + } + } + + /** + * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that + * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified + * as part of provided {@link JobConf} + */ + protected final FileStatus[] doListStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + + /** + * Achieves listStatus functionality for an incrementally queried table. Instead of listing all + * partitions and then filtering based on the commits of interest, this logic first extracts the + * partitions touched by the desired commits and then lists only those partitions. + */ + protected List<FileStatus> listStatusForIncrementalMode(JobConf job, + HoodieTableMetaClient tableMetaClient, + List<Path> inputPaths, + String incrementalTable) throws IOException { + String tableName = tableMetaClient.getTableConfig().getTableName(); + Job jobContext = Job.getInstance(job); + Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timeline.isPresent()) { + return null; + } + Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get()); + if (!commitsToCheck.isPresent()) { + return null; + } + Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); + // Mutate the JobConf to set the input paths to only partitions touched by incremental pull.
+ if (!incrementalInputPaths.isPresent()) { + return null; + } + setInputPaths(job, incrementalInputPaths.get()); + FileStatus[] fileStatuses = doListStatus(job); + return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); + } + + protected abstract boolean includeLogFilesForSnapshotView(); + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, + Stream<HoodieLogFile> logFiles, + Option<HoodieInstant> latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + FileStatus baseFileStatus = getFileStatusUnchecked(baseFile); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBaseFilePath(baseFile.getPath()); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); + + rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp()); + } + + if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile || baseFileStatus instanceof FileStatusWithBootstrapBaseFile) { + rtFileStatus.setBootStrapFileStatus(baseFileStatus); + } + + return rtFileStatus; + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e); + } + } + + @Nonnull + private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job, Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path> snapshotPaths) throws IOException { + return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView()); + } + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, + Stream<HoodieLogFile> logFiles, + Option<HoodieInstant> latestCompletedInstantOpt, + HoodieTableMetaClient tableMetaClient) { + List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); + try { + RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus()); + rtFileStatus.setDeltaLogFiles(sortedLogFiles); + rtFileStatus.setBasePath(tableMetaClient.getBasePath()); + + if (latestCompletedInstantOpt.isPresent()) { + HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get(); + checkState(latestCompletedInstant.isCompleted()); Review comment: I think we should control all assertions centrally (i.e., be able to toggle all of the assertions on/off holistically), and I would prefer to keep this assertion (since it doesn't bring any performance penalty along) ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java ########## @@ -143,6 +121,127 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile return returns.toArray(new FileStatus[0]); } + private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) { + List<FileStatus> diff = CollectionUtils.diff(targetFiles, legacyFileStatuses); + checkState(diff.isEmpty(), "Should be empty"); + } + + @Nonnull + private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) { + try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile); + } catch (IOException ioe) { + throw new HoodieIOException("Failed to get file-status", ioe); + } + } + + /** + * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)} operation to subclasses that + * lists files (returning an array of {@link FileStatus}) corresponding to the input paths specified + * as part of provided {@link JobConf} + */ + protected final FileStatus[] doListStatus(JobConf job) throws IOException { + return super.listStatus(job); + } + + /** + * Achieves listStatus functionality for an incrementally queried table. Instead of listing all + * partitions and then filtering based on the commits of interest, this logic first extracts the + * partitions touched by the desired commits and then lists only those partitions. + */ + protected List<FileStatus> listStatusForIncrementalMode(JobConf job, + HoodieTableMetaClient tableMetaClient, + List<Path> inputPaths, + String incrementalTable) throws IOException { + String tableName = tableMetaClient.getTableConfig().getTableName(); + Job jobContext = Job.getInstance(job); + Option<HoodieTimeline> timeline = HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient); + if (!timeline.isPresent()) { + return null; + } + Option<List<HoodieInstant>> commitsToCheck = HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName, timeline.get()); + if (!commitsToCheck.isPresent()) { + return null; + } + Option<String> incrementalInputPaths = HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(), tableMetaClient, timeline.get(), inputPaths); + // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. + if (!incrementalInputPaths.isPresent()) { + return null; + } + setInputPaths(job, incrementalInputPaths.get()); + FileStatus[] fileStatuses = doListStatus(job); + return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get()); + } + + protected abstract boolean includeLogFilesForSnapshotView(); + + @Nonnull + private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile, Review comment: These are simple ctor-like methods which are private in scope. What would you like to see as a description for them? ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java ########## @@ -31,7 +31,7 @@ */ public class PathWithLogFilePath extends Path { // a flag to mark this split is produced by incremental query or not. - private boolean belongToIncrementalPath = false; + private boolean belongsToIncrementalPath = false; Review comment: That would unfortunately double the number of stacked PRs -- I'm not taking up any major renamings, other than ones that touch a few files at most (this one in particular brings consistency across classes in naming the corresponding fields) ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java ########## @@ -143,6 +121,124 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile return returns.toArray(new FileStatus[0]); } + private void validate(List<FileStatus> targetFiles, List<FileStatus> legacyFileStatuses) { Review comment: The reason for re-ordering is to maintain the invariant that methods are ordered from loosest to strictest access (static methods keep their relative order and come after all non-static methods).
This allows for easier analysis of what's present in which part of the hierarchy and makes it easier to verify the methods are structured correctly. Apologies for the jitter in the review. ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -65,11 +65,71 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.TypeUtils.unsafeCast; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); - public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) { + public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException { Review comment: Simply limiting the amount of changes to avoid changing too many things at the same time ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java ########## @@ -43,17 +43,20 @@ private String basePath; - public RealtimeBootstrapBaseFileSplit() { - super(); - } Review comment: We should certainly clean up such unnecessary clauses ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java ########## @@ -187,13 +286,28 @@ private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFile .map(fileSlice -> { Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile(); Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile(); - if (baseFileOpt.isPresent()) { - return getFileStatusUnchecked(baseFileOpt); - } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) { - return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles()); + Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles(); + + Option<HoodieInstant> latestCompletedInstantOpt = + fromScala(fileIndex.latestCompletedInstant()); + + // Check if we're reading a MOR table + if (includeLogFilesForSnapshotView()) { + if (baseFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); + } else if (latestLogFileOpt.isPresent()) { + return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, latestCompletedInstantOpt, tableMetaClient); + } else { + throw new IllegalStateException("Invalid state: either base-file or log-file has to be present"); + } } else { - throw new IllegalStateException("Invalid state: either base-file or log-file should be present"); + if (baseFileOpt.isPresent()) { + return getFileStatusUnchecked(baseFileOpt.get()); + } else { + throw new IllegalStateException("Invalid state: base-file has to be present"); + } } Review comment: `listStatusForSnapshotMode()` is used for both COW/MOR, and whether we're handling COW or MOR is controlled by `includeLogFilesForSnapshotView` (I will clean this up, along with some of the assertions, in a follow-up after the majority of the PRs land) ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -144,28 +204,32 @@ return rtSplits.toArray(new InputSplit[0]); } + /** + * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration, List)} + */ // get IncrementalRealtimeSplits - public static InputSplit[]
getIncrementalRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) throws IOException { + public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException { + checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery), + "All splits have to belong to incremental query"); + List<InputSplit> rtSplits = new ArrayList<>(); - List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList()); - Set<Path> partitionSet = fileSplitList.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); + Set<Path> partitionSet = fileSplits.stream().map(f -> f.getPath().getParent()).collect(Collectors.toSet()); Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByPartitionPath(conf, partitionSet); // Pre process tableConfig from first partition to fetch virtual key info Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty(); if (partitionSet.size() > 0) { hoodieVirtualKeyInfo = getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next())); } Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo = hoodieVirtualKeyInfo; - fileSplitList.stream().forEach(s -> { + fileSplits.stream().forEach(s -> { // deal with incremental query. try { if (s instanceof BaseFileWithLogsSplit) { - BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s; - if (bs.getBelongToIncrementalSplit()) { - rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); - } + BaseFileWithLogsSplit bs = unsafeCast(s); + rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(), bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo)); } else if (s instanceof RealtimeBootstrapBaseFileSplit) { - rtSplits.add(s); + RealtimeBootstrapBaseFileSplit bs = unsafeCast(s); Review comment: This is Java, sir :-) ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java ########## @@ -65,11 +65,71 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.TypeUtils.unsafeCast; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { private static final Logger LOG = LogManager.getLogger(HoodieRealtimeInputFormatUtils.class); - public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSplit> fileSplits) { + public static InputSplit[] getRealtimeSplits(Configuration conf, List<FileSplit> fileSplits) throws IOException { + if (fileSplits.isEmpty()) { + return new InputSplit[0]; + } + + FileSplit fileSplit = fileSplits.get(0); + + // Pre-process table-config to fetch virtual key info + Path partitionPath = fileSplit.getPath().getParent(); + HoodieTableMetaClient metaClient = getTableMetaClientForBasePathUnchecked(conf, partitionPath); + + Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt = getHoodieVirtualKeyInfo(metaClient); + + // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase} + HoodieInstant latestCommitInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get(); + + InputSplit[] finalSplits = fileSplits.stream() + .map(split -> { + // There are 4 types of splits could we have to handle here + // - {@code BootstrapBaseFileSplit}: in case base file does have associated bootstrap file, + // but 
does NOT have any log files appended (convert it to {@code RealtimeBootstrapBaseFileSplit}) + // - {@code RealtimeBootstrapBaseFileSplit}: in case base file does have associated bootstrap file + // and does have log files appended + // - {@code BaseFileWithLogsSplit}: in case base file does NOT have associated bootstrap file Review comment: This will be handled by the `else` block ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java ########## @@ -44,9 +44,7 @@ private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty(); - public HoodieRealtimeFileSplit() { - super(); Review comment: Superfluous. It's done by default. ########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java ########## @@ -440,6 +437,9 @@ public static HoodieMetadataConfig buildMetadataConfig(Configuration conf) { .build(); } + /** + * @deprecated + */ Review comment: The method will be cleaned up altogether by HUDI-3280 ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java ########## @@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception { JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1); JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime); client.commit(newCommitTime, writeStatusJavaRDD); - List<WriteStatus> statuses = writeStatusJavaRDD.collect(); - assertNoWriteErrors(statuses); Review comment: We can't actually keep this check, since we can't dereference the RDD twice
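To illustrate the double-dereference issue raised in several comments above, here is a small, self-contained plain-Spark sketch (it is not the actual Hudi test; the class name and the accumulator are made up) showing why two actions on the same uncached RDD re-run its lineage, and why the tests now collect the statuses once and re-parallelize them before `commit`:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;
import java.util.List;

public class DoubleDereferenceSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("double-dereference-sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      LongAccumulator simulatedWrites = jsc.sc().longAccumulator("simulated-writes");

      // The side-effecting map() stands in for the file writes the upsert performs lazily.
      JavaRDD<Integer> writeStatuses = jsc.parallelize(Arrays.asList(1, 2, 3))
          .map(x -> {
            simulatedWrites.add(1);
            return x;
          });

      // Problematic pattern: two actions on the same uncached RDD re-run the whole lineage,
      // i.e. the "writes" happen twice (analogous to base-files being double-written).
      writeStatuses.collect();
      writeStatuses.collect();
      System.out.println("writes after two actions: " + simulatedWrites.value()); // 6, not 3

      // Fixed pattern: trigger a single action, keep the materialized result, and hand a
      // freshly parallelized RDD of plain data (no write lineage) to the next step.
      simulatedWrites.reset();
      List<Integer> statuses = writeStatuses.collect();
      JavaRDD<Integer> reParallelized = jsc.parallelize(statuses);
      reParallelized.count(); // does not re-run the write lineage
      System.out.println("writes after the fixed pattern: " + simulatedWrites.value()); // 3
    }
  }
}
```

In the fixed tests the same idea shows up as collecting the `WriteStatus` list once, asserting on it, and then passing `jsc().parallelize(statuses)` to `commit`.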