This is an automated email from the ASF dual-hosted git repository.
liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 8faef1ef3d KYLIN-5973 fix streaming function
8faef1ef3d is described below
commit 8faef1ef3ddf238a0962dd79df2afc5604af421d
Author: Zhimin Wu <[email protected]>
AuthorDate: Fri Sep 27 15:05:55 2024 +0800
KYLIN-5973 fix streaming function
---
.../metadata/mapper/FusionModelDynamicSqlSupport.java | 2 --
.../persistence/metadata/mapper/FusionModelMapper.java | 11 -----------
.../metadata/mapper/StreamingJobMapper.java | 6 ++++++
.../persistence/resources/FusionModelRawResource.java | 6 ++++--
.../src/main/resources/metadata-jdbc-h2.properties | 1 -
.../src/main/resources/metadata-jdbc-mysql.properties | 1 -
.../main/resources/metadata-jdbc-postgresql.properties | 1 -
.../src/test/resources/ut_big_sqls/sqls1.expect | 18 ++++++++++++++++++
.../kylin/query/runtime/plan/TableScanPlan.scala | 10 +++++-----
9 files changed, 33 insertions(+), 23 deletions(-)
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
index 521608cbd6..76c30d62e1 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
@@ -30,8 +30,6 @@ public final class FusionModelDynamicSqlSupport {
public static final class FusionModel extends BasicSqlTable<FusionModel> {
- public final SqlColumn<String> modelUuid = column("model_uuid",
JDBCType.CHAR);
-
public FusionModel() {
super("fusion_model", FusionModel::new);
}
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
index ed36d80473..69c39af557 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
@@ -51,7 +51,6 @@ public interface FusionModelMapper extends
BasicMapper<FusionModelRawResource> {
@Result(column = "meta_key", property = "metaKey", jdbcType =
JdbcType.VARCHAR),
@Result(column = "project", property = "project", jdbcType =
JdbcType.VARCHAR),
@Result(column = "uuid", property = "uuid", jdbcType =
JdbcType.CHAR),
- @Result(column = "model_uuid", property = "modelUuid", jdbcType =
JdbcType.CHAR),
@Result(column = "mvcc", property = "mvcc", jdbcType =
JdbcType.BIGINT),
@Result(column = "ts", property = "ts", jdbcType =
JdbcType.BIGINT),
@Result(column = "reserved_filed_1", property = "reservedFiled1",
jdbcType = JdbcType.VARCHAR),
@@ -60,21 +59,11 @@ public interface FusionModelMapper extends
BasicMapper<FusionModelRawResource> {
@Result(column = "reserved_filed_3", property = "reservedFiled3",
jdbcType = JdbcType.LONGVARBINARY) })
List<FusionModelRawResource> selectMany(SelectStatementProvider
selectStatement);
- @Override
- default UpdateDSL<UpdateModel> updateAllColumns(FusionModelRawResource
record, UpdateDSL<UpdateModel> dsl) {
- dsl = BasicMapper.super.updateAllColumns(record, dsl);
- return dsl.set(sqlTable.modelUuid).equalTo(record::getModelUuid);
- }
-
@Override
default BasicSqlTable getSqlTable() {
return sqlTable;
}
- @Override
- default BasicColumn[] getSelectList() {
- return getSelectListWithAdditions(sqlTable.modelUuid);
- }
@Override
@SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method =
"select")
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
index 9b342c050a..fc797e1029 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
@@ -31,6 +31,7 @@ import org.apache.ibatis.type.JdbcType;
import org.apache.kylin.common.persistence.metadata.jdbc.ContentTypeHandler;
import
org.apache.kylin.common.persistence.metadata.jdbc.SqlWithRecordLockProviderAdapter;
import org.apache.kylin.common.persistence.resources.StreamingJobRawResource;
+import org.mybatis.dynamic.sql.BasicColumn;
import org.mybatis.dynamic.sql.select.SelectDSLCompleter;
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
@@ -42,6 +43,11 @@ public interface StreamingJobMapper extends
BasicMapper<StreamingJobRawResource>
return sqlTable;
}
+ @Override
+ default BasicColumn[] getSelectList() {
+ return getSelectListWithAdditions(sqlTable.modelUuid);
+ }
+
@Override
@SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method =
"select")
@ResultMap("StreamingJobResult")
diff --git
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
index 705183dd27..9568c6d4f6 100644
---
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
+++
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
@@ -32,6 +32,8 @@ public class FusionModelRawResource extends RawResource {
@JsonProperty("project")
private String project;
- @JsonProperty("model_uuid")
- private String modelUuid;
+ @Override
+ public String getModelUuid() {
+ return this.getUuid();
+ }
}
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index 1b5f558840..84be416a25 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -439,7 +439,6 @@ id bigint AUTO_INCREMENT NOT NULL,\
meta_key varchar(255) NOT NULL,\
project varchar(255) NOT NULL,\
uuid CHAR(36) NOT NULL,\
-model_uuid CHAR(36) NOT NULL,\
mvcc bigint NOT NULL,\
ts bigint NOT NULL,\
content bytea NOT NULL,\
diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
index 3b7dd85199..2754feb704 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -488,7 +488,6 @@ CREATE TABLE IF NOT EXISTS `%s_fusion_model` \
`meta_key` varchar(255) NOT NULL, \
`project` varchar(255) NOT NULL, \
`uuid` CHAR(36) NOT NULL COLLATE utf8_bin, \
-`model_uuid` CHAR(36) NOT NULL COLLATE utf8_bin, \
`mvcc` bigint NOT NULL, \
`ts` bigint NOT NULL, \
`content` longblob NOT NULL, \
diff --git
a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
index 734720d886..944a469eac 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -439,7 +439,6 @@ id bigserial NOT NULL,\
meta_key varchar(255) NOT NULL,\
project varchar(255) NOT NULL,\
uuid CHAR(36) COLLATE "C" NOT NULL,\
-model_uuid CHAR(36) NOT NULL,\
mvcc bigint NOT NULL,\
ts bigint NOT NULL,\
content bytea NOT NULL,\
diff --git a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
index ce48a1b0d3..9701a578e5 100644
--- a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
+++ b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
@@ -1,3 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
SELECT "db_3"."F_NAME" AS "F_NAME_3", "db_3"."P_NAME" AS "P_NAME_3",
"db_3"."VERSION" AS "VERSION_3", "db_3"."COUNT" AS "COUNT_3", "db_3"."COST" AS
"COST_3" FROM (SELECT COUNT(*) AS count_all, SUM(cost) AS cost,
test_d_factory.F_NAME AS F_NAME
, test_d_product.P_NAME AS P_NAME, descrip_c AS version
FROM t_demo_.t_demo_data t_demo_data
diff --git
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index d3e1084157..0b8c030b23 100644
---
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -262,17 +262,17 @@ object TableScanPlan extends LogEx {
val otherDims = Sets.newHashSet(context.getDimensions)
otherDims.removeAll(groups)
// expand derived (xxxD means contains host columns only, derived columns
were translated)
- val groupsD = expandDerived(context.getBatchCandidate, groups)
+ val groupsD = expandDerived(context.getCandidate, groups)
val otherDimsD: util.Set[TblColRef] =
- expandDerived(context.getBatchCandidate, otherDims)
+ expandDerived(context.getCandidate, otherDims)
otherDimsD.removeAll(groupsD)
// identify cuboid
val dimensionsD = new util.LinkedHashSet[TblColRef]
dimensionsD.addAll(groupsD)
dimensionsD.addAll(otherDimsD)
- val model = context.getBatchCandidate.getLayoutEntity.getModel
- context.getBatchCandidate.getDerivedToHostMap.asScala.toList.foreach(m => {
+ val model = context.getCandidate.getLayoutEntity.getModel
+ context.getCandidate.getDerivedToHostMap.asScala.toList.foreach(m => {
if (m._2.`type` == DeriveInfo.DeriveType.LOOKUP && !m._2.isOneToOne) {
m._2.columns.asScala.foreach(derivedId => {
if (mapping.getIndexOf(model.getColRef(derivedId)) != -1) {
@@ -288,7 +288,7 @@ object TableScanPlan extends LogEx {
dataflow.getLatestReadySegment,
gtColIdx,
olapContext.getReturnTupleInfo,
- context.getBatchCandidate)
+ context.getCandidate)
if (derived.hasDerived) {
newPlan = derived.joinDerived(newPlan)
}