This is an automated email from the ASF dual-hosted git repository.

liway pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dde57b2  [HUDI-2164] Let users build cluster plan and execute this 
plan at once using HoodieClusteringJob for async clustering (#3259)
dde57b2 is described below

commit dde57b293cd22ef1bd89f44054f109a52fc20e56
Author: zhangyue19921010 <[email protected]>
AuthorDate: Mon Aug 2 08:07:59 2021 +0800

    [HUDI-2164] Let users build cluster plan and execute this plan at once 
using HoodieClusteringJob for async clustering (#3259)
    
    * add --mode schedule/execute/scheduleandexecute
    
    * fix checkstyle
    
    * add UT testHoodieAsyncClusteringJobWithScheduleAndExecute
    
    * log changed
    
    * try to make ut success
    
    * try to fix ut
    
    * modify ut
    
    * review changed
    
    * code review
    
    * code review
    
    * code review
    
    * code review
    
    Co-authored-by: yuezhang <[email protected]>
---
 .../apache/hudi/utilities/HoodieClusteringJob.java | 105 +++++++++++++---
 .../functional/TestHoodieDeltaStreamer.java        | 139 ++++++++++++++++++---
 2 files changed, 207 insertions(+), 37 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index a4dc741..8f74892 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -31,7 +31,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -49,6 +51,9 @@ public class HoodieClusteringJob {
   private transient FileSystem fs;
   private TypedProperties props;
   private final JavaSparkContext jsc;
+  public static final String EXECUTE = "execute";
+  public static final String SCHEDULE = "schedule";
+  public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
 
   public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
     this.cfg = cfg;
@@ -71,8 +76,8 @@ public class HoodieClusteringJob {
     public String basePath = null;
     @Parameter(names = {"--table-name", "-tn"}, description = "Table name", 
required = true)
     public String tableName = null;
-    @Parameter(names = {"--instant-time", "-it"}, description = "Clustering 
Instant time, only need when cluster. "
-        + "And schedule clustering can generate it.", required = false)
+    @Parameter(names = {"--instant-time", "-it"}, description = "Clustering 
Instant time, only need when set --mode execute. "
+            + "When set \"--mode scheduleAndExecute\" this instant-time will 
be ignored.", required = false)
     public String clusteringInstantTime = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism 
for hoodie insert", required = false)
     public int parallelism = 1;
@@ -83,8 +88,14 @@ public class HoodieClusteringJob {
     @Parameter(names = {"--retry", "-rt"}, description = "number of retries", 
required = false)
     public int retry = 0;
 
-    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule 
clustering")
+    // Legacy switch kept for backward compatibility; "--mode schedule" is the replacement.
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule clustering. Deprecated: please use \"--mode schedule\" instead")
     public Boolean runSchedule = false;
+
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set 
\"schedule\" means make a cluster plan; "
+            + "Set \"execute\" means execute a cluster plan at given instant 
which means --instant-time is needed here; "
+            + "Set \"scheduleAndExecute\" means make a cluster plan first and 
execute that plan immediately", required = false)
+    public String runningMode = null;
+
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
 
@@ -101,15 +112,17 @@ public class HoodieClusteringJob {
   public static void main(String[] args) {
     final Config cfg = new Config();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0 || (!cfg.runSchedule && 
cfg.clusteringInstantTime == null)) {
+
+    if (cfg.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
+
     final JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" + 
cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
     HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg);
     int result = clusteringJob.cluster(cfg.retry);
-    String resultMsg = String.format("Clustering with basePath: %s, tableName: 
%s, runSchedule: %s",
-        cfg.basePath, cfg.tableName, cfg.runSchedule);
+    String resultMsg = String.format("Clustering with basePath: %s, tableName: 
%s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
     if (result == -1) {
       LOG.error(resultMsg + " failed");
     } else {
@@ -118,20 +131,46 @@ public class HoodieClusteringJob {
     jsc.stop();
   }
 
+  /**
+   * Resolves the effective running mode on {@code cfg} and checks that it is usable.
+   *
+   * <p>An explicit {@code --mode} always takes priority over the legacy {@code --schedule} flag.
+   * When {@code --mode} is null or empty, {@code --schedule} decides between {@link #SCHEDULE}
+   * and {@link #EXECUTE}, so {@code cfg.runningMode} is never null once this method returns.
+   *
+   * @param cfg job configuration; {@code cfg.runningMode} may be rewritten in place
+   * @throws IllegalArgumentException if the resolved mode is "execute" but no instant time is set
+   */
+  private static void validateRunningMode(Config cfg) {
+    // If the --schedule option is removed in the future, the fallback below should become EXECUTE.
+    if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
+      // runSchedule is a boxed Boolean on a public field: treat null like false rather than
+      // failing with a NullPointerException on unboxing.
+      cfg.runningMode = Boolean.TRUE.equals(cfg.runSchedule) ? SCHEDULE : EXECUTE;
+    }
+
+    if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime == null) {
+      throw new IllegalArgumentException("--instant-time couldn't be null when executing clustering plan.");
+    }
+  }
+
   public int cluster(int retry) {
     this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+    // need to do validate in case that users call cluster() directly without 
setting cfg.runningMode
+    validateRunningMode(cfg);
     int ret = UtilHelpers.retry(retry, () -> {
-      if (cfg.runSchedule) {
-        LOG.info("Do schedule");
-        Option<String> instantTime = doSchedule(jsc);
-        int result = instantTime.isPresent() ? 0 : -1;
-        if (result == 0) {
-          LOG.info("The schedule instant time is " + instantTime.get());
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = doSchedule(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return doScheduleAndCluster(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
+          return doCluster(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit 
the job directly");
+          return -1;
         }
-        return result;
-      } else {
-        LOG.info("Do cluster");
-        return doCluster(jsc);
       }
     }, "Cluster failed");
     return ret;
@@ -164,11 +203,37 @@ public class HoodieClusteringJob {
   private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
     String schemaStr = getSchemaFromLatestInstant();
     try (SparkRDDWriteClient<HoodieRecordPayload> client = 
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, 
Option.empty(), props)) {
-      if (cfg.clusteringInstantTime != null) {
-        client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, 
Option.empty());
-        return Option.of(cfg.clusteringInstantTime);
+      return doSchedule(client);
+    }
+  }
+
+  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> 
client) {
+    if (cfg.clusteringInstantTime != null) {
+      client.scheduleClusteringAtInstant(cfg.clusteringInstantTime, 
Option.empty());
+      return Option.of(cfg.clusteringInstantTime);
+    }
+    return client.scheduleClustering(Option.empty());
+  }
+
+  /**
+   * Builds a clustering plan and, if one was produced, executes it with the same write client.
+   *
+   * @param jsc Spark context used to create the write client and to evaluate the write statuses
+   * @return -1 when no plan could be scheduled, otherwise the value of {@code UtilHelpers.handleErrors}
+   * @throws Exception propagated from schema lookup, client creation, scheduling or clustering
+   */
+  public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
+    LOG.info("Step 1: Do schedule");
+    String latestSchema = getSchemaFromLatestInstant();
+    try (SparkRDDWriteClient<HoodieRecordPayload> writeClient =
+             UtilHelpers.createHoodieClient(jsc, cfg.basePath, latestSchema, cfg.parallelism, Option.empty(), props)) {
+
+      // NOTE(review): the --instant-time help text says the value is ignored in scheduleAndExecute
+      // mode, yet doSchedule(client) still schedules at cfg.clusteringInstantTime when it is set.
+      // Confirm which of the two is intended.
+      Option<String> plannedInstant = doSchedule(writeClient);
+      if (!plannedInstant.isPresent()) {
+        LOG.info("Couldn't generate cluster plan");
+        return -1;
+      }
+
+      String clusteringInstant = plannedInstant.get();
+      LOG.info("The schedule instant time is " + clusteringInstant);
+      LOG.info("Step 2: Do cluster");
+      JavaRDD<WriteStatus> writeStatuses =
+              (JavaRDD<WriteStatus>) writeClient.cluster(clusteringInstant, true).getWriteStatuses();
+      return UtilHelpers.handleErrors(jsc, clusteringInstant, writeStatuses);
+    }
+  }
+
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 1c68476..0ae0aeb 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.utilities.functional;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.ExecutorService;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -77,22 +81,22 @@ import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -103,6 +107,7 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -142,6 +147,38 @@ public class TestHoodieDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
     return props;
   }
 
+  /**
+   * Creates a delta streamer for an INSERT workload in continuous mode on a COPY_ON_WRITE table.
+   *
+   * @param tableBasePath base path of the table under test
+   * @param totalRecords  record count forwarded to {@code getAsyncServicesConfigs}
+   * @param asyncCluster  value forwarded to {@code getAsyncServicesConfigs}
+   *                      (presumably toggles async clustering - confirm against that helper)
+   * @return a streamer bound to the shared {@code jsc}
+   * @throws IOException declared for the helpers that build the configuration
+   */
+  protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String tableBasePath, int totalRecords, String asyncCluster) throws IOException {
+    HoodieDeltaStreamer.Config streamerCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT);
+    streamerCfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    streamerCfg.continuousMode = true;
+    streamerCfg.configs.addAll(
+        getAsyncServicesConfigs(totalRecords, "false", "", "", asyncCluster, ""));
+    return new HoodieDeltaStreamer(streamerCfg, jsc);
+  }
+
+  /**
+   * Creates a clustering job bound to the shared {@code jsc}.
+   *
+   * @param tableBasePath         base path of the table under test
+   * @param clusteringInstantTime instant time to place in the job config, may be null
+   * @param runSchedule           value for the legacy schedule flag
+   * @param scheduleAndExecute    passed through as the job's running mode, may be null
+   * @return a job built from {@code buildHoodieClusteringUtilConfig}
+   */
+  protected HoodieClusteringJob initialHoodieClusteringJob(String tableBasePath, String clusteringInstantTime, boolean runSchedule, String scheduleAndExecute) {
+    return new HoodieClusteringJob(
+        jsc,
+        buildHoodieClusteringUtilConfig(tableBasePath, clusteringInstantTime, runSchedule, scheduleAndExecute));
+  }
+
+  /**
+   * Class-level cleanup: delegates to {@code UtilitiesTestBase.cleanupClass()} and then tears
+   * down {@code testUtils} if it was initialized.
+   */
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+    // testUtils may never have been created, so guard against null before tearing it down.
+    if (testUtils != null) {
+      testUtils.teardown();
+    }
+  }
+
+  /** Per-test setup hook; delegates to the superclass {@code setup()}. */
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  /** Per-test teardown hook; delegates to the superclass {@code teardown()}. */
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
   static class TestHelpers {
 
     static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, 
WriteOperationType op) {
@@ -317,6 +354,22 @@ public class TestHoodieDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
       int numDeltaCommits = (int) timeline.getInstants().count();
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
+
+    /**
+     * Asserts that the number of completed replace instants on the active timeline equals
+     * {@code expected}.
+     *
+     * @param expected  exact number of completed replace instants required
+     * @param tablePath base path of the table to inspect
+     * @param fs        file system whose configuration is used to build the meta client
+     */
+    static void assertNoReplaceCommits(int expected, String tablePath, FileSystem fs) {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          .setConf(fs.getConf())
+          .setBasePath(tablePath)
+          .setLoadActiveTimelineOnLoad(true)
+          .build();
+      HoodieTimeline completedReplaces = metaClient.getActiveTimeline().getCompletedReplaceTimeline();
+      // Log the whole active timeline to help diagnose a failing assertion.
+      LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      int completedReplaceCount = (int) completedReplaces.getInstants().count();
+      assertEquals(expected, completedReplaceCount, "Got=" + completedReplaceCount + ", exp =" + expected);
+    }
+
+    /**
+     * Asserts that the active timeline holds at least {@code minExpected} pending replace instants.
+     *
+     * @param minExpected lower bound on the number of pending replace instants
+     * @param tablePath   base path of the table to inspect
+     * @param fs          file system whose configuration is used to build the meta client
+     */
+    static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+          .setConf(fs.getConf())
+          .setBasePath(tablePath)
+          .setLoadActiveTimelineOnLoad(true)
+          .build();
+      HoodieTimeline pendingReplaces = metaClient.getActiveTimeline().filterPendingReplaceTimeline();
+      // Log the whole active timeline to help diagnose a failing assertion.
+      LOG.info("Timeline Instants=" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      int pendingReplaceCount = (int) pendingReplaces.getInstants().count();
+      assertTrue(minExpected <= pendingReplaceCount, "Got=" + pendingReplaceCount + ", exp >=" + minExpected);
+    }
   }
 
   @Test
@@ -794,6 +847,10 @@ public class TestHoodieDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
     dsFuture.get();
   }
 
+  /**
+   * Convenience overload: forwards to the three-argument {@code deltaStreamerTestRunner},
+   * passing {@code null} as the middle argument.
+   */
+  private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, 
Function<Boolean, Boolean> condition) throws Exception {
+    deltaStreamerTestRunner(ds, null, condition);
+  }
+
   @Test
   public void testInlineClustering() throws Exception {
     String tableBasePath = dfsBasePath + "/inlineClustering";
@@ -836,32 +893,34 @@ public class TestHoodieDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
   }
 
   private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String 
basePath,
-                                                                  String 
clusteringInstantTime, boolean runSchedule) {
+                                                                     String 
clusteringInstantTime,
+                                                                     boolean 
runSchedule) {
+    return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, 
runSchedule, null);
+  }
+
+  /**
+   * Builds a {@code HoodieClusteringJob.Config} for tests.
+   *
+   * <p>Copies the four arguments onto the matching config fields and points
+   * {@code propsFilePath} at {@code clusteringjob.properties} under {@code dfsBasePath}.
+   */
+  private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String 
basePath,
+                                                                     String 
clusteringInstantTime,
+                                                                     boolean 
runSchedule,
+                                                                     String 
runningMode) {
     HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
     config.basePath = basePath;
     config.clusteringInstantTime = clusteringInstantTime;
     config.runSchedule = runSchedule;
     config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+    config.runningMode = runningMode;
     return config;
   }
 
   @Test
   public void testHoodieAsyncClusteringJob() throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClustering";
