This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f2ba0fead2 [HUDI-3085] Improve bulk insert partitioner abstraction (#4441)
f2ba0fead2 is described below
commit f2ba0fead24072e37c68477d0cfc3489fa098938
Author: Yuwei XIAO <[email protected]>
AuthorDate: Mon Apr 25 18:42:17 2022 +0800
[HUDI-3085] Improve bulk insert partitioner abstraction (#4441)
---
.../apache/hudi/table/BulkInsertPartitioner.java | 28 +++++++++++++++++++++-
.../table/action/commit/BaseBulkInsertHelper.java | 2 +-
.../run/strategy/JavaExecutionStrategy.java | 11 +++++----
.../table/action/commit/JavaBulkInsertHelper.java | 17 +++++++------
.../MultipleSparkJobExecutionStrategy.java | 5 ++--
.../SparkSingleFileSortExecutionStrategy.java | 1 +
.../bulkinsert/BulkInsertMapFunction.java | 11 +++++----
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 11 ++++-----
.../table/action/commit/SparkBulkInsertHelper.java | 24 +++++++------------
9 files changed, 65 insertions(+), 45 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index fd1558a823..63b502531a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -18,12 +18,18 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+
/**
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
*/
-public interface BulkInsertPartitioner<I> {
+public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Repartitions the input records into at least expected number of output spark partitions.
@@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> {
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
*/
boolean arePartitionRecordsSorted();
+
+ /**
+ * Return the file group id prefix for the given data partition.
+ * By default, return a new file group id prefix, so that incoming records will route to a fresh new file group.
+ * @param partitionId data partition
+ * @return the file group id prefix for the data partition
+ */
+ default String getFileIdPfx(int partitionId) {
+ return FSUtils.createNewFileIdPfx();
+ }
+
+ /**
+ * Return the write handle factory for the given data partition.
+ * @param partitionId data partition
+ * @return an optional write handle factory for the data partition
+ */
+ default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
+ return Option.empty();
+ }
+
}
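For illustration, a minimal sketch (not part of this commit) of a custom partitioner that uses the two new default-method extension points; the class name, the pinned file id prefix, and the sort-by-record-key behavior are assumptions chosen for the example:

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.io.CreateHandleFactory;
    import org.apache.hudi.io.WriteHandleFactory;
    import org.apache.hudi.table.BulkInsertPartitioner;

    public class SortedSingleGroupPartitioner<T> implements BulkInsertPartitioner<List<HoodieRecord<T>>> {

      @Override
      public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
        // Java-engine style: operate on a plain List and sort by record key.
        return records.stream()
            .sorted(Comparator.comparing(HoodieRecord::getRecordKey))
            .collect(Collectors.toList());
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        return true;
      }

      @Override
      public String getFileIdPfx(int partitionId) {
        // Route all records of this data partition to a fixed file id prefix
        // instead of the default FSUtils.createNewFileIdPfx().
        return "fixed-file-id-prefix-" + partitionId; // illustrative value
      }

      @Override
      public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
        // Supply a handle factory; returning Option.empty() keeps the caller's default.
        return Option.of((WriteHandleFactory) new CreateHandleFactory(false));
      }
    }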
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index ad2145c350..5355194ff7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+ BulkInsertPartitioner partitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7d7609f0fa..233c70ecf9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -121,16 +122,16 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
- * @return empty for now.
+ * @return partitioner for the java engine
*/
- protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+ protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
- return Option.of(new JavaCustomColumnsSortPartitioner(
+ return new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema),
- getWriteConfig().isConsistentLogicalTimestampEnabled()));
+ getWriteConfig().isConsistentLogicalTimestampEnabled());
} else {
- return Option.empty();
+ return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 39b2916732..e126372aa9 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -77,8 +77,11 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
config.shouldAllowMultiWriteOnSameInstant());
}
+ BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
// write new files
- List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+ List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+ config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +93,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+ BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,12 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
parallelism, table);
}
- final List<HoodieRecord<T>> repartitionedRecords;
- BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
- ? userDefinedBulkInsertPartitioner.get()
- : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
- // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
- repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+ final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
config.getFileIdPrefixProviderClassName(),
@@ -119,7 +117,8 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table,
fileIdPrefixProvider.createFilePrefix(""),
table.getTaskContextSupplier(),
- new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+ // Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine.
+ (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
return writeStatuses;
}
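For illustration, a hedged sketch (not part of this commit) of the resolve-then-pass pattern the reworked helper follows; the variables userDefinedBulkInsertPartitioner, config, dedupedRecords, parallelism and writeHandleFactory are assumed to be in scope as in the methods above:

    // Resolve the user-supplied partitioner or fall back to the engine default.
    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner
        .orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

    // Only List is supported by the Java engine's partitioners.
    List<HoodieRecord<T>> repartitionedRecords =
        (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);

    // The Java engine writes through a single data partition, so only the first
    // write handle factory (partition id 0) is consulted; an empty Option falls
    // back to the factory passed in by the caller.
    WriteHandleFactory factory =
        (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory);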
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 5a03cdf3bc..e09457f0e5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -46,6 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
@@ -137,7 +138,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
- protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
Option<String[]> orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
@@ -159,7 +160,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
default:
throw new UnsupportedOperationException(String.format("Layout
optimization strategy '%s' is not supported", layoutOptStrategy));
}
- });
+ }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}
/**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 4a7ee7bcee..b61017c34c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -72,6 +72,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
.withProps(getWriteConfig().getProps()).build();
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
+
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups,
new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
index 24cdd70603..66c3bdddcb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.function.Function2;
@@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private boolean areRecordsSorted;
private HoodieWriteConfig config;
private HoodieTable hoodieTable;
- private List<String> fileIDPrefixes;
private boolean useWriterSchema;
+ private BulkInsertPartitioner partitioner;
private WriteHandleFactory writeHandleFactory;
public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
- List<String> fileIDPrefixes, boolean useWriterSchema,
+ boolean useWriterSchema, BulkInsertPartitioner partitioner,
WriteHandleFactory writeHandleFactory) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
- this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
this.writeHandleFactory = writeHandleFactory;
+ this.partitioner = partitioner;
}
@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
- fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
- writeHandleFactory);
+ partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
+ (WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory));
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 219fb0b165..50a0a534f8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -49,9 +49,9 @@ import java.util.List;
public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
- private final HoodieSparkEngineContext sparkEngineContext;
+ private final transient HoodieSparkEngineContext sparkEngineContext;
private final String[] orderByColumns;
- private final Schema schema;
+ private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
@@ -64,14 +64,13 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
- this.schema = schema;
+ this.schema = new SerializableSchema(schema);
}
@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
- SerializableSchema serializableSchema = new SerializableSchema(schema);
JavaRDD<GenericRecord> genericRecordsRDD =
- records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
+ records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get());
Dataset<Row> sourceDataset =
AvroConversionUtils.createDataFrame(
@@ -82,7 +81,7 @@ public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
- return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
+ return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
.toJavaRDD()
.map(record -> {
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index 38e38101b0..1652c35eb6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,8 +38,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* A spark implementation of {@link BaseBulkInsertHelper}.
@@ -76,9 +73,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
+
+ BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
// write new files
- HoodieData<WriteStatus> writeStatuses =
- bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+ HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+ config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +90,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+ BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,20 +103,12 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
parallelism, table);
}
- final HoodieData<HoodieRecord<T>> repartitionedRecords;
- BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
- ? userDefinedBulkInsertPartitioner.get()
- : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
// only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
- repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
-
- // generate new file ID prefixes for each output partition
- final List<String> fileIDPrefixes =
- IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+ final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
.mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
- partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
+ partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true)
.flatMap(List::iterator);
return HoodieJavaRDD.of(writeStatusRDD);