This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f3c5442605 [feature](mtmv)mtmv support partition (#28144)
0f3c5442605 is described below

commit 0f3c5442605305aae0e50bc4ffe7c66fdd33be5a
Author: zhangdong <493738...@qq.com>
AuthorDate: Sun Dec 17 18:28:03 2023 +0800

    [feature](mtmv)mtmv support partition (#28144)
    
    - create MTMV support partition and `AUTO` refresh method
    - refresh MTMV supports specified partitions
    - MTMV support incremental updates
    - add property `EXCLUDED_TRIGGER_TABLES` for mv
    - Maintain MTMVCache after a successful task refresh, for plan
      rewrite (MTMV.getOrGenerateCache)
    - show partitions adds "SyncWithBaseTables" and "UnsyncTables" columns
    - drop job before drop MTMV
    - task tvf add 
"MvId,MvDatabaseId,ErrorMsg,TaskContext,RefreshMode,RefreshPartitions"
    - add `NotAllowFallback` so that mtmv does not fall back to the old planner
    - add `MTMVUtil.getMTMVCanRewritePartitions()` and
      `Env.getCurrentEnv().getMtmvService().getRelationManager().getAvailableMTMVs()`
      for plan rewrite
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  11 +-
 .../org/apache/doris/analysis/CreateMTMVStmt.java  |  18 +-
 .../main/java/org/apache/doris/catalog/MTMV.java   |  56 ++-
 .../java/org/apache/doris/catalog/OlapTable.java   |  22 +
 .../org/apache/doris/catalog/OlapTableFactory.java |  22 +
 .../java/org/apache/doris/catalog/Partition.java   |  12 +
 .../org/apache/doris/catalog/PartitionInfo.java    |  11 +
 .../doris/common/proc/PartitionsProcDir.java       |  20 +-
 .../apache/doris/common/util/PropertyAnalyzer.java |   1 +
 .../apache/doris/datasource/InternalCatalog.java   |   6 +-
 .../apache/doris/job/extensions/mtmv/MTMVJob.java  |  35 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java | 175 ++++++--
 .../doris/job/extensions/mtmv/MTMVTaskContext.java |  58 +++
 .../org/apache/doris/job/task/AbstractTask.java    |   1 +
 .../java/org/apache/doris/mtmv/BaseTableInfo.java  |  14 +
 .../doris/mtmv/{MVCache.java => MTMVCache.java}    |  10 +-
 .../java/org/apache/doris/mtmv/MTMVJobManager.java |   9 +-
 .../org/apache/doris/mtmv/MTMVPartitionInfo.java   | 110 +++++
 .../java/org/apache/doris/mtmv/MTMVPlanUtil.java   | 108 +++++
 .../org/apache/doris/mtmv/MTMVRefreshEnum.java     |   3 +-
 ...VCacheManager.java => MTMVRelationManager.java} | 150 +------
 .../java/org/apache/doris/mtmv/MTMVService.java    |  17 +-
 .../main/java/org/apache/doris/mtmv/MTMVUtil.java  | 489 +++++++++++++++++++++
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  58 ++-
 .../mv/AbstractMaterializedViewRule.java           |   2 +-
 .../mv/InitMaterializationContextHook.java         |   6 +-
 .../exploration/mv/MaterializationContext.java     |  14 +-
 .../trees/plans/commands/AlterMTMVCommand.java     |   2 +-
 .../trees/plans/commands/CreateMTMVCommand.java    |   2 +-
 .../trees/plans/commands/DropMTMVCommand.java      |   2 +-
 .../trees/plans/commands/NotAllowFallback.java}    |  48 +-
 .../trees/plans/commands/RefreshMTMVCommand.java   |   2 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |  47 +-
 .../trees/plans/commands/info/CreateMTMVInfo.java  | 111 ++++-
 .../trees/plans/commands/info/RefreshMTMVInfo.java |  45 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  13 +
 .../doris/tablefunction/MetadataGenerator.java     |   3 +
 .../tablefunction/MvInfosTableValuedFunction.java  |   4 +-
 38 files changed, 1410 insertions(+), 307 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 674e4550976..be683f5ee4b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -90,10 +90,11 @@ statement
         (REFRESH refreshMethod? refreshTrigger?)?
         (KEY keys=identifierList)?
         (COMMENT STRING_LITERAL)?
+        (PARTITION BY LEFT_PAREN partitionKey = identifier RIGHT_PAREN)?
         (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS 
(INTEGER_VALUE | AUTO))?)?
         propertyClause?
         AS query                                                        
#createMTMV
-    | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier              
#refreshMTMV
+    | REFRESH MATERIALIZED VIEW mvName=multipartIdentifier (partitionSpec | 
COMPLETE)?      #refreshMTMV
     | ALTER MATERIALIZED VIEW mvName=multipartIdentifier ((RENAME 
newName=identifier)
        | (REFRESH (refreshMethod | refreshTrigger | refreshMethod 
refreshTrigger))
        | (SET  LEFT_PAREN fileProperties=propertyItemList RIGHT_PAREN))   
#alterMTMV
@@ -158,15 +159,11 @@ refreshTrigger
     ;
 
 refreshSchedule
-    : EVERY INTEGER_VALUE mvRefreshUnit (STARTS STRING_LITERAL)?
-    ;
-
-mvRefreshUnit
-    : SECOND | MINUTE | HOUR | DAY | WEEK
+    : EVERY INTEGER_VALUE refreshUnit = identifier (STARTS STRING_LITERAL)?
     ;
 
 refreshMethod
-    : COMPLETE
+    : COMPLETE | AUTO
     ;
 
 identifierOrText
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
index bb2efdd6281..9421bb047c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
@@ -20,7 +20,9 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,17 +33,21 @@ public class CreateMTMVStmt extends CreateTableStmt {
     private final String querySql;
     private final EnvInfo envInfo;
     private Map<String, String> mvProperties;
+    private MTMVPartitionInfo mvPartitionInfo;
+    private MTMVRelation relation;
 
     public CreateMTMVStmt(boolean ifNotExists, TableName mvName, List<Column> 
columns,
             MTMVRefreshInfo refreshInfo, KeysDesc keyDesc, DistributionDesc 
distributionDesc,
             Map<String, String> properties, Map<String, String> mvProperties, 
String querySql, String comment,
-            EnvInfo envInfo) {
-        super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), 
DEFAULT_ENGINE_NAME, keyDesc, null,
+            EnvInfo envInfo, PartitionDesc partitionDesc, MTMVPartitionInfo 
mvPartitionInfo, MTMVRelation relation) {
+        super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), 
DEFAULT_ENGINE_NAME, keyDesc, partitionDesc,
                 distributionDesc, properties, null, comment, null, null);
         this.refreshInfo = refreshInfo;
         this.querySql = querySql;
         this.envInfo = envInfo;
         this.mvProperties = mvProperties;
+        this.mvPartitionInfo = mvPartitionInfo;
+        this.relation = relation;
     }
 
     public MTMVRefreshInfo getRefreshInfo() {
@@ -59,4 +65,12 @@ public class CreateMTMVStmt extends CreateTableStmt {
     public Map<String, String> getMvProperties() {
         return mvProperties;
     }
+
+    public MTMVPartitionInfo getMvPartitionInfo() {
+        return mvPartitionInfo;
+    }
+
+    public MTMVRelation getRelation() {
+        return relation;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index af0691d94fb..a2c87581fc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -18,21 +18,25 @@
 package org.apache.doris.catalog;
 
 import org.apache.doris.catalog.OlapTableFactory.MTMVParams;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
 import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVCache;
 import org.apache.doris.mtmv.MTMVJobInfo;
 import org.apache.doris.mtmv.MTMVJobManager;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPlanUtil;
 import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
 import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
 import org.apache.doris.mtmv.MTMVRelation;
 import org.apache.doris.mtmv.MTMVStatus;
-import org.apache.doris.mtmv.MVCache;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -41,6 +45,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
@@ -62,8 +67,10 @@ public class MTMV extends OlapTable {
     private Map<String, String> mvProperties;
     @SerializedName("r")
     private MTMVRelation relation;
-    // Should update after every fresh
-    private MVCache mvCache;
+    @SerializedName("mpi")
+    private MTMVPartitionInfo mvPartitionInfo;
+    // Should be updated after every refresh; not persisted
+    private MTMVCache cache;
 
     // For deserialization
     public MTMV() {
@@ -87,6 +94,8 @@ public class MTMV extends OlapTable {
         this.status = new MTMVStatus();
         this.jobInfo = new MTMVJobInfo(MTMVJobManager.MTMV_JOB_PREFIX + 
params.tableId);
         this.mvProperties = params.mvProperties;
+        this.mvPartitionInfo = params.mvPartitionInfo;
+        this.relation = params.relation;
         mvRwLock = new ReentrantReadWriteLock(true);
     }
 
@@ -119,12 +128,12 @@ public class MTMV extends OlapTable {
         return relation;
     }
 
-    public MVCache getMvCache() {
-        return mvCache;
+    public MTMVCache getCache() {
+        return cache;
     }
 
-    public void setMvCache(MVCache mvCache) {
-        this.mvCache = mvCache;
+    public void setCache(MTMVCache cache) {
+        this.cache = cache;
     }
 
     public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
@@ -148,6 +157,12 @@ public class MTMV extends OlapTable {
                 this.status.setSchemaChangeDetail(null);
                 this.status.setRefreshState(MTMVRefreshState.SUCCESS);
                 this.relation = relation;
+                try {
+                    this.cache = MTMVCache.from(this, 
MTMVPlanUtil.createMTMVContext(this));
+                } catch (Throwable e) {
+                    this.cache = null;
+                    LOG.warn("generate cache failed", e);
+                }
             } else {
                 this.status.setRefreshState(MTMVRefreshState.FAIL);
             }
@@ -170,10 +185,36 @@ public class MTMV extends OlapTable {
         }
     }
 
+    public Set<String> getExcludedTriggerTables() {
+        if 
(!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))
 {
+            return Sets.newHashSet();
+        }
+        String[] split = 
mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
+        return Sets.newHashSet(split);
+    }
+
+    public MTMVCache getOrGenerateCache() throws AnalysisException {
+        if (cache == null) {
+            writeMvLock();
+            try {
+                if (cache == null) {
+                    this.cache = MTMVCache.from(this, 
MTMVPlanUtil.createMTMVContext(this));
+                }
+            } finally {
+                writeMvUnlock();
+            }
+        }
+        return cache;
+    }
+
     public Map<String, String> getMvProperties() {
         return mvProperties;
     }
 
+    public MTMVPartitionInfo getMvPartitionInfo() {
+        return mvPartitionInfo;
+    }
+
     public void readMvLock() {
         this.mvRwLock.readLock().lock();
     }
@@ -207,6 +248,7 @@ public class MTMV extends OlapTable {
         jobInfo = materializedView.jobInfo;
         mvProperties = materializedView.mvProperties;
         relation = materializedView.relation;
+        mvPartitionInfo = materializedView.mvPartitionInfo;
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 05a12ed387e..0ce2df5e6f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -947,6 +947,17 @@ public class OlapTable extends Table {
         return getPartition(partitionName, true);
     }
 
+    public Partition getPartitionOrAnalysisException(String partitionName) 
throws AnalysisException {
+        Partition partition = getPartition(partitionName, false);
+        if (partition == null) {
+            partition = getPartition(partitionName, true);
+        }
+        if (partition == null) {
+            throw new AnalysisException("partition not found: " + 
partitionName);
+        }
+        return partition;
+    }
+
     // get partition by name
     public Partition getPartition(String partitionName, boolean 
isTempPartition) {
         if (isTempPartition) {
@@ -965,6 +976,17 @@ public class OlapTable extends Table {
         return partition;
     }
 
+    public Partition getPartitionOrAnalysisException(long partitionId) throws 
AnalysisException {
+        Partition partition = idToPartition.get(partitionId);
+        if (partition == null) {
+            partition = tempPartitions.getPartition(partitionId);
+        }
+        if (partition == null) {
+            throw new AnalysisException("partition not found: " + partitionId);
+        }
+        return partition;
+    }
+
     // select the non-empty partition ids belonging to this table.
     //
     // ATTN: partitions not belonging to this table will be filtered.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
index af87ec47991..7d11ed7bdd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
@@ -22,7 +22,9 @@ import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DdlStmt;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
 
 import com.google.common.base.Preconditions;
 
@@ -49,6 +51,8 @@ public class OlapTableFactory {
         public EnvInfo envInfo;
         public String querySql;
         public Map<String, String> mvProperties;
+        public MTMVPartitionInfo mvPartitionInfo;
+        public MTMVRelation relation;
     }
 
     private BuildParams params;
@@ -158,6 +162,22 @@ public class OlapTableFactory {
         return this;
     }
 
+    private OlapTableFactory withMvPartitionInfo(MTMVPartitionInfo 
mvPartitionInfo) {
+        Preconditions.checkState(params instanceof MTMVParams, "Invalid 
argument for "
+                + params.getClass().getSimpleName());
+        MTMVParams mtmvParams = (MTMVParams) params;
+        mtmvParams.mvPartitionInfo = mvPartitionInfo;
+        return this;
+    }
+
+    private OlapTableFactory withMvRelation(MTMVRelation relation) {
+        Preconditions.checkState(params instanceof MTMVParams, "Invalid 
argument for "
+                + params.getClass().getSimpleName());
+        MTMVParams mtmvParams = (MTMVParams) params;
+        mtmvParams.relation = relation;
+        return this;
+    }
+
     public OlapTableFactory withExtraParams(DdlStmt stmt) {
         boolean isMaterializedView = stmt instanceof CreateMTMVStmt;
         if (!isMaterializedView) {
@@ -168,6 +188,8 @@ public class OlapTableFactory {
             return withRefreshInfo(createMTMVStmt.getRefreshInfo())
                     .withQuerySql(createMTMVStmt.getQuerySql())
                     .withMvProperties(createMTMVStmt.getMvProperties())
+                    .withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo())
+                    .withMvRelation(createMTMVStmt.getRelation())
                     .withEnvInfo(createMTMVStmt.getEnvInfo());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 3a1905bd8e7..76c3097a033 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -168,6 +168,18 @@ public class Partition extends MetaObject implements 
Writable {
         return visibleVersionTime;
     }
 
+    /**
+     * Like getVisibleVersionTime(), but if visibleVersion is 1, return 0 rather than the creation time.
+     *
+     * @return 0 if visibleVersion is 1, otherwise visibleVersionTime
+     */
+    public long getVisibleVersionTimeIgnoreInit() {
+        if (visibleVersion == 1) {
+            return 0L;
+        }
+        return visibleVersionTime;
+    }
+
     // The method updateVisibleVersionAndVersionHash is called when fe 
restart, the visibleVersionTime is updated
     private void setVisibleVersion(long visibleVersion) {
         this.visibleVersion = visibleVersion;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
index 721bf0ebaef..e61a7a1070f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java
@@ -147,6 +147,17 @@ public class PartitionInfo implements Writable {
         return item;
     }
 
+    public PartitionItem getItemOrAnalysisException(long partitionId) throws 
AnalysisException {
+        PartitionItem item = idToItem.get(partitionId);
+        if (item == null) {
+            item = idToTempItem.get(partitionId);
+        }
+        if (item == null) {
+            throw new AnalysisException("PartitionItem not found: " + 
partitionId);
+        }
+        return item;
+    }
+
     public void setItem(long partitionId, boolean isTemp, PartitionItem item) {
         setItemInternal(partitionId, isTemp, item);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
index 9319c96e040..01feaf23683 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
@@ -37,17 +38,20 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListComparator;
 import org.apache.doris.common.util.OrderByPair;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.mtmv.MTMVUtil;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,7 +72,7 @@ public class PartitionsProcDir implements ProcDirInterface {
             
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
             
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy")
             
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
-            .add("IsMutable")
+            .add("IsMutable").add("SyncWithBaseTables").add("UnsyncTables")
             .build();
 
     private Database db;
@@ -303,6 +307,20 @@ public class PartitionsProcDir implements ProcDirInterface 
{
                 
partitionInfo.add(tblPartitionInfo.getReplicaAllocation(partitionId).toCreateStmt());
 
                 partitionInfo.add(tblPartitionInfo.getIsMutable(partitionId));
+                if (olapTable instanceof MTMV) {
+                    try {
+                        List<String> partitionUnSyncTables = MTMVUtil
+                                .getPartitionUnSyncTables((MTMV) olapTable, 
partitionId);
+                        
partitionInfo.add(CollectionUtils.isEmpty(partitionUnSyncTables));
+                        partitionInfo.add(partitionUnSyncTables.toString());
+                    } catch (AnalysisException e) {
+                        partitionInfo.add(false);
+                        partitionInfo.add(e.getMessage());
+                    }
+                } else {
+                    partitionInfo.add(true);
+                    partitionInfo.add(FeConstants.null_string);
+                }
 
                 partitionInfos.add(partitionInfo);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 75dc07e915d..32ba1c8306d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -158,6 +158,7 @@ public class PropertyAnalyzer {
     public static final String 
PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
             "enable_duplicate_without_keys_by_default";
     public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
+    public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = 
"excluded_trigger_tables";
     // For unique key data model, the feature Merge-on-Write will leverage a 
primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load 
stage,
     // which can avoid the merging cost in read stage, and accelerate the 
aggregation
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 78b02a477f7..8d8e76c3cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -886,6 +886,9 @@ public class InternalCatalog implements CatalogIf<Database> 
{
                                 + " please use \"DROP table FORCE\".");
                     }
                 }
+                if (table.getType() == TableType.MATERIALIZED_VIEW) {
+                    Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) 
table);
+                }
                 unprotectDropTable(db, table, stmt.isForceDrop(), false, 0);
                 if (!stmt.isForceDrop()) {
                     recycleTime = 
Env.getCurrentRecycleBin().getRecycleTimeById(table.getId());
@@ -898,9 +901,6 @@ public class InternalCatalog implements CatalogIf<Database> 
{
             
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
                     db.getId(), table.getId());
             
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
-            if (table.getType() == TableType.MATERIALIZED_VIEW) {
-                Env.getCurrentEnv().getMtmvService().dropMTMV((MTMV) table);
-            }
             Env.getCurrentEnv().getMtmvService().dropTable(table);
         } catch (UserException e) {
             throw new DdlException(e.getMessage(), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
index 6321679b6b2..5ee9b43fe1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ShowResultSetMetaData;
 import org.apache.doris.thrift.TCell;
@@ -39,7 +40,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -47,9 +47,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-public class MTMVJob extends AbstractJob<MTMVTask, Map> {
+public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
     private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
     private static final ShowResultSetMetaData JOB_META_DATA =
             ShowResultSetMetaData.builder()
@@ -98,6 +97,9 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
     @SerializedName(value = "mi")
     private long mtmvId;
 
+    public MTMVJob() {
+    }
+
     public MTMVJob(long dbId, long mtmvId) {
         this.dbId = dbId;
         this.mtmvId = mtmvId;
@@ -110,8 +112,11 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
     }
 
     @Override
-    public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
-        MTMVTask task = new MTMVTask(dbId, mtmvId);
+    public List<MTMVTask> createTasks(TaskType taskType, MTMVTaskContext 
taskContext) {
+        if (taskContext == null) {
+            taskContext = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM);
+        }
+        MTMVTask task = new MTMVTask(dbId, mtmvId, taskContext);
         task.setTaskType(taskType);
         ArrayList<MTMVTask> tasks = new ArrayList<>();
         tasks.add(task);
@@ -119,9 +124,25 @@ public class MTMVJob extends AbstractJob<MTMVTask, Map> {
         return tasks;
     }
 
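+    /**
+     * If taskContext is not null (user triggered), always return true.
+     * If taskContext is null (system triggered), return false when a system-triggered task is already running.
+     *
+     * @param taskContext context of the task to be scheduled; null means the task is triggered by the system
+     * @return true if a new task can be scheduled, false otherwise
+     */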
     @Override
-    public boolean isReadyForScheduling(Map taskContext) {
-        return CollectionUtils.isEmpty(getRunningTasks());
+    public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
+        if (taskContext != null) {
+            return true;
+        }
+        List<MTMVTask> runningTasks = getRunningTasks();
+        for (MTMVTask task : runningTasks) {
+            if (task.getTaskContext() == null || 
task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
+                return false;
+            }
+        }
+        return true;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 804113d9892..8b2d2835127 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -17,24 +17,26 @@
 
 package org.apache.doris.job.extensions.mtmv;
 
-import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
-import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPlanUtil;
+import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
 import org.apache.doris.mtmv.MTMVRelation;
-import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.mtmv.MTMVUtil;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import 
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
@@ -44,10 +46,18 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 
 public class MTMVTask extends AbstractTask {
@@ -58,12 +68,17 @@ public class MTMVTask extends AbstractTask {
             new Column("TaskId", ScalarType.createStringType()),
             new Column("JobId", ScalarType.createStringType()),
             new Column("JobName", ScalarType.createStringType()),
+            new Column("MvId", ScalarType.createStringType()),
+            new Column("MvDatabaseId", ScalarType.createStringType()),
             new Column("Status", ScalarType.createStringType()),
+            new Column("ErrorMsg", ScalarType.createStringType()),
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("StartTime", ScalarType.createStringType()),
             new Column("FinishTime", ScalarType.createStringType()),
             new Column("DurationMs", ScalarType.createStringType()),
-            new Column("ExecuteSql", ScalarType.createStringType()));
+            new Column("TaskContext", ScalarType.createStringType()),
+            new Column("RefreshMode", ScalarType.createStringType()),
+            new Column("RefreshPartitions", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -75,34 +90,66 @@ public class MTMVTask extends AbstractTask {
         COLUMN_TO_INDEX = builder.build();
     }
 
+    public enum MTMVTaskTriggerMode { // who asked for this task run; read by calculateRefreshInfo()
+        MANUAL, // user-triggered: the task context's complete flag / partition list is honored first
+        SYSTEM // any non-manual trigger -- presumably the job scheduler; TODO confirm
+    }
+
+    public enum MTMVTaskRefreshMode { // result of calculateRefreshInfo(); shown in the "RefreshMode" task column
+        COMPLETE, // full refresh (no partition id set is computed for it)
+        PARTITION, // refresh only the partitions collected in refreshPartitionIds
+        NOT_REFRESH // data is already fresh: run() returns without executing a command
+    }
+
     @SerializedName(value = "di")
     private long dbId;
     @SerializedName(value = "mi")
     private long mtmvId;
-    @SerializedName("sql")
-    private String sql;
+    @SerializedName("taskContext")
+    private MTMVTaskContext taskContext;
+    @SerializedName("refreshPartitions")
+    List<String> refreshPartitions;
+    @SerializedName("refreshMode")
+    MTMVTaskRefreshMode refreshMode;
 
     private MTMV mtmv;
     private MTMVRelation relation;
     private StmtExecutor executor;
+    private Set<Long> refreshPartitionIds = Sets.newHashSet();
+
+    public MTMVTask() {
+    }
 
-    public MTMVTask(long dbId, long mtmvId) {
-        this.dbId = dbId;
-        this.mtmvId = mtmvId;
+    public MTMVTask(long dbId, long mtmvId, MTMVTaskContext taskContext) { // taskContext must be non-null
+        this.dbId = dbId; // primitive long can never be null, so a requireNonNull here would only autobox
+        this.mtmvId = mtmvId; // primitive as well: plain assignment
+        this.taskContext = Objects.requireNonNull(taskContext, "taskContext"); // fail fast, naming the argument
     }
 
     @Override
     public void run() throws JobException {
         try {
-            ConnectContext ctx = createContext();
+            ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
             TUniqueId queryId = generateQueryId();
             // Every time a task is run, the relation is regenerated because 
baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
-            relation = MTMVCacheManager.generateMTMVRelation(mtmv, ctx);
-            executor = new StmtExecutor(ctx, sql);
-            executor.execute(queryId);
+            relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
+            calculateRefreshInfo();
+            Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
+            if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
+                return;
+            } else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
+                OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+                tableWithPartKey.put(relatedTable, 
mtmv.getMvPartitionInfo().getRelatedCol());
+            }
+            refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, 
refreshPartitionIds);
+            UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
+                    .from(mtmv, refreshPartitionIds, tableWithPartKey);
+            executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
+            ctx.setQueryId(queryId);
+            command.run(ctx, executor);
         } catch (Throwable e) {
-            LOG.warn(e);
+            LOG.warn("run task failed: ", e);
             throw new JobException(e);
         }
     }
@@ -134,9 +181,12 @@ public class MTMVTask extends AbstractTask {
         try {
             Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
             mtmv = (MTMV) db.getTableOrMetaException(mtmvId, 
TableType.MATERIALIZED_VIEW);
-            sql = generateSql(mtmv);
+            if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+                OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+                MTMVUtil.alignMvPartition(mtmv, relatedTable);
+            }
         } catch (UserException e) {
-            LOG.warn(e);
+            LOG.warn("before task failed:", e);
             throw new JobException(e);
         }
     }
@@ -147,46 +197,27 @@ public class MTMVTask extends AbstractTask {
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(super.getTaskId())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(super.getJobId())));
         trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
+        trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(mtmvId)));
+        trow.addToColumnValue(new TCell().setStringVal(String.valueOf(dbId)));
         trow.addToColumnValue(new TCell()
                 .setStringVal(super.getStatus() == null ? 
FeConstants.null_string : super.getStatus().toString()));
+        trow.addToColumnValue(new TCell().setStringVal(super.getErrMsg()));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(super.getCreateTimeMs())));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(super.getStartTimeMs())));
         trow.addToColumnValue(new 
TCell().setStringVal(TimeUtils.longToTimeString(super.getFinishTimeMs())));
         trow.addToColumnValue(new TCell().setStringVal(
                 (super.getFinishTimeMs() == null || super.getFinishTimeMs() == 
0) ? FeConstants.null_string
                         : String.valueOf(super.getFinishTimeMs() - 
super.getStartTimeMs())));
-        trow.addToColumnValue(new TCell().setStringVal(sql));
+        trow.addToColumnValue(new TCell()
+                .setStringVal(taskContext == null ? FeConstants.null_string : 
new Gson().toJson(taskContext)));
+        trow.addToColumnValue(
+                new TCell().setStringVal(refreshMode == null ? 
FeConstants.null_string : refreshMode.toString()));
+        trow.addToColumnValue(
+                new TCell().setStringVal(
+                        refreshPartitions == null ? FeConstants.null_string : 
new Gson().toJson(refreshPartitions)));
         return trow;
     }
 
-    private static String generateSql(MTMV mtmv) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("INSERT OVERWRITE TABLE ");
-        builder.append(mtmv.getDatabase().getCatalog().getName());
-        builder.append(".");
-        
builder.append(ClusterNamespace.getNameFromFullName(mtmv.getQualifiedDbName()));
-        builder.append(".");
-        builder.append(mtmv.getName());
-        builder.append(" ");
-        builder.append(mtmv.getQuerySql());
-        return builder.toString();
-    }
-
-    private ConnectContext createContext() throws AnalysisException {
-        ConnectContext ctx = new ConnectContext();
-        ctx.setEnv(Env.getCurrentEnv());
-        ctx.setQualifiedUser(Auth.ADMIN_USER);
-        ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
-        ctx.getState().reset();
-        ctx.setThreadLocalInfo();
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
-                .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
-        ctx.changeDefaultCatalog(catalog.getName());
-        
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
-        ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
-        return ctx;
-    }
-
     private TUniqueId generateQueryId() {
         UUID taskId = UUID.randomUUID();
         return new TUniqueId(taskId.getMostSignificantBits(), 
taskId.getLeastSignificantBits());
@@ -198,5 +229,55 @@ public class MTMVTask extends AbstractTask {
         mtmv = null;
         relation = null;
         executor = null;
+        refreshPartitionIds = null;
+    }
+
+    private void calculateRefreshInfo() throws AnalysisException { // sets refreshMode, plus refreshPartitionIds for PARTITION
+        // a manual trigger may dictate the mode: the complete flag wins, then an explicit partition list; otherwise fall through. NOTE(review): taskContext is dereferenced without a null check -- confirm it is always set here
+        if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
+            if (taskContext.isComplete()) {
+                this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+                return;
+            } else if (!CollectionUtils
+                    .isEmpty(taskContext.getPartitions())) {
+                this.refreshMode = MTMVTaskRefreshMode.PARTITION;
+                this.refreshPartitionIds = 
MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
+                return;
+            }
+        }
+        // nothing to refresh when isMTMVSync reports the MTMV as fresh against its base tables (excluded trigger tables are passed to the check)
+        Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
+        boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), 
excludedTriggerTables, 0L);
+        if (fresh) {
+            this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
+            return;
+        }
+        // currently, if partitionType is SELF_MANAGE, only a COMPLETE (full) refresh is possible
+        if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.SELF_MANAGE) {
+            this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+            return;
+        }
+        // if refreshMethod is COMPLETE, always do a COMPLETE (full) refresh
+        if (mtmv.getRefreshInfo().getRefreshMethod() == 
RefreshMethod.COMPLETE) {
+            this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+            return;
+        }
+        OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+        excludedTriggerTables.add(relatedTable.getName());
+        // NOTE(review): excludedTriggerTables is not read again after the add above in this method -- confirm whether a freshness check of the other base tables is missing here
+        Set<Long> mtmvNeedRefreshPartitions = 
MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
+        // PARTITION when only some partitions need a refresh; COMPLETE when their count equals the MTMV's partition count
+        if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
+            this.refreshMode = MTMVTaskRefreshMode.PARTITION;
+            this.refreshPartitionIds = mtmvNeedRefreshPartitions;
+            return;
+        } else {
+            this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
+            return;
+        }
+    }
+
+    public MTMVTaskContext getTaskContext() {
+        return taskContext;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
new file mode 100644
index 00000000000..adb6fd5ef71
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTaskContext.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.mtmv;
+
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.List;
+
+public class MTMVTaskContext { // Gson-serialized description of how a task was triggered and what it was asked to refresh
+
+    @SerializedName(value = "triggerMode")
+    private MTMVTaskTriggerMode triggerMode; // MANUAL or SYSTEM
+
+    @SerializedName(value = "partitions")
+    private List<String> partitions; // requested partition names; stays null with the one-argument constructor
+
+    @SerializedName(value = "isComplete")
+    private boolean isComplete; // true when a complete refresh was requested; false by default
+
+    public MTMVTaskContext(MTMVTaskTriggerMode triggerMode) { // leaves partitions null and isComplete false
+        this.triggerMode = triggerMode;
+    }
+
+    public MTMVTaskContext(MTMVTaskTriggerMode triggerMode, List<String> 
partitions, boolean isComplete) { // the partitions list is stored as passed (no copy)
+        this.triggerMode = triggerMode;
+        this.partitions = partitions;
+        this.isComplete = isComplete;
+    }
+
+    public List<String> getPartitions() { // may return null; callers must handle that
+        return partitions;
+    }
+
+    public MTMVTaskTriggerMode getTriggerMode() {
+        return triggerMode;
+    }
+
+    public boolean isComplete() {
+        return isComplete;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index 654ee4fbc2f..f887961230f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -120,6 +120,7 @@ public abstract class AbstractTask implements Task {
             run();
             onSuccess();
         } catch (Exception e) {
+            this.errMsg = e.getMessage();
             onFail();
             log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
index 575552cb61e..9b3b6be04f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java
@@ -19,13 +19,18 @@ package org.apache.doris.mtmv;
 
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 
 import com.google.common.base.Objects;
 import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class BaseTableInfo {
+    private static final Logger LOG = 
LogManager.getLogger(BaseTableInfo.class);
+
     @SerializedName("ti")
     private Long tableId;
     @SerializedName("di")
@@ -88,4 +93,13 @@ public class BaseTableInfo {
                 + ", ctlId=" + ctlId
                 + '}';
     }
+
+    public String getTableName() { // best-effort lookup: returns "" when the table cannot be resolved
+        try {
+            return MTMVUtil.getTable(this).getName();
+        } catch (AnalysisException e) {
+            LOG.warn("can not get table: " + this, e); // pass e so the cause of the failed lookup is logged
+            return "";
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
similarity index 90%
rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
rename to fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
index b5cf92f87e4..e4aa36d9b61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MVCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
@@ -36,14 +36,14 @@ import java.util.stream.Collectors;
 /**
  * The cache for materialized view cache
  */
-public class MVCache {
+public class MTMVCache {
 
     // the materialized view plan which should be optimized by the same rules 
to query
     private final Plan logicalPlan;
     // this should be shuttle expression with lineage
     private final List<NamedExpression> mvOutputExpressions;
 
-    public MVCache(MTMV materializedView, Plan logicalPlan, 
List<NamedExpression> mvOutputExpressions) {
+    public MTMVCache(MTMV materializedView, Plan logicalPlan, 
List<NamedExpression> mvOutputExpressions) {
         this.logicalPlan = logicalPlan;
         this.mvOutputExpressions = mvOutputExpressions;
     }
@@ -56,12 +56,12 @@ public class MVCache {
         return mvOutputExpressions;
     }
 
-    public MVCache(Plan logicalPlan, List<NamedExpression> 
mvOutputExpressions) {
+    public MTMVCache(Plan logicalPlan, List<NamedExpression> 
mvOutputExpressions) {
         this.logicalPlan = logicalPlan;
         this.mvOutputExpressions = mvOutputExpressions;
     }
 
-    public static MVCache from(MTMV mtmv, ConnectContext connectContext) {
+    public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
         LogicalPlan unboundMvPlan = new 
NereidsParser().parseSingle(mtmv.getQuerySql());
         // TODO: connect context set current db when create mv by use database
         StatementContext mvSqlStatementContext = new 
StatementContext(connectContext,
@@ -77,6 +77,6 @@ public class MVCache {
         List<NamedExpression> mvOutputExpressions = 
mvRewrittenPlan.getExpressions().stream()
                 .map(NamedExpression.class::cast)
                 .collect(Collectors.toList());
-        return new MVCache(mvPlan, mvOutputExpressions);
+        return new MTMVCache(mvPlan, mvOutputExpressions);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 416f7f764b6..bdbc3231181 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -33,6 +33,8 @@ import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.mtmv.MTMVJob;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.extensions.mtmv.MTMVTask.MTMVTaskTriggerMode;
+import org.apache.doris.job.extensions.mtmv.MTMVTaskContext;
 import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
 import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
 import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
@@ -48,7 +50,7 @@ import java.util.List;
  * when do some operation, do something about job
  */
 public class MTMVJobManager implements MTMVHookService {
-    public static final String MTMV_JOB_PREFIX = "mtmv_";
+    public static final String MTMV_JOB_PREFIX = "inner_mtmv_";
 
     /**
      * create MTMVJob
@@ -174,7 +176,10 @@ public class MTMVJobManager implements MTMVHookService {
             throw new DdlException("jobs not normal,should have one job,but 
job num is: " + jobs.size());
         }
         try {
-            
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
+            MTMVTaskContext mtmvTaskContext = new 
MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, info.getPartitions(),
+                    info.isComplete());
+            
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), 
mtmvTaskContext);
+
         } catch (JobException e) {
             e.printStackTrace();
             throw new DdlException(e.getMessage());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
new file mode 100644
index 00000000000..2b862bfab23
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionInfo.java
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Partition settings of an MTMV: the partition type plus the related base table and column names used with it.
+ */
+public class MTMVPartitionInfo {
+
+    public enum MTMVPartitionType {
+        FOLLOW_BASE_TABLE, // MTMVTask aligns the MTMV's partitions with the related table before a run
+        SELF_MANAGE // toNameString() prints no related table/columns for this type
+    }
+
+    @SerializedName("pt")
+    MTMVPartitionType partitionType;
+    @SerializedName("rt")
+    BaseTableInfo relatedTable; // the related base table; never assigned by the constructors, only via setRelatedTable
+    @SerializedName("rc")
+    String relatedCol; // column name on the related table -- presumably its partition column; TODO confirm
+    @SerializedName("pc")
+    String partitionCol; // presumably the MTMV's own partition column; TODO confirm
+
+    public MTMVPartitionInfo() { // no-arg constructor; all fields stay null
+    }
+
+    public MTMVPartitionInfo(MTMVPartitionType partitionType) {
+        this.partitionType = partitionType;
+    }
+
+    public MTMVPartitionInfo(MTMVPartitionType partitionType,
+            String partitionCol) {
+        this.partitionType = partitionType;
+        this.partitionCol = partitionCol;
+    }
+
+    public MTMVPartitionType getPartitionType() {
+        return partitionType;
+    }
+
+    public void setPartitionType(MTMVPartitionType partitionType) {
+        this.partitionType = partitionType;
+    }
+
+    public BaseTableInfo getRelatedTable() { // may be null when no related table was set
+        return relatedTable;
+    }
+
+    public void setRelatedTable(BaseTableInfo relatedTable) {
+        this.relatedTable = relatedTable;
+    }
+
+    public String getRelatedCol() {
+        return relatedCol;
+    }
+
+    public void setRelatedCol(String relatedCol) {
+        this.relatedCol = relatedCol;
+    }
+
+    public String getPartitionCol() {
+        return partitionCol;
+    }
+
+    public void setPartitionCol(String partitionCol) {
+        this.partitionCol = partitionCol;
+    }
+
+    @Override
+    public String toString() { // prints relatedTable through its own toString()
+        return "MTMVPartitionInfo{"
+                + "partitionType=" + partitionType
+                + ", relatedTable=" + relatedTable
+                + ", relatedCol='" + relatedCol + '\''
+                + ", partitionCol='" + partitionCol + '\''
+                + '}';
+    }
+
+    public String toNameString() { // like toString(), but prints the related table by name; SELF_MANAGE prints only the type
+        if (partitionType == MTMVPartitionType.SELF_MANAGE) {
+            return "MTMVPartitionInfo{"
+                    + "partitionType=" + partitionType
+                    + '}';
+        } else {
+            return "MTMVPartitionInfo{"
+                    + "partitionType=" + partitionType
+                    + ", relatedTable=" + relatedTable.getTableName() // NOTE(review): throws NullPointerException if relatedTable was never set -- confirm callers guarantee it
+                    + ", relatedCol='" + relatedCol + '\''
+                    + ", partitionCol='" + partitionCol + '\''
+                    + '}';
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
new file mode 100644
index 00000000000..a8bc43a159e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.ParseException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
+import 
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.List;
+import java.util.Set;
+
+public class MTMVPlanUtil { // static helpers: build the ConnectContext for MTMV work and derive an MTMV's base tables/views from its query plan
+
+    public static ConnectContext createMTMVContext(MTMV mtmv) throws 
AnalysisException { // throws when the MTMV's catalog or database id cannot be resolved
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setQualifiedUser(Auth.ADMIN_USER); // the context runs as the admin user
+        ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
+        ctx.getState().reset();
+        ctx.setThreadLocalInfo(); // presumably binds ctx to the calling thread; TODO confirm
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
+        ctx.changeDefaultCatalog(catalog.getName()); // default catalog/database come from the MTMV's stored env info
+        
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
+        ctx.getSessionVariable().enableFallbackToOriginalPlanner = false; // never fall back to the original planner
+        return ctx;
+    }
+
+    public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext 
ctx) { // plans the MTMV's query SQL, then collects the tables and views it references
+        Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
+        return generateMTMVRelation(plan);
+    }
+
+    public static MTMVRelation generateMTMVRelation(Plan plan) { // the plan is visited twice: once for tables, once for views
+        return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
+    }
+
+    private static Set<BaseTableInfo> getBaseTables(Plan plan) { // collects tables of type MATERIALIZED_VIEW and OLAP
+        TableCollectorContext collectorContext =
+                new TableCollector.TableCollectorContext(
+                        
com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW, 
TableType.OLAP));
+        plan.accept(TableCollector.INSTANCE, collectorContext);
+        List<TableIf> collectedTables = collectorContext.getCollectedTables();
+        return transferTableIfToInfo(collectedTables);
+    }
+
+    private static Set<BaseTableInfo> getBaseViews(Plan plan) { // collects tables of type VIEW
+        TableCollectorContext collectorContext =
+                new TableCollector.TableCollectorContext(
+                        
com.google.common.collect.Sets.newHashSet(TableType.VIEW));
+        plan.accept(TableCollector.INSTANCE, collectorContext);
+        List<TableIf> collectedTables = collectorContext.getCollectedTables();
+        return transferTableIfToInfo(collectedTables);
+    }
+
+    private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> 
tables) { // wraps each table in a BaseTableInfo; the result is a set, so equal infos collapse
+        Set<BaseTableInfo> result = 
com.google.common.collect.Sets.newHashSet();
+        for (TableIf table : tables) {
+            result.add(new BaseTableInfo(table));
+        }
+        return result;
+    }
+
+    private static Plan getPlanBySql(String querySql, ConnectContext ctx) { // parses querySql and plans its first statement with the Nereids planner
+        List<StatementBase> statements;
+        try {
+            statements = new NereidsParser().parseSQL(querySql);
+        } catch (Exception e) { // NOTE(review): only e.getMessage() survives below; the original cause and stack trace are dropped
+            throw new ParseException("Nereids parse failed. " + 
e.getMessage());
+        }
+        StatementBase parsedStmt = statements.get(0); // NOTE(review): only the first statement is used; an empty list would throw here -- confirm parseSQL never returns one
+        LogicalPlan logicalPlan = ((LogicalPlanAdapter) 
parsedStmt).getLogicalPlan();
+        NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
+        return planner.plan(logicalPlan, PhysicalProperties.ANY, 
ExplainLevel.NONE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
index ed2f0f709f4..0f4f904c573 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
@@ -26,7 +26,8 @@ public class MTMVRefreshEnum {
      * RefreshMethod
      */
     public enum RefreshMethod {
-        COMPLETE //complete
+        COMPLETE, //complete
+        AUTO //try to update incrementally, if not possible, update in full
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
similarity index 50%
rename from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
index 950fc79ef7d..92149b3b465 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelationManager.java
@@ -17,38 +17,19 @@
 
 package org.apache.doris.mtmv;
 
-import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
-import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
 import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.exceptions.ParseException;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
-import org.apache.doris.nereids.parser.NereidsParser;
-import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.plans.Plan;
-import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
-import 
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
 import org.apache.doris.persist.AlterMTMV;
-import org.apache.doris.qe.ConnectContext;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
@@ -63,126 +44,34 @@ import java.util.Set;
 /**
  * when do some operation, do something about cache
  */
-public class MTMVCacheManager implements MTMVHookService {
-    private static final Logger LOG = 
LogManager.getLogger(MTMVCacheManager.class);
+public class MTMVRelationManager implements MTMVHookService {
+    private static final Logger LOG = 
LogManager.getLogger(MTMVRelationManager.class);
     private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs = 
Maps.newConcurrentMap();
 
     public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) {
         return tableMTMVs.get(table);
     }
 
-    // TODO Implement the method which getting materialized view by tables
-    public List<MTMV> getAvailableMaterializedView(List<BaseTableInfo> tables) 
{
-        return ImmutableList.of();
-    }
-
-    public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws 
AnalysisException, DdlException {
-        // check session variable if enable rewrite
-        if (!ctx.getSessionVariable().isEnableMvRewrite()) {
-            return false;
-        }
-        MTMVRelation mtmvRelation = mtmv.getRelation();
-        if (mtmvRelation == null) {
-            return false;
-        }
-        // chaek mv is normal
-        if (!(mtmv.getStatus().getState() == MTMVState.NORMAL
-                && mtmv.getStatus().getRefreshState() == 
MTMVRefreshState.SUCCESS)) {
-            return false;
-        }
-        // check external table
-        boolean containsExternalTable = 
containsExternalTable(mtmvRelation.getBaseTables());
-        if (containsExternalTable) {
-            return ctx.getSessionVariable().isEnableExternalMvRewrite();
-        }
-        // check gracePeriod
-        Long gracePeriod = mtmv.getGracePeriod();
-        // do not care data is delayed
-        if (gracePeriod < 0) {
-            return true;
-        }
-        // compare with base table
-        Long mtmvLastTime = getTableLastVisibleVersionTime(mtmv);
-        Long maxAvailableTime = mtmvLastTime + gracePeriod;
-        for (BaseTableInfo baseTableInfo : mtmvRelation.getBaseTables()) {
-            long tableLastVisibleVersionTime = 
getTableLastVisibleVersionTime(baseTableInfo);
-            if (tableLastVisibleVersionTime > maxAvailableTime) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private long getTableLastVisibleVersionTime(BaseTableInfo baseTableInfo) 
throws AnalysisException, DdlException {
-        Table table = Env.getCurrentEnv().getInternalCatalog()
-                .getDbOrAnalysisException(baseTableInfo.getDbId())
-                .getTableOrDdlException(baseTableInfo.getTableId(), 
TableType.OLAP);
-        return getTableLastVisibleVersionTime((OlapTable) table);
-    }
-
-    private long getTableLastVisibleVersionTime(OlapTable table) {
-        long result = 0L;
-        long visibleVersionTime;
-        for (Partition partition : table.getAllPartitions()) {
-            visibleVersionTime = partition.getVisibleVersionTime();
-            if (visibleVersionTime > result) {
-                result = visibleVersionTime;
-            }
-        }
-        return result;
-    }
-
-    private boolean containsExternalTable(Set<BaseTableInfo> baseTableInfos) {
-        for (BaseTableInfo baseTableInfo : baseTableInfos) {
-            if (InternalCatalog.INTERNAL_CATALOG_ID != 
baseTableInfo.getCtlId()) {
-                return true;
+    /**
+     * Collects the mtmvs registered for the given base tables and resolves each of them
+     * to its {@link MTMV} object.
+     * An mtmv whose lookup fails is logged and left out of the result.
+     *
+     * @param tableInfos base tables to look up
+     * @return the mtmvs that could be resolved
+     */
+    public Set<MTMV> getAvailableMTMVs(List<BaseTableInfo> tableInfos) {
+        Set<MTMV> available = Sets.newHashSet();
+        for (BaseTableInfo mvInfo : getAvailableMTMVInfos(tableInfos)) {
+            try {
+                available.add((MTMV) MTMVUtil.getTable(mvInfo));
+            } catch (AnalysisException e) {
+                // the client must not see this failure: log it and carry on with the next mtmv
+                LOG.warn("getTable failed: {}", mvInfo.toString(), e);
             }
         }
-        return false;
-    }
-
-    public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext 
ctx) {
-        Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx);
-        return new MTMVRelation(getBaseTables(plan), getBaseViews(plan));
-    }
-
-    private static Set<BaseTableInfo> getBaseTables(Plan plan) {
-        TableCollectorContext collectorContext =
-                new TableCollector.TableCollectorContext(
-                        Sets.newHashSet(TableType.MATERIALIZED_VIEW, 
TableType.OLAP));
-        plan.accept(TableCollector.INSTANCE, collectorContext);
-        List<TableIf> collectedTables = collectorContext.getCollectedTables();
-        return transferTableIfToInfo(collectedTables);
-    }
-
-    private static Set<BaseTableInfo> getBaseViews(Plan plan) {
-        TableCollectorContext collectorContext =
-                new TableCollector.TableCollectorContext(
-                        Sets.newHashSet(TableType.VIEW));
-        plan.accept(TableCollector.INSTANCE, collectorContext);
-        List<TableIf> collectedTables = collectorContext.getCollectedTables();
-        return transferTableIfToInfo(collectedTables);
-    }
-
-    private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> 
tables) {
-        Set<BaseTableInfo> result = Sets.newHashSet();
-        for (TableIf table : tables) {
-            result.add(new BaseTableInfo(table));
-        }
-        return result;
+        return available;
     }
 
-    private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
-        List<StatementBase> statements;
-        try {
-            statements = new NereidsParser().parseSQL(querySql);
-        } catch (Exception e) {
-            throw new ParseException("Nereids parse failed. " + 
e.getMessage());
+    /**
+     * Collects the infos of all mtmvs registered for the given base tables.
+     * A base table without any registered mtmv contributes nothing to the result.
+     *
+     * @param tableInfos base tables to look up
+     * @return union of the mtmv infos registered for those tables, possibly empty
+     */
+    public Set<BaseTableInfo> getAvailableMTMVInfos(List<BaseTableInfo> tableInfos) {
+        Set<BaseTableInfo> mvInfos = Sets.newHashSet();
+        for (BaseTableInfo tableInfo : tableInfos) {
+            // getMtmvsByBaseTable returns null when nothing is registered for the table,
+            // and addAll(null) would throw a NullPointerException
+            Set<BaseTableInfo> mtmvs = getMtmvsByBaseTable(tableInfo);
+            if (mtmvs != null) {
+                mvInfos.addAll(mtmvs);
+            }
         }
-        StatementBase parsedStmt = statements.get(0);
-        LogicalPlan logicalPlan = ((LogicalPlanAdapter) 
parsedStmt).getLogicalPlan();
-        NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
-        return planner.plan(logicalPlan, PhysicalProperties.ANY, 
ExplainLevel.NONE);
+        return mvInfos;
     }
 
     private Set<BaseTableInfo> getOrCreateMTMVs(BaseTableInfo baseTableInfo) {
@@ -233,6 +122,7 @@ public class MTMVCacheManager implements MTMVHookService {
 
     /**
      * modify `tableMTMVs` by MTMVRelation
+     *
      * @param mtmv
      * @param dbId
      */
@@ -243,6 +133,7 @@ public class MTMVCacheManager implements MTMVHookService {
 
     /**
      * remove cache of mtmv
+     *
      * @param mtmv
      */
     @Override
@@ -262,6 +153,7 @@ public class MTMVCacheManager implements MTMVHookService {
 
     /**
      * modify `tableMTMVs` by MTMVRelation
+     *
      * @param mtmv
      * @param relation
      * @param task
@@ -276,6 +168,7 @@ public class MTMVCacheManager implements MTMVHookService {
 
     /**
      * update mtmv status to `SCHEMA_CHANGE`
+     *
      * @param table
      */
     @Override
@@ -285,6 +178,7 @@ public class MTMVCacheManager implements MTMVHookService {
 
     /**
      * update mtmv status to `SCHEMA_CHANGE`
+     *
      * @param table
      */
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
index e83ca4e9496..9530467dee7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java
@@ -18,10 +18,13 @@
 package org.apache.doris.mtmv;
 
 import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
 import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
 import org.apache.doris.persist.AlterMTMV;
 
@@ -36,16 +39,16 @@ public class MTMVService {
     private static final Logger LOG = LogManager.getLogger(MTMVService.class);
 
     private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
-    private MTMVCacheManager cacheManager = new MTMVCacheManager();
+    private MTMVRelationManager relationManager = new MTMVRelationManager();
     private MTMVJobManager jobManager = new MTMVJobManager();
 
     public MTMVService() {
         registerHook("MTMVJobManager", jobManager);
-        registerHook("MTMVCacheManager", cacheManager);
+        registerHook("MTMVRelationManager", relationManager);
     }
 
-    public MTMVCacheManager getCacheManager() {
-        return cacheManager;
+    public MTMVRelationManager getRelationManager() {
+        return relationManager;
     }
 
     public void registerHook(String name, MTMVHookService mtmvHookService) {
@@ -76,8 +79,12 @@ public class MTMVService {
         }
     }
 
-    public void createMTMV(MTMV mtmv) throws DdlException {
+    public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
         Objects.requireNonNull(mtmv);
+        if (mtmv.getMvPartitionInfo().getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+            OlapTable relatedTable = (OlapTable) 
MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+            MTMVUtil.alignMvPartition(mtmv, relatedTable);
+        }
         LOG.info("createMTMV: " + mtmv.getName());
         for (MTMVHookService mtmvHookService : hooks.values()) {
             mtmvHookService.createMTMV(mtmv);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
new file mode 100644
index 00000000000..81da2946ff1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVUtil.java
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.SinglePartitionDesc;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
+import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+
+public class MTMVUtil {
+    private static final Logger LOG = LogManager.getLogger(MTMVUtil.class);
+
+    /**
+     * Resolves a BaseTableInfo to the table it identifies, looking up in turn
+     * the catalog id, the database id and the table id it carries.
+     *
+     * @param baseTableInfo ids of the catalog, database and table to look up
+     * @return the table found under those ids
+     * @throws AnalysisException propagated from the catalog, database or table lookup
+     */
+    public static TableIf getTable(BaseTableInfo baseTableInfo) throws AnalysisException {
+        return Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrAnalysisException(baseTableInfo.getCtlId())
+                .getDbOrAnalysisException(baseTableInfo.getDbId())
+                .getTableOrAnalysisException(baseTableInfo.getTableId());
+    }
+
+    /**
+     * Checks whether the mtmv as a whole is in sync with the given tables.
+     * The smallest visible version time among the mtmv's partitions is the reference point,
+     * so the least recently updated partition decides the outcome.
+     *
+     * @param mtmv materialized view to check
+     * @param tables base tables to compare against
+     * @param excludedTriggerTables names of tables that are left out of the comparison
+     * @param gracePeriod tolerated lag, added to the mtmv's reference time before comparing
+     * @return true if no compared table is newer than the reference time plus gracePeriod
+     */
+    public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
+            Set<String> excludedTriggerTables, Long gracePeriod) {
+        long oldestPartitionTime = getTableMinVisibleVersionTime(mtmv);
+        return isSync(oldestPartitionTime, tables, excludedTriggerTables, gracePeriod);
+    }
+
+    /**
+     * Determine whether the partition is sync with related partition and other baseTables.
+     * <p>
+     * When the mtmv follows a base table, that table is compared partition by partition
+     * (the mtmv partition against the base table partition with an equal PartitionItem)
+     * and is left out of the table-level comparison. All other tables are compared on table level.
+     * The passed {@code excludedTriggerTables} is never modified.
+     *
+     * @param mtmv materialized view that owns the partition
+     * @param partitionId id of the mtmv partition to check
+     * @param tables base tables to compare against
+     * @param excludedTriggerTables names of tables that are left out of the comparison
+     * @param gracePeriod tolerated lag, added to the partition's time before comparing
+     * @return true if the partition is in sync; false as well when no related partition exists
+     * @throws AnalysisException when the related table, the partition or its item can not be found
+     */
+    public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
+            Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
+        // work on a private copy: the caller's set (e.g. the one handed out by
+        // mtmv.getExcludedTriggerTables()) must not pick up the related table as a side effect
+        Set<String> excluded = Sets.newHashSet(excludedTriggerTables);
+        if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
+            OlapTable relatedTable = (OlapTable) getTable(mtmv.getMvPartitionInfo().getRelatedTable());
+            // if follow base table, not need compare with related table,
+            // only should compare with related partition
+            excluded.add(relatedTable.getName());
+            PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+            long relatedPartitionId = getExistPartitionId(item,
+                    relatedTable.getPartitionInfo().getIdToItem(false));
+            if (relatedPartitionId == -1L) {
+                LOG.warn("can not found related partition: " + partitionId);
+                return false;
+            }
+            if (!isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId)) {
+                return false;
+            }
+        }
+        return isSync(mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(),
+                tables, excluded, gracePeriod);
+    }
+
+    /**
+     * Align the partitions of mtmv and related table:
+     * drop the mtmv partitions that have no equal PartitionItem in the related table,
+     * then add a partition to the mtmv for every related table partition it lacks.
+     *
+     * @param mtmv materialized view whose partitions are adjusted
+     * @param relatedTable base table the mtmv partitions follow
+     * @throws DdlException propagated from dropping or adding a partition
+     * @throws AnalysisException when a partition can not be found
+     */
+    public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable)
+            throws DdlException, AnalysisException {
+        // Iterate over snapshots: dropPartition/addPartition change the partitions of the mtmv,
+        // and looping over a live map while it is modified fails with
+        // ConcurrentModificationException.
+        // NOTE(review): assumes getIdToItem may hand out its internal map - confirm.
+        Map<Long, PartitionItem> relatedTableItems =
+                Maps.newHashMap(relatedTable.getPartitionInfo().getIdToItem(false));
+        Map<Long, PartitionItem> mtmvItems = Maps.newHashMap(mtmv.getPartitionInfo().getIdToItem(false));
+        // drop partition of mtmv
+        for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
+            long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
+            if (partitionId == -1L) {
+                dropPartition(mtmv, entry.getKey());
+            }
+        }
+        // add partition for mtmv
+        // (partitions dropped above match no related item, so the snapshot is still a valid reference)
+        for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
+            long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
+            if (partitionId == -1L) {
+                addPartition(mtmv, relatedTable, entry.getKey());
+            }
+        }
+    }
+
+    /**
+     * Finds the mtmv partitions that are behind the related table.
+     * <p>
+     * Each mtmv partition is compared with the related table partition(s) it maps to;
+     * as soon as one of them has a later visibleVersionTime than the mtmv partition,
+     * the mtmv partition is reported as stale.
+     *
+     * @param mtmv materialized view to inspect
+     * @param relatedTable base table the mtmv partitions follow
+     * @return ids of the stale mtmv partitions
+     * @throws AnalysisException when partition can not found
+     */
+    public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable)
+            throws AnalysisException {
+        Set<Long> stalePartitionIds = Sets.newHashSet();
+        for (Entry<Long, Set<Long>> mapping : getMvToBasePartitions(mtmv, relatedTable).entrySet()) {
+            Long mvPartitionId = mapping.getKey();
+            for (Long basePartitionId : mapping.getValue()) {
+                if (isSyncWithPartition(mtmv, mvPartitionId, relatedTable, basePartitionId)) {
+                    continue;
+                }
+                // one outdated mapping is enough, no need to look at the remaining ones
+                stalePartitionIds.add(mvPartitionId);
+                break;
+            }
+        }
+        return stalePartitionIds;
+    }
+
+    /**
+     * Translates partition ids of the mtmv into partition names.
+     *
+     * @param mtmv materialized view that owns the partitions
+     * @param ids partition ids to translate
+     * @return one name per id, in the iteration order of {@code ids}
+     * @throws AnalysisException when a partition can not be found
+     */
+    public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException {
+        List<String> names = Lists.newArrayList();
+        for (Long id : ids) {
+            Partition partition = mtmv.getPartitionOrAnalysisException(id);
+            names.add(partition.getName());
+        }
+        return names;
+    }
+
+    /**
+     * Translates partition names of the mtmv into partition ids.
+     *
+     * @param mtmv materialized view that owns the partitions
+     * @param partitions partition names to translate
+     * @return the ids of the named partitions
+     * @throws AnalysisException when a partition can not be found
+     */
+    public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
+        Set<Long> ids = Sets.newHashSet();
+        for (String name : partitions) {
+            ids.add(mtmv.getPartitionOrAnalysisException(name).getId());
+        }
+        return ids;
+    }
+
+    /**
+     * check if table is sync with all baseTables,
+     * without excluding any table and without grace period
+     *
+     * @param mtmv materialized view to check
+     * @return false if the mtmv has no relation yet, otherwise the result of the comparison
+     */
+    public static boolean isMTMVSync(MTMV mtmv) {
+        MTMVRelation mtmvRelation = mtmv.getRelation();
+        if (mtmvRelation == null) {
+            return false;
+        }
+        // use the instance that was null-checked instead of reading the relation a second time
+        return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), 0L);
+    }
+
+    /**
+     * Lists the names of the base tables the given mtmv partition is not in sync with.
+     * <p>
+     * Only OlapTables are considered. The table the mtmv follows (if any) is compared
+     * partition by partition, every other table by its latest partition update time.
+     *
+     * @param mtmv materialized view that owns the partition
+     * @param partitionId id of the mtmv partition to check
+     * @return names of the base tables that are newer than the partition
+     * @throws AnalysisException when a table or partition can not be found,
+     *         including the case that the followed table has no matching partition
+     */
+    public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
+        List<String> unSyncTables = Lists.newArrayList();
+        long mvPartitionTime = mtmv.getPartitionOrAnalysisException(partitionId)
+                .getVisibleVersionTimeIgnoreInit();
+        for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
+            TableIf table = getTable(baseTableInfo);
+            if (!(table instanceof OlapTable)) {
+                continue;
+            }
+            OlapTable olapTable = (OlapTable) table;
+            boolean isFollowedTable =
+                    mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
+                            && mtmv.getMvPartitionInfo().getRelatedTable().equals(baseTableInfo);
+            if (!isFollowedTable) {
+                // table-level comparison against the most recent partition update of the base table
+                if (getTableMaxVisibleVersionTime(olapTable) > mvPartitionTime) {
+                    unSyncTables.add(table.getName());
+                }
+                continue;
+            }
+            // partition-level comparison against the partition with an equal PartitionItem
+            PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
+            long relatedPartitionId = getExistPartitionId(item, olapTable.getPartitionInfo().getIdToItem(false));
+            if (relatedPartitionId == -1L) {
+                throw new AnalysisException("can not found related partition");
+            }
+            if (!isSyncWithPartition(mtmv, partitionId, olapTable, relatedPartitionId)) {
+                unSyncTables.add(olapTable.getName());
+            }
+        }
+        return unSyncTables;
+    }
+
+    /**
+     * Determine which partition of mtmv can be rewritten.
+     * <p>
+     * Nothing is returned when mv rewrite is disabled in the session, when the mtmv has no relation,
+     * or when its state is not NORMAL with refresh state SUCCESS.
+     * With a negative grace period every partition is returned without further checks.
+     * Otherwise only the partitions that are in sync with the base tables are returned;
+     * a partition whose check fails is left out.
+     *
+     * @param mtmv materialized view to inspect
+     * @param ctx connect context providing the session variables
+     * @return the partitions that may be used for rewriting
+     */
+    public static Collection<Partition> getMTMVCanRewritePartitions(MTMV mtmv, ConnectContext ctx) {
+        List<Partition> rewritable = Lists.newArrayList();
+        Collection<Partition> allPartitions = mtmv.getPartitions();
+        // check session variable if enable rewrite
+        if (!ctx.getSessionVariable().isEnableMvRewrite()) {
+            return rewritable;
+        }
+        MTMVRelation relation = mtmv.getRelation();
+        if (relation == null) {
+            return rewritable;
+        }
+        // check mv is normal
+        if (mtmv.getStatus().getState() != MTMVState.NORMAL
+                || mtmv.getStatus().getRefreshState() != MTMVRefreshState.SUCCESS) {
+            return rewritable;
+        }
+        // a negative gracePeriod means: do not care data is delayed
+        Long gracePeriod = mtmv.getGracePeriod();
+        if (gracePeriod < 0) {
+            return allPartitions;
+        }
+
+        for (Partition candidate : allPartitions) {
+            boolean inSync;
+            try {
+                inSync = isMTMVPartitionSync(mtmv, candidate.getId(), relation.getBaseTables(),
+                        Sets.newHashSet(), gracePeriod);
+            } catch (AnalysisException e) {
+                // ignore it
+                LOG.warn("check isMTMVPartitionSync failed", e);
+                inSync = false;
+            }
+            if (inSync) {
+                rewritable.add(candidate);
+            }
+        }
+        return rewritable;
+    }
+
+    /**
+     * Finds the partitions of the mtmv that have to be refreshed.
+     * A partition needs a refresh when it is not in sync with the base tables
+     * (the mtmv's excluded trigger tables left aside, no grace period),
+     * or when the sync check itself fails.
+     *
+     * @param mtmv materialized view to inspect
+     * @return ids of the partitions that need a refresh
+     */
+    public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
+        Collection<Partition> allPartitions = mtmv.getPartitions();
+        Set<Long> needRefresh = Sets.newHashSet();
+        for (Partition partition : allPartitions) {
+            boolean inSync;
+            try {
+                inSync = isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
+                        mtmv.getExcludedTriggerTables(), 0L);
+            } catch (AnalysisException e) {
+                // a partition that can not be checked is refreshed to be on the safe side
+                LOG.warn("check isMTMVPartitionSync failed", e);
+                inSync = false;
+            }
+            if (!inSync) {
+                needRefresh.add(partition.getId());
+            }
+        }
+        return needRefresh;
+    }
+
+    /**
+     * Compares the update time of an mtmv partition with that of a related table partition.
+     *
+     * @param mtmv materialized view that owns the first partition
+     * @param mtmvPartitionId id of the mtmv partition
+     * @param relatedTable table that owns the second partition
+     * @param relatedTablePartitionId id of the related table partition
+     * @return true if the mtmv partition's time is not earlier than the related partition's time
+     * @throws AnalysisException when one of the partitions can not be found
+     */
+    private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, OlapTable relatedTable,
+            Long relatedTablePartitionId) throws AnalysisException {
+        long mtmvTime = mtmv.getPartitionOrAnalysisException(mtmvPartitionId)
+                .getVisibleVersionTimeIgnoreInit();
+        long relatedTime = relatedTable.getPartitionOrAnalysisException(relatedTablePartitionId)
+                .getVisibleVersionTimeIgnoreInit();
+        return mtmvTime >= relatedTime;
+    }
+
+    /**
+     * Builds a partition name from the sql form of the partition key description,
+     * like p_00000101_20170201: brackets, quotes, '-' and whitespace are removed and
+     * ',' becomes '_'.
+     * <p>
+     * A name longer than 50 characters is shortened to its first 30 characters followed by
+     * a hash of the full name, '_' and the current time in milliseconds; such a name
+     * therefore differs between two calls with the same input.
+     *
+     * @param desc partition key description to derive the name from
+     * @return the generated partition name
+     */
+    private static String generatePartitionName(PartitionKeyDesc desc) {
+        String partitionName = "p_";
+        partitionName += desc.toSql().trim().replaceAll("\\(|\\)|\\-|\\[|\\]|'|\\s+", "")
+                .replaceAll("\\(|\\)|\\,|\\[|\\]", "_");
+        if (partitionName.length() > 50) {
+            // widen to long before Math.abs: Math.abs(Integer.MIN_VALUE) is still negative
+            // and would put a '-' into the partition name
+            partitionName = partitionName.substring(0, 30) + Math.abs((long) Objects.hash(partitionName))
+                    + "_" + System.currentTimeMillis();
+        }
+        return partitionName;
+    }
+
+    /**
+     * Drops one partition of the mtmv, addressed by its id.
+     *
+     * @param mtmv materialized view that owns the partition
+     * @param partitionId id of the partition to drop
+     * @throws AnalysisException when the partition can not be found
+     * @throws DdlException propagated from Env.dropPartition
+     */
+    private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
+        // the drop clause addresses the partition by name, so resolve the id first
+        String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName();
+        DropPartitionClause dropClause = new DropPartitionClause(false, partitionName, false, false);
+        Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropClause);
+    }
+
+    /**
+     * Adds a partition to the mtmv that has the same partition key description
+     * as the partition relatedPartitionId of relatedTable.
+     * The new partition gets a generated name and the mtmv's default distribution.
+     *
+     * @param mtmv materialized view that receives the partition
+     * @param relatedTable table that owns the partition to copy
+     * @param relatedPartitionId id of the partition to copy
+     * @throws AnalysisException when the related partition can not be found
+     * @throws DdlException propagated from Env.addPartition
+     */
+    private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long relatedPartitionId)
+            throws AnalysisException, DdlException {
+        PartitionDesc relatedTableDesc = relatedTable.getPartitionInfo().toPartitionDesc(relatedTable);
+        String relatedPartitionName = relatedTable.getPartitionOrAnalysisException(relatedPartitionId).getName();
+        PartitionKeyDesc keyDesc =
+                relatedTableDesc.getSinglePartitionDescByName(relatedPartitionName).getPartitionKeyDesc();
+
+        // one (empty) property map, shared by the partition description and the clause
+        Map<String, String> partitionProperties = Maps.newHashMap();
+        SinglePartitionDesc newPartitionDesc = new SinglePartitionDesc(true,
+                generatePartitionName(keyDesc), keyDesc, partitionProperties);
+
+        AddPartitionClause addClause = new AddPartitionClause(newPartitionDesc,
+                mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
+        Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addClause);
+    }
+
+    /**
+     * Searches sources for a PartitionItem that equals target.
+     *
+     * @param target item to look for
+     * @param sources partition id to item mapping to search in
+     * @return the id of the first equal item in iteration order, or -1L if there is none
+     */
+    private static long getExistPartitionId(PartitionItem target, Map<Long, PartitionItem> sources) {
+        return sources.entrySet().stream()
+                .filter(candidate -> target.equals(candidate.getValue()))
+                .mapToLong(Entry::getKey)
+                .findFirst()
+                .orElse(-1L);
+    }
+
+    /**
+     * Get the latest update time among all partitions of the table.
+     *
+     * @param table table to inspect
+     * @return the largest visible version time of its partitions, never below 0;
+     *         0 if the table has no partitions
+     */
+    private static long getTableMaxVisibleVersionTime(OlapTable table) {
+        long latest = 0L;
+        for (Partition partition : table.getAllPartitions()) {
+            latest = Math.max(latest, partition.getVisibleVersionTimeIgnoreInit());
+        }
+        return latest;
+    }
+
+    /**
+     * Get the earliest update time among all partitions of the table.
+     *
+     * @param table table to inspect
+     * @return the smallest visible version time of its partitions;
+     *         Long.MAX_VALUE if the table has no partitions
+     */
+    private static long getTableMinVisibleVersionTime(OlapTable table) {
+        long earliest = Long.MAX_VALUE;
+        for (Partition partition : table.getAllPartitions()) {
+            earliest = Math.min(earliest, partition.getVisibleVersionTimeIgnoreInit());
+        }
+        return earliest;
+    }
+
+    /**
+     * Builds the partition correspondence between the materialized view and the related table.
+     * Each mtmv partition maps to the related table partition with an equal PartitionItem.
+     * At present this is a one-to-one correspondence; a Set is used as value
+     * to leave room for extension.
+     * <p>
+     * `alignMvPartition` should be called before this method.
+     *
+     * @param mtmv materialized view whose partitions are the keys
+     * @param relatedTable base table the mtmv partitions follow
+     * @return mv.partitionId ==> relatedTable.partitionId
+     * @throws AnalysisException when an mtmv partition has no counterpart in the related table
+     */
+    private static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv, OlapTable relatedTable)
+            throws AnalysisException {
+        Map<Long, PartitionItem> baseItems = relatedTable.getPartitionInfo().getIdToItem(false);
+        Map<Long, PartitionItem> mvItems = mtmv.getPartitionInfo().getIdToItem(false);
+        Map<Long, Set<Long>> mvToBase = Maps.newHashMap();
+        for (Entry<Long, PartitionItem> mvEntry : mvItems.entrySet()) {
+            PartitionItem mvItem = mvEntry.getValue();
+            long basePartitionId = getExistPartitionId(mvItem, baseItems);
+            if (basePartitionId == -1L) {
+                throw new AnalysisException("partition not found: " + mvItem.toString());
+            }
+            mvToBase.put(mvEntry.getKey(), Sets.newHashSet(basePartitionId));
+        }
+        return mvToBase;
+    }
+
+    /**
+     * Determine is sync, ignoring excludedTriggerTables and non OlapTable.
+     * <p>
+     * The data counts as in sync when no compared base table has a partition that was updated
+     * later than visibleVersionTime + gracePeriod. A base table that can not be resolved
+     * makes the result false.
+     *
+     * @param visibleVersionTime update time of the mtmv (or mtmv partition) that is checked
+     * @param tables base tables to compare against
+     * @param excludedTriggerTables names of tables that are left out of the comparison
+     * @param gracePeriod tolerated lag, added to visibleVersionTime before comparing
+     * @return true if in sync
+     */
+    private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo> tables,
+            Set<String> excludedTriggerTables, Long gracePeriod) {
+        // saturate instead of overflowing: visibleVersionTime is Long.MAX_VALUE when it was
+        // computed by getTableMinVisibleVersionTime for an mtmv without partitions
+        long maxAvailableTime = gracePeriod > 0 && visibleVersionTime > Long.MAX_VALUE - gracePeriod
+                ? Long.MAX_VALUE : visibleVersionTime + gracePeriod;
+        for (BaseTableInfo baseTableInfo : tables) {
+            TableIf table;
+            try {
+                table = getTable(baseTableInfo);
+            } catch (AnalysisException e) {
+                // report through the logger instead of printing the stack trace to stderr
+                LOG.warn("get table failed, treat as not sync: {}", baseTableInfo, e);
+                return false;
+            }
+            if (excludedTriggerTables.contains(table.getName())) {
+                continue;
+            }
+            if (!(table instanceof OlapTable)) {
+                continue;
+            }
+            long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table);
+            if (tableLastVisibleVersionTime > maxAvailableTime) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 1c4d847e3d1..d0e731b6237 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -30,6 +30,8 @@ import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.job.common.IntervalUnit;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
 import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
 import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
 import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
@@ -89,6 +91,7 @@ import 
org.apache.doris.nereids.DorisParser.GroupingSetContext;
 import org.apache.doris.nereids.DorisParser.HavingClauseContext;
 import org.apache.doris.nereids.DorisParser.HintAssignmentContext;
 import org.apache.doris.nereids.DorisParser.HintStatementContext;
+import org.apache.doris.nereids.DorisParser.IdentifierContext;
 import org.apache.doris.nereids.DorisParser.IdentifierListContext;
 import org.apache.doris.nereids.DorisParser.IdentifierOrTextContext;
 import org.apache.doris.nereids.DorisParser.IdentifierSeqContext;
@@ -112,7 +115,6 @@ import 
org.apache.doris.nereids.DorisParser.LogicalNotContext;
 import org.apache.doris.nereids.DorisParser.MapLiteralContext;
 import org.apache.doris.nereids.DorisParser.MultiStatementsContext;
 import org.apache.doris.nereids.DorisParser.MultipartIdentifierContext;
-import org.apache.doris.nereids.DorisParser.MvRefreshUnitContext;
 import org.apache.doris.nereids.DorisParser.NamedExpressionContext;
 import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
 import org.apache.doris.nereids.DorisParser.NullLiteralContext;
@@ -550,10 +552,25 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                 comment,
                 desc, properties, logicalPlan, querySql,
                 new MTMVRefreshInfo(buildMode, refreshMethod, 
refreshTriggerInfo),
-                ctx.cols == null ? Lists.newArrayList() : 
visitSimpleColumnDefs(ctx.cols)
+                ctx.cols == null ? Lists.newArrayList() : 
visitSimpleColumnDefs(ctx.cols),
+                visitMTMVPartitionInfo(ctx.partitionKey)
         ));
     }
 
+    /**
+     * Build the MTMV partition info from the optional partition-key identifier.
+     *
+     * @param ctx partition-key identifier; may be null (treated as no partition key)
+     * @return SELF_MANAGE info when ctx is null, otherwise FOLLOW_BASE_TABLE on ctx's text
+     */
+    public MTMVPartitionInfo visitMTMVPartitionInfo(IdentifierContext ctx) {
+        if (ctx == null) {
+            return new MTMVPartitionInfo(MTMVPartitionType.SELF_MANAGE);
+        } else {
+            return new MTMVPartitionInfo(MTMVPartitionType.FOLLOW_BASE_TABLE, 
ctx.getText());
+        }
+    }
+
     @Override
     public List<SimpleColumnDefinition> 
visitSimpleColumnDefs(SimpleColumnDefsContext ctx) {
         if (ctx == null) {
@@ -601,19 +618,32 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         int interval = Integer.parseInt(ctx.INTEGER_VALUE().getText());
         String startTime = ctx.STARTS() == null ? null
                 : ctx.STRING_LITERAL().getText().substring(1, 
ctx.STRING_LITERAL().getText().length() - 1);
-        IntervalUnit unit = visitMvRefreshUnit(ctx.mvRefreshUnit());
+        IntervalUnit unit = visitMvRefreshUnit(ctx.refreshUnit);
         return new MTMVRefreshSchedule(startTime, interval, unit);
     }
 
-    @Override
-    public IntervalUnit visitMvRefreshUnit(MvRefreshUnitContext ctx) {
-        return IntervalUnit.valueOf(ctx.getText().toUpperCase());
+    /**
+     * Resolve the refresh interval unit; SECOND needs Config.enable_job_schedule_second_for_test.
+     *
+     * @param ctx identifier whose text is upper-cased and passed to IntervalUnit.fromString
+     * @return the resolved IntervalUnit (AnalysisException if unresolved or SECOND is disabled)
+     */
+    public IntervalUnit visitMvRefreshUnit(IdentifierContext ctx) {
+        IntervalUnit intervalUnit = 
IntervalUnit.fromString(ctx.getText().toUpperCase());
+        if (null == intervalUnit) {
+            throw new AnalysisException("interval time unit can not be " + 
ctx.getText());
+        }
+        if (intervalUnit.equals(IntervalUnit.SECOND)
+                && !Config.enable_job_schedule_second_for_test) {
+            throw new AnalysisException("interval time unit can not be 
second");
+        }
+        return intervalUnit;
     }
 
     @Override
     public RefreshMethod visitRefreshMethod(RefreshMethodContext ctx) {
         if (ctx == null) {
-            return RefreshMethod.COMPLETE;
+            return RefreshMethod.AUTO;
         }
         return RefreshMethod.valueOf(ctx.getText().toUpperCase());
     }
@@ -631,7 +661,19 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
     @Override
     public RefreshMTMVCommand visitRefreshMTMV(RefreshMTMVContext ctx) {
         List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
-        return new RefreshMTMVCommand(new RefreshMTMVInfo(new 
TableNameInfo(nameParts)));
+        List<String> partitions = ImmutableList.of();
+        if (ctx.partitionSpec() != null) {
+            if (ctx.partitionSpec().TEMPORARY() != null) {
+                throw new AnalysisException("Not allowed to specify TEMPORARY 
");
+            }
+            if (ctx.partitionSpec().partition != null) {
+                partitions = 
ImmutableList.of(ctx.partitionSpec().partition.getText());
+            } else {
+                partitions = 
visitIdentifierList(ctx.partitionSpec().partitions);
+            }
+        }
+        return new RefreshMTMVCommand(new RefreshMTMVInfo(new 
TableNameInfo(nameParts),
+                partitions, ctx.COMPLETE() != null));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 3139d98f906..483e8e517a0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -82,7 +82,7 @@ public abstract class AbstractMaterializedViewRule {
                     
queryPlan.getGroupExpression().get().getOwnerGroup().getGroupId())) {
                 continue;
             }
-            Plan mvPlan = 
materializationContext.getMtmv().getMvCache().getLogicalPlan();
+            Plan mvPlan = 
materializationContext.getMtmv().getCache().getLogicalPlan();
             List<StructInfo> viewStructInfos = extractStructInfo(mvPlan, 
cascadesContext);
             if (viewStructInfos.size() > 1) {
                 // view struct info should only have one
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
index 8061b1835e6..b6c7234d5bf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/InitMaterializationContextHook.java
@@ -24,7 +24,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.mtmv.BaseTableInfo;
-import org.apache.doris.mtmv.MTMVCacheManager;
+import org.apache.doris.mtmv.MTMVRelationManager;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.PlannerHook;
@@ -70,10 +70,10 @@ public class InitMaterializationContextHook implements 
PlannerHook {
         }
         List<BaseTableInfo> baseTableUsed =
                 
collectedTables.stream().map(BaseTableInfo::new).collect(Collectors.toList());
-        // TODO the logic should be move to MTMVCacheManager later when 
getAvailableMaterializedView is ready in
+        // TODO the logic should be moved to MTMVRelationManager later when getAvailableMaterializedView is ready in
         //  MV Cache manager
         Env env = cascadesContext.getConnectContext().getEnv();
-        MTMVCacheManager cacheManager = env.getMtmvService().getCacheManager();
+        MTMVRelationManager cacheManager = 
env.getMtmvService().getRelationManager();
         Set<BaseTableInfo> materializedViews = new HashSet<>();
         for (BaseTableInfo baseTableInfo : baseTableUsed) {
             Set<BaseTableInfo> mtmvsByBaseTable = 
cacheManager.getMtmvsByBaseTable(baseTableInfo);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
index 3e1fe99c9c8..4f0b63d2ee7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
@@ -19,7 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
 
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.mtmv.MVCache;
+import org.apache.doris.mtmv.MTMVCache;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.memo.GroupId;
 import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
@@ -59,17 +59,17 @@ public class MaterializationContext {
         this.mvScanPlan = mvScanPlan;
         this.baseTables = baseTables;
         this.baseViews = baseViews;
-        MVCache mvCache = mtmv.getMvCache();
+        MTMVCache mtmvCache = mtmv.getCache();
         // TODO This logic should move to materialized view cache manager
-        if (mvCache == null) {
-            mvCache = MVCache.from(mtmv, cascadesContext.getConnectContext());
-            mtmv.setMvCache(mvCache);
+        if (mtmvCache == null) {
+            mtmvCache = mtmvCache.from(mtmv, 
cascadesContext.getConnectContext());
+            mtmv.setCache(mtmvCache);
         }
         // mv output expression shuttle, this will be used to expression 
rewrite
         this.mvExprToMvScanExprMapping = ExpressionMapping.generate(
                 ExpressionUtils.shuttleExpressionWithLineage(
-                        mvCache.getMvOutputExpressions(),
-                        mvCache.getLogicalPlan()),
+                        mtmvCache.getMvOutputExpressions(),
+                        mtmvCache.getLogicalPlan()),
                 mvScanPlan.getExpressions());
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
index 9aadf0f1cf0..ab9d6e35c9d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterMTMVCommand.java
@@ -31,7 +31,7 @@ import java.util.Objects;
 /**
  * alter multi table materialized view
  */
-public class AlterMTMVCommand extends Command implements ForwardWithSync {
+public class AlterMTMVCommand extends Command implements ForwardWithSync, 
NotAllowFallback {
 
     public static final Logger LOG = 
LogManager.getLogger(AlterMTMVCommand.class);
     private final AlterMTMVInfo alterMTMVInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
index c246991e618..814804b8ce9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMTMVCommand.java
@@ -32,7 +32,7 @@ import java.util.Objects;
 /**
  * create multi table materialized view
  */
-public class CreateMTMVCommand extends Command implements ForwardWithSync {
+public class CreateMTMVCommand extends Command implements ForwardWithSync, 
NotAllowFallback {
 
     public static final Logger LOG = 
LogManager.getLogger(CreateMTMVCommand.class);
     private final CreateMTMVInfo createMTMVInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
index a0d614b163c..19eedb82746 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropMTMVCommand.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 /**
  * refresh mtmv
  */
-public class DropMTMVCommand extends Command implements ForwardWithSync {
+public class DropMTMVCommand extends Command implements ForwardWithSync, 
NotAllowFallback {
     private final DropMTMVInfo dropMTMVInfo;
 
     public DropMTMVCommand(DropMTMVInfo dropMTMVInfo) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
similarity index 54%
copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
index ed2f0f709f4..72d8c82e599 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NotAllowFallback.java
@@ -15,51 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.mtmv;
+package org.apache.doris.nereids.trees.plans.commands;
 
 /**
- * refresh enum
+ * Marker interface: a command that implements it is not allowed to fall back to OriginalPlanner,
+ * for example because it is a new feature that the old parser does not implement.
  */
-public class MTMVRefreshEnum {
-
-    /**
-     * RefreshMethod
-     */
-    public enum RefreshMethod {
-        COMPLETE //complete
-    }
-
-    /**
-     * BuildMode
-     */
-    public enum BuildMode {
-        IMMEDIATE, //right now
-        DEFERRED // deferred
-    }
-
-    /**
-     * RefreshTrigger
-     */
-    public enum RefreshTrigger {
-        MANUAL, //manual
-        SCHEDULE // schedule
-    }
-
-    /**
-     * MTMVState
-     */
-    public enum MTMVState {
-        INIT,
-        NORMAL,
-        SCHEMA_CHANGE
-    }
-
-    /**
-     * MTMVRefreshState
-     */
-    public enum MTMVRefreshState {
-        INIT,
-        FAIL,
-        SUCCESS
-    }
+public interface NotAllowFallback {
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
index 982e8d86257..a918112555e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/RefreshMTMVCommand.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 /**
  * refresh mtmv
  */
-public class RefreshMTMVCommand extends Command implements ForwardWithSync {
+public class RefreshMTMVCommand extends Command implements ForwardWithSync, 
NotAllowFallback {
     private final RefreshMTMVInfo refreshMTMVInfo;
 
     public RefreshMTMVCommand(RefreshMTMVInfo refreshMTMVInfo) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index b221e507b82..ae4e048f1c6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -17,12 +17,15 @@
 
 package org.apache.doris.nereids.trees.plans.commands;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -33,11 +36,13 @@ import org.apache.doris.nereids.trees.expressions.LessThan;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
-import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -62,38 +67,41 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
      * Construct command
      *
      * @param mv materialize view
-     * @param partitions update partitions in mv and tables
+     * @param partitionIds update partitions in mv and tables
      * @param tableWithPartKey the partitions key for different table
      * @return command
      */
-    public static UpdateMvByPartitionCommand from(MTMV mv, Set<PartitionItem> 
partitions,
+    public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> 
partitionIds,
             Map<OlapTable, String> tableWithPartKey) {
         NereidsParser parser = new NereidsParser();
         Map<OlapTable, Set<Expression>> predicates =
-                constructTableWithPredicates(partitions, tableWithPartKey);
-        List<String> parts = constructPartsForMv(mv, partitions);
+                constructTableWithPredicates(mv, partitionIds, 
tableWithPartKey);
+        List<String> parts = constructPartsForMv(mv, partitionIds);
         Plan plan = parser.parseSingle(mv.getQuerySql());
         plan = plan.accept(new PredicateAdder(), predicates);
+        if (plan instanceof Sink) {
+            plan = plan.child(0);
+        }
         UnboundTableSink<? extends Plan> sink =
                 new UnboundTableSink<>(mv.getFullQualifiers(), 
ImmutableList.of(), ImmutableList.of(),
                         parts, plan);
         return new UpdateMvByPartitionCommand(sink);
     }
 
-    private static List<String> constructPartsForMv(MTMV mv, 
Set<PartitionItem> partitions) {
-        return mv.getPartitionNames().stream()
-                .filter(name -> {
-                    PartitionItem mvPartItem = 
mv.getPartitionInfo().getItem(mv.getPartition(name).getId());
-                    return partitions.stream().anyMatch(p -> 
p.getIntersect(mvPartItem) != null);
-                })
+    private static List<String> constructPartsForMv(MTMV mv, Set<Long> 
partitionIds) {
+        return partitionIds.stream()
+                .map(id -> mv.getPartition(id).getName())
                 .collect(ImmutableList.toImmutableList());
     }
 
-    private static Map<OlapTable, Set<Expression>> 
constructTableWithPredicates(Set<PartitionItem> partitions,
-            Map<OlapTable, String> tableWithPartKey) {
+    private static Map<OlapTable, Set<Expression>> 
constructTableWithPredicates(MTMV mv,
+            Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) {
+        Set<PartitionItem> items = partitionIds.stream()
+                .map(id -> mv.getPartitionInfo().getItem(id))
+                .collect(ImmutableSet.toImmutableSet());
         ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new 
ImmutableMap.Builder<>();
         tableWithPartKey.forEach((table, colName) ->
-                builder.put(table, constructPredicates(partitions, colName))
+                builder.put(table, constructPredicates(items, colName))
         );
         return builder.build();
     }
@@ -131,8 +139,15 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
 
     static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, 
Set<Expression>>> {
         @Override
-        public Plan visitLogicalOlapScan(LogicalOlapScan scan, Map<OlapTable, 
Set<Expression>> predicates) {
-            return new LogicalFilter<>(predicates.get(scan.getTable()), scan);
+        public Plan visitUnboundRelation(UnboundRelation unboundRelation, 
Map<OlapTable, Set<Expression>> predicates) {
+            List<String> tableQualifier = 
RelationUtil.getQualifierName(ConnectContext.get(),
+                    unboundRelation.getNameParts());
+            TableIf table = RelationUtil.getTable(tableQualifier, 
Env.getCurrentEnv());
+            if (table instanceof OlapTable && predicates.containsKey(table)) {
+                return new 
LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
+                        unboundRelation);
+            }
+            return unboundRelation;
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 4ea2a3b6982..66bc1b22107 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -19,28 +19,41 @@ package org.apache.doris.nereids.trees.plans.commands.info;
 
 import org.apache.doris.analysis.CreateMTMVStmt;
 import org.apache.doris.analysis.KeysDesc;
+import org.apache.doris.analysis.ListPartitionDesc;
+import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.RangePartitionDesc;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.mtmv.EnvInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
+import org.apache.doris.mtmv.MTMVPlanUtil;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
+import org.apache.doris.mtmv.MTMVRelation;
+import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
+import 
org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
 import org.apache.doris.nereids.trees.TreeNode;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
 import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import 
org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
 import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
 import 
org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
@@ -56,6 +69,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -78,6 +92,9 @@ public class CreateMTMVInfo {
     private final List<ColumnDefinition> columns = Lists.newArrayList();
     private final List<SimpleColumnDefinition> simpleColumnDefinitions;
     private final EnvInfo envInfo;
+    private final MTMVPartitionInfo mvPartitionInfo;
+    private PartitionDesc partitionDesc;
+    private MTMVRelation relation;
 
     /**
      * constructor for create MTMV
@@ -87,7 +104,8 @@ public class CreateMTMVInfo {
             DistributionDescriptor distribution, Map<String, String> 
properties,
             LogicalPlan logicalQuery, String querySql,
             MTMVRefreshInfo refreshInfo,
-            List<SimpleColumnDefinition> simpleColumnDefinitions) {
+            List<SimpleColumnDefinition> simpleColumnDefinitions,
+            MTMVPartitionInfo mvPartitionInfo) {
         this.ifNotExists = Objects.requireNonNull(ifNotExists, "require 
ifNotExists object");
         this.mvName = Objects.requireNonNull(mvName, "require mvName object");
         this.keys = Utils.copyRequiredList(keys);
@@ -101,6 +119,8 @@ public class CreateMTMVInfo {
                 .requireNonNull(simpleColumnDefinitions, "require 
simpleColumnDefinitions object");
         this.envInfo = new 
EnvInfo(ConnectContext.get().getCurrentCatalog().getId(),
                 ConnectContext.get().getCurrentDbId());
+        this.mvPartitionInfo = Objects
+                .requireNonNull(mvPartitionInfo, "require mtmvPartitionInfo 
object");
     }
 
     /**
@@ -154,6 +174,11 @@ public class CreateMTMVInfo {
             mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, 
gracePeriod);
             properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
         }
+        if 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
+            String excludedTriggerTables = 
properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
+            
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, 
excludedTriggerTables);
+            
properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
+        }
     }
 
     /**
@@ -163,22 +188,93 @@ public class CreateMTMVInfo {
         // create table as select
         NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
         Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, 
ExplainLevel.ALL_PLAN);
+        if (plan.anyMatch(node -> node instanceof OneRowRelation)) {
+            throw new AnalysisException("at least contain one table");
+        }
+        // can not contain VIEW or MTMV
         analyzeBaseTables(plan);
-        analyzeExpressions((PhysicalPlan) plan);
+        // can not contain Random function
+        analyzeExpressions(planner.getAnalyzedPlan());
+        // can not contain partition or tablets
+        boolean containTableQueryOperator = 
MaterializedViewUtils.containTableQueryOperator(planner.getAnalyzedPlan());
+        if (containTableQueryOperator) {
+            throw new AnalysisException("can not contain invalid expression");
+        }
+        getRelation(planner);
         getColumns(plan);
+        analyzePartition(planner);
+    }
+
+    private void getRelation(NereidsPlanner planner) {
+        Plan plan = planner.plan(logicalQuery, PhysicalProperties.ANY, 
ExplainLevel.NONE);
+        this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
+    }
+
+    private void analyzePartition(NereidsPlanner planner) {
+        if (mvPartitionInfo.getPartitionType() == 
MTMVPartitionType.FOLLOW_BASE_TABLE) {
+            Plan mvRewrittenPlan =
+                    planner.plan(logicalQuery, PhysicalProperties.ANY, 
ExplainLevel.REWRITTEN_PLAN);
+            Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
+                    .getRelatedTableInfo(mvPartitionInfo.getPartitionCol(), 
mvRewrittenPlan);
+            if (!relatedTableInfo.isPresent() || 
!relatedTableInfo.get().isPctPossible()) {
+                throw new AnalysisException("Unable to find a suitable base 
table for partitioning");
+            }
+            TableIf followTable = null;
+            try {
+                followTable = 
MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
+            } catch (org.apache.doris.common.AnalysisException e) {
+                throw new AnalysisException(e.getMessage(), e);
+            }
+            if (!(followTable instanceof OlapTable)) {
+                throw new AnalysisException("base table for partitioning only 
can be OlapTable.");
+            }
+            Set<String> partitionColumnNames;
+            try {
+                partitionColumnNames = ((OlapTable) 
followTable).getPartitionColumnNames();
+            } catch (DdlException e) {
+                throw new AnalysisException(e.getMessage(), e);
+            }
+
+            if 
(!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
+                throw new AnalysisException("error related column: " + 
relatedTableInfo.get().getColumn());
+            }
+            if (partitionColumnNames.size() != 1) {
+                throw new AnalysisException("base table for partitioning only 
support single column.");
+            }
+            
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
+            mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
+            partitionDesc = generatePartitionDesc((OlapTable) followTable);
+        }
+    }
+
+    private PartitionDesc generatePartitionDesc(OlapTable relatedTable) {
+        PartitionType type = relatedTable.getPartitionInfo().getType();
+        try {
+            if (type == PartitionType.RANGE) {
+                return new 
RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
+                        Lists.newArrayList());
+            } else if (type == PartitionType.LIST) {
+                return new 
ListPartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
+                        Lists.newArrayList());
+            } else {
+                return null;
+            }
+        } catch (org.apache.doris.common.AnalysisException e) {
+            throw new AnalysisException("can not generate partitionDesc", e);
+        }
     }
 
     private void analyzeBaseTables(Plan plan) {
         TableCollectorContext collectorContext =
-                new 
TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW));
+                new 
TableCollector.TableCollectorContext(Sets.newHashSet(TableType.MATERIALIZED_VIEW,
 TableType.VIEW));
         plan.accept(TableCollector.INSTANCE, collectorContext);
         List<TableIf> collectedTables = collectorContext.getCollectedTables();
         if (!CollectionUtils.isEmpty(collectedTables)) {
-            throw new AnalysisException("can not contain MATERIALIZED_VIEW");
+            throw new AnalysisException("can not contain MATERIALIZED_VIEW or 
VIEW");
         }
     }
 
-    private void analyzeExpressions(PhysicalPlan plan) {
+    private void analyzeExpressions(Plan plan) {
         List<TreeNode<Expression>> functionCollectResult = new ArrayList<>();
         plan.accept(NondeterministicFunctionCollector.INSTANCE, 
functionCollectResult);
         if (!CollectionUtils.isEmpty(functionCollectResult)) {
@@ -225,7 +321,8 @@ public class CreateMTMVInfo {
                 .map(ColumnDefinition::translateToCatalogStyle)
                 .collect(Collectors.toList());
         return new CreateMTMVStmt(ifNotExists, tableName, catalogColumns, 
refreshInfo, keysDesc,
-                distribution.translateToCatalogStyle(), properties, 
mvProperties, querySql, comment, envInfo);
+                distribution.translateToCatalogStyle(), properties, 
mvProperties, querySql, comment, envInfo,
+                partitionDesc, mvPartitionInfo, relation);
     }
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
index 422b9697c7c..e6a4368a850 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/RefreshMTMVInfo.java
@@ -17,12 +17,22 @@
 
 package org.apache.doris.nereids.trees.plans.commands.info;
 
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
 
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -30,13 +40,18 @@ import java.util.Objects;
  */
 public class RefreshMTMVInfo {
     private final TableNameInfo mvName;
+    private List<String> partitions;
+    private boolean isComplete;
 
-    public RefreshMTMVInfo(TableNameInfo mvName) {
+    public RefreshMTMVInfo(TableNameInfo mvName, List<String> partitions, boolean isComplete) {
         this.mvName = Objects.requireNonNull(mvName, "require mvName object");
+        this.partitions = Utils.copyRequiredList(partitions);
+        this.isComplete = Objects.requireNonNull(isComplete, "require isComplete object");
     }
 
     /**
      * analyze refresh info
+     *
      * @param ctx ConnectContext
      */
     public void analyze(ConnectContext ctx) {
@@ -48,13 +63,41 @@ public class RefreshMTMVInfo {
                     mvName.getDb() + ": " + mvName.getTbl());
             throw new AnalysisException(message);
         }
+        try {
+            Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(mvName.getDb());
+            MTMV mtmv = (MTMV) db.getTableOrMetaException(mvName.getTbl(), TableType.MATERIALIZED_VIEW);
+            if (!CollectionUtils.isEmpty(partitions)) {
+                MTMVUtil.getPartitionsIdsByNames(mtmv, partitions);
+            }
+        } catch (org.apache.doris.common.AnalysisException | MetaNotFoundException | DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
     }
 
     /**
      * getMvName
+     *
      * @return TableNameInfo
      */
     public TableNameInfo getMvName() {
         return mvName;
     }
+
+    /**
+     * getPartitions
+     *
+     * @return partitionNames
+     */
+    public List<String> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * isComplete
+     *
+     * @return isComplete
+     */
+    public boolean isComplete() {
+        return isComplete;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 92d1d47e5e5..ff837283056 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -127,6 +127,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.Forward;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapScanNode;
@@ -439,6 +440,14 @@ public class StmtExecutor {
         execute(queryId);
     }
 
+    public boolean notAllowFallback() {
+        if (parsedStmt instanceof LogicalPlanAdapter) {
+            LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+            return logicalPlan instanceof NotAllowFallback;
+        }
+        return false;
+    }
+
     public void execute(TUniqueId queryId) throws Exception {
         SessionVariable sessionVariable = context.getSessionVariable();
         if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
@@ -457,6 +466,10 @@ public class StmtExecutor {
                     // try to fall back to legacy planner
                     LOG.debug("nereids cannot process statement\n" + originStmt.originStmt
                             + "\n because of " + e.getMessage(), e);
+                    if (notAllowFallback()) {
+                        LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
+                        throw ((NereidsException) e).getException();
+                    }
                     boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter
                             && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
                     if (e instanceof NereidsException
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index f2a7bccdb00..e8620b105b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
 import org.apache.doris.qe.ConnectContext;
@@ -537,6 +538,8 @@ public class MetadataGenerator {
                 trow.addToColumnValue(new TCell().setStringVal(mv.getQuerySql()));
                 trow.addToColumnValue(new TCell().setStringVal(mv.getEnvInfo().toString()));
                 trow.addToColumnValue(new TCell().setStringVal(mv.getMvProperties().toString()));
+                trow.addToColumnValue(new TCell().setStringVal(mv.getMvPartitionInfo().toNameString()));
+                trow.addToColumnValue(new TCell().setBoolVal(MTMVUtil.isMTMVSync(mv)));
                 dataBatch.add(trow);
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index af0efcc1ce5..44e3ed58400 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -54,7 +54,9 @@ public class MvInfosTableValuedFunction extends MetadataTableValuedFunction {
             new Column("RefreshInfo", ScalarType.createStringType()),
             new Column("QuerySql", ScalarType.createStringType()),
             new Column("EnvInfo", ScalarType.createStringType()),
-            new Column("MvProperties", ScalarType.createStringType()));
+            new Column("MvProperties", ScalarType.createStringType()),
+            new Column("MvPartitionInfo", ScalarType.createStringType()),
+            new Column("SyncWithBaseTables", ScalarType.createType(PrimitiveType.BOOLEAN)));
 
     private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to