danny0405 commented on code in PR #12984:
URL: https://github.com/apache/hudi/pull/12984#discussion_r2006671882


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -903,6 +875,166 @@ public static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(Hoodi
     }
   }
 
+  private static Iterator<HoodieRecord> 
getRecordIndexRecordsForFileSliceWithLogFiles(HoodieTableMetaClient 
dataTableMetaClient, String partitionPath, String fileId,
+                                                                               
       HoodieTableFileSystemView fsView, List<HoodieWriteStat> 
logFileWriteStats,
+                                                                               
       String basePath, EngineType engineType, String instantTime, int 
writesFileIdEncoding,
+                                                                               
       Option<Schema> finalWriterSchemaOpt) throws Exception {
+    if 
(dataTableMetaClient.getTableConfig().getRecordMergeMode().equals(RecordMergeMode.COMMIT_TIME_ORDERING))
 {
+      // with commit time ordering, we don't need to merge log files with base 
files and hence could compute the record index records in an optimized manner
+      return 
getRecordIndexRecordsForLogFilesWithCommitTimeOrdering(dataTableMetaClient, 
partitionPath, fileId, logFileWriteStats, engineType, instantTime,
+          writesFileIdEncoding, finalWriterSchemaOpt);
+    } else {
+      List<HoodieRecord> allRecords = new ArrayList<>();
+      List<FileSlice> previousFileSliceForFileIdList = 
fsView.getLatestMergedFileSlicesBeforeOrOn(partitionPath, instantTime)
+          .filter(fileSlice -> 
fileSlice.getFileId().equals(fileId)).collect(toList());
+      if (previousFileSliceForFileIdList.size() > 1) {
+        throw new HoodieException("Found two file slices for same fileId " + 
fileId + ", in partition " + partitionPath);
+      }
+      if (previousFileSliceForFileIdList.isEmpty()) {
+        throw new HoodieException("Cannot find the file slice for fileId " + 
fileId + ", in partition + " + partitionPath + ", in FileSystemView. ");
+      }
+      FileSlice previousFileSliceForFileId = 
previousFileSliceForFileIdList.get(0);
+      FileSlice latestFileSlicesIncludingInflight = new 
FileSlice(previousFileSliceForFileId);
+      logFileWriteStats.forEach(logFileWriteStat -> {
+        latestFileSlicesIncludingInflight.addLogFile(new HoodieLogFile(new 
StoragePath(basePath + "/" + logFileWriteStat.getPath())));
+      });
+
+      Set<String> prevSliceRecordKeys = new HashSet<>();
+      if (!previousFileSliceForFileId.hasLogFiles()) {
+        // if previous slice only contains base file, directly read base file 
by projecting just record key instead of going via File slice or file group 
reader.
+        prevSliceRecordKeys = new 
HashSet<>(RecordIndexUtils.getRecordKeyStatuses(dataTableMetaClient.getBasePath().toString(),
 partitionPath,
+                previousFileSliceForFileId.getBaseFile().get().getFileName(), 
null, dataTableMetaClient.getStorage(), 
Collections.singleton(RecordIndexUtils.RecordStatus.INSERT))
+            .get(RecordIndexUtils.RecordStatus.INSERT));
+      } else {
+        prevSliceRecordKeys = 
getValidRecordKeysForFileSlice(dataTableMetaClient, engineType,
+            previousFileSliceForFileId.getLogFiles().map(entry -> 
entry.getPath().toString()).collect(toList()),
+            tryResolveSchemaForTable(dataTableMetaClient), partitionPath, 
previousFileSliceForFileId.getBaseFile()
+                .map(entry -> new StoragePath(entry.getPath())), instantTime);
+      }
+
+      Set<String> latestSliceRecordKeys = 
getValidRecordKeysForFileSlice(dataTableMetaClient, engineType,
+          latestFileSlicesIncludingInflight.getLogFiles().map(entry -> 
entry.getPath().toString()).collect(toList()),
+          tryResolveSchemaForTable(dataTableMetaClient), partitionPath, 
previousFileSliceForFileId.getBaseFile()
+              .map(entry -> new StoragePath(entry.getPath())), instantTime);
+
+      // find inserts and deleted record keys for this file slice.
+      if (prevSliceRecordKeys.isEmpty()) {
+        LOG.warn("Did not find previous file slice for " + fileId + ", in 
partition " + partitionPath + ", while generating RLI records");
+        // everything is in INSERT
+        latestSliceRecordKeys.forEach(latestSliceRecordKey -> {
+          allRecords.add((HoodieRecord) 
HoodieMetadataPayload.createRecordIndexUpdate(latestSliceRecordKey, 
partitionPath, fileId,
+              instantTime, writesFileIdEncoding));
+        });
+      } else {
+        // if a record key is present in prev slice, but not in latest file 
slice, is considered as delete record.
+        prevSliceRecordKeys.forEach(prevSliceRecordKey -> {
+          if (!latestSliceRecordKeys.contains(prevSliceRecordKey)) {
+            // deleted
+            
allRecords.add(HoodieMetadataPayload.createRecordIndexDelete(prevSliceRecordKey));

Review Comment:
   How about a key exists both on previous and current file slices, and the 
current file slice contains a `DELETE` record with lower ordering value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to