This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this 
push:
     new beabeaa  KYLIN-4754 The Cleanup tool cannot clean the parquet file 
of the deleted cube and project
beabeaa is described below

commit beabeaa2ec9b81c06a60d5ca62e639de0b660358
Author: yaqian.zhang <[email protected]>
AuthorDate: Tue Sep 15 14:05:18 2020 +0800

    KYLIN-4754 The Cleanup tool cannot clean the parquet file of the 
deleted cube and project
---
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 52 +++++++++++++++++--
 .../kylin/rest/job/StorageCleanupJobTest.java      | 58 ++++++++++++++++++----
 2 files changed, 95 insertions(+), 15 deletions(-)

diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java 
b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index c23ad9a..cd7f9c5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -98,8 +98,29 @@ public class StorageCleanupJob extends AbstractApplication {
             }
         }
 
-        //clean up no used segments
+        //clean up deleted projects and cubes
         List<CubeInstance> cubes = cubeManager.listAllCubes();
+        Path metadataPath = new Path(config.getHdfsWorkingDirectory());
+        if (fs.exists(metadataPath)) {
+            FileStatus[] projectStatus = fs.listStatus(metadataPath);
+            if (projectStatus != null) {
+                for (FileStatus status : projectStatus) {
+                    String projectName = status.getPath().getName();
+                    if (!projects.contains(projectName)) {
+                        if (delete) {
+                            logger.info("Deleting HDFS path " + 
status.getPath());
+                            fs.delete(status.getPath(), true);
+                        } else {
+                            logger.info("Dry run, pending delete HDFS path " + 
status.getPath());
+                        }
+                    } else {
+                        cleanupDeletedCubes(projectName, 
cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
+                    }
+                }
+            }
+        }
+
+        //clean up no used segments
         for (CubeInstance cube : cubes) {
             List<String> segments = cube.getSegments().stream().map(segment -> 
{
                 return segment.getName() + "_" + 
segment.getStorageLocationIdentifier();
@@ -109,9 +130,9 @@ public class StorageCleanupJob extends AbstractApplication {
             //list all segment directory
             Path cubePath = new Path(config.getHdfsWorkingDirectory(project) + 
"/parquet/" + cube.getName());
             if (fs.exists(cubePath)) {
-                FileStatus[] fStatus = fs.listStatus(cubePath);
-                if (fStatus != null) {
-                    for (FileStatus status : fStatus) {
+                FileStatus[] segmentStatus = fs.listStatus(cubePath);
+                if (segmentStatus != null) {
+                    for (FileStatus status : segmentStatus) {
                         String segment = status.getPath().getName();
                         if (!segments.contains(segment)) {
                             if (delete) {
@@ -128,4 +149,27 @@ public class StorageCleanupJob extends AbstractApplication 
{
             }
         }
     }
+
+    private void cleanupDeletedCubes(String project, List<String> cubes) 
throws Exception {
+        //clean up deleted cubes
+        Path parquetPath = new Path(config.getHdfsWorkingDirectory(project) + 
"/parquet");
+        if (fs.exists(parquetPath)) {
+            FileStatus[] cubeStatus = fs.listStatus(parquetPath);
+            if (cubeStatus != null) {
+                for (FileStatus status : cubeStatus) {
+                    if (status.getPath() != null) {
+                        String cubeName = status.getPath().getName();
+                        if (!cubes.contains(cubeName)) {
+                            if (delete) {
+                                logger.info("Deleting HDFS path " + 
status.getPath());
+                                fs.delete(status.getPath(), true);
+                            } else {
+                                logger.info("Dry run, pending delete HDFS path 
" + status.getPath());
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
diff --git 
a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
 
b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index eb5641d..4836fb3 100644
--- 
a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ 
b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -67,11 +67,17 @@ public class StorageCleanupJobTest {
         job.execute(new String[] { "--delete", "true" });
 
         ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-        verify(mockFs, times(3)).delete(pathCaptor.capture(), eq(true));
+        verify(mockFs, times(5)).delete(pathCaptor.capture(), eq(true));
         ArrayList<Path> expected = Lists.newArrayList(
                 // Verify clean job temp directory
                 new Path(basePath + "/default/job_tmp"),
 
+                //Verify clean dropped cube
+                new Path(basePath + "/default/parquet/dropped_cube"),
+
+                //Verify clean deleted project
+                new Path(basePath + "/deleted_project"),
+
                 // Verify clean none used segments
                 new Path(basePath + 
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"),
                 new Path(basePath + 
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN")
@@ -81,27 +87,57 @@ public class StorageCleanupJobTest {
 
     private void prepareHDFSFiles(Path basePath, FileSystem mockFs) throws 
IOException {
 
-        FileStatus[] statuses = new FileStatus[2];
-        FileStatus f1 = mock(FileStatus.class);
-        FileStatus f2 = mock(FileStatus.class);
+        FileStatus[] segmentStatuses = new FileStatus[2];
+        FileStatus segment1 = mock(FileStatus.class);
+        FileStatus segment2 = mock(FileStatus.class);
 
-        // Remove job temp directory
+        FileStatus[] cubeStatuses = new FileStatus[3];
+        FileStatus cube1 = mock(FileStatus.class);
+        FileStatus cube2 = mock(FileStatus.class);
+        FileStatus cube3 = mock(FileStatus.class);
 
+        FileStatus[] projectStatuses = new FileStatus[2];
+        FileStatus project1 = mock(FileStatus.class);
+        FileStatus project2 = mock(FileStatus.class);
+
+        // Remove job temp directory
         Path jobTmpPath = new Path(basePath + "/default/job_tmp");
         when(mockFs.exists(jobTmpPath)).thenReturn(true);
         when(mockFs.delete(jobTmpPath, true)).thenReturn(true);
 
         // remove every segment working dir from deletion list, so this 
exclude.
-        when(f1.getPath()).thenReturn(new Path(basePath + 
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"));
-        when(f2.getPath()).thenReturn(new Path(basePath + 
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN"));
-        statuses[0] = f1;
-        statuses[1] = f2;
+        when(segment1.getPath()).thenReturn(new Path(basePath + 
"/default/parquet/ci_left_join_cube/20120101000000_20130101000000_VRC"));
+        when(segment2.getPath()).thenReturn(new Path(basePath + 
"/default/parquet/ci_left_join_cube/20130101000000_20140101000000_PCN"));
+        segmentStatuses[0] = segment1;
+        segmentStatuses[1] = segment2;
 
         Path cubePath1 = new Path(basePath + 
"/default/parquet/ci_left_join_cube");
         Path cubePath2 = new Path(basePath + 
"/default/parquet/ci_inner_join_cube");
+        Path cubePath3 = new Path(basePath + "/default/parquet/dropped_cube");
         when(mockFs.exists(cubePath1)).thenReturn(true);
         when(mockFs.exists(cubePath2)).thenReturn(false);
-
-        when(mockFs.listStatus(new Path(basePath + 
"/default/parquet/ci_left_join_cube"))).thenReturn(statuses);
+        when(mockFs.exists(cubePath3)).thenReturn(true);
+
+        when(cube1.getPath()).thenReturn(cubePath1);
+        when(cube2.getPath()).thenReturn(cubePath2);
+        when(cube3.getPath()).thenReturn(cubePath3);
+        cubeStatuses[0] = cube1;
+        cubeStatuses[1] = cube2;
+        cubeStatuses[2] = cube3;
+
+        when(project1.getPath()).thenReturn(new Path(basePath + "/default"));
+        when(project2.getPath()).thenReturn(new Path(basePath + 
"/deleted_project"));
+        projectStatuses[0] = project1;
+        projectStatuses[1] = project2;
+
+        Path defaultProjectParquetPath = new Path(basePath + 
"/default/parquet");
+        Path deletedProjectParquetPath = new Path(basePath + 
"/deleted_project/parquet");
+        when(mockFs.exists(defaultProjectParquetPath)).thenReturn(true);
+        when(mockFs.exists(deletedProjectParquetPath)).thenReturn(true);
+
+        when(mockFs.exists(basePath)).thenReturn(true);
+        when(mockFs.listStatus(new Path(basePath + 
"/default/parquet/ci_left_join_cube"))).thenReturn(segmentStatuses);
+        
when(mockFs.listStatus(defaultProjectParquetPath)).thenReturn(cubeStatuses);
+        when(mockFs.listStatus(basePath)).thenReturn(projectStatuses);
     }
 }

Reply via email to