This is an automated email from the ASF dual-hosted git repository.
liway pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dde57b2 [HUDI-2164] Let users build cluster plan and execute this
plan at once using HoodieClusteringJob for async clustering (#3259)
dde57b2 is described below
commit dde57b293cd22ef1bd89f44054f109a52fc20e56
Author: zhangyue19921010 <[email protected]>
AuthorDate: Mon Aug 2 08:07:59 2021 +0800
[HUDI-2164] Let users build cluster plan and execute this plan at once
using HoodieClusteringJob for async clustering (#3259)
* add --mode schedule/execute/scheduleandexecute
* fix checkstyle
* add UT testHoodieAsyncClusteringJobWithScheduleAndExecute
* log changed
* try to make ut success
* try to fix ut
* modify ut
* review changed
* code review
* code review
* code review
* code review
Co-authored-by: yuezhang <[email protected]>
---
.../apache/hudi/utilities/HoodieClusteringJob.java | 105 +++++++++++++---
.../functional/TestHoodieDeltaStreamer.java | 139 ++++++++++++++++++---
2 files changed, 207 insertions(+), 37 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index a4dc741..8f74892 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -31,7 +31,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -49,6 +51,9 @@ public class HoodieClusteringJob {
private transient FileSystem fs;
private TypedProperties props;
private final JavaSparkContext jsc;
+ public static final String EXECUTE = "execute";
+ public static final String SCHEDULE = "schedule";
+ public static final String SCHEDULE_AND_EXECUTE = "scheduleandexecute";
public HoodieClusteringJob(JavaSparkContext jsc, Config cfg) {
this.cfg = cfg;
@@ -71,8 +76,8 @@ public class HoodieClusteringJob {
public String basePath = null;
@Parameter(names = {"--table-name", "-tn"}, description = "Table name",
required = true)
public String tableName = null;
- @Parameter(names = {"--instant-time", "-it"}, description = "Clustering
Instant time, only need when cluster. "
- + "And schedule clustering can generate it.", required = false)
+ @Parameter(names = {"--instant-time", "-it"}, description = "Clustering
Instant time, only need when set --mode execute. "
+ + "When set \"--mode scheduleAndExecute\" this instant-time will
be ignored.", required = false)
public String clusteringInstantTime = null;
@Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for hoodie insert", required = false)
public int parallelism = 1;
@@ -83,8 +88,14 @@ public class HoodieClusteringJob {
@Parameter(names = {"--retry", "-rt"}, description = "number of retries",
required = false)
public int retry = 0;
- @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
clustering")
+ @Parameter(names = {"--schedule", "-sc"}, description = "Schedule
clustering (deprecated; will be removed soon — please use \"--mode schedule\" instead)")
public Boolean runSchedule = false;
+
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set
\"schedule\" means make a cluster plan; "
+ + "Set \"execute\" means execute a cluster plan at given instant
which means --instant-time is needed here; "
+ + "Set \"scheduleAndExecute\" means make a cluster plan first and
execute that plan immediately", required = false)
+ public String runningMode = null;
+
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
@@ -101,15 +112,17 @@ public class HoodieClusteringJob {
public static void main(String[] args) {
final Config cfg = new Config();
JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0 || (!cfg.runSchedule &&
cfg.clusteringInstantTime == null)) {
+
+ if (cfg.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
+
final JavaSparkContext jsc = UtilHelpers.buildSparkContext("clustering-" +
cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
HoodieClusteringJob clusteringJob = new HoodieClusteringJob(jsc, cfg);
int result = clusteringJob.cluster(cfg.retry);
- String resultMsg = String.format("Clustering with basePath: %s, tableName:
%s, runSchedule: %s",
- cfg.basePath, cfg.tableName, cfg.runSchedule);
+ String resultMsg = String.format("Clustering with basePath: %s, tableName:
%s, runningMode: %s",
+ cfg.basePath, cfg.tableName, cfg.runningMode);
if (result == -1) {
LOG.error(resultMsg + " failed");
} else {
@@ -118,20 +131,46 @@ public class HoodieClusteringJob {
jsc.stop();
}
+ // make sure that cfg.runningMode cannot be null
+ private static void validateRunningMode(Config cfg) {
+ // --mode has a higher priority than --schedule
+ // If we remove --schedule option in the future we need to change
runningMode default value to EXECUTE
+ if (StringUtils.isNullOrEmpty(cfg.runningMode)) {
+ cfg.runningMode = cfg.runSchedule ? SCHEDULE : EXECUTE;
+ }
+
+ if (cfg.runningMode.equalsIgnoreCase(EXECUTE) && cfg.clusteringInstantTime
== null) {
+ throw new RuntimeException("--instant-time couldn't be null when
executing clustering plan.");
+ }
+ }
+
public int cluster(int retry) {
this.fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+ // need to validate in case users call cluster() directly without
setting cfg.runningMode
+ validateRunningMode(cfg);
int ret = UtilHelpers.retry(retry, () -> {
- if (cfg.runSchedule) {
- LOG.info("Do schedule");
- Option<String> instantTime = doSchedule(jsc);
- int result = instantTime.isPresent() ? 0 : -1;
- if (result == 0) {
- LOG.info("The schedule instant time is " + instantTime.get());
+ switch (cfg.runningMode.toLowerCase()) {
+ case SCHEDULE: {
+ LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+ Option<String> instantTime = doSchedule(jsc);
+ int result = instantTime.isPresent() ? 0 : -1;
+ if (result == 0) {
+ LOG.info("The schedule instant time is " + instantTime.get());
+ }
+ return result;
+ }
+ case SCHEDULE_AND_EXECUTE: {
+ LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+ return doScheduleAndCluster(jsc);
+ }
+ case EXECUTE: {
+ LOG.info("Running Mode: [" + EXECUTE + "]; Do cluster");
+ return doCluster(jsc);
+ }
+ default: {
+ LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit
the job directly");
+ return -1;
}
- return result;
- } else {
- LOG.info("Do cluster");
- return doCluster(jsc);
}
}, "Cluster failed");
return ret;
@@ -164,11 +203,37 @@ public class HoodieClusteringJob {
private Option<String> doSchedule(JavaSparkContext jsc) throws Exception {
String schemaStr = getSchemaFromLatestInstant();
try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
- if (cfg.clusteringInstantTime != null) {
- client.scheduleClusteringAtInstant(cfg.clusteringInstantTime,
Option.empty());
- return Option.of(cfg.clusteringInstantTime);
+ return doSchedule(client);
+ }
+ }
+
+ private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload>
client) {
+ if (cfg.clusteringInstantTime != null) {
+ client.scheduleClusteringAtInstant(cfg.clusteringInstantTime,
Option.empty());
+ return Option.of(cfg.clusteringInstantTime);
+ }
+ return client.scheduleClustering(Option.empty());
+ }
+
+ public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
+ LOG.info("Step 1: Do schedule");
+ String schemaStr = getSchemaFromLatestInstant();
+ try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
+
+ Option<String> instantTime = doSchedule(client);
+ int result = instantTime.isPresent() ? 0 : -1;
+
+ if (result == -1) {
+ LOG.info("Couldn't generate cluster plan");
+ return result;
}
- return client.scheduleClustering(Option.empty());
+
+ LOG.info("The schedule instant time is " + instantTime.get());
+ LOG.info("Step 2: Do cluster");
+ JavaRDD<WriteStatus> writeResponse =
+ (JavaRDD<WriteStatus>) client.cluster(instantTime.get(),
true).getWriteStatuses();
+ return UtilHelpers.handleErrors(jsc, instantTime.get(), writeResponse);
}
}
+
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 1c68476..0ae0aeb 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,6 +18,10 @@
package org.apache.hudi.utilities.functional;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.ExecutorService;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
@@ -77,22 +81,22 @@ import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -103,6 +107,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -142,6 +147,38 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
return props;
}
+ protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String
tableBasePath, int totalRecords, String asyncCluster) throws IOException {
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT);
+ cfg.continuousMode = true;
+ cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
asyncCluster, ""));
+ return new HoodieDeltaStreamer(cfg, jsc);
+ }
+
+ protected HoodieClusteringJob initialHoodieClusteringJob(String
tableBasePath, String clusteringInstantTime, boolean runSchedule, String
scheduleAndExecute) {
+ HoodieClusteringJob.Config scheduleClusteringConfig =
buildHoodieClusteringUtilConfig(tableBasePath,
+ clusteringInstantTime, runSchedule, scheduleAndExecute);
+ return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
+ }
+
+ @AfterAll
+ public static void cleanupClass() {
+ UtilitiesTestBase.cleanupClass();
+ if (testUtils != null) {
+ testUtils.teardown();
+ }
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ }
+
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath,
WriteOperationType op) {
@@ -317,6 +354,22 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
}
+
+ static void assertNoReplaceCommits(int expected, String tablePath,
FileSystem fs) {
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+ HoodieTimeline timeline =
meta.getActiveTimeline().getCompletedReplaceTimeline();
+ LOG.info("Timeline Instants=" +
meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ int numDeltaCommits = (int) timeline.getInstants().count();
+ assertEquals(expected, numDeltaCommits, "Got=" + numDeltaCommits + ",
exp =" + expected);
+ }
+
+ static void assertAtLeastNReplaceRequests(int minExpected, String
tablePath, FileSystem fs) {
+ HoodieTableMetaClient meta =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+ HoodieTimeline timeline =
meta.getActiveTimeline().filterPendingReplaceTimeline();
+ LOG.info("Timeline Instants=" +
meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ int numDeltaCommits = (int) timeline.getInstants().count();
+ assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ",
exp >=" + minExpected);
+ }
}
@Test
@@ -794,6 +847,10 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
dsFuture.get();
}
+ private void deltaStreamerTestRunner(HoodieDeltaStreamer ds,
Function<Boolean, Boolean> condition) throws Exception {
+ deltaStreamerTestRunner(ds, null, condition);
+ }
+
@Test
public void testInlineClustering() throws Exception {
String tableBasePath = dfsBasePath + "/inlineClustering";
@@ -836,32 +893,34 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
}
private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String
basePath,
- String
clusteringInstantTime, boolean runSchedule) {
+ String
clusteringInstantTime,
+ boolean
runSchedule) {
+ return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime,
runSchedule, null);
+ }
+
+ private HoodieClusteringJob.Config buildHoodieClusteringUtilConfig(String
basePath,
+ String
clusteringInstantTime,
+ boolean
runSchedule,
+ String
runningMode) {
HoodieClusteringJob.Config config = new HoodieClusteringJob.Config();
config.basePath = basePath;
config.clusteringInstantTime = clusteringInstantTime;
config.runSchedule = runSchedule;
config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+ config.runningMode = runningMode;
return config;
}
@Test
public void testHoodieAsyncClusteringJob() throws Exception {
String tableBasePath = dfsBasePath + "/asyncClustering";
- // Keep it higher than batch-size to test continuous mode
- int totalRecords = 3000;
- // Initial bulk insert
- HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.INSERT);
- cfg.continuousMode = true;
- cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
- cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "",
"true", ""));
- HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
- deltaStreamerTestRunner(ds, cfg, (r) -> {
+ HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000,
"true");
+ HoodieClusteringJob scheduleClusteringJob =
initialHoodieClusteringJob(tableBasePath, null, true, null);
+
+ deltaStreamerTestRunner(ds, (r) -> {
TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
- HoodieClusteringJob.Config scheduleClusteringConfig =
buildHoodieClusteringUtilConfig(tableBasePath,
- null, true);
- HoodieClusteringJob scheduleClusteringJob = new HoodieClusteringJob(jsc,
scheduleClusteringConfig);
+
Option<String> scheduleClusteringInstantTime = Option.empty();
try {
scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
@@ -923,6 +982,52 @@ public class TestHoodieDeltaStreamer extends
TestHoodieDeltaStreamerBase {
});
}
+ @ParameterizedTest
+ @ValueSource(strings = {"schedule", "execute", "scheduleAndExecute"})
+ public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String
runningMode) throws Exception {
+ String tableBasePath = dfsBasePath + "/asyncClustering2";
+ HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000,
"false");
+ HoodieClusteringJob scheduleClusteringJob =
initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
+
+ deltaStreamerTestRunner(ds, (r) -> {
+ Exception exception = null;
+ TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+ try {
+ int result = scheduleClusteringJob.cluster(0);
+ if (result == 0) {
+ LOG.info("Cluster success");
+ } else {
LOG.warn("Cluster failed");
+ return false;
+ }
+ } catch (Exception e) {
+ LOG.warn("ScheduleAndExecute clustering failed", e);
+ exception = e;
+ if (!runningMode.equalsIgnoreCase(HoodieClusteringJob.EXECUTE)) {
+ return false;
+ }
+ }
+ switch (runningMode.toLowerCase()) {
+ case HoodieClusteringJob.SCHEDULE_AND_EXECUTE: {
+ TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+ return true;
+ }
+ case HoodieClusteringJob.SCHEDULE: {
+ TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
+ TestHelpers.assertNoReplaceCommits(0, tableBasePath, dfs);
+ return true;
+ }
+ case HoodieClusteringJob.EXECUTE: {
+ assertNotNull(exception);
+ assertEquals(exception.getMessage(), "--instant-time couldn't be
null when executing clustering plan.");
+ return true;
+ }
+ default:
+ throw new IllegalStateException("Unexpected value: " + runningMode);
+ }
+ });
+ }
+
/**
* Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental
processing using a 2 step pipeline The first
* step involves using a SQL template to transform a source TEST-DATA-SOURCE
============================> HUDI TABLE