This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a93c6eec678 [HUDI-8866] Unified the file naming rules in NBCC mode (#12627)
a93c6eec678 is described below

commit a93c6eec6788c4a984eb93a875d97f25211e4509
Author: TheR1sing3un <chaoy...@apache.org>
AuthorDate: Fri Jan 17 11:44:54 2025 +0800

    [HUDI-8866] Unified the file naming rules in NBCC mode (#12627)
---
 .../apache/hudi/index/bucket/BucketIdentifier.java |   7 +
 .../BucketBulkInsertDataInternalWriterHelper.java  |   4 +-
 .../action/commit/SparkBucketIndexPartitioner.java |  12 +-
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |   2 +-
 .../sink/bucket/BucketStreamWriteFunction.java     |   2 +-
 .../TestSparkNonBlockingConcurrencyControl.java    | 229 +++++++++++++++++++--
 6 files changed, 238 insertions(+), 18 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index ea98ecc009d..3c433b7760a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -115,6 +115,13 @@ public class BucketIdentifier implements Serializable {
     return newBucketFileIdFixedSuffix(bucketIdStr(bucketId));
   }
 
+  /**
+   * Generate a new file id for NBCC mode; the file id is fixed for each bucket, with format: "{bucket_id}-0000-0000-0000-000000000000-0".
+   */
+  public static String newBucketFileIdForNBCC(int bucketId) {
+    return FSUtils.createNewFileId(newBucketFileIdFixedSuffix(bucketId), 0);
+  }
+
   public static boolean isBucketFileName(String name) {
     return BUCKET_NAME.matcher(name).matches();
   }
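
For readers skimming the diff, the following minimal sketch illustrates the fixed NBCC file id shape described in the javadoc above; the helper bodies, the 8-digit zero padding, and the trailing "-0" token are assumptions for illustration, not Hudi's verified implementation.

    // Illustrative only: mirrors the documented format
    // "{bucket_id}-0000-0000-0000-000000000000-0"; padding width and the trailing
    // "-0" write token are assumptions, not Hudi's verified behavior.
    public class NbccFileIdSketch {

      // Stand-in for BucketIdentifier.bucketIdStr(int): zero-padded bucket number.
      static String bucketIdStr(int bucketId) {
        return String.format("%08d", bucketId);
      }

      // Stand-in for BucketIdentifier.newBucketFileIdFixedSuffix(int).
      static String newBucketFileIdFixedSuffix(int bucketId) {
        return bucketIdStr(bucketId) + "-0000-0000-0000-000000000000";
      }

      // Shape of the new newBucketFileIdForNBCC(int): fixed prefix plus a "-0" token,
      // so every writer targeting the same bucket resolves to the same file group.
      static String newBucketFileIdForNBCC(int bucketId) {
        return newBucketFileIdFixedSuffix(bucketId) + "-0";
      }

      public static void main(String[] args) {
        System.out.println(newBucketFileIdForNBCC(3)); // 00000003-0000-0000-0000-000000000000-0
      }
    }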
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index b4b1f03473f..68123c02b79 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -50,6 +50,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
   private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles;
   protected final String indexKeyFields;
   protected final int bucketNum;
+  private final boolean isNonBlockingConcurrencyControl;
 
   public BucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                                   String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -64,6 +65,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
     this.indexKeyFields = writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
     this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
     this.handles = new HashMap<>();
+    this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl();
   }
 
   public void write(InternalRow row) throws IOException {
@@ -126,6 +128,6 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte
   }
 
   protected String getNextBucketFileId(int bucketInt) {
-    return BucketIdentifier.newBucketFileIdPrefix(getNextFileId(), bucketInt);
+    return BucketIdentifier.newBucketFileIdPrefix(bucketInt, isNonBlockingConcurrencyControl);
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index 3b3b6667b9c..95fc1e5d951 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -118,6 +119,8 @@ public class SparkBucketIndexPartitioner<T> extends
     String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets);
     // Insert overwrite always generates new bucket file id
     if (isOverwrite) {
+      ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl,
+          "Insert overwrite is not supported with non-blocking concurrency control");
       return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath);
     }
     Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
@@ -128,9 +131,12 @@ public class SparkBucketIndexPartitioner<T> extends
       return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
     } else {
       // Always write into log file instead of base file if using NB-CC
-      BucketType bucketType = isNonBlockingConcurrencyControl ? BucketType.UPDATE : BucketType.INSERT;
-      String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId, isNonBlockingConcurrencyControl);
-      return new BucketInfo(bucketType, fileIdPrefix, partitionPath);
+      if (isNonBlockingConcurrencyControl) {
+        String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucketNumber);
+        return new BucketInfo(BucketType.UPDATE, fileId, partitionPath);
+      }
+      String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
+      return new BucketInfo(BucketType.INSERT, fileIdPrefix, partitionPath);
     }
   }
 
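Read together, the partitioner change above amounts to a simple routing rule: reuse an existing file group when one is known; otherwise, under NB-CC, target the bucket's fixed file group as an UPDATE (so new data lands in log files), and without NB-CC create a fresh INSERT file id. The sketch below restates that decision; the class, record, and helper names are illustrative assumptions, not Hudi's API.

    import java.util.Optional;
    import java.util.UUID;

    // Sketch of the partitioner's branching after this patch; all names are illustrative.
    class BucketRoutingSketch {
      enum BucketType { INSERT, UPDATE }

      record BucketInfo(BucketType type, String fileId, String partitionPath) { }

      // Assumed shape of BucketIdentifier.newBucketFileIdForNBCC(bucketNumber).
      static String fixedNbccFileId(int bucketNumber) {
        return String.format("%08d-0000-0000-0000-000000000000-0", bucketNumber);
      }

      // Assumed shape of BucketIdentifier.newBucketFileIdPrefix(bucketId): unique per writer.
      static String newFileIdPrefix(String bucketIdStr) {
        return bucketIdStr + "-" + UUID.randomUUID();
      }

      static BucketInfo route(int bucketNumber, String bucketIdStr, String partitionPath,
                              Optional<String> existingFileId, boolean nonBlockingConcurrencyControl) {
        if (existingFileId.isPresent()) {
          // Known file group for this bucket: plain update.
          return new BucketInfo(BucketType.UPDATE, existingFileId.get(), partitionPath);
        }
        if (nonBlockingConcurrencyControl) {
          // NB-CC: always target the bucket's fixed file group so concurrent writers
          // converge on the same file id and append to log files.
          return new BucketInfo(BucketType.UPDATE, fixedNbccFileId(bucketNumber), partitionPath);
        }
        // Single-writer style: a new base file with a writer-specific file id prefix.
        return new BucketInfo(BucketType.INSERT, newFileIdPrefix(bucketIdStr), partitionPath);
      }
    }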
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index b84c44af832..7685bffb468 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -97,7 +97,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
     String partition = keyGen.getPartitionPath(record);
     final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
     String bucketId = partition + bucketNum;
-    return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum, needFixedFileIdSuffix));
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> needFixedFileIdSuffix ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum));
   }
 
   public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) {
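
The Flink bulk-insert helper above resolves the file id once per partition-and-bucket key and caches it via computeIfAbsent. A hedged sketch of that caching pattern follows; the class name, fields, and id formats are illustrative assumptions rather than Hudi's actual code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;

    // Illustrative sketch of the computeIfAbsent caching shown above: the file id for a
    // bucket is computed once and reused for every subsequent record of that bucket.
    class BucketFileIdCacheSketch {
      private final Map<String, String> bucketIdToFileId = new HashMap<>();
      private final boolean needFixedFileIdSuffix; // true when NB-CC is enabled

      BucketFileIdCacheSketch(boolean needFixedFileIdSuffix) {
        this.needFixedFileIdSuffix = needFixedFileIdSuffix;
      }

      String fileIdFor(String partition, int bucketNum) {
        String bucketKey = partition + bucketNum;
        return bucketIdToFileId.computeIfAbsent(bucketKey, k ->
            needFixedFileIdSuffix
                // Assumed NBCC format: fixed id shared by all writers of this bucket.
                ? String.format("%08d-0000-0000-0000-000000000000-0", bucketNum)
                // Non-NBCC: a writer-specific prefix, new on every run.
                : String.format("%08d-%s", bucketNum, UUID.randomUUID()));
      }
    }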
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 6c31f37cc91..b0461969d97 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -141,7 +141,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     } else if (bucketToFileId.containsKey(bucketNum)) {
       location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
-      String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum, isNonBlockingConcurrencyControl);
+      String newFileId = isNonBlockingConcurrencyControl ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
       bucketToFileId.put(bucketNum, newFileId);
       incBucketIndex.add(bucketId);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index 704d6e8420b..b215fd5b9d4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -299,12 +300,58 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
     }
   }
 
-  // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
-  //      |----------- txn1 -----------|
-  //                       |----- txn2 ------|
-  // the txn2 would fail to commit caused by conflict
+  /**
+   * case1:
+   * 1. insert start
+   * 2. insert commit
+   * 3. bulk_insert start
+   * 4. bulk_insert commit
+   *
+   *  |------ txn1: insert ------|
+   *                             |------ txn2: bulk_insert ------|
+   *
+   *  both txns should commit successfully
+   */
   @Test
-  public void testBulkInsertInMultiWriter() throws Exception {
+  public void testBulkInsertAndInsertConcurrentCase1() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+
+    // start the 1st txn and insert record: [id1,Danny,null,1,par1], commit the 1st txn
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    writeData(client1, insertTime1, dataset1, true, WriteOperationType.INSERT);
+
+    // start the 2nd txn and bulk_insert record: [id1,null,23,2,par1], commit the 2nd txn
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    writeData(client2, insertTime2, dataset2, true, WriteOperationType.BULK_INSERT);
+
+    // do compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+    client1.compact(compactionTime);
+
+    // result is [(id1,Danny,23,2,par1)]
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  /**
+   * case2:
+   * 1. insert start
+   * 2. bulk_insert start
+   * 3. insert commit
+   * 4. bulk_insert commit
+   *
+   *  |------ txn1: insert ------|
+   *                      |------ txn2: bulk_insert ------|
+   *
+   *  txn2 should fail to commit due to a conflict
+   */
+  @Test
+  public void testBulkInsertAndInsertConcurrentCase2() throws Exception {
     HoodieWriteConfig config = createHoodieWriteConfig();
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
     SparkRDDWriteClient client1 = getHoodieWriteClient(config);
@@ -337,12 +384,170 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
     });
   }
 
-  // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
-  //                       |----- txn1 ------|
-  //      |--- txn2 ----|
-  // both two txn would success to commit
+  /**
+   * case3:
+   * 1. bulk_insert start
+   * 2. insert start
+   * 3. insert commit
+   * 4. bulk_insert commit
+   *
+   *          |------ txn2: insert ------|
+   *    |---------- txn1: bulk_insert ----------|
+   *
+   *  txn1 should fail to commit due to a conflict
+   */
+  @Test
+  public void testBulkInsertAndInsertConcurrentCase3() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+    // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+    // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // step to commit the 2nd txn
+    client1.commitStats(
+        insertTime1,
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 1st txn
+    assertThrows(HoodieWriteConflictException.class, () -> {
+      client2.commitStats(
+          insertTime2,
+          writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(),
+          metaClient.getCommitActionType());
+    });
+  }
+
+  /**
+   * case4:
+   * 1. insert start
+   * 2. bulk_insert start
+   * 3. bulk_insert commit
+   * 4. insert commit
+   *
+   *  |------------ txn1: insert ------------|
+   *      |------ txn2: bulk_insert ------|
+   *
+   *  both txns should commit successfully
+   */
+  @Test
+  public void testBulkInsertAndInsertConcurrentCase4() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+    // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // start the 2nd txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+    // step to commit the 2nd txn
+    client2.commitStats(
+        insertTime2,
+        writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 1st txn
+    client1.commitStats(
+        insertTime1,
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // do compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+    client1.compact(compactionTime);
+
+    // result is [(id1,Danny,23,2,par1)]
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  /**
+   * case5:
+   * 1. bulk_insert start
+   * 2. insert start
+   * 3. bulk_insert commit
+   * 4. insert commit
+   *
+   *                          |------ txn2: insert ------|
+   *    |---------- txn1: bulk_insert ----------|
+   *
+   *  both txns should commit successfully
+   */
+  @Test
+  public void testBulkInsertAndInsertConcurrentCase5() throws Exception {
+    HoodieWriteConfig config = createHoodieWriteConfig();
+    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
+    SparkRDDWriteClient client1 = getHoodieWriteClient(config);
+
+    // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit
+    SparkRDDWriteClient client2 = getHoodieWriteClient(config);
+    List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
+    String insertTime2 = client2.createNewInstantTime();
+    List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT);
+
+    // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit
+    List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1");
+    String insertTime1 = client1.createNewInstantTime();
+    List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT);
+
+    // step to commit the 1st txn
+    client2.commitStats(
+        insertTime2,
+        writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // step to commit the 2nd txn
+    client1.commitStats(
+        insertTime1,
+        writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(),
+        metaClient.getCommitActionType());
+
+    // do compaction
+    String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get();
+    client1.compact(compactionTime);
+
+    // result is [(id1,Danny,23,2,par1)]
+    Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
+    checkWrittenData(result, 1);
+  }
+
+  /**
+   * case6:
+   * 1. bulk_insert start
+   * 2. bulk_insert commit
+   * 3. insert start
+   * 4. insert commit
+   *
+   *                                   |------ txn2: insert ------|
+   *  |------ txn1: bulk_insert ------|
+   *
+   *  both txns should commit successfully
+   */
   @Test
-  public void testBulkInsertInSequence() throws Exception {
+  public void testBulkInsertAndInsertConcurrentCase6() throws Exception {
     HoodieWriteConfig config = createHoodieWriteConfig();
     metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
 
@@ -352,7 +557,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
     String insertTime1 = client1.createNewInstantTime();
     writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT);
 
-    // start the 1st txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
+    // start the 2nd txn and insert record: [id1,null,23,2,par1], commit the 2nd txn
     SparkRDDWriteClient client2 = getHoodieWriteClient(config);
     List<String> dataset2 = Collections.singletonList("id1,,23,2,par1");
     String insertTime2 = client2.createNewInstantTime();
@@ -380,6 +585,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
     props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
     String basePath = basePath();
     return HoodieWriteConfig.newBuilder()
+        .withProps(Collections.singletonMap(HoodieTableConfig.PRECOMBINE_FIELD.key(), "ts"))
         .forTable("test")
         .withPath(basePath)
         .withSchema(jsonSchema)
@@ -389,7 +595,6 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona
         .withPayloadConfig(
             HoodiePayloadConfig.newBuilder()
                 .withPayloadClass(payloadClassName)
-                .withPayloadOrderingField("ts")
                 .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1).build())