-    // Keep it higher than batch-size to test continuous mode
-    int totalRecords = 3000;
 
-    // Initial bulk insert
-    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    cfg.continuousMode = true;
-    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", ""));
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    deltaStreamerTestRunner(ds, cfg, (r) -> {
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"true");
+    HoodieClusteringJob scheduleClusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, true, null);
+
+    deltaStreamerTestRunner(ds, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-      HoodieClusteringJob.Config scheduleClusteringConfig = 
buildHoodieClusteringUtilConfig(tableBasePath,
-          null, true);
-      HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc, 
scheduleClusteringConfig);
+
       Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
         scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
@@ -923,6 +982,52 @@ public class TestHoodieDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
     });
   }
 
+  /**
+   * Runs {@code HoodieClusteringJob.cluster} once per running mode against a table that a
+   * continuous delta streamer (created with async clustering set to "false") is writing to.
+   *
+   * <p>Expectations per mode:
+   * <ul>
+   *   <li>scheduleAndExecute: replace commits are present afterwards;</li>
+   *   <li>schedule: pending replace requests exist and no replace commit has completed;</li>
+   *   <li>execute: the job is built without an instant time, so {@code cluster} must fail with
+   *       the "--instant-time" validation message.</li>
+   * </ul>
+   */
+  @ParameterizedTest
+  @ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
+  public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String runningMode) throws Exception {
+    String tableBasePath = dfsBasePath + "/asyncClustering2";
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
+    HoodieClusteringJob scheduleClusteringJob = initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
+
+    deltaStreamerTestRunner(ds, (r) -> {
+      Exception exception = null;
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      try {
+        int result = scheduleClusteringJob.cluster(0);
+        if (result == 0) {
+          LOG.info("Cluster success");
+        } else {
+          // Returning false signals that the condition is not met yet.
+          LOG.warn("Cluster failed");
+          return false;
+        }
+      } catch (Exception e) {
+        LOG.warn("ScheduleAndExecute clustering failed", e);
+        exception = e;
+        // Only the execute mode is expected to throw (no instant time was supplied).
+        if (!runningMode.equalsIgnoreCase(HoodieClusteringJob.EXECUTE)) {
+          return false;
+        }
+      }
+      switch (runningMode.toLowerCase()) {
+        case HoodieClusteringJob.SCHEDULE_AND_EXECUTE: {
+          TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+          return true;
+        }
+        case HoodieClusteringJob.SCHEDULE: {
+          TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
+          TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+          return true;
+        }
+        case HoodieClusteringJob.EXECUTE: {
+          assertNotNull(exception);
+          // JUnit's assertEquals takes (expected, actual); keep that order so failure output reads correctly.
+          assertEquals("--instant-time couldn't be null when executing clustering plan.", exception.getMessage());
+          return true;
+        }
+        default:
+          throw new IllegalStateException("Unexpected value: " + runningMode);
+      }
+    });
+  }
+
   /**
    * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental 
processing using a 2 step pipeline The first
    * step involves using a SQL template to transform a source TEST-DATA-SOURCE 
============================> HUDI TABLE

Reply via email to