This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new a93c6eec678 [HUDI-8866] Unified the file naming rules in NBCC mode (#12627) a93c6eec678 is described below commit a93c6eec6788c4a984eb93a875d97f25211e4509 Author: TheR1sing3un <chaoy...@apache.org> AuthorDate: Fri Jan 17 11:44:54 2025 +0800 [HUDI-8866] Unified the file naming rules in NBCC mode (#12627) --- .../apache/hudi/index/bucket/BucketIdentifier.java | 7 + .../BucketBulkInsertDataInternalWriterHelper.java | 4 +- .../action/commit/SparkBucketIndexPartitioner.java | 12 +- .../sink/bucket/BucketBulkInsertWriterHelper.java | 2 +- .../sink/bucket/BucketStreamWriteFunction.java | 2 +- .../TestSparkNonBlockingConcurrencyControl.java | 229 +++++++++++++++++++-- 6 files changed, 238 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java index ea98ecc009d..3c433b7760a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java @@ -115,6 +115,13 @@ public class BucketIdentifier implements Serializable { return newBucketFileIdFixedSuffix(bucketIdStr(bucketId)); } + /** + * Generate a new file id for NBCC mode, file id is fixed for each bucket with format: "{bucket_id}-0000-0000-0000-000000000000-0" + */ + public static String newBucketFileIdForNBCC(int bucketId) { + return FSUtils.createNewFileId(newBucketFileIdFixedSuffix(bucketId), 0); + } + public static boolean isBucketFileName(String name) { return BUCKET_NAME.matcher(name).matches(); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java index b4b1f03473f..68123c02b79 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java @@ -50,6 +50,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles; protected final String indexKeyFields; protected final int bucketNum; + private final boolean isNonBlockingConcurrencyControl; public BucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, @@ -64,6 +65,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte this.indexKeyFields = writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())); this.bucketNum = writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS); this.handles = new HashMap<>(); + this.isNonBlockingConcurrencyControl = writeConfig.isNonBlockingConcurrencyControl(); } public void write(InternalRow row) throws IOException { @@ -126,6 +128,6 @@ public class BucketBulkInsertDataInternalWriterHelper extends BulkInsertDataInte } protected String getNextBucketFileId(int bucketInt) { - return 
BucketIdentifier.newBucketFileIdPrefix(getNextFileId(), bucketInt); + return BucketIdentifier.newBucketFileIdPrefix(bucketInt, isNonBlockingConcurrencyControl); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java index 3b3b6667b9c..95fc1e5d951 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -118,6 +119,8 @@ public class SparkBucketIndexPartitioner<T> extends String bucketId = BucketIdentifier.bucketIdStr(bucketNumber % numBuckets); // Insert overwrite always generates new bucket file id if (isOverwrite) { + ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl, + "Insert overwrite is not supported with non-blocking concurrency control"); return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketId), partitionPath); } Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds @@ -128,9 +131,12 @@ public class SparkBucketIndexPartitioner<T> extends return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath); } else { // Always write into log file instead of base file if using NB-CC - BucketType bucketType = isNonBlockingConcurrencyControl ? BucketType.UPDATE : BucketType.INSERT; - String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId, isNonBlockingConcurrencyControl); - return new BucketInfo(bucketType, fileIdPrefix, partitionPath); + if (isNonBlockingConcurrencyControl) { + String fileId = BucketIdentifier.newBucketFileIdForNBCC(bucketNumber); + return new BucketInfo(BucketType.UPDATE, fileId, partitionPath); + } + String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId); + return new BucketInfo(BucketType.INSERT, fileIdPrefix, partitionPath); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java index b84c44af832..7685bffb468 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java @@ -97,7 +97,7 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper { String partition = keyGen.getPartitionPath(record); final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets); String bucketId = partition + bucketNum; - return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum, needFixedFileIdSuffix)); + return bucketIdToFileId.computeIfAbsent(bucketId, k -> needFixedFileIdSuffix ? 
BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum)); } public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean needFixedFileIdSuffix) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java index 6c31f37cc91..b0461969d97 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -141,7 +141,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> { } else if (bucketToFileId.containsKey(bucketNum)) { location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum)); } else { - String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum, isNonBlockingConcurrencyControl); + String newFileId = isNonBlockingConcurrencyControl ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : BucketIdentifier.newBucketFileIdPrefix(bucketNum); location = new HoodieRecordLocation("I", newFileId); bucketToFileId.put(bucketNum, newFileId); incBucketIndex.add(bucketId); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java index 704d6e8420b..b215fd5b9d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java @@ -36,6 +36,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -299,12 +300,58 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona } } - // case1: txn1 is upsert writer, txn2 is bulk_insert writer. - // |----------- txn1 -----------| - // |----- txn2 ------| - // the txn2 would fail to commit caused by conflict + /** + * case1: + * 1. insert start + * 2. insert commit + * 3. bulk_insert start + * 4. 
bulk_insert commit + * + * |------ txn1: insert ------| + * |------ txn2: bulk_insert ------| + * + * both two txn would success to commit + */ @Test - public void testBulkInsertInMultiWriter() throws Exception { + public void testBulkInsertAndInsertConcurrentCase1() throws Exception { + HoodieWriteConfig config = createHoodieWriteConfig(); + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + + // start the 1st txn and insert record: [id1,Danny,null,1,par1], commit the 1st txn + SparkRDDWriteClient client1 = getHoodieWriteClient(config); + List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1"); + String insertTime1 = client1.createNewInstantTime(); + writeData(client1, insertTime1, dataset1, true, WriteOperationType.INSERT); + + // start the 2nd txn and bulk_insert record: [id1,null,23,2,par1], commit the 2nd txn + SparkRDDWriteClient client2 = getHoodieWriteClient(config); + List<String> dataset2 = Collections.singletonList("id1,,23,2,par1"); + String insertTime2 = client2.createNewInstantTime(); + writeData(client2, insertTime2, dataset2, true, WriteOperationType.BULK_INSERT); + + // do compaction + String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get(); + client1.compact(compactionTime); + + // result is [(id1,Danny,23,2,par1)] + Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]"); + checkWrittenData(result, 1); + } + + /** + * case2: + * 1. insert start + * 2. bulk_insert start + * 3. insert commit + * 4. bulk_insert commit + * + * |------ txn1: insert ------| + * |------ txn2: bulk_insert ------| + * + * the txn2 should be fail to commit caused by conflict + */ + @Test + public void testBulkInsertAndInsertConcurrentCase2() throws Exception { HoodieWriteConfig config = createHoodieWriteConfig(); metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); SparkRDDWriteClient client1 = getHoodieWriteClient(config); @@ -337,12 +384,170 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona }); } - // case1: txn1 is upsert writer, txn2 is bulk_insert writer. - // |----- txn1 ------| - // |--- txn2 ----| - // both two txn would success to commit + /** + * case3: + * 1. bulk_insert start + * 2. insert start + * 3. insert commit + * 4. 
bulk_insert commit + * + * |------ txn2: insert ------| + * |---------- txn1: bulk_insert ----------| + * + * the txn2 should be fail to commit caused by conflict + */ + @Test + public void testBulkInsertAndInsertConcurrentCase3() throws Exception { + HoodieWriteConfig config = createHoodieWriteConfig(); + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + SparkRDDWriteClient client1 = getHoodieWriteClient(config); + + // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit + SparkRDDWriteClient client2 = getHoodieWriteClient(config); + List<String> dataset2 = Collections.singletonList("id1,,23,2,par1"); + String insertTime2 = client2.createNewInstantTime(); + List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT); + + // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit + List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1"); + String insertTime1 = client1.createNewInstantTime(); + List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT); + + // step to commit the 2nd txn + client1.commitStats( + insertTime1, + writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + + // step to commit the 1st txn + assertThrows(HoodieWriteConflictException.class, () -> { + client2.commitStats( + insertTime2, + writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + }); + } + + /** + * case4: + * 1. insert start + * 2. bulk_insert start + * 3. bulk_insert commit + * 4. insert commit + * + * |------------ txn1: insert ------------| + * |------ txn2: bulk_insert ------| + * + * both two txn would success to commit + */ + @Test + public void testBulkInsertAndInsertConcurrentCase4() throws Exception { + HoodieWriteConfig config = createHoodieWriteConfig(); + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + SparkRDDWriteClient client1 = getHoodieWriteClient(config); + + // start the 1st txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit + List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1"); + String insertTime1 = client1.createNewInstantTime(); + List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT); + + // start the 2nd txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit + SparkRDDWriteClient client2 = getHoodieWriteClient(config); + List<String> dataset2 = Collections.singletonList("id1,,23,2,par1"); + String insertTime2 = client2.createNewInstantTime(); + List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT); + + // step to commit the 2nd txn + client2.commitStats( + insertTime2, + writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + + // step to commit the 1st txn + client1.commitStats( + insertTime1, + writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + + // do compaction + String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get(); + client1.compact(compactionTime); + + // result is [(id1,Danny,23,2,par1)] + 
Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]"); + checkWrittenData(result, 1); + } + + /** + * case5: + * 1. bulk_insert start + * 2. insert start + * 3. bulk_insert commit + * 4. insert commit + * + * |------ txn2: insert ------| + * |---------- txn1: bulk_insert ----------| + * + * both two txn would success to commit + */ + @Test + public void testBulkInsertAndInsertConcurrentCase5() throws Exception { + HoodieWriteConfig config = createHoodieWriteConfig(); + metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); + SparkRDDWriteClient client1 = getHoodieWriteClient(config); + + // start the 1st txn and bulk insert record: [id1,null,23,2,par1], suspend the tx commit + SparkRDDWriteClient client2 = getHoodieWriteClient(config); + List<String> dataset2 = Collections.singletonList("id1,,23,2,par1"); + String insertTime2 = client2.createNewInstantTime(); + List<WriteStatus> writeStatuses2 = writeData(client2, insertTime2, dataset2, false, WriteOperationType.BULK_INSERT); + + // start the 2nd txn and insert record: [id1,Danny,null,1,par1], suspend the tx commit + List<String> dataset1 = Collections.singletonList("id1,Danny,,1,par1"); + String insertTime1 = client1.createNewInstantTime(); + List<WriteStatus> writeStatuses1 = writeData(client1, insertTime1, dataset1, false, WriteOperationType.INSERT); + + // step to commit the 1st txn + client2.commitStats( + insertTime2, + writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + + // step to commit the 2nd txn + client1.commitStats( + insertTime1, + writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()), + Option.empty(), + metaClient.getCommitActionType()); + + // do compaction + String compactionTime = (String) client1.scheduleCompaction(Option.empty()).get(); + client1.compact(compactionTime); + + // result is [(id1,Danny,23,2,par1)] + Map<String, String> result = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]"); + checkWrittenData(result, 1); + } + + /** + * case6: + * 1. bulk_insert start + * 2. bulk_insert commit + * 3. insert start + * 4. 
insert commit + * + * |------ txn2: insert ------| + * |------ txn1: bulk_insert ------| + * + * both two txn would success to commit + */ @Test - public void testBulkInsertInSequence() throws Exception { + public void testBulkInsertAndInsertConcurrentCase6() throws Exception { HoodieWriteConfig config = createHoodieWriteConfig(); metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps()); @@ -352,7 +557,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona String insertTime1 = client1.createNewInstantTime(); writeData(client1, insertTime1, dataset1, true, WriteOperationType.BULK_INSERT); - // start the 1st txn and insert record: [id1,null,23,2,par1], commit the 2nd txn + // start the 2nd txn and insert record: [id1,null,23,2,par1], commit the 2nd txn SparkRDDWriteClient client2 = getHoodieWriteClient(config); List<String> dataset2 = Collections.singletonList("id1,,23,2,par1"); String insertTime2 = client2.createNewInstantTime(); @@ -380,6 +585,7 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); String basePath = basePath(); return HoodieWriteConfig.newBuilder() + .withProps(Collections.singletonMap(HoodieTableConfig.PRECOMBINE_FIELD.key(), "ts")) .forTable("test") .withPath(basePath) .withSchema(jsonSchema) @@ -389,7 +595,6 @@ public class TestSparkNonBlockingConcurrencyControl extends SparkClientFunctiona .withPayloadConfig( HoodiePayloadConfig.newBuilder() .withPayloadClass(payloadClassName) - .withPayloadOrderingField("ts") .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withMaxNumDeltaCommitsBeforeCompaction(1).build())
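
For readers skimming the patch, here is a minimal standalone sketch of the naming rule this commit unifies. It is not the real BucketIdentifier/FSUtils code: the 8-digit zero padding of the bucket id and the random-UUID suffix in the non-NBCC branch are assumptions made for illustration, while the fixed NBCC format follows the javadoc added in the commit ("{bucket_id}-0000-0000-0000-000000000000-0") and the isNonBlockingConcurrencyControl ternary that the patch applies in both the Spark and Flink writers.

import java.util.UUID;

/**
 * Illustrative sketch (not the actual Hudi classes) of the unified file-id rule:
 * with non-blocking concurrency control (NBCC) enabled, every writer derives the
 * same deterministic file id for a bucket, so concurrent insert/bulk_insert
 * writers target the same file group; otherwise each writer gets a fresh
 * randomly suffixed id.
 */
public class NbccFileIdSketch {

  // Assumption for illustration: bucket id zero-padded to 8 digits, e.g. 1 -> "00000001".
  static String bucketIdStr(int bucketId) {
    return String.format("%08d", bucketId);
  }

  // NBCC mode: fixed id per bucket, matching the format documented in the commit,
  // "{bucket_id}-0000-0000-0000-000000000000" plus the trailing "-0".
  static String newBucketFileIdForNbcc(int bucketId) {
    return bucketIdStr(bucketId) + "-0000-0000-0000-000000000000-0";
  }

  // Non-NBCC mode: bucket id plus a random suffix (illustrative only).
  static String newBucketFileIdPrefix(int bucketId) {
    return bucketIdStr(bucketId) + "-" + UUID.randomUUID();
  }

  // Mirrors the ternary the patch adds in BucketStreamWriteFunction and
  // BucketBulkInsertWriterHelper.
  static String fileIdForBucket(int bucketId, boolean nonBlockingConcurrencyControl) {
    return nonBlockingConcurrencyControl
        ? newBucketFileIdForNbcc(bucketId)
        : newBucketFileIdPrefix(bucketId);
  }

  public static void main(String[] args) {
    // Two writers targeting bucket 1 with NBCC on: identical, deterministic file ids.
    System.out.println(fileIdForBucket(1, true));  // 00000001-0000-0000-0000-000000000000-0
    System.out.println(fileIdForBucket(1, true));  // same id again
    // With NBCC off, each writer gets its own randomly suffixed id.
    System.out.println(fileIdForBucket(1, false));
  }
}

This deterministic id is what allows the test cases above to expect that a concurrent insert and bulk_insert over the same bucket either merge into one file group (and reconcile at compaction) or are rejected with a HoodieWriteConflictException, rather than silently producing divergent file groups.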