This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f96ba7a [HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090) f96ba7a is described below commit f96ba7abf043246e98f550f858e047411a12c550 Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Thu Mar 24 00:43:02 2022 +0530 [HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090) --- .../client/transaction/ConcurrentOperation.java | 48 ++++++++++++++++------ .../org/apache/hudi/common/util/CommitUtils.java | 2 +- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java index e78a157..40da7dc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java @@ -18,7 +18,6 @@ package org.apache.hudi.client.transaction; -import java.io.IOException; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.client.utils.MetadataConversionUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -27,15 +26,18 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; import java.util.Collections; import java.util.Set; import java.util.stream.Collectors; -import org.apache.hudi.common.util.Option; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; +import static org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord; /** * This class is used to hold all information used to identify how to resolve conflicts between instants. @@ -52,7 +54,7 @@ public class ConcurrentOperation { private final String instantTime; private Set<String> mutatedFileIds = Collections.EMPTY_SET; - public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException { + public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException { this.metadataWrapper = new HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant, metaClient)); this.commitMetadataOption = Option.empty(); this.actionState = instant.getState().name(); @@ -106,24 +108,37 @@ public class ConcurrentOperation { break; case COMMIT_ACTION: case DELTA_COMMIT_ACTION: - this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata() + this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata() .getPartitionToWriteStats()).keySet(); this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); break; case REPLACE_COMMIT_ACTION: if (instant.isCompleted()) { - this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord( + this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord( this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet(); this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType()); } else { + // we need to have different handling for requested and inflight replacecommit because + // for requested replacecommit, clustering will generate a plan and HoodieRequestedReplaceMetadata will not be empty, but insert_overwrite/insert_overwrite_table could have empty content + // for inflight replacecommit, clustering will have no content in metadata, but insert_overwrite/insert_overwrite_table will have some commit metadata HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata(); - this.mutatedFileIds = requestedReplaceMetadata - .getClusteringPlan().getInputGroups() - .stream() - .flatMap(ig -> ig.getSlices().stream()) - .map(file -> file.getFileId()) - .collect(Collectors.toSet()); - this.operationType = WriteOperationType.CLUSTER; + org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata(); + if (instant.isRequested()) { + if (requestedReplaceMetadata != null) { + this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); + this.operationType = WriteOperationType.CLUSTER; + } + } else { + if (inflightCommitMetadata != null) { + this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet(); + this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType()); + } else if (requestedReplaceMetadata != null) { + // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit + this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata); + this.operationType = WriteOperationType.CLUSTER; + } + // NOTE: it cannot be the case that instant is inflight, and both the requested and inflight replacecommit metadata are empty + } } break; default: @@ -142,6 +157,15 @@ public class ConcurrentOperation { } } + private static Set<String> getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) { + return requestedReplaceMetadata + .getClusteringPlan().getInputGroups() + .stream() + .flatMap(ig -> ig.getSlices().stream()) + .map(file -> file.getFileId()) + .collect(Collectors.toSet()); + } + @Override public String toString() { return "{" diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java index 9970687..08b775f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java @@ -97,7 +97,7 @@ public class CommitUtils { String commitActionType, WriteOperationType operationType) { final HoodieCommitMetadata commitMetadata; - if (commitActionType == HoodieTimeline.REPLACE_COMMIT_ACTION) { + if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(commitActionType)) { HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds); commitMetadata = replaceMetadata;