This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f96ba7a  [HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090)
f96ba7a is described below

commit f96ba7abf043246e98f550f858e047411a12c550
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Mar 24 00:43:02 2022 +0530

    [HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090)
---
 .../client/transaction/ConcurrentOperation.java    | 48 ++++++++++++++++------
 .../org/apache/hudi/common/util/CommitUtils.java   |  2 +-
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
index e78a157..40da7dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client.transaction;
 
-import java.io.IOException;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -27,15 +26,18 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.hudi.common.util.Option;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;
 
 /**
  * This class is used to hold all information used to identify how to resolve conflicts between instants.
@@ -52,7 +54,7 @@ public class ConcurrentOperation {
   private final String instantTime;
   private Set<String> mutatedFileIds = Collections.EMPTY_SET;
 
-  public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException  {
+  public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException {
    this.metadataWrapper = new HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant, metaClient));
     this.commitMetadataOption = Option.empty();
     this.actionState = instant.getState().name();
@@ -106,24 +108,37 @@ public class ConcurrentOperation {
           break;
         case COMMIT_ACTION:
         case DELTA_COMMIT_ACTION:
-          this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
+          this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
               .getPartitionToWriteStats()).keySet();
           this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
           break;
         case REPLACE_COMMIT_ACTION:
           if (instant.isCompleted()) {
-            this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
+            this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
                 this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet();
             this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
           } else {
+            // we need to have different handling for requested and inflight replacecommit because
+            // for requested replacecommit, clustering will generate a plan and HoodieRequestedReplaceMetadata will not be empty, but insert_overwrite/insert_overwrite_table could have empty content
+            // for inflight replacecommit, clustering will have no content in metadata, but insert_overwrite/insert_overwrite_table will have some commit metadata
             HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
-            this.mutatedFileIds = requestedReplaceMetadata
-                .getClusteringPlan().getInputGroups()
-                .stream()
-                .flatMap(ig -> ig.getSlices().stream())
-                .map(file -> file.getFileId())
-                .collect(Collectors.toSet());
-            this.operationType = WriteOperationType.CLUSTER;
+            org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
+            if (instant.isRequested()) {
+              if (requestedReplaceMetadata != null) {
+                this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+                this.operationType = WriteOperationType.CLUSTER;
+              }
+            } else {
+              if (inflightCommitMetadata != null) {
+                this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
+                this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
+              } else if (requestedReplaceMetadata != null) {
+                // inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
+                this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
+                this.operationType = WriteOperationType.CLUSTER;
+              }
+              // NOTE: it cannot be the case that instant is inflight, and both the requested and inflight replacecommit metadata are empty
+            }
           }
           break;
         default:
@@ -142,6 +157,15 @@ public class ConcurrentOperation {
     }
   }
 
+  private static Set<String> getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) {
+    return requestedReplaceMetadata
+        .getClusteringPlan().getInputGroups()
+        .stream()
+        .flatMap(ig -> ig.getSlices().stream())
+        .map(file -> file.getFileId())
+        .collect(Collectors.toSet());
+  }
+
   @Override
   public String toString() {
     return "{"
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index 9970687..08b775f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -97,7 +97,7 @@ public class CommitUtils {
                                                              String 
commitActionType,
                                                              
WriteOperationType operationType) {
     final HoodieCommitMetadata commitMetadata;
-    if (commitActionType == HoodieTimeline.REPLACE_COMMIT_ACTION) {
+    if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(commitActionType)) {
       HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
       replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
       commitMetadata = replaceMetadata;

Reply via email to