nsivabalan commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798098988



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -215,19 +215,16 @@ public HoodieMetadataPayload 
preCombine(HoodieMetadataPayload previousRecord) {
 
     if (filesystemMetadata != null) {
       filesystemMetadata.forEach((filename, fileInfo) -> {
-        // If the filename wasnt present then we carry it forward
-        if (!combinedFileInfo.containsKey(filename)) {
-          combinedFileInfo.put(filename, fileInfo);
+        if (fileInfo.getIsDeleted()) {
+          combinedFileInfo.remove(filename);
         } else {
-          if (fileInfo.getIsDeleted()) {
-            // file deletion
-            combinedFileInfo.remove(filename);
-          } else {
-            // file appends.
-            combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, 
newFileInfo) -> {
-              return new HoodieMetadataFileInfo(oldFileInfo.getSize() + 
newFileInfo.getSize(), false);
-            });
-          }
+          // NOTE: There are 2 possible cases here:
+          //    - New file is created: in that case we're simply adding its 
info
+          //    - File is appended to (only log-files of MOR tables on 
supported FS): in that case
+          //      we simply pick the info w/ largest file-size as the most 
recent one, since file's
+          //      sizes are increasing monotonically (meaning that the larger 
file-size is more recent one)
+          combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, 
newFileInfo) ->

Review comment:
       merge func takes care of adding an entry for the first time and hence 
remove L219 and 220 ? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
    * @return a list of metadata table records
    */
   public static List<HoodieRecord> 
convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String 
instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) 
? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table 
" + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? 
(pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        long totalWriteBytes = newFiles.containsKey(filename)
-            ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
-            : hoodieWriteStat.getTotalWriteBytes();
-        newFiles.put(filename, totalWriteBytes);
-      });
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
+    List<HoodieRecord> records = new 
ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+    // Add record bearing partitions list
+    ArrayList<String> partitionsList = new 
ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+    
records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+    // New files added to a partition
+    List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+        commitMetadata.getPartitionToWriteStats().entrySet()
+            .stream()
+            .map(entry -> {
+              String partitionStatName = entry.getKey();
+              List<HoodieWriteStat> writeStats = entry.getValue();
+
+              String partition = 
partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : 
partitionStatName;
+
+              HashMap<String, Long> updatedFilesToSizesMapping =
+                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+                      (map, stat) -> {
+                        String pathWithPartition = stat.getPath();
+                        if (pathWithPartition == null) {
+                          // Empty partition
+                          LOG.warn("Unable to find path in write stat to 
update metadata table " + stat);
+                          return map;
+                        }
+
+                        int offset = partition.equals(NON_PARTITIONED_NAME)
+                            ? (pathWithPartition.startsWith("/") ? 1 : 0)
+                            : partition.length() + 1;
+                        String filename = pathWithPartition.substring(offset);
+
+                        // Since write-stats are coming in no particular 
order, if the same
+                        // file have previously been appended to w/in the txn, 
we simply pick max
+                        // of the sizes as reported after every write, since 
file-sizes are
+                        // monotonically increasing (ie file-size never goes 
down, unless deleted)
+                        map.merge(filename, stat.getFileSizeInBytes(), 
Math::max);

Review comment:
       I am not sure if fileSizeInBytes will contain the entire file size. 
   Can you add some extra assertions to existing tests in TestHoodieLogFormat. 
we can be certain the fileSizeIbBytes always contain full file size. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to