This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5c867 [HUDI-740] Fix inability to specify the sparkMaster, and clean up code
in SparkUtil (#1452)
4e5c867 is described below
commit 4e5c8671ef3213ffa5c40f09aae27aacfa20f907
Author: hongdd <[email protected]>
AuthorDate: Wed Apr 8 21:33:15 2020 +0800
[HUDI-740] Fix inability to specify the sparkMaster, and clean up code in SparkUtil
(#1452)
---
.../org/apache/hudi/cli/HoodieCliSparkConfig.java | 46 ++++++++++++++++
.../apache/hudi/cli/commands/CleansCommand.java | 2 +-
.../hudi/cli/commands/CompactionCommand.java | 16 +++---
.../org/apache/hudi/cli/commands/SparkMain.java | 62 +++++++++-------------
.../java/org/apache/hudi/cli/utils/SparkUtil.java | 36 ++++++++-----
5 files changed, 103 insertions(+), 59 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java
new file mode 100644
index 0000000..0d64135
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCliSparkConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli;
+
+/**
+ * Class storing configs for init spark.
+ */
+public class HoodieCliSparkConfig {
+ /**
+ * Configs to start spark application.
+ */
+ public static final String CLI_SPARK_MASTER = "SPARK_MASTER";
+ public static final String CLI_SERIALIZER = "spark.serializer";
+ public static final String CLI_DRIVER_MAX_RESULT_SIZE =
"spark.driver.maxResultSize";
+ public static final String CLI_EVENT_LOG_OVERWRITE =
"spark.eventLog.overwrite";
+ public static final String CLI_EVENT_LOG_ENABLED = "spark.eventLog.enabled";
+ public static final String CLI_EXECUTOR_MEMORY = "spark.executor.memory";
+
+ /**
+ * Hadoop output config.
+ */
+ public static final String CLI_MAPRED_OUTPUT_COMPRESS =
"spark.hadoop.mapred.output.compress";
+ public static final String CLI_MAPRED_OUTPUT_COMPRESSION_CODEC =
"spark.hadoop.mapred.output.compression.codec";
+ public static final String CLI_MAPRED_OUTPUT_COMPRESSION_TYPE =
"spark.hadoop.mapred.output.compression.type";
+
+ /**
+ * Parquet file config.
+ */
+ public static final String CLI_PARQUET_ENABLE_SUMMARY_METADATA =
"parquet.enable.summary-metadata";
+}
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 34321ef..609e44b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -139,7 +139,7 @@ public class CleansCommand implements CommandMarker {
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
String cmd = SparkMain.SparkCommand.CLEAN.toString();
- sparkLauncher.addAppArgs(cmd, metaClient.getBasePath(), master,
propsFilePath, sparkMemory);
+ sparkLauncher.addAppArgs(cmd, master, sparkMemory,
metaClient.getBasePath(), propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 0843a87..a4c70da 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -423,8 +423,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher =
SparkUtil.initLauncher(sparkPropertiesPath);
- sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(),
client.getBasePath(),
- compactionInstant, outputPathStr, parallelism, master, sparkMemory);
+ sparkLauncher.addAppArgs(SparkCommand.COMPACT_VALIDATE.toString(),
master, sparkMemory, client.getBasePath(),
+ compactionInstant, outputPathStr, parallelism);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
@@ -484,8 +484,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher =
SparkUtil.initLauncher(sparkPropertiesPath);
-
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(),
client.getBasePath(),
- compactionInstant, outputPathStr, parallelism, master, sparkMemory,
Boolean.valueOf(skipV).toString(),
+
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_PLAN.toString(),
master, sparkMemory, client.getBasePath(),
+ compactionInstant, outputPathStr, parallelism,
Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -528,8 +528,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher =
SparkUtil.initLauncher(sparkPropertiesPath);
-
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(),
client.getBasePath(),
- fileId, outputPathStr, "1", master, sparkMemory,
Boolean.valueOf(skipV).toString(),
+
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(),
master, sparkMemory, client.getBasePath(),
+ fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -574,8 +574,8 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher =
SparkUtil.initLauncher(sparkPropertiesPath);
- sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(),
client.getBasePath(),
- compactionInstant, outputPathStr, parallelism, master, sparkMemory,
Boolean.valueOf(dryRun).toString());
+ sparkLauncher.addAppArgs(SparkCommand.COMPACT_REPAIR.toString(), master,
sparkMemory, client.getBasePath(),
+ compactionInstant, outputPathStr, parallelism,
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 89e62b9..e8f4c6a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -22,6 +22,7 @@ import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -62,7 +63,9 @@ public class SparkMain {
SparkCommand cmd = SparkCommand.valueOf(command);
- JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" +
command);
+ JavaSparkContext jsc = sparkMasterContained(cmd)
+ ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command,
Option.of(args[1]), Option.of(args[2]))
+ : SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
int returnCode = 0;
switch (cmd) {
case ROLLBACK:
@@ -118,38 +121,38 @@ public class SparkMain {
break;
case COMPACT_VALIDATE:
assert (args.length == 7);
- doCompactValidate(jsc, args[1], args[2], args[3],
Integer.parseInt(args[4]), args[5], args[6]);
+ doCompactValidate(jsc, args[3], args[4], args[5],
Integer.parseInt(args[6]));
returnCode = 0;
break;
case COMPACT_REPAIR:
assert (args.length == 8);
- doCompactRepair(jsc, args[1], args[2], args[3],
Integer.parseInt(args[4]), args[5], args[6],
+ doCompactRepair(jsc, args[3], args[4], args[5],
Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_FILE:
assert (args.length == 9);
- doCompactUnscheduleFile(jsc, args[1], args[2], args[3],
Integer.parseInt(args[4]), args[5], args[6],
+ doCompactUnscheduleFile(jsc, args[3], args[4], args[5],
Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_PLAN:
assert (args.length == 9);
- doCompactUnschedule(jsc, args[1], args[2], args[3],
Integer.parseInt(args[4]), args[5], args[6],
+ doCompactUnschedule(jsc, args[3], args[4], args[5],
Integer.parseInt(args[6]),
Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
returnCode = 0;
break;
case CLEAN:
assert (args.length >= 5);
propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[3])) {
- propsFilePath = args[3];
+ if (!StringUtils.isNullOrEmpty(args[4])) {
+ propsFilePath = args[4];
}
configs = new ArrayList<>();
if (args.length > 5) {
configs.addAll(Arrays.asList(args).subList(5, args.length));
}
- clean(jsc, args[1], args[2], propsFilePath, args[4], configs);
+ clean(jsc, args[3], propsFilePath, configs);
break;
default:
break;
@@ -157,14 +160,16 @@ public class SparkMain {
System.exit(returnCode);
}
- private static void clean(JavaSparkContext jsc, String basePath, String
sparkMaster, String propsFilePath,
- String sparkMemory, List<String> configs) throws
Exception {
+ private static boolean sparkMasterContained(SparkCommand command) {
+ List<SparkCommand> masterContained =
Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
+ SparkCommand.COMPACT_UNSCHEDULE_PLAN,
SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN);
+ return masterContained.contains(command);
+ }
+
+ private static void clean(JavaSparkContext jsc, String basePath, String
propsFilePath,
+ List<String> configs) {
HoodieCleaner.Config cfg = new HoodieCleaner.Config();
cfg.basePath = basePath;
- if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
- jsc.getConf().setMaster(sparkMaster);
- }
- jsc.getConf().set("spark.executor.memory", sparkMemory);
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
new HoodieCleaner(cfg, jsc).run();
@@ -172,7 +177,7 @@ public class SparkMain {
private static int dataLoad(JavaSparkContext jsc, String command, String
srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism,
String schemaFile, String sparkMemory,
- int retry, String propsFilePath, List<String>
configs) {
+ int retry, String propsFilePath, List<String> configs) {
Config cfg = new Config();
cfg.command = command;
cfg.srcPath = srcPath;
@@ -190,22 +195,18 @@ public class SparkMain {
}
private static void doCompactValidate(JavaSparkContext jsc, String basePath,
String compactionInstant,
- String outputPath, int parallelism, String sparkMaster, String
sparkMemory) throws Exception {
+ String outputPath, int parallelism) throws Exception {
HoodieCompactionAdminTool.Config cfg = new
HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.VALIDATE;
cfg.outputPath = outputPath;
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
- if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
- jsc.getConf().setMaster(sparkMaster);
- }
- jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactRepair(JavaSparkContext jsc, String basePath,
String compactionInstant,
- String outputPath, int parallelism, String sparkMaster, String
sparkMemory, boolean dryRun) throws Exception {
+ String outputPath, int parallelism, boolean dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new
HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.REPAIR;
@@ -213,16 +214,11 @@ public class SparkMain {
cfg.compactionInstantTime = compactionInstant;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
- if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
- jsc.getConf().setMaster(sparkMaster);
- }
- jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnschedule(JavaSparkContext jsc, String
basePath, String compactionInstant,
- String outputPath, int parallelism, String sparkMaster, String
sparkMemory, boolean skipValidation,
- boolean dryRun) throws Exception {
+ String outputPath, int parallelism, boolean skipValidation, boolean
dryRun) throws Exception {
HoodieCompactionAdminTool.Config cfg = new
HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_PLAN;
@@ -231,15 +227,11 @@ public class SparkMain {
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
- if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
- jsc.getConf().setMaster(sparkMaster);
- }
- jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static void doCompactUnscheduleFile(JavaSparkContext jsc, String
basePath, String fileId, String outputPath,
- int parallelism, String sparkMaster, String sparkMemory, boolean
skipValidation, boolean dryRun)
+ int parallelism, boolean skipValidation, boolean dryRun)
throws Exception {
HoodieCompactionAdminTool.Config cfg = new
HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
@@ -249,16 +241,12 @@ public class SparkMain {
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
cfg.skipValidation = skipValidation;
- if ((null != sparkMaster) && (!sparkMaster.isEmpty())) {
- jsc.getConf().setMaster(sparkMaster);
- }
- jsc.getConf().set("spark.executor.memory", sparkMemory);
new HoodieCompactionAdminTool(cfg).run(jsc);
}
private static int compact(JavaSparkContext jsc, String basePath, String
tableName, String compactionInstant,
int parallelism, String schemaFile, String sparkMemory, int retry,
boolean schedule, String propsFilePath,
- List<String> configs) {
+ List<String> configs) {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index c7ac6dd..ba9d1e5 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -18,10 +18,12 @@
package org.apache.hudi.cli.utils;
+import org.apache.hudi.cli.HoodieCliSparkConfig;
import org.apache.hudi.cli.commands.SparkEnvCommand;
import org.apache.hudi.cli.commands.SparkMain;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.SparkConf;
@@ -38,7 +40,7 @@ import java.util.Objects;
*/
public class SparkUtil {
- public static final String DEFAULT_SPARK_MASTER = "yarn-client";
+ private static final String DEFAULT_SPARK_MASTER = "yarn-client";
/**
* TODO: Need to fix a bunch of hardcoded stuff here eg: history server,
spark distro.
@@ -61,29 +63,37 @@ public class SparkUtil {
}
public static JavaSparkContext initJavaSparkConf(String name) {
+ return initJavaSparkConf(name, Option.empty(), Option.empty());
+ }
+
+ public static JavaSparkContext initJavaSparkConf(String name, Option<String>
master,
+ Option<String> executorMemory) {
SparkConf sparkConf = new SparkConf().setAppName(name);
- String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
- if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
+ String defMaster =
master.orElse(sparkConf.getenv(HoodieCliSparkConfig.CLI_SPARK_MASTER));
+ if ((null == defMaster) || (defMaster.isEmpty())) {
sparkConf.setMaster(DEFAULT_SPARK_MASTER);
} else {
- sparkConf.setMaster(defMasterFromEnv);
+ sparkConf.setMaster(defMaster);
}
- sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
- sparkConf.set("spark.driver.maxResultSize", "2g");
- sparkConf.set("spark.eventLog.overwrite", "true");
- sparkConf.set("spark.eventLog.enabled", "true");
+ sparkConf.set(HoodieCliSparkConfig.CLI_SERIALIZER,
"org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set(HoodieCliSparkConfig.CLI_DRIVER_MAX_RESULT_SIZE, "2g");
+ sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_OVERWRITE, "true");
+ sparkConf.set(HoodieCliSparkConfig.CLI_EVENT_LOG_ENABLED, "true");
+ if (executorMemory.isPresent()) {
+ sparkConf.set(HoodieCliSparkConfig.CLI_EXECUTOR_MEMORY,
executorMemory.get());
+ }
// Configure hadoop conf
- sparkConf.set("spark.hadoop.mapred.output.compress", "true");
- sparkConf.set("spark.hadoop.mapred.output.compression.codec", "true");
- sparkConf.set("spark.hadoop.mapred.output.compression.codec",
"org.apache.hadoop.io.compress.GzipCodec");
- sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
+ sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESS, "true");
+ sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC,
"true");
+ sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_CODEC,
"org.apache.hadoop.io.compress.GzipCodec");
+ sparkConf.set(HoodieCliSparkConfig.CLI_MAPRED_OUTPUT_COMPRESSION_TYPE,
"BLOCK");
HoodieWriteClient.registerClasses(sparkConf);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata",
false);
+
jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA,
false);
FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
return jsc;
}