This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 4093d3f0ac KYLIN-5681 fix scd2 modeling
4093d3f0ac is described below
commit 4093d3f0acb4b29788e20248f4d78eff321a7f17
Author: Pengfei Zhan <[email protected]>
AuthorDate: Wed Aug 9 12:45:25 2023 +0800
KYLIN-5681 fix scd2 modeling
---
src/modeling-service/pom.xml | 4 +
.../kylin/rest/service/ModelSemanticHelper.java | 86 +++++++++++++---------
.../kylin/rest/service/ModelSmartSupporter.java | 27 -------
.../org/apache/kylin/query/engine/QueryExec.java | 25 ++++++-
.../kylin/query/util/QueryContextCutter.java | 9 +++
5 files changed, 87 insertions(+), 64 deletions(-)
diff --git a/src/modeling-service/pom.xml b/src/modeling-service/pom.xml
index 838a96531c..f152bdcd3b 100644
--- a/src/modeling-service/pom.xml
+++ b/src/modeling-service/pom.xml
@@ -37,6 +37,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-common-service</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-query</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-datasource-service</artifactId>
diff --git
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
index 0c8435af9a..0badc8532a 100644
---
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
+++
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSemanticHelper.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
@@ -43,6 +44,7 @@ import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.exception.CommonErrorCode;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.QueryErrorCode;
@@ -53,6 +55,11 @@ import
org.apache.kylin.common.util.ModifyTableNameSqlVisitor;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil;
+import org.apache.kylin.guava30.shaded.common.base.Throwables;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.manager.JobManager;
import org.apache.kylin.job.model.JobParam;
import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup;
@@ -91,10 +98,13 @@ import
org.apache.kylin.metadata.model.util.ExpandableMeasureUtil;
import org.apache.kylin.metadata.model.util.scd2.SCD2CondChecker;
import org.apache.kylin.metadata.model.util.scd2.SCD2Exception;
import org.apache.kylin.metadata.model.util.scd2.SCD2NonEquiCondSimplification;
+import org.apache.kylin.metadata.model.util.scd2.SCD2SqlConverter;
import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinTableDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
+import org.apache.kylin.query.engine.QueryExec;
+import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.rest.request.ModelRequest;
@@ -106,18 +116,10 @@ import org.apache.kylin.rest.util.SpringContext;
import org.apache.kylin.source.SourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.apache.kylin.guava30.shaded.common.base.Throwables;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
import io.kyligence.kap.secondstorage.SecondStorageUpdater;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
-import lombok.Setter;
import lombok.val;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@@ -126,10 +128,6 @@ import lombok.extern.slf4j.Slf4j;
@Service
public class ModelSemanticHelper extends BasicService {
- @Setter
- @Autowired(required = false)
- private ModelSmartSupporter modelSmartSupporter;
-
private static final Logger logger =
LoggerFactory.getLogger(ModelSemanticHelper.class);
private final ExpandableMeasureUtil expandableMeasureUtil = new
ExpandableMeasureUtil((model, ccDesc) -> {
String ccExpression = PushDownUtil.massageComputedColumn(model,
model.getProject(), ccDesc,
@@ -257,11 +255,13 @@ public class ModelSemanticHelper extends BasicService {
HashSet<JoinDescNonEquiCompBean> scd2NonEquiCondSets = new HashSet<>();
- val projectKylinConfig =
getManager(NProjectManager.class).getProject(dataModel.getProject()).getConfig();
+ String project = dataModel.getProject();
+ val projectKylinConfig = NProjectManager.getProjectConfig(project);
boolean isScd2Enabled =
projectKylinConfig.isQueryNonEquiJoinModelEnabled();
-
+
QueryContext.current().setAclInfo(AclPermissionUtil.createAclInfo(project,
getCurrentUserGroups()));
+ QueryExec queryExec = new QueryExec(project, projectKylinConfig,
false);
for (int i = 0; i < requestJoinTableDescs.size(); i++) {
- final JoinDesc modelJoinDesc =
dataModel.getJoinTables().get(i).getJoin();
+ final JoinDesc joinWithoutNonEquivInfo =
dataModel.getJoinTables().get(i).getJoin();
final SimplifiedJoinDesc requestJoinDesc =
requestJoinTableDescs.get(i).getSimplifiedJoinDesc();
if
(CollectionUtils.isEmpty(requestJoinDesc.getSimplifiedNonEquiJoinConditions()))
{
@@ -277,41 +277,42 @@ public class ModelSemanticHelper extends BasicService {
checkRequestNonEquiJoinConds(requestJoinDesc);
//3. suggest nonEquiModel
- final JoinDesc suggModelJoin =
modelSmartSupporter.suggNonEquiJoinModel(projectKylinConfig,
- dataModel.getProject(), modelJoinDesc, requestJoinDesc);
+ String scd2Sql =
SCD2SqlConverter.INSTANCE.genSCD2SqlStr(joinWithoutNonEquivInfo,
+ requestJoinDesc.getSimplifiedNonEquiJoinConditions());
+ final JoinDesc analyzedJoin = deriveJoins(queryExec, scd2Sql);
// restore table alias in non-equi conditions
final NonEquiJoinCondition nonEquiCondWithAliasRestored = new
NonEquiJoinConditionVisitor() {
@Override
public NonEquiJoinCondition visitColumn(NonEquiJoinCondition
cond) {
TableRef originalTableRef;
if (cond.getColRef().getTableRef().getTableIdentity()
-
.equals(modelJoinDesc.getPKSide().getTableIdentity())) {
- originalTableRef = modelJoinDesc.getPKSide();
+
.equals(joinWithoutNonEquivInfo.getPKSide().getTableIdentity())) {
+ originalTableRef = joinWithoutNonEquivInfo.getPKSide();
} else {
- originalTableRef = modelJoinDesc.getFKSide();
+ originalTableRef = joinWithoutNonEquivInfo.getFKSide();
}
return new
NonEquiJoinCondition(originalTableRef.getColumn(cond.getColRef().getName()),
cond.getDataType());
}
- }.visit(suggModelJoin.getNonEquiJoinCondition());
-
suggModelJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored);
- String expr = suggModelJoin.getNonEquiJoinCondition().getExpr();
- expr = expr.replaceAll(suggModelJoin.getPKSide().getAlias(),
modelJoinDesc.getPKSide().getAlias());
- expr = expr.replaceAll(suggModelJoin.getFKSide().getAlias(),
modelJoinDesc.getFKSide().getAlias());
- suggModelJoin.getNonEquiJoinCondition().setExpr(expr);
- suggModelJoin.setPrimaryTableRef(modelJoinDesc.getPKSide());
- suggModelJoin.setPrimaryTable(modelJoinDesc.getPrimaryTable());
- suggModelJoin.setForeignTableRef(modelJoinDesc.getFKSide());
- suggModelJoin.setForeignTable(modelJoinDesc.getForeignTable());
+ }.visit(analyzedJoin.getNonEquiJoinCondition());
+ analyzedJoin.setNonEquiJoinCondition(nonEquiCondWithAliasRestored);
+ String expr = analyzedJoin.getNonEquiJoinCondition().getExpr();
+ expr = expr.replaceAll(analyzedJoin.getPKSide().getAlias(),
joinWithoutNonEquivInfo.getPKSide().getAlias());
+ expr = expr.replaceAll(analyzedJoin.getFKSide().getAlias(),
joinWithoutNonEquivInfo.getFKSide().getAlias());
+ analyzedJoin.getNonEquiJoinCondition().setExpr(expr);
+
analyzedJoin.setPrimaryTableRef(joinWithoutNonEquivInfo.getPKSide());
+
analyzedJoin.setPrimaryTable(joinWithoutNonEquivInfo.getPrimaryTable());
+
analyzedJoin.setForeignTableRef(joinWithoutNonEquivInfo.getFKSide());
+
analyzedJoin.setForeignTable(joinWithoutNonEquivInfo.getForeignTable());
//4. update dataModel
try {
-
SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(suggModelJoin);
-
modelJoinDesc.setNonEquiJoinCondition(suggModelJoin.getNonEquiJoinCondition());
- modelJoinDesc.setForeignTable(suggModelJoin.getForeignTable());
- modelJoinDesc.setPrimaryTable(suggModelJoin.getPrimaryTable());
+
SCD2NonEquiCondSimplification.INSTANCE.convertToSimplifiedSCD2Cond(analyzedJoin);
+
joinWithoutNonEquivInfo.setNonEquiJoinCondition(analyzedJoin.getNonEquiJoinCondition());
+
joinWithoutNonEquivInfo.setForeignTable(analyzedJoin.getForeignTable());
+
joinWithoutNonEquivInfo.setPrimaryTable(analyzedJoin.getPrimaryTable());
} catch (SCD2Exception e) {
logger.error("Update datamodel failed...", e);
throw new KylinException(QueryErrorCode.SCD2_COMMON_ERROR,
Throwables.getRootCause(e).getMessage());
@@ -328,6 +329,23 @@ public class ModelSemanticHelper extends BasicService {
}
+    /**
+     * Derives the single join described by an SCD2 probe SQL.
+     *
+     * <p>The SQL is analyzed into OLAP contexts via {@link QueryExec#deriveOlapContexts}. It must
+     * yield exactly one context holding exactly one join; anything else means the non-equi join
+     * conditions could not be kept together in one join and the request is rejected.
+     * NOTE(review): assumes {@code sql} was generated from a single model join plus its requested
+     * non-equi conditions — confirm against SCD2SqlConverter.
+     *
+     * @param queryExec executor used to parse/optimize the SQL and cut it into OLAP contexts
+     * @param sql       the SQL to analyze; also echoed in error messages
+     * @return the only join of the only derived OLAP context
+     * @throws SCD2Exception if no join could be extracted (no context, or a context without any
+     *                       join), or if the conditions were split across several contexts/joins
+     */
+    private JoinDesc deriveJoins(QueryExec queryExec, String sql) {
+        List<OLAPContext> contexts = queryExec.deriveOlapContexts(sql);
+        if (contexts.isEmpty()) {
+            throw new SCD2Exception("Failed to extract joins from the input sql: " + sql);
+        }
+        if (contexts.size() > 1) {
+            throw new SCD2Exception("Non-equiv-join conditions were split. the input sql is: " + sql);
+        }
+        OLAPContext ctx = contexts.get(0);
+        if (ctx.joins.isEmpty()) {
+            // A lone context without joins means nothing was extracted; it was not "split".
+            throw new SCD2Exception("Failed to extract joins from the input sql: " + sql);
+        }
+        if (ctx.joins.size() > 1) {
+            throw new SCD2Exception("Non-equiv-join conditions were split. the input sql is: " + sql);
+        }
+        return ctx.joins.get(0);
+    }
+
private void checkRequestNonEquiJoinConds(final SimplifiedJoinDesc
requestJoinDesc) {
if
(!SCD2CondChecker.INSTANCE.checkSCD2EquiJoinCond(requestJoinDesc.getForeignKey(),
diff --git
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java
deleted file mode 100644
index 19bf5061ac..0000000000
---
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelSmartSupporter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.rest.service;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.util.scd2.SimplifiedJoinDesc;
-
-public interface ModelSmartSupporter {
- JoinDesc suggNonEquiJoinModel(final KylinConfig kylinConfig, final String
project, final JoinDesc modelJoinDesc,
- final SimplifiedJoinDesc requestJoinDesc);
-}
diff --git
a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
index dd0f66fcc9..80295c22e8 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
@@ -45,11 +45,14 @@ import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexExecutorImpl;
import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryTrace;
import org.apache.kylin.common.ReadFsSwitch;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.StructField;
@@ -68,15 +71,13 @@ import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.AsyncQueryUtil;
import org.apache.kylin.query.util.CalcitePlanRouterVisitor;
import org.apache.kylin.query.util.HepUtils;
+import org.apache.kylin.query.util.QueryContextCutter;
import org.apache.kylin.query.util.QueryInterruptChecker;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.query.util.RelAggPushDownUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
import lombok.Getter;
import lombok.Setter;
@@ -211,6 +212,24 @@ public class QueryExec {
}
}
+    /**
+     * Parses and optimizes {@code sql}, cuts the resulting plan into OLAP contexts and returns a
+     * copy of them.
+     *
+     * <p>The OLAP-context thread-locals are cleared both before the analysis and afterwards, so
+     * any contexts already registered on the calling thread are discarded by this call.
+     * Failures are logged and swallowed: the caller then receives an empty list.
+     *
+     * @param sql the SQL statement to analyze
+     * @return a new mutable list of the derived contexts; empty when none were derived or on error
+     */
+    public List<OLAPContext> deriveOlapContexts(String sql) {
+        final List<OLAPContext> derived = Lists.newArrayList();
+        try {
+            // Start from a clean slate so only contexts produced by this SQL are collected.
+            OLAPContext.clearThreadLocalContexts();
+            QueryContextCutter.analyzeOlapContext(parseAndOptimize(sql));
+            Collection<OLAPContext> registered = OLAPContext.getThreadLocalContexts();
+            if (!CollectionUtils.isEmpty(registered)) {
+                derived.addAll(registered);
+            }
+        } catch (Exception e) {
+            // Best effort: report the failure and fall through to return what was collected.
+            logger.error("Sql Parsing error.", e);
+        } finally {
+            // Never leave analysis contexts behind on this thread.
+            OLAPContext.clearThreadLocalContexts();
+        }
+        return derived;
+    }
+
private void magicDirts(String sql) {
if (sql.contains("ReadFsSwitch.turnOnBackupFsWhile")) {
ReadFsSwitch.turnOnBackupFsWhile();
diff --git
a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
index 55006842b4..5d4e133447 100644
---
a/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
+++
b/src/query/src/main/java/org/apache/kylin/query/util/QueryContextCutter.java
@@ -46,6 +46,15 @@ public class QueryContextCutter {
private QueryContextCutter() {
}
+ /**
+ * Analyze a RelNode tree to get olapContexts.
+ * @param root The root relNode of a query statement
+ */
+ public static void analyzeOlapContext(RelNode root) {
+ cutContext(new ContextInitialCutStrategy(), (KapRel) root.getInput(0),
root);
+ fillOlapContextPropertiesWithRelTree(root);
+ }
+
/**
* For each query parse tree, the following steps are used for generating
OlapContexts
* and matching the precomputed indexes.