This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 33c068f KYLIN-4766 Cleanup segment storage after job be discarded (#1716)
33c068f is described below
commit 33c068fd66a028a174a15acb28bbca8394903f28
Author: Yaqian Zhang <[email protected]>
AuthorDate: Sat Aug 7 13:20:55 2021 +0800
KYLIN-4766 Cleanup segment storage after job be discarded (#1716)
* KYLIN-4766 Cleanup segment storage after job be discarded
* fix
---
.../org/apache/kylin/engine/spark/metadata/cube/PathManager.java | 7 +++----
.../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java | 6 ++----
.../spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java | 2 +-
.../src/main/java/org/apache/kylin/rest/service/CubeService.java | 2 +-
.../src/main/java/org/apache/kylin/rest/service/JobService.java | 8 +++++---
5 files changed, 12 insertions(+), 13 deletions(-)
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 6444715..0d34451 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -58,12 +58,11 @@ public final class PathManager {
/**
* Delete segment path
*/
- public static boolean deleteSegmentParquetStoragePath(CubeInstance cube,
CubeSegment segment) throws IOException {
- if (cube == null || segment == null) {
+ public static boolean deleteSegmentParquetStoragePath(CubeInstance cube,
String segmentName, String identifier) throws IOException {
+ if (cube == null || StringUtils.isNoneBlank(segmentName)||
StringUtils.isNoneBlank(identifier)) {
return false;
}
- String path = getSegmentParquetStoragePath(cube, segment.getName(),
- segment.getStorageLocationIdentifier());
+ String path = getSegmentParquetStoragePath(cube, segmentName,
identifier);
logger.info("Deleting segment parquet path {}", path);
HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new
Path(path));
return true;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index 886e476..1b196ba 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -36,7 +36,6 @@ import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,15 +142,14 @@ public class NSparkCubingJob extends CubingJob {
this.cube = cube;
}
- public void cleanupAfterJobDiscard() {
+ public void cleanupAfterJobDiscard(String segmentName, String
segmentIdentifier) {
try {
PathManager.deleteJobTempPath(getConfig(),
getParam(MetadataConstants.P_PROJECT_NAME),
getParam(MetadataConstants.P_JOB_ID));
CubeManager cubeManager = CubeManager.getInstance(getConfig());
CubeInstance cube =
cubeManager.getCube(getParam(MetadataConstants.P_CUBE_NAME));
- CubeSegment segment =
cube.getSegment(getParam(MetadataConstants.SEGMENT_NAME),
SegmentStatusEnum.NEW);
- PathManager.deleteSegmentParquetStoragePath(cube, segment);
+ PathManager.deleteSegmentParquetStoragePath(cube, segmentName,
segmentIdentifier);
} catch (IOException e) {
logger.warn("Delete resource file failed after job be discarded,
due to", e);
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index d01f714..8c12f4e 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -59,7 +59,7 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends
NSparkExecutable {
// delete segments which were merged
for (CubeSegment segment : mergingSegments) {
try {
- PathManager.deleteSegmentParquetStoragePath(cube, segment);
+ PathManager.deleteSegmentParquetStoragePath(cube,
segment.getName(), segment.getStorageLocationIdentifier());
} catch (IOException e) {
throw new ExecuteException("Can not delete segment: " +
segment.getName() + ", in cube: " + cube.getName());
}
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 6fcdc75..4fdbf1f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -669,7 +669,7 @@ public class CubeService extends BasicService implements
InitializingBean {
if (toRemoveSegs != null && !toRemoveSegs.isEmpty()) {
for (CubeSegment segment : toRemoveSegs) {
- PathManager.deleteSegmentParquetStoragePath(cube, segment);
+ PathManager.deleteSegmentParquetStoragePath(cube,
segment.getName(), segment.getStorageLocationIdentifier());
}
}
}
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index d5b926f..eb28958 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -688,12 +688,14 @@ public class JobService extends BasicService implements
InitializingBean {
if (job.getStatus() != JobStatusEnum.DISCARDED) {
if (executable instanceof CubingJob) {
+ String segmentName = job.getRelatedSegmentName();
+ CubeSegment segment =
getCubeManager().getCube(job.getRelatedCube()).getSegment(segmentName,
SegmentStatusEnum.NEW);
+ String segmentIdentifier =
segment.getStorageLocationIdentifier();
+ cancelCubingJobInner((CubingJob) executable);
//Clean up job tmp and segment storage from hdfs after job be
discarded
if (executable instanceof NSparkCubingJob) {
- ((NSparkCubingJob) executable).cleanupAfterJobDiscard();
+ ((NSparkCubingJob)
executable).cleanupAfterJobDiscard(segmentName, segmentIdentifier);
}
-
- cancelCubingJobInner((CubingJob) executable);
//release global mr hive dict lock if exists
if (executable.getStatus().isFinalState()) {
try {