This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f05b5fc9db3 [HUDI-6962] Fix the conflicts resolution for bulk insert 
under NB-CC (#9896)
f05b5fc9db3 is described below

commit f05b5fc9db38e0bc4ccc2941cccf049991b67db2
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Oct 25 09:29:13 2023 +0800

    [HUDI-6962] Fix the conflicts resolution for bulk insert under NB-CC (#9896)
    
    * Flink bulk_insert uses a fixed file group id suffix if NB-CC is enabled;
    * The bulk_insert writer should resolve conflicts with other writers under 
OCC strategies.
---
 .../apache/hudi/client/utils/TransactionUtils.java |   5 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   4 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   2 +-
 .../sink/bucket/BucketBulkInsertWriterHelper.java  |  14 +-
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  15 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |   3 +-
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java | 116 +++++++++++
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java | 232 +++++++++++++++++++++
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  25 +++
 .../test/java/org/apache/hudi/utils/TestData.java  |   5 +-
 .../org/apache/hudi/adapter/TestStreamConfigs.java |  32 +++
 .../org/apache/hudi/adapter/TestStreamConfigs.java |  32 +++
 .../org/apache/hudi/adapter/TestStreamConfigs.java |  32 +++
 .../org/apache/hudi/adapter/TestStreamConfigs.java |  35 ++++
 .../org/apache/hudi/adapter/TestStreamConfigs.java |  35 ++++
 16 files changed, 581 insertions(+), 17 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 15f6be8f79a..1bea51721c8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.utils;
 import org.apache.hudi.client.transaction.ConcurrentOperation;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -67,8 +68,8 @@ public class TransactionUtils {
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       boolean reloadActiveTimeline,
       Set<String> pendingInstants) throws HoodieWriteConflictException {
-    // Skip to resolve conflict if using non-blocking concurrency control
-    if 
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl() && 
!config.isNonBlockingConcurrencyControl()) {
+    WriteOperationType operationType = 
thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
+    if (config.needResolveWriteConflict(operationType)) {
       // deal with pendingInstants
       Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation = 
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), 
pendingInstants);
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c9e9b94b1a9..8c08beaaef9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.RecordPayloadType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.marker.MarkerType;
@@ -2616,6 +2617,16 @@ public class HoodieWriteConfig extends HoodieConfig {
     return props.getInteger(WRITES_FILEID_ENCODING, 
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
+  public boolean needResolveWriteConflict(WriteOperationType operationType) {
+    if (getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      // NB-CC does not need to resolve write conflicts, except for the bulk insert 
operation
+      return WriteOperationType.BULK_INSERT == operationType || 
!isNonBlockingConcurrencyControl();
+    } else {
+      // the SINGLE_WRITER case does not need to resolve write conflicts
+      return false;
+    }
+  }
+
   public boolean isNonBlockingConcurrencyControl() {
     return getTableType().equals(HoodieTableType.MERGE_ON_READ)
         && getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index e4126785b24..a52e547195f 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -302,8 +302,8 @@ public class HoodieFlinkWriteClient<T> extends
    * Refresh the last transaction metadata,
    * should be called before the Driver starts a new transaction.
    */
-  public void preTxn(HoodieTableMetaClient metaClient) {
-    if (txnManager.isLockRequired() && 
!config.isNonBlockingConcurrencyControl()) {
+  public void preTxn(WriteOperationType operationType, HoodieTableMetaClient 
metaClient) {
+    if (txnManager.isLockRequired() && 
config.needResolveWriteConflict(operationType)) {
       // refresh the meta client which is reused
       metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 34fb5cf3c67..860dffe99eb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -387,7 +387,7 @@ public class StreamWriteOperatorCoordinator
 
   private void startInstant() {
     // refresh the last txn metadata
-    this.writeClient.preTxn(this.metaClient);
+    this.writeClient.preTxn(tableState.operationType, this.metaClient);
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
     this.instant = this.writeClient.startCommit(tableState.commitAction, 
this.metaClient);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index fcf38e112ad..c25a69df2c1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -92,16 +92,22 @@ public class BucketBulkInsertWriterHelper extends 
BulkInsertWriterHelper {
     return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
   }
 
-  private static String getFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
+  private static String getFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean 
needFixedFileIdSuffix) {
     String recordKey = keyGen.getRecordKey(record);
     String partition = keyGen.getPartitionPath(record);
     final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, 
numBuckets);
     String bucketId = partition + bucketNum;
-    return bucketIdToFileId.computeIfAbsent(bucketId, k -> 
BucketIdentifier.newBucketFileIdPrefix(bucketNum));
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> {
+      if (needFixedFileIdSuffix) {
+        return BucketIdentifier.newBucketFileIdFixedSuffix(bucketNum);
+      } else {
+        return BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+      }
+    });
   }
 
-  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
-    final String fileId = getFileId(bucketIdToFileId, keyGen, record, 
indexKeys, numBuckets);
+  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets, boolean 
needFixedFileIdSuffix) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, 
indexKeys, numBuckets, needFixedFileIdSuffix);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index d44ef25ee4b..dfd8e0e7563 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -117,11 +117,11 @@ public class BulkInsertWriteFunction<I>
     this.ckpMetadata = 
CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
     this.initInstant = lastPendingInstant();
     sendBootstrapEvent();
-    initWriterHelper();
   }
 
   @Override
   public void processElement(I value, Context ctx, Collector<Object> out) 
throws IOException {
+    initWriterHelperIfNeeded();
     this.writerHelper.write((RowData) value);
   }
 
@@ -136,6 +136,7 @@ public class BulkInsertWriteFunction<I>
    * End input action for batch source.
    */
   public void endInput() {
+    initWriterHelperIfNeeded();
     final List<WriteStatus> writeStatus = 
this.writerHelper.getWriteStatuses(this.taskID);
 
     final WriteMetadataEvent event = WriteMetadataEvent.builder()
@@ -165,11 +166,13 @@ public class BulkInsertWriteFunction<I>
   //  Utilities
   // -------------------------------------------------------------------------
 
-  private void initWriterHelper() {
-    String instant = instantToWrite();
-    this.writerHelper = WriterHelpers.getWriterHelper(this.config, 
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
-        instant, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
-        this.rowType);
+  private void initWriterHelperIfNeeded() {
+    if (writerHelper == null) {
+      String instant = instantToWrite();
+      this.writerHelper = WriterHelpers.getWriterHelper(this.config, 
this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+          instant, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+          this.rowType);
+    }
   }
 
   private void sendBootstrapEvent() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index cb9344f8d6c..e66009aa551 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -124,10 +124,11 @@ public class Pipelines {
       RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(rowTypeWithFileId);
+      boolean needFixedFileIdSuffix = 
OptionsResolver.isNonBlockingConcurrencyControl(conf);
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, 
keyGen::getHoodieKey)
-          .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, 
indexKeys, numBuckets), typeInfo)
+          .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, 
indexKeys, numBuckets, needFixedFileIdSuffix), typeInfo)
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same 
parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = 
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 79320e1549f..816c9a4e655 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.TestData;
@@ -228,6 +229,121 @@ public class TestWriteMergeOnReadWithCompact extends 
TestWriteCopyOnWrite {
     TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
   }
 
+  // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
+  //      |----------- txn1 -----------|
+  //                       |----- txn2 ------|
+  // txn2 would fail to commit because of the conflict
+  @Test
+  public void testBulkInsertInMultiWriter() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.BUCKET.name());
+    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, 
PartialUpdateAvroPayload.class.getName());
+    // disable schedule compaction in writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the 
tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), null,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf)
+        .consume(dataset1)
+        .assertEmptyDataFiles();
+
+    // start pipeline2 and bulk insert record: [id1,null,23,1,par1], suspend 
the tx commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), null, 23,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2);
+
+    // step to commit the 1st txn
+    pipeline1.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // step to commit the 2nd txn, should throw exception
+    pipeline2.endInputThrows(HoodieWriteConflictException.class, "Cannot 
resolve conflicts");
+  }
+
+  // case2: txn1 is upsert writer, txn2 is bulk_insert writer.
+  //                       |----- txn1 ------|
+  //      |----------- txn2 -----------|
+  // both txns would commit successfully
+  @Test
+  public void testBulkInsertInSequence() throws Exception {
+    conf.setString(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), 
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name());
+    conf.setString(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.BUCKET.name());
+    conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, 
PartialUpdateAvroPayload.class.getName());
+    // disable schedule compaction in writers
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+
+    Configuration conf1 = conf.clone();
+    conf1.setString(FlinkOptions.OPERATION, "BULK_INSERT");
+    // start pipeline1 and bulk insert record: [id1,Danny,null,1,par1], 
suspend the tx commit
+    List<RowData> dataset1 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), StringData.fromString("Danny"), null,
+            TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
+    TestHarness pipeline1 = preparePipeline(conf1)
+        .consume(dataset1);
+
+    // start pipeline2 and insert record: [id1,null,23,2,par1], suspend the tx 
commit
+    Configuration conf2 = conf.clone();
+    conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+    List<RowData> dataset2 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id1"), null, 23,
+            TimestampData.fromEpochMillis(2), StringData.fromString("par1")));
+    TestHarness pipeline2 = preparePipeline(conf2)
+        .consume(dataset2);
+
+    // step to commit the 1st txn
+    pipeline1.endInput();
+
+    // step to commit the 2nd data
+    pipeline2.checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1);
+
+    // snapshot result is [(id1,Danny,23,2,par1)] after two writers finish to 
commit
+    Map<String, String> tmpSnapshotResult = Collections.singletonMap("par1", 
"[id1,par1,id1,Danny,23,2,par1]");
+    pipeline2.checkWrittenData(tmpSnapshotResult, 1);
+
+    // schedule compaction outside of all writers
+    try (HoodieFlinkWriteClient writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
+      Option<String> scheduleInstant = 
writeClient.scheduleCompaction(Option.empty());
+      assertNotNull(scheduleInstant.get());
+    }
+
+    // step to commit the 3rd txn
+    // it also triggers inline compactor
+    List<RowData> dataset3 = Collections.singletonList(
+        insertRow(
+            StringData.fromString("id3"), StringData.fromString("Julian"), 53,
+            TimestampData.fromEpochMillis(4), StringData.fromString("par1")));
+    pipeline2.consume(dataset3)
+        .checkpoint(2)
+        .assertNextEvent()
+        .checkpointComplete(2);
+
+    // snapshot read result is [(id1,Danny,23,2,par1), (id3,Julian,53,4,par1)] 
after three writers finish to commit
+    Map<String, String> finalSnapshotResult = Collections.singletonMap(
+        "par1",
+        "[id1,par1,id1,Danny,23,2,par1, id3,par1,id3,Julian,53,4,par1]");
+    pipeline2.checkWrittenData(finalSnapshotResult, 1);
+    // read optimized read result is [(id1,Danny,23,2,par1)]
+    // because the data files belonging to the 3rd commit are not included in the last 
compaction.
+    Map<String, String> readOptimizedResult = Collections.singletonMap("par1", 
"[id1,par1,id1,Danny,23,2,par1]");
+    TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
new file mode 100644
index 00000000000..92f8f6decda
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.adapter.TestStreamConfigs;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperator;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.MockStreamTaskBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper class to manipulate the {@link BulkInsertWriteFunction} instance 
for testing.
+ *
+ * @param <I> Input type
+ */
+public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
+  private final Configuration conf;
+  private final RowType rowType;
+  private final RowType rowTypeWithFileId;
+
+  private final IOManager ioManager;
+  private final MockStreamingRuntimeContext runtimeContext;
+  private final MockOperatorEventGateway gateway;
+  private final MockOperatorCoordinatorContext coordinatorContext;
+  private final StreamWriteOperatorCoordinator coordinator;
+  private final boolean needSortInput;
+
+  private BulkInsertWriteFunction<RowData> writeFunction;
+  private MapFunction<RowData, RowData> mapFunction;
+  private Map<String, String> bucketIdToFileId;
+  private SortOperator sortOperator;
+  private CollectorOutput<RowData> output;
+
+  public BulkInsertFunctionWrapper(String tablePath, Configuration conf) 
throws Exception {
+    ioManager = new IOManagerAsync();
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, 
environment);
+    this.gateway = new MockOperatorEventGateway();
+    this.conf = conf;
+    this.rowType = (RowType) 
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
+    this.rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+    this.coordinatorContext = new MockOperatorCoordinatorContext(new 
OperatorID(), 1);
+    this.coordinator = new StreamWriteOperatorCoordinator(conf, 
this.coordinatorContext);
+    this.needSortInput = 
conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
+  }
+
+  public void openFunction() throws Exception {
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    setupWriteFunction();
+    setupMapFunction();
+    if (needSortInput) {
+      setupSortOperator();
+    }
+  }
+
+  public void invoke(I record) throws Exception {
+    RowData recordWithFileId = mapFunction.map((RowData) record);
+    if (needSortInput) {
+      // Sort input first, trigger writeFunction at the #endInput
+      sortOperator.processElement(new StreamRecord(recordWithFileId));
+    } else {
+      writeFunction.processElement(recordWithFileId, null, null);
+    }
+  }
+
+  public WriteMetadataEvent[] getEventBuffer() {
+    return this.coordinator.getEventBuffer();
+  }
+
+  public OperatorEvent getNextEvent() {
+    return this.gateway.getNextEvent();
+  }
+
+  public void checkpointFunction(long checkpointId) {
+    // Do nothing
+  }
+
+  @Override
+  public void endInput() {
+    if (needSortInput) {
+      // sort all inputs of SortOperator and flush to WriteFunction
+      try {
+        sortOperator.endInput();
+        List<RowData> sortedRecords = output.getRecords();
+        for (RowData record : sortedRecords) {
+          writeFunction.processElement(record, null, null);
+        }
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
+    writeFunction.endInput();
+    if (bucketIdToFileId != null) {
+      this.bucketIdToFileId.clear();
+    }
+  }
+
+  public void checkpointComplete(long checkpointId) {
+    coordinator.notifyCheckpointComplete(checkpointId);
+  }
+
+  public void coordinatorFails() throws Exception {
+    this.coordinator.close();
+    this.coordinator.start();
+    this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+  }
+
+  public void checkpointFails(long checkpointId) {
+    coordinator.notifyCheckpointAborted(checkpointId);
+  }
+
+  public StreamWriteOperatorCoordinator getCoordinator() {
+    return coordinator;
+  }
+
+  public MockOperatorCoordinatorContext getCoordinatorContext() {
+    return coordinatorContext;
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.coordinator.close();
+    this.ioManager.close();
+    this.writeFunction.close();
+    if (this.bucketIdToFileId != null) {
+      this.bucketIdToFileId.clear();
+    }
+    if (needSortInput) {
+      this.sortOperator.close();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private void setupWriteFunction() throws Exception {
+    writeFunction = new BulkInsertWriteFunction<>(conf, rowType);
+    writeFunction.setRuntimeContext(runtimeContext);
+    writeFunction.setOperatorEventGateway(gateway);
+    writeFunction.open(conf);
+    // handle the bootstrap event
+    coordinator.handleEventFromOperator(0, getNextEvent());
+  }
+
+  private void setupMapFunction() {
+    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+    String indexKeys = OptionsResolver.getIndexKeyField(conf);
+    int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+    boolean needFixedFileIdSuffix = 
OptionsResolver.isNonBlockingConcurrencyControl(conf);
+    this.bucketIdToFileId = new HashMap<>();
+    this.mapFunction = r -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, 
indexKeys, numBuckets, needFixedFileIdSuffix);
+  }
+
+  private void setupSortOperator() throws Exception {
+    MockEnvironment environment = new MockEnvironmentBuilder()
+        .setTaskName("mockTask")
+        .setManagedMemorySize(12 * MemoryManager.DEFAULT_PAGE_SIZE)
+        .setIOManager(ioManager)
+        .build();
+    StreamTask<?, ?> streamTask = new MockStreamTaskBuilder(environment)
+        .setConfig(new StreamConfig(conf))
+        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
+        .build();
+    SortOperatorGen sortOperatorGen = 
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+    this.sortOperator = (SortOperator) 
sortOperatorGen.createSortOperator(conf);
+    this.sortOperator.setProcessingTimeService(new 
TestProcessingTimeService());
+    this.output = new CollectorOutput<>();
+    StreamConfig streamConfig = new StreamConfig(conf);
+    streamConfig.setOperatorID(new OperatorID());
+    RowDataSerializer inputSerializer = new 
RowDataSerializer(rowTypeWithFileId);
+    TestStreamConfigs.setupNetworkInputs(streamConfig, inputSerializer);
+    
streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR,
 .99);
+    this.sortOperator.setup(streamTask, streamConfig, output);
+    this.sortOperator.open();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 1b301cb3cb1..b2208b88b53 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -305,6 +305,31 @@ public class TestWriteBase {
       return this;
     }
 
+    /**
+     * Flush data and commit using endInput. Asserts the commit would fail.
+     */
+    public void endInputThrows(Class<?> cause, String msg) {
+      // this triggers the data write and event send
+      this.pipeline.endInput();
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      assertTrue(this.pipeline.getCoordinatorContext().isJobFailed(), "Job 
should have been failed");
+      Throwable throwable = 
this.pipeline.getCoordinatorContext().getJobFailureReason().getCause();
+      assertThat(throwable, instanceOf(cause));
+      assertThat(throwable.getMessage(), containsString(msg));
+    }
+
+    /**
+     * Flush data and commit using endInput.
+     */
+    public TestHarness endInput() {
+      // this triggers the data write and event send
+      this.pipeline.endInput();
+      final OperatorEvent nextEvent = this.pipeline.getNextEvent();
+      this.pipeline.getCoordinator().handleEventFromOperator(0, nextEvent);
+      return this;
+    }
+
     /**
      * Asserts the checkpoint with id {@code checkpointId} throws when 
completes .
      */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index afaf3608049..732065c0a3c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper;
 import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.InsertFunctionWrapper;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -553,7 +554,9 @@ public class TestData {
    * Initializes a writing pipeline with given configuration.
    */
   public static TestFunctionWrapper<RowData> getWritePipeline(String basePath, 
Configuration conf) throws Exception {
-    if (OptionsResolver.isAppendMode(conf)) {
+    if (OptionsResolver.isBulkInsertOperation(conf)) {
+      return new BulkInsertFunctionWrapper<>(basePath, conf);
+    } else if (OptionsResolver.isAppendMode(conf)) {
       return new InsertFunctionWrapper<>(basePath, conf);
     } else if (OptionsResolver.isBucketIndexType(conf)) {
       if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
diff --git 
a/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
 
b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 00000000000..4b62c790b58
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.13.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig helpers for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, 
TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 00000000000..4b62c790b58
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig helpers for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, 
TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 00000000000..4b62c790b58
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig helpers for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, 
TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 00000000000..a7a620b4ec1
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig helpers for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, 
TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+    // Since Flink 1.16, we need to call serializeAllConfigs to serialize all object 
configs synchronously.
+    // See https://issues.apache.org/jira/browse/FLINK-26675.
+    streamConfig.serializeAllConfigs();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
new file mode 100644
index 00000000000..a7a620b4ec1
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/test/java/org/apache/hudi/adapter/TestStreamConfigs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.adapter;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/**
+ * StreamConfig helpers for tests.
+ */
+public class TestStreamConfigs {
+
+  public static void setupNetworkInputs(StreamConfig streamConfig, 
TypeSerializer<?>... inputSerializers) {
+    streamConfig.setupNetworkInputs(inputSerializers);
+    // Since Flink 1.16, we need to call serializeAllConfigs to serialize all object 
configs synchronously.
+    // See https://issues.apache.org/jira/browse/FLINK-26675.
+    streamConfig.serializeAllConfigs();
+  }
+}


Reply via email to