This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this
push:
new d7d2064 Fix storage clean up tool
d7d2064 is described below
commit d7d20645d797aa43ee41456d61a0a96bfd42d378
Author: yaqian.zhang <[email protected]>
AuthorDate: Fri Jan 22 10:27:20 2021 +0800
Fix storage clean up tool
---
.../apache/kylin/rest/job/StorageCleanupJob.java | 27 ++++++++--------------
.../kylin/rest/job/StorageCleanupJobTest.java | 27 +++++++++++++++++++---
2 files changed, 34 insertions(+), 20 deletions(-)
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index cd7f9c5..cde1e53 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.HadoopUtil;
@@ -37,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
@@ -54,11 +56,13 @@ public class StorageCleanupJob extends AbstractApplication {
protected boolean delete = false;
+    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+    protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());
+
public StorageCleanupJob() throws IOException {
this(KylinConfig.getInstanceFromEnv(),
HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()));
}
-
public StorageCleanupJob(KylinConfig config, FileSystem fs) {
this.config = config;
this.fs = fs;
@@ -83,26 +87,14 @@ public class StorageCleanupJob extends AbstractApplication {
public void cleanup() throws Exception {
ProjectManager projectManager = ProjectManager.getInstance(config);
CubeManager cubeManager = CubeManager.getInstance(config);
-
- //clean up job temp files
-        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName).collect(Collectors.toList());
- for (String project : projects) {
- String tmpPath = config.getJobTmpDir(project);
- if (delete) {
- logger.info("Deleting HDFS path " + tmpPath);
- if (fs.exists(new Path(tmpPath))) {
- fs.delete(new Path(tmpPath), true);
- }
- } else {
- logger.info("Dry run, pending delete HDFS path " + tmpPath);
- }
- }
+        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName)
+                .collect(Collectors.toList());
//clean up deleted projects and cubes
List<CubeInstance> cubes = cubeManager.listAllCubes();
Path metadataPath = new Path(config.getHdfsWorkingDirectory());
if (fs.exists(metadataPath)) {
- FileStatus[] projectStatus = fs.listStatus(metadataPath);
+            FileStatus[] projectStatus = fs.listStatus(metadataPath, pathFilter);
if (projectStatus != null) {
for (FileStatus status : projectStatus) {
String projectName = status.getPath().getName();
@@ -114,7 +106,8 @@ public class StorageCleanupJob extends AbstractApplication {
                         logger.info("Dry run, pending delete HDFS path " + status.getPath());
}
} else {
-                        cleanupDeletedCubes(projectName, cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
+                        cleanupDeletedCubes(projectName,
+                                cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
}
}
}
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index daa9f4d..7cab98e 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -67,10 +67,10 @@ public class StorageCleanupJobTest {
job.execute(new String[] { "--delete", "true" });
ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
- verify(mockFs, times(5)).delete(pathCaptor.capture(), eq(true));
+ verify(mockFs, times(4)).delete(pathCaptor.capture(), eq(true));
ArrayList<Path> expected = Lists.newArrayList(
// Verify clean job temp directory
- new Path(basePath + "/default/job_tmp"),
+ // new Path(basePath + "/default/job_tmp"),
//Verify clean dropped cube
new Path(basePath + "/default/parquet/dropped_cube"),
@@ -100,6 +100,12 @@ public class StorageCleanupJobTest {
FileStatus project1 = mock(FileStatus.class);
FileStatus project2 = mock(FileStatus.class);
+ FileStatus[] protectedStatuses = new FileStatus[2];
+ FileStatus cubeStatistics = mock(FileStatus.class);
+ FileStatus resourcesJdbc = mock(FileStatus.class);
+
+ FileStatus[] allStatuses = new FileStatus[4];
+
// Remove job temp directory
Path jobTmpPath = new Path(basePath + "/default/job_tmp");
when(mockFs.exists(jobTmpPath)).thenReturn(true);
@@ -130,6 +136,20 @@ public class StorageCleanupJobTest {
projectStatuses[0] = project1;
projectStatuses[1] = project2;
+ Path cubeStatisticsPath = new Path(basePath + "/default/parquet");
+        Path resourcesJdbcPath = new Path(basePath + "/deleted_project/parquet");
+ when(cubeStatistics.getPath()).thenReturn(cubeStatisticsPath);
+ when(resourcesJdbc.getPath()).thenReturn(resourcesJdbcPath);
+ protectedStatuses[0] = cubeStatistics;
+ protectedStatuses[1] = resourcesJdbc;
+ when(mockFs.delete(cubeStatisticsPath, true)).thenReturn(true);
+ when(mockFs.delete(resourcesJdbcPath, true)).thenReturn(true);
+
+ allStatuses[0] = project1;
+ allStatuses[1] = project2;
+ allStatuses[2] = cubeStatistics;
+ allStatuses[3] = resourcesJdbc;
+
         Path defaultProjectParquetPath = new Path(basePath + "/default/parquet");
         Path deletedProjectParquetPath = new Path(basePath + "/deleted_project/parquet");
when(mockFs.exists(defaultProjectParquetPath)).thenReturn(true);
@@ -138,6 +158,7 @@ public class StorageCleanupJobTest {
when(mockFs.exists(basePath)).thenReturn(true);
         when(mockFs.listStatus(new Path(basePath + "/default/parquet/ci_left_join_cube"))).thenReturn(segmentStatuses);
when(mockFs.listStatus(defaultProjectParquetPath)).thenReturn(cubeStatuses);
- when(mockFs.listStatus(basePath)).thenReturn(projectStatuses);
+ when(mockFs.listStatus(basePath)).thenReturn(allStatuses);
+        when(mockFs.listStatus(basePath, StorageCleanupJob.pathFilter)).thenReturn(projectStatuses);
}
}