This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e4cc7f05c9e [HUDI-8215] Support composite compaction strategy (#11963)
e4cc7f05c9e is described below

commit e4cc7f05c9e1a860c9f95a914ff15cb3c2a4578c
Author: TheR1sing3un <[email protected]>
AuthorDate: Mon Sep 23 13:07:05 2024 +0800

    [HUDI-8215] Support composite compaction strategy (#11963)
---
 .../apache/hudi/config/HoodieCompactionConfig.java | 13 +++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  7 ++-
 .../BaseHoodieCompactionPlanGenerator.java         |  4 +-
 .../generators/HoodieCompactionPlanGenerator.java  | 11 +++-
 .../strategy/CompositeCompactionStrategy.java      | 70 ++++++++++++++++++++++
 .../strategy/TestHoodieCompactionStrategy.java     | 53 +++++++++++++---
 .../apache/hudi/cli/HDFSParquetImporterUtils.java  |  6 +-
 .../org/apache/hudi/utilities/UtilHelpers.java     |  6 +-
 8 files changed, 148 insertions(+), 22 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 497945f7e9b..1c595a57c41 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -146,7 +146,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Compaction strategy decides which file groups are 
picked up for "
           + "compaction during each compaction run. By default. Hudi picks the 
log file "
-          + "with most accumulated unmerged data");
+          + "with most accumulated unmerged data. Multiple strategies can be 
composed by concatenating their class names with ','.");
 
   public static final ConfigProperty<String> 
TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
       .key("hoodie.compaction.daybased.target.partitions")
@@ -408,8 +408,15 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withCompactionStrategy(CompactionStrategy 
compactionStrategy) {
-      compactionConfig.setValue(COMPACTION_STRATEGY, 
compactionStrategy.getClass().getName());
+    public Builder withCompactionStrategy(CompactionStrategy... 
compactionStrategies) {
+      StringBuilder compactionStrategyBuilder = new StringBuilder();
+      for (CompactionStrategy compactionStrategy : compactionStrategies) {
+        
compactionStrategyBuilder.append(compactionStrategy.getClass().getName()).append(",");
+      }
+      if (compactionStrategyBuilder.length() > 0) {
+        
compactionStrategyBuilder.deleteCharAt(compactionStrategyBuilder.length() - 1);
+      }
+      compactionConfig.setValue(COMPACTION_STRATEGY, 
compactionStrategyBuilder.toString());
       return this;
     }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 19b3aa09082..68efe732b9a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -83,6 +83,7 @@ import 
org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import 
org.apache.hudi.table.action.compact.strategy.CompositeCompactionStrategy;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
 
 import org.apache.orc.CompressionKind;
@@ -1655,7 +1656,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public CompactionStrategy getCompactionStrategy() {
-    return 
ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY));
+    String compactionStrategiesStr = 
getString(HoodieCompactionConfig.COMPACTION_STRATEGY);
+    String[] compactionStrategyArr = compactionStrategiesStr.split(",");
+    List<CompactionStrategy> compactionStrategies = 
Arrays.stream(compactionStrategyArr)
+        .map(className -> (CompactionStrategy) 
ReflectionUtils.loadClass(className)).collect(Collectors.toList());
+    return compactionStrategies.size() == 1 ? compactionStrategies.get(0) : 
new CompositeCompactionStrategy(compactionStrategies);
   }
 
   public Long getTargetIOPerCompactionInMB() {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index f7390cdd56f..a84d6ad7b63 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -88,7 +88,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     int allPartitionSize = partitionPaths.size();
 
     // filter the partition paths if needed to reduce list status
-    partitionPaths = filterPartitionPathsByStrategy(writeConfig, 
partitionPaths);
+    partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
     LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
         writeConfig.getCompactionStrategy().getClass().getSimpleName(), 
partitionPaths.size(), allPartitionSize);
     if (partitionPaths.isEmpty()) {
@@ -185,7 +185,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
 
   protected abstract boolean filterLogCompactionOperations();
 
-  protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig 
writeConfig, List<String> partitionPaths) {
+  protected List<String> filterPartitionPathsByStrategy(List<String> 
partitionPaths) {
     return partitionPaths;
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
index 445c8eb7756..a93ece710b0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,21 +41,25 @@ public class HoodieCompactionPlanGenerator<T extends 
HoodieRecordPayload, I, K,
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCompactionPlanGenerator.class);
 
+  private final CompactionStrategy compactionStrategy;
+
   public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
+    this.compactionStrategy = writeConfig.getCompactionStrategy();
+    LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
   }
 
   @Override
   protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, List<HoodieCompactionOperation> operations) {
     // Filter the compactions with the passed in filter. This lets us choose 
most effective
     // compactions only
-    return 
writeConfig.getCompactionStrategy().generateCompactionPlan(writeConfig, 
operations,
+    return compactionStrategy.generateCompactionPlan(writeConfig, operations,
         
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
   }
 
   @Override
-  protected List<String> filterPartitionPathsByStrategy(HoodieWriteConfig 
writeConfig, List<String> partitionPaths) {
-    return 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitionPaths);
+  protected List<String> filterPartitionPathsByStrategy(List<String> 
partitionPaths) {
+    return compactionStrategy.filterPartitionPaths(writeConfig, 
partitionPaths);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
new file mode 100644
index 00000000000..da90269509d
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.List;
+
+/**
+ * A {@link CompactionStrategy} that chains multiple compaction strategies together.
+ * The strategies act as a pipeline combined with an `and` condition rather than `or`:
+ * each strategy further narrows or reorders the output of the previous one.
+ * The order of the strategies in the chain is therefore significant, as the output of
+ * one strategy is passed as the input to the next.
+ */
+public class CompositeCompactionStrategy extends CompactionStrategy {
+
+  // Delegates are applied in list order; set once at construction.
+  private final List<CompactionStrategy> strategies;
+
+  public CompositeCompactionStrategy(List<CompactionStrategy> strategies) {
+    this.strategies = strategies;
+  }
+
+  /**
+   * Applies each delegate's {@code orderAndFilter} in sequence, feeding the result of
+   * one strategy into the next.
+   */
+  @Override
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    List<HoodieCompactionOperation> finalOperations = operations;
+    for (CompactionStrategy strategy : strategies) {
+      finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, pendingCompactionPlans);
+    }
+    return finalOperations;
+  }
+
+  /**
+   * Applies each delegate's {@code filterPartitionPaths} in sequence, i.e. the
+   * logical AND of all the partition filters.
+   */
+  @Override
+  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths) {
+    List<String> finalPartitionPaths = allPartitionPaths;
+    for (CompactionStrategy strategy : strategies) {
+      finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, finalPartitionPaths);
+    }
+    return finalPartitionPaths;
+  }
+
+  @Override
+  public String toString() {
+    // Use the actual class name (was "CompactionStrategyChain", which matches no class),
+    // print clean class names (Class.toString() would prepend "class "), and avoid a
+    // dangling trailing " ===> " separator before the closing bracket.
+    StringBuilder builder = new StringBuilder("CompositeCompactionStrategy [");
+    for (int i = 0; i < strategies.size(); i++) {
+      if (i > 0) {
+        builder.append(" ===> ");
+      }
+      builder.append(strategies.get(i).getClass().getName());
+    }
+    return builder.append(']').toString();
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index a67f51face4..a368d43da25 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -63,7 +63,7 @@ public class TestHoodieCompactionStrategy {
     HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
     assertEquals(operations, returned, "UnBounded should not re-order or 
filter");
   }
 
@@ -79,7 +79,7 @@ public class TestHoodieCompactionStrategy {
             
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertTrue(returned.size() < operations.size(), "BoundedIOCompaction 
should have resulted in fewer compactions");
     assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted 
in 2 compactions being chosen");
@@ -103,7 +103,7 @@ public class TestHoodieCompactionStrategy {
                 .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 
1024).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileSizeBasedCompactionStrategy should have resulted in fewer 
compactions");
@@ -137,7 +137,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build();
 
-    List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
+    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
     assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy 
should have resulted in fewer partitions");
 
     List<HoodieCompactionOperation> operations = 
createCompactionOperationsForPartition(writeConfig, sizesMap, 
keyToPartitionMap, filterPartitions);
@@ -182,11 +182,11 @@ public class TestHoodieCompactionStrategy {
             .build())
         .build();
 
-    List<String> filterPartitions = strategy.filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
+    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
     assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy 
should have resulted in fewer partitions");
 
     List<HoodieCompactionOperation> operations = 
createCompactionOperationsForPartition(writeConfig, sizesMap, 
keyToPartitionMap, filterPartitions);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertEquals(1, returned.size(),
         "DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer 
compactions");
@@ -241,7 +241,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "BoundedPartitionAwareCompactionStrategy should have resulted in fewer 
compactions");
@@ -290,7 +290,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "UnBoundedPartitionAwareCompactionStrategy should not include last "
@@ -312,7 +312,7 @@ public class TestHoodieCompactionStrategy {
                 .withCompactionLogFileNumThreshold(2).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileLengthBasedCompactionStrategy should have resulted in fewer 
compactions");
@@ -331,8 +331,43 @@ public class TestHoodieCompactionStrategy {
     // TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80
     assertEquals(1594, (long) returnedSize,
         "Should chose the first 2 compactions which should result in a total 
IO of 1594 MB");
+  }
 
+  @Test
+  public void testCompositeCompactionStrategy() {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+        HoodieCompactionConfig.newBuilder().withCompactionStrategy(new 
NumStrategy(), new PrefixStrategy()).withTargetIOPerCompactionInMB(1024)
+            .withCompactionLogFileNumThreshold(2).build()).build();
+    List<String> allPartitionPaths = Arrays.asList(
+        "2017/01/01", "2018/01/02", "2017/02/01"
+    );
+    List<String> returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths);
+    // filter by num first and then filter by prefix
+    assertEquals(1, returned.size());
+    assertEquals("2017/01/01", returned.get(0));
+
+    writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+        HoodieCompactionConfig.newBuilder().withCompactionStrategy(new 
PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024)
+            .withCompactionLogFileNumThreshold(2).build()).build();
+    returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths);
+    // filter by prefix first and then filter by num
+    assertEquals(2, returned.size());
+    assertEquals("2017/01/01", returned.get(0));
+    assertEquals("2017/02/01", returned.get(1));
+  }
 
+  public static class NumStrategy extends CompactionStrategy {
+    @Override
+    public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
+      return allPartitionPaths.stream().limit(2).collect(Collectors.toList());
+    }
+  }
+
+  public static class PrefixStrategy extends CompactionStrategy {
+    @Override
+    public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
+      return allPartitionPaths.stream().filter(s -> 
s.startsWith("2017")).collect(Collectors.toList());
+    }
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
index d178fdd8e0d..839b5e0d9de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java
@@ -40,6 +40,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -277,9 +278,10 @@ public class HDFSParquetImporterUtils implements 
Serializable {
    */
   public static SparkRDDWriteClient<HoodieRecordPayload> 
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
                                                                             
int parallelism, Option<String> compactionStrategyClass, TypedProperties 
properties) {
-    HoodieCompactionConfig compactionConfig = compactionStrategyClass
+    Option<CompactionStrategy> strategyOpt = 
compactionStrategyClass.map(ReflectionUtils::loadClass);
+    HoodieCompactionConfig compactionConfig = strategyOpt
         .map(strategy -> 
HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-            
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+            .withCompactionStrategy(strategy).build())
         .orElseGet(() -> 
HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 8cec63917b8..61976cbd724 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -53,6 +53,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StorageSchemes;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
 import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
@@ -392,9 +393,10 @@ public class UtilHelpers {
    */
   public static SparkRDDWriteClient<HoodieRecordPayload> 
createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr,
       int parallelism, Option<String> compactionStrategyClass, TypedProperties 
properties) {
-    HoodieCompactionConfig compactionConfig = compactionStrategyClass
+    Option<CompactionStrategy> strategyOpt = 
compactionStrategyClass.map(ReflectionUtils::loadClass);
+    HoodieCompactionConfig compactionConfig = strategyOpt
         .map(strategy -> 
HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
-            
.withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build())
+            .withCompactionStrategy(strategy).build())
         
.orElse(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build());
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)

Reply via email to