alexeykudinkin commented on a change in pull request #4739:
URL: https://github.com/apache/hudi/pull/4739#discussion_r798117376
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -87,40 +89,58 @@ public static void deleteMetadataTable(String basePath, HoodieEngineContext cont
* @return a list of metadata table records
*/
public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- List<String> allPartitions = new LinkedList<>();
- commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
- final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
- allPartitions.add(partition);
-
- Map<String, Long> newFiles = new HashMap<>(writeStats.size());
- writeStats.forEach(hoodieWriteStat -> {
- String pathWithPartition = hoodieWriteStat.getPath();
- if (pathWithPartition == null) {
- // Empty partition
- LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat);
- return;
- }
-
- int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
- String filename = pathWithPartition.substring(offset);
- long totalWriteBytes = newFiles.containsKey(filename)
- ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes()
- : hoodieWriteStat.getTotalWriteBytes();
- newFiles.put(filename, totalWriteBytes);
- });
- // New files added to a partition
- HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
- partition, Option.of(newFiles), Option.empty());
- records.add(record);
- });
+ List<HoodieRecord> records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size());
+
+ // Add record bearing partitions list
+ ArrayList<String> partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet());
+
+ records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList));
+
+ // New files added to a partition
+ List<HoodieRecord<HoodieMetadataPayload>> updatedFilesRecords =
+ commitMetadata.getPartitionToWriteStats().entrySet()
+ .stream()
+ .map(entry -> {
+ String partitionStatName = entry.getKey();
+ List<HoodieWriteStat> writeStats = entry.getValue();
+
+ String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
+
+ HashMap<String, Long> updatedFilesToSizesMapping =
+ writeStats.stream().reduce(new HashMap<>(writeStats.size()),
+ (map, stat) -> {
+ String pathWithPartition = stat.getPath();
+ if (pathWithPartition == null) {
+ // Empty partition
+ LOG.warn("Unable to find path in write stat to update metadata table " + stat);
+ return map;
+ }
+
+ int offset = partition.equals(NON_PARTITIONED_NAME)
+ ? (pathWithPartition.startsWith("/") ? 1 : 0)
+ : partition.length() + 1;
+ String filename = pathWithPartition.substring(offset);
+
+ // Since write-stats are coming in no particular order, if the same
+ // file have previously been appended to w/in the txn, we simply pick max
+ // of the sizes as reported after every write, since file-sizes are
+ // monotonically increasing (ie file-size never goes down, unless deleted)
+ map.merge(filename, stat.getFileSizeInBytes(), Math::max);
Review comment:
It does. The only case where we might report something other than the
full file size is `AppendHandle`, and it does set this field to the full file
size (that is a contract of this API):
https://github.com/apache/hudi/blob/master/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java#L417
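To illustrate why `Math::max` is safe here, below is a minimal, self-contained sketch. It assumes that every write stat reports the full size of the file after that write; `FileSizeMergeSketch` and `maxSizePerFile` are hypothetical names used only for illustration and are not part of Hudi.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Hudi code: each entry stands in for one
// write stat as a (file name, full file size after that write) pair.
class FileSizeMergeSketch {

  // Keeps the largest reported size per file. Because each reported size is
  // assumed to be the full file size, and file sizes only grow within a
  // transaction, the maximum is the final size regardless of arrival order.
  static Map<String, Long> maxSizePerFile(List<Map.Entry<String, Long>> reportedSizes) {
    Map<String, Long> sizes = new HashMap<>();
    for (Map.Entry<String, Long> reported : reportedSizes) {
      sizes.merge(reported.getKey(), reported.getValue(), Math::max);
    }
    return sizes;
  }
}
```

If the stats instead carried per-write deltas, the sizes would have to be summed rather than maxed, which is what the removed code did with `getTotalWriteBytes()`.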