This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 8faef1ef3d KYLIN-5973 fix streaming function
8faef1ef3d is described below

commit 8faef1ef3ddf238a0962dd79df2afc5604af421d
Author: Zhimin Wu <[email protected]>
AuthorDate: Fri Sep 27 15:05:55 2024 +0800

    KYLIN-5973 fix streaming function
---
 .../metadata/mapper/FusionModelDynamicSqlSupport.java  |  2 --
 .../persistence/metadata/mapper/FusionModelMapper.java | 11 -----------
 .../metadata/mapper/StreamingJobMapper.java            |  6 ++++++
 .../persistence/resources/FusionModelRawResource.java  |  6 ++++--
 .../src/main/resources/metadata-jdbc-h2.properties     |  1 -
 .../src/main/resources/metadata-jdbc-mysql.properties  |  1 -
 .../main/resources/metadata-jdbc-postgresql.properties |  1 -
 .../src/test/resources/ut_big_sqls/sqls1.expect        | 18 ++++++++++++++++++
 .../kylin/query/runtime/plan/TableScanPlan.scala       | 10 +++++-----
 9 files changed, 33 insertions(+), 23 deletions(-)

diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
index 521608cbd6..76c30d62e1 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelDynamicSqlSupport.java
@@ -30,8 +30,6 @@ public final class FusionModelDynamicSqlSupport {
 
     public static final class FusionModel extends BasicSqlTable<FusionModel> {
 
-        public final SqlColumn<String> modelUuid = column("model_uuid", 
JDBCType.CHAR);
-
         public FusionModel() {
             super("fusion_model", FusionModel::new);
         }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
index ed36d80473..69c39af557 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/FusionModelMapper.java
@@ -51,7 +51,6 @@ public interface FusionModelMapper extends 
BasicMapper<FusionModelRawResource> {
             @Result(column = "meta_key", property = "metaKey", jdbcType = 
JdbcType.VARCHAR),
             @Result(column = "project", property = "project", jdbcType = 
JdbcType.VARCHAR),
             @Result(column = "uuid", property = "uuid", jdbcType = 
JdbcType.CHAR),
-            @Result(column = "model_uuid", property = "modelUuid", jdbcType = 
JdbcType.CHAR),
             @Result(column = "mvcc", property = "mvcc", jdbcType = 
JdbcType.BIGINT),
             @Result(column = "ts", property = "ts", jdbcType = 
JdbcType.BIGINT),
             @Result(column = "reserved_filed_1", property = "reservedFiled1", 
jdbcType = JdbcType.VARCHAR),
@@ -60,21 +59,11 @@ public interface FusionModelMapper extends 
BasicMapper<FusionModelRawResource> {
             @Result(column = "reserved_filed_3", property = "reservedFiled3", 
jdbcType = JdbcType.LONGVARBINARY) })
     List<FusionModelRawResource> selectMany(SelectStatementProvider 
selectStatement);
 
-    @Override
-    default UpdateDSL<UpdateModel> updateAllColumns(FusionModelRawResource 
record, UpdateDSL<UpdateModel> dsl) {
-        dsl = BasicMapper.super.updateAllColumns(record, dsl);
-        return dsl.set(sqlTable.modelUuid).equalTo(record::getModelUuid);
-    }
-
     @Override
     default BasicSqlTable getSqlTable() {
         return sqlTable;
     }
 
-    @Override
-    default BasicColumn[] getSelectList() {
-        return getSelectListWithAdditions(sqlTable.modelUuid);
-    }
 
     @Override
     @SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method = 
"select")
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
index 9b342c050a..fc797e1029 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/metadata/mapper/StreamingJobMapper.java
@@ -31,6 +31,7 @@ import org.apache.ibatis.type.JdbcType;
 import org.apache.kylin.common.persistence.metadata.jdbc.ContentTypeHandler;
 import 
org.apache.kylin.common.persistence.metadata.jdbc.SqlWithRecordLockProviderAdapter;
 import org.apache.kylin.common.persistence.resources.StreamingJobRawResource;
+import org.mybatis.dynamic.sql.BasicColumn;
 import org.mybatis.dynamic.sql.select.SelectDSLCompleter;
 import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
 import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
@@ -42,6 +43,11 @@ public interface StreamingJobMapper extends 
BasicMapper<StreamingJobRawResource>
         return sqlTable;
     }
 
+    @Override
+    default BasicColumn[] getSelectList() {
+        return getSelectListWithAdditions(sqlTable.modelUuid);
+    }
+
     @Override
     @SelectProvider(type = SqlWithRecordLockProviderAdapter.class, method = 
"select")
     @ResultMap("StreamingJobResult")
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
index 705183dd27..9568c6d4f6 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/persistence/resources/FusionModelRawResource.java
@@ -32,6 +32,8 @@ public class FusionModelRawResource extends RawResource {
     @JsonProperty("project")
     private String project;
 
-    @JsonProperty("model_uuid")
-    private String modelUuid;
+    @Override
+    public String getModelUuid() {
+        return this.getUuid();
+    }
 }
diff --git a/src/core-common/src/main/resources/metadata-jdbc-h2.properties 
b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
index 1b5f558840..84be416a25 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-h2.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-h2.properties
@@ -439,7 +439,6 @@ id               bigint AUTO_INCREMENT NOT NULL,\
 meta_key         varchar(255)          NOT NULL,\
 project          varchar(255)          NOT NULL,\
 uuid             CHAR(36)              NOT NULL,\
-model_uuid       CHAR(36)              NOT NULL,\
 mvcc             bigint                NOT NULL,\
 ts               bigint                NOT NULL,\
 content          bytea                 NOT NULL,\
diff --git a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties 
b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
index 3b7dd85199..2754feb704 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-mysql.properties
@@ -488,7 +488,6 @@ CREATE TABLE IF NOT EXISTS `%s_fusion_model` \
 `meta_key`         varchar(255)    NOT NULL, \
 `project`          varchar(255)    NOT NULL, \
 `uuid`             CHAR(36)        NOT NULL COLLATE utf8_bin, \
-`model_uuid`       CHAR(36)        NOT NULL COLLATE utf8_bin, \
 `mvcc`             bigint          NOT NULL, \
 `ts`               bigint          NOT NULL, \
 `content`          longblob        NOT NULL, \
diff --git 
a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties 
b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
index 734720d886..944a469eac 100644
--- a/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
+++ b/src/core-common/src/main/resources/metadata-jdbc-postgresql.properties
@@ -439,7 +439,6 @@ id               bigserial            NOT NULL,\
 meta_key         varchar(255)         NOT NULL,\
 project          varchar(255)         NOT NULL,\
 uuid             CHAR(36) COLLATE "C" NOT NULL,\
-model_uuid       CHAR(36)             NOT NULL,\
 mvcc             bigint               NOT NULL,\
 ts               bigint               NOT NULL,\
 content          bytea                NOT NULL,\
diff --git a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect 
b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
index ce48a1b0d3..9701a578e5 100644
--- a/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
+++ b/src/rec-service/src/test/resources/ut_big_sqls/sqls1.expect
@@ -1,3 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
 SELECT "db_3"."F_NAME" AS "F_NAME_3", "db_3"."P_NAME" AS "P_NAME_3", 
"db_3"."VERSION" AS "VERSION_3", "db_3"."COUNT" AS "COUNT_3", "db_3"."COST" AS 
"COST_3" FROM (SELECT COUNT(*) AS count_all, SUM(cost) AS cost, 
test_d_factory.F_NAME AS F_NAME
     , test_d_product.P_NAME AS P_NAME, descrip_c AS version
 FROM t_demo_.t_demo_data t_demo_data
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index d3e1084157..0b8c030b23 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -262,17 +262,17 @@ object TableScanPlan extends LogEx {
     val otherDims = Sets.newHashSet(context.getDimensions)
     otherDims.removeAll(groups)
     // expand derived (xxxD means contains host columns only, derived columns 
were translated)
-    val groupsD = expandDerived(context.getBatchCandidate, groups)
+    val groupsD = expandDerived(context.getCandidate, groups)
     val otherDimsD: util.Set[TblColRef] =
-      expandDerived(context.getBatchCandidate, otherDims)
+      expandDerived(context.getCandidate, otherDims)
     otherDimsD.removeAll(groupsD)
 
     // identify cuboid
     val dimensionsD = new util.LinkedHashSet[TblColRef]
     dimensionsD.addAll(groupsD)
     dimensionsD.addAll(otherDimsD)
-    val model = context.getBatchCandidate.getLayoutEntity.getModel
-    context.getBatchCandidate.getDerivedToHostMap.asScala.toList.foreach(m => {
+    val model = context.getCandidate.getLayoutEntity.getModel
+    context.getCandidate.getDerivedToHostMap.asScala.toList.foreach(m => {
       if (m._2.`type` == DeriveInfo.DeriveType.LOOKUP && !m._2.isOneToOne) {
         m._2.columns.asScala.foreach(derivedId => {
           if (mapping.getIndexOf(model.getColRef(derivedId)) != -1) {
@@ -288,7 +288,7 @@ object TableScanPlan extends LogEx {
       dataflow.getLatestReadySegment,
       gtColIdx,
       olapContext.getReturnTupleInfo,
-      context.getBatchCandidate)
+      context.getCandidate)
     if (derived.hasDerived) {
       newPlan = derived.joinDerived(newPlan)
     }

Reply via email to