This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 3aa72f3 KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
3aa72f3 is described below
commit 3aa72f3fdc1265aa2a819e5a475f4419d8efcc71
Author: Zhichao Zhang <[email protected]>
AuthorDate: Mon Jan 18 17:21:14 2021 +0800
KYLIN-4875 Remove executor configurations when execute resource detect step (local mode)
---
build/conf/spark-driver-log4j.properties | 2 --
.../org/apache/kylin/common/KylinConfigBase.java | 22 +++++++++++----
.../common/logging/AbstractHdfsLogAppender.java | 33 +++++++++++++++++++---
.../engine/spark/job/NResourceDetectStep.java | 9 ++++++
.../kylin/engine/spark/job/NSparkExecutable.java | 16 +++++++++--
5 files changed, 69 insertions(+), 13 deletions(-)
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
index 8b0e82d..04c648c 100644
--- a/build/conf/spark-driver-log4j.properties
+++ b/build/conf/spark-driver-log4j.properties
@@ -20,9 +20,7 @@
log4j.rootLogger=INFO,hdfs
log4j.logger.org.apache.kylin=DEBUG
log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
log4j.logger.org.apache.spark=WARN
-log4j.logger.org.apache.spark.ContextCleaner=WARN
# hdfs file appender
log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index b297806..4941595 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2918,14 +2918,19 @@ public abstract class KylinConfigBase implements Serializable {
/**
* Used to upload user-defined log4j configuration
+ *
+ * @param isLocal run spark local mode or not
*/
- public String sparkUploadFiles() {
+ public String sparkUploadFiles(boolean isLocal) {
try {
- File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
- "spark-executor-log4j.properties");
String path1 = "";
- if (storageFile != null) {
- path1 = storageFile.getCanonicalPath();
+ if (!isLocal) {
+ File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
+ "spark-executor-log4j.properties");
+ if (storageFile != null) {
+ path1 = storageFile.getCanonicalPath();
+
+ }
}
return getOptional("kylin.query.engine.sparder-additional-files", path1);
@@ -2934,6 +2939,13 @@ public abstract class KylinConfigBase implements Serializable {
}
}
+ /**
+ * Used to upload user-defined log4j configuration
+ */
+ public String sparkUploadFiles() {
+ return sparkUploadFiles(false);
+ }
+
@ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public String sparderJars() {
try {
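The effect of the change above: sparkUploadFiles(true) skips the lookup of spark-executor-log4j.properties entirely, because a local Spark master has no remote executors to ship the file to, while the new no-arg overload keeps the old behaviour for callers that do not know the master. A rough standalone sketch of the same decision, using plain java.io instead of Kylin's FileUtils.findFile and a hypothetical kylinHome argument:

import java.io.File;
import java.io.IOException;

public class UploadFilesSketch {
    // Mirrors the new sparkUploadFiles(boolean): only look up the executor
    // log4j file when the job is not running with a local master.
    static String executorLog4jPath(String kylinHome, boolean isLocal) throws IOException {
        if (isLocal) {
            return ""; // local mode: nothing has to be uploaded to executors
        }
        File candidate = new File(kylinHome + "/conf", "spark-executor-log4j.properties");
        return candidate.exists() ? candidate.getCanonicalPath() : "";
    }

    public static void main(String[] args) throws IOException {
        System.out.println(executorLog4jPath("/opt/kylin", false)); // cluster/yarn mode
        System.out.println(executorLog4jPath("/opt/kylin", true));  // local mode -> ""
    }
}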
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index 5a90fb2..ff5240c 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -133,7 +133,7 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
appendHdfsService = Executors.newSingleThreadExecutor();
appendHdfsService.execute(this::checkAndFlushLog);
- Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+ Runtime.getRuntime().addShutdownHook(new Thread(this::closing));
LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
}
@@ -153,20 +153,45 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
}
}
+ /**
+ * flush log when shutting down
+ */
+ public void closing() {
+ LogLog.warn(String.format(Locale.ROOT, "%s flush log when shutdown ...",
+ getAppenderName()));
+ synchronized (closeLock) {
+ if (!this.closed) {
+ List<LoggingEvent> transaction = Lists.newArrayList();
+ try {
+ flushLog(getLogBufferQue().size(), transaction);
+ } catch (Exception e) {
+ transaction.forEach(this::printLoggingEvent);
+ try {
+ while (!getLogBufferQue().isEmpty()) {
+ printLoggingEvent(getLogBufferQue().take());
+ }
+ } catch (Exception ie) {
+ LogLog.error("clear the logging buffer queue failed!", ie);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void close() {
+ LogLog.warn(String.format(Locale.ROOT, "%s attempt to closing ...",
+ getAppenderName()));
synchronized (closeLock) {
if (!this.closed) {
this.closed = true;
-
List<LoggingEvent> transaction = Lists.newArrayList();
try {
flushLog(getLogBufferQue().size(), transaction);
-
- closeWriter();
if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
appendHdfsService.shutdownNow();
}
+ closeWriter();
} catch (Exception e) {
transaction.forEach(this::printLoggingEvent);
try {
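For readers of the appender change above: the JVM shutdown hook now calls the new closing() method, which only flushes the buffered events, while close() keeps the full teardown and now stops the background flush service before closing the writer. A rough standalone sketch of that pattern, with illustrative names that are not Kylin's:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class BufferedAppenderSketch {
    private final LinkedBlockingDeque<String> buffer = new LinkedBlockingDeque<>(1024);
    private final ExecutorService flushService = Executors.newSingleThreadExecutor();
    private final Object closeLock = new Object();
    private volatile boolean closed = false;

    BufferedAppenderSketch() {
        // Flush (but do not tear down) when the JVM exits, mirroring closing().
        Runtime.getRuntime().addShutdownHook(new Thread(this::closing));
    }

    void append(String event) { buffer.offer(event); }

    // Shutdown hook: drain whatever is buffered, leave the appender usable.
    void closing() {
        synchronized (closeLock) {
            if (!closed) { flush(); }
        }
    }

    // Regular close: flush, stop the background flusher, then release the writer.
    void close() {
        synchronized (closeLock) {
            if (closed) { return; }
            closed = true;
            flush();
            flushService.shutdownNow();
            // closeWriter() would go here, after the flush thread has been stopped.
        }
    }

    private void flush() {
        List<String> drained = new ArrayList<>();
        buffer.drainTo(drained);
        drained.forEach(System.out::println); // stand-in for writing to HDFS
    }
}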
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
index 811dc11..2aed71e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
@@ -28,6 +28,10 @@ import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class NResourceDetectStep extends NSparkExecutable {
+ private final static String[] excludedSparkConf = new String[] {"spark.executor.cores",
+ "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
+ "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
+
// called by reflection
public NResourceDetectStep() {
@@ -62,6 +66,11 @@ public class NResourceDetectStep extends NSparkExecutable {
sparkConfigOverride.put("spark.master", "local");
sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
+ for (String sparkConf : excludedSparkConf) {
+ if (sparkConfigOverride.containsKey(sparkConf)) {
+ sparkConfigOverride.remove(sparkConf);
+ }
+ }
return sparkConfigOverride;
}
}
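In other words, when the resource detect step forces spark.master=local it now also strips any executor-side settings inherited from the job's Spark configuration overrides, since a local master has no executors for them to apply to. Not the committed code, just the same filtering in isolation (the containsKey guard in the commit is equivalent to a plain remove, which ignores absent keys):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ExcludeExecutorConfSketch {
    private static final String[] EXCLUDED = {"spark.executor.cores", "spark.executor.memoryOverhead",
            "spark.executor.extraJavaOptions", "spark.executor.instances", "spark.executor.memory",
            "spark.executor.extraClassPath"};

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("spark.master", "local");
        overrides.put("spark.executor.memory", "4g");          // dropped below
        overrides.put("spark.executor.instances", "10");       // dropped below
        overrides.put("spark.sql.adaptive.enabled", "false");  // kept

        // Drop all executor-side settings in one pass.
        overrides.keySet().removeAll(Arrays.asList(EXCLUDED));
        System.out.println(overrides); // spark.executor.* entries are gone
    }
}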
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 4558cf0..475713f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -337,6 +337,7 @@ public class NSparkExecutable extends AbstractExecutable {
}
sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+ sb.append(String.format(Locale.ROOT, " -Dlog4j.debug=%s ", "true"));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
@@ -356,13 +357,18 @@ public class NSparkExecutable extends AbstractExecutable {
for (Entry<String, String> entry : sparkConfs.entrySet()) {
appendSparkConf(sb, entry.getKey(), entry.getValue());
}
- appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+ if (!isLocalMaster(sparkConfs)) {
+ appendSparkConf(sb, "spark.executor.extraClassPath", Paths.get(kylinJobJar).getFileName().toString());
+ }
appendSparkConf(sb, "spark.driver.extraClassPath", kylinJobJar);
if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
}
- sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
+ String sparkUploadFiles = config.sparkUploadFiles(isLocalMaster(sparkConfs));
+ if (StringUtils.isNotBlank(sparkUploadFiles)) {
+ sb.append("--files ").append(sparkUploadFiles).append(" ");
+ }
sb.append("--name job_step_%s ");
sb.append("--jars %s %s %s");
String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
@@ -410,6 +416,12 @@ public class NSparkExecutable extends AbstractExecutable {
}
}
+ protected boolean isLocalMaster(Map<String, String> sparkConfs) {
+ String master = sparkConfs.getOrDefault("spark.master", "yarn");
+ return (master.equalsIgnoreCase("local")) || (master.toLowerCase(Locale.ROOT)
+ .startsWith("local["));
+ }
+
public boolean needMergeMetadata() {
return false;
}
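Taken together with the sparkUploadFiles change, the new isLocalMaster helper is what decides that executor-only arguments (the executor extraClassPath and the --files upload of the executor log4j config) are skipped. It treats a spark.master of "local" or anything starting with "local[" as local mode, case-insensitively, and defaults to "yarn" when the key is absent. A tiny standalone sketch of the same check with the conf lookup inlined (illustrative names only):

import java.util.Locale;

public class LocalMasterSketch {
    static boolean isLocalMaster(String master) {
        if (master == null) { master = "yarn"; } // default used when spark.master is unset
        return master.equalsIgnoreCase("local") || master.toLowerCase(Locale.ROOT).startsWith("local[");
    }

    public static void main(String[] args) {
        for (String m : new String[] {"local", "local[4]", "LOCAL[*]", "yarn", null}) {
            System.out.println(m + " -> " + isLocalMaster(m));
        }
    }
}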