This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 508fe2e366 KYLIN-5953 Minor bugfixes of InternalTable
508fe2e366 is described below

commit 508fe2e366da5382ee5b388a97347c0ff33ccb3d
Author: Zhimin Wu <[email protected]>
AuthorDate: Tue Jul 16 09:37:56 2024 +0800

    KYLIN-5953 Minor bugfixes of InternalTable
    
    A follow-up issue of KYLIN-5948:
    1. Add table properties for internal table list
    2. Optimized delta delete
    3. Restore build script
    4. Skip validation only if source table is empty
    5. Optimized the execution time of deletion tasks
    6. Set jobName for internal table job
    7. Bugfix empty internal table update
    
    ---------
    Co-authored-by: Zhiting Guo <[email protected]>
    Co-authored-by: huangsheng <[email protected]>
    Co-authored-by: Zhimin Wu <[email protected]>
    Co-authored-by: 7mming7 <[email protected]>
---
 .../metadata/cube/cuboid/NLayoutCandidate.java     |   1 +
 .../rest/service/InternalTableServiceTest.java     | 165 +++++++++++++++++++--
 .../job/service/InternalTableLoadingService.java   |  42 ++++--
 .../rest/response/InternalTableDescResponse.java   |   5 +
 .../kylin/rest/service/InternalTableService.java   |  82 +++++++---
 .../rest/controller/InternalTableController.java   |   2 +-
 .../engine/spark/job/InternalTableLoadingJob.java  |   1 +
 .../engine/spark/builder/InternalTableLoader.scala |  17 ++-
 .../engine/spark/job/InternalTableLoadJob.java     |  54 ++++---
 9 files changed, 291 insertions(+), 78 deletions(-)

diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
index 8f47180b4c..0aac2056bf 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
@@ -65,6 +65,7 @@ public class NLayoutCandidate implements 
IRealizationCandidate {
         Preconditions.checkNotNull(layoutEntity);
         Preconditions.checkNotNull(dataLayoutDetails);
         this.layoutEntity = layoutEntity;
+        this.layoutId = layoutEntity.getId();
         this.dataLayoutDetails = dataLayoutDetails;
     }
 
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 071cc88c4c..22cf942423 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.service;
 
+import static org.apache.kylin.common.exception.QueryErrorCode.EMPTY_TABLE;
 import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -26,12 +27,15 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.AbstractTestCase;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.persistence.transaction.TransactionException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
@@ -52,10 +56,12 @@ import 
org.apache.kylin.rest.response.InternalTableLoadingJobResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
@@ -101,7 +107,6 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
                 .setAuthentication(new TestingAuthenticationToken("ADMIN", 
"ADMIN", Constant.ROLE_ADMIN));
         SparkJobFactoryUtils.initJobFactory();
         overwriteSystemProp("kylin.source.provider.9", 
"org.apache.kylin.engine.spark.mockup.CsvSource");
-        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
     }
 
     @Test
@@ -109,7 +114,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
         // null value is valid
         internalTableService.checkParameters(null, table, null);
 
@@ -140,6 +145,28 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
 
     }
 
+    @Test
+    void testCheckParamsWithEmptySourceTable() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenThrow(new KylinException(
+                EMPTY_TABLE, String.format(Locale.ROOT, 
MsgPicker.getMsg().getNoDataInTable(), table)));
+        // test right date format with source table data is empty
+        internalTableService.checkParameters(new String[] { "TRANS_ID", 
"CAL_DT" }, table, "yyyy-MM-dd");
+    }
+
+    @Test
+    void testCheckParamsWithFatalError() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), any()))
+                .thenThrow(new IllegalStateException("fatal error"));
+        Assertions.assertThrows(IllegalStateException.class,
+                () -> internalTableService.checkParameters(new String[] { 
"TRANS_ID", "CAL_DT" }, table, "yyyy-MM-dd"));
+    }
+
     @Test
     void testCreateInternalTable() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -148,6 +175,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
         String[] partitionCols = new String[] { DATE_COL };
         Map<String, String> tblProperties = new HashMap<>();
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
         internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), partitionCols,
                 "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.PARQUET.name());
         InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
@@ -175,12 +203,39 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
     }
 
     @Test
-    void testUpdateInternalTable() throws Exception {
+    void testCreateDeltaInternalTable() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
         InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, PROJECT);
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+        String[] partitionCols = new String[] { DATE_COL };
+        Map<String, String> tblProperties = new HashMap<>();
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
+        internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), partitionCols,
+                "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.DELTALAKE.name());
+        InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        Assertions.assertNotNull(internalTable);
+        String workingDir = 
config.getHdfsWorkingDirectory().replace("file://", "");
+        File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+        Assertions.assertTrue(internalTableFolder.exists() && 
internalTableFolder.isDirectory());
+        // test create duplicated internal table
+        Assertions.assertThrows(TransactionException.class,
+                () -> internalTableService.createInternalTable(PROJECT, 
table.getName(), table.getDatabase(),
+                        partitionCols, "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.DELTALAKE.name()));
+        // test create internal table without tableDesc
+        Assertions.assertThrows(TransactionException.class,
+                () -> internalTableService.createInternalTable(PROJECT, 
table.getName() + "_xxx", table.getDatabase(),
+                        partitionCols, "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.DELTALAKE.name()));
+        internalTableService.dropInternalTable(PROJECT, TABLE_INDENTITY);
+    }
 
+    @Test
+    void testUpdateInternalTable() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
         internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), new String[] {}, null,
                 new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
         InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
@@ -228,7 +283,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         // test update an internal table which has loaded data.
         UnitOfWork.doInTransactionWithRetry(() -> {
             InternalTableManager manager = 
InternalTableManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
-            manager.updateInternalTable(TABLE_INDENTITY, copyForWrite -> 
copyForWrite.setStorageSize(1L));
+            manager.updateInternalTable(TABLE_INDENTITY, copyForWrite -> 
copyForWrite.setRowCount(1L));
             return null;
         }, PROJECT);
 
@@ -278,17 +333,15 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         // check internal table data exist
         String workingDir = 
config.getHdfsWorkingDirectory().replace("file://", "");
         File internalTableFolder = new File(workingDir, INTERNAL_DIR);
-        Assertions.assertEquals(1, internalTableFolder.list().length);
+        Assertions.assertEquals(1, 
Objects.requireNonNull(internalTableFolder.list()).length);
 
         // check query
         SparkSession ss = SparderEnv.getSparkSession();
         Assertions.assertFalse(ss.sql(BASE_SQL).isEmpty());
 
-        // check truncate
-        response = internalTableService.truncateInternalTable(PROJECT, 
TABLE_INDENTITY);
-        jobId = response.getJobs().get(0).getJobId();
-        waitJobToFinished(config, jobId);
-        Assertions.assertEquals(0, internalTableFolder.list().length);
+        // check truncate,will delete old path and not create empty schema for 
parquet format
+        internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+        Assertions.assertFalse(internalTableFolder.exists());
 
         // double truncate
         response = internalTableService.truncateInternalTable(PROJECT, 
TABLE_INDENTITY);
@@ -313,7 +366,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
 
         NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
         internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, new 
String[] { DATE_COL }, "yyyy-MM-dd",
                 new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
         String startDate = "1325347200000"; // 2012-01-01
@@ -343,9 +396,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
 
         // remove some partitions and check
         String[] toDeletePartitions = new String[] { "2012-01-03", 
"2012-01-04" };
-        response = internalTableService.dropPartitionsOnDeltaTable(PROJECT, 
TABLE_INDENTITY, toDeletePartitions, null);
-        jobId = response.getJobs().get(0).getJobId();
-        waitJobToFinished(config, jobId);
+        internalTableService.dropPartitionsOnDeltaTable(PROJECT, 
TABLE_INDENTITY, toDeletePartitions, null);
         Assertions.assertEquals(6 - toDeletePartitions.length, 
internalTableFolder.list().length);
         long newCount = ss.sql(BASE_SQL).count();
         Assertions.assertTrue(newCount > 0 && newCount < count);
@@ -355,6 +406,90 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         Assertions.assertFalse(internalTableFolder.exists());
     }
 
+    @Test
+    void testDropNotExistsTablePartition() {
+        // remove some partitions and check
+        String[] toDeletePartitions = new String[] { "2012-01-03", 
"2012-01-04" };
+        Assert.assertThrows(KylinException.class, () -> {
+            internalTableService.dropPartitionsOnDeltaTable(PROJECT, 
"DEFAULT.TEST_KYLIN_FACT_NOT", toDeletePartitions,
+                    null);
+        });
+    }
+
+    @Test
+    void testDropNonPartitionTablePartition() throws Exception {
+        internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, 
null, null, new HashMap<>(),
+                InternalTableDesc.StorageType.PARQUET.name());
+        String[] toDeletePartitions = new String[] { "2012-01-03", 
"2012-01-04" };
+        Assert.assertThrows(KylinException.class, () -> {
+            internalTableService.dropPartitionsOnDeltaTable(PROJECT, 
"DEFAULT.TEST_KYLIN_FACT_NOT", toDeletePartitions,
+                    null);
+        });
+    }
+
+    @Test
+    void testTruncatePartitionInternalTable() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+        String[] partitionCols = new String[] { DATE_COL };
+        Map<String, String> tblProperties = new HashMap<>();
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
+        internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), partitionCols,
+                "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.PARQUET.name());
+        InternalTableLoadingJobResponse response = 
internalTableService.loadIntoInternalTable(PROJECT, table.getName(),
+                table.getDatabase(), false, false, "", "", null);
+        String jobId = response.getJobs().get(0).getJobId();
+        waitJobToFinished(config, jobId);
+
+        InternalTableDesc internalTableDesc = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        // check internal table data exist
+        String workingDir = 
config.getHdfsWorkingDirectory().replace("file://", "");
+        File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+        // parquet format not have meta dir, so the partition size should 
equal file size
+        
Assertions.assertEquals(internalTableDesc.getTablePartition().getPartitionDetails().size(),
+                Objects.requireNonNull(internalTableFolder.list()).length);
+
+        // check truncate,will delete old path and not create empty schema for 
parquet format
+        internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+        Assertions.assertFalse(internalTableFolder.exists());
+        InternalTableDesc afterTruncateTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        Assertions.assertEquals(-1, afterTruncateTable.getStorageSize());
+    }
+
+    @Disabled("gluten not support deltaLake yet.")
+    @Test
+    void testTruncatePartitionDeltaInternalTable() throws Exception {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
+        InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, PROJECT);
+        TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+        String[] partitionCols = new String[] { DATE_COL };
+        Map<String, String> tblProperties = new HashMap<>();
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
+        internalTableService.createInternalTable(PROJECT, table.getName(), 
table.getDatabase(), partitionCols,
+                "yyyy-MM-dd", tblProperties, 
InternalTableDesc.StorageType.DELTALAKE.name());
+        InternalTableLoadingJobResponse response = 
internalTableService.loadIntoInternalTable(PROJECT, table.getName(),
+                table.getDatabase(), false, false, "", "", null);
+        String jobId = response.getJobs().get(0).getJobId();
+        waitJobToFinished(config, jobId);
+
+        InternalTableDesc internalTableDesc = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        // check internal table data exist
+        String workingDir = 
config.getHdfsWorkingDirectory().replace("file://", "");
+        File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+        // delta format have meta dir, so the partition size + 1 should equal 
file size
+        
Assertions.assertEquals(internalTableDesc.getTablePartition().getPartitionDetails().size()
 + 1,
+                Objects.requireNonNull(internalTableFolder.list()).length);
+
+        // check truncate,will create a new empty schema dir for delta format
+        internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+        Assertions.assertTrue(internalTableFolder.exists());
+        InternalTableDesc afterTruncateTable = 
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+        Assertions.assertTrue(afterTruncateTable.getStorageSize() > 0);
+    }
+
     private void waitJobToFinished(KylinConfig config, String jobId) {
         ExecutableManager executableManager = 
ExecutableManager.getInstance(config, PROJECT);
         await().atMost(5, TimeUnit.MINUTES).until(() -> {
@@ -370,7 +505,7 @@ public class InternalTableServiceTest extends 
AbstractTestCase {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         NTableMetadataManager tManager = 
NTableMetadataManager.getInstance(config, PROJECT);
         TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+        when(tableService.getPartitionColumnFormat(any(), any(), any(), 
any())).thenReturn("yyyy-MM-dd");
         List<InternalTablePartitionDetail> details = 
internalTableService.getTableDetail(PROJECT, table.getDatabase(),
                 table.getName());
         Assertions.assertNull(details);
diff --git 
a/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
 
b/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
index b299392b0d..7752804fcb 100644
--- 
a/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
+++ 
b/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
@@ -21,19 +21,19 @@ package org.apache.kylin.job.service;
 import static 
org.apache.kylin.common.exception.ServerErrorCode.INTERNAL_TABLE_ERROR;
 import static 
org.apache.kylin.common.exception.ServerErrorCode.INTERNAL_TABLE_NOT_EXIST;
 import static org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_BUILD;
-import static 
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_DELETE_PARTITION;
 import static 
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_REFRESH;
 
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.spark.builder.InternalTableLoader;
+import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
 import org.apache.kylin.job.dao.JobStatisticsManager;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.manager.JobManager;
@@ -43,8 +43,11 @@ import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
 import org.apache.kylin.metadata.table.InternalTableDesc;
 import org.apache.kylin.metadata.table.InternalTableManager;
+import org.apache.kylin.metadata.table.InternalTablePartition;
 import org.apache.kylin.rest.response.InternalTableLoadingJobResponse;
 import org.apache.kylin.rest.service.BasicService;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -89,21 +92,32 @@ public class InternalTableLoadingService extends 
BasicService {
     }
 
     public InternalTableLoadingJobResponse dropPartitions(String project, 
String[] partitionValues,
-            String tableIdentity, String yarnQueue) {
-        List<String> jobIds = new ArrayList<>();
+            String tableIdentity, String yarnQueue) throws IOException {
+        // If internal table can not be obtained, will throw a kylin exception
         InternalTableDesc internalTable = checkAndGetInternalTables(project, 
tableIdentity);
+        InternalTableLoader internalTableLoader = new InternalTableLoader();
+        String toBeDelete = String.join(",", partitionValues);
+        SparkSession ss = SparderEnv.getSparkSession();
+        long start = System.currentTimeMillis();
+        logger.info("Start to drop partition for table {}", tableIdentity);
+        internalTableLoader.dropPartitions(ss, internalTable, toBeDelete);
+        InternalTableLoadJob internalTableLoadJob = new InternalTableLoadJob();
+        InternalTableLoadJob.InternalTableMetaUpdateInfo info = 
internalTableLoadJob.extractUpdateInfo(project,
+                internalTable.getIdentity(), getConfig(), ss);
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            String toBeDelete = 
Arrays.stream(partitionValues).collect(Collectors.joining(","));
-            logger.info("drop partitions for table: {}, partitions: {}", 
internalTable.getIdentity(), toBeDelete);
-            JobParam jobParam = new 
JobParam().withProject(project).withOwner(BasicService.getUsername())
-                    
.withTable(internalTable.getIdentity()).withYarnQueue(yarnQueue)
-                    .withJobTypeEnum(INTERNAL_TABLE_DELETE_PARTITION)
-                    .addExtParams(NBatchConstants.P_DELETE_PARTITION, "true")
-                    .addExtParams(NBatchConstants.P_DELETE_PARTITION_VALUES, 
toBeDelete);
-            jobIds.add(getManager(JobManager.class, project).addJob(jobParam));
+            InternalTableManager internalTableManager = 
InternalTableManager.getInstance(getConfig(), project);
+            InternalTableDesc oldTable = checkAndGetInternalTables(project, 
tableIdentity);
+            InternalTablePartition tablePartition = 
oldTable.getTablePartition();
+            tablePartition.setPartitionValues(info.getPartitionValues());
+            tablePartition.setPartitionDetails(info.getPartitionDetails());
+            oldTable.setRowCount(info.getFinalCount());
+            internalTableManager.saveOrUpdateInternalTable(oldTable);
             return true;
         }, project);
-        return InternalTableLoadingJobResponse.of(jobIds, 
INTERNAL_TABLE_DELETE_PARTITION.toString());
+
+        logger.info("Successfully drop partition[{}] for table {} in {} ms", 
toBeDelete, tableIdentity,
+                System.currentTimeMillis() - start);
+        return InternalTableLoadingJobResponse.of(new ArrayList<>(), "");
     }
 
     private InternalTableDesc checkAndGetInternalTables(String project, String 
tableIdentity) {
diff --git 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
index 2c6992113b..b4e9c4fe8b 100644
--- 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
+++ 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.rest.response;
 
+import java.util.Map;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import lombok.Data;
@@ -52,4 +54,7 @@ public class InternalTableDescResponse {
     @JsonProperty("update_time")
     private long updateTime;
 
+    @JsonProperty("tbl_properties")
+    private Map<String, String> tblProperties;
+
 }
diff --git 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
index e746e22515..23980b506f 100644
--- 
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
+++ 
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
@@ -25,8 +25,8 @@ import static 
org.apache.kylin.common.exception.ServerErrorCode.INVALID_INTERNAL
 import static 
org.apache.kylin.common.exception.ServerErrorCode.TABLE_NOT_EXIST;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import lombok.val;
 import scala.Option;
 
 @Service("internalTableService")
@@ -84,6 +85,7 @@ public class InternalTableService extends BasicService {
 
     /**
      * Create an internal table from an existing table
+     *
      * @param projectName
      * @param table
      * @param database
@@ -151,20 +153,29 @@ public class InternalTableService extends BasicService {
             if (StringUtils.isEmpty(datePartitionFormat) && 
dateCol.isPresent()) {
                 throw new KylinException(EMPTY_PARAMETER, 
"date_partition_format can not be null, please check again");
             }
-            // detect date partition format
-            if (dateCol.isPresent() && 
!StringUtils.isEmpty(datePartitionFormat)
-                    && 
!tableService.getPartitionColumnFormat(originTable.getProject(), 
originTable.getIdentity(),
-                            dateCol.get().getName(), 
null).equals(datePartitionFormat)) {
-                String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getIncorrectDateformat(),
-                        datePartitionFormat);
-                throw new KylinException(INVALID_INTERNAL_TABLE_PARAMETER, 
errorMsg);
 
+            if (dateCol.isPresent() && 
!StringUtils.isEmpty(datePartitionFormat)) {
+                boolean isFormatMatchRealDataFormat = true;
+                try {
+                    // If the source table is empty, the true format cannot be 
obtained
+                    isFormatMatchRealDataFormat = 
tableService.getPartitionColumnFormat(originTable.getProject(),
+                            originTable.getIdentity(), 
dateCol.get().getName(), null).equals(datePartitionFormat);
+                } catch (KylinException kylinException) {
+                    logger.warn("Cannot get the real data format, skip the 
date format check", kylinException);
+                    // other non kylin-exception will throw out
+                }
+                if (!isFormatMatchRealDataFormat) {
+                    String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getIncorrectDateformat(),
+                            datePartitionFormat);
+                    throw new KylinException(INVALID_INTERNAL_TABLE_PARAMETER, 
errorMsg);
+                }
             }
         }
     }
 
     public void createDeltaSchema(InternalTableDesc internalTable) throws 
IOException {
-        if (internalTable.getStorageType() == 
InternalTableDesc.StorageType.GLUTEN) {
+        if (internalTable.getStorageType() == 
InternalTableDesc.StorageType.GLUTEN
+                || internalTable.getStorageType() == 
InternalTableDesc.StorageType.DELTALAKE) {
             Option<SparkSession> defaultSession = 
SparkSession.getDefaultSession();
             InternalTableLoader internalTableLoader = new 
InternalTableLoader();
             internalTableLoader.onlyLoadSchema(true);
@@ -176,6 +187,7 @@ public class InternalTableService extends BasicService {
     /**
      * create an internal table with source table
      * and no partition columns and table properties specified
+     *
      * @param project
      * @param originTable
      * @param storageType
@@ -198,8 +210,8 @@ public class InternalTableService extends BasicService {
                 String errorMsg = String.format(Locale.ROOT, 
MsgPicker.getMsg().getInternalTableNotFound(), dbTblName);
                 throw new KylinException(INTERNAL_TABLE_NOT_EXIST, errorMsg);
             }
-            if (internalTable.getStorageSize() > 0L) {
-                throw new KylinException(INTERNAL_TABLE_ERROR, "Loaded 
internal table can not be updated");
+            if (internalTable.getRowCount() > 0L) {
+                throw new KylinException(INTERNAL_TABLE_ERROR, "Non-empty 
internal table can not be updated");
             }
             checkParameters(partitionCols, originTable, datePartitionFormat);
             if (partitionCols != null && partitionCols.length != 0) {
@@ -211,6 +223,7 @@ public class InternalTableService extends BasicService {
             internalTable.setTblProperties(tblProperties);
             internalTable.optimizeTblProperties();
             internalTable.setStorageType(storageType);
+            suicideRunningInternalTableJob(project, table);
             deleteMetaAndDataInFileSystem(internalTable);
             createDeltaSchema(internalTable);
             internalTableManager.saveOrUpdateInternalTable(internalTable);
@@ -235,6 +248,9 @@ public class InternalTableService extends BasicService {
             if (fs.exists(location)) {
                 HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), 
location);
                 logger.info("Successfully deleted internal table on {}", 
internalTable.getLocation());
+            } else {
+                logger.warn("Internal table {}'s root path {} is not exists, 
skip delete", internalTable.getIdentity(),
+                        internalTable.getLocation());
             }
         } catch (IOException e) {
             logger.error("Failed to delete internal table on {}", 
internalTable.getLocation(), e);
@@ -288,17 +304,35 @@ public class InternalTableService extends BasicService {
             throw new KylinException(INTERNAL_TABLE_NOT_EXIST, errorMsg);
         }
         suicideRunningInternalTableJob(project, tableIdentity);
-        if (internalTable.getRowCount() == 0) {
-            logger.info("{} not have any data, skip truncate", tableIdentity);
-            return InternalTableLoadingJobResponse.of(Collections.emptyList(), 
"");
+        long start = System.currentTimeMillis();
+        deleteMetaAndDataInFileSystem(internalTable);
+        createDeltaSchema(internalTable);
+        val fs = HadoopUtil.getWorkingFileSystem();
+        // -1 indicates that an error occurred while obtaining files statistics
+        long storageSize = -1;
+        try {
+            storageSize = HadoopUtil.getContentSummary(fs, new 
Path(internalTable.getLocation())).getLength();
+        } catch (IOException e) {
+            logger.warn("Fetch storage size for internal table {} from {} 
failed caused by:",
+                    internalTable.getIdentity(), internalTable.getLocation(), 
e);
         }
-        return dropAllPartitionsOnDeltaTable(project, tableIdentity);
-    }
-
-    // 1. update delta log
-    // 2. delete unused data files
-    private InternalTableLoadingJobResponse 
dropAllPartitionsOnDeltaTable(String project, String tableIdentity) {
-        return dropPartitionsOnDeltaTable(project, tableIdentity, new 
String[0], null);
+        long finalStorageSize = storageSize;
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            InternalTableManager img = getManager(InternalTableManager.class, 
project);
+            InternalTableDesc oldTable = 
img.getInternalTableDesc(tableIdentity);
+            InternalTablePartition tablePartition = 
oldTable.getTablePartition();
+            if (tablePartition != null) {
+                tablePartition.setPartitionValues(new ArrayList<>());
+                tablePartition.setPartitionDetails(new ArrayList<>());
+            }
+            oldTable.setStorageSize(finalStorageSize);
+            oldTable.setRowCount(0);
+            img.saveOrUpdateInternalTable(oldTable);
+            return true;
+        }, project);
+        logger.info("Successfully truncate internal table {} in {} ms", 
tableIdentity,
+                System.currentTimeMillis() - start);
+        return InternalTableLoadingJobResponse.of(new ArrayList<>(), "");
     }
 
     // 1. delete partition data in file system
@@ -306,7 +340,7 @@ public class InternalTableService extends BasicService {
     // we shall do this delete action by a spark job and call delta delete api
     // so that the delta meta could be updated!
     public InternalTableLoadingJobResponse dropPartitionsOnDeltaTable(String 
project, String tableIdentity,
-            String[] partitionValues, String yarnQueue) {
+            String[] partitionValues, String yarnQueue) throws IOException {
         aclEvaluate.checkProjectWritePermission(project);
         return internalTableLoadingService.dropPartitions(project, 
partitionValues, tableIdentity, yarnQueue);
     }
@@ -325,13 +359,12 @@ public class InternalTableService extends BasicService {
     }
 
     /**
-     *
      * @param project
      * @param table
      * @param isIncremental
      * @param startDate
      * @param endDate
-     * @param yarnQueue if not null, use hadoop yarn resource to build, else 
use spark standalone
+     * @param yarnQueue     if not null, use hadoop yarn resource to build, 
else use spark standalone
      * @return
      * @throws Exception
      */
@@ -363,6 +396,7 @@ public class InternalTableService extends BasicService {
             internalTableDescResponse.setTimePartitionCol(partitionColumn);
             
internalTableDescResponse.setUpdateTime(internalTableDesc.getLastModified());
             
internalTableDescResponse.setDatePartitionFormat(internalTableDesc.getDatePartitionFormat());
+            
internalTableDescResponse.setTblProperties(internalTableDesc.getTblProperties());
             descList.add(internalTableDescResponse);
         });
         return descList;
diff --git 
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
 
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
index c84a5684ad..eba6071819 100644
--- 
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
+++ 
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
@@ -166,7 +166,7 @@ public class InternalTableController extends 
NBasicController {
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, 
DataResult.get(rep, offset, limit), "");
     }
 
-    @ApiOperation(value = "update_table", tags = { "AI" })
+    @ApiOperation(value = "get_table_detail", tags = { "AI" })
     @GetMapping(value = "/{database:.+}/{table:.+}", produces = { 
HTTP_VND_APACHE_KYLIN_JSON })
     @ResponseBody
     public EnvelopeResponse<DataResult<List<InternalTablePartitionDetail>>> 
getTableDetail(
diff --git 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
index a488dbdb9b..fe0842dde9 100644
--- 
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
+++ 
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
@@ -50,6 +50,7 @@ public class InternalTableLoadingJob extends 
DefaultExecutableOnTable {
         Preconditions.checkArgument(param.getSubmitter() != null);
         InternalTableLoadingJob job = new InternalTableLoadingJob();
         job.setSubmitter(param.getSubmitter());
+        job.setName(param.getJobType().toString());
         job.setJobType(param.getJobType());
         job.setId(param.getJobId());
         job.setTargetSubject(internalTable.getIdentity());
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 37673c5761..4849a6ddab 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -145,6 +145,8 @@ class InternalTableLoader extends Logging {
             }
 
           }
+        } else {
+          logInfo(s"$filePath does not exist, skip it")
         }
       } catch {
         case e: IOException =>
@@ -170,11 +172,6 @@ class InternalTableLoader extends Logging {
   def dropPartitions(ss: SparkSession,
                      internalTable: InternalTableDesc,
                      partitionValues: String): Unit = {
-    //    var clickhouseTable: DeltaTable = null
-    //    if (internalTable.getStorageType == StorageType.gluten) {
-    //      clickhouseTable =
-    //    }
-
     val sparkTable = internalTable.getStorageType match {
       case StorageType.GLUTEN => ClickhouseTable.forPath(ss, 
internalTable.generateInternalTableLocation)
       case StorageType.DELTALAKE => DeltaTable.forPath(ss, 
internalTable.generateInternalTableLocation)
@@ -191,16 +188,22 @@ class InternalTableLoader extends Logging {
       val toDeletedPaths = new util.ArrayList[String]()
       val values = partitionValues.split(",")
       val deleteStatementBuilder = StringBuilder.newBuilder
+      val toDeletedPartitionValues = new util.ArrayList[String]()
       values.foreach {
         partitionValue =>
           deleteStatementBuilder.clear()
           deleteStatementBuilder.append(partitionCol)
             .append(" = ")
             .append("'" + partitionValue + "'")
-          val subPath = partitionCol + "=" + partitionValue
+          val subPath = partitionCol.toUpperCase(Locale.ROOT) + "=" + 
partitionValue
           val pathName = new Path(internalTable.getLocation, subPath).toString
           toDeletedPaths.add(pathName)
-          deleteDeltaMetaData(sparkTable, deleteStatementBuilder.toString())
+          toDeletedPartitionValues.add(deleteStatementBuilder.toString())
+      }
+      if (!toDeletedPartitionValues.isEmpty) {
+        val partitionCondition = StringUtils.join(toDeletedPartitionValues, " 
or ")
+        logInfo(s"Dropping partitions for table: $internalTable, partition 
condition: $partitionCondition")
+        deleteDeltaMetaData(sparkTable, partitionCondition)
       }
       if (!toDeletedPaths.isEmpty) {
         truncateDataInFileSystem(toDeletedPaths, isInternalTableRootPath = 
false)
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
index bfb63473d4..2c6c93c6b6 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.InternalTableLoader;
@@ -42,10 +43,13 @@ import org.apache.kylin.metadata.table.InternalTableManager;
 import org.apache.kylin.metadata.table.InternalTablePartition;
 import org.apache.kylin.metadata.table.InternalTablePartitionDetail;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.delta.tables.DeltaTable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import lombok.val;
 
 public class InternalTableLoadJob extends SparkApplication {
@@ -74,17 +78,34 @@ public class InternalTableLoadJob extends SparkApplication {
             logger.info("Start to load data into table");
             loadIntoInternalTable();
         }
-        updateMeta();
+        updateMate();
     }
 
-    private void updateMeta() {
+    private void updateMate() {
         String tableName = getParam(NBatchConstants.P_TABLE_NAME);
+        InternalTableMetaUpdateInfo info = extractUpdateInfo(project, 
tableName, config, ss);
+        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, project);
+            InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(tableName);
+            InternalTablePartition tablePartition = 
internalTable.getTablePartition();
+            if (tablePartition != null) {
+                tablePartition.setPartitionValues(info.getPartitionValues());
+                tablePartition.setPartitionDetails(info.getPartitionDetails());
+            }
+            internalTable.setRowCount(info.getFinalCount());
+            internalTableManager.saveOrUpdateInternalTable(internalTable);
+            return true;
+        }, project);
+    }
+
+    public InternalTableMetaUpdateInfo extractUpdateInfo(String project, 
String tableName, KylinConfig config,
+            SparkSession ss) {
         InternalTableManager internalTableManager = 
InternalTableManager.getInstance(config, project);
         InternalTableDesc internalTable = 
internalTableManager.getInternalTableDesc(tableName);
         List<InternalTablePartitionDetail> partitionDetails;
-        long count = getInternalTableCount(internalTable);
+        long count = getInternalTableCount(internalTable, ss);
         if (internalTable.getTablePartition() != null) {
-            partitionDetails = extractPartitionDetails(internalTable);
+            partitionDetails = extractPartitionDetails(ss, internalTable);
         } else {
             partitionDetails = Collections.emptyList();
         }
@@ -95,20 +116,10 @@ public class InternalTableLoadJob extends SparkApplication 
{
                 partitionDetails.size());
         List<String> finalPartitionValues = partitionDetails.stream()
                 
.map(InternalTablePartitionDetail::getPartitionValue).collect(Collectors.toList());
-        long finalCount = count;
-        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            InternalTablePartition tablePartition = 
internalTable.getTablePartition();
-            if (tablePartition != null) {
-                tablePartition.setPartitionValues(finalPartitionValues);
-                tablePartition.setPartitionDetails(partitionDetails);
-            }
-            internalTable.setRowCount(finalCount);
-            internalTableManager.saveOrUpdateInternalTable(internalTable);
-            return true;
-        }, project);
+        return new InternalTableMetaUpdateInfo(count, finalPartitionValues, 
partitionDetails);
     }
 
-    private long getInternalTableCount(InternalTableDesc internalTable) {
+    private long getInternalTableCount(InternalTableDesc internalTable, 
SparkSession ss) {
         if (internalTable.getStorageType() == 
InternalTableDesc.StorageType.PARQUET) {
             try {
                 val internalTableDs = 
ss.read().format(internalTable.getStorageType().getFormat())
@@ -124,7 +135,8 @@ public class InternalTableLoadJob extends SparkApplication {
         }
     }
 
-    private List<InternalTablePartitionDetail> 
extractPartitionDetails(InternalTableDesc internalTable) {
+    private List<InternalTablePartitionDetail> 
extractPartitionDetails(SparkSession ss,
+            InternalTableDesc internalTable) {
         val partitionCol = 
internalTable.getTablePartition().getPartitionColumns()[0];
         List<String> partitionValues;
         List<InternalTablePartitionDetail> partitionDetails = new 
ArrayList<>();
@@ -202,4 +214,12 @@ public class InternalTableLoadJob extends SparkApplication 
{
                 .getInternalTableDesc(tableName);
         loader.dropPartitions(ss, internalTable, toBeDelete);
     }
+
+    @Getter
+    @AllArgsConstructor
+    public static class InternalTableMetaUpdateInfo {
+        long finalCount;
+        List<String> partitionValues;
+        List<InternalTablePartitionDetail> partitionDetails;
+    }
 }


Reply via email to