This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e0ef594ed2e70dbc60f03b0d83794d1ecf7ecd69
Author: Danny Chan <[email protected]>
AuthorDate: Fri Feb 3 19:03:16 2023 +0800

    [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios 
(#7838)
    
    Resolve conflicts using bucket IDs instead of whole file IDs, because the
    last 24 characters of a file ID are randomly generated.
    
    We do not make the last 24 characters of the file group ID a constant,
    mainly because a replace commit currently shadows all replaced files based
    on their file group ID: if the same file ID were used for both the new file
    and the replaced file, none of the data in the new files could be queried.
---
 ...urrentFileWritesConflictResolutionStrategy.java |  55 ++++++++
 .../org/apache/hudi/config/HoodieLockConfig.java   |   9 ++
 .../apache/hudi/index/bucket/BucketIdentifier.java |   6 +-
 ...urrentFileWritesConflictResolutionStrategy.java | 150 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..503f1c42185
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ConflictResolutionStrategy} for concurrent writes on bucket index tables: two writes conflict when they touch the same bucket ID.
+ */
+public class BucketIndexConcurrentFileWritesConflictResolutionStrategy
+    extends SimpleConcurrentFileWritesConflictResolutionStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketIndexConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+    // Concurrent writers to one bucket produce file IDs that differ only in their random suffix, so compare bucket IDs.
+    // NOTE(review): partition paths are not part of this comparison; confirm equal bucket IDs in different partitions should conflict.
+    Set<String> intersection = new HashSet<>(extractBucketIds(thisOperation.getMutatedFileIds()));
+    intersection.retainAll(extractBucketIds(otherOperation.getMutatedFileIds()));
+    if (intersection.isEmpty()) {
+      return false;
+    }
+    LOG.info("Found conflicting writes between first operation = {}, second operation = {} , intersecting bucket ids {}",
+        thisOperation, otherOperation, intersection);
+    return true;
+  }
+
+  /** Maps each file ID to its bucket ID string via {@link BucketIdentifier#bucketIdStrFromFileId}. */
+  private static Set<String> extractBucketIds(Set<String> fileIds) {
+    return fileIds.stream().map(BucketIdentifier::bucketIdStrFromFileId).collect(Collectors.toSet());
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f4aeef09b5c..3c932756685 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.config;
 
+import 
org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.index.HoodieIndex;
 
 import java.io.File;
 import java.io.FileReader;
@@ -185,6 +187,13 @@ public class HoodieLockConfig extends HoodieConfig {
   public static final ConfigProperty<String> 
WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME = ConfigProperty
       .key(LOCK_PREFIX + "conflict.resolution.strategy")
       
.defaultValue(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName())
+      .withInferFunction(hoodieConfig -> {
+        if 
(HoodieIndex.IndexType.BUCKET.name().equalsIgnoreCase(hoodieConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE,
 null))) {
+          return 
Option.of(BucketIndexConcurrentFileWritesConflictResolutionStrategy.class.getName());
+        } else {
+          return 
Option.of(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName());
+        }
+      })
       .sinceVersion("0.8.0")
       .withDocumentation("Lock provider class name, this should be subclass of 
"
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 35f9205a8e5..5264c8b39f1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -76,7 +76,11 @@ public class BucketIdentifier implements Serializable {
   }
 
   public static int bucketIdFromFileId(String fileId) {
-    return Integer.parseInt(fileId.substring(0, 8));
+    return Integer.parseInt(bucketIdStrFromFileId(fileId)); // numeric bucket ID, e.g. "00000001-..." -> 1; NumberFormatException if the prefix is not an int
+  }
+
+  public static String bucketIdStrFromFileId(String fileId) {
+    return fileId.substring(0, 8); // bucket ID string = first 8 characters of the file ID; StringIndexOutOfBoundsException for shorter IDs
   }
 
   public static String bucketIdStr(int n) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..24c578606d4
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Tests {@link BucketIndexConcurrentFileWritesConflictResolutionStrategy}, which detects conflicts by bucket ID rather than by full file ID. */
+public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void init() throws IOException {
+    initMetaClient();
+  }
+
+  @Test
+  public void testNoConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+    // writer 1
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    // writer 2
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Exception {
+    createCommit(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // writer 1 starts
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant);
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    createCommit(newInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+    timeline = timeline.reload();
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(Collectors.toList());
+    // writer 1 conflicts with writer 2
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+    Assertions.assertThrows(HoodieWriteConflictException.class,
+        () -> strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation), "writer 1 and writer 2 should have thrown a conflict");
+    // a write to a different bucket (00000002) must not conflict with writer 2, which wrote bucket 00000001
+    ConcurrentOperation otherBucketOperation = new ConcurrentOperation(currentInstant.get(),
+        createCommitMetadata(currentWriterInstant, "00000002-file-" + currentWriterInstant + "-2"));
+    Assertions.assertFalse(strategy.hasConflict(otherBucketOperation, thatCommitOperation));
+  }
+
+  private void createCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-" + instantTime + "-2";
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId1);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    HoodieTestTable.of(metaClient)
+        .addCommit(instantTime, Option.of(commitMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(writeFileName);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    return commitMetadata;
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime) {
+    return createCommitMetadata(instantTime, "00000001-file-" + instantTime + "-1");
+  }
+
+  private void createInflightCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-" + instantTime + "-2";
+    HoodieTestTable.of(metaClient)
+        .addInflightCommit(instantTime)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+}

Reply via email to