danny0405 commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2155920705
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1134,26 +1138,48 @@ public void startCommit(String instantTime) {
@Override
public HoodieData<WriteStatus>
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String
instantTime) {
- List<MetadataPartitionType> partitionsToTag = new
ArrayList<>(enabledPartitionTypes);
- partitionsToTag.remove(FILES);
- partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
- if (partitionsToTag.isEmpty()) {
+ Pair<List<MetadataPartitionType>, Set<String>> streamingMDTPartitionsPair
= getStreamingMetadataPartitionsToUpdate();
+ List<MetadataPartitionType> mdtPartitionsToTag =
streamingMDTPartitionsPair.getLeft();
+ Set<String> mdtPartitionPathsToTag = streamingMDTPartitionsPair.getRight();
+
+ if (mdtPartitionPathsToTag.isEmpty()) {
return engineContext.emptyHoodieData();
}
HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
- new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag,
dataWriteConfig));
+ new
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag,
dataWriteConfig));
// tag records w/ location
Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>>
hoodieFileGroupsToUpdateAndTaggedMdtRecords =
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-
partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+ mdtPartitionPathsToTag);
// write partial writes to MDT table (for those partitions where streaming
writes are enabled)
HoodieData<WriteStatus> writeStatusCollection =
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
instantTime));
// dag not yet de-referenced. do not invoke any action on
writeStatusCollection yet.
return writeStatusCollection;
}
+ private Pair<List<MetadataPartitionType>, Set<String>>
getStreamingMetadataPartitionsToUpdate() {
+ List<MetadataPartitionType> mdtPartitionsToTag = new
ArrayList<>(enabledPartitionTypes);
+ mdtPartitionsToTag.remove(FILES);
+ mdtPartitionsToTag.remove(SECONDARY_INDEX); // mdt partition path will
have additional suffixes which will be added below.
+ mdtPartitionsToTag.remove(EXPRESSION_INDEX); // mdt partition path will
have additional suffixes which will be added below.
+ mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+
+ Set<String> mdtPartitionPathsToTag = new
HashSet<>(mdtPartitionsToTag.stream().map(mdtPartitionToTag ->
mdtPartitionToTag.getPartitionPath()).collect(Collectors.toSet()));
+ if
(STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES.contains(SECONDARY_INDEX)) {
Review Comment:
This condition is always true, so the `if` guard is redundant and can be dropped (keep its body unconditionally).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]