This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new ea7a8b4  KYLIN-4766 Delete job tmp and segment file after job be discarded
ea7a8b4 is described below
commit ea7a8b454fd884fd504b17ec8dabcade89e76b0f
Author: yaqian.zhang <[email protected]>
AuthorDate: Fri Sep 25 14:55:41 2020 +0800
KYLIN-4766 Delete job tmp and segment file after job be discarded
---
.../apache/kylin/engine/spark/job/NSparkCubingJob.java | 17 +++++++++++++++++
.../java/org/apache/kylin/rest/service/JobService.java | 9 ++++++++-
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index c2e615d..da00f6d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.spark.job;
+import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
@@ -32,8 +33,10 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.base.Preconditions;
@@ -139,4 +142,18 @@ public class NSparkCubingJob extends CubingJob {
public void setCube(CubeInstance cube) {
this.cube = cube;
}
+
+ public void cleanupAfterJobDiscard() {
+ try {
+            PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME),
+                    getParam(MetadataConstants.P_JOB_ID));
+
+ CubeManager cubeManager = CubeManager.getInstance(getConfig());
+            CubeInstance cube = cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME));
+            CubeSegment segment = cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME), SegmentStatusEnum.NEW);
+ PathManager.deleteSegmentParquetStoragePath(cube, segment);
+ } catch (IOException e) {
+            logger.warn("Delete resource file failed after job be discarded, due to", e);
+ }
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 1ce6521..f7be7d1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -48,6 +48,7 @@ import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
import org.apache.kylin.engine.mr.common.JobInfoConverter;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkCubingJob;
import org.apache.kylin.engine.spark.metadata.cube.source.SourceFactory;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JobSearchResult;
@@ -644,9 +645,15 @@ public class JobService extends BasicService implements InitializingBean {
                     "The job " + job.getId() + " has already been finished and cannot be discarded.");
         }
+        AbstractExecutable executable = getExecutableManager().getJob(job.getId());
+
if (job.getStatus() != JobStatusEnum.DISCARDED) {
-            AbstractExecutable executable = getExecutableManager().getJob(job.getId());
if (executable instanceof CubingJob) {
+                //Clean up job tmp and segment storage from hdfs after job be discarded
+ if (executable instanceof NSparkCubingJob) {
+ ((NSparkCubingJob) executable).cleanupAfterJobDiscard();
+ }
+
cancelCubingJobInner((CubingJob) executable);
//release global mr hive dict lock if exists
if (executable.getStatus().isFinalState()) {