alexeykudinkin commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798117376



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
    * @return a list of metadata table records
    */
   public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
-      final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        long totalWriteBytes = newFiles.containsKey(filename)
-            ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
-            : hoodieWriteStat.getTotalWriteBytes();
-        newFiles.put(filename, totalWriteBytes);
-      });
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
+    List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+    // Add record bearing partitions list
+    ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+    // New files added to a partition
+    List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+        commitMetadata.getPartitionToWriteStats().entrySet()
+            .stream()
+            .map(entry -> {
+              String partitionStatName = entry.getKey();
+              List<HoodieWriteStat> writeStats = entry.getValue();
+
+              String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+
+              HashMap<String, Long> updatedFilesToSizesMapping =
+                  writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+                      (map, stat) -> {
+                        String pathWithPartition = stat.getPath();
+                        if (pathWithPartition == null) {
+                          // Empty partition
+                          LOG.warn("Unable to find path in write stat to update metadata table " + stat);
+                          return map;
+                        }
+
+                        int offset = partition.equals(NON_PARTITIONED_NAME)
+                            ? (pathWithPartition.startsWith("/") ? 1 : 0)
+                            : partition.length() + 1;
+                        String filename = pathWithPartition.substring(offset);
+
+                        // Since write-stats are coming in no particular order, if the same
+                        // file have previously been appended to w/in the txn, we simply pick max
+                        // of the sizes as reported after every write, since file-sizes are
+                        // monotonically increasing (ie file-size never goes down, unless deleted)
+                        map.merge(filename, stat.getFileSizeInBytes(), Math::max);

Review comment:
       It does. The only case where we might report something other than the file size is `HoodieAppendHandle`, and it sets this value to the full file size (that is a contract of this API):
   
   
https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java#L417
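
   To illustrate why taking the max is safe when every write stat reports the full file size, here is a minimal standalone sketch. It does not use Hudi classes: `FileSizeMergeSketch`, `WriteStatSketch`, and `mergeFileSizes` are hypothetical names made up for this example, and the only assumption carried over from the change is that each stat carries the full size of the file after that write.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FileSizeMergeSketch {

    // Hypothetical stand-in for a write stat (not Hudi's HoodieWriteStat):
    // a file name plus the FULL file size reported after that write.
    public static final class WriteStatSketch {
        final String fileName;
        final long fileSizeInBytes;

        public WriteStatSketch(String fileName, long fileSizeInBytes) {
            this.fileName = fileName;
            this.fileSizeInBytes = fileSizeInBytes;
        }
    }

    // Stats may arrive in any order. Because a file's size only grows within
    // a transaction, the largest reported size is the final size of the file.
    public static Map<String, Long> mergeFileSizes(List<WriteStatSketch> stats) {
        Map<String, Long> sizes = new HashMap<>();
        for (WriteStatSketch stat : stats) {
            // Keep the larger of the previously seen size and the new one.
            sizes.merge(stat.fileName, stat.fileSizeInBytes, Math::max);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Two appends to the same log file, reported out of order.
        List<WriteStatSketch> stats = Arrays.asList(
            new WriteStatSketch("f1.log", 300L),
            new WriteStatSketch("f1.log", 100L),
            new WriteStatSketch("f2.parquet", 50L));
        System.out.println(mergeFileSizes(stats));
    }
}
```

   Summing the values instead (as the removed code did with `getTotalWriteBytes()`) would only be correct if each stat carried the bytes written by that one write rather than the full file size.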



