This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 14bde81ed0da perf: Fixing streaming dag performance wrt dt write status and mdt write status union (#13976)
14bde81ed0da is described below
commit 14bde81ed0da8c4a54fa3051530d7018693c95c3
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Oct 1 08:30:17 2025 -0700
    perf: Fixing streaming dag performance wrt dt write status and mdt write status union (#13976)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 8 +-
.../apache/hudi/client/CoalescingPartitioner.java | 47 ++++++
.../hudi/client/SparkRDDTableServiceClient.java | 35 +++-
.../apache/hudi/client/SparkRDDWriteClient.java | 5 +-
.../hudi/client/StreamingMetadataWriteHandler.java | 44 +++--
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 5 +
.../hudi/client/TestCoalescingPartitioner.java | 187 +++++++++++++++++++++
.../hudi/client/TestSparkRDDWriteClient.java | 88 ++++++++++
.../client/TestStreamingMetadataWriteHandler.java | 110 ++++++++++++
.../org/apache/hudi/data/TestHoodieJavaRDD.java | 25 +++
.../hudi/common/config/HoodieMetadataConfig.java | 13 ++
.../org/apache/hudi/common/data/HoodieData.java | 9 +
.../apache/hudi/common/data/HoodieListData.java | 6 +
.../common/config/TestHoodieMetadataConfig.java | 23 +++
14 files changed, 587 insertions(+), 18 deletions(-)
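
At a high level, this patch reshapes the status-union step of the streaming write dag: previously the data table (dt) write statuses were unioned with the metadata table (mdt) write statuses and the whole union was persisted, keeping every dt partition in the dag; now only the mdt statuses are persisted, and the dt statuses, whose results are already cached from the write itself, are coalesced down before the union. A minimal Spark sketch of the two dag shapes, using toy string RDDs in place of write statuses (illustrative only, not Hudi code):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    import java.util.Collections;

    public class UnionShapeSketch {
      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[4]").setAppName("union-shape"))) {
          JavaRDD<String> dtStatuses = jsc.parallelize(Collections.nCopies(10000, "dt"), 10000);
          JavaRDD<String> mdtStatuses = jsc.parallelize(Collections.nCopies(100, "mdt"), 100);

          // Before: union first, then persist everything; the union keeps all dt partitions.
          JavaRDD<String> before = dtStatuses.union(mdtStatuses)
              .persist(StorageLevel.MEMORY_AND_DISK_SER());
          System.out.println(before.getNumPartitions()); // 10100

          // After: persist only the mdt side, coalesce the already-computed dt side,
          // then union; only a couple of dt tasks remain in the dag.
          mdtStatuses.persist(StorageLevel.MEMORY_AND_DISK_SER());
          JavaRDD<String> after = dtStatuses.coalesce(Math.max(1, 10000 / 5000)).union(mdtStatuses);
          System.out.println(after.getNumPartitions()); // 102
        }
      }
    }

With the default divisor of 5000 (added below in HoodieMetadataConfig), 10,000 dt status partitions collapse to two tasks instead of scheduling thousands of no-op ones.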
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index cd06a875d1a6..f31d9358883b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -235,7 +235,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      logCompactionTimer = metrics.getLogCompactionCtx();
      WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
      HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime);
-     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime);
+     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT);
      HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata);
      if (shouldComplete) {
        commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table));
@@ -318,7 +318,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      }
      compactionTimer = metrics.getCompactionCtx();
      HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime);
-     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime);
+     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime, WriteOperationType.COMPACT);
      HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(updatedWriteMetadata);
      if (shouldComplete) {
        commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table));
@@ -331,7 +331,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
    *
    * @return The passed in {@code HoodieWriteMetadata} with probable partially updated write statuses.
    */
-  protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+  protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime, WriteOperationType writeOperationType) {
     return writeMetadata;
   }
@@ -487,7 +487,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
      clusteringTimer = metrics.getClusteringCtx();
      LOG.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath());
      HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
-     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant);
+     HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant, WriteOperationType.CLUSTER);
      HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(updatedWriteMetadata);
      // TODO : Where is shouldComplete used ?
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java
new file mode 100644
index 000000000000..6a449ca39c95
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.Partitioner;
+
+/**
+ * Partitioner that routes all records into a fixed, small number of partitions (typically just one).
+ */
+public class CoalescingPartitioner extends Partitioner {
+
+ private final int numPartitions;
+
+ CoalescingPartitioner(int numPartitions) {
+ this.numPartitions = numPartitions;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ if (numPartitions == 1) {
+ return 0;
+ } else {
+ return Math.abs(key.hashCode()) % numPartitions;
+ }
+ }
+}
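
A quick usage sketch for the partitioner above (assumes a local SparkContext; the demo class sits in org.apache.hudi.client because the constructor is package-private). The point of pairing it with partitionBy is that the shuffle boundary keeps upstream stages at their original parallelism, which a plain coalesce(1) would narrow:

    package org.apache.hudi.client;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class CoalescingPartitionerDemo {
      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext(
            new SparkConf().setMaster("local[4]").setAppName("coalescing-partitioner-demo"))) {
          // 8 input partitions feeding a shuffle.
          JavaPairRDD<String, Integer> pairs = jsc
              .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8)
              .mapToPair(i -> new Tuple2<>("key-" + i, i));
          // partitionBy inserts a shuffle boundary: the 8 upstream tasks keep their
          // parallelism while the downstream stage runs as a single task.
          JavaPairRDD<String, Integer> single = pairs.partitionBy(new CoalescingPartitioner(1));
          System.out.println(single.getNumPartitions()); // 1
        }
      }
    }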
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index a5e80f3fc755..77fe332c637f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.client;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -26,10 +27,16 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -42,13 +49,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
-  private final StreamingMetadataWriteHandler streamingMetadataWriteHandler = new StreamingMetadataWriteHandler();
+  private final StreamingMetadataWriteHandler streamingMetadataWriteHandler;
 
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> timelineService) {
+    this(context, clientConfig, timelineService, new StreamingMetadataWriteHandler());
+  }
+
+  @VisibleForTesting
+  public SparkRDDTableServiceClient(HoodieEngineContext context,
+                                    HoodieWriteConfig clientConfig,
+                                    Option<EmbeddedTimelineService> timelineService,
+                                    StreamingMetadataWriteHandler streamingMetadataWriteHandler) {
     super(context, clientConfig, timelineService);
+    this.streamingMetadataWriteHandler = streamingMetadataWriteHandler;
   }
 
   @Override
@@ -80,9 +98,20 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<
   protected HoodieWriteMetadata<HoodieData<WriteStatus>> partialUpdateTableMetadata(
       HoodieTable table,
       HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
-      String instantTime) {
+      String instantTime,
+      WriteOperationType writeOperationType) {
     if (isStreamingWriteToMetadataEnabled(table)) {
-      writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, writeMetadata.getWriteStatuses(), instantTime));
+      boolean enforceCoalesceWithRepartition = writeOperationType == WriteOperationType.CLUSTER && config.getBulkInsertSortMode() == BulkInsertSortMode.NONE;
+      if (enforceCoalesceWithRepartition) {
+        // Check the clustering plan for sort columns; only when there are no sort columns does enforceCoalesceWithRepartition remain true.
+        HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+                table.getMetaClient(), ClusteringUtils.getRequestedClusteringInstant(instantTime, table.getActiveTimeline(), table.getInstantGenerator()).get())
+            .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+                "Unable to read clustering plan for instant: " + instantTime));
+        enforceCoalesceWithRepartition = !clusteringPlan.getStrategy().getStrategyParams().containsKey(PLAN_STRATEGY_SORT_COLUMNS.key());
+      }
+      writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, writeMetadata.getWriteStatuses(), instantTime,
+          enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     }
     return writeMetadata;
   }
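
Stated as a standalone rule: repartition-based coalescing is enforced only for clustering that runs with BulkInsertSortMode.NONE and whose plan carries no sort columns. A sketch of that predicate with illustrative stand-in types (not Hudi APIs):

    // Stand-in enums for illustration; the real code uses WriteOperationType,
    // BulkInsertSortMode, and the clustering plan's strategy params.
    enum OpType { COMPACT, LOG_COMPACT, CLUSTER }
    enum SortMode { NONE, GLOBAL_SORT, PARTITION_SORT }

    final class CoalesceDecision {
      static boolean enforceCoalesceWithRepartition(OpType op, SortMode sortMode, boolean planHasSortColumns) {
        // Only unsorted clustering shares lineage between the write statuses and the
        // record key generation stages, which is what the repartition works around.
        return op == OpType.CLUSTER && sortMode == SortMode.NONE && !planHasSortColumns;
      }
    }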
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 678697521f1c..94b6e408a2f9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.callback.common.WriteStatusValidator;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -108,7 +109,9 @@ public class SparkRDDWriteClient<T> extends
     final JavaRDD<WriteStatus> writeStatuses;
     if (WriteOperationType.streamingWritesToMetadataSupported((getOperationType())) && isStreamingWriteToMetadataEnabled(table)) {
       // this code block is expected to create a new Metadata Writer, start a new commit in metadata table and trigger streaming write to metadata table.
-      writeStatuses = HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, HoodieJavaRDD.of(rawWriteStatuses), instantTime));
+      boolean enforceCoalesceWithRepartition = getOperationType() == WriteOperationType.BULK_INSERT && config.getBulkInsertSortMode() == BulkInsertSortMode.NONE;
+      writeStatuses = HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, HoodieJavaRDD.of(rawWriteStatuses), instantTime,
+          enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     } else {
       writeStatuses = rawWriteStatuses;
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
index effd7b9f1b18..93cf377cb6be 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
@@ -24,14 +24,21 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Tuple2;
+
 /**
  * Class to assist with streaming writes to metadata table.
  */
@@ -47,21 +54,25 @@ public class StreamingMetadataWriteHandler {
    * @param table                  The {@link HoodieTable} instance for data table of interest.
    * @param dataTableWriteStatuses The {@link WriteStatus} from data table writes.
    * @param instantTime            The instant time of interest.
-   *
+   * @param enforceCoalesceWithRepartition    true when a repartition has to be added to the dag to coalesce the data table write statuses down to 1; false otherwise.
+   * @param coalesceDivisorForDataTableWrites assists with determining the coalesce parallelism for data table write statuses: N data table write status
+   *                                          spark partitions will be divided by this value to find the coalesce parallelism.
    * @return {@link HoodieData} of {@link WriteStatus} referring to both data table writes and partial metadata table writes.
    */
-  public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) {
+  public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
+                                                            boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
     Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, table);
     ValidationUtils.checkState(metadataWriterOpt.isPresent(),
         "Cannot instantiate metadata writer for the table of interest " + table.getMetaClient().getBasePath());
-    return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime);
+    return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime, enforceCoalesceWithRepartition,
+        coalesceDivisorForDataTableWrites);
   }
 
   /**
    * To be invoked by write client or table service client to complete the write to metadata table.
    *
    * <p>When streaming writes is enabled, writes to left over metadata partitions
-   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String)},
+   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String, boolean, int)},
    * otherwise writes to metadata table in legacy way(batch update without partial updates).
    *
    * @param table The {@link HoodieTable} instance for data table of interest.
@@ -87,12 +98,24 @@ public class StreamingMetadataWriteHandler {
   private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
                                                              HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
-                                                             String instantTime) {
-    HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+                                                             String instantTime,
+                                                             boolean enforceCoalesceWithRepartition,
+                                                             int coalesceDivisorForDataTableWrites) {
     HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
-    allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
-    allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
-    return allWriteStatus;
+    mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
+    HoodieData<WriteStatus> coalescedDataWriteStatuses;
+    int coalesceParallelism = Math.max(1, dataTableWriteStatuses.getNumPartitions() / coalesceDivisorForDataTableWrites);
+    if (enforceCoalesceWithRepartition) {
+      // With bulk insert and NONE sort mode, a simple coalesce on the data table write statuses also impacts the record key generation stages,
+      // so we add a partitioner to cut the chain, ensuring the coalesce here does not impact those stages.
+      coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+          .mapToPair((PairFunction<WriteStatus, String, WriteStatus>) writeStatus -> new Tuple2(writeStatus.getStat().getPath(), writeStatus))
+          .partitionBy(new CoalescingPartitioner(coalesceParallelism))
+          .map((Function<Tuple2<String, WriteStatus>, WriteStatus>) entry -> entry._2));
+    } else {
+      coalescedDataWriteStatuses = dataTableWriteStatuses.coalesce(coalesceParallelism);
+    }
+    return coalescedDataWriteStatuses.union(mdtWriteStatuses);
   }
 
   /**
@@ -103,7 +126,8 @@ public class StreamingMetadataWriteHandler {
    *
    * @return The metadata writer option.
    */
-  private synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) {
+  @VisibleForTesting
+  synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) {
     if (!table.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
       return Option.empty();
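
To make the divisor arithmetic concrete: the coalesce parallelism is the dt write-status partition count divided by the divisor, floored at 1, so with the default divisor of 5000 a job producing 12,000 status partitions coalesces to 2, and anything at or below 5000 collapses to 1. A self-contained illustration of the same computation:

    public class CoalesceParallelismDemo {
      // Mirrors the computation in streamWriteToMetadataTable above:
      // partition count divided by the divisor, floored at 1.
      static int coalesceParallelism(int numDataTablePartitions, int divisor) {
        return Math.max(1, numDataTablePartitions / divisor);
      }

      public static void main(String[] args) {
        System.out.println(coalesceParallelism(100, 5000));   // 1 -> small jobs collapse to one task
        System.out.println(coalesceParallelism(12000, 5000)); // 2
        System.out.println(coalesceParallelism(20000, 5000)); // 4
      }
    }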
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 193c7c955da8..5a4043f659fe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -209,4 +209,9 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
public HoodieData<T> repartition(int parallelism) {
return HoodieJavaRDD.of(rddData.repartition(parallelism));
}
+
+ @Override
+ public HoodieData<T> coalesce(int parallelism) {
+ return HoodieJavaRDD.of(rddData.coalesce(parallelism));
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java
new file mode 100644
index 000000000000..0f3b972574ce
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.client.TestCoalescingPartitioner.FlatMapFunc.getWriteStatusForPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCoalescingPartitioner extends HoodieClientTestBase {
+
+  @Test
+  public void simpleCoalescingPartitionerTest() {
+    int numPartitions = 100;
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+
+    // ~100 keys spread across 10 partitions.
+    CoalescingPartitioner coalescingPartitioner = new CoalescingPartitioner(10);
+    assertEquals(10, coalescingPartitioner.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      assertEquals(entry.hashCode() % 10, coalescingPartitioner.getPartition(entry));
+    });
+
+    // 1 partition
+    CoalescingPartitioner coalescingPartitioner1 = new CoalescingPartitioner(1);
+    assertEquals(1, coalescingPartitioner1.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      assertEquals(0, coalescingPartitioner1.getPartition(entry));
+    });
+
+    // empty rdd
+    rddData = HoodieJavaRDD.of(jsc.emptyRDD());
+    CoalescingPartitioner coalescingPartitioner2 = new CoalescingPartitioner(1);
+    assertEquals(1, coalescingPartitioner2.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      // since there is only one partition, any getPartition call returns the same partition index
+      assertEquals(0, coalescingPartitioner2.getPartition(entry));
+    });
+  }
+
+ private static Stream<Arguments> coalesceTestArgs() {
+ return Arrays.stream(new Object[][] {
+ {100, 1},
+ {1, 1},
+ {1000, 10},
+ {100, 7},
+ {1200, 50},
+ {1000, 23},
+ {10, 2}
+ }).map(Arguments::of);
+ }
+
+  @ParameterizedTest
+  @MethodSource("coalesceTestArgs")
+  public void testCoalescingPartitionerWithRDD(int inputNumPartitions, int targetPartitions) {
+    int totalHudiPartitions = Math.max(1, inputNumPartitions / targetPartitions);
+    String partitionPathPrefix = "pPath";
+    List<String> partitionPaths = IntStream.rangeClosed(1, totalHudiPartitions).boxed().map(integer -> partitionPathPrefix + "_" + integer).collect(Collectors.toList());
+    List<WriteStatus> writeStatuses = new ArrayList<>(jsc.parallelize(partitionPaths, partitionPaths.size()).flatMap(new FlatMapFunc(targetPartitions != 1
+        ? inputNumPartitions / totalHudiPartitions : targetPartitions)).collect());
+
+    // for pending files, add to the last partition.
+    if (targetPartitions != 1 && inputNumPartitions - writeStatuses.size() > 0) {
+      writeStatuses.addAll(getWriteStatusForPartition("/tmp/", partitionPathPrefix + "_" + (totalHudiPartitions - 1), inputNumPartitions - writeStatuses.size()));
+    }
+
+    assertEquals(writeStatuses.size(), inputNumPartitions);
+
+    JavaRDD<WriteStatus> data = jsc.parallelize(writeStatuses, inputNumPartitions);
+    JavaRDD<WriteStatus> coalescedData = data.mapToPair(new PairFunc()).partitionBy(new CoalescingPartitioner(targetPartitions)).map(new MapFunc());
+    coalescedData.cache();
+
+    List<Pair<Integer, Integer>> countsPerPartition = coalescedData.mapPartitionsWithIndex((partitionIndex, rows) -> {
+      int count = 0;
+      while (rows.hasNext()) {
+        rows.next();
+        count++;
+      }
+      return Collections.singletonList(Pair.of(partitionIndex, count)).iterator();
+    }, true).collect();
+
+    assertEquals(targetPartitions, countsPerPartition.size());
+    // let's validate that each spark partition holds at least 50% of the ideal per-partition count (we can't assume string hashes will distribute evenly).
+    countsPerPartition.forEach(pair -> {
+      int numElements = pair.getValue();
+      int idealExpectedCount = inputNumPartitions / targetPartitions;
+      assertTrue(numElements > idealExpectedCount * 0.5);
+    });
+    assertEquals(targetPartitions, coalescedData.getNumPartitions());
+    List<WriteStatus> result = new ArrayList<>(coalescedData.collect());
+    // let's validate that all paths from the input are present in the output as well.
+    List<String> expectedInputPaths = writeStatuses.stream().map(writeStatus -> writeStatus.getStat().getPath()).collect(Collectors.toList());
+    List<String> actualPaths = result.stream().map(writeStatus -> writeStatus.getStat().getPath()).collect(Collectors.toList());
+    Collections.sort(expectedInputPaths);
+    Collections.sort(actualPaths);
+    assertEquals(expectedInputPaths, actualPaths);
+    coalescedData.unpersist();
+  }
+
+  static class FlatMapFunc implements FlatMapFunction<String, WriteStatus> {
+
+    private final int numWriteStatuses;
+
+    FlatMapFunc(int numWriteStatuses) {
+      this.numWriteStatuses = numWriteStatuses;
+    }
+
+    @Override
+    public Iterator<WriteStatus> call(String s) throws Exception {
+      return getWriteStatusForPartition("/tmp", s, numWriteStatuses).iterator();
+    }
+
+    static List<WriteStatus> getWriteStatusForPartition(String basePath, String partitionPath, int numWriteStatuses) {
+      String randomPrefix = UUID.randomUUID().toString() + "_";
+      List<WriteStatus> writeStatuses = new ArrayList<>();
+      for (int i = 0; i < numWriteStatuses; i++) {
+        String fileName = randomPrefix + i;
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPartitionPath(partitionPath);
+        String fullFilePath = basePath + "/" + partitionPath + "/" + fileName;
+        writeStat.setPath(fullFilePath);
+        WriteStatus writeStatus = new WriteStatus();
+        writeStatus.setStat(writeStat);
+        writeStatuses.add(writeStatus);
+      }
+      return writeStatuses;
+    }
+  }
+
+  static class PairFunc implements PairFunction<WriteStatus, String, WriteStatus> {
+    @Override
+    public Tuple2<String, WriteStatus> call(WriteStatus writeStatus) throws Exception {
+      return Tuple2.apply(writeStatus.getStat().getPath(), writeStatus);
+    }
+  }
+
+  static class MapFunc implements Function<Tuple2<String, WriteStatus>, WriteStatus> {
+
+    @Override
+    public WriteStatus call(Tuple2<String, WriteStatus> pathWriteStatusTuple) throws Exception {
+      return pathWriteStatusTuple._2;
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index 238bdabc4003..2eb4f852f020 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -19,20 +19,29 @@
package org.apache.hudi.client;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.avro.generic.GenericRecord;
@@ -42,20 +51,29 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
import java.io.IOException;
import java.net.URI;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
@@ -205,6 +223,62 @@ class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
     testAndAssertCompletionIsEarlierThanRequested(basePath, props);
   }
+  private static Stream<Arguments> streamingMetadataWritesTestArgs() {
+    return Arrays.stream(new Object[][] {
+        {true, "COMPACT", "NONE", false, false},
+        {true, "COMPACT", "NONE", true, false},
+        {true, "COMPACT", "GLOBAL_SORT", true, false},
+        {true, "COMPACT", "GLOBAL_SORT", false, false},
+        {true, "LOG_COMPACT", "NONE", true, false},
+        {true, "LOG_COMPACT", "NONE", false, false},
+        {true, "LOG_COMPACT", "GLOBAL_SORT", true, false},
+        {true, "LOG_COMPACT", "GLOBAL_SORT", false, false},
+        {true, "CLUSTER", "NONE", true, false},
+        {true, "CLUSTER", "NONE", false, true},
+        {true, "CLUSTER", "GLOBAL_SORT", true, false},
+        {true, "CLUSTER", "GLOBAL_SORT", false, false},
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("streamingMetadataWritesTestArgs")
+  public void testStreamingMetadataWrites(boolean streamingWritesEnabled, WriteOperationType writeOperationType,
+                                          String bulkInsertSortMode, boolean setSortColsInClusteringPlan,
+                                          boolean expectedEnforceRepartitionWithCoalesce) throws IOException {
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withStreamingWriteEnabled(streamingWritesEnabled).build())
+        .withBulkInsertSortMode(bulkInsertSortMode)
+        .withPath(metaClient.getBasePath())
+        .build();
+    MockStreamingMetadataWriteHandler mockMetadataWriteHandler = new MockStreamingMetadataWriteHandler();
+
+    try (MockedStatic<ClusteringUtils> mocked = mockStatic(ClusteringUtils.class)) {
+      HoodieClusteringPlan clusteringPlan = mock(HoodieClusteringPlan.class);
+      HoodieClusteringStrategy clusteringStrategy = mock(HoodieClusteringStrategy.class);
+      when(clusteringPlan.getStrategy()).thenReturn(clusteringStrategy);
+      Map<String, String> strategyParams = new HashMap<>();
+      if (setSortColsInClusteringPlan) {
+        strategyParams.put(PLAN_STRATEGY_SORT_COLUMNS.key(), "abc");
+      }
+      when(clusteringStrategy.getStrategyParams()).thenReturn(strategyParams);
+
+      HoodieInstant hoodieInstant = mock(HoodieInstant.class);
+      mocked.when(() -> ClusteringUtils.getClusteringPlan(any(), any())).thenReturn(Option.of(Pair.of(hoodieInstant, clusteringPlan)));
+      mocked.when(() -> ClusteringUtils.getRequestedClusteringInstant(any(), any(), any())).thenReturn(Option.of(hoodieInstant));
+
+      SparkRDDTableServiceClient tableServiceClient = new SparkRDDTableServiceClient(context(), writeConfig, Option.empty(), mockMetadataWriteHandler);
+      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = mock(HoodieWriteMetadata.class);
+      HoodieData<WriteStatus> hoodieData = mock(HoodieData.class);
+      when(writeMetadata.getWriteStatuses()).thenReturn(hoodieData);
+      HoodieTable table = mock(HoodieTable.class);
+      when(table.getMetaClient()).thenReturn(metaClient);
+      tableServiceClient.partialUpdateTableMetadata(table, writeMetadata, "00001", writeOperationType);
+      assertEquals(expectedEnforceRepartitionWithCoalesce, mockMetadataWriteHandler.enforceCoalesceWithRepartition);
+    }
+  }
+
  private void testAndAssertCompletionIsEarlierThanRequested(String basePath, Properties properties) throws IOException {
    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), basePath, properties);
@@ -225,4 +299,18 @@ class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
     });
   }
+  class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler {
+
+    boolean enforceCoalesceWithRepartition;
+    int coalesceDivisorForDataTableWrites;
+
+    @Override
+    public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
+                                                              boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
+      this.enforceCoalesceWithRepartition = enforceCoalesceWithRepartition;
+      this.coalesceDivisorForDataTableWrites = coalesceDivisorForDataTableWrites;
+      return dataTableWriteStatuses;
+    }
+  }
+
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java
new file mode 100644
index 000000000000..39ac8bdba74a
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamingMetadataWriteHandler extends SparkClientFunctionalTestHarness {
+
+  private final HoodieTable<?, ?, ?, ?> mockHoodieTable = mock(HoodieTable.class);
+ private HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ void setUp() {
+ metaClient = mock(HoodieTableMetaClient.class);
+ when(metaClient.getBasePath()).thenReturn(new StoragePath("/tmp/"));
+ when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
+ HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+ when(mockHoodieTable.getContext()).thenReturn(engineContext);
+ }
+
+  private static Stream<Arguments> coalesceDivisorTestArgs() {
+    return Arrays.stream(new Object[][] {
+        {100, 20, 1000, true},
+        {100, 20, 1000, false},
+        {1, 1, 1000, true},
+        {1, 1, 1000, false},
+        {10000, 100, 5000, true},
+        {10000, 100, 5000, false},
+        {10000, 100, 20000, true},
+        {10000, 100, 20000, false}
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("coalesceDivisorTestArgs")
+  public void testCoalesceDivisorConfig(int numDataTableWriteStatuses, int numMdtWriteStatus, int coalesceDivisorForDataTableWrites,
+                                        boolean enforceCoalesceWithRepartition) {
+    HoodieData<WriteStatus> dataTableWriteStatus = mockWriteStatuses(numDataTableWriteStatuses);
+    HoodieData<WriteStatus> mdtWriteStatus = mockWriteStatuses(numMdtWriteStatus);
+    HoodieTableMetadataWriter mdtWriter = mock(HoodieTableMetadataWriter.class);
+    when(mdtWriter.streamWriteToMetadataPartitions(any(), any())).thenReturn(mdtWriteStatus);
+    StreamingMetadataWriteHandler metadataWriteHandler = new MockStreamingMetadataWriteHandler(mdtWriter);
+
+    HoodieData<WriteStatus> allWriteStatuses = metadataWriteHandler.streamWriteToMetadataTable(mockHoodieTable, dataTableWriteStatus, "00001", enforceCoalesceWithRepartition,
+        coalesceDivisorForDataTableWrites);
+    assertEquals(Math.max(1, numDataTableWriteStatuses / coalesceDivisorForDataTableWrites) + numMdtWriteStatus, allWriteStatuses.getNumPartitions());
+  }
+
+ private HoodieData<WriteStatus> mockWriteStatuses(int size) {
+ List<WriteStatus> writeStatuses = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ writeStatuses.add(mock(WriteStatus.class));
+ }
+ return HoodieJavaRDD.of(jsc().parallelize(writeStatuses, size));
+ }
+
+  class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler {
+
+    private HoodieTableMetadataWriter mdtWriter;
+
+    MockStreamingMetadataWriteHandler(HoodieTableMetadataWriter mdtWriter) {
+      this.mdtWriter = mdtWriter;
+    }
+
+    @Override
+    synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) {
+      return Option.of(mdtWriter);
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
index 2ba497264a5b..16380e18cdd5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -71,6 +71,31 @@ public class TestHoodieJavaRDD extends HoodieClientTestBase {
     assertEquals(11, shuffleRDD.deduceNumPartitions());
   }
+  @Test
+  public void testRepartitionAndCoalesce() {
+    int numPartitions = 100;
+    // rdd parallelize
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+    assertEquals(100, rddData.getNumPartitions());
+
+    // repartition by 10.
+    rddData = rddData.repartition(10);
+    assertEquals(10, rddData.getNumPartitions());
+
+    // coalesce to 5
+    rddData = rddData.coalesce(5);
+    assertEquals(5, rddData.getNumPartitions());
+
+    // repartition to 20
+    rddData = rddData.repartition(20);
+    assertEquals(20, rddData.getNumPartitions());
+
+    // but coalesce may not expand the number of partitions
+    rddData = rddData.coalesce(40);
+    assertEquals(20, rddData.getNumPartitions());
+  }
+
@Test
void testMapPartitionsWithCloseable() {
String partition1 = "partition1";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 70b0365dd67e..96c95708ab75 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -81,6 +81,15 @@ public final class HoodieMetadataConfig extends HoodieConfig {
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<Integer> STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR = ConfigProperty
+      .key(METADATA_PREFIX + ".streaming.write.datatable.write.statuses.coalesce.divisor")
+      .defaultValue(5000)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("When streaming writes to metadata table is enabled via hoodie.metadata.streaming.write.enabled, the data table write statuses are unioned "
+          + "with metadata table write statuses before triggering the entire write dag. The data table write statuses will be coalesced down to the number of write statuses "
+          + "divided by the specified divisor to avoid triggering thousands of no-op tasks for the data table writes which have their status cached.");
+
   public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
 
   // Enable metrics for internal Metadata Table
@@ -609,6 +618,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getBoolean(STREAMING_WRITE_ENABLED);
   }
 
+  public int getStreamingWritesCoalesceDivisorForDataTableWrites() {
+    return getInt(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR);
+  }
+
   public boolean isBloomFilterIndexEnabled() {
     return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
   }
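
A small usage sketch of the new knob (the builder calls mirror the ones exercised in TestHoodieMetadataConfig below):

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    import java.util.Properties;

    public class CoalesceDivisorConfigDemo {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Lower the divisor so even modest jobs keep a few write-status partitions:
        // e.g. 8000 dt status partitions / 1000 -> coalesce parallelism of 8.
        props.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "1000");
        HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder()
            .fromProperties(props)
            .build();
        System.out.println(config.getStreamingWritesCoalesceDivisorForDataTableWrites()); // 1000
      }
    }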
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index a0b2ce75e967..cd95ee23a7e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -220,6 +220,15 @@ public interface HoodieData<T> extends Serializable {
*/
HoodieData<T> repartition(int parallelism);
+  /**
+   * Coalesces the underlying collection (if applicable), making sure the new {@link HoodieData}
+   * has at most {@code parallelism} partitions.
+   *
+   * @param parallelism target number of partitions in the underlying collection
+   * @return {@link HoodieData<T>} holding the coalesced collection
+   */
+  HoodieData<T> coalesce(int parallelism);
+
   default <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
     return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
         .reduceByKey((value1, value2) -> value1, parallelism)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 7fd16af82382..6da031e1a5de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -197,6 +197,12 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
return this;
}
+ @Override
+ public HoodieData<T> coalesce(int parallelism) {
+ // no op
+ return this;
+ }
+
@Override
public boolean isEmpty() {
return super.isEmpty();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
index eb97a4c4a345..73d705df907d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -59,4 +59,27 @@ class TestHoodieMetadataConfig {
.build();
     assertEquals(-50, configWithNegativeValue.getRecordPreparationParallelism());
   }
+
+  @Test
+  void testStreamingWritesCoalesceDivisorForDataTableWrites() {
+    // Test default value
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
+    assertEquals(5000, config.getStreamingWritesCoalesceDivisorForDataTableWrites());
+
+    // Test custom value
+    Properties props = new Properties();
+    props.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "1");
+    HoodieMetadataConfig configWithCustomValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+    assertEquals(1, configWithCustomValue.getStreamingWritesCoalesceDivisorForDataTableWrites());
+
+    // Test a large value
+    Properties propsLarge = new Properties();
+    propsLarge.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "10000");
+    HoodieMetadataConfig configWithLargeValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(propsLarge)
+        .build();
+    assertEquals(10000, configWithLargeValue.getStreamingWritesCoalesceDivisorForDataTableWrites());
+  }
+
}