This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e4cc7f05c9e [HUDI-8215] Support composite compaction strategy (#11963)
e4cc7f05c9e is described below
commit e4cc7f05c9e1a860c9f95a914ff15cb3c2a4578c
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Sep 23 13:07:05 2024 +0800
[HUDI-8215] Support composite compaction strategy (#11963)
---
.../apache/hudi/config/HoodieCompactionConfig.java | 13 +++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 7 ++-
.../BaseHoodieCompactionPlanGenerator.java | 4 +-
.../generators/HoodieCompactionPlanGenerator.java | 11 +++-
.../strategy/CompositeCompactionStrategy.java | 70 ++++++++++++++++++++++
.../strategy/TestHoodieCompactionStrategy.java | 53 +++++++++++++---
.../apache/hudi/cli/HDFSParquetImporterUtils.java | 6 +-
.../org/apache/hudi/utilities/UtilHelpers.java | 6 +-
8 files changed, 148 insertions(+), 22 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 497945f7e9b..1c595a57c41 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -146,7 +146,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Compaction strategy decides which file groups are
picked up for "
+ "compaction during each compaction run. By default, Hudi picks the
log file "
- + "with most accumulated unmerged data");
+ + "with most accumulated unmerged data. The strategy can be composed
with multiple strategies by concatenating the class names with ','.");
public static final ConfigProperty<String>
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
.key("hoodie.compaction.daybased.target.partitions")
@@ -408,8 +408,15 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withCompactionStrategy(CompactionStrategy
compactionStrategy) {
- compactionConfig.setValue(COMPACTION_STRATEGY,
compactionStrategy.getClass().getName());
+ public Builder withCompactionStrategy(CompactionStrategy...
compactionStrategies) {
+ StringBuilder compactionStrategyBuilder = new StringBuilder();
+ for (CompactionStrategy compactionStrategy : compactionStrategies) {
+
compactionStrategyBuilder.append(compactionStrategy.getClass().getName()).append(",");
+ }
+ if (compactionStrategyBuilder.length() > 0) {
+
compactionStrategyBuilder.deleteCharAt(compactionStrategyBuilder.length() - 1);
+ }
+ compactionConfig.setValue(COMPACTION_STRATEGY,
compactionStrategyBuilder.toString());
return this;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 19b3aa09082..68efe732b9a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -83,6 +83,7 @@ import
org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import
org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.orc.CompressionKind;
@@ -1655,7 +1656,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public CompactionStrategy getCompactionStrategy() {
- return
ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY));
+ String compactionStrategiesStr =
getString(HoodieCompactionConfig.COMPACTION_STRATEGY);
+ String[] compactionStrategyArr = compactionStrategiesStr.split(",");
+ List<CompactionStrategy> compactionStrategies =
Arrays.stream(compactionStrategyArr)
+ .map(className -> (CompactionStrategy)
ReflectionUtils.loadClass(className)).collect(Collectors.toList());
+ return compactionStrategies.size() == 1 ? compactionStrategies.get(0) :
new CompositeCompactionStrategy(compactionStrategies);
}
public Long getTargetIOPerCompactionInMB() {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index f7390cdd56f..a84d6ad7b63 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -88,7 +88,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
int allPartitionSize = partitionPaths.size();
// filter the partition paths if needed to reduce list status
- partitionPaths = filterPartitionPathsByStrategy(writeConfig,
partitionPaths);
+ partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
writeConfig.getCompactionStrategy().getClass().getSimpleName(),
partitionPaths.size(), allPartitionSize);
if (partitionPaths.isEmpty()) {
@@ -185,7 +185,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
protected abstract boolean filterLogCompactionOperations();
- protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig
writeConfig, List<String> partitionPaths) {
+ protected List<String> filterPartitionPathsByStrategy(List<String>
partitionPaths) {
return partitionPaths;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
index 445c8eb7756..a93ece710b0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,21 +41,25 @@ public class HoodieCompactionPlanGenerator<T extends
HoodieRecordPayload, I, K,
private static final Logger LOG =
LoggerFactory.getLogger(HoodieCompactionPlanGenerator.class);
+ private final CompactionStrategy compactionStrategy;
+
public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
+ this.compactionStrategy = writeConfig.getCompactionStrategy();
+ LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
}
@Override
protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient
metaClient, List<HoodieCompactionOperation> operations) {
// Filter the compactions with the passed in filter. This lets us choose
most effective
// compactions only
- return
writeConfig.getCompactionStrategy().generateCompactionPlan(writeConfig,
operations,
+ return compactionStrategy.generateCompactionPlan(writeConfig, operations,
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
}
@Override
- protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig
writeConfig, List<String> partitionPaths) {
- return
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitionPaths);
+ protected List<String> filterPartitionPathsByStrategy(List<String>
partitionPaths) {
+ return compactionStrategy.filterPartitionPaths(writeConfig,
partitionPaths);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
new file mode 100644
index 00000000000..da90269509d
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.List;
+
+/**
+ * CompositeCompactionStrategy chains multiple compaction strategies together.
+ * Multiple strategies perform like a pipeline with an `and` condition instead of
`or`.
+ * The order of the strategies in the chain is important as the output of one
strategy is passed as input to the next.
+ */
+public class CompositeCompactionStrategy extends CompactionStrategy {
+
+ private List<CompactionStrategy> strategies;
+
+ public CompositeCompactionStrategy(List<CompactionStrategy> strategies) {
+ this.strategies = strategies;
+ }
+
+ @Override
+ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig
writeConfig, List<HoodieCompactionOperation> operations,
List<HoodieCompactionPlan> pendingCompactionPlans) {
+ List<HoodieCompactionOperation> finalOperations = operations;
+ for (CompactionStrategy strategy : strategies) {
+ finalOperations = strategy.orderAndFilter(writeConfig, finalOperations,
pendingCompactionPlans);
+ }
+ return finalOperations;
+ }
+
+ @Override
+ public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig,
List<String> allPartitionPaths) {
+ List<String> finalPartitionPaths = allPartitionPaths;
+ for (CompactionStrategy strategy : strategies) {
+ finalPartitionPaths = strategy.filterPartitionPaths(writeConfig,
finalPartitionPaths);
+ }
+ return finalPartitionPaths;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("CompactionStrategyChain [");
+ for (CompactionStrategy strategy : strategies) {
+ builder.append(strategy.getClass());
+ builder.append(" ===> ");
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index a67f51face4..a368d43da25 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -63,7 +63,7 @@ public class TestHoodieCompactionStrategy {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp")
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertEquals(operations, returned, "UnBounded should not re-order or
filter");
}
@@ -79,7 +79,7 @@ public class TestHoodieCompactionStrategy {
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
.build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertTrue(returned.size() < operations.size(), "BoundedIOCompaction
should have resulted in fewer compactions");
assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted
in 2 compactions being chosen");
@@ -103,7 +103,7 @@ public class TestHoodieCompactionStrategy {
.withLogFileSizeThresholdBasedCompaction(100 * 1024 *
1024).build())
.build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertTrue(returned.size() < operations.size(),
"LogFileSizeBasedCompactionStrategy should have resulted in fewer
compactions");
@@ -137,7 +137,7 @@ public class TestHoodieCompactionStrategy {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build();
- List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig,
Arrays.asList(partitionPaths));
+ List<String> filterPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy
should have resulted in fewer partitions");
List<HoodieCompactionOperation> operations =
createCompactionOperationsForPartition(writeConfig, sizesMap,
keyToPartitionMap, filterPartitions);
@@ -182,11 +182,11 @@ public class TestHoodieCompactionStrategy {
.build())
.build();
- List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig,
Arrays.asList(partitionPaths));
+ List<String> filterPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
Arrays.asList(partitionPaths));
assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy
should have resulted in fewer partitions");
List<HoodieCompactionOperation> operations =
createCompactionOperationsForPartition(writeConfig, sizesMap,
keyToPartitionMap, filterPartitions);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertEquals(1, returned.size(),
"DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer
compactions");
@@ -241,7 +241,7 @@ public class TestHoodieCompactionStrategy {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertTrue(returned.size() < operations.size(),
"BoundedPartitionAwareCompactionStrategy should have resulted in fewer
compactions");
@@ -290,7 +290,7 @@ public class TestHoodieCompactionStrategy {
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertTrue(returned.size() < operations.size(),
"UnBoundedPartitionAwareCompactionStrategy should not include last "
@@ -312,7 +312,7 @@ public class TestHoodieCompactionStrategy {
.withCompactionLogFileNumThreshold(2).build())
.build();
List<HoodieCompactionOperation> operations =
createCompactionOperations(writeConfig, sizesMap);
- List<HoodieCompactionOperation> returned =
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+ List<HoodieCompactionOperation> returned =
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new
ArrayList<>());
assertTrue(returned.size() < operations.size(),
"LogFileLengthBasedCompactionStrategy should have resulted in fewer
compactions");
@@ -331,8 +331,43 @@ public class TestHoodieCompactionStrategy {
// TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80
assertEquals(1594, (long) returnedSize,
"Should chose the first 2 compactions which should result in a total
IO of 1594 MB");
+ }
+ @Test
+ public void testCompositeCompactionStrategy() {
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(new
NumStrategy(), new PrefixStrategy()).withTargetIOPerCompactionInMB(1024)
+ .withCompactionLogFileNumThreshold(2).build()).build();
+ List<String> allPartitionPaths = Arrays.asList(
+ "2017/01/01", "2018/01/02", "2017/02/01"
+ );
+ List<String> returned =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
allPartitionPaths);
+ // filter by num first and then filter by prefix
+ assertEquals(1, returned.size());
+ assertEquals("2017/01/01", returned.get(0));
+
+ writeConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(new
PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024)
+ .withCompactionLogFileNumThreshold(2).build()).build();
+ returned =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
allPartitionPaths);
+ // filter by prefix first and then filter by num
+ assertEquals(2, returned.size());
+ assertEquals("2017/01/01", returned.get(0));
+ assertEquals("2017/02/01", returned.get(1));
+ }
+ public static class NumStrategy extends CompactionStrategy {
+ @Override
+ public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig,
List<String> allPartitionPaths) {
+ return allPartitionPaths.stream().limit(2).collect(Collectors.toList());
+ }
+ }
+
+ public static class PrefixStrategy extends CompactionStrategy {
+ @Override
+ public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig,
List<String> allPartitionPaths) {
+ return allPartitionPaths.stream().filter(s ->
s.startsWith("2017")).collect(Collectors.toList());
+ }
}
@Test
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
index d178fdd8e0d..839b5e0d9de 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -40,6 +40,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -277,9 +278,10 @@ public class HDFSParquetImporterUtils implements
Serializable {
*/
public static SparkRDDWriteClient<HoodieRecordPayload>
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties
properties) {
- HoodieCompactionConfig compactionConfig = compactionStrategyClass
+ Option<CompactionStrategy> strategyOpt =
compactionStrategyClass.map(ReflectionUtils::loadClass);
+ HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy ->
HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+ .withCompactionStrategy(strategy).build())
.orElseGet(() ->
HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8cec63917b8..61976cbd724 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -53,6 +53,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
@@ -392,9 +393,10 @@ public class UtilHelpers {
*/
public static SparkRDDWriteClient<HoodieRecordPayload>
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
int parallelism, Option<String> compactionStrategyClass, TypedProperties
properties) {
- HoodieCompactionConfig compactionConfig = compactionStrategyClass
+ Option<CompactionStrategy> strategyOpt =
compactionStrategyClass.map(ReflectionUtils::loadClass);
+ HoodieCompactionConfig compactionConfig = strategyOpt
.map(strategy ->
HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+ .withCompactionStrategy(strategy).build())
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)