This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0f3c5442605 [feature](mtmv)mtmv support partition (#28144) 0f3c5442605 is described below commit 0f3c5442605305aae0e50bc4ffe7c66fdd33be5a Author: zhangdong <493738...@qq.com> AuthorDate: Sun Dec 17 18:28:03 2023 +0800 [feature](mtmv)mtmv support partition (#28144) - create MTMV support partition and `AUTO` refresh method - refresh mtmv support support specified partitions - MTMV support incremental updates - add property `EXCLUDED_TRIGGER_TABLES` for mv - Maintain MTMVCache after successful task refresh for plan rewrite(MTMV.getOrGenerateCache) - show partitions add "SyncWithBaseTables" - drop job before drop MTMV - task tvf add "MvId,MvDatabaseId,ErrorMsg,TaskContext,RefreshMode,RefreshPartitions" - add `NotAllowFallback` for mtmv not fallback to old planner - add `MTMVUtils.getMTMVCanRewritePartitions() `and `Env.getCurrentEnv().getMtmvService().getRelationManager().getAvailableMTMVs()` for plan rewrite --- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 11 +- .../org/apache/doris/analysis/CreateMTMVStmt.java | 18 +- .../main/java/org/apache/doris/catalog/MTMV.java | 56 ++- .../java/org/apache/doris/catalog/OlapTable.java | 22 + .../org/apache/doris/catalog/OlapTableFactory.java | 22 + .../java/org/apache/doris/catalog/Partition.java | 12 + .../org/apache/doris/catalog/PartitionInfo.java | 11 + .../doris/common/proc/PartitionsProcDir.java | 20 +- .../apache/doris/common/util/PropertyAnalyzer.java | 1 + .../apache/doris/datasource/InternalCatalog.java | 6 +- .../apache/doris/job/extensions/mtmv/MTMVJob.java | 35 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 175 ++++++-- .../doris/job/extensions/mtmv/MTMVTaskContext.java | 58 +++ .../org/apache/doris/job/task/AbstractTask.java | 1 + .../java/org/apache/doris/mtmv/BaseTableInfo.java | 14 + .../doris/mtmv/{MVCache.java => MTMVCache.java} | 10 +- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 9 +- .../org/apache/doris/mtmv/MTMVPartitionInfo.java | 110 +++++ .../java/org/apache/doris/mtmv/MTMVPlanUtil.java | 108 +++++ .../org/apache/doris/mtmv/MTMVRefreshEnum.java | 3 +- ...VCacheManager.java => MTMVRelationManager.java} | 150 +------ .../java/org/apache/doris/mtmv/MTMVService.java | 17 +- .../main/java/org/apache/doris/mtmv/MTMVUtil.java | 489 +++++++++++++++++++++ .../doris/nereids/parser/LogicalPlanBuilder.java | 58 ++- .../mv/AbstractMaterializedViewRule.java | 2 +- .../mv/InitMaterializationContextHook.java | 6 +- .../exploration/mv/MaterializationContext.java | 14 +- .../trees/plans/commands/AlterMTMVCommand.java | 2 +- .../trees/plans/commands/CreateMTMVCommand.java | 2 +- .../trees/plans/commands/DropMTMVCommand.java | 2 +- .../trees/plans/commands/NotAllowFallback.java} | 48 +- .../trees/plans/commands/RefreshMTMVCommand.java | 2 +- .../plans/commands/UpdateMvByPartitionCommand.java | 47 +- .../trees/plans/commands/info/CreateMTMVInfo.java | 111 ++++- .../trees/plans/commands/info/RefreshMTMVInfo.java | 45 +- .../java/org/apache/doris/qe/StmtExecutor.java | 13 + .../doris/tablefunction/MetadataGenerator.java | 3 + .../tablefunction/MvInfosTableValuedFunction.java | 4 +- 38 files changed, 1410 insertions(+), 307 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 674e4550976..be683f5ee4b 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -90,10 +90,11 @@ statement (REFRESH refreshMethod? refreshTrigger?)? (KEY keys=identifierList)? (COMMENT STRING_LITERAL)? + (PARTITION BY LEFT_PAREN partitionKey = identifier RIGHT_PAREN)? (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS (INTEGER_VALUE | AUTO))?)? propertyClause? AS query #createMTMV - | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier #refreshMTMV + | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier (partitionSpec | COMPLETE)? #refreshMTMV | ALTER MATERIALIZED VIEW mvName=multipartIdentifier ((RENAME newName=identifier) | (REFRESH (refreshMethod | refreshTrigger | refreshMethod refreshTrigger)) | (SET LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN)) #alterMTMV @@ -158,15 +159,11 @@ refreshTrigger ; refreshSchedule - : EVERY INTEGER_VALUE mvRefreshUnit (STARTS STRING_LITERAL)? - ; - -mvRefreshUnit - : SECOND | MINUTE | HOUR | DAY | WEEK + : EVERY INTEGER_VALUE refreshUnit = identifier (STARTS STRING_LITERAL)? ; refreshMethod - : COMPLETE + : COMPLETE | AUTO ; identifierOrText diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java index bb2efdd6281..9421bb047c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java @@ -20,7 +20,9 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Index; import org.apache.doris.mtmv.EnvInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRefreshInfo; +import org.apache.doris.mtmv.MTMVRelation; import java.util.ArrayList; import java.util.List; @@ -31,17 +33,21 @@ public class CreateMTMVStmt extends CreateTableStmt { private final String querySql; private final EnvInfo envInfo; private Map<String, String> mvProperties; + private MTMVPartitionInfo mvPartitionInfo; + private MTMVRelation relation; public CreateMTMVStmt(boolean ifNotExists, TableName mvName, List<Column> columns, MTMVRefreshInfo refreshInfo, KeysDesc keyDesc, DistributionDesc distributionDesc, Map<String, String> properties, Map<String, String> mvProperties, String querySql, String comment, - EnvInfo envInfo) { - super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, null, + EnvInfo envInfo, PartitionDesc partitionDesc, MTMVPartitionInfo mvPartitionInfo, MTMVRelation relation) { + super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), DEFAULT_ENGINE_NAME, keyDesc, partitionDesc, distributionDesc, properties, null, comment, null, null); this.refreshInfo = refreshInfo; this.querySql = querySql; this.envInfo = envInfo; this.mvProperties = mvProperties; + this.mvPartitionInfo = mvPartitionInfo; + this.relation = relation; } public MTMVRefreshInfo getRefreshInfo() { @@ -59,4 +65,12 @@ public class CreateMTMVStmt extends CreateTableStmt { public Map<String, String> getMvProperties() { return mvProperties; } + + public MTMVPartitionInfo getMvPartitionInfo() { + return mvPartitionInfo; + } + + public MTMVRelation getRelation() { + return relation; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index af0691d94fb..a2c87581fc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -18,21 +18,25 @@ package org.apache.doris.catalog; import org.apache.doris.catalog.OlapTableFactory.MTMVParams; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.mtmv.EnvInfo; +import org.apache.doris.mtmv.MTMVCache; import org.apache.doris.mtmv.MTMVJobInfo; import org.apache.doris.mtmv.MTMVJobManager; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; import org.apache.doris.mtmv.MTMVRefreshInfo; import org.apache.doris.mtmv.MTMVRelation; import org.apache.doris.mtmv.MTMVStatus; -import org.apache.doris.mtmv.MVCache; import org.apache.doris.persist.gson.GsonUtils; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,6 +45,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -62,8 +67,10 @@ public class MTMV extends OlapTable { private Map<String, String> mvProperties; @SerializedName("r") private MTMVRelation relation; - // Should update after every fresh - private MVCache mvCache; + @SerializedName("mpi") + private MTMVPartitionInfo mvPartitionInfo; + // Should update after every fresh, not persist + private MTMVCache cache; // For deserialization public MTMV() { @@ -87,6 +94,8 @@ public class MTMV extends OlapTable { this.status = new MTMVStatus(); this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX + params.tableId); this.mvProperties = params.mvProperties; + this.mvPartitionInfo = params.mvPartitionInfo; + this.relation = params.relation; mvRwLock = new ReentrantReadWriteLock(true); } @@ -119,12 +128,12 @@ public class MTMV extends OlapTable { return relation; } - public MVCache getMvCache() { - return mvCache; + public MTMVCache getCache() { + return cache; } - public void setMvCache(MVCache mvCache) { - this.mvCache = mvCache; + public void setCache(MTMVCache cache) { + this.cache = cache; } public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) { @@ -148,6 +157,12 @@ public class MTMV extends OlapTable { this.status.setSchemaChangeDetail(null); this.status.setRefreshState(MTMVRefreshState.SUCCESS); this.relation = relation; + try { + this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this)); + } catch (Throwable e) { + this.cache = null; + LOG.warn("generate cache failed", e); + } } else { this.status.setRefreshState(MTMVRefreshState.FAIL); } @@ -170,10 +185,36 @@ public class MTMV extends OlapTable { } } + public Set<String> getExcludedTriggerTables() { + if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { + return Sets.newHashSet(); + } + String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); + return Sets.newHashSet(split); + } + + public MTMVCache getOrGenerateCache() throws AnalysisException { + if (cache == null) { + writeMvLock(); + try { + if (cache == null) { + this.cache = MTMVCache.from(this, MTMVPlanUtil.createMTMVContext(this)); + } + } finally { + writeMvUnlock(); + } + } + return cache; + } + public Map<String, String> getMvProperties() { return mvProperties; } + public MTMVPartitionInfo getMvPartitionInfo() { + return mvPartitionInfo; + } + public void readMvLock() { this.mvRwLock.readLock().lock(); } @@ -207,6 +248,7 @@ public class MTMV extends OlapTable { jobInfo = materializedView.jobInfo; mvProperties = materializedView.mvProperties; relation = materializedView.relation; + mvPartitionInfo = materializedView.mvPartitionInfo; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 05a12ed387e..0ce2df5e6f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -947,6 +947,17 @@ public class OlapTable extends Table { return getPartition(partitionName, true); } + public Partition getPartitionOrAnalysisException(String partitionName) throws AnalysisException { + Partition partition = getPartition(partitionName, false); + if (partition == null) { + partition = getPartition(partitionName, true); + } + if (partition == null) { + throw new AnalysisException("partition not found: " + partitionName); + } + return partition; + } + // get partition by name public Partition getPartition(String partitionName, boolean isTempPartition) { if (isTempPartition) { @@ -965,6 +976,17 @@ public class OlapTable extends Table { return partition; } + public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException { + Partition partition = idToPartition.get(partitionId); + if (partition == null) { + partition = tempPartitions.getPartition(partitionId); + } + if (partition == null) { + throw new AnalysisException("partition not found: " + partitionId); + } + return partition; + } + // select the non-empty partition ids belonging to this table. // // ATTN: partitions not belonging to this table will be filtered. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java index af87ec47991..7d11ed7bdd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java @@ -22,7 +22,9 @@ import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DdlStmt; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.mtmv.EnvInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRefreshInfo; +import org.apache.doris.mtmv.MTMVRelation; import com.google.common.base.Preconditions; @@ -49,6 +51,8 @@ public class OlapTableFactory { public EnvInfo envInfo; public String querySql; public Map<String, String> mvProperties; + public MTMVPartitionInfo mvPartitionInfo; + public MTMVRelation relation; } private BuildParams params; @@ -158,6 +162,22 @@ public class OlapTableFactory { return this; } + private OlapTableFactory withMvPartitionInfo(MTMVPartitionInfo mvPartitionInfo) { + Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for " + + params.getClass().getSimpleName()); + MTMVParams mtmvParams = (MTMVParams) params; + mtmvParams.mvPartitionInfo = mvPartitionInfo; + return this; + } + + private OlapTableFactory withMvRelation(MTMVRelation relation) { + Preconditions.checkState(params instanceof MTMVParams, "Invalid argument for " + + params.getClass().getSimpleName()); + MTMVParams mtmvParams = (MTMVParams) params; + mtmvParams.relation = relation; + return this; + } + public OlapTableFactory withExtraParams(DdlStmt stmt) { boolean isMaterializedView = stmt instanceof CreateMTMVStmt; if (!isMaterializedView) { @@ -168,6 +188,8 @@ public class OlapTableFactory { return withRefreshInfo(createMTMVStmt.getRefreshInfo()) .withQuerySql(createMTMVStmt.getQuerySql()) .withMvProperties(createMTMVStmt.getMvProperties()) + .withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo()) + .withMvRelation(createMTMVStmt.getRelation()) .withEnvInfo(createMTMVStmt.getEnvInfo()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 3a1905bd8e7..76c3097a033 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -168,6 +168,18 @@ public class Partition extends MetaObject implements Writable { return visibleVersionTime; } + /** + * if visibleVersion is 1, do not return creation time but 0 + * + * @return + */ + public long getVisibleVersionTimeIgnoreInit() { + if (visibleVersion == 1) { + return 0L; + } + return visibleVersionTime; + } + // The method updateVisibleVersionAndVersionHash is called when fe restart, the visibleVersionTime is updated private void setVisibleVersion(long visibleVersion) { this.visibleVersion = visibleVersion; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 721bf0ebaef..e61a7a1070f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -147,6 +147,17 @@ public class PartitionInfo implements Writable { return item; } + public PartitionItem getItemOrAnalysisException(long partitionId) throws AnalysisException { + PartitionItem item = idToItem.get(partitionId); + if (item == null) { + item = idToTempItem.get(partitionId); + } + if (item == null) { + throw new AnalysisException("PartitionItem not found: " + partitionId); + } + return item; + } + public void setItem(long partitionId, boolean isTemp, PartitionItem item) { setItemInternal(partitionId, isTemp, item); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 9319c96e040..01feaf23683 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -29,6 +29,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; @@ -37,17 +38,20 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.mtmv.MTMVUtil; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.Collection; @@ -68,7 +72,7 @@ public class PartitionsProcDir implements ProcDirInterface { .add("State").add("PartitionKey").add("Range").add("DistributionKey") .add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy") .add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation") - .add("IsMutable") + .add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables") .build(); private Database db; @@ -303,6 +307,20 @@ public class PartitionsProcDir implements ProcDirInterface { partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt()); partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId)); + if (olapTable instanceof MTMV) { + try { + List<String> partitionUnSyncTables = MTMVUtil + .getPartitionUnSyncTables((MTMV) olapTable, partitionId); + partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables)); + partitionInfo.add(partitionUnSyncTables.toString()); + } catch (AnalysisException e) { + partitionInfo.add(false); + partitionInfo.add(e.getMessage()); + } + } else { + partitionInfo.add(true); + partitionInfo.add(FeConstants.null_string); + } partitionInfos.add(partitionInfo); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 75dc07e915d..32ba1c8306d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -158,6 +158,7 @@ public class PropertyAnalyzer { public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT = "enable_duplicate_without_keys_by_default"; public static final String PROPERTIES_GRACE_PERIOD = "grace_period"; + public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables"; // For unique key data model, the feature Merge-on-Write will leverage a primary // key index and a delete-bitmap to mark duplicate keys as deleted in load stage, // which can avoid the merging cost in read stage, and accelerate the aggregation diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 78b02a477f7..8d8e76c3cf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -886,6 +886,9 @@ public class InternalCatalog implements CatalogIf<Database> { + " please use \"DROP table FORCE\"."); } } + if (table.getType() == TableType.MATERIALIZED_VIEW) { + Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table); + } unprotectDropTable(db, table, stmt.isForceDrop(), false, 0); if (!stmt.isForceDrop()) { recycleTime = Env.getCurrentRecycleBin().getRecycleTimeById(table.getId()); @@ -898,9 +901,6 @@ public class InternalCatalog implements CatalogIf<Database> { Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId(), table.getId()); Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId()); - if (table.getType() == TableType.MATERIALIZED_VIEW) { - Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table); - } Env.getCurrentEnv().getMtmvService().dropTable(table); } catch (UserException e) { throw new DdlException(e.getMessage(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index 6321679b6b2..5ee9b43fe1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -30,6 +30,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobType; import org.apache.doris.job.common.TaskType; +import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ShowResultSetMetaData; import org.apache.doris.thrift.TCell; @@ -39,7 +40,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,9 +47,8 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; -public class MTMVJob extends AbstractJob<MTMVTask, Map> { +public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> { private static final Logger LOG = LogManager.getLogger(MTMVJob.class); private static final ShowResultSetMetaData JOB_META_DATA = ShowResultSetMetaData.builder() @@ -98,6 +97,9 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> { @SerializedName(value = "mi") private long mtmvId; + public MTMVJob() { + } + public MTMVJob(long dbId, long mtmvId) { this.dbId = dbId; this.mtmvId = mtmvId; @@ -110,8 +112,11 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> { } @Override - public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) { - MTMVTask task = new MTMVTask(dbId, mtmvId); + public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext taskContext) { + if (taskContext == null) { + taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); + } + MTMVTask task = new MTMVTask(dbId, mtmvId, taskContext); task.setTaskType(taskType); ArrayList<MTMVTask> tasks = new ArrayList<>(); tasks.add(task); @@ -119,9 +124,25 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> { return tasks; } + /** + * if user trigger, return true + * if system trigger, Check if there are any system triggered tasks, and if so, return false + * + * @param taskContext + * @return + */ @Override - public boolean isReadyForScheduling(Map taskContext) { - return CollectionUtils.isEmpty(getRunningTasks()); + public boolean isReadyForScheduling(MTMVTaskContext taskContext) { + if (taskContext != null) { + return true; + } + List<MTMVTask> runningTasks = getRunningTasks(); + for (MTMVTask task : runningTasks) { + if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) { + return false; + } + } + return true; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 804113d9892..8b2d2835127 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -17,24 +17,26 @@ package org.apache.doris.job.extensions.mtmv; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf.TableType; -import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.CatalogIf; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; -import org.apache.doris.mtmv.MTMVCacheManager; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVPlanUtil; +import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRelation; -import org.apache.doris.mysql.privilege.Auth; +import org.apache.doris.mtmv.MTMVUtil; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -44,10 +46,18 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.UUID; public class MTMVTask extends AbstractTask { @@ -58,12 +68,17 @@ public class MTMVTask extends AbstractTask { new Column("TaskId", ScalarType.createStringType()), new Column("JobId", ScalarType.createStringType()), new Column("JobName", ScalarType.createStringType()), + new Column("MvId", ScalarType.createStringType()), + new Column("MvDatabaseId", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), + new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTime", ScalarType.createStringType()), new Column("StartTime", ScalarType.createStringType()), new Column("FinishTime", ScalarType.createStringType()), new Column("DurationMs", ScalarType.createStringType()), - new Column("ExecuteSql", ScalarType.createStringType())); + new Column("TaskContext", ScalarType.createStringType()), + new Column("RefreshMode", ScalarType.createStringType()), + new Column("RefreshPartitions", ScalarType.createStringType())); public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; @@ -75,34 +90,66 @@ public class MTMVTask extends AbstractTask { COLUMN_TO_INDEX = builder.build(); } + public enum MTMVTaskTriggerMode { + MANUAL, + SYSTEM + } + + public enum MTMVTaskRefreshMode { + COMPLETE, + PARTITION, + NOT_REFRESH + } + @SerializedName(value = "di") private long dbId; @SerializedName(value = "mi") private long mtmvId; - @SerializedName("sql") - private String sql; + @SerializedName("taskContext") + private MTMVTaskContext taskContext; + @SerializedName("refreshPartitions") + List<String> refreshPartitions; + @SerializedName("refreshMode") + MTMVTaskRefreshMode refreshMode; private MTMV mtmv; private MTMVRelation relation; private StmtExecutor executor; + private Set<Long> refreshPartitionIds = Sets.newHashSet(); + + public MTMVTask() { + } - public MTMVTask(long dbId, long mtmvId) { - this.dbId = dbId; - this.mtmvId = mtmvId; + public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) { + this.dbId = Objects.requireNonNull(dbId); + this.mtmvId = Objects.requireNonNull(mtmvId); + this.taskContext = Objects.requireNonNull(taskContext); } @Override public void run() throws JobException { try { - ConnectContext ctx = createContext(); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); TUniqueId queryId = generateQueryId(); // Every time a task is run, the relation is regenerated because baseTables and baseViews may change, // such as deleting a table and creating a view with the same name - relation = MTMVCacheManager.generateMTMVRelation(mtmv, ctx); - executor = new StmtExecutor(ctx, sql); - executor.execute(queryId); + relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx); + calculateRefreshInfo(); + Map<OlapTable, String> tableWithPartKey = Maps.newHashMap(); + if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { + return; + } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) { + OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol()); + } + refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds); + UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand + .from(mtmv, refreshPartitionIds, tableWithPartKey); + executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + ctx.setQueryId(queryId); + command.run(ctx, executor); } catch (Throwable e) { - LOG.warn(e); + LOG.warn("run task failed: ", e); throw new JobException(e); } } @@ -134,9 +181,12 @@ public class MTMVTask extends AbstractTask { try { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId); mtmv = (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW); - sql = generateSql(mtmv); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVUtil.alignMvPartition(mtmv, relatedTable); + } } catch (UserException e) { - LOG.warn(e); + LOG.warn("before task failed:", e); throw new JobException(e); } } @@ -147,46 +197,27 @@ public class MTMVTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(mtmvId))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId))); trow.addToColumnValue(new TCell() .setStringVal(super.getStatus() == null ? FeConstants.null_string : super.getStatus().toString())); + trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg())); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs()))); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs()))); trow.addToColumnValue(new TCell().setStringVal( (super.getFinishTimeMs() == null || super.getFinishTimeMs() == 0) ? FeConstants.null_string : String.valueOf(super.getFinishTimeMs() - super.getStartTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal(sql)); + trow.addToColumnValue(new TCell() + .setStringVal(taskContext == null ? FeConstants.null_string : new Gson().toJson(taskContext))); + trow.addToColumnValue( + new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString())); + trow.addToColumnValue( + new TCell().setStringVal( + refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions))); return trow; } - private static String generateSql(MTMV mtmv) { - StringBuilder builder = new StringBuilder(); - builder.append("INSERT OVERWRITE TABLE "); - builder.append(mtmv.getDatabase().getCatalog().getName()); - builder.append("."); - builder.append(ClusterNamespace.getNameFromFullName(mtmv.getQualifiedDbName())); - builder.append("."); - builder.append(mtmv.getName()); - builder.append(" "); - builder.append(mtmv.getQuerySql()); - return builder.toString(); - } - - private ConnectContext createContext() throws AnalysisException { - ConnectContext ctx = new ConnectContext(); - ctx.setEnv(Env.getCurrentEnv()); - ctx.setQualifiedUser(Auth.ADMIN_USER); - ctx.setCurrentUserIdentity(UserIdentity.ADMIN); - ctx.getState().reset(); - ctx.setThreadLocalInfo(); - CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr() - .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId()); - ctx.changeDefaultCatalog(catalog.getName()); - ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName()); - ctx.getSessionVariable().enableFallbackToOriginalPlanner = false; - return ctx; - } - private TUniqueId generateQueryId() { UUID taskId = UUID.randomUUID(); return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); @@ -198,5 +229,55 @@ public class MTMVTask extends AbstractTask { mtmv = null; relation = null; executor = null; + refreshPartitionIds = null; + } + + private void calculateRefreshInfo() throws AnalysisException { + // check whether the user manually triggers it + if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { + if (taskContext.isComplete()) { + this.refreshMode = MTMVTaskRefreshMode.COMPLETE; + return; + } else if (!CollectionUtils + .isEmpty(taskContext.getPartitions())) { + this.refreshMode = MTMVTaskRefreshMode.PARTITION; + this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); + return; + } + } + // check if data is fresh + Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables(); + boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L); + if (fresh) { + this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH; + return; + } + // current, if partitionType is SELF_MANAGE, we can only FULL refresh + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) { + this.refreshMode = MTMVTaskRefreshMode.COMPLETE; + return; + } + // if refreshMethod is COMPLETE, we only FULL refresh + if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) { + this.refreshMode = MTMVTaskRefreshMode.COMPLETE; + return; + } + OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + excludedTriggerTables.add(relatedTable.getName()); + // check if every table except relatedTable is fresh + Set<Long> mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv); + // if true, we can use `Partition`, otherwise must `FULL` + if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) { + this.refreshMode = MTMVTaskRefreshMode.PARTITION; + this.refreshPartitionIds = mtmvNeedRefreshPartitions; + return; + } else { + this.refreshMode = MTMVTaskRefreshMode.COMPLETE; + return; + } + } + + public MTMVTaskContext getTaskContext() { + return taskContext; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java new file mode 100644 index 00000000000..adb6fd5ef71 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.job.extensions.mtmv; + +import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +public class MTMVTaskContext { + + @SerializedName(value = "triggerMode") + private MTMVTaskTriggerMode triggerMode; + + @SerializedName(value = "partitions") + private List<String> partitions; + + @SerializedName(value = "isComplete") + private boolean isComplete; + + public MTMVTaskContext(MTMVTaskTriggerMode triggerMode) { + this.triggerMode = triggerMode; + } + + public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String> partitions, boolean isComplete) { + this.triggerMode = triggerMode; + this.partitions = partitions; + this.isComplete = isComplete; + } + + public List<String> getPartitions() { + return partitions; + } + + public MTMVTaskTriggerMode getTriggerMode() { + return triggerMode; + } + + public boolean isComplete() { + return isComplete; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 654ee4fbc2f..f887961230f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -120,6 +120,7 @@ public abstract class AbstractTask implements Task { run(); onSuccess(); } catch (Exception e) { + this.errMsg = e.getMessage(); onFail(); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 575552cb61e..9b3b6be04f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -19,13 +19,18 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class BaseTableInfo { + private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class); + @SerializedName("ti") private Long tableId; @SerializedName("di") @@ -88,4 +93,13 @@ public class BaseTableInfo { + ", ctlId=" + ctlId + '}'; } + + public String getTableName() { + try { + return MTMVUtil.getTable(this).getName(); + } catch (AnalysisException e) { + LOG.warn("can not get table: " + this); + return ""; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java similarity index 90% rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java rename to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java index b5cf92f87e4..e4aa36d9b61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java @@ -36,14 +36,14 @@ import java.util.stream.Collectors; /** * The cache for materialized view cache */ -public class MVCache { +public class MTMVCache { // the materialized view plan which should be optimized by the same rules to query private final Plan logicalPlan; // this should be shuttle expression with lineage private final List<NamedExpression> mvOutputExpressions; - public MVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) { + public MTMVCache(MTMV materializedView, Plan logicalPlan, List<NamedExpression> mvOutputExpressions) { this.logicalPlan = logicalPlan; this.mvOutputExpressions = mvOutputExpressions; } @@ -56,12 +56,12 @@ public class MVCache { return mvOutputExpressions; } - public MVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) { + public MTMVCache(Plan logicalPlan, List<NamedExpression> mvOutputExpressions) { this.logicalPlan = logicalPlan; this.mvOutputExpressions = mvOutputExpressions; } - public static MVCache from(MTMV mtmv, ConnectContext connectContext) { + public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) { LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(mtmv.getQuerySql()); // TODO: connect context set current db when create mv by use database StatementContext mvSqlStatementContext = new StatementContext(connectContext, @@ -77,6 +77,6 @@ public class MVCache { List<NamedExpression> mvOutputExpressions = mvRewrittenPlan.getExpressions().stream() .map(NamedExpression.class::cast) .collect(Collectors.toList()); - return new MVCache(mvPlan, mvOutputExpressions); + return new MTMVCache(mvPlan, mvOutputExpressions); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 416f7f764b6..bdbc3231181 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -33,6 +33,8 @@ import org.apache.doris.job.common.JobType; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVJob; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode; +import org.apache.doris.job.extensions.mtmv.MTMVTaskContext; import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; @@ -48,7 +50,7 @@ import java.util.List; * when do some operation, do something about job */ public class MTMVJobManager implements MTMVHookService { - public static final String MTMV_JOB_PREFIX = "mtmv_"; + public static final String MTMV_JOB_PREFIX = "inner_mtmv_"; /** * create MTMVJob @@ -174,7 +176,10 @@ public class MTMVJobManager implements MTMVHookService { throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size()); } try { - Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null); + MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(), + info.isComplete()); + Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), mtmvTaskContext); + } catch (JobException e) { e.printStackTrace(); throw new DdlException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java new file mode 100644 index 00000000000..2b862bfab23 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import com.google.gson.annotations.SerializedName; + +/** + * MTMVPartitionInfo + */ +public class MTMVPartitionInfo { + + public enum MTMVPartitionType { + FOLLOW_BASE_TABLE, + SELF_MANAGE + } + + @SerializedName("pt") + MTMVPartitionType partitionType; + @SerializedName("rt") + BaseTableInfo relatedTable; + @SerializedName("rc") + String relatedCol; + @SerializedName("pc") + String partitionCol; + + public MTMVPartitionInfo() { + } + + public MTMVPartitionInfo(MTMVPartitionType partitionType) { + this.partitionType = partitionType; + } + + public MTMVPartitionInfo(MTMVPartitionType partitionType, + String partitionCol) { + this.partitionType = partitionType; + this.partitionCol = partitionCol; + } + + public MTMVPartitionType getPartitionType() { + return partitionType; + } + + public void setPartitionType(MTMVPartitionType partitionType) { + this.partitionType = partitionType; + } + + public BaseTableInfo getRelatedTable() { + return relatedTable; + } + + public void setRelatedTable(BaseTableInfo relatedTable) { + this.relatedTable = relatedTable; + } + + public String getRelatedCol() { + return relatedCol; + } + + public void setRelatedCol(String relatedCol) { + this.relatedCol = relatedCol; + } + + public String getPartitionCol() { + return partitionCol; + } + + public void setPartitionCol(String partitionCol) { + this.partitionCol = partitionCol; + } + + @Override + public String toString() { + return "MTMVPartitionInfo{" + + "partitionType=" + partitionType + + ", relatedTable=" + relatedTable + + ", relatedCol='" + relatedCol + '\'' + + ", partitionCol='" + partitionCol + '\'' + + '}'; + } + + public String toNameString() { + if (partitionType == MTMVPartitionType.SELF_MANAGE) { + return "MTMVPartitionInfo{" + + "partitionType=" + partitionType + + '}'; + } else { + return "MTMVPartitionInfo{" + + "partitionType=" + partitionType + + ", relatedTable=" + relatedTable.getTableName() + + ", relatedCol='" + relatedCol + '\'' + + ", partitionCol='" + partitionCol + '\'' + + '}'; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java new file mode 100644 index 00000000000..a8bc43a159e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.mysql.privilege.Auth; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.visitor.TableCollector; +import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.List; +import java.util.Set; + +public class MTMVPlanUtil { + + public static ConnectContext createMTMVContext(MTMV mtmv) throws AnalysisException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setQualifiedUser(Auth.ADMIN_USER); + ctx.setCurrentUserIdentity(UserIdentity.ADMIN); + ctx.getState().reset(); + ctx.setThreadLocalInfo(); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId()); + ctx.changeDefaultCatalog(catalog.getName()); + ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName()); + ctx.getSessionVariable().enableFallbackToOriginalPlanner = false; + return ctx; + } + + public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) { + Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx); + return generateMTMVRelation(plan); + } + + public static MTMVRelation generateMTMVRelation(Plan plan) { + return new MTMVRelation(getBaseTables(plan), getBaseViews(plan)); + } + + private static Set<BaseTableInfo> getBaseTables(Plan plan) { + TableCollectorContext collectorContext = + new TableCollector.TableCollectorContext( + com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP)); + plan.accept(TableCollector.INSTANCE, collectorContext); + List<TableIf> collectedTables = collectorContext.getCollectedTables(); + return transferTableIfToInfo(collectedTables); + } + + private static Set<BaseTableInfo> getBaseViews(Plan plan) { + TableCollectorContext collectorContext = + new TableCollector.TableCollectorContext( + com.google.common.collect.Sets.newHashSet(TableType.VIEW)); + plan.accept(TableCollector.INSTANCE, collectorContext); + List<TableIf> collectedTables = collectorContext.getCollectedTables(); + return transferTableIfToInfo(collectedTables); + } + + private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> tables) { + Set<BaseTableInfo> result = com.google.common.collect.Sets.newHashSet(); + for (TableIf table : tables) { + result.add(new BaseTableInfo(table)); + } + return result; + } + + private static Plan getPlanBySql(String querySql, ConnectContext ctx) { + List<StatementBase> statements; + try { + statements = new NereidsParser().parseSQL(querySql); + } catch (Exception e) { + throw new ParseException("Nereids parse failed. " + e.getMessage()); + } + StatementBase parsedStmt = statements.get(0); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java index ed2f0f709f4..0f4f904c573 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java @@ -26,7 +26,8 @@ public class MTMVRefreshEnum { * RefreshMethod */ public enum RefreshMethod { - COMPLETE //complete + COMPLETE, //complete + AUTO //try to update incrementally, if not possible, update in full } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java similarity index 50% rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java rename to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java index 950fc79ef7d..92149b3b465 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java @@ -17,38 +17,19 @@ package org.apache.doris.mtmv; -import org.apache.doris.analysis.StatementBase; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.extensions.mtmv.MTMVTask; -import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; -import org.apache.doris.nereids.NereidsPlanner; -import org.apache.doris.nereids.exceptions.ParseException; -import org.apache.doris.nereids.glue.LogicalPlanAdapter; -import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.visitor.TableCollector; -import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext; import org.apache.doris.persist.AlterMTMV; -import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; @@ -63,126 +44,34 @@ import java.util.Set; /** * when do some operation, do something about cache */ -public class MTMVCacheManager implements MTMVHookService { - private static final Logger LOG = LogManager.getLogger(MTMVCacheManager.class); +public class MTMVRelationManager implements MTMVHookService { + private static final Logger LOG = LogManager.getLogger(MTMVRelationManager.class); private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs = Maps.newConcurrentMap(); public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) { return tableMTMVs.get(table); } - // TODO Implement the method which getting materialized view by tables - public List<MTMV> getAvailableMaterializedView(List<BaseTableInfo> tables) { - return ImmutableList.of(); - } - - public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws AnalysisException, DdlException { - // check session variable if enable rewrite - if (!ctx.getSessionVariable().isEnableMvRewrite()) { - return false; - } - MTMVRelation mtmvRelation = mtmv.getRelation(); - if (mtmvRelation == null) { - return false; - } - // chaek mv is normal - if (!(mtmv.getStatus().getState() == MTMVState.NORMAL - && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { - return false; - } - // check external table - boolean containsExternalTable = containsExternalTable(mtmvRelation.getBaseTables()); - if (containsExternalTable) { - return ctx.getSessionVariable().isEnableExternalMvRewrite(); - } - // check gracePeriod - Long gracePeriod = mtmv.getGracePeriod(); - // do not care data is delayed - if (gracePeriod < 0) { - return true; - } - // compare with base table - Long mtmvLastTime = getTableLastVisibleVersionTime(mtmv); - Long maxAvailableTime = mtmvLastTime + gracePeriod; - for (BaseTableInfo baseTableInfo : mtmvRelation.getBaseTables()) { - long tableLastVisibleVersionTime = getTableLastVisibleVersionTime(baseTableInfo); - if (tableLastVisibleVersionTime > maxAvailableTime) { - return false; - } - } - return true; - } - - private long getTableLastVisibleVersionTime(BaseTableInfo baseTableInfo) throws AnalysisException, DdlException { - Table table = Env.getCurrentEnv().getInternalCatalog() - .getDbOrAnalysisException(baseTableInfo.getDbId()) - .getTableOrDdlException(baseTableInfo.getTableId(), TableType.OLAP); - return getTableLastVisibleVersionTime((OlapTable) table); - } - - private long getTableLastVisibleVersionTime(OlapTable table) { - long result = 0L; - long visibleVersionTime; - for (Partition partition : table.getAllPartitions()) { - visibleVersionTime = partition.getVisibleVersionTime(); - if (visibleVersionTime > result) { - result = visibleVersionTime; - } - } - return result; - } - - private boolean containsExternalTable(Set<BaseTableInfo> baseTableInfos) { - for (BaseTableInfo baseTableInfo : baseTableInfos) { - if (InternalCatalog.INTERNAL_CATALOG_ID != baseTableInfo.getCtlId()) { - return true; + public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos) { + Set<MTMV> res = Sets.newHashSet(); + Set<BaseTableInfo> mvInfos = getAvailableMTMVInfos(tableInfos); + for (BaseTableInfo tableInfo : mvInfos) { + try { + res.add((MTMV) MTMVUtil.getTable(tableInfo)); + } catch (AnalysisException e) { + // not throw exception to client, just ignore it + LOG.warn("getTable failed: {}", tableInfo.toString(), e); } } - return false; - } - - public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) { - Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx); - return new MTMVRelation(getBaseTables(plan), getBaseViews(plan)); - } - - private static Set<BaseTableInfo> getBaseTables(Plan plan) { - TableCollectorContext collectorContext = - new TableCollector.TableCollectorContext( - Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP)); - plan.accept(TableCollector.INSTANCE, collectorContext); - List<TableIf> collectedTables = collectorContext.getCollectedTables(); - return transferTableIfToInfo(collectedTables); - } - - private static Set<BaseTableInfo> getBaseViews(Plan plan) { - TableCollectorContext collectorContext = - new TableCollector.TableCollectorContext( - Sets.newHashSet(TableType.VIEW)); - plan.accept(TableCollector.INSTANCE, collectorContext); - List<TableIf> collectedTables = collectorContext.getCollectedTables(); - return transferTableIfToInfo(collectedTables); - } - - private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> tables) { - Set<BaseTableInfo> result = Sets.newHashSet(); - for (TableIf table : tables) { - result.add(new BaseTableInfo(table)); - } - return result; + return res; } - private static Plan getPlanBySql(String querySql, ConnectContext ctx) { - List<StatementBase> statements; - try { - statements = new NereidsParser().parseSQL(querySql); - } catch (Exception e) { - throw new ParseException("Nereids parse failed. " + e.getMessage()); + public Set<BaseTableInfo> getAvailableMTMVInfos(List<BaseTableInfo> tableInfos) { + Set<BaseTableInfo> mvInfos = Sets.newHashSet(); + for (BaseTableInfo tableInfo : tableInfos) { + mvInfos.addAll(getMtmvsByBaseTable(tableInfo)); } - StatementBase parsedStmt = statements.get(0); - LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); - NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); - return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE); + return mvInfos; } private Set<BaseTableInfo> getOrCreateMTMVs(BaseTableInfo baseTableInfo) { @@ -233,6 +122,7 @@ public class MTMVCacheManager implements MTMVHookService { /** * modify `tableMTMVs` by MTMVRelation + * * @param mtmv * @param dbId */ @@ -243,6 +133,7 @@ public class MTMVCacheManager implements MTMVHookService { /** * remove cache of mtmv + * * @param mtmv */ @Override @@ -262,6 +153,7 @@ public class MTMVCacheManager implements MTMVHookService { /** * modify `tableMTMVs` by MTMVRelation + * * @param mtmv * @param relation * @param task @@ -276,6 +168,7 @@ public class MTMVCacheManager implements MTMVHookService { /** * update mtmv status to `SCHEMA_CHANGE` + * * @param table */ @Override @@ -285,6 +178,7 @@ public class MTMVCacheManager implements MTMVHookService { /** * update mtmv status to `SCHEMA_CHANGE` + * * @param table */ @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index e83ca4e9496..9530467dee7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -18,10 +18,13 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; import org.apache.doris.persist.AlterMTMV; @@ -36,16 +39,16 @@ public class MTMVService { private static final Logger LOG = LogManager.getLogger(MTMVService.class); private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap(); - private MTMVCacheManager cacheManager = new MTMVCacheManager(); + private MTMVRelationManager relationManager = new MTMVRelationManager(); private MTMVJobManager jobManager = new MTMVJobManager(); public MTMVService() { registerHook("MTMVJobManager", jobManager); - registerHook("MTMVCacheManager", cacheManager); + registerHook("MTMVRelationManager", relationManager); } - public MTMVCacheManager getCacheManager() { - return cacheManager; + public MTMVRelationManager getRelationManager() { + return relationManager; } public void registerHook(String name, MTMVHookService mtmvHookService) { @@ -76,8 +79,12 @@ public class MTMVService { } } - public void createMTMV(MTMV mtmv) throws DdlException { + public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException { Objects.requireNonNull(mtmv); + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + MTMVUtil.alignMvPartition(mtmv, relatedTable); + } LOG.info("createMTMV: " + mtmv.getName()); for (MTMVHookService mtmvHookService : hooks.values()) { mtmvHookService.createMTMV(mtmv); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java new file mode 100644 index 00000000000..81da2946ff1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java @@ -0,0 +1,489 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.AddPartitionClause; +import org.apache.doris.analysis.DropPartitionClause; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.PartitionKeyDesc; +import org.apache.doris.analysis.SinglePartitionDesc; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; + +public class MTMVUtil { + private static final Logger LOG = LogManager.getLogger(MTMVUtil.class); + + /** + * get Table by BaseTableInfo + * + * @param baseTableInfo + * @return + * @throws AnalysisException + */ + public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException { + TableIf table = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(baseTableInfo.getCtlId()) + .getDbOrAnalysisException(baseTableInfo.getDbId()) + .getTableOrAnalysisException(baseTableInfo.getTableId()); + return table; + } + + /** + * Determine whether the mtmv is sync with tables + * + * @param mtmv + * @param tables + * @param excludedTriggerTables + * @param gracePeriod + * @return + */ + public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, + Set<String> excludedTriggerTables, Long gracePeriod) { + return isSync(getTableMinVisibleVersionTime(mtmv), tables, excludedTriggerTables, gracePeriod); + } + + /** + * Determine whether the partition is sync with retated partition and other baseTables + * + * @param mtmv + * @param partitionId + * @param tables + * @param excludedTriggerTables + * @param gracePeriod + * @return + * @throws AnalysisException + */ + public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables, + Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException { + boolean isSyncWithPartition = true; + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + OlapTable relatedTable = (OlapTable) getTable(mtmv.getMvPartitionInfo().getRelatedTable()); + // if follow base table, not need compare with related table, only should compare with related partition + excludedTriggerTables.add(relatedTable.getName()); + PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); + long relatedPartitionId = getExistPartitionId(item, + relatedTable.getPartitionInfo().getIdToItem(false)); + if (relatedPartitionId == -1L) { + LOG.warn("can not found related partition: " + partitionId); + return false; + } + isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId); + } + return isSyncWithPartition && isSync( + mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(), tables, + excludedTriggerTables, gracePeriod); + + } + + /** + * Align the partitions of mtmv and related tables, delete more and add less + * + * @param mtmv + * @param relatedTable + * @throws DdlException + * @throws AnalysisException + */ + public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable) + throws DdlException, AnalysisException { + Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false); + Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false); + // drop partition of mtmv + for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { + long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); + if (partitionId == -1L) { + dropPartition(mtmv, entry.getKey()); + } + } + // add partition for mtmv + for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) { + long partitionId = getExistPartitionId(entry.getValue(), mtmvItems); + if (partitionId == -1L) { + addPartition(mtmv, relatedTable, entry.getKey()); + } + } + } + + /** + * get mv.partitions which not sync with relatedTable + * <p> + * Comparing the time of mtmv and relatedTable partitioning, + * if the visibleVersionTime of the base table is later, + * then the partitioning of this mtmv is considered stale + * + * @param mtmv + * @param relatedTable + * @return partitionIds + * @throws DdlException when partition can not found + */ + public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable) + throws AnalysisException { + Set<Long> ids = Sets.newHashSet(); + Map<Long, Set<Long>> mvToBasePartitions = getMvToBasePartitions(mtmv, relatedTable); + for (Entry<Long, Set<Long>> entry : mvToBasePartitions.entrySet()) { + for (Long relatedPartitionId : entry.getValue()) { + boolean syncWithRelatedPartition = isSyncWithPartition(mtmv, entry.getKey(), relatedTable, + relatedPartitionId); + if (!syncWithRelatedPartition) { + ids.add(entry.getKey()); + break; + } + } + } + return ids; + } + + public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException { + List<String> res = Lists.newArrayList(); + for (Long partitionId : ids) { + res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); + } + return res; + } + + public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException { + Set<Long> res = Sets.newHashSet(); + for (String partitionName : partitions) { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); + res.add(partition.getId()); + } + return res; + } + + /** + * check if table is sync with all baseTables + * + * @param mtmv + * @return + */ + public static boolean isMTMVSync(MTMV mtmv) { + MTMVRelation mtmvRelation = mtmv.getRelation(); + if (mtmvRelation == null) { + return false; + } + return isMTMVSync(mtmv, mtmv.getRelation().getBaseTables(), Sets.newHashSet(), 0L); + } + + /** + * get not sync tables + * + * @param mtmv + * @param partitionId + * @return + * @throws AnalysisException + */ + public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException { + List<String> res = Lists.newArrayList(); + long maxAvailableTime = mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(); + for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { + TableIf table = getTable(baseTableInfo); + if (!(table instanceof OlapTable)) { + continue; + } + OlapTable olapTable = (OlapTable) table; + if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv + .getMvPartitionInfo().getRelatedTable().equals(baseTableInfo)) { + PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId); + long relatedPartitionId = getExistPartitionId(item, + olapTable.getPartitionInfo().getIdToItem(false)); + if (relatedPartitionId == -1L) { + throw new AnalysisException("can not found related partition"); + } + boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, olapTable, relatedPartitionId); + if (!isSyncWithPartition) { + res.add(olapTable.getName()); + } + } else { + long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table); + if (tableLastVisibleVersionTime > maxAvailableTime) { + res.add(table.getName()); + } + } + } + return res; + } + + /** + * Determine which partition of mtmv can be rewritten + * + * @param mtmv + * @param ctx + * @return + */ + public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) { + List<Partition> res = Lists.newArrayList(); + Collection<Partition> allPartitions = mtmv.getPartitions(); + // check session variable if enable rewrite + if (!ctx.getSessionVariable().isEnableMvRewrite()) { + return res; + } + MTMVRelation mtmvRelation = mtmv.getRelation(); + if (mtmvRelation == null) { + return res; + } + // check mv is normal + if (!(mtmv.getStatus().getState() == MTMVState.NORMAL + && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { + return res; + } + // check gracePeriod + Long gracePeriod = mtmv.getGracePeriod(); + // do not care data is delayed + if (gracePeriod < 0) { + return allPartitions; + } + + for (Partition partition : allPartitions) { + try { + if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet(), + gracePeriod)) { + res.add(partition); + } + } catch (AnalysisException e) { + // ignore it + LOG.warn("check isMTMVPartitionSync failed", e); + } + } + return res; + } + + public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) { + Collection<Partition> allPartitions = mtmv.getPartitions(); + Set<Long> res = Sets.newHashSet(); + for (Partition partition : allPartitions) { + try { + if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(), + mtmv.getExcludedTriggerTables(), + 0L)) { + res.add(partition.getId()); + } + } catch (AnalysisException e) { + res.add(partition.getId()); + LOG.warn("check isMTMVPartitionSync failed", e); + } + } + return res; + } + + /** + * compare last update time of mtmvPartition and tablePartition + * + * @param mtmv + * @param mtmvPartitionId + * @param relatedTable + * @param relatedTablePartitionId + * @return + * @throws AnalysisException + */ + private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, OlapTable relatedTable, + Long relatedTablePartitionId) throws AnalysisException { + return mtmv.getPartitionOrAnalysisException(mtmvPartitionId).getVisibleVersionTimeIgnoreInit() >= relatedTable + .getPartitionOrAnalysisException(relatedTablePartitionId).getVisibleVersionTimeIgnoreInit(); + } + + /** + * like p_00000101_20170201 + * + * @param desc + * @return + */ + private static String generatePartitionName(PartitionKeyDesc desc) { + String partitionName = "p_"; + partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "") + .replaceAll("\\(|\\)|\\,|\\[|\\]", "_"); + if (partitionName.length() > 50) { + partitionName = partitionName.substring(0, 30) + Math.abs(Objects.hash(partitionName)) + + "_" + System.currentTimeMillis(); + } + return partitionName; + } + + /** + * drop partition of mtmv + * + * @param mtmv + * @param partitionId + */ + private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); + DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); + Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); + } + + /** + * add partition for mtmv like relatedPartitionId of relatedTable + * + * @param mtmv + * @param relatedTable + * @param relatedPartitionId + * @throws AnalysisException + * @throws DdlException + */ + private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long relatedPartitionId) + throws AnalysisException, DdlException { + PartitionDesc partitionDesc = relatedTable.getPartitionInfo().toPartitionDesc(relatedTable); + Partition partition = relatedTable.getPartitionOrAnalysisException(relatedPartitionId); + SinglePartitionDesc oldPartitionDesc = partitionDesc.getSinglePartitionDescByName(partition.getName()); + + Map<String, String> partitionProperties = Maps.newHashMap(); + SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true, + generatePartitionName(oldPartitionDesc.getPartitionKeyDesc()), + oldPartitionDesc.getPartitionKeyDesc(), partitionProperties); + + AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc, + mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false); + Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause); + } + + /** + * compare PartitionItem and return equals partitionId + * if not found, return -1L + * + * @param target + * @param sources + * @return + */ + private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) { + for (Entry<Long, PartitionItem> entry : sources.entrySet()) { + if (target.equals(entry.getValue())) { + return entry.getKey(); + } + } + return -1L; + } + + /** + * Get the maximum update time among all partitions + * + * @param table + * @return + */ + private static long getTableMaxVisibleVersionTime(OlapTable table) { + long result = 0L; + long visibleVersionTime; + for (Partition partition : table.getAllPartitions()) { + visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit(); + if (visibleVersionTime > result) { + result = visibleVersionTime; + } + } + return result; + } + + /** + * Get the minimum update time among all partitions + * + * @param table + * @return + */ + private static long getTableMinVisibleVersionTime(OlapTable table) { + long result = Long.MAX_VALUE; + long visibleVersionTime; + for (Partition partition : table.getAllPartitions()) { + visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit(); + if (visibleVersionTime < result) { + result = visibleVersionTime; + } + } + return result; + } + + /** + * Obtain the partition correspondence between materialized views and base tables + * Currently, there is a one-to-one correspondence between the partitions of materialized views and base tables, + * but for scalability reasons, Set is used + * <p> + * before use this method,should call `alignMvPartition` + * + * @param mtmv + * @param relatedTable + * @return mv.partitionId ==> relatedTable.partitionId + */ + private static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv, OlapTable relatedTable) + throws AnalysisException { + HashMap<Long, Set<Long>> res = Maps.newHashMap(); + Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false); + Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false); + for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { + long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems); + if (partitionId == -1L) { + throw new AnalysisException("partition not found: " + entry.getValue().toString()); + } + res.put(entry.getKey(), Sets.newHashSet(partitionId)); + } + return res; + } + + /** + * Determine is sync, ignoring excludedTriggerTables and non OlapTanle + * + * @param visibleVersionTime + * @param tables + * @param excludedTriggerTables + * @param gracePeriod + * @return + */ + private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo> tables, + Set<String> excludedTriggerTables, Long gracePeriod) { + long maxAvailableTime = visibleVersionTime + gracePeriod; + for (BaseTableInfo baseTableInfo : tables) { + TableIf table = null; + try { + table = getTable(baseTableInfo); + } catch (AnalysisException e) { + e.printStackTrace(); + return false; + } + if (excludedTriggerTables.contains(table.getName())) { + continue; + } + if (!(table instanceof OlapTable)) { + continue; + } + long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table); + if (tableLastVisibleVersionTime > maxAvailableTime) { + return false; + } + } + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 1c4d847e3d1..d0e731b6237 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -30,6 +30,8 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.job.common.IntervalUnit; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod; import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; @@ -89,6 +91,7 @@ import org.apache.doris.nereids.DorisParser.GroupingSetContext; import org.apache.doris.nereids.DorisParser.HavingClauseContext; import org.apache.doris.nereids.DorisParser.HintAssignmentContext; import org.apache.doris.nereids.DorisParser.HintStatementContext; +import org.apache.doris.nereids.DorisParser.IdentifierContext; import org.apache.doris.nereids.DorisParser.IdentifierListContext; import org.apache.doris.nereids.DorisParser.IdentifierOrTextContext; import org.apache.doris.nereids.DorisParser.IdentifierSeqContext; @@ -112,7 +115,6 @@ import org.apache.doris.nereids.DorisParser.LogicalNotContext; import org.apache.doris.nereids.DorisParser.MapLiteralContext; import org.apache.doris.nereids.DorisParser.MultiStatementsContext; import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext; -import org.apache.doris.nereids.DorisParser.MvRefreshUnitContext; import org.apache.doris.nereids.DorisParser.NamedExpressionContext; import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext; import org.apache.doris.nereids.DorisParser.NullLiteralContext; @@ -550,10 +552,25 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { comment, desc, properties, logicalPlan, querySql, new MTMVRefreshInfo(buildMode, refreshMethod, refreshTriggerInfo), - ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols) + ctx.cols == null ? Lists.newArrayList() : visitSimpleColumnDefs(ctx.cols), + visitMTMVPartitionInfo(ctx.partitionKey) )); } + /** + * get MTMVPartitionInfo + * + * @param ctx IdentifierContext + * @return MTMVPartitionInfo + */ + public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) { + if (ctx == null) { + return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE); + } else { + return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE, ctx.getText()); + } + } + @Override public List<SimpleColumnDefinition> visitSimpleColumnDefs(SimpleColumnDefsContext ctx) { if (ctx == null) { @@ -601,19 +618,32 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { int interval = Integer.parseInt(ctx.INTEGER_VALUE().getText()); String startTime = ctx.STARTS() == null ? null : ctx.STRING_LITERAL().getText().substring(1, ctx.STRING_LITERAL().getText().length() - 1); - IntervalUnit unit = visitMvRefreshUnit(ctx.mvRefreshUnit()); + IntervalUnit unit = visitMvRefreshUnit(ctx.refreshUnit); return new MTMVRefreshSchedule(startTime, interval, unit); } - @Override - public IntervalUnit visitMvRefreshUnit(MvRefreshUnitContext ctx) { - return IntervalUnit.valueOf(ctx.getText().toUpperCase()); + /** + * get IntervalUnit,only enable_job_schedule_second_for_test is true, can use second + * + * @param ctx ctx + * @return IntervalUnit + */ + public IntervalUnit visitMvRefreshUnit(IdentifierContext ctx) { + IntervalUnit intervalUnit = IntervalUnit.fromString(ctx.getText().toUpperCase()); + if (null == intervalUnit) { + throw new AnalysisException("interval time unit can not be " + ctx.getText()); + } + if (intervalUnit.equals(IntervalUnit.SECOND) + && !Config.enable_job_schedule_second_for_test) { + throw new AnalysisException("interval time unit can not be second"); + } + return intervalUnit; } @Override public RefreshMethod visitRefreshMethod(RefreshMethodContext ctx) { if (ctx == null) { - return RefreshMethod.COMPLETE; + return RefreshMethod.AUTO; } return RefreshMethod.valueOf(ctx.getText().toUpperCase()); } @@ -631,7 +661,19 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { @Override public RefreshMTMVCommand visitRefreshMTMV(RefreshMTMVContext ctx) { List<String> nameParts = visitMultipartIdentifier(ctx.mvName); - return new RefreshMTMVCommand(new RefreshMTMVInfo(new TableNameInfo(nameParts))); + List<String> partitions = ImmutableList.of(); + if (ctx.partitionSpec() != null) { + if (ctx.partitionSpec().TEMPORARY() != null) { + throw new AnalysisException("Not allowed to specify TEMPORARY "); + } + if (ctx.partitionSpec().partition != null) { + partitions = ImmutableList.of(ctx.partitionSpec().partition.getText()); + } else { + partitions = visitIdentifierList(ctx.partitionSpec().partitions); + } + } + return new RefreshMTMVCommand(new RefreshMTMVInfo(new TableNameInfo(nameParts), + partitions, ctx.COMPLETE() != null)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index 3139d98f906..483e8e517a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -82,7 +82,7 @@ public abstract class AbstractMaterializedViewRule { queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) { continue; } - Plan mvPlan = materializationContext.getMtmv().getMvCache().getLogicalPlan(); + Plan mvPlan = materializationContext.getMtmv().getCache().getLogicalPlan(); List<StructInfo> viewStructInfos = extractStructInfo(mvPlan, cascadesContext); if (viewStructInfos.size() > 1) { // view struct info should only have one diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java index 8061b1835e6..b6c7234d5bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.mtmv.BaseTableInfo; -import org.apache.doris.mtmv.MTMVCacheManager; +import org.apache.doris.mtmv.MTMVRelationManager; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.PlannerHook; @@ -70,10 +70,10 @@ public class InitMaterializationContextHook implements PlannerHook { } List<BaseTableInfo> baseTableUsed = collectedTables.stream().map(BaseTableInfo::new).collect(Collectors.toList()); - // TODO the logic should be move to MTMVCacheManager later when getAvailableMaterializedView is ready in + // TODO the logic should be move to MTMVRelationManager later when getAvailableMaterializedView is ready in // MV Cache manager Env env = cascadesContext.getConnectContext().getEnv(); - MTMVCacheManager cacheManager = env.getMtmvService().getCacheManager(); + MTMVRelationManager cacheManager = env.getMtmvService().getRelationManager(); Set<BaseTableInfo> materializedViews = new HashSet<>(); for (BaseTableInfo baseTableInfo : baseTableUsed) { Set<BaseTableInfo> mtmvsByBaseTable = cacheManager.getMtmvsByBaseTable(baseTableInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java index 3e1fe99c9c8..4f0b63d2ee7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java @@ -19,7 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Table; -import org.apache.doris.mtmv.MVCache; +import org.apache.doris.mtmv.MTMVCache; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.GroupId; import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping; @@ -59,17 +59,17 @@ public class MaterializationContext { this.mvScanPlan = mvScanPlan; this.baseTables = baseTables; this.baseViews = baseViews; - MVCache mvCache = mtmv.getMvCache(); + MTMVCache mtmvCache = mtmv.getCache(); // TODO This logic should move to materialized view cache manager - if (mvCache == null) { - mvCache = MVCache.from(mtmv, cascadesContext.getConnectContext()); - mtmv.setMvCache(mvCache); + if (mtmvCache == null) { + mtmvCache = mtmvCache.from(mtmv, cascadesContext.getConnectContext()); + mtmv.setCache(mtmvCache); } // mv output expression shuttle, this will be used to expression rewrite this.mvExprToMvScanExprMapping = ExpressionMapping.generate( ExpressionUtils.shuttleExpressionWithLineage( - mvCache.getMvOutputExpressions(), - mvCache.getLogicalPlan()), + mtmvCache.getMvOutputExpressions(), + mtmvCache.getLogicalPlan()), mvScanPlan.getExpressions()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java index 9aadf0f1cf0..ab9d6e35c9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java @@ -31,7 +31,7 @@ import java.util.Objects; /** * alter multi table materialized view */ -public class AlterMTMVCommand extends Command implements ForwardWithSync { +public class AlterMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { public static final Logger LOG = LogManager.getLogger(AlterMTMVCommand.class); private final AlterMTMVInfo alterMTMVInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java index c246991e618..814804b8ce9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java @@ -32,7 +32,7 @@ import java.util.Objects; /** * create multi table materialized view */ -public class CreateMTMVCommand extends Command implements ForwardWithSync { +public class CreateMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { public static final Logger LOG = LogManager.getLogger(CreateMTMVCommand.class); private final CreateMTMVInfo createMTMVInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java index a0d614b163c..19eedb82746 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java @@ -29,7 +29,7 @@ import java.util.Objects; /** * refresh mtmv */ -public class DropMTMVCommand extends Command implements ForwardWithSync { +public class DropMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { private final DropMTMVInfo dropMTMVInfo; public DropMTMVCommand(DropMTMVInfo dropMTMVInfo) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java similarity index 54% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java index ed2f0f709f4..72d8c82e599 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java @@ -15,51 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.nereids.trees.plans.commands; /** - * refresh enum + * The class that implements this interface does not allow fallback to OriginalPlanner, + * for example, some new features are not implemented by the old parser */ -public class MTMVRefreshEnum { - - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE //complete - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS - } +public interface NotAllowFallback { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java index 982e8d86257..a918112555e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java @@ -29,7 +29,7 @@ import java.util.Objects; /** * refresh mtmv */ -public class RefreshMTMVCommand extends Command implements ForwardWithSync { +public class RefreshMTMVCommand extends Command implements ForwardWithSync, NotAllowFallback { private final RefreshMTMVInfo refreshMTMVInfo; public RefreshMTMVCommand(RefreshMTMVInfo refreshMTMVInfo) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index b221e507b82..ae4e048f1c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -17,12 +17,15 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.parser.NereidsParser; @@ -33,11 +36,13 @@ import org.apache.doris.nereids.trees.expressions.LessThan; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; -import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -62,38 +67,41 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { * Construct command * * @param mv materialize view - * @param partitions update partitions in mv and tables + * @param partitionIds update partitions in mv and tables * @param tableWithPartKey the partitions key for different table * @return command */ - public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem> partitions, + public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) { NereidsParser parser = new NereidsParser(); Map<OlapTable, Set<Expression>> predicates = - constructTableWithPredicates(partitions, tableWithPartKey); - List<String> parts = constructPartsForMv(mv, partitions); + constructTableWithPredicates(mv, partitionIds, tableWithPartKey); + List<String> parts = constructPartsForMv(mv, partitionIds); Plan plan = parser.parseSingle(mv.getQuerySql()); plan = plan.accept(new PredicateAdder(), predicates); + if (plan instanceof Sink) { + plan = plan.child(0); + } UnboundTableSink<? extends Plan> sink = new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(), parts, plan); return new UpdateMvByPartitionCommand(sink); } - private static List<String> constructPartsForMv(MTMV mv, Set<PartitionItem> partitions) { - return mv.getPartitionNames().stream() - .filter(name -> { - PartitionItem mvPartItem = mv.getPartitionInfo().getItem(mv.getPartition(name).getId()); - return partitions.stream().anyMatch(p -> p.getIntersect(mvPartItem) != null); - }) + private static List<String> constructPartsForMv(MTMV mv, Set<Long> partitionIds) { + return partitionIds.stream() + .map(id -> mv.getPartition(id).getName()) .collect(ImmutableList.toImmutableList()); } - private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(Set<PartitionItem> partitions, - Map<OlapTable, String> tableWithPartKey) { + private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(MTMV mv, + Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) { + Set<PartitionItem> items = partitionIds.stream() + .map(id -> mv.getPartitionInfo().getItem(id)) + .collect(ImmutableSet.toImmutableSet()); ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new ImmutableMap.Builder<>(); tableWithPartKey.forEach((table, colName) -> - builder.put(table, constructPredicates(partitions, colName)) + builder.put(table, constructPredicates(items, colName)) ); return builder.build(); } @@ -131,8 +139,15 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, Set<Expression>>> { @Override - public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable, Set<Expression>> predicates) { - return new LogicalFilter<>(predicates.get(scan.getTable()), scan); + public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<OlapTable, Set<Expression>> predicates) { + List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(), + unboundRelation.getNameParts()); + TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv()); + if (table instanceof OlapTable && predicates.containsKey(table)) { + return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))), + unboundRelation); + } + return unboundRelation; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java index 4ea2a3b6982..66bc1b22107 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java @@ -19,28 +19,41 @@ package org.apache.doris.nereids.trees.plans.commands.info; import org.apache.doris.analysis.CreateMTMVStmt; import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.ListPartitionDesc; +import org.apache.doris.analysis.PartitionDesc; +import org.apache.doris.analysis.RangePartitionDesc; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.mtmv.EnvInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo; +import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType; +import org.apache.doris.mtmv.MTMVPlanUtil; import org.apache.doris.mtmv.MTMVRefreshInfo; +import org.apache.doris.mtmv.MTMVRelation; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo; import org.apache.doris.nereids.trees.TreeNode; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector; import org.apache.doris.nereids.trees.plans.visitor.TableCollector; import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext; @@ -56,6 +69,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.stream.Collectors; @@ -78,6 +92,9 @@ public class CreateMTMVInfo { private final List<ColumnDefinition> columns = Lists.newArrayList(); private final List<SimpleColumnDefinition> simpleColumnDefinitions; private final EnvInfo envInfo; + private final MTMVPartitionInfo mvPartitionInfo; + private PartitionDesc partitionDesc; + private MTMVRelation relation; /** * constructor for create MTMV @@ -87,7 +104,8 @@ public class CreateMTMVInfo { DistributionDescriptor distribution, Map<String, String> properties, LogicalPlan logicalQuery, String querySql, MTMVRefreshInfo refreshInfo, - List<SimpleColumnDefinition> simpleColumnDefinitions) { + List<SimpleColumnDefinition> simpleColumnDefinitions, + MTMVPartitionInfo mvPartitionInfo) { this.ifNotExists = Objects.requireNonNull(ifNotExists, "require ifNotExists object"); this.mvName = Objects.requireNonNull(mvName, "require mvName object"); this.keys = Utils.copyRequiredList(keys); @@ -101,6 +119,8 @@ public class CreateMTMVInfo { .requireNonNull(simpleColumnDefinitions, "require simpleColumnDefinitions object"); this.envInfo = new EnvInfo(ConnectContext.get().getCurrentCatalog().getId(), ConnectContext.get().getCurrentDbId()); + this.mvPartitionInfo = Objects + .requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo object"); } /** @@ -154,6 +174,11 @@ public class CreateMTMVInfo { mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod); properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD); } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { + String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES); + mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables); + properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES); + } } /** @@ -163,22 +188,93 @@ public class CreateMTMVInfo { // create table as select NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN); + if (plan.anyMatch(node -> node instanceof OneRowRelation)) { + throw new AnalysisException("at least contain one table"); + } + // can not contain VIEW or MTMV analyzeBaseTables(plan); - analyzeExpressions((PhysicalPlan) plan); + // can not contain Random function + analyzeExpressions(planner.getAnalyzedPlan()); + // can not contain partition or tablets + boolean containTableQueryOperator = MaterializedViewUtils.containTableQueryOperator(planner.getAnalyzedPlan()); + if (containTableQueryOperator) { + throw new AnalysisException("can not contain invalid expression"); + } + getRelation(planner); getColumns(plan); + analyzePartition(planner); + } + + private void getRelation(NereidsPlanner planner) { + Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE); + this.relation = MTMVPlanUtil.generateMTMVRelation(plan); + } + + private void analyzePartition(NereidsPlanner planner) { + if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) { + Plan mvRewrittenPlan = + planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN); + Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils + .getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), mvRewrittenPlan); + if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) { + throw new AnalysisException("Unable to find a suitable base table for partitioning"); + } + TableIf followTable = null; + try { + followTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo()); + } catch (org.apache.doris.common.AnalysisException e) { + throw new AnalysisException(e.getMessage(), e); + } + if (!(followTable instanceof OlapTable)) { + throw new AnalysisException("base table for partitioning only can be OlapTable."); + } + Set<String> partitionColumnNames; + try { + partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames(); + } catch (DdlException e) { + throw new AnalysisException(e.getMessage(), e); + } + + if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) { + throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn()); + } + if (partitionColumnNames.size() != 1) { + throw new AnalysisException("base table for partitioning only support single column."); + } + mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo()); + mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn()); + partitionDesc = generatePartitionDesc((OlapTable) followTable); + } + } + + private PartitionDesc generatePartitionDesc(OlapTable relatedTable) { + PartitionType type = relatedTable.getPartitionInfo().getType(); + try { + if (type == PartitionType.RANGE) { + return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), + Lists.newArrayList()); + } else if (type == PartitionType.LIST) { + return new ListPartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()), + Lists.newArrayList()); + } else { + return null; + } + } catch (org.apache.doris.common.AnalysisException e) { + throw new AnalysisException("can not generate partitionDesc", e); + } } private void analyzeBaseTables(Plan plan) { TableCollectorContext collectorContext = - new TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW)); + new TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.VIEW)); plan.accept(TableCollector.INSTANCE, collectorContext); List<TableIf> collectedTables = collectorContext.getCollectedTables(); if (!CollectionUtils.isEmpty(collectedTables)) { - throw new AnalysisException("can not contain MATERIALIZED_VIEW"); + throw new AnalysisException("can not contain MATERIALIZED_VIEW or VIEW"); } } - private void analyzeExpressions(PhysicalPlan plan) { + private void analyzeExpressions(Plan plan) { List<TreeNode<Expression>> functionCollectResult = new ArrayList<>(); plan.accept(NondeterministicFunctionCollector.INSTANCE, functionCollectResult); if (!CollectionUtils.isEmpty(functionCollectResult)) { @@ -225,7 +321,8 @@ public class CreateMTMVInfo { .map(ColumnDefinition::translateToCatalogStyle) .collect(Collectors.toList()); return new CreateMTMVStmt(ifNotExists, tableName, catalogColumns, refreshInfo, keysDesc, - distribution.translateToCatalogStyle(), properties, mvProperties, querySql, comment, envInfo); + distribution.translateToCatalogStyle(), properties, mvProperties, querySql, comment, envInfo, + partitionDesc, mvPartitionInfo, relation); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java index 422b9697c7c..e6a4368a850 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java @@ -17,12 +17,22 @@ package org.apache.doris.nereids.trees.plans.commands.info; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.commons.collections.CollectionUtils; + +import java.util.List; import java.util.Objects; /** @@ -30,13 +40,18 @@ import java.util.Objects; */ public class RefreshMTMVInfo { private final TableNameInfo mvName; + private List<String> partitions; + private boolean isComplete; - public RefreshMTMVInfo(TableNameInfo mvName) { + public RefreshMTMVInfo(TableNameInfo mvName, List<String> partitions, boolean isComplete) { this.mvName = Objects.requireNonNull(mvName, "require mvName object"); + this.partitions = Utils.copyRequiredList(partitions); + this.isComplete = Objects.requireNonNull(isComplete, "require isComplete object"); } /** * analyze refresh info + * * @param ctx ConnectContext */ public void analyze(ConnectContext ctx) { @@ -48,13 +63,41 @@ public class RefreshMTMVInfo { mvName.getDb() + ": " + mvName.getTbl()); throw new AnalysisException(message); } + try { + Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb()); + MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW); + if (!CollectionUtils.isEmpty(partitions)) { + MTMVUtil.getPartitionsIdsByNames(mtmv, partitions); + } + } catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) { + throw new AnalysisException(e.getMessage()); + } } /** * getMvName + * * @return TableNameInfo */ public TableNameInfo getMvName() { return mvName; } + + /** + * getPartitions + * + * @return partitionNames + */ + public List<String> getPartitions() { + return partitions; + } + + /** + * isComplete + * + * @return isComplete + */ + public boolean isComplete() { + return isComplete; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 92d1d47e5e5..ff837283056 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -127,6 +127,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.Forward; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.OlapScanNode; @@ -439,6 +440,14 @@ public class StmtExecutor { execute(queryId); } + public boolean notAllowFallback() { + if (parsedStmt instanceof LogicalPlanAdapter) { + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + return logicalPlan instanceof NotAllowFallback; + } + return false; + } + public void execute(TUniqueId queryId) throws Exception { SessionVariable sessionVariable = context.getSessionVariable(); if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) { @@ -457,6 +466,10 @@ public class StmtExecutor { // try to fall back to legacy planner LOG.debug("nereids cannot process statement\n" + originStmt.originStmt + "\n because of " + e.getMessage(), e); + if (notAllowFallback()) { + LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); + throw ((NereidsException) e).getException(); + } boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand; if (e instanceof NereidsException diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index f2a7bccdb00..e8620b105b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.common.JobType; import org.apache.doris.job.task.AbstractTask; +import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.external.iceberg.IcebergMetadataCache; import org.apache.doris.qe.ConnectContext; @@ -537,6 +538,8 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal(mv.getQuerySql())); trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString())); trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString())); + trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString())); + trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv))); dataBatch.add(trow); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java index af0efcc1ce5..44e3ed58400 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java @@ -54,7 +54,9 @@ public class MvInfosTableValuedFunction extends MetadataTableValuedFunction { new Column("RefreshInfo", ScalarType.createStringType()), new Column("QuerySql", ScalarType.createStringType()), new Column("EnvInfo", ScalarType.createStringType()), - new Column("MvProperties", ScalarType.createStringType())); + new Column("MvProperties", ScalarType.createStringType()), + new Column("MvPartitionInfo", ScalarType.createStringType()), + new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN))); private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org