This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 508fe2e366 KYLIN-5953 Minor bugfixes of InternalTable
508fe2e366 is described below
commit 508fe2e366da5382ee5b388a97347c0ff33ccb3d
Author: Zhimin Wu <[email protected]>
AuthorDate: Tue Jul 16 09:37:56 2024 +0800
KYLIN-5953 Minor bugfixes of InternalTable
A follow-up issue of KYLIN-5948:
1. Add table properties for internal table list
2. Optimized delta delete
3. Restore build script
4. Skip validation only if source table is empty
5. Optimized the execution time of deletion tasks
6. Set jobName for internal table job
7. Fix bug in empty internal table update
---------
Co-authored-by: Zhiting Guo <[email protected]>
Co-authored-by: huangsheng <[email protected]>
Co-authored-by: Zhimin Wu <[email protected]>
Co-authored-by: 7mming7 <[email protected]>
---
.../metadata/cube/cuboid/NLayoutCandidate.java | 1 +
.../rest/service/InternalTableServiceTest.java | 165 +++++++++++++++++++--
.../job/service/InternalTableLoadingService.java | 42 ++++--
.../rest/response/InternalTableDescResponse.java | 5 +
.../kylin/rest/service/InternalTableService.java | 82 +++++++---
.../rest/controller/InternalTableController.java | 2 +-
.../engine/spark/job/InternalTableLoadingJob.java | 1 +
.../engine/spark/builder/InternalTableLoader.scala | 17 ++-
.../engine/spark/job/InternalTableLoadJob.java | 54 ++++---
9 files changed, 291 insertions(+), 78 deletions(-)
diff --git
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
index 8f47180b4c..0aac2056bf 100644
---
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
+++
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/cuboid/NLayoutCandidate.java
@@ -65,6 +65,7 @@ public class NLayoutCandidate implements
IRealizationCandidate {
Preconditions.checkNotNull(layoutEntity);
Preconditions.checkNotNull(dataLayoutDetails);
this.layoutEntity = layoutEntity;
+ this.layoutId = layoutEntity.getId();
this.dataLayoutDetails = dataLayoutDetails;
}
diff --git
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 071cc88c4c..22cf942423 100644
---
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.service;
+import static org.apache.kylin.common.exception.QueryErrorCode.EMPTY_TABLE;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -26,12 +27,15 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.AbstractTestCase;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.persistence.transaction.TransactionException;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.engine.spark.NLocalWithSparkSessionTestBase;
@@ -52,10 +56,12 @@ import
org.apache.kylin.rest.response.InternalTableLoadingJobResponse;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@@ -101,7 +107,6 @@ public class InternalTableServiceTest extends
AbstractTestCase {
.setAuthentication(new TestingAuthenticationToken("ADMIN",
"ADMIN", Constant.ROLE_ADMIN));
SparkJobFactoryUtils.initJobFactory();
overwriteSystemProp("kylin.source.provider.9",
"org.apache.kylin.engine.spark.mockup.CsvSource");
- when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
}
@Test
@@ -109,7 +114,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
KylinConfig config = KylinConfig.getInstanceFromEnv();
NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
// null value is valid
internalTableService.checkParameters(null, table, null);
@@ -140,6 +145,28 @@ public class InternalTableServiceTest extends
AbstractTestCase {
}
+ @Test
+ void testCheckParamsWithEmptySourceTable() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenThrow(new KylinException(
+ EMPTY_TABLE, String.format(Locale.ROOT,
MsgPicker.getMsg().getNoDataInTable(), table)));
+ // test right date format with source table data is empty
+ internalTableService.checkParameters(new String[] { "TRANS_ID",
"CAL_DT" }, table, "yyyy-MM-dd");
+ }
+
+ @Test
+ void testCheckParamsWithFatalError() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+ when(tableService.getPartitionColumnFormat(any(), any(), any(), any()))
+ .thenThrow(new IllegalStateException("fatal error"));
+ Assertions.assertThrows(IllegalStateException.class,
+ () -> internalTableService.checkParameters(new String[] {
"TRANS_ID", "CAL_DT" }, table, "yyyy-MM-dd"));
+ }
+
@Test
void testCreateInternalTable() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -148,6 +175,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
String[] partitionCols = new String[] { DATE_COL };
Map<String, String> tblProperties = new HashMap<>();
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), partitionCols,
"yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.PARQUET.name());
InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
@@ -175,12 +203,39 @@ public class InternalTableServiceTest extends
AbstractTestCase {
}
@Test
- void testUpdateInternalTable() throws Exception {
+ void testCreateDeltaInternalTable() throws Exception {
KylinConfig config = KylinConfig.getInstanceFromEnv();
NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+ String[] partitionCols = new String[] { DATE_COL };
+ Map<String, String> tblProperties = new HashMap<>();
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
+ internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), partitionCols,
+ "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.DELTALAKE.name());
+ InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ Assertions.assertNotNull(internalTable);
+ String workingDir =
config.getHdfsWorkingDirectory().replace("file://", "");
+ File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+ Assertions.assertTrue(internalTableFolder.exists() &&
internalTableFolder.isDirectory());
+ // test create duplicated internal table
+ Assertions.assertThrows(TransactionException.class,
+ () -> internalTableService.createInternalTable(PROJECT,
table.getName(), table.getDatabase(),
+ partitionCols, "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.DELTALAKE.name()));
+ // test create internal table without tableDesc
+ Assertions.assertThrows(TransactionException.class,
+ () -> internalTableService.createInternalTable(PROJECT,
table.getName() + "_xxx", table.getDatabase(),
+ partitionCols, "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.DELTALAKE.name()));
+ internalTableService.dropInternalTable(PROJECT, TABLE_INDENTITY);
+ }
+ @Test
+ void testUpdateInternalTable() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), new String[] {}, null,
new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
@@ -228,7 +283,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
// test update an internal table which has loaded data.
UnitOfWork.doInTransactionWithRetry(() -> {
InternalTableManager manager =
InternalTableManager.getInstance(KylinConfig.getInstanceFromEnv(), PROJECT);
- manager.updateInternalTable(TABLE_INDENTITY, copyForWrite ->
copyForWrite.setStorageSize(1L));
+ manager.updateInternalTable(TABLE_INDENTITY, copyForWrite ->
copyForWrite.setRowCount(1L));
return null;
}, PROJECT);
@@ -278,17 +333,15 @@ public class InternalTableServiceTest extends
AbstractTestCase {
// check internal table data exist
String workingDir =
config.getHdfsWorkingDirectory().replace("file://", "");
File internalTableFolder = new File(workingDir, INTERNAL_DIR);
- Assertions.assertEquals(1, internalTableFolder.list().length);
+ Assertions.assertEquals(1,
Objects.requireNonNull(internalTableFolder.list()).length);
// check query
SparkSession ss = SparderEnv.getSparkSession();
Assertions.assertFalse(ss.sql(BASE_SQL).isEmpty());
- // check truncate
- response = internalTableService.truncateInternalTable(PROJECT,
TABLE_INDENTITY);
- jobId = response.getJobs().get(0).getJobId();
- waitJobToFinished(config, jobId);
- Assertions.assertEquals(0, internalTableFolder.list().length);
+ // check truncate,will delete old path and not create empty schema for
parquet format
+ internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+ Assertions.assertFalse(internalTableFolder.exists());
// double truncate
response = internalTableService.truncateInternalTable(PROJECT,
TABLE_INDENTITY);
@@ -313,7 +366,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY, new
String[] { DATE_COL }, "yyyy-MM-dd",
new HashMap<>(), InternalTableDesc.StorageType.PARQUET.name());
String startDate = "1325347200000"; // 2012-01-01
@@ -343,9 +396,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
// remove some partitions and check
String[] toDeletePartitions = new String[] { "2012-01-03",
"2012-01-04" };
- response = internalTableService.dropPartitionsOnDeltaTable(PROJECT,
TABLE_INDENTITY, toDeletePartitions, null);
- jobId = response.getJobs().get(0).getJobId();
- waitJobToFinished(config, jobId);
+ internalTableService.dropPartitionsOnDeltaTable(PROJECT,
TABLE_INDENTITY, toDeletePartitions, null);
Assertions.assertEquals(6 - toDeletePartitions.length,
internalTableFolder.list().length);
long newCount = ss.sql(BASE_SQL).count();
Assertions.assertTrue(newCount > 0 && newCount < count);
@@ -355,6 +406,90 @@ public class InternalTableServiceTest extends
AbstractTestCase {
Assertions.assertFalse(internalTableFolder.exists());
}
+ @Test
+ void testDropNotExistsTablePartition() {
+ // remove some partitions and check
+ String[] toDeletePartitions = new String[] { "2012-01-03",
"2012-01-04" };
+ Assert.assertThrows(KylinException.class, () -> {
+ internalTableService.dropPartitionsOnDeltaTable(PROJECT,
"DEFAULT.TEST_KYLIN_FACT_NOT", toDeletePartitions,
+ null);
+ });
+ }
+
+ @Test
+ void testDropNonPartitionTablePartition() throws Exception {
+ internalTableService.createInternalTable(PROJECT, TABLE_INDENTITY,
null, null, new HashMap<>(),
+ InternalTableDesc.StorageType.PARQUET.name());
+ String[] toDeletePartitions = new String[] { "2012-01-03",
"2012-01-04" };
+ Assert.assertThrows(KylinException.class, () -> {
+ internalTableService.dropPartitionsOnDeltaTable(PROJECT,
"DEFAULT.TEST_KYLIN_FACT_NOT", toDeletePartitions,
+ null);
+ });
+ }
+
+ @Test
+ void testTruncatePartitionInternalTable() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+ String[] partitionCols = new String[] { DATE_COL };
+ Map<String, String> tblProperties = new HashMap<>();
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
+ internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), partitionCols,
+ "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.PARQUET.name());
+ InternalTableLoadingJobResponse response =
internalTableService.loadIntoInternalTable(PROJECT, table.getName(),
+ table.getDatabase(), false, false, "", "", null);
+ String jobId = response.getJobs().get(0).getJobId();
+ waitJobToFinished(config, jobId);
+
+ InternalTableDesc internalTableDesc =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ // check internal table data exist
+ String workingDir =
config.getHdfsWorkingDirectory().replace("file://", "");
+ File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+ // parquet format not have meta dir, so the partition size should
equal file size
+
Assertions.assertEquals(internalTableDesc.getTablePartition().getPartitionDetails().size(),
+ Objects.requireNonNull(internalTableFolder.list()).length);
+
+ // check truncate,will delete old path and not create empty schema for
parquet format
+ internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+ Assertions.assertFalse(internalTableFolder.exists());
+ InternalTableDesc afterTruncateTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ Assertions.assertEquals(-1, afterTruncateTable.getStorageSize());
+ }
+
+ @Disabled("gluten not support deltaLake yet.")
+ @Test
+ void testTruncatePartitionDeltaInternalTable() throws Exception {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, PROJECT);
+ TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
+ String[] partitionCols = new String[] { DATE_COL };
+ Map<String, String> tblProperties = new HashMap<>();
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
+ internalTableService.createInternalTable(PROJECT, table.getName(),
table.getDatabase(), partitionCols,
+ "yyyy-MM-dd", tblProperties,
InternalTableDesc.StorageType.DELTALAKE.name());
+ InternalTableLoadingJobResponse response =
internalTableService.loadIntoInternalTable(PROJECT, table.getName(),
+ table.getDatabase(), false, false, "", "", null);
+ String jobId = response.getJobs().get(0).getJobId();
+ waitJobToFinished(config, jobId);
+
+ InternalTableDesc internalTableDesc =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ // check internal table data exist
+ String workingDir =
config.getHdfsWorkingDirectory().replace("file://", "");
+ File internalTableFolder = new File(workingDir, INTERNAL_DIR);
+ // delta format have meta dir, so the partition size + 1 should equal
file size
+
Assertions.assertEquals(internalTableDesc.getTablePartition().getPartitionDetails().size()
+ 1,
+ Objects.requireNonNull(internalTableFolder.list()).length);
+
+ // check truncate,will create a new empty schema dir for delta format
+ internalTableService.truncateInternalTable(PROJECT, TABLE_INDENTITY);
+ Assertions.assertTrue(internalTableFolder.exists());
+ InternalTableDesc afterTruncateTable =
internalTableManager.getInternalTableDesc(TABLE_INDENTITY);
+ Assertions.assertTrue(afterTruncateTable.getStorageSize() > 0);
+ }
+
private void waitJobToFinished(KylinConfig config, String jobId) {
ExecutableManager executableManager =
ExecutableManager.getInstance(config, PROJECT);
await().atMost(5, TimeUnit.MINUTES).until(() -> {
@@ -370,7 +505,7 @@ public class InternalTableServiceTest extends
AbstractTestCase {
KylinConfig config = KylinConfig.getInstanceFromEnv();
NTableMetadataManager tManager =
NTableMetadataManager.getInstance(config, PROJECT);
TableDesc table = tManager.getTableDesc(TABLE_INDENTITY);
-
+ when(tableService.getPartitionColumnFormat(any(), any(), any(),
any())).thenReturn("yyyy-MM-dd");
List<InternalTablePartitionDetail> details =
internalTableService.getTableDetail(PROJECT, table.getDatabase(),
table.getName());
Assertions.assertNull(details);
diff --git
a/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
b/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
index b299392b0d..7752804fcb 100644
---
a/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
+++
b/src/datasource-service/src/main/java/org/apache/kylin/job/service/InternalTableLoadingService.java
@@ -21,19 +21,19 @@ package org.apache.kylin.job.service;
import static
org.apache.kylin.common.exception.ServerErrorCode.INTERNAL_TABLE_ERROR;
import static
org.apache.kylin.common.exception.ServerErrorCode.INTERNAL_TABLE_NOT_EXIST;
import static org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_BUILD;
-import static
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_DELETE_PARTITION;
import static
org.apache.kylin.job.execution.JobTypeEnum.INTERNAL_TABLE_REFRESH;
+import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
-import java.util.stream.Collectors;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.spark.builder.InternalTableLoader;
+import org.apache.kylin.engine.spark.job.InternalTableLoadJob;
import org.apache.kylin.job.dao.JobStatisticsManager;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.manager.JobManager;
@@ -43,8 +43,11 @@ import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
import org.apache.kylin.metadata.table.InternalTableDesc;
import org.apache.kylin.metadata.table.InternalTableManager;
+import org.apache.kylin.metadata.table.InternalTablePartition;
import org.apache.kylin.rest.response.InternalTableLoadingJobResponse;
import org.apache.kylin.rest.service.BasicService;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -89,21 +92,32 @@ public class InternalTableLoadingService extends
BasicService {
}
public InternalTableLoadingJobResponse dropPartitions(String project,
String[] partitionValues,
- String tableIdentity, String yarnQueue) {
- List<String> jobIds = new ArrayList<>();
+ String tableIdentity, String yarnQueue) throws IOException {
+ // If internal table can not be obtained, will throw a kylin exception
InternalTableDesc internalTable = checkAndGetInternalTables(project,
tableIdentity);
+ InternalTableLoader internalTableLoader = new InternalTableLoader();
+ String toBeDelete = String.join(",", partitionValues);
+ SparkSession ss = SparderEnv.getSparkSession();
+ long start = System.currentTimeMillis();
+ logger.info("Start to drop partition for table {}", tableIdentity);
+ internalTableLoader.dropPartitions(ss, internalTable, toBeDelete);
+ InternalTableLoadJob internalTableLoadJob = new InternalTableLoadJob();
+ InternalTableLoadJob.InternalTableMetaUpdateInfo info =
internalTableLoadJob.extractUpdateInfo(project,
+ internalTable.getIdentity(), getConfig(), ss);
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- String toBeDelete =
Arrays.stream(partitionValues).collect(Collectors.joining(","));
- logger.info("drop partitions for table: {}, partitions: {}",
internalTable.getIdentity(), toBeDelete);
- JobParam jobParam = new
JobParam().withProject(project).withOwner(BasicService.getUsername())
-
.withTable(internalTable.getIdentity()).withYarnQueue(yarnQueue)
- .withJobTypeEnum(INTERNAL_TABLE_DELETE_PARTITION)
- .addExtParams(NBatchConstants.P_DELETE_PARTITION, "true")
- .addExtParams(NBatchConstants.P_DELETE_PARTITION_VALUES,
toBeDelete);
- jobIds.add(getManager(JobManager.class, project).addJob(jobParam));
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(getConfig(), project);
+ InternalTableDesc oldTable = checkAndGetInternalTables(project,
tableIdentity);
+ InternalTablePartition tablePartition =
oldTable.getTablePartition();
+ tablePartition.setPartitionValues(info.getPartitionValues());
+ tablePartition.setPartitionDetails(info.getPartitionDetails());
+ oldTable.setRowCount(info.getFinalCount());
+ internalTableManager.saveOrUpdateInternalTable(oldTable);
return true;
}, project);
- return InternalTableLoadingJobResponse.of(jobIds,
INTERNAL_TABLE_DELETE_PARTITION.toString());
+
+ logger.info("Successfully drop partition[{}] for table {} in {} ms",
toBeDelete, tableIdentity,
+ System.currentTimeMillis() - start);
+ return InternalTableLoadingJobResponse.of(new ArrayList<>(), "");
}
private InternalTableDesc checkAndGetInternalTables(String project, String
tableIdentity) {
diff --git
a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
b/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
index 2c6992113b..b4e9c4fe8b 100644
---
a/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
+++
b/src/datasource-service/src/main/java/org/apache/kylin/rest/response/InternalTableDescResponse.java
@@ -18,6 +18,8 @@
package org.apache.kylin.rest.response;
+import java.util.Map;
+
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@@ -52,4 +54,7 @@ public class InternalTableDescResponse {
@JsonProperty("update_time")
private long updateTime;
+ @JsonProperty("tbl_properties")
+ private Map<String, String> tblProperties;
+
}
diff --git
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
index e746e22515..23980b506f 100644
---
a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
+++
b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/InternalTableService.java
@@ -25,8 +25,8 @@ import static
org.apache.kylin.common.exception.ServerErrorCode.INVALID_INTERNAL
import static
org.apache.kylin.common.exception.ServerErrorCode.TABLE_NOT_EXIST;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import lombok.val;
import scala.Option;
@Service("internalTableService")
@@ -84,6 +85,7 @@ public class InternalTableService extends BasicService {
/**
* Create an internal table from an existing table
+ *
* @param projectName
* @param table
* @param database
@@ -151,20 +153,29 @@ public class InternalTableService extends BasicService {
if (StringUtils.isEmpty(datePartitionFormat) &&
dateCol.isPresent()) {
throw new KylinException(EMPTY_PARAMETER,
"date_partition_format can not be null, please check again");
}
- // detect date partition format
- if (dateCol.isPresent() &&
!StringUtils.isEmpty(datePartitionFormat)
- &&
!tableService.getPartitionColumnFormat(originTable.getProject(),
originTable.getIdentity(),
- dateCol.get().getName(),
null).equals(datePartitionFormat)) {
- String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getIncorrectDateformat(),
- datePartitionFormat);
- throw new KylinException(INVALID_INTERNAL_TABLE_PARAMETER,
errorMsg);
+ if (dateCol.isPresent() &&
!StringUtils.isEmpty(datePartitionFormat)) {
+ boolean isFormatMatchRealDataFormat = true;
+ try {
+ // If the source table is empty, the true format cannot be
obtained
+ isFormatMatchRealDataFormat =
tableService.getPartitionColumnFormat(originTable.getProject(),
+ originTable.getIdentity(),
dateCol.get().getName(), null).equals(datePartitionFormat);
+ } catch (KylinException kylinException) {
+ logger.warn("Cannot get the real data format, skip the
date format check", kylinException);
+ // other non kylin-exception will throw out
+ }
+ if (!isFormatMatchRealDataFormat) {
+ String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getIncorrectDateformat(),
+ datePartitionFormat);
+ throw new KylinException(INVALID_INTERNAL_TABLE_PARAMETER,
errorMsg);
+ }
}
}
}
public void createDeltaSchema(InternalTableDesc internalTable) throws
IOException {
- if (internalTable.getStorageType() ==
InternalTableDesc.StorageType.GLUTEN) {
+ if (internalTable.getStorageType() ==
InternalTableDesc.StorageType.GLUTEN
+ || internalTable.getStorageType() ==
InternalTableDesc.StorageType.DELTALAKE) {
Option<SparkSession> defaultSession =
SparkSession.getDefaultSession();
InternalTableLoader internalTableLoader = new
InternalTableLoader();
internalTableLoader.onlyLoadSchema(true);
@@ -176,6 +187,7 @@ public class InternalTableService extends BasicService {
/**
* create an internal table with source table
* and no partition columns and table properties specified
+ *
* @param project
* @param originTable
* @param storageType
@@ -198,8 +210,8 @@ public class InternalTableService extends BasicService {
String errorMsg = String.format(Locale.ROOT,
MsgPicker.getMsg().getInternalTableNotFound(), dbTblName);
throw new KylinException(INTERNAL_TABLE_NOT_EXIST, errorMsg);
}
- if (internalTable.getStorageSize() > 0L) {
- throw new KylinException(INTERNAL_TABLE_ERROR, "Loaded
internal table can not be updated");
+ if (internalTable.getRowCount() > 0L) {
+ throw new KylinException(INTERNAL_TABLE_ERROR, "Non-empty
internal table can not be updated");
}
checkParameters(partitionCols, originTable, datePartitionFormat);
if (partitionCols != null && partitionCols.length != 0) {
@@ -211,6 +223,7 @@ public class InternalTableService extends BasicService {
internalTable.setTblProperties(tblProperties);
internalTable.optimizeTblProperties();
internalTable.setStorageType(storageType);
+ suicideRunningInternalTableJob(project, table);
deleteMetaAndDataInFileSystem(internalTable);
createDeltaSchema(internalTable);
internalTableManager.saveOrUpdateInternalTable(internalTable);
@@ -235,6 +248,9 @@ public class InternalTableService extends BasicService {
if (fs.exists(location)) {
HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(),
location);
logger.info("Successfully deleted internal table on {}",
internalTable.getLocation());
+ } else {
+ logger.warn("Internal table {}'s root path {} is not exists,
skip delete", internalTable.getIdentity(),
+ internalTable.getLocation());
}
} catch (IOException e) {
logger.error("Failed to delete internal table on {}",
internalTable.getLocation(), e);
@@ -288,17 +304,35 @@ public class InternalTableService extends BasicService {
throw new KylinException(INTERNAL_TABLE_NOT_EXIST, errorMsg);
}
suicideRunningInternalTableJob(project, tableIdentity);
- if (internalTable.getRowCount() == 0) {
- logger.info("{} not have any data, skip truncate", tableIdentity);
- return InternalTableLoadingJobResponse.of(Collections.emptyList(),
"");
+ long start = System.currentTimeMillis();
+ deleteMetaAndDataInFileSystem(internalTable);
+ createDeltaSchema(internalTable);
+ val fs = HadoopUtil.getWorkingFileSystem();
+ // -1 indicates that an error occurred while obtaining files statistics
+ long storageSize = -1;
+ try {
+ storageSize = HadoopUtil.getContentSummary(fs, new
Path(internalTable.getLocation())).getLength();
+ } catch (IOException e) {
+ logger.warn("Fetch storage size for internal table {} from {}
failed caused by:",
+ internalTable.getIdentity(), internalTable.getLocation(),
e);
}
- return dropAllPartitionsOnDeltaTable(project, tableIdentity);
- }
-
- // 1. update delta log
- // 2. delete unused data files
- private InternalTableLoadingJobResponse
dropAllPartitionsOnDeltaTable(String project, String tableIdentity) {
- return dropPartitionsOnDeltaTable(project, tableIdentity, new
String[0], null);
+ long finalStorageSize = storageSize;
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ InternalTableManager img = getManager(InternalTableManager.class,
project);
+ InternalTableDesc oldTable =
img.getInternalTableDesc(tableIdentity);
+ InternalTablePartition tablePartition =
oldTable.getTablePartition();
+ if (tablePartition != null) {
+ tablePartition.setPartitionValues(new ArrayList<>());
+ tablePartition.setPartitionDetails(new ArrayList<>());
+ }
+ oldTable.setStorageSize(finalStorageSize);
+ oldTable.setRowCount(0);
+ img.saveOrUpdateInternalTable(oldTable);
+ return true;
+ }, project);
+ logger.info("Successfully truncate internal table {} in {} ms",
tableIdentity,
+ System.currentTimeMillis() - start);
+ return InternalTableLoadingJobResponse.of(new ArrayList<>(), "");
}
// 1. delete partition data in file system
@@ -306,7 +340,7 @@ public class InternalTableService extends BasicService {
// we shall do this delete action by a spark job and call delta delete api
// so that the delta meta could be updated!
public InternalTableLoadingJobResponse dropPartitionsOnDeltaTable(String
project, String tableIdentity,
- String[] partitionValues, String yarnQueue) {
+ String[] partitionValues, String yarnQueue) throws IOException {
aclEvaluate.checkProjectWritePermission(project);
return internalTableLoadingService.dropPartitions(project,
partitionValues, tableIdentity, yarnQueue);
}
@@ -325,13 +359,12 @@ public class InternalTableService extends BasicService {
}
/**
- *
* @param project
* @param table
* @param isIncremental
* @param startDate
* @param endDate
- * @param yarnQueue if not null, use hadoop yarn resource to build, else
use spark standalone
+ * @param yarnQueue if not null, use hadoop yarn resource to build,
else use spark standalone
* @return
* @throws Exception
*/
@@ -363,6 +396,7 @@ public class InternalTableService extends BasicService {
internalTableDescResponse.setTimePartitionCol(partitionColumn);
internalTableDescResponse.setUpdateTime(internalTableDesc.getLastModified());
internalTableDescResponse.setDatePartitionFormat(internalTableDesc.getDatePartitionFormat());
+
internalTableDescResponse.setTblProperties(internalTableDesc.getTblProperties());
descList.add(internalTableDescResponse);
});
return descList;
diff --git
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
index c84a5684ad..eba6071819 100644
---
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
+++
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/InternalTableController.java
@@ -166,7 +166,7 @@ public class InternalTableController extends
NBasicController {
return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
DataResult.get(rep, offset, limit), "");
}
- @ApiOperation(value = "update_table", tags = { "AI" })
+ @ApiOperation(value = "get_table_detail", tags = { "AI" })
@GetMapping(value = "/{database:.+}/{table:.+}", produces = {
HTTP_VND_APACHE_KYLIN_JSON })
@ResponseBody
public EnvelopeResponse<DataResult<List<InternalTablePartitionDetail>>>
getTableDetail(
diff --git
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
index a488dbdb9b..fe0842dde9 100644
---
a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
+++
b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/InternalTableLoadingJob.java
@@ -50,6 +50,7 @@ public class InternalTableLoadingJob extends
DefaultExecutableOnTable {
Preconditions.checkArgument(param.getSubmitter() != null);
InternalTableLoadingJob job = new InternalTableLoadingJob();
job.setSubmitter(param.getSubmitter());
+ job.setName(param.getJobType().toString());
job.setJobType(param.getJobType());
job.setId(param.getJobId());
job.setTargetSubject(internalTable.getIdentity());
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 37673c5761..4849a6ddab 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -145,6 +145,8 @@ class InternalTableLoader extends Logging {
}
}
+ } else {
+ logInfo(s"$filePath does not exist, skip it")
}
} catch {
case e: IOException =>
@@ -170,11 +172,6 @@ class InternalTableLoader extends Logging {
def dropPartitions(ss: SparkSession,
internalTable: InternalTableDesc,
partitionValues: String): Unit = {
- // var clickhouseTable: DeltaTable = null
- // if (internalTable.getStorageType == StorageType.gluten) {
- // clickhouseTable =
- // }
-
val sparkTable = internalTable.getStorageType match {
case StorageType.GLUTEN => ClickhouseTable.forPath(ss,
internalTable.generateInternalTableLocation)
case StorageType.DELTALAKE => DeltaTable.forPath(ss,
internalTable.generateInternalTableLocation)
@@ -191,16 +188,22 @@ class InternalTableLoader extends Logging {
val toDeletedPaths = new util.ArrayList[String]()
val values = partitionValues.split(",")
val deleteStatementBuilder = StringBuilder.newBuilder
+ val toDeletedPartitionValues = new util.ArrayList[String]()
values.foreach {
partitionValue =>
deleteStatementBuilder.clear()
deleteStatementBuilder.append(partitionCol)
.append(" = ")
.append("'" + partitionValue + "'")
- val subPath = partitionCol + "=" + partitionValue
+ val subPath = partitionCol.toUpperCase(Locale.ROOT) + "=" +
partitionValue
val pathName = new Path(internalTable.getLocation, subPath).toString
toDeletedPaths.add(pathName)
- deleteDeltaMetaData(sparkTable, deleteStatementBuilder.toString())
+ toDeletedPartitionValues.add(deleteStatementBuilder.toString())
+ }
+ if (!toDeletedPartitionValues.isEmpty) {
+ val partitionCondition = StringUtils.join(toDeletedPartitionValues, "
or ")
+ logInfo(s"Dropping partitions for table: $internalTable, partition
condition: $partitionCondition")
+ deleteDeltaMetaData(sparkTable, partitionCondition)
}
if (!toDeletedPaths.isEmpty) {
truncateDataInFileSystem(toDeletedPaths, isInternalTableRootPath =
false)
diff --git
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
index bfb63473d4..2c6c93c6b6 100644
---
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
+++
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/InternalTableLoadJob.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.InternalTableLoader;
@@ -42,10 +43,13 @@ import org.apache.kylin.metadata.table.InternalTableManager;
import org.apache.kylin.metadata.table.InternalTablePartition;
import org.apache.kylin.metadata.table.InternalTablePartitionDetail;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.delta.tables.DeltaTable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import lombok.val;
public class InternalTableLoadJob extends SparkApplication {
@@ -74,17 +78,34 @@ public class InternalTableLoadJob extends SparkApplication {
logger.info("Start to load data into table");
loadIntoInternalTable();
}
- updateMeta();
+ updateMate();
}
- private void updateMeta() {
+ private void updateMate() {
String tableName = getParam(NBatchConstants.P_TABLE_NAME);
+ InternalTableMetaUpdateInfo info = extractUpdateInfo(project,
tableName, config, ss);
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, project);
+ InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(tableName);
+ InternalTablePartition tablePartition =
internalTable.getTablePartition();
+ if (tablePartition != null) {
+ tablePartition.setPartitionValues(info.getPartitionValues());
+ tablePartition.setPartitionDetails(info.getPartitionDetails());
+ }
+ internalTable.setRowCount(info.getFinalCount());
+ internalTableManager.saveOrUpdateInternalTable(internalTable);
+ return true;
+ }, project);
+ }
+
+ public InternalTableMetaUpdateInfo extractUpdateInfo(String project,
String tableName, KylinConfig config,
+ SparkSession ss) {
InternalTableManager internalTableManager =
InternalTableManager.getInstance(config, project);
InternalTableDesc internalTable =
internalTableManager.getInternalTableDesc(tableName);
List<InternalTablePartitionDetail> partitionDetails;
- long count = getInternalTableCount(internalTable);
+ long count = getInternalTableCount(internalTable, ss);
if (internalTable.getTablePartition() != null) {
- partitionDetails = extractPartitionDetails(internalTable);
+ partitionDetails = extractPartitionDetails(ss, internalTable);
} else {
partitionDetails = Collections.emptyList();
}
@@ -95,20 +116,10 @@ public class InternalTableLoadJob extends SparkApplication
{
partitionDetails.size());
List<String> finalPartitionValues = partitionDetails.stream()
.map(InternalTablePartitionDetail::getPartitionValue).collect(Collectors.toList());
- long finalCount = count;
- EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- InternalTablePartition tablePartition =
internalTable.getTablePartition();
- if (tablePartition != null) {
- tablePartition.setPartitionValues(finalPartitionValues);
- tablePartition.setPartitionDetails(partitionDetails);
- }
- internalTable.setRowCount(finalCount);
- internalTableManager.saveOrUpdateInternalTable(internalTable);
- return true;
- }, project);
+ return new InternalTableMetaUpdateInfo(count, finalPartitionValues,
partitionDetails);
}
- private long getInternalTableCount(InternalTableDesc internalTable) {
+ private long getInternalTableCount(InternalTableDesc internalTable,
SparkSession ss) {
if (internalTable.getStorageType() ==
InternalTableDesc.StorageType.PARQUET) {
try {
val internalTableDs =
ss.read().format(internalTable.getStorageType().getFormat())
@@ -124,7 +135,8 @@ public class InternalTableLoadJob extends SparkApplication {
}
}
- private List<InternalTablePartitionDetail>
extractPartitionDetails(InternalTableDesc internalTable) {
+ private List<InternalTablePartitionDetail>
extractPartitionDetails(SparkSession ss,
+ InternalTableDesc internalTable) {
val partitionCol =
internalTable.getTablePartition().getPartitionColumns()[0];
List<String> partitionValues;
List<InternalTablePartitionDetail> partitionDetails = new
ArrayList<>();
@@ -202,4 +214,12 @@ public class InternalTableLoadJob extends SparkApplication
{
.getInternalTableDesc(tableName);
loader.dropPartitions(ss, internalTable, toBeDelete);
}
+
+ @Getter
+ @AllArgsConstructor
+ public static class InternalTableMetaUpdateInfo {
+ long finalCount;
+ List<String> partitionValues;
+ List<InternalTablePartitionDetail> partitionDetails;
+ }
}