This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 3f3f114 check derived column for stream table
3f3f114 is described below
commit 3f3f114400e49cf8bd9ec48c888b253c81994f79
Author: liukun4515 <[email protected]>
AuthorDate: Thu Mar 4 18:40:02 2021 +0800
check derived column for stream table
---
.../kylin/rest/controller/StreamingV2Controller.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index 1b5a81e..0a0cc62 100644
---
a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++
b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -34,7 +34,6 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.dimension.TimeDerivedColumnType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.ISourceAware;
@@ -196,10 +195,12 @@ public class StreamingV2Controller extends
BasicController {
// validate the compatibility between the input table schema and the underlying
hive table schema
if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) {
List<FieldSchema> fields;
+ List<FieldSchema> partitionFields;
String db = tableDesc.getDatabase();
try {
HiveMetaStoreClient metaStoreClient = new
HiveMetaStoreClient(new HiveConf());
fields = metaStoreClient.getFields(db, tableDesc.getName());
+ partitionFields = metaStoreClient.getTable(db,
tableDesc.getName()).getPartitionKeys();
logger.info("Checking the {} in {}", tableDesc.getName(), db);
} catch (NoSuchObjectException noObjectException) {
logger.info("table not exist in hive meta store for table:" +
tableDesc.getIdentity(),
@@ -216,18 +217,16 @@ public class StreamingV2Controller extends
BasicController {
for (FieldSchema field : fields) {
fieldSchemaMap.put(field.getName().toUpperCase(Locale.ROOT),
field);
}
+ // partition column
+ for (FieldSchema field : partitionFields) {
+ fieldSchemaMap.put(field.getName().toUpperCase(Locale.ROOT),
field);
+ }
List<String> incompatibleMsgs = Lists.newArrayList();
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
FieldSchema fieldSchema =
fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
if (fieldSchema == null) {
- // Partition column cannot be fetched via Hive Metadata
API.
- if
(!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
- incompatibleMsgs.add("Column not exist in hive table:"
+ columnDesc.getName());
- continue;
- } else {
- logger.info("Column not exist in hive table: {}.",
columnDesc.getName());
- continue;
- }
+ incompatibleMsgs.add("Column not exist in hive table:" +
columnDesc.getName());
+ continue;
}
if (!checkHiveTableFieldCompatible(fieldSchema, columnDesc)) {
String msg = String.format(Locale.ROOT,