This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch mtmv_rewrite_performance
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/mtmv_rewrite_performance by
this push:
new e6055bdb1f8 [opt](mtmv) Optimize get available materialized view logic
by reducing partition version comparison (#48477)
e6055bdb1f8 is described below
commit e6055bdb1f8b230ee60ced2bdb130af2ba76ea4d
Author: seawinde <[email protected]>
AuthorDate: Fri Feb 28 17:55:32 2025 +0800
[opt](mtmv) Optimize get available materialized view logic by reducing
partition version comparison (#48477)
---
.../doris/common/profile/SummaryProfile.java | 18 +-
.../org/apache/doris/mtmv/MTMVRelationManager.java | 16 +-
.../org/apache/doris/mtmv/MTMVRewriteUtil.java | 5 +-
.../org/apache/doris/nereids/CascadesContext.java | 5 +
.../org/apache/doris/nereids/NereidsPlanner.java | 19 ++
.../java/org/apache/doris/nereids/PlannerHook.java | 12 +
.../org/apache/doris/nereids/StatementContext.java | 48 +++-
.../jobs/executor/TablePartitionCollector.java | 46 ++++
.../apache/doris/nereids/memo/StructInfoMap.java | 40 ++-
.../org/apache/doris/nereids/rules/RuleType.java | 1 +
.../mv/AbstractMaterializedViewRule.java | 184 +++++---------
.../rules/exploration/mv/HyperGraphComparator.java | 41 ++--
.../mv/InitMaterializationContextHook.java | 2 +-
.../exploration/mv/MaterializationContext.java | 15 ++
...terializedViewAggregateOnNoneAggregateRule.java | 12 +-
.../exploration/mv/MaterializedViewUtils.java | 2 +-
.../rules/exploration/mv/PartitionCompensator.java | 214 ++++++++++++++++
.../nereids/rules/exploration/mv/StructInfo.java | 113 ++-------
.../rules/rewrite/PruneFileScanPartition.java | 1 -
.../rules/rewrite/QueryPartitionCollector.java | 90 +++++++
.../plans/visitor/ExpressionLineageReplacer.java | 29 +--
.../java/org/apache/doris/qe/SessionVariable.java | 18 ++
.../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 34 ++-
.../doris/nereids/memo/StructInfoMapTest.java | 26 +-
.../doris/nereids/mv/IdStatisticsMapTest.java | 4 +-
.../nereids/mv/MtmvCacheNewConnectContextTest.java | 6 +-
.../doris/nereids/mv/MvTableIdIsLongTest.java | 6 +-
.../nereids/mv/OptimizeGetAvailableMvsTest.java | 271 +++++++++++++++++++++
.../org/apache/doris/nereids/util/PlanChecker.java | 2 +
29 files changed, 988 insertions(+), 292 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 7215b8a9c65..f4a8cc3fd72 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -99,6 +99,8 @@ public class SummaryProfile {
public static final String NEREIDS_LOCK_TABLE_TIME = "Nereids Lock Table
Time";
public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
+
+ public static final String NEREIDS_COLLECT_TABLE_PARTITION_TIME = "Nereids
Collect Table Partition Time";
public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate
Time";
public static final String NEREIDS_DISTRIBUTE_TIME = "Nereids Distribute
Time";
@@ -227,6 +229,9 @@ public class SummaryProfile {
private long parseSqlFinishTime = -1;
@SerializedName(value = "nereidsLockTableFinishTime")
private long nereidsLockTableFinishTime = -1;
+
+ @SerializedName(value = "nereidsCollectTablePartitionTime")
+ private long nereidsCollectTablePartitionTime = -1;
@SerializedName(value = "nereidsAnalysisFinishTime")
private long nereidsAnalysisFinishTime = -1;
@SerializedName(value = "nereidsRewriteFinishTime")
@@ -416,6 +421,8 @@ public class SummaryProfile {
executionSummaryProfile.addInfoString(NEREIDS_LOCK_TABLE_TIME,
getPrettyNereidsLockTableTime());
executionSummaryProfile.addInfoString(NEREIDS_ANALYSIS_TIME,
getPrettyNereidsAnalysisTime());
executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME,
getPrettyNereidsRewriteTime());
+
executionSummaryProfile.addInfoString(NEREIDS_COLLECT_TABLE_PARTITION_TIME,
+ getPrettyNereidsCollectTablePartitionTime());
executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME,
getPrettyNereidsOptimizeTime());
executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME,
getPrettyNereidsTranslateTime());
executionSummaryProfile.addInfoString(NEREIDS_DISTRIBUTE_TIME,
getPrettyNereidsDistributeTime());
@@ -514,6 +521,10 @@ public class SummaryProfile {
this.nereidsLockTableFinishTime = TimeUtils.getStartTimeMs();
}
+ public void setNereidsCollectTablePartitionTime() {
+ this.nereidsCollectTablePartitionTime = TimeUtils.getStartTimeMs();
+ }
+
public void setNereidsAnalysisTime() {
this.nereidsAnalysisFinishTime = TimeUtils.getStartTimeMs();
}
@@ -788,8 +799,13 @@ public class SummaryProfile {
return getPrettyTime(nereidsRewriteFinishTime,
nereidsAnalysisFinishTime, TUnit.TIME_MS);
}
+
+ public String getPrettyNereidsCollectTablePartitionTime() {
+ return getPrettyTime(nereidsCollectTablePartitionTime,
nereidsRewriteFinishTime, TUnit.TIME_MS);
+ }
+
public String getPrettyNereidsOptimizeTime() {
- return getPrettyTime(nereidsOptimizeFinishTime,
nereidsRewriteFinishTime, TUnit.TIME_MS);
+ return getPrettyTime(nereidsOptimizeFinishTime,
nereidsCollectTablePartitionTime, TUnit.TIME_MS);
}
public String getPrettyNereidsTranslateTime() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 125b8ce38de..5c48a2a982d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.mtmv;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@@ -27,6 +28,7 @@ import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@@ -43,6 +45,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -87,7 +90,7 @@ public class MTMVRelationManager implements MTMVHookService {
if (predicate.test(ctx, mtmv)) {
continue;
}
- if (isMVPartitionValid(mtmv, ctx, forceConsistent)) {
+ if (isMVPartitionValid(mtmv, ctx, forceConsistent,
PartitionCompensator.getQueryUsedPartitions(ctx))) {
res.add(mtmv);
}
} catch (Exception e) {
@@ -116,10 +119,15 @@ public class MTMVRelationManager implements
MTMVHookService {
}
@VisibleForTesting
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean
forceConsistent) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx, boolean
forceConsistent,
+ Map<List<String>, Set<String>> queryUsedRelatedTablePartitionsMap)
{
long currentTimeMillis = System.currentTimeMillis();
- return !CollectionUtils
- .isEmpty(MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
ctx, currentTimeMillis, forceConsistent));
+ Collection<Partition> mtmvCanRewritePartitions =
MTMVRewriteUtil.getMTMVCanRewritePartitions(
+ mtmv, ctx, currentTimeMillis, forceConsistent,
queryUsedRelatedTablePartitionsMap);
+ // MTMVRewriteUtil.getMTMVCanRewritePartitions is time-consuming
behavior, So record for used later
+ ctx.getStatementContext().getMvCanRewritePartitionsMap().putIfAbsent(
+ new BaseTableInfo(mtmv), mtmvCanRewritePartitions);
+ return !CollectionUtils.isEmpty(mtmvCanRewritePartitions);
}
private Set<BaseTableInfo> getMTMVInfos(List<BaseTableInfo> tableInfos) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
index ff1b3263d34..e1cb899c23c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java
@@ -31,6 +31,8 @@ import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class MTMVRewriteUtil {
private static final Logger LOG =
LogManager.getLogger(MTMVRewriteUtil.class);
@@ -43,7 +45,8 @@ public class MTMVRewriteUtil {
* @return
*/
public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv,
ConnectContext ctx,
- long currentTimeMills, boolean forceConsistent) {
+ long currentTimeMills, boolean forceConsistent,
+ Map<List<String>, Set<String>> queryUsedRelatedTablePartitionsMap)
{
List<Partition> res = Lists.newArrayList();
Collection<Partition> allPartitions = mtmv.getPartitions();
MTMVRelation mtmvRelation = mtmv.getRelation();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index 4f81dde82d9..586cd071997 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.jobs.executor.Analyzer;
import org.apache.doris.nereids.jobs.executor.TableCollector;
+import org.apache.doris.nereids.jobs.executor.TablePartitionCollector;
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
import
org.apache.doris.nereids.jobs.rewrite.RootPlanTreeRewriteJob.RootRewriteJobContext;
@@ -228,6 +229,10 @@ public class CascadesContext implements ScheduleContext {
return new TableCollector(this);
}
+ public TablePartitionCollector newTablePartitionCollector() {
+ return new TablePartitionCollector(this);
+ }
+
public Analyzer newAnalyzer() {
return new Analyzer(this);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 7b55d12722b..54e191b9e3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -267,6 +267,9 @@ public class NereidsPlanner extends Planner {
}
}
+ // collect partitions table used, this is for query rewrite by
materialized view
+ collectTableUsedPartitions(showPlanProcess);
+
optimize();
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
@@ -357,6 +360,21 @@ public class NereidsPlanner extends Planner {
}
}
+ protected void collectTableUsedPartitions(boolean showPlanProcess) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start to collect table used partition");
+ }
+ keepOrShowPlanProcess(showPlanProcess, () ->
cascadesContext.newTablePartitionCollector().execute());
+ NereidsTracer.logImportantTime("EndCollectTablePartitions");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Start to collect table used partition");
+ }
+ if (statementContext.getConnectContext().getExecutor() != null) {
+
statementContext.getConnectContext().getExecutor().getSummaryProfile()
+ .setNereidsCollectTablePartitionTime();
+ }
+ }
+
protected void analyze(boolean showPlanProcess) {
if (LOG.isDebugEnabled()) {
LOG.debug("Start analyze plan");
@@ -382,6 +400,7 @@ public class NereidsPlanner extends Planner {
LOG.debug("Start rewrite plan");
}
keepOrShowPlanProcess(showPlanProcess, () ->
Rewriter.getWholeTreeRewriter(cascadesContext).execute());
+ this.statementContext.getPlannerHooks().forEach(hook ->
hook.afterRewrite(this));
NereidsTracer.logImportantTime("EndRewritePlan");
if (LOG.isDebugEnabled()) {
LOG.debug("End rewrite plan");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
index 18d71b539a9..76a68f0b22c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlannerHook.java
@@ -35,4 +35,16 @@ public interface PlannerHook {
*/
default void afterAnalyze(NereidsPlanner planner) {
}
+
+ /**
+ * the hook before rewrite
+ */
+ default void beforeRewrite(NereidsPlanner planner) {
+ }
+
+ /**
+ * the hook after rewrite
+ */
+ default void afterRewrite(NereidsPlanner planner) {
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 75353f446a4..fc1fcddbb19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.constraint.TableIdentifier;
@@ -29,6 +30,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
import org.apache.doris.datasource.mvcc.MvccTableInfo;
+import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.hint.UseMvHint;
@@ -61,6 +63,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -106,6 +109,7 @@ public class StatementContext implements Closeable {
private ConnectContext connectContext;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private final Stopwatch materializedViewStopwatch =
Stopwatch.createUnstarted();
@GuardedBy("this")
private final Map<String, Supplier<Object>> contextCacheMap =
Maps.newLinkedHashMap();
@@ -173,7 +177,14 @@ public class StatementContext implements Closeable {
// tables in this query directly
private final Map<List<String>, TableIf> tables = Maps.newHashMap();
- // tables maybe used by mtmv rewritten in this query
+ // tables maybe used by mtmv rewritten in this query,
+ // this contains mvs which use table in tables and the tables in mvs
+ // such as
+ // mv1 use t1, t2.
+ // mv2 use mv1, t3, t4.
+ // mv3 use t3, t4, t5
+ // if query is: select * from t2 join t5
+ // mtmvRelatedTables is mv1, mv2, mv3, t1, t2, t3, t4, t5
private final Map<List<String>, TableIf> mtmvRelatedTables =
Maps.newHashMap();
// insert into target tables
private final Map<List<String>, TableIf> insertTargetTables =
Maps.newHashMap();
@@ -213,6 +224,17 @@ public class StatementContext implements Closeable {
private boolean privChecked;
+ // the duration which materialized view rewrite used, if -1 means this
duration is exceeded
+ // if greater than 0 means the duration has used
+ private long materializedViewRewriteDuration = 0L;
+
+ // Record used table and it's used partitions
+ private final Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap =
+ HashMultimap.create();
+
+ // Record mtmv and valid partitions map because this is time-consuming
behavior
+ private final Map<BaseTableInfo, Collection<Partition>>
mvCanRewritePartitionsMap = new HashMap<>();
+
public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@@ -343,6 +365,22 @@ public class StatementContext implements Closeable {
return stopwatch;
}
+ public Stopwatch getMaterializedViewStopwatch() {
+ return materializedViewStopwatch;
+ }
+
+ public long getMaterializedViewRewriteDuration() {
+ return materializedViewRewriteDuration;
+ }
+
+ public void addMaterializedViewRewriteDuration(long millisecond) {
+ materializedViewRewriteDuration = materializedViewRewriteDuration +
millisecond;
+ }
+
+ public void materializedViewRewriteDurationExceeded() {
+ materializedViewRewriteDuration = -1;
+ }
+
public void setMaxNAryInnerJoin(int maxNAryInnerJoin) {
if (maxNAryInnerJoin > this.maxNAryInnerJoin) {
this.maxNAryInnerJoin = maxNAryInnerJoin;
@@ -773,4 +811,12 @@ public class StatementContext implements Closeable {
public void setPrivChecked(boolean privChecked) {
this.privChecked = privChecked;
}
+
+ public Multimap<List<String>, Pair<RelationId, Set<String>>>
getTableUsedPartitionNameMap() {
+ return tableUsedPartitionNameMap;
+ }
+
+ public Map<BaseTableInfo, Collection<Partition>>
getMvCanRewritePartitionsMap() {
+ return mvCanRewritePartitionsMap;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TablePartitionCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TablePartitionCollector.java
new file mode 100644
index 00000000000..e67b94d1314
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/TablePartitionCollector.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.executor;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.QueryPartitionCollector;
+
+import java.util.List;
+
+/**
+ * Collect partitions which query used, this is useful for optimizing get
available mvs,
+ * should collect after RBO
+ */
+public class TablePartitionCollector extends AbstractBatchJobExecutor {
+ public TablePartitionCollector(CascadesContext cascadesContext) {
+ super(cascadesContext);
+ }
+
+ @Override
+ public List<RewriteJob> getJobs() {
+ return buildCollectorJobs();
+ }
+
+ private static List<RewriteJob> buildCollectorJobs() {
+ return jobs(
+ custom(RuleType.COLLECT_PARTITIONS,
QueryPartitionCollector::new)
+ );
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/StructInfoMap.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/StructInfoMap.java
index 4aa4f146b87..7d11121cf67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/StructInfoMap.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/StructInfoMap.java
@@ -20,11 +20,14 @@ package org.apache.doris.nereids.memo;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.BitSet;
@@ -42,6 +45,8 @@ import javax.annotation.Nullable;
* Representation for group in cascades optimizer.
*/
public class StructInfoMap {
+
+ public static final Logger LOG = LogManager.getLogger(StructInfoMap.class);
private final Map<BitSet, Pair<GroupExpression, List<BitSet>>>
groupExpressionMap = new HashMap<>();
private final Map<BitSet, StructInfo> infoMap = new HashMap<>();
private long refreshVersion = 0;
@@ -60,7 +65,7 @@ public class StructInfoMap {
return structInfo;
}
if (groupExpressionMap.isEmpty() ||
!groupExpressionMap.containsKey(tableMap)) {
- refresh(group, cascadesContext);
+ refresh(group, cascadesContext, new HashSet<>());
group.getstructInfoMap().setRefreshVersion(cascadesContext.getMemo().getRefreshVersion());
}
if (groupExpressionMap.containsKey(tableMap)) {
@@ -116,13 +121,13 @@ public class StructInfoMap {
* @param group the root group
*
*/
- public void refresh(Group group, CascadesContext cascadesContext) {
+ public void refresh(Group group, CascadesContext cascadesContext,
Set<Integer> refreshedGroup) {
+ refreshedGroup.add(group.getGroupId().asInt());
StructInfoMap structInfoMap = group.getstructInfoMap();
long memoVersion = cascadesContext.getMemo().getRefreshVersion();
if (!structInfoMap.getTableMaps().isEmpty() && memoVersion ==
structInfoMap.refreshVersion) {
return;
}
- Set<Integer> refreshedGroup = new HashSet<>();
for (GroupExpression groupExpression : group.getLogicalExpressions()) {
List<Set<BitSet>> childrenTableMap = new LinkedList<>();
if (groupExpression.children().isEmpty()) {
@@ -136,10 +141,9 @@ public class StructInfoMap {
for (Group child : groupExpression.children()) {
StructInfoMap childStructInfoMap = child.getstructInfoMap();
if (!refreshedGroup.contains(child.getGroupId().asInt())) {
- childStructInfoMap.refresh(child, cascadesContext);
+ childStructInfoMap.refresh(child, cascadesContext,
refreshedGroup);
childStructInfoMap.setRefreshVersion(memoVersion);
}
- refreshedGroup.add(child.getGroupId().asInt());
childrenTableMap.add(child.getstructInfoMap().getTableMaps());
}
// if one same groupExpression have refreshed, continue
@@ -156,12 +160,38 @@ public class StructInfoMap {
// if cumulative child table map is different from current
// or current group expression map is empty, should update the
groupExpressionMap currently
Collection<Pair<BitSet, List<BitSet>>> bitSetWithChildren =
cartesianProduct(childrenTableMap);
+ long maxCombineCount = cascadesContext.getConnectContext()
+
.getSessionVariable().materializedViewStructInfoMaxCombineCount;
+ if (bitSetWithChildren.size() > maxCombineCount) {
+ LOG.warn("StructInfoMap refresh bitSetWithChildren is larger
than threshold,"
+ + "bitSetWithChildren size is {}, sql hash is
{}, current groupExpression plan is {}",
+ bitSetWithChildren.size(),
+ cascadesContext.getConnectContext().getSqlHash(),
+ groupExpression.getPlan().treeString());
+ // calc sqrt to reduce the number of combinations
+ long maxCombineCountSqrt = (long)
Math.sqrt(bitSetWithChildren.size());
+ bitSetWithChildren =
pruneStructInfoBitsets(bitSetWithChildren, (int) maxCombineCountSqrt,
+ cascadesContext.getStatementContext());
+ }
for (Pair<BitSet, List<BitSet>> bitSetWithChild :
bitSetWithChildren) {
groupExpressionMap.putIfAbsent(bitSetWithChild.first,
Pair.of(groupExpression, bitSetWithChild.second));
}
+ }
+ }
+ // Get struct info bit set according to size
+ private static List<Pair<BitSet, List<BitSet>>> pruneStructInfoBitsets(
+ Collection<Pair<BitSet, List<BitSet>>> allBitsets, int size,
StatementContext context) {
+ int pruneSize = Math.min(allBitsets.size(), size);
+ List<Pair<BitSet, List<BitSet>>> prunedStructInfos = new ArrayList<>();
+ for (Pair<BitSet, List<BitSet>> pair : allBitsets) {
+ if (prunedStructInfos.size() >= pruneSize) {
+ break;
+ }
+ prunedStructInfos.add(pair);
}
+ return prunedStructInfos;
}
private BitSet constructLeaf(GroupExpression groupExpression,
CascadesContext cascadesContext) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 254af1bf1f5..7d5da74dae7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -376,6 +376,7 @@ public enum RuleType {
LEADING_JOIN(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
COLLECT_COLUMNS(RuleTypeClass.REWRITE),
+ COLLECT_PARTITIONS(RuleTypeClass.REWRITE),
// topn opts
DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 9339f0066a9..9717c6785ff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -18,16 +18,11 @@
package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionInfo;
-import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Id;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
-import org.apache.doris.mtmv.MTMVPartitionInfo;
-import org.apache.doris.mtmv.MTMVRewriteUtil;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.properties.LogicalProperties;
@@ -70,7 +65,6 @@ import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -79,15 +73,13 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -114,10 +106,12 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
public List<Plan> rewrite(Plan queryPlan, CascadesContext cascadesContext)
{
List<Plan> rewrittenPlans = new ArrayList<>();
// if available materialization list is empty, bail out
- if (cascadesContext.getMaterializationContexts().isEmpty()) {
+ if (cascadesContext.getMaterializationContexts().isEmpty()
+ ||
cascadesContext.getStatementContext().getMaterializedViewRewriteDuration() ==
-1) {
return rewrittenPlans;
}
for (MaterializationContext context :
cascadesContext.getMaterializationContexts()) {
+
cascadesContext.getStatementContext().getMaterializedViewStopwatch().reset().start();
if (checkIfRewritten(queryPlan, context)) {
continue;
}
@@ -131,10 +125,23 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
if (queryStructInfos.isEmpty()) {
continue;
}
+
cascadesContext.getStatementContext().addMaterializedViewRewriteDuration(cascadesContext
+
.getStatementContext().getMaterializedViewStopwatch().elapsed(TimeUnit.MILLISECONDS));
for (StructInfo queryStructInfo : queryStructInfos) {
+
cascadesContext.getStatementContext().getMaterializedViewStopwatch().reset().start();
+ SessionVariable sessionVariable =
cascadesContext.getConnectContext().getSessionVariable();
+ if
(cascadesContext.getStatementContext().getMaterializedViewRewriteDuration()
+ >
sessionVariable.materializedViewRewriteDurationThreshold) {
+
cascadesContext.getStatementContext().materializedViewRewriteDurationExceeded();
+
cascadesContext.getStatementContext().getMaterializedViewStopwatch().stop();
+ LOG.warn("materialized view rewrite duration is exceeded,
the query sql hash is {}",
+ cascadesContext.getConnectContext().getSqlHash());
+
MaterializationContext.makeFailWithDurationExceeded(queryStructInfo,
+ cascadesContext.getMaterializationContexts());
+ return rewrittenPlans;
+ }
try {
- if (rewrittenPlans.size() <
cascadesContext.getConnectContext()
-
.getSessionVariable().getMaterializedViewRewriteSuccessCandidateNum()) {
+ if (rewrittenPlans.size() <
sessionVariable.getMaterializedViewRewriteSuccessCandidateNum()) {
rewrittenPlans.addAll(doRewrite(queryStructInfo,
cascadesContext, context));
}
} catch (Exception exception) {
@@ -142,6 +149,9 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
context.recordFailReason(queryStructInfo,
"Materialized view rule exec fail",
exception::toString);
}
+
cascadesContext.getStatementContext().addMaterializedViewRewriteDuration(cascadesContext
+
.getStatementContext().getMaterializedViewStopwatch().elapsed(TimeUnit.MILLISECONDS)
+ );
}
}
return rewrittenPlans;
@@ -278,15 +288,29 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> invalidPartitions;
- if (materializationContext instanceof AsyncMaterializationContext)
{
+ if (PartitionCompensator.needUnionRewrite(materializationContext))
{
+ MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
+ BaseTableInfo relatedTableInfo =
mtmv.getMvPartitionInfo().getRelatedTableInfo();
try {
- invalidPartitions = calcInvalidPartitions(queryPlan,
rewrittenPlan,
- (AsyncMaterializationContext)
materializationContext, cascadesContext);
+ Set<String> queryUsedPartition =
PartitionCompensator.getQueryTableUsedPartition(
+ relatedTableInfo.toList(), queryStructInfo,
cascadesContext);
+ if (queryUsedPartition.isEmpty()) {
+
materializationContext.recordFailReason(queryStructInfo,
+ String.format("queryUsedPartition is empty,
table is %s, sql hash is %s",
+ relatedTableInfo.toList(),
cascadesContext.getConnectContext().getSqlHash()),
+ () -> String.format("queryUsedPartition is
empty, table is %s, sql hash is %s",
+ relatedTableInfo.toList(),
cascadesContext.getConnectContext().getSqlHash()));
+ LOG.warn(String.format("queryUsedPartition is empty,
table is %s, sql hash is %s",
+ relatedTableInfo.toList(),
cascadesContext.getConnectContext().getSqlHash()));
+ continue;
+ }
+ invalidPartitions =
calcInvalidPartitions(queryUsedPartition, rewrittenPlan,
+ cascadesContext, (AsyncMaterializationContext)
materializationContext);
} catch (AnalysisException e) {
materializationContext.recordFailReason(queryStructInfo,
"Calc invalid partitions fail",
() -> String.format("Calc invalid partitions fail,
mv partition names are %s",
- ((AsyncMaterializationContext)
materializationContext).getMtmv().getPartitions()));
+ mtmv.getPartitions()));
LOG.warn("Calc invalid partitions fail", e);
continue;
}
@@ -294,27 +318,21 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
// if mv can not offer any partition for query, query
rewrite bail out to avoid cycle run
materializationContext.recordFailReason(queryStructInfo,
"mv can not offer any partition for query",
- () -> String.format("mv partition info %s",
- ((AsyncMaterializationContext)
materializationContext).getMtmv()
- .getMvPartitionInfo()));
+ () -> String.format("mv partition info %s",
mtmv.getMvPartitionInfo()));
return rewriteResults;
}
- boolean partitionNeedUnion =
needUnionRewrite(invalidPartitions, cascadesContext);
- boolean canUnionRewrite = canUnionRewrite(queryPlan,
- ((AsyncMaterializationContext)
materializationContext).getMtmv(),
- cascadesContext);
+ boolean partitionNeedUnion =
PartitionCompensator.needUnionRewrite(invalidPartitions, cascadesContext);
+ boolean canUnionRewrite = canUnionRewrite(queryPlan, mtmv,
cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because
the query structInfo",
() -> String.format("mv partition info is %s, and
the query plan is %s",
- ((AsyncMaterializationContext)
materializationContext).getMtmv()
- .getMvPartitionInfo(),
queryPlan.treeString()));
+ mtmv.getMvPartitionInfo(),
queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
- MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
Pair<Plan, Boolean> planAndNeedAddFilterPair =
StructInfo.addFilterOnTableScan(queryPlan,
invalidPartitions.value(),
mtmv.getMvPartitionInfo().getRelatedCol(),
cascadesContext);
@@ -396,13 +414,6 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
}
}
- protected boolean needUnionRewrite(
- Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> invalidPartitions,
- CascadesContext cascadesContext) {
- return invalidPartitions != null
- && (!invalidPartitions.key().isEmpty() ||
!invalidPartitions.value().isEmpty());
- }
-
/**
* Not all query after rewritten successfully can compensate union all
* Such as:
@@ -450,84 +461,13 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
* @return the key in pair is mvNeedRemovePartitionNameSet, the value in
pair is baseTableNeedUnionPartitionNameSet
*/
protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> calcInvalidPartitions(
- Plan queryPlan, Plan rewrittenPlan,
- AsyncMaterializationContext materializationContext,
CascadesContext cascadesContext)
+ Set<String> queryUsedPartition,
+ Plan rewrittenPlan,
+ CascadesContext cascadesContext,
+ AsyncMaterializationContext materializationContext)
throws AnalysisException {
- Set<String> mvNeedRemovePartitionNameSet = new HashSet<>();
- Set<String> baseTableNeedUnionPartitionNameSet = new HashSet<>();
- // check partition is valid or not
- MTMV mtmv = materializationContext.getMtmv();
- PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
- if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
- // if not partition, if rewrite success, it means mv is available
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
- }
- MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
- BaseTableInfo relatedPartitionTable =
mvCustomPartitionInfo.getRelatedTableInfo();
- if (relatedPartitionTable == null) {
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
- }
- // Collect the mv related base table partitions which query used
- Map<BaseTableInfo, Set<String>> queryUsedBaseTablePartitions = new
LinkedHashMap<>();
- queryUsedBaseTablePartitions.put(relatedPartitionTable, new
HashSet<>());
- queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(),
queryUsedBaseTablePartitions);
- // Bail out, not check invalid partition if not olap scan, support
later
- if (queryUsedBaseTablePartitions.isEmpty()) {
- return Pair.of(ImmutableMap.of(), ImmutableMap.of());
- }
- Set<String> queryUsedBaseTablePartitionNameSet =
queryUsedBaseTablePartitions.get(relatedPartitionTable);
-
- Collection<Partition> mvValidPartitions =
MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
- cascadesContext.getConnectContext(),
System.currentTimeMillis(), false);
- Set<String> mvValidPartitionNameSet = new HashSet<>();
- Set<String> mvValidBaseTablePartitionNameSet = new HashSet<>();
- Set<String> mvValidHasDataRelatedBaseTableNameSet = new HashSet<>();
- Pair<Map<String, Set<String>>, Map<String, String>> partitionMapping =
mtmv.calculateDoublyPartitionMappings();
- for (Partition mvValidPartition : mvValidPartitions) {
- mvValidPartitionNameSet.add(mvValidPartition.getName());
- Set<String> relatedBaseTablePartitions =
partitionMapping.key().get(mvValidPartition.getName());
- if (relatedBaseTablePartitions != null) {
-
mvValidBaseTablePartitionNameSet.addAll(relatedBaseTablePartitions);
- }
- if
(!mtmv.selectNonEmptyPartitionIds(ImmutableList.of(mvValidPartition.getId())).isEmpty())
{
- if (relatedBaseTablePartitions != null) {
-
mvValidHasDataRelatedBaseTableNameSet.addAll(relatedBaseTablePartitions);
- }
- }
- }
- if (Sets.intersection(mvValidHasDataRelatedBaseTableNameSet,
queryUsedBaseTablePartitionNameSet).isEmpty()) {
- // if mv can not offer any partition for query, query rewrite bail
out
- return null;
- }
- // Check when mv partition relates base table partition data change or
delete partition
- Set<String> rewrittenPlanUsePartitionNameSet = new HashSet<>();
- List<Object> mvOlapScanList = rewrittenPlan.collectToList(node ->
- node instanceof LogicalOlapScan
- && Objects.equals(((CatalogRelation)
node).getTable().getName(), mtmv.getName()));
- for (Object olapScanObj : mvOlapScanList) {
- LogicalOlapScan olapScan = (LogicalOlapScan) olapScanObj;
- olapScan.getSelectedPartitionIds().forEach(id ->
-
rewrittenPlanUsePartitionNameSet.add(olapScan.getTable().getPartition(id).getName()));
- }
- // If rewritten plan use but not in mv valid partition name set, need
remove in mv and base table union
- Sets.difference(rewrittenPlanUsePartitionNameSet,
mvValidPartitionNameSet)
- .copyInto(mvNeedRemovePartitionNameSet);
- for (String partitionName : mvNeedRemovePartitionNameSet) {
-
baseTableNeedUnionPartitionNameSet.addAll(partitionMapping.key().get(partitionName));
- }
- // If related base table create partitions or mv is created with ttl,
need base table union
- Sets.difference(queryUsedBaseTablePartitionNameSet,
mvValidBaseTablePartitionNameSet)
- .copyInto(baseTableNeedUnionPartitionNameSet);
- // Construct result map
- Map<BaseTableInfo, Set<String>> mvPartitionNeedRemoveNameMap = new
HashMap<>();
- if (!mvNeedRemovePartitionNameSet.isEmpty()) {
- mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv),
mvNeedRemovePartitionNameSet);
- }
- Map<BaseTableInfo, Set<String>> baseTablePartitionNeedUnionNameMap =
new HashMap<>();
- if (!baseTableNeedUnionPartitionNameSet.isEmpty()) {
- baseTablePartitionNeedUnionNameMap.put(relatedPartitionTable,
baseTableNeedUnionPartitionNameSet);
- }
- return Pair.of(mvPartitionNeedRemoveNameMap,
baseTablePartitionNeedUnionNameMap);
+ return PartitionCompensator.calcInvalidPartitions(queryUsedPartition,
rewrittenPlan,
+ materializationContext, cascadesContext);
}
/**
@@ -935,6 +875,18 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
// check mv plan is valid or not, this can use cache for performance
private boolean isMaterializationValid(Plan queryPlan, CascadesContext
cascadesContext,
MaterializationContext context) {
+ if (!context.getStructInfo().isValid()) {
+ context.recordFailReason(context.getStructInfo(),
+ "View original struct info is invalid", () ->
String.format("view plan is %s",
+
context.getStructInfo().getOriginalPlan().treeString()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("View struct info is invalid, mv
identifier is %s, query plan is %s,"
+ + "view plan is %s",
+ context.generateMaterializationIdentifier(),
queryPlan.treeString(),
+ context.getStructInfo().getTopPlan().treeString()));
+ }
+ return false;
+ }
long materializationId =
context.generateMaterializationIdentifier().hashCode();
Boolean cachedCheckResult =
cascadesContext.getMemo().materializationHasChecked(this.getClass(),
materializationId);
@@ -971,18 +923,6 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
}
return false;
}
- if (!context.getStructInfo().isValid()) {
- context.recordFailReason(context.getStructInfo(),
- "View original struct info is invalid", () ->
String.format("view plan is %s",
-
context.getStructInfo().getOriginalPlan().treeString()));
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("View struct info is invalid, mv
identifier is %s, query plan is %s,"
- + "view plan is %s",
- context.generateMaterializationIdentifier(),
queryPlan.treeString(),
- context.getStructInfo().getTopPlan().treeString()));
- }
- return false;
- }
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/HyperGraphComparator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/HyperGraphComparator.java
index 22282a23516..fce8e2f520f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/HyperGraphComparator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/HyperGraphComparator.java
@@ -406,13 +406,20 @@ public class HyperGraphComparator {
}
private Map<Edge, Edge> constructQueryToViewJoinMapWithExpr() {
- Map<Expression, Edge> viewExprToEdge = getViewJoinEdges().stream()
- .flatMap(e -> e.getExpressions().stream().map(expr ->
Pair.of(expr, e)))
- .collect(ImmutableMap.toImmutableMap(p -> p.first, p ->
p.second));
- Map<Expression, Edge> queryExprToEdge = getQueryJoinEdges().stream()
- .flatMap(e -> e.getExpressions().stream().map(expr ->
Pair.of(expr, e)))
- .collect(ImmutableMap.toImmutableMap(p -> p.first, p ->
p.second));
-
+ Map<Expression, Edge> viewExprToEdge = new HashMap<>();
+ List<JoinEdge> viewJoinEdges = getViewJoinEdges();
+ for (JoinEdge viewJoin : viewJoinEdges) {
+ for (Expression expression : viewJoin.getExpressions()) {
+ viewExprToEdge.put(expression, viewJoin);
+ }
+ }
+ Map<Expression, Edge> queryExprToEdge = new HashMap<>();
+ List<JoinEdge> queryJoinEdges = getQueryJoinEdges();
+ for (JoinEdge queryJoin : queryJoinEdges) {
+ for (Expression expression : queryJoin.getExpressions()) {
+ queryExprToEdge.put(expression, queryJoin);
+ }
+ }
HashMap<Edge, Edge> edgeMap = new HashMap<>();
for (Entry<Expression, Edge> entry : queryExprToEdge.entrySet()) {
if (edgeMap.containsKey(entry.getValue())) {
@@ -444,15 +451,19 @@ public class HyperGraphComparator {
// +--LogicalOlapScan
private Map<Edge, Edge> constructQueryToViewFilterMapWithExpr() {
Multimap<Expression, Edge> viewExprToEdge = HashMultimap.create();
- getViewFilterEdges().stream()
- .flatMap(e -> e.getExpressions().stream().map(expr ->
Pair.of(expr, e)))
- .forEach(pair -> viewExprToEdge.put(pair.key(), pair.value()));
-
+ List<FilterEdge> viewFilterEdges = getViewFilterEdges();
+ for (FilterEdge viewEdge : viewFilterEdges) {
+ for (Expression expression : viewEdge.getExpressions()) {
+ viewExprToEdge.put(expression, viewEdge);
+ }
+ }
Multimap<Expression, Edge> queryExprToEdge = HashMultimap.create();
- getQueryFilterEdges().stream()
- .flatMap(e -> e.getExpressions().stream().map(expr ->
Pair.of(expr, e)))
- .forEach(pair -> queryExprToEdge.put(pair.key(),
pair.value()));
-
+ List<FilterEdge> queryFilterEdges = getQueryFilterEdges();
+ for (FilterEdge queryEdge : queryFilterEdges) {
+ for (Expression expression : queryEdge.getExpressions()) {
+ queryExprToEdge.put(expression, queryEdge);
+ }
+ }
HashMap<Edge, Edge> queryToViewEdgeMap = new HashMap<>();
for (Entry<Expression, Collection<Edge>> entry :
queryExprToEdge.asMap().entrySet()) {
Expression queryExprViewBased = null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index 63b336e8eca..d94c6f41df7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -63,7 +63,7 @@ public class InitMaterializationContextHook implements
PlannerHook {
public static final InitMaterializationContextHook INSTANCE = new
InitMaterializationContextHook();
@Override
- public void afterAnalyze(NereidsPlanner planner) {
+ public void afterRewrite(NereidsPlanner planner) {
initMaterializationContext(planner.getCascadesContext());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
index 38eba2ac340..c1de92d2b44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
@@ -410,6 +410,21 @@ public abstract class MaterializationContext {
return builder.toString();
}
+ /**
+ * If materialized view rewrite duration is exceeded, make all
materializationContexts with reason
+ * materialized view rewrite duration is exceeded
+ * */
+ public static void makeFailWithDurationExceeded(StructInfo queryInfo,
+ List<MaterializationContext> materializationContexts) {
+ for (MaterializationContext context : materializationContexts) {
+ if (context.isSuccess()) {
+ continue;
+ }
+ context.recordFailReason(queryInfo, "materialized view rewrite
duration is exceeded",
+ () -> "materialized view rewrite duration is exceeded");
+ }
+ }
+
private static String generateIdentifierName(List<String> qualifiers) {
return String.join(".", qualifiers);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewAggregateOnNoneAggregateRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewAggregateOnNoneAggregateRule.java
index 45514f6fb15..06e1e77e56b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewAggregateOnNoneAggregateRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewAggregateOnNoneAggregateRule.java
@@ -92,11 +92,15 @@ public class MaterializedViewAggregateOnNoneAggregateRule
extends AbstractMateri
@Override
protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> calcInvalidPartitions(
- Plan queryPlan, Plan rewrittenPlan, AsyncMaterializationContext
materializationContext,
- CascadesContext cascadesContext) throws AnalysisException {
+ Set<String> queryUsedPartition,
+ Plan rewrittenPlan,
+ CascadesContext cascadesContext,
+ AsyncMaterializationContext materializationContext)
+ throws AnalysisException {
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>>
invalidPartitions
- = super.calcInvalidPartitions(queryPlan, rewrittenPlan,
materializationContext, cascadesContext);
- if (needUnionRewrite(invalidPartitions, cascadesContext)) {
+ = super.calcInvalidPartitions(queryUsedPartition,
rewrittenPlan, cascadesContext,
+ materializationContext);
+ if (PartitionCompensator.needUnionRewrite(invalidPartitions,
cascadesContext)) {
// if query use some invalid partition in mv, bail out
return null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
index f5254eb9772..ddae11fb735 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java
@@ -183,7 +183,7 @@ public class MaterializedViewUtils {
Group ownerGroup = plan.getGroupExpression().get().getOwnerGroup();
StructInfoMap structInfoMap = ownerGroup.getstructInfoMap();
// Refresh struct info in current level plan from top to bottom
- structInfoMap.refresh(ownerGroup, cascadesContext);
+ structInfoMap.refresh(ownerGroup, cascadesContext, new
HashSet<>());
structInfoMap.setRefreshVersion(cascadesContext.getMemo().getRefreshVersion());
Set<BitSet> queryTableSets = structInfoMap.getTableMaps();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
new file mode 100644
index 00000000000..0e1466a1745
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.exploration.mv;
+
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Handle materialized view partition union compensate handler
+ * */
+public class PartitionCompensator {
+
+ public static final Logger LOG =
LogManager.getLogger(PartitionCompensator.class);
+ public static final Pair<RelationId, Set<String>> ALL_PARTITIONS =
Pair.of(null, null);
+
+ /**
+ * Get table used partitions by the table full qualifiers
+ * */
+ public static Set<String> getQueryTableUsedPartition(
+ List<String> targetTableFullQualifiers,
+ StructInfo queryStructInfo,
+ CascadesContext cascadesContext) {
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
cascadesContext.getStatementContext().getTableUsedPartitionNameMap();
+ Collection<Pair<RelationId, Set<String>>> tableUsedPartitions =
+ tableUsedPartitionNameMap.get(targetTableFullQualifiers);
+ Set<RelationId> queryUsedRelationSet =
queryStructInfo.getRelationIdStructInfoNodeMap().keySet();
+ Set<String> queryUsedPartitionSet = new HashSet<>();
+ for (Pair<RelationId, Set<String>> relationPartition :
tableUsedPartitions) {
+ if (queryUsedRelationSet.contains(relationPartition.key())) {
+ queryUsedPartitionSet.addAll(relationPartition.value());
+ }
+ }
+ return queryUsedPartitionSet;
+ }
+
+ /**
+ * Maybe only some partitions is invalid in materialized view, or base
table maybe add, modify, delete partition
+ * So we should calc the invalid partition used in query
+ * @param queryUsedBaseTablePartitionNameSet partitions used by query
related partition table
+ * @param rewrittenPlan tmp rewrittenPlan when mv rewrite
+ * @param materializationContext the context of materialization,which hold
materialized view meta and other info
+ * @param cascadesContext the context of cascades
+ * @return the key in pair is mvNeedRemovePartitionNameSet, the value in
pair is baseTableNeedUnionPartitionNameSet
+ */
+ public static Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> calcInvalidPartitions(
+ Set<String> queryUsedBaseTablePartitionNameSet, Plan rewrittenPlan,
+ AsyncMaterializationContext materializationContext,
CascadesContext cascadesContext)
+ throws AnalysisException {
+ Set<String> mvNeedRemovePartitionNameSet = new HashSet<>();
+ Set<String> baseTableNeedUnionPartitionNameSet = new HashSet<>();
+ // check partition is valid or not
+ MTMV mtmv = materializationContext.getMtmv();
+ PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
+ if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
+ // if not partition, if rewrite success, it means mv is available
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ }
+ MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
+ BaseTableInfo relatedPartitionTable =
mvCustomPartitionInfo.getRelatedTableInfo();
+ if (relatedPartitionTable == null ||
queryUsedBaseTablePartitionNameSet.isEmpty()) {
+ // if mv is not partitioned or query not query any partition,
doesn't compensate
+ return Pair.of(ImmutableMap.of(), ImmutableMap.of());
+ }
+ Collection<Partition> mvValidPartitions =
cascadesContext.getStatementContext()
+ .getMvCanRewritePartitionsMap().get(new BaseTableInfo(mtmv));
+ Set<String> mvValidPartitionNameSet = new HashSet<>();
+ Set<String> mvValidBaseTablePartitionNameSet = new HashSet<>();
+ Set<String> mvValidHasDataRelatedBaseTableNameSet = new HashSet<>();
+ Pair<Map<String, Set<String>>, Map<String, String>> partitionMapping =
mtmv.calculateDoublyPartitionMappings();
+ for (Partition mvValidPartition : mvValidPartitions) {
+ mvValidPartitionNameSet.add(mvValidPartition.getName());
+ Set<String> relatedBaseTablePartitions =
partitionMapping.key().get(mvValidPartition.getName());
+ if (relatedBaseTablePartitions != null) {
+
mvValidBaseTablePartitionNameSet.addAll(relatedBaseTablePartitions);
+ }
+ if
(!mtmv.selectNonEmptyPartitionIds(ImmutableList.of(mvValidPartition.getId())).isEmpty())
{
+ if (relatedBaseTablePartitions != null) {
+
mvValidHasDataRelatedBaseTableNameSet.addAll(relatedBaseTablePartitions);
+ }
+ }
+ }
+ if (Sets.intersection(mvValidHasDataRelatedBaseTableNameSet,
queryUsedBaseTablePartitionNameSet).isEmpty()) {
+ // if mv can not offer any partition for query, query rewrite bail
out
+ return null;
+ }
+ // Check when mv partition relates base table partition data change or
delete partition
+ Set<String> rewrittenPlanUsePartitionNameSet = new HashSet<>();
+ List<Object> mvOlapScanList = rewrittenPlan.collectToList(node ->
+ node instanceof LogicalOlapScan
+ && Objects.equals(((CatalogRelation)
node).getTable().getName(), mtmv.getName()));
+ for (Object olapScanObj : mvOlapScanList) {
+ LogicalOlapScan olapScan = (LogicalOlapScan) olapScanObj;
+ olapScan.getSelectedPartitionIds().forEach(id ->
+
rewrittenPlanUsePartitionNameSet.add(olapScan.getTable().getPartition(id).getName()));
+ }
+ // If rewritten plan use but not in mv valid partition name set, need
remove in mv and base table union
+ Sets.difference(rewrittenPlanUsePartitionNameSet,
mvValidPartitionNameSet)
+ .copyInto(mvNeedRemovePartitionNameSet);
+ for (String partitionName : mvNeedRemovePartitionNameSet) {
+
baseTableNeedUnionPartitionNameSet.addAll(partitionMapping.key().get(partitionName));
+ }
+ // If related base table create partitions or mv is created with ttl,
need base table union
+ Sets.difference(queryUsedBaseTablePartitionNameSet,
mvValidBaseTablePartitionNameSet)
+ .copyInto(baseTableNeedUnionPartitionNameSet);
+ // Construct result map
+ Map<BaseTableInfo, Set<String>> mvPartitionNeedRemoveNameMap = new
HashMap<>();
+ if (!mvNeedRemovePartitionNameSet.isEmpty()) {
+ mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv),
mvNeedRemovePartitionNameSet);
+ }
+ Map<BaseTableInfo, Set<String>> baseTablePartitionNeedUnionNameMap =
new HashMap<>();
+ if (!baseTableNeedUnionPartitionNameSet.isEmpty()) {
+ baseTablePartitionNeedUnionNameMap.put(relatedPartitionTable,
baseTableNeedUnionPartitionNameSet);
+ }
+ return Pair.of(mvPartitionNeedRemoveNameMap,
baseTablePartitionNeedUnionNameMap);
+ }
+
+ public static boolean needUnionRewrite(
+ Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> invalidPartitions,
+ CascadesContext cascadesContext) {
+ return invalidPartitions != null
+ && (!invalidPartitions.key().isEmpty() ||
!invalidPartitions.value().isEmpty());
+ }
+
+ /**
+ * Check if need union compensate or not
+ */
+ public static boolean needUnionRewrite(MaterializationContext
materializationContext) {
+ if (!(materializationContext instanceof AsyncMaterializationContext)) {
+ return false;
+ }
+ MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
+ PartitionType type = mtmv.getPartitionInfo().getType();
+ BaseTableInfo relatedTableInfo =
mtmv.getMvPartitionInfo().getRelatedTableInfo();
+ return !PartitionType.UNPARTITIONED.equals(type) && relatedTableInfo
!= null;
+ }
+
+ public static boolean isAllPartition(Pair<RelationId, Set<String>>
usedPartition) {
+ return ALL_PARTITIONS.equals(usedPartition);
+ }
+
+ /**
+ * Get query used partitions
+ * this is calculated from tableUsedPartitionNameMap and tables in
statementContext
+ * */
+ public static Map<List<String>, Set<String>>
getQueryUsedPartitions(ConnectContext ctx) {
+ // get table used partitions
+ // if table is not in statementContext().getTables() which means the
table is partition prune as empty relation
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = ctx.getStatementContext()
+ .getTableUsedPartitionNameMap();
+ // if value is empty, means query no partitions
+ // if value is null, means query all partitions
+ // if value is not empty, means query some partitions
+ Map<List<String>, Set<String>> queryUsedRelatedTablePartitionsMap =
new HashMap<>();
+ for (Map.Entry<List<String>, TableIf> tableIfEntry :
ctx.getStatementContext().getTables().entrySet()) {
+ Set<String> usedPartitionSet = new HashSet<>();
+ if
(!tableUsedPartitionNameMap.get(tableIfEntry.getKey()).isEmpty()) {
+ for (Pair<RelationId, Set<String>> partitionPair
+ :
tableUsedPartitionNameMap.get(tableIfEntry.getKey())) {
+ if (PartitionCompensator.isAllPartition(partitionPair)) {
+
queryUsedRelatedTablePartitionsMap.put(tableIfEntry.getKey(), null);
+ break;
+ }
+ usedPartitionSet.addAll(partitionPair.value());
+ }
+ }
+ queryUsedRelatedTablePartitionsMap.put(tableIfEntry.getKey(),
usedPartitionSet);
+ }
+ return queryUsedRelatedTablePartitionsMap;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
index 3f0397dc411..2cbb311bea7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java
@@ -17,9 +17,7 @@
package org.apache.doris.nereids.rules.exploration.mv;
-import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
-import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
@@ -33,7 +31,6 @@ import
org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
import org.apache.doris.nereids.trees.copier.DeepCopierContext;
import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
import org.apache.doris.nereids.trees.expressions.EqualTo;
-import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
@@ -50,9 +47,6 @@ import org.apache.doris.nereids.trees.plans.algebra.Project;
import
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAddContext;
import
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
-import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
-import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -67,7 +61,6 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
@@ -133,8 +126,6 @@ public class StructInfo {
// this is for building LogicalCompatibilityContext later.
private final Map<ExpressionPosition, Map<Expression, Expression>>
expressionToShuttledExpressionToMap;
- // Record the exprId and the corresponding expr map, this is used by
expression shuttled
- private final Map<ExprId, Expression> namedExprIdAndExprMapping;
private final List<? extends Expression> planOutputShuttledExpressions;
/**
@@ -147,7 +138,6 @@ public class StructInfo {
Map<ExpressionPosition, Multimap<Expression, Pair<Expression,
HyperElement>>>
shuttledExpressionsToExpressionsMap,
Map<ExpressionPosition, Map<Expression, Expression>>
expressionToShuttledExpressionToMap,
- Map<ExprId, Expression> namedExprIdAndExprMapping,
BitSet tableIdSet,
SplitPredicate splitPredicate,
EquivalenceClass equivalenceClass,
@@ -166,7 +156,6 @@ public class StructInfo {
this.equivalenceClass = equivalenceClass;
this.shuttledExpressionsToExpressionsMap =
shuttledExpressionsToExpressionsMap;
this.expressionToShuttledExpressionToMap =
expressionToShuttledExpressionToMap;
- this.namedExprIdAndExprMapping = namedExprIdAndExprMapping;
this.planOutputShuttledExpressions = planOutputShuttledExpressions;
}
@@ -177,8 +166,7 @@ public class StructInfo {
return new StructInfo(this.originalPlan, this.originalPlanId,
this.hyperGraph, this.valid, this.topPlan,
this.bottomPlan, this.relations,
this.relationIdStructInfoNodeMap, predicates,
this.shuttledExpressionsToExpressionsMap,
this.expressionToShuttledExpressionToMap,
- this.namedExprIdAndExprMapping, this.tableBitSet,
- null, null, this.planOutputShuttledExpressions);
+ this.tableBitSet, null, null,
this.planOutputShuttledExpressions);
}
/**
@@ -188,8 +176,7 @@ public class StructInfo {
return new StructInfo(this.originalPlan, this.originalPlanId,
this.hyperGraph, this.valid, this.topPlan,
this.bottomPlan, this.relations,
this.relationIdStructInfoNodeMap, this.predicates,
this.shuttledExpressionsToExpressionsMap,
this.expressionToShuttledExpressionToMap,
- this.namedExprIdAndExprMapping, tableBitSet,
- this.splitPredicate, this.equivalenceClass,
this.planOutputShuttledExpressions);
+ tableBitSet, this.splitPredicate, this.equivalenceClass,
this.planOutputShuttledExpressions);
}
private static boolean collectStructInfoFromGraph(HyperGraph hyperGraph,
@@ -197,7 +184,6 @@ public class StructInfo {
Map<ExpressionPosition, Multimap<Expression, Pair<Expression,
HyperElement>>>
shuttledExpressionsToExpressionsMap,
Map<ExpressionPosition, Map<Expression, Expression>>
expressionToShuttledExpressionToMap,
- Map<ExprId, Expression> namedExprIdAndExprMapping,
List<CatalogRelation> relations,
Map<RelationId, StructInfoNode> relationIdStructInfoNodeMap,
BitSet hyperTableBitSet,
@@ -215,22 +201,16 @@ public class StructInfo {
// plan relation collector and set to map
StructInfoNode structInfoNode = (StructInfoNode) node;
// record expressions in node
- if (structInfoNode.getExpressions() != null) {
- structInfoNode.getExpressions().forEach(expression -> {
- ExpressionLineageReplacer.ExpressionReplaceContext
replaceContext =
- new
ExpressionLineageReplacer.ExpressionReplaceContext(
- Lists.newArrayList(expression),
ImmutableSet.of(),
- ImmutableSet.of(), new BitSet());
-
structInfoNode.getPlan().accept(ExpressionLineageReplacer.INSTANCE,
replaceContext);
- // Replace expressions by expression map
- List<Expression> replacedExpressions =
replaceContext.getReplacedExpressions();
+ List<Expression> nodeExpressions = structInfoNode.getExpressions();
+ if (nodeExpressions != null) {
+ List<? extends Expression> shuttledExpressions =
ExpressionUtils.shuttleExpressionWithLineage(
+ nodeExpressions, structInfoNode.getPlan(),
+ new BitSet());
+ for (int index = 0; index < nodeExpressions.size(); index++) {
putShuttledExpressionToExpressionsMap(shuttledExpressionsToExpressionsMap,
expressionToShuttledExpressionToMap,
- ExpressionPosition.NODE,
replacedExpressions.get(0), expression, node);
- // Record this, will be used in top level expression
shuttle later, see the method
- // ExpressionLineageReplacer#visitGroupPlan
-
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
- });
+ ExpressionPosition.NODE,
shuttledExpressions.get(index), nodeExpressions.get(index), node);
+ }
}
// every node should only have one relation, this is for
LogicalCompatibilityContext
if (!nodeRelations.isEmpty()) {
@@ -242,7 +222,6 @@ public class StructInfo {
List<? extends Expression> joinConjunctExpressions =
edge.getExpressions();
// shuttle expression in edge for the build of
LogicalCompatibilityContext later.
// Record the exprId to expr map in the processing to strut info
- // TODO get exprId to expr map when complex project is ready in
join dege
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
joinConjunctExpressions.stream().map(expr ->
(Expression) expr)
@@ -250,29 +229,26 @@ public class StructInfo {
ImmutableSet.of(), ImmutableSet.of(), new
BitSet());
topPlan.accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
- List<Expression> replacedExpressions =
replaceContext.getReplacedExpressions();
- for (int i = 0; i < replacedExpressions.size(); i++) {
+ List<? extends Expression> shuttledExpressions =
ExpressionUtils.shuttleExpressionWithLineage(
+ joinConjunctExpressions, topPlan, new BitSet());
+ for (int i = 0; i < shuttledExpressions.size(); i++) {
putShuttledExpressionToExpressionsMap(shuttledExpressionsToExpressionsMap,
expressionToShuttledExpressionToMap,
- ExpressionPosition.JOIN_EDGE,
replacedExpressions.get(i), joinConjunctExpressions.get(i),
- edge);
+ ExpressionPosition.JOIN_EDGE,
shuttledExpressions.get(i),
+ joinConjunctExpressions.get(i), edge);
}
- // Record this, will be used in top level expression shuttle
later, see the method
- // ExpressionLineageReplacer#visitGroupPlan
-
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
}
// Collect expression from where in hyper graph
hyperGraph.getFilterEdges().forEach(filterEdge -> {
List<? extends Expression> filterExpressions =
filterEdge.getExpressions();
- filterExpressions.forEach(predicate -> {
- // this is used for LogicalCompatibilityContext
- ExpressionUtils.extractConjunction(predicate).forEach(expr ->
-
putShuttledExpressionToExpressionsMap(shuttledExpressionsToExpressionsMap,
- expressionToShuttledExpressionToMap,
- ExpressionPosition.FILTER_EDGE,
-
ExpressionUtils.shuttleExpressionWithLineage(predicate, topPlan, new BitSet()),
- predicate, filterEdge));
- });
+ List<? extends Expression> shuttledExpressions =
ExpressionUtils.shuttleExpressionWithLineage(
+ filterExpressions, topPlan, new BitSet());
+ for (int i = 0; i < shuttledExpressions.size(); i++) {
+
putShuttledExpressionToExpressionsMap(shuttledExpressionsToExpressionsMap,
+ expressionToShuttledExpressionToMap,
+ ExpressionPosition.FILTER_EDGE,
shuttledExpressions.get(i),
+ filterExpressions.get(i), filterEdge);
+ }
});
return true;
}
@@ -345,12 +321,10 @@ public class StructInfo {
Map<RelationId, StructInfoNode> relationIdStructInfoNodeMap = new
LinkedHashMap<>();
Map<ExpressionPosition, Multimap<Expression, Pair<Expression,
HyperElement>>>
shuttledHashConjunctsToConjunctsMap = new LinkedHashMap<>();
- Map<ExprId, Expression> namedExprIdAndExprMapping = new
LinkedHashMap<>();
BitSet tableBitSet = new BitSet();
Map<ExpressionPosition, Map<Expression, Expression>>
expressionToShuttledExpressionToMap = new HashMap<>();
boolean valid = collectStructInfoFromGraph(hyperGraph, topPlan,
shuttledHashConjunctsToConjunctsMap,
expressionToShuttledExpressionToMap,
- namedExprIdAndExprMapping,
relationList,
relationIdStructInfoNodeMap,
tableBitSet,
@@ -372,7 +346,7 @@ public class StructInfo {
return new StructInfo(originalPlan, originalPlanId, hyperGraph, valid,
topPlan, bottomPlan,
relationList, relationIdStructInfoNodeMap, predicates,
shuttledHashConjunctsToConjunctsMap,
expressionToShuttledExpressionToMap,
- namedExprIdAndExprMapping, tableBitSet, null, null,
+ tableBitSet, null, null,
planOutputShuttledExpressions);
}
@@ -473,10 +447,6 @@ public class StructInfo {
return originalPlanId;
}
- public Map<ExprId, Expression> getNamedExprIdAndExprMapping() {
- return namedExprIdAndExprMapping;
- }
-
public BitSet getTableBitSet() {
return tableBitSet;
}
@@ -766,41 +736,6 @@ public class StructInfo {
}
}
- /**
- * Collect partitions on base table
- */
- public static class QueryScanPartitionsCollector extends
DefaultPlanVisitor<Plan,
- Map<BaseTableInfo, Set<String>>> {
- @Override
- public Plan visitLogicalCatalogRelation(LogicalCatalogRelation
catalogRelation,
- Map<BaseTableInfo, Set<String>> targetTablePartitionMap) {
- TableIf table = catalogRelation.getTable();
- BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
- if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
- return catalogRelation;
- }
- Set<String> tablePartitions =
targetTablePartitionMap.get(relatedPartitionTable);
- if (catalogRelation instanceof LogicalOlapScan) {
- // Handle olap table
- LogicalOlapScan logicalOlapScan = (LogicalOlapScan)
catalogRelation;
- for (Long partitionId :
logicalOlapScan.getSelectedPartitionIds()) {
-
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName());
- }
- } else if (catalogRelation instanceof LogicalFileScan
- && catalogRelation.getTable() instanceof ExternalTable
- && ((ExternalTable)
catalogRelation.getTable()).supportInternalPartitionPruned()) {
- LogicalFileScan logicalFileScan = (LogicalFileScan)
catalogRelation;
- SelectedPartitions selectedPartitions =
logicalFileScan.getSelectedPartitions();
-
tablePartitions.addAll(selectedPartitions.selectedPartitions.keySet());
- } else {
- // todo Support other type partition table
- // Not support to partition check now when query external
catalog table, support later.
- targetTablePartitionMap.clear();
- }
- return catalogRelation;
- }
- }
-
/**
* Add filter on table scan according to table filter map
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
index 04d8b8ffe65..a43acd7f5bc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
@@ -69,7 +69,6 @@ public class PruneFileScanPartition extends
OneRewriteRuleFactory {
// set isPruned so that it won't go pass the partition
prune again
selectedPartitions = new SelectedPartitions(0,
ImmutableMap.of(), true);
}
-
LogicalFileScan rewrittenScan =
scan.withSelectedPartitions(selectedPartitions);
return new LogicalFilter<>(filter.getConjuncts(),
rewrittenScan);
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/QueryPartitionCollector.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/QueryPartitionCollector.java
new file mode 100644
index 00000000000..3ce0667c6a0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/QueryPartitionCollector.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Multimap;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Used to collect query partitions, only collect once
+ * */
+public class QueryPartitionCollector extends
DefaultPlanRewriter<ConnectContext> implements CustomRewriter {
+
+ @Override
+ public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null &&
connectContext.getSessionVariable().internalSession) {
+ return plan;
+ }
+ plan.accept(this, connectContext);
+ return plan;
+ }
+
+ @Override
+ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation
catalogRelation, ConnectContext context) {
+
+ TableIf table = catalogRelation.getTable();
+ if (table.getDatabase() == null) {
+ // logic for test
+ return catalogRelation;
+ }
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = context.getStatementContext()
+ .getTableUsedPartitionNameMap();
+ Set<String> tablePartitions = new HashSet<>();
+ if (catalogRelation instanceof LogicalOlapScan) {
+ // Handle olap table
+ LogicalOlapScan logicalOlapScan = (LogicalOlapScan)
catalogRelation;
+ for (Long partitionId : logicalOlapScan.getSelectedPartitionIds())
{
+
tablePartitions.add(logicalOlapScan.getTable().getPartition(partitionId).getName());
+ }
+ } else if (catalogRelation instanceof LogicalFileScan
+ && catalogRelation.getTable() != null
+ && ((ExternalTable)
catalogRelation.getTable()).supportInternalPartitionPruned()) {
+ LogicalFileScan logicalFileScan = (LogicalFileScan)
catalogRelation;
+ SelectedPartitions selectedPartitions =
logicalFileScan.getSelectedPartitions();
+
tablePartitions.addAll(selectedPartitions.selectedPartitions.keySet());
+ } else {
+ // todo when supported get query used partitions, should put
actual used partitions but not ALL_PARTITIONS
+ tableUsedPartitionNameMap.put(table.getFullQualifiers(),
PartitionCompensator.ALL_PARTITIONS);
+ }
+ // only collect once and maybe query more than once, we collect all of
them to make sure query data is
+ // correct
+ tableUsedPartitionNameMap.put(table.getFullQualifiers(),
+ Pair.of(catalogRelation.getRelationId(), tablePartitions));
+ return catalogRelation;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
index 1252f3b4bbf..68eb11e79ed 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java
@@ -18,8 +18,6 @@
package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.nereids.memo.Group;
-import org.apache.doris.nereids.rules.exploration.mv.StructInfo;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
@@ -31,12 +29,14 @@ import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import
org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer.ExpressionReplaceContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
*/
public class ExpressionLineageReplacer extends DefaultPlanVisitor<Expression,
ExpressionReplaceContext> {
+ public static final Logger LOG =
LogManager.getLogger(ExpressionLineageReplacer.class);
public static final ExpressionLineageReplacer INSTANCE = new
ExpressionLineageReplacer();
@Override
@@ -63,26 +64,8 @@ public class ExpressionLineageReplacer extends
DefaultPlanVisitor<Expression, Ex
@Override
public Expression visitGroupPlan(GroupPlan groupPlan,
ExpressionReplaceContext context) {
- Group group = groupPlan.getGroup();
- if (group == null) {
- return visit(groupPlan, context);
- }
- Collection<StructInfo> structInfos =
group.getstructInfoMap().getStructInfos();
- if (structInfos.isEmpty()) {
- return visit(groupPlan, context);
- }
- // Find first info which the context's bitmap contains all to make
sure that
- // the expression lineage is correct
- Optional<StructInfo> structInfoOptional = structInfos.stream()
- .filter(info -> (context.getTableBitSet().isEmpty()
- || StructInfo.containsAll(context.getTableBitSet(),
info.getTableBitSet()))
- && !info.getNamedExprIdAndExprMapping().isEmpty())
- .findFirst();
- if (!structInfoOptional.isPresent()) {
- return visit(groupPlan, context);
- }
-
context.getExprIdExpressionMap().putAll(structInfoOptional.get().getNamedExprIdAndExprMapping());
- return null;
+ LOG.error("ExpressionLineageReplacer should not meet groupPlan, plan
is {}", groupPlan.treeString());
+ return visit(groupPlan, context);
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f3d81529513..c88d43ca56b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -619,9 +619,15 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_SYNC_MV_COST_BASED_REWRITE
= "enable_sync_mv_cost_based_rewrite";
+ public static final String MATERIALIZED_VIEW_REWRITE_DURATION_THRESHOLD
+ = "materialized_view_rewrite_duration_threshold";
+
public static final String MATERIALIZED_VIEW_RELATION_MAPPING_MAX_COUNT
= "materialized_view_relation_mapping_max_count";
+ public static final String MATERIALIZED_VIEW_STRUCT_INFO_MAX_COMBINE_COUNT
+ = "materialized_view_struct_info_max_combine_count";
+
public static final String CREATE_TABLE_PARTITION_MAX_NUM
= "create_table_partition_max_num";
@@ -2098,6 +2104,18 @@ public class SessionVariable implements Serializable,
Writable {
"Whether enable cost based rewrite for sync mv"})
public boolean enableSyncMvCostBasedRewrite = true;
+ @VariableMgr.VarAttr(name = MATERIALIZED_VIEW_REWRITE_DURATION_THRESHOLD,
needForward = true,
+ description = {"物化视图透明改写允许的最长耗时,超过此时长不再进行透明改写",
+ "The maximum duration allowed for transparent rewriting of
materialized views; "
+ + "if this duration is exceeded, transparent
rewriting will no longer be performed."})
+ public long materializedViewRewriteDurationThreshold = 1000L;
+
+ @VariableMgr.VarAttr(name =
MATERIALIZED_VIEW_STRUCT_INFO_MAX_COMBINE_COUNT, needForward = true,
+ description = {"嵌套改写时,允许多元操作符各个输入的结构信息笛卡尔积最大组合数量",
+ "Maximum number of Cartesian product combinations of
structural information from multiple "
+ + "input sources when rewriting nested operations
with multi-operand operators"})
+ public long materializedViewStructInfoMaxCombineCount = 4096L;
+
@VariableMgr.VarAttr(name = CREATE_TABLE_PARTITION_MAX_NUM, needForward =
true,
description = {"建表时创建分区的最大数量",
"The maximum number of partitions created during table
creation"})
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
index c2e402adb82..6470b14f914 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.nereids.rules.exploration.mv.PartitionCompensator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -135,14 +136,16 @@ public class MTMVRewriteUtilTest {
// currentTimeMills is 3, grace period is 2, and partition
getVisibleVersionTime is 1
// if forceConsistent this should get 0 partitions which mtmv can use.
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
true);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills, true,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
}
@Test
public void testGetMTMVCanRewritePartitionsNormal() {
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
}
@@ -163,7 +166,8 @@ public class MTMVRewriteUtilTest {
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
}
@@ -184,7 +188,8 @@ public class MTMVRewriteUtilTest {
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
}
@@ -198,7 +203,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
// getMTMVCanRewritePartitions only check the partition is valid or
not, it doesn't care the
// isEnableMaterializedViewRewriteWhenBaseTableUnawareness
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
@@ -216,7 +222,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
}
@@ -234,7 +241,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
}
@@ -252,7 +260,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
// getMTMVCanRewritePartitions only check the partition is valid or
not, it doesn't care the
// isEnableMaterializedViewRewriteWhenBaseTableUnawareness
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
@@ -268,7 +277,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
}
@@ -282,7 +292,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(1, mtmvCanRewritePartitions.size());
}
@@ -296,7 +307,8 @@ public class MTMVRewriteUtilTest {
}
};
Collection<Partition> mtmvCanRewritePartitions = MTMVRewriteUtil
- .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false);
+ .getMTMVCanRewritePartitions(mtmv, ctx, currentTimeMills,
false,
+ PartitionCompensator.getQueryUsedPartitions(ctx));
Assert.assertEquals(0, mtmvCanRewritePartitions.size());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/StructInfoMapTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/StructInfoMapTest.java
index db77da76c4b..41bd275bc7d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/StructInfoMapTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/StructInfoMapTest.java
@@ -33,6 +33,9 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -60,11 +63,12 @@ class StructInfoMapTest extends SqlTestBase {
Group root = c1.getMemo().getRoot();
Set<BitSet> tableMaps = root.getstructInfoMap().getTableMaps();
Assertions.assertTrue(tableMaps.isEmpty());
- root.getstructInfoMap().refresh(root, c1);
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
Assertions.assertEquals(1, tableMaps.size());
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean forceConsistent) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean forceConsistent,
+ Map<List<String>, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
@@ -88,7 +92,7 @@ class StructInfoMapTest extends SqlTestBase {
.optimize()
.printlnBestPlanTree();
root = c1.getMemo().getRoot();
- root.getstructInfoMap().refresh(root, c1);
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
tableMaps = root.getstructInfoMap().getTableMaps();
Assertions.assertEquals(2, tableMaps.size());
dropMvByNereids("drop materialized view mv1");
@@ -117,12 +121,13 @@ class StructInfoMapTest extends SqlTestBase {
Group root = c1.getMemo().getRoot();
Set<BitSet> tableMaps = root.getstructInfoMap().getTableMaps();
Assertions.assertTrue(tableMaps.isEmpty());
- root.getstructInfoMap().refresh(root, c1);
- root.getstructInfoMap().refresh(root, c1);
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
Assertions.assertEquals(1, tableMaps.size());
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean forceConsistent,
+ Map<List<String>, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
@@ -145,8 +150,8 @@ class StructInfoMapTest extends SqlTestBase {
.optimize()
.printlnBestPlanTree();
root = c1.getMemo().getRoot();
- root.getstructInfoMap().refresh(root, c1);
- root.getstructInfoMap().refresh(root, c1);
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
tableMaps = root.getstructInfoMap().getTableMaps();
Assertions.assertEquals(2, tableMaps.size());
dropMvByNereids("drop materialized view mv1");
@@ -170,7 +175,8 @@ class StructInfoMapTest extends SqlTestBase {
);
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean forceConsistent,
+ Map<List<String>, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
@@ -192,7 +198,7 @@ class StructInfoMapTest extends SqlTestBase {
.rewrite()
.optimize();
Group root = c1.getMemo().getRoot();
- root.getstructInfoMap().refresh(root, c1);
+ root.getstructInfoMap().refresh(root, c1, new HashSet<>());
StructInfoMap structInfoMap = root.getstructInfoMap();
Assertions.assertEquals(2, structInfoMap.getTableMaps().size());
BitSet mvMap = structInfoMap.getTableMaps().stream()
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
index 0090982db00..e12eced6330 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.mv;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.sqltest.SqlTestBase;
@@ -54,7 +55,8 @@ public class IdStatisticsMapTest extends SqlTestBase {
};
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid,
+ Map<BaseTableInfo, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MtmvCacheNewConnectContextTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MtmvCacheNewConnectContextTest.java
index 0134d5df4e7..a15aad19de4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MtmvCacheNewConnectContextTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MtmvCacheNewConnectContextTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.mv;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.sqltest.SqlTestBase;
@@ -31,6 +32,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.BitSet;
+import java.util.Map;
+import java.util.Set;
/**
* The connectContext would new instance when generate MTMVCache, after
generate, the connectContext should
@@ -52,7 +55,8 @@ public class MtmvCacheNewConnectContextTest extends
SqlTestBase {
};
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean forceConsistent) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid,
+ Map<BaseTableInfo, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MvTableIdIsLongTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MvTableIdIsLongTest.java
index dd15b5e06c7..953849b5d15 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MvTableIdIsLongTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/MvTableIdIsLongTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.mv;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelationManager;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.sqltest.SqlTestBase;
@@ -31,6 +32,8 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.BitSet;
+import java.util.Map;
+import java.util.Set;
/**
* Test mv rewrite when base table id is lager then integer
@@ -49,7 +52,8 @@ public class MvTableIdIsLongTest extends SqlTestBase {
};
new MockUp<MTMVRelationManager>() {
@Mock
- public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid) {
+ public boolean isMVPartitionValid(MTMV mtmv, ConnectContext ctx,
boolean isMVPartitionValid,
+ Map<BaseTableInfo, Set<String>>
queryUsedRelatedTablePartitionsMap) {
return true;
}
};
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/OptimizeGetAvailableMvsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/OptimizeGetAvailableMvsTest.java
new file mode 100644
index 00000000000..cc5027c25bb
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/OptimizeGetAvailableMvsTest.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.mv;
+
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.common.Pair;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.rules.expression.rules.PartitionPruner;
+import
org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType;
+import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
+import org.apache.doris.nereids.sqltest.SqlTestBase;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.qe.SessionVariable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Test get available mvs after rewrite by rules
+ */
+public class OptimizeGetAvailableMvsTest extends SqlTestBase {
+
+ @Test
+ void testWhenNotPartitionPrune() throws Exception {
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ BitSet disableNereidsRules =
connectContext.getSessionVariable().getDisableNereidsRules();
+ new MockUp<SessionVariable>() {
+ @Mock
+ public BitSet getDisableNereidsRules() {
+ return disableNereidsRules;
+ }
+ };
+
+ new MockUp<OlapTable>() {
+ @Mock
+ public Partition getPartition(long partitionId) {
+ return new Partition() {
+ @Override
+ public long getId() {
+ return 1L;
+ }
+
+ @Override
+ public String getName() {
+ return "mock_partition";
+ }
+
+ @Override
+ public PartitionState getState() {
+ return PartitionState.NORMAL;
+ }
+
+ @Override
+ public MaterializedIndex getIndex(long indexId) {
+ return new MaterializedIndex(1L, IndexState.NORMAL);
+ }
+
+ @Override
+ public DistributionInfo getDistributionInfo() {
+ return new DistributionInfo() {
+ @Override
+ public DistributionInfoType getType() {
+ return DistributionInfoType.RANDOM;
+ }
+ };
+ }
+ };
+ }
+ };
+
+ new MockUp<LogicalOlapScan>() {
+ @Mock
+ public List<Long> getSelectedPartitionIds() {
+ return Lists.newArrayList(1L);
+ }
+ };
+
+ connectContext.getSessionVariable().enableMaterializedViewRewrite =
true;
+ connectContext.getSessionVariable().enableMaterializedViewNestRewrite
= true;
+ createMvByNereids("create materialized view mv1 "
+ + " BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL\n"
+ + " PARTITION BY (id)\n"
+ + " DISTRIBUTED BY RANDOM BUCKETS 1\n"
+ + " PROPERTIES ('replication_num' = '1') \n"
+ + " as "
+ + " select T4.id from T4 inner join T2 "
+ + " on T4.id = T2.id;");
+ CascadesContext c1 = createCascadesContext(
+ "select T4.id "
+ + "from T4 "
+ + "inner join T2 on T4.id = T2.id "
+ + "inner join T3 on T4.id = T3.id",
+ connectContext
+ );
+ PlanChecker.from(c1)
+ .analyze()
+ .rewrite()
+ .optimize()
+ .printlnBestPlanTree();
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = c1.getStatementContext()
+ .getTableUsedPartitionNameMap();
+ Map<BaseTableInfo, Collection<Partition>> mvCanRewritePartitionsMap =
c1.getStatementContext()
+ .getMvCanRewritePartitionsMap();
+ Assertions.assertFalse(tableUsedPartitionNameMap.isEmpty());
+
+ for (Map.Entry<List<String>, Pair<RelationId, Set<String>>>
tableInfoEntry
+ : tableUsedPartitionNameMap.entries()) {
+ if (tableInfoEntry.getKey().contains("T2")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ } else if (tableInfoEntry.getKey().contains("T3")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ } else if (tableInfoEntry.getKey().contains("T4")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ }
+ }
+
+ Assertions.assertEquals(1, mvCanRewritePartitionsMap.size());
+
Assertions.assertTrue(mvCanRewritePartitionsMap.keySet().iterator().next().getTableName()
+ .equalsIgnoreCase("mv1"));
+
+ dropMvByNereids("drop materialized view mv1");
+ }
+
+ @Test
+ void testWhenPartitionPrune() throws Exception {
+
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ BitSet disableNereidsRules =
connectContext.getSessionVariable().getDisableNereidsRules();
+ new MockUp<SessionVariable>() {
+ @Mock
+ public BitSet getDisableNereidsRules() {
+ return disableNereidsRules;
+ }
+ };
+
+ new MockUp<PartitionPruner>() {
+ @Mock
+ public <K extends Comparable<K>> List<Long> prune(List<Slot>
partitionSlots, Expression partitionPredicate,
+ Map<K, PartitionItem> idToPartitions, CascadesContext
cascadesContext,
+ PartitionTableType partitionTableType,
Optional<SortedPartitionRanges<K>> sortedPartitionRanges) {
+ return Lists.newArrayList(1L);
+ }
+ };
+
+ new MockUp<OlapTable>() {
+ @Mock
+ public Partition getPartition(long partitionId) {
+ return new Partition() {
+ @Override
+ public long getId() {
+ return 1L;
+ }
+
+ @Override
+ public String getName() {
+ return "mock_partition";
+ }
+
+ @Override
+ public PartitionState getState() {
+ return PartitionState.NORMAL;
+ }
+
+ @Override
+ public MaterializedIndex getIndex(long indexId) {
+ return new MaterializedIndex(1L, IndexState.NORMAL);
+ }
+
+ @Override
+ public DistributionInfo getDistributionInfo() {
+ return new DistributionInfo() {
+ @Override
+ public DistributionInfoType getType() {
+ return DistributionInfoType.RANDOM;
+ }
+ };
+ }
+ };
+ }
+ };
+
+ new MockUp<LogicalOlapScan>() {
+ @Mock
+ public List<Long> getSelectedPartitionIds() {
+ return Lists.newArrayList(1L);
+ }
+ };
+
+ connectContext.getSessionVariable().enableMaterializedViewRewrite =
true;
+ connectContext.getSessionVariable().enableMaterializedViewNestRewrite
= true;
+ createMvByNereids("create materialized view mv2 "
+ + " BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL\n"
+ + " PARTITION BY (id)\n"
+ + " DISTRIBUTED BY RANDOM BUCKETS 1\n"
+ + " PROPERTIES ('replication_num' = '1') \n"
+ + " as "
+ + " select T4.id from T4 inner join T2 "
+ + " on T4.id = T2.id;");
+ CascadesContext c1 = createCascadesContext(
+ "select T4.id "
+ + "from T4 "
+ + "inner join T2 on T4.id = T2.id "
+ + "inner join T3 on T4.id = T3.id "
+ + "where T4.id > 0",
+ connectContext
+ );
+ PlanChecker.from(c1)
+ .analyze()
+ .rewrite()
+ .optimize()
+ .printlnBestPlanTree();
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = c1.getStatementContext()
+ .getTableUsedPartitionNameMap();
+ Map<BaseTableInfo, Collection<Partition>> mvCanRewritePartitionsMap =
c1.getStatementContext()
+ .getMvCanRewritePartitionsMap();
+ Assertions.assertFalse(tableUsedPartitionNameMap.isEmpty());
+
+ for (Map.Entry<List<String>, Pair<RelationId, Set<String>>>
tableInfoEntry
+ : tableUsedPartitionNameMap.entries()) {
+ if (tableInfoEntry.getKey().contains("T2")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ } else if (tableInfoEntry.getKey().contains("T3")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ } else if (tableInfoEntry.getKey().contains("T4")) {
+ Assertions.assertEquals(tableInfoEntry.getValue().value(),
Sets.newHashSet("mock_partition"));
+ }
+ }
+
+ Assertions.assertEquals(1, mvCanRewritePartitionsMap.size());
+
Assertions.assertTrue(mvCanRewritePartitionsMap.keySet().iterator().next().getTableName()
+ .equalsIgnoreCase("mv2"));
+
+ dropMvByNereids("drop materialized view mv2");
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index 77ecbd5dc7c..3599b5f7bee 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -245,7 +245,9 @@ public class PlanChecker {
public PlanChecker rewrite() {
Rewriter.getWholeTreeRewriter(cascadesContext).execute();
+
InitMaterializationContextHook.INSTANCE.initMaterializationContext(this.cascadesContext);
cascadesContext.toMemo();
+ cascadesContext.newTablePartitionCollector().execute();
return this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]