This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b67da3  [Spark Load] Redirect the spark launcher's log to a separate 
log file (#4470)
7b67da3 is described below

commit 7b67da30d2e96ae03a93666f813952d6c1afc211
Author: xy720 <[email protected]>
AuthorDate: Sun Aug 30 21:10:04 2020 +0800

    [Spark Load] Redirect the spark launcher's log to a separate log file 
(#4470)
---
 .../src/main/java/org/apache/doris/catalog/Type.java   |  3 +--
 .../src/main/java/org/apache/doris/common/Config.java  |  6 ++++++
 .../java/org/apache/doris/load/loadv2/LoadManager.java |  4 +++-
 .../apache/doris/load/loadv2/SparkEtlJobHandler.java   | 18 ++++++++++++++++++
 .../apache/doris/load/loadv2/SparkLauncherMonitor.java | 16 +++++++++++++++-
 .../apache/doris/load/loadv2/SparkLoadAppHandle.java   |  8 ++++++++
 .../org/apache/doris/load/loadv2/SparkLoadJob.java     | 13 +++++++++++++
 .../doris/load/loadv2/SparkLauncherMonitorTest.java    | 18 ++++++++++++++++++
 8 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
index 1b19bb3..fefaffa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import com.google.common.primitives.Longs;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.Pair;
@@ -28,9 +27,9 @@ import org.apache.doris.thrift.TStructField;
 import org.apache.doris.thrift.TTypeDesc;
 import org.apache.doris.thrift.TTypeNode;
 import org.apache.doris.thrift.TTypeNodeType;
-
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 2f53fc4..b88bc06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -544,6 +544,12 @@ public class Config extends ConfigBase {
     public static String spark_resource_path = "";
 
     /**
+     * The specified spark launcher log dir
+     */
+    @ConfField
+    public static String spark_launcher_log_dir = sys_log_dir + 
"/spark_launcher_log";
+
+    /**
      * Default yarn client path
      */
     @ConfField
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 9b0741b..5f6d6c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -45,7 +45,6 @@ import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
-
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -379,6 +378,9 @@ public class LoadManager implements Writable{
                         && ((currentTimeMs - job.getFinishTimestamp()) / 1000 
> Config.label_keep_max_second)) {
                     iter.remove();
                     
dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
+                    if (job instanceof SparkLoadJob) {
+                        ((SparkLoadJob) job).clearSparkLauncherLog();
+                    }
                 }
             }
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 96b7064..f3e2f91 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
@@ -66,6 +67,7 @@ public class SparkEtlJobHandler {
     private static final String CONFIG_FILE_NAME = "jobconfig.json";
     private static final String JOB_CONFIG_DIR = "configs";
     private static final String ETL_JOB_NAME = "doris__%s";
+    private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log";
     // 5min
     private static final long GET_APPID_TIMEOUT_MS = 300000L;
     // 30s
@@ -79,6 +81,11 @@ public class SparkEtlJobHandler {
         // delete outputPath
         deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
 
+        // init local dir
+        if (!FeConstants.runningUnitTest) {
+            initLocalDir();
+        }
+
         // prepare dpp archive
         SparkRepository.SparkArchive archive = resource.prepareArchive();
         SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
@@ -96,6 +103,8 @@ public class SparkEtlJobHandler {
         String jobArchiveHdfsPath = spark2xLibrary.remotePath;
         // spark yarn stage dir
         String jobStageHdfsPath = resource.getWorkingDir();
+        // spark launcher log path
+        String logFilePath = Config.spark_launcher_log_dir + "/" + 
String.format(LAUNCHER_LOG, loadJobId, loadLabel);
 
         // update archive and stage configs here
         Map<String, String> sparkConfigs = resource.getSparkConfigs();
@@ -143,6 +152,7 @@ public class SparkEtlJobHandler {
             if (!FeConstants.runningUnitTest) {
                 SparkLauncherMonitor.LogMonitor logMonitor = 
SparkLauncherMonitor.createLogMonitor(handle);
                 logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
+                logMonitor.setRedirectLogPath(logFilePath);
                 logMonitor.start();
                 try {
                     logMonitor.join();
@@ -299,6 +309,14 @@ public class SparkEtlJobHandler {
         return filePathToSize;
     }
 
+    public static synchronized void initLocalDir() {
+        String logDir = Config.spark_launcher_log_dir;
+        File file = new File(logDir);
+        if (!file.exists()) {
+            file.mkdirs();
+        }
+    }
+
     public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
         try {
             BrokerUtil.deletePath(outputPath, brokerDesc);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 628037d..9a66448 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -27,8 +27,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -67,6 +70,7 @@ public class SparkLauncherMonitor {
         private SparkLoadAppHandle handle;
         private long submitTimeoutMs;
         private boolean isStop;
+        private OutputStream outputStream;
 
         private static final String STATE = "state";
         private static final String QUEUE = "queue";
@@ -89,6 +93,11 @@ public class SparkLauncherMonitor {
             this.submitTimeoutMs = submitTimeoutMs;
         }
 
+        public void setRedirectLogPath(String redirectLogPath) throws 
IOException {
+            this.outputStream = new FileOutputStream(new 
File(redirectLogPath), false);
+            this.handle.setLogPath(redirectLogPath);
+        }
+
         // Normally, log monitor will automatically stop if the spark app 
state changes
         // to RUNNING.
         // But if the spark app state changes to FAILED/KILLED/LOST, log 
monitor will stop
@@ -103,7 +112,9 @@ public class SparkLauncherMonitor {
             try {
                 outReader = new BufferedReader(new 
InputStreamReader(process.getInputStream()));
                 while (!isStop && (line = outReader.readLine()) != null) {
-                    LOG.info("monitor log: " + line);
+                    if (outputStream != null) {
+                        outputStream.write((line + "\n").getBytes());
+                    }
                     SparkLoadAppHandle.State oldState = handle.getState();
                     SparkLoadAppHandle.State newState = oldState;
                     // parse state and appId
@@ -186,6 +197,9 @@ public class SparkLauncherMonitor {
                     if (outReader != null) {
                         outReader.close();
                     }
+                    if (outputStream != null) {
+                        outputStream.close();
+                    }
                 } catch (IOException e) {
                     LOG.warn("close buffered reader error", e);
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
index 5d75ae9..bfe335a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java
@@ -40,6 +40,7 @@ public class SparkLoadAppHandle {
     private FinalApplicationStatus finalStatus;
     private String trackingUrl;
     private String user;
+    private String logPath;
 
     private List<Listener> listeners;
 
@@ -112,6 +113,8 @@ public class SparkLoadAppHandle {
 
     public String getUser() { return this.user; }
 
+    public String getLogPath() { return this.logPath; }
+
     public void setState(State state) {
         this.state = state;
         this.fireEvent(false);
@@ -147,6 +150,11 @@ public class SparkLoadAppHandle {
         this.fireEvent(true);
     }
 
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+        this.fireEvent(true);
+    }
+
     private void fireEvent(boolean isInfoChanged) {
         if (this.listeners != null) {
             Iterator iterator = this.listeners.iterator();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 2cb28a2..c0f4f42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -94,6 +94,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -713,6 +714,18 @@ public class SparkLoadJob extends BulkLoadJob {
         return etlStartTimestamp;
     }
 
+    public void clearSparkLauncherLog() {
+        if (sparkLoadAppHandle != null) {
+            String logPath = sparkLoadAppHandle.getLogPath();
+            if (!Strings.isNullOrEmpty(logPath)) {
+                File file = new File(logPath);
+                if (file.exists()) {
+                    file.delete();
+                }
+            }
+        }
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
index 8e55d41..8f97618 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
@@ -18,10 +18,12 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 
@@ -33,6 +35,7 @@ public class SparkLauncherMonitorTest {
     private FinalApplicationStatus finalApplicationStatus;
     private String trackingUrl;
     private String user;
+    private String logPath;
 
     @Before
     public void setUp() {
@@ -43,6 +46,7 @@ public class SparkLauncherMonitorTest {
         finalApplicationStatus = FinalApplicationStatus.UNDEFINED;
         trackingUrl = 
"http://myhost:8388/proxy/application_1573630236805_6864759/";
         user = "testugi";
+        logPath = "./spark-launcher.log";
     }
 
     @Test
@@ -54,6 +58,7 @@ public class SparkLauncherMonitorTest {
             Process process = Runtime.getRuntime().exec(cmd);
             handle = new SparkLoadAppHandle(process);
             SparkLauncherMonitor.LogMonitor logMonitor = 
SparkLauncherMonitor.createLogMonitor(handle);
+            logMonitor.setRedirectLogPath(logPath);
             logMonitor.start();
             try {
                 logMonitor.join();
@@ -63,6 +68,7 @@ public class SparkLauncherMonitorTest {
             Assert.fail();
         }
 
+        // check values
         Assert.assertEquals(appId, handle.getAppId());
         Assert.assertEquals(state, handle.getState());
         Assert.assertEquals(queue, handle.getQueue());
@@ -70,5 +76,17 @@ public class SparkLauncherMonitorTest {
         Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus());
         Assert.assertEquals(trackingUrl, handle.getUrl());
         Assert.assertEquals(user, handle.getUser());
+
+        // check log
+        File file = new File(logPath);
+        Assert.assertTrue(file.exists());
+    }
+
+    @After
+    public void clear() {
+        File file = new File(logPath);
+        if (file.exists()) {
+            file.delete();
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to