This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b67da3 [Spark Load] Redirect the spark launcher's log to a separate
log file (#4470)
7b67da3 is described below
commit 7b67da30d2e96ae03a93666f813952d6c1afc211
Author: xy720 <[email protected]>
AuthorDate: Sun Aug 30 21:10:04 2020 +0800
[Spark Load] Redirect the spark launcher's log to a separate log file
(#4470)
---
.../src/main/java/org/apache/doris/catalog/Type.java | 3 +--
.../src/main/java/org/apache/doris/common/Config.java | 6 ++++++
.../java/org/apache/doris/load/loadv2/LoadManager.java | 4 +++-
.../apache/doris/load/loadv2/SparkEtlJobHandler.java | 18 ++++++++++++++++++
.../apache/doris/load/loadv2/SparkLauncherMonitor.java | 16 +++++++++++++++-
.../apache/doris/load/loadv2/SparkLoadAppHandle.java | 8 ++++++++
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 13 +++++++++++++
.../doris/load/loadv2/SparkLauncherMonitorTest.java | 18 ++++++++++++++++++
8 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
index 1b19bb3..fefaffa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
@@ -17,7 +17,6 @@
package org.apache.doris.catalog;
-import com.google.common.primitives.Longs;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.Pair;
@@ -28,9 +27,9 @@ import org.apache.doris.thrift.TStructField;
import org.apache.doris.thrift.TTypeDesc;
import org.apache.doris.thrift.TTypeNode;
import org.apache.doris.thrift.TTypeNodeType;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2f53fc4..b88bc06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -544,6 +544,12 @@ public class Config extends ConfigBase {
public static String spark_resource_path = "";
/**
+ * The specified spark launcher log dir
+ */
+ @ConfField
+    public static String spark_launcher_log_dir = sys_log_dir + "/spark_launcher_log";
+
+ /**
* Default yarn client path
*/
@ConfField
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 9b0741b..5f6d6c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -45,7 +45,6 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
-
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -379,6 +378,9 @@ public class LoadManager implements Writable{
&& ((currentTimeMs - job.getFinishTimestamp()) / 1000
> Config.label_keep_max_second)) {
iter.remove();
dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
+ if (job instanceof SparkLoadJob) {
+ ((SparkLoadJob) job).clearSparkLauncherLog();
+ }
}
}
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 96b7064..f3e2f91 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
+import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
@@ -66,6 +67,7 @@ public class SparkEtlJobHandler {
private static final String CONFIG_FILE_NAME = "jobconfig.json";
private static final String JOB_CONFIG_DIR = "configs";
private static final String ETL_JOB_NAME = "doris__%s";
+ private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log";
// 5min
private static final long GET_APPID_TIMEOUT_MS = 300000L;
// 30s
@@ -79,6 +81,11 @@ public class SparkEtlJobHandler {
// delete outputPath
deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
+ // init local dir
+ if (!FeConstants.runningUnitTest) {
+ initLocalDir();
+ }
+
// prepare dpp archive
SparkRepository.SparkArchive archive = resource.prepareArchive();
SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
@@ -96,6 +103,8 @@ public class SparkEtlJobHandler {
String jobArchiveHdfsPath = spark2xLibrary.remotePath;
// spark yarn stage dir
String jobStageHdfsPath = resource.getWorkingDir();
+ // spark launcher log path
+        String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel);
// update archive and stage configs here
Map<String, String> sparkConfigs = resource.getSparkConfigs();
@@ -143,6 +152,7 @@ public class SparkEtlJobHandler {
if (!FeConstants.runningUnitTest) {
SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
+ logMonitor.setRedirectLogPath(logFilePath);
logMonitor.start();
try {
logMonitor.join();
@@ -299,6 +309,14 @@ public class SparkEtlJobHandler {
return filePathToSize;
}
+ public static synchronized void initLocalDir() {
+ String logDir = Config.spark_launcher_log_dir;
+ File file = new File(logDir);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ }
+
public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
try {
BrokerUtil.deletePath(outputPath, brokerDesc);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 628037d..9a66448 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -27,8 +27,11 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -67,6 +70,7 @@ public class SparkLauncherMonitor {
private SparkLoadAppHandle handle;
private long submitTimeoutMs;
private boolean isStop;
+ private OutputStream outputStream;
private static final String STATE = "state";
private static final String QUEUE = "queue";
@@ -89,6 +93,11 @@ public class SparkLauncherMonitor {
this.submitTimeoutMs = submitTimeoutMs;
}
+    public void setRedirectLogPath(String redirectLogPath) throws IOException {
+        this.outputStream = new FileOutputStream(new File(redirectLogPath), false);
+        this.handle.setLogPath(redirectLogPath);
+    }
+
// Normally, log monitor will automatically stop if the spark app state changes
// to RUNNING.
// But if the spark app state changes to FAILED/KILLED/LOST, log monitor will stop
@@ -103,7 +112,9 @@ public class SparkLauncherMonitor {
try {
outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
while (!isStop && (line = outReader.readLine()) != null) {
- LOG.info("monitor log: " + line);
+ if (outputStream != null) {
+ outputStream.write((line + "\n").getBytes());
+ }
SparkLoadAppHandle.State oldState = handle.getState();
SparkLoadAppHandle.State newState = oldState;
// parse state and appId
@@ -186,6 +197,9 @@ public class SparkLauncherMonitor {
if (outReader != null) {
outReader.close();
}
+ if (outputStream != null) {
+ outputStream.close();
+ }
} catch (IOException e) {
LOG.warn("close buffered reader error", e);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
index 5d75ae9..bfe335a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
@@ -40,6 +40,7 @@ public class SparkLoadAppHandle {
private FinalApplicationStatus finalStatus;
private String trackingUrl;
private String user;
+ private String logPath;
private List<Listener> listeners;
@@ -112,6 +113,8 @@ public class SparkLoadAppHandle {
public String getUser() { return this.user; }
+ public String getLogPath() { return this.logPath; }
+
public void setState(State state) {
this.state = state;
this.fireEvent(false);
@@ -147,6 +150,11 @@ public class SparkLoadAppHandle {
this.fireEvent(true);
}
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ this.fireEvent(true);
+ }
+
private void fireEvent(boolean isInfoChanged) {
if (this.listeners != null) {
Iterator iterator = this.listeners.iterator();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 2cb28a2..c0f4f42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -94,6 +94,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -713,6 +714,18 @@ public class SparkLoadJob extends BulkLoadJob {
return etlStartTimestamp;
}
+ public void clearSparkLauncherLog() {
+ if (sparkLoadAppHandle != null) {
+ String logPath = sparkLoadAppHandle.getLogPath();
+ if (!Strings.isNullOrEmpty(logPath)) {
+ File file = new File(logPath);
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+ }
+ }
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
index 8e55d41..8f97618 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
@@ -18,10 +18,12 @@
package org.apache.doris.load.loadv2;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
import java.net.URL;
@@ -33,6 +35,7 @@ public class SparkLauncherMonitorTest {
private FinalApplicationStatus finalApplicationStatus;
private String trackingUrl;
private String user;
+ private String logPath;
@Before
public void setUp() {
@@ -43,6 +46,7 @@ public class SparkLauncherMonitorTest {
finalApplicationStatus = FinalApplicationStatus.UNDEFINED;
trackingUrl = "http://myhost:8388/proxy/application_1573630236805_6864759/";
user = "testugi";
+ logPath = "./spark-launcher.log";
}
@Test
@@ -54,6 +58,7 @@ public class SparkLauncherMonitorTest {
Process process = Runtime.getRuntime().exec(cmd);
handle = new SparkLoadAppHandle(process);
SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
+ logMonitor.setRedirectLogPath(logPath);
logMonitor.start();
try {
logMonitor.join();
@@ -63,6 +68,7 @@ public class SparkLauncherMonitorTest {
Assert.fail();
}
+ // check values
Assert.assertEquals(appId, handle.getAppId());
Assert.assertEquals(state, handle.getState());
Assert.assertEquals(queue, handle.getQueue());
@@ -70,5 +76,17 @@ public class SparkLauncherMonitorTest {
Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus());
Assert.assertEquals(trackingUrl, handle.getUrl());
Assert.assertEquals(user, handle.getUser());
+
+ // check log
+ File file = new File(logPath);
+ Assert.assertTrue(file.exists());
+ }
+
+ @After
+ public void clear() {
+ File file = new File(logPath);
+ if (file.exists()) {
+ file.delete();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]