This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 700719a93c5 [feat](mtmv)Paimon queries the data in the cache instead 
of querying the latest data (#44911)
700719a93c5 is described below

commit 700719a93c570b89761a7f0ecf78c04fd1c0a4ec
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Wed Dec 11 10:53:08 2024 +0800

    [feat](mtmv)Paimon queries the data in the cache instead of querying the 
latest data (#44911)
    
    ### What problem does this PR solve?
    Problem Summary:
    - add `PaimonMetadataCacheMgr` in `ExternalMetaCacheMgr` to manage
    snapshotCache of paimon table
    - move paimonSchemaCache to PaimonMetadataCacheMgr, and add schemaId as
    part of key
    - PaimonExternalTable overrides the methods in ExternalTable and
    supports partition pruning
    - PaimonExternalTable implements the MvccTable interface, supporting the
    retrieval of snapshot data from the cache during queries to avoid cache
    refreshes that may result in different versions of metadata being used
    in a single query
    - MTMVTask retrieves snapshot data of mvccTable before the task starts
    to avoid cache refresh that may result in different versions of metadata
    being used in a single refresh task
    
    ### Release note
    Queries on Paimon tables read the data in the cache instead of the latest data
    
    ### behavior change
    Behavior changes when querying a Paimon table:
    - When FE has just started, Doris queries the latest data
    - After Paimon data has changed, Doris still queries the previous data
    - After the snapshot cache expires, Doris will query the latest data
    - `desc paimon;` displays the schema corresponding to the snapshotId in the
    snapshot cache
---
 .../apache/doris/datasource/ExternalCatalog.java   |   9 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  12 +
 .../doris/datasource/ExternalSchemaCache.java      |   6 +-
 .../org/apache/doris/datasource/ExternalTable.java |   7 +-
 .../doris/datasource/hive/HMSExternalTable.java    |   5 +
 .../org/apache/doris/datasource/mvcc/MvccUtil.java |  44 +++
 .../datasource/paimon/PaimonExternalTable.java     | 304 ++++++++-------------
 .../datasource/paimon/PaimonMetadataCache.java     | 144 ++++++++++
 .../datasource/paimon/PaimonMetadataCacheMgr.java  |  49 ++++
 ...nPartitionInfo.java => PaimonMvccSnapshot.java} |  30 +-
 .../datasource/paimon/PaimonPartitionInfo.java     |   4 +-
 ...{PaimonPartitionInfo.java => PaimonSchema.java} |  34 ++-
 .../datasource/paimon/PaimonSchemaCacheKey.java    |  55 ++++
 .../datasource/paimon/PaimonSchemaCacheValue.java  |  25 +-
 ...aimonPartitionInfo.java => PaimonSnapshot.java} |  32 +--
 .../datasource/paimon/PaimonSnapshotCacheKey.java  |  75 +++++
 ...cheValue.java => PaimonSnapshotCacheValue.java} |  37 +--
 .../apache/doris/datasource/paimon/PaimonUtil.java | 122 ++++++++-
 .../datasource/paimon/source/PaimonSource.java     |   3 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  14 +
 .../org/apache/doris/nereids/StatementContext.java |  26 +-
 .../rules/rewrite/PruneFileScanPartition.java      |   9 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |   7 +-
 .../trees/plans/logical/LogicalFileScan.java       |   7 +-
 .../data/mtmv_p0/test_paimon_rewrite_mtmv.out      |  16 ++
 .../suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy |  95 +++++++
 26 files changed, 839 insertions(+), 332 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index cde08113373..d7cbee18c74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -432,13 +433,13 @@ public abstract class ExternalCatalog
         }
     }
 
-    public final Optional<SchemaCacheValue> getSchema(String dbName, String 
tblName) {
+    public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
         makeSureInitialized();
-        Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
+        Optional<ExternalDatabase<? extends ExternalTable>> db = 
getDb(key.getDbName());
         if (db.isPresent()) {
-            Optional<? extends ExternalTable> table = 
db.get().getTable(tblName);
+            Optional<? extends ExternalTable> table = 
db.get().getTable(key.getTblName());
             if (table.isPresent()) {
-                return table.get().initSchemaAndUpdateTime();
+                return table.get().initSchemaAndUpdateTime(key);
             }
         }
         return Optional.empty();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index cc40ad292ce..24f55e74266 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -31,6 +31,8 @@ import 
org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
 import org.apache.doris.datasource.metacache.MetaCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
 import org.apache.doris.fs.FileSystemCache;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 
@@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr {
     private ExternalRowCountCache rowCountCache;
     private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
+    private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
 
     public ExternalMetaCacheMgr() {
         rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
@@ -122,6 +125,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
         icebergMetadataCacheMgr = new 
IcebergMetadataCacheMgr(commonRefreshExecutor);
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
+        paimonMetadataCacheMgr = new 
PaimonMetadataCacheMgr(commonRefreshExecutor);
     }
 
     public ExecutorService getFileListingExecutor() {
@@ -167,6 +171,10 @@ public class ExternalMetaCacheMgr {
         return icebergMetadataCacheMgr.getIcebergMetadataCache();
     }
 
+    public PaimonMetadataCache getPaimonMetadataCache() {
+        return paimonMetadataCacheMgr.getPaimonMetadataCache();
+    }
+
     public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
         return 
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
     }
@@ -189,6 +197,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.removePartitionProcessor(catalogId);
         icebergMetadataCacheMgr.removeCache(catalogId);
         maxComputeMetadataCacheMgr.removeCache(catalogId);
+        paimonMetadataCacheMgr.removeCache(catalogId);
     }
 
     public void invalidateTableCache(long catalogId, String dbName, String 
tblName) {
@@ -204,6 +213,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
         icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
         maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
+        paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, 
tblName, catalogId);
         }
@@ -222,6 +232,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
         icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
         maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+        paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid db cache for {} in catalog {}", dbName, 
catalogId);
         }
@@ -239,6 +250,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanPartitionProcess(catalogId);
         icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
         maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
+        paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid catalog cache for {}", catalogId);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a0558766e81..de3eeff75d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -74,7 +74,7 @@ public class ExternalSchemaCache {
     }
 
     private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
-        Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, 
key.tblName);
+        Optional<SchemaCacheValue> schema = catalog.getSchema(key);
         if (LOG.isDebugEnabled()) {
             LOG.debug("load schema for {} in catalog {}", key, 
catalog.getName());
         }
@@ -83,6 +83,10 @@ public class ExternalSchemaCache {
 
     public Optional<SchemaCacheValue> getSchemaValue(String dbName, String 
tblName) {
         SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        return getSchemaValue(key);
+    }
+
+    public Optional<SchemaCacheValue> getSchemaValue(SchemaCacheKey key) {
         return schemaCache.get(key);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 5451a219edf..91df061678f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -317,8 +318,12 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
      *
      * @return
      */
-    public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
+    public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey 
key) {
         schemaUpdateTime = System.currentTimeMillis();
+        return initSchema(key);
+    }
+
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         return initSchema();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 7584b5b392f..da4670d6d05 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TablePartitionValues;
@@ -501,6 +502,10 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
+    public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey 
key) {
+        return initSchemaAndUpdateTime();
+    }
+
     public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
         org.apache.hadoop.hive.metastore.api.Table table = 
((HMSExternalCatalog) catalog).getClient()
                 .getTable(dbName, name);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
new file mode 100644
index 00000000000..ffdaff770e2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.mvcc;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Optional;
+
+public class MvccUtil {
+    /**
+     * get Snapshot From StatementContext
+     *
+     * @param tableIf
+     * @return MvccSnapshot
+     */
+    public static Optional<MvccSnapshot> getSnapshotFromContext(TableIf 
tableIf) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return Optional.empty();
+        }
+        StatementContext statementContext = 
connectContext.getStatementContext();
+        if (statementContext == null) {
+            return Optional.empty();
+        }
+        return statementContext.getSnapshot(tableIf);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 7fe3c858448..8ddfca886d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -22,13 +22,16 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -45,25 +48,20 @@ import org.apache.doris.thrift.TTableType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.system.PartitionsTable;
-import org.apache.paimon.table.system.SnapshotsTable;
-import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.table.system.SchemasTable;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,12 +69,15 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class PaimonExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf {
+public class PaimonExternalTable extends ExternalTable implements 
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
 
     private static final Logger LOG = 
LogManager.getLogger(PaimonExternalTable.class);
 
+    private final Table paimonTable;
+
     public PaimonExternalTable(long id, String name, String dbName, 
PaimonExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+        this.paimonTable = catalog.getPaimonTable(dbName, name);
     }
 
     public String getPaimonCatalogType() {
@@ -90,176 +91,27 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
         }
     }
 
-    public Table getPaimonTable() {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) 
value).getPaimonTable()).orElse(null);
+    public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
+        return paimonTable.copy(
+                Collections.singletonMap(CoreOptions.SCAN_VERSION.key(),
+                        
String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId())));
     }
 
-    private PaimonPartitionInfo getPartitionInfoFromCache() {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+    public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) {
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new PaimonSchemaCacheKey(dbName, name, schemaId));
         if (!schemaCacheValue.isPresent()) {
-            return new PaimonPartitionInfo();
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, schemaId);
         }
-        return ((PaimonSchemaCacheValue) 
schemaCacheValue.get()).getPartitionInfo();
+        return (PaimonSchemaCacheValue) schemaCacheValue.get();
     }
 
-    private List<Column> getPartitionColumnsFromCache() {
+    private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() {
         makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            return Lists.newArrayList();
-        }
-        return ((PaimonSchemaCacheValue) 
schemaCacheValue.get()).getPartitionColumns();
-    }
-
-    public long getLatestSnapshotIdFromCache() throws AnalysisException {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        if (!schemaCacheValue.isPresent()) {
-            throw new AnalysisException("not present");
-        }
-        return ((PaimonSchemaCacheValue) 
schemaCacheValue.get()).getSnapshootId();
-    }
-
-    @Override
-    public Optional<SchemaCacheValue> initSchema() {
-        Table paimonTable = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, name);
-        TableSchema schema = ((FileStoreTable) paimonTable).schema();
-        List<DataField> columns = schema.fields();
-        List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
-        Set<String> partitionColumnNames = 
Sets.newHashSet(paimonTable.partitionKeys());
-        List<Column> partitionColumns = Lists.newArrayList();
-        for (DataField field : columns) {
-            Column column = new Column(field.name().toLowerCase(),
-                    paimonTypeToDorisType(field.type()), true, null, true, 
field.description(), true,
-                    field.id());
-            tmpSchema.add(column);
-            if (partitionColumnNames.contains(field.name())) {
-                partitionColumns.add(column);
-            }
-        }
-        try {
-            // after 0.9.0 paimon will support table.getLatestSnapshotId()
-            long latestSnapshotId = loadLatestSnapshotId();
-            PaimonPartitionInfo partitionInfo = 
loadPartitionInfo(partitionColumns);
-            return Optional.of(new PaimonSchemaCacheValue(tmpSchema, 
partitionColumns, paimonTable, latestSnapshotId,
-                    partitionInfo));
-        } catch (IOException | AnalysisException e) {
-            LOG.warn(e);
-            return Optional.empty();
-        }
-    }
-
-    private long loadLatestSnapshotId() throws IOException {
-        Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
-                name + Catalog.SYSTEM_TABLE_SPLITTER + 
SnapshotsTable.SNAPSHOTS);
-        // snapshotId
-        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}});
-        long latestSnapshotId = 0L;
-        for (InternalRow row : rows) {
-            long snapshotId = row.getLong(0);
-            if (snapshotId > latestSnapshotId) {
-                latestSnapshotId = snapshotId;
-            }
-        }
-        return latestSnapshotId;
-    }
-
-    private PaimonPartitionInfo loadPartitionInfo(List<Column> 
partitionColumns) throws IOException, AnalysisException {
-        if (CollectionUtils.isEmpty(partitionColumns)) {
-            return new PaimonPartitionInfo();
-        }
-        List<PaimonPartition> paimonPartitions = loadPartitions();
-        return PaimonUtil.generatePartitionInfo(partitionColumns, 
paimonPartitions);
-    }
-
-    private List<PaimonPartition> loadPartitions()
-            throws IOException {
-        Table table = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName,
-                name + Catalog.SYSTEM_TABLE_SPLITTER + 
PartitionsTable.PARTITIONS);
-        List<InternalRow> rows = PaimonUtil.read(table, null);
-        List<PaimonPartition> res = 
Lists.newArrayListWithCapacity(rows.size());
-        for (InternalRow row : rows) {
-            res.add(PaimonUtil.rowToPartition(row));
-        }
-        return res;
-    }
-
-    private Type 
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
-        int tsScale = 3; // default
-        switch (dataType.getTypeRoot()) {
-            case BOOLEAN:
-                return Type.BOOLEAN;
-            case INTEGER:
-                return Type.INT;
-            case BIGINT:
-                return Type.BIGINT;
-            case FLOAT:
-                return Type.FLOAT;
-            case DOUBLE:
-                return Type.DOUBLE;
-            case SMALLINT:
-                return Type.SMALLINT;
-            case TINYINT:
-                return Type.TINYINT;
-            case VARCHAR:
-            case BINARY:
-            case CHAR:
-            case VARBINARY:
-                return Type.STRING;
-            case DECIMAL:
-                DecimalType decimal = (DecimalType) dataType;
-                return ScalarType.createDecimalV3Type(decimal.getPrecision(), 
decimal.getScale());
-            case DATE:
-                return ScalarType.createDateV2Type();
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                if (dataType instanceof org.apache.paimon.types.TimestampType) 
{
-                    tsScale = ((org.apache.paimon.types.TimestampType) 
dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                } else if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
-                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                }
-                return ScalarType.createDatetimeV2Type(tsScale);
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
-                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                }
-                return ScalarType.createDatetimeV2Type(tsScale);
-            case ARRAY:
-                ArrayType arrayType = (ArrayType) dataType;
-                Type innerType = 
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
-                return org.apache.doris.catalog.ArrayType.create(innerType, 
true);
-            case MAP:
-                MapType mapType = (MapType) dataType;
-                return new org.apache.doris.catalog.MapType(
-                        paimonTypeToDorisType(mapType.getKeyType()), 
paimonTypeToDorisType(mapType.getValueType()));
-            case ROW:
-                RowType rowType = (RowType) dataType;
-                List<DataField> fields = rowType.getFields();
-                return new org.apache.doris.catalog.StructType(fields.stream()
-                        .map(field -> new 
org.apache.doris.catalog.StructField(field.name(),
-                                paimonTypeToDorisType(field.type())))
-                        .collect(Collectors.toCollection(ArrayList::new)));
-            case TIME_WITHOUT_TIME_ZONE:
-                return Type.UNSUPPORTED;
-            default:
-                LOG.warn("Cannot transform unknown type: " + 
dataType.getTypeRoot());
-                return Type.UNSUPPORTED;
-        }
-    }
-
-    protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType 
type) {
-        return paimonPrimitiveTypeToDorisType(type);
+        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
+                .getPaimonSnapshot(catalog, dbName, name);
     }
 
     @Override
@@ -289,13 +141,6 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
     public long fetchRowCount() {
         makeSureInitialized();
         long rowCount = 0;
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        Table paimonTable = schemaCacheValue.map(value -> 
((PaimonSchemaCacheValue) value).getPaimonTable())
-                .orElse(null);
-        if (paimonTable == null) {
-            LOG.info("Paimon table {} is null.", name);
-            return UNKNOWN_ROW_COUNT;
-        }
         List<Split> splits = 
paimonTable.newReadBuilder().newScan().plan().splits();
         for (Split split : splits) {
             rowCount += split.rowCount();
@@ -314,30 +159,31 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
 
     @Override
     public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return 
Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+        return Maps.newHashMap(getNameToPartitionItems(snapshot));
     }
 
     @Override
     public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
-        return getPartitionColumnsFromCache().size() > 0 ? PartitionType.LIST 
: PartitionType.UNPARTITIONED;
+        return getPartitionColumns(snapshot).size() > 0 ? PartitionType.LIST : 
PartitionType.UNPARTITIONED;
     }
 
     @Override
     public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) {
-        return getPartitionColumnsFromCache().stream()
+        return getPartitionColumns(snapshot).stream()
                 .map(c -> 
c.getName().toLowerCase()).collect(Collectors.toSet());
     }
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        return getPartitionColumnsFromCache();
+        return getPaimonSchemaCacheValue(snapshot).getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, 
MTMVRefreshContext context,
             Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
-        PaimonPartition paimonPartition = 
getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        PaimonPartition paimonPartition = 
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartition()
+                .get(partitionName);
         if (paimonPartition == null) {
             throw new AnalysisException("can not find partition: " + 
partitionName);
         }
@@ -347,7 +193,8 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
     @Override
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, 
Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
-        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+        PaimonSnapshotCacheValue paimonSnapshot = 
getOrFetchSnapshotCacheValue(snapshot);
+        return new 
MTMVVersionSnapshot(paimonSnapshot.getSnapshot().getSnapshotId());
     }
 
     @Override
@@ -359,4 +206,83 @@ public class PaimonExternalTable extends ExternalTable 
implements MTMVRelatedTab
         // The cost is that Paimon partition writes a null value, and the 
materialized view cannot detect this data.
         return true;
     }
+
+    /**
+     * Captures the currently cached snapshot value of this table and wraps it in a
+     * {@link PaimonMvccSnapshot}.
+     *
+     * @return a new snapshot object holding the value returned by
+     *         {@code getPaimonSnapshotCacheValue()}
+     */
+    @Override
+    public MvccSnapshot loadSnapshot() {
+        PaimonSnapshotCacheValue cachedValue = getPaimonSnapshotCacheValue();
+        return new PaimonMvccSnapshot(cachedValue);
+    }
+
+    /**
+     * Returns the partition-name to partition-item mapping of the given snapshot.
+     *
+     * @param snapshot snapshot to read from; when empty, the cached snapshot value of this
+     *        table is used instead
+     * @return the name-to-item map stored in the snapshot's partition info
+     */
+    @Override
+    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+        PaimonPartitionInfo partitionInfo = getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo();
+        return partitionInfo.getNameToPartitionItem();
+    }
+
+    /**
+     * Declares that this table type opts in to internal partition pruning.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    /**
+     * Returns the full column list of the schema that belongs to the snapshot obtained from
+     * {@code MvccUtil.getSnapshotFromContext(this)}.
+     *
+     * @return the columns of the resolved schema cache value
+     */
+    @Override
+    public List<Column> getFullSchema() {
+        Optional<MvccSnapshot> contextSnapshot = MvccUtil.getSnapshotFromContext(this);
+        return getPaimonSchemaCacheValue(contextSnapshot).getSchema();
+    }
+
+    /**
+     * Builds the schema cache entry for one specific Paimon schema version.
+     *
+     * <p>The key is cast to {@link PaimonSchemaCacheKey}; its schema id selects the schema to
+     * load. Every Paimon field becomes a Doris column with a lower-cased name, and fields whose
+     * original (not lower-cased) name appears in the Paimon partition keys are additionally
+     * collected as partition columns.
+     *
+     * @param key cache key; must be a {@link PaimonSchemaCacheKey}
+     * @return a non-empty Optional holding a {@link PaimonSchemaCacheValue}
+     * @throws CacheException if loading or converting the schema fails; the triggering
+     *         exception is attached as the cause
+     */
+    @Override
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+        makeSureInitialized();
+        PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
+        try {
+            PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey);
+            List<DataField> columns = schema.getFields();
+            List<Column> dorisColumns = Lists.newArrayListWithCapacity(columns.size());
+            Set<String> partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys());
+            List<Column> partitionColumns = Lists.newArrayList();
+            for (DataField field : columns) {
+                Column column = new Column(field.name().toLowerCase(),
+                        PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
+                        field.id());
+                dorisColumns.add(column);
+                // Partition keys are matched against the original field name, not the lower-cased one.
+                if (partitionColumnNames.contains(field.name())) {
+                    partitionColumns.add(column);
+                }
+            }
+            return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns));
+        } catch (Exception e) {
+            // Pass the caught exception as the cause so the root failure is not lost.
+            throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                    e, getCatalog().getName(), key.getDbName(), key.getTblName(),
+                    paimonSchemaCacheKey.getSchemaId());
+        }
+    }
+
+    /**
+     * Reads the schemas system table of this Paimon table and returns the entry whose schema id
+     * equals the one in the key.
+     *
+     * @param key identifies the database and the wanted schema id
+     * @return the matching schema
+     * @throws IOException if reading the system table fails
+     * @throws CacheException if no row with the requested schema id is found
+     */
+    private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException {
+        String schemasTableName = name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS;
+        Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), schemasTableName);
+        Predicate predicate = new PredicateBuilder(table.rowType()).equal(0, key.getSchemaId());
+        // The filter is not exact: the read may still return rows for other schema ids,
+        // so every row is re-checked below.
+        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate);
+        for (InternalRow row : rows) {
+            PaimonSchema candidate = PaimonUtil.rowToSchema(row);
+            if (candidate.getSchemaId() != key.getSchemaId()) {
+                continue;
+            }
+            return candidate;
+        }
+        throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId());
+    }
+
+    /**
+     * Resolves the schema cache value that matches the schema id recorded in the given snapshot.
+     *
+     * @param snapshot snapshot to use; when empty, the cached snapshot value of this table is used
+     * @return the schema cache value for that snapshot's schema id
+     */
+    private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
+        long schemaId = getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSchemaId();
+        return getPaimonSchemaCacheValue(schemaId);
+    }
+
+    /**
+     * Returns the snapshot cache value carried by the given snapshot, or the table's cached
+     * snapshot value when no snapshot is supplied.
+     *
+     * @param snapshot optional snapshot; when present it is cast to {@link PaimonMvccSnapshot}
+     * @return the snapshot cache value to work with
+     */
+    private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        if (!snapshot.isPresent()) {
+            return getPaimonSnapshotCacheValue();
+        }
+        PaimonMvccSnapshot paimonMvccSnapshot = (PaimonMvccSnapshot) snapshot.get();
+        return paimonMvccSnapshot.getSnapshotCacheValue();
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
new file mode 100644
index 00000000000..5b711e07066
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.PartitionsTable;
+import org.apache.paimon.table.system.SnapshotsTable;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Cache of per-table Paimon snapshot metadata.
+ *
+ * <p>Entries are keyed by catalog, database name and table name. Each value holds the highest
+ * snapshot id found in the table's snapshots system table, the schema id recorded with that
+ * snapshot, and the partition information read from the partitions system table.
+ */
+public class PaimonMetadataCache {
+
+    private final LoadingCache<PaimonSnapshotCacheKey, PaimonSnapshotCacheValue> snapshotCache;
+
+    /**
+     * Creates the snapshot cache.
+     *
+     * @param executor handed to the cache factory when the cache is built
+     */
+    public PaimonMetadataCache(ExecutorService executor) {
+        // NOTE(review): 28800 and "minutes * 60" look like durations in seconds;
+        // confirm against the CacheFactory constructor.
+        CacheFactory factory = new CacheFactory(
+                OptionalLong.of(28800L),
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+                Config.max_external_table_cache_num,
+                true,
+                null);
+        this.snapshotCache = factory.buildCache(cacheKey -> loadSnapshot(cacheKey), null, executor);
+    }
+
+    /**
+     * Cache loader: reads the latest snapshot of the table, looks up the partition columns of
+     * the schema that snapshot refers to, and loads the partition info.
+     *
+     * @throws CacheException wrapping any IOException or AnalysisException raised while loading
+     */
+    @NotNull
+    private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) {
+        try {
+            PaimonSnapshot snapshot = loadLatestSnapshot(key);
+            PaimonExternalTable externalTable = (PaimonExternalTable) key.getCatalog()
+                    .getDbOrAnalysisException(key.getDbName())
+                    .getTableOrAnalysisException(key.getTableName());
+            List<Column> partitionCols = externalTable.getPaimonSchemaCacheValue(snapshot.getSchemaId())
+                    .getPartitionColumns();
+            return new PaimonSnapshotCacheValue(loadPartitionInfo(key, partitionCols), snapshot);
+        } catch (IOException | AnalysisException e) {
+            throw new CacheException("failed to loadSnapshot for: %s.%s.%s",
+                    e, key.getCatalog().getName(), key.getDbName(), key.getTableName());
+        }
+    }
+
+    /**
+     * Builds the partition info for a table; a table without partition columns gets an empty
+     * info object and the partitions system table is not read.
+     */
+    private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List<Column> partitionColumns)
+            throws IOException, AnalysisException {
+        if (CollectionUtils.isEmpty(partitionColumns)) {
+            return new PaimonPartitionInfo();
+        }
+        return PaimonUtil.generatePartitionInfo(partitionColumns, loadPartitions(key));
+    }
+
+    /** Reads every row of the table's partitions system table and converts it to a PaimonPartition. */
+    private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key)
+            throws IOException {
+        String partitionsTableName = key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS;
+        Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(),
+                partitionsTableName);
+        List<InternalRow> rows = PaimonUtil.read(table, null, null);
+        List<PaimonPartition> partitions = Lists.newArrayListWithCapacity(rows.size());
+        for (InternalRow row : rows) {
+            partitions.add(PaimonUtil.rowToPartition(row));
+        }
+        return partitions;
+    }
+
+    /**
+     * Scans the table's snapshots system table and returns the row with the highest snapshot id
+     * together with its schema id.
+     *
+     * <p>NOTE(review): when the system table yields no rows the result is (0, 0); confirm that
+     * schema id 0 is the right answer for a table without snapshots.
+     */
+    private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException {
+        String snapshotsTableName = key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS;
+        Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(),
+                snapshotsTableName);
+        // Projected columns: snapshotId and schemaId.
+        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null);
+        long maxSnapshotId = 0L;
+        long schemaIdOfMax = 0L;
+        for (InternalRow row : rows) {
+            long currentId = row.getLong(0);
+            if (currentId <= maxSnapshotId) {
+                continue;
+            }
+            maxSnapshotId = currentId;
+            schemaIdOfMax = row.getLong(1);
+        }
+        return new PaimonSnapshot(maxSnapshotId, schemaIdOfMax);
+    }
+
+    /** Invalidates every cached entry that belongs to the given catalog. */
+    public void invalidateCatalogCache(long catalogId) {
+        for (PaimonSnapshotCacheKey key : snapshotCache.asMap().keySet()) {
+            if (key.getCatalog().getId() == catalogId) {
+                snapshotCache.invalidate(key);
+            }
+        }
+    }
+
+    /** Invalidates the cached entry (if any) of one table in the given catalog and database. */
+    public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+        for (PaimonSnapshotCacheKey key : snapshotCache.asMap().keySet()) {
+            if (key.getCatalog().getId() == catalogId
+                    && key.getDbName().equals(dbName)
+                    && key.getTableName().equals(tblName)) {
+                snapshotCache.invalidate(key);
+            }
+        }
+    }
+
+    /** Invalidates every cached entry that belongs to the given database of the given catalog. */
+    public void invalidateDbCache(long catalogId, String dbName) {
+        for (PaimonSnapshotCacheKey key : snapshotCache.asMap().keySet()) {
+            if (key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) {
+                snapshotCache.invalidate(key);
+            }
+        }
+    }
+
+    /**
+     * Returns the cached snapshot value of a table, loading it through the cache when absent.
+     *
+     * @param catalog catalog the table belongs to
+     * @param dbName database name
+     * @param tbName table name
+     */
+    public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) {
+        return snapshotCache.get(new PaimonSnapshotCacheKey(catalog, dbName, tbName));
+    }
+
+    /** Returns the statistics of the snapshot cache under the key "paimon_snapshot_cache". */
+    public Map<String, Map<String, String>> getCacheStats() {
+        Map<String, Map<String, String>> stats = Maps.newHashMap();
+        Map<String, String> snapshotStats = ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
+                snapshotCache.estimatedSize());
+        stats.put("paimon_snapshot_cache", snapshotStats);
+        return stats;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
new file mode 100644
index 00000000000..a282fde665b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Owns the single {@link PaimonMetadataCache} instance and forwards invalidation requests to it.
+ */
+public class PaimonMetadataCacheMgr {
+
+    // Assigned once in the constructor and never replaced.
+    private final PaimonMetadataCache paimonMetadataCache;
+
+    /**
+     * @param executor executor passed to the underlying {@link PaimonMetadataCache}
+     */
+    public PaimonMetadataCacheMgr(ExecutorService executor) {
+        this.paimonMetadataCache = new PaimonMetadataCache(executor);
+    }
+
+    /** Returns the managed cache. */
+    public PaimonMetadataCache getPaimonMetadataCache() {
+        return paimonMetadataCache;
+    }
+
+    /** Drops all entries of the given catalog; same effect as {@link #invalidateCatalogCache(long)}. */
+    public void removeCache(long catalogId) {
+        invalidateCatalogCache(catalogId);
+    }
+
+    /** Invalidates every cached entry of the given catalog. */
+    public void invalidateCatalogCache(long catalogId) {
+        paimonMetadataCache.invalidateCatalogCache(catalogId);
+    }
+
+    /** Invalidates the cached entry of one table. */
+    public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+        paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName);
+    }
+
+    /** Invalidates every cached entry of one database. */
+    public void invalidateDbCache(long catalogId, String dbName) {
+        paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
similarity index 50%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
index 8f54f0834e4..2307e91adb3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
@@ -17,32 +17,16 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
 
-import com.google.common.collect.Maps;
+public class PaimonMvccSnapshot implements MvccSnapshot {
+    private final PaimonSnapshotCacheValue snapshotCacheValue;
 
-import java.util.Map;
-
-public class PaimonPartitionInfo {
-    private Map<String, PartitionItem> nameToPartitionItem;
-    private Map<String, PaimonPartition> nameToPartition;
-
-    public PaimonPartitionInfo() {
-        this.nameToPartitionItem = Maps.newHashMap();
-        this.nameToPartition = Maps.newHashMap();
-    }
-
-    public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
-            Map<String, PaimonPartition> nameToPartition) {
-        this.nameToPartitionItem = nameToPartitionItem;
-        this.nameToPartition = nameToPartition;
-    }
-
-    public Map<String, PartitionItem> getNameToPartitionItem() {
-        return nameToPartitionItem;
+    public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) {
+        this.snapshotCacheValue = snapshotCacheValue;
     }
 
-    public Map<String, PaimonPartition> getNameToPartition() {
-        return nameToPartition;
+    public PaimonSnapshotCacheValue getSnapshotCacheValue() {
+        return snapshotCacheValue;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
index 8f54f0834e4..4d3326f8e48 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
@@ -24,8 +24,8 @@ import com.google.common.collect.Maps;
 import java.util.Map;
 
 public class PaimonPartitionInfo {
-    private Map<String, PartitionItem> nameToPartitionItem;
-    private Map<String, PaimonPartition> nameToPartition;
+    private final Map<String, PartitionItem> nameToPartitionItem;
+    private final Map<String, PaimonPartition> nameToPartition;
 
     public PaimonPartitionInfo() {
         this.nameToPartitionItem = Maps.newHashMap();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
 b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
similarity index 51%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
index 8f54f0834e4..ef26e1ed208 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
@@ -17,32 +17,30 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.PartitionItem;
+import org.apache.paimon.types.DataField;
 
-import com.google.common.collect.Maps;
+import java.util.List;
 
-import java.util.Map;
+public class PaimonSchema {
+    private final long schemaId;
+    private final List<DataField> fields;
+    private final List<String> partitionKeys;
 
-public class PaimonPartitionInfo {
-    private Map<String, PartitionItem> nameToPartitionItem;
-    private Map<String, PaimonPartition> nameToPartition;
-
-    public PaimonPartitionInfo() {
-        this.nameToPartitionItem = Maps.newHashMap();
-        this.nameToPartition = Maps.newHashMap();
+    public PaimonSchema(long schemaId, List<DataField> fields, List<String> 
partitionKeys) {
+        this.schemaId = schemaId;
+        this.fields = fields;
+        this.partitionKeys = partitionKeys;
     }
 
-    public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
-            Map<String, PaimonPartition> nameToPartition) {
-        this.nameToPartitionItem = nameToPartitionItem;
-        this.nameToPartition = nameToPartition;
+    public long getSchemaId() {
+        return schemaId;
     }
 
-    public Map<String, PartitionItem> getNameToPartitionItem() {
-        return nameToPartitionItem;
+    public List<DataField> getFields() {
+        return fields;
     }
 
-    public Map<String, PaimonPartition> getNameToPartition() {
-        return nameToPartition;
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
new file mode 100644
index 00000000000..f74555b369b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+/**
+ * Schema cache key that extends the database/table key with a Paimon schema id, so that
+ * different schema versions of the same table occupy different cache entries.
+ */
+public class PaimonSchemaCacheKey extends SchemaCacheKey {
+    private final long schemaId;
+
+    /**
+     * @param dbName database name
+     * @param tableName table name
+     * @param schemaId Paimon schema id this key refers to
+     */
+    public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) {
+        super(dbName, tableName);
+        this.schemaId = schemaId;
+    }
+
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+    /**
+     * Two keys are equal when the other object is also a PaimonSchemaCacheKey, the parent
+     * class considers them equal, and the schema ids match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof PaimonSchemaCacheKey && super.equals(o)) {
+            return ((PaimonSchemaCacheKey) o).schemaId == schemaId;
+        }
+        return false;
+    }
+
+    /** Combines the parent hash with the schema id. */
+    @Override
+    public int hashCode() {
+        int parentHash = super.hashCode();
+        return Objects.hashCode(parentHash, schemaId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
index 20d27b2425d..ccb530a3cbc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
@@ -20,41 +20,18 @@ package org.apache.doris.datasource.paimon;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.datasource.SchemaCacheValue;
 
-import org.apache.paimon.table.Table;
-
 import java.util.List;
 
 public class PaimonSchemaCacheValue extends SchemaCacheValue {
 
-    private Table paimonTable;
     private List<Column> partitionColumns;
-    private PaimonPartitionInfo partitionInfo;
-
-    private long snapshootId;
 
-    public PaimonSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns, Table paimonTable,
-            long snapshootId,
-            PaimonPartitionInfo partitionInfo) {
+    public PaimonSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns) {
         super(schema);
         this.partitionColumns = partitionColumns;
-        this.paimonTable = paimonTable;
-        this.snapshootId = snapshootId;
-        this.partitionInfo = partitionInfo;
-    }
-
-    public Table getPaimonTable() {
-        return paimonTable;
     }
 
     public List<Column> getPartitionColumns() {
         return partitionColumns;
     }
-
-    public PaimonPartitionInfo getPartitionInfo() {
-        return partitionInfo;
-    }
-
-    public long getSnapshootId() {
-        return snapshootId;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
similarity index 50%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
index 8f54f0834e4..4a536dd72cc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
@@ -17,32 +17,20 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.PartitionItem;
+public class PaimonSnapshot {
+    private final long snapshotId;
+    private final long schemaId;
 
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-
-public class PaimonPartitionInfo {
-    private Map<String, PartitionItem> nameToPartitionItem;
-    private Map<String, PaimonPartition> nameToPartition;
-
-    public PaimonPartitionInfo() {
-        this.nameToPartitionItem = Maps.newHashMap();
-        this.nameToPartition = Maps.newHashMap();
-    }
-
-    public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
-            Map<String, PaimonPartition> nameToPartition) {
-        this.nameToPartitionItem = nameToPartitionItem;
-        this.nameToPartition = nameToPartition;
+    public PaimonSnapshot(long snapshotId, long schemaId) {
+        this.snapshotId = snapshotId;
+        this.schemaId = schemaId;
     }
 
-    public Map<String, PartitionItem> getNameToPartitionItem() {
-        return nameToPartitionItem;
+    public long getSnapshotId() {
+        return snapshotId;
     }
 
-    public Map<String, PaimonPartition> getNameToPartition() {
-        return nameToPartition;
+    public long getSchemaId() {
+        return schemaId;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
new file mode 100644
index 00000000000..970f111a721
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogIf;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Key of the Paimon snapshot cache: a catalog plus database and table name.
+ *
+ * <p>Equality and hashing use the catalog's id rather than the catalog object itself.
+ */
+public class PaimonSnapshotCacheKey {
+    private final CatalogIf catalog;
+    private final String dbName;
+    private final String tableName;
+
+    /**
+     * @param catalog catalog the table belongs to
+     * @param dbName database name
+     * @param tableName table name
+     */
+    public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) {
+        this.catalog = catalog;
+        this.dbName = dbName;
+        this.tableName = tableName;
+    }
+
+    public CatalogIf getCatalog() {
+        return catalog;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    /** Keys are equal when class, catalog id, database name and table name all match. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        PaimonSnapshotCacheKey other = (PaimonSnapshotCacheKey) o;
+        if (catalog.getId() != other.catalog.getId()) {
+            return false;
+        }
+        return Objects.equals(dbName, other.dbName) && Objects.equals(tableName, other.tableName);
+    }
+
+    /** Hash over the catalog id, database name and table name. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalog.getId(), dbName, tableName);
+    }
+
+    @Override
+    public String toString() {
+        StringJoiner joiner = new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]");
+        joiner.add("catalog=" + catalog);
+        joiner.add("dbName='" + dbName + "'");
+        joiner.add("tableName='" + tableName + "'");
+        return joiner.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
similarity index 51%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
index 20d27b2425d..c50ecdabfde 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
@@ -17,44 +17,21 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+public class PaimonSnapshotCacheValue {
 
-import org.apache.paimon.table.Table;
+    private final PaimonPartitionInfo partitionInfo;
+    private final PaimonSnapshot snapshot;
 
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
-    private Table paimonTable;
-    private List<Column> partitionColumns;
-    private PaimonPartitionInfo partitionInfo;
-
-    private long snapshootId;
-
-    public PaimonSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns, Table paimonTable,
-            long snapshootId,
-            PaimonPartitionInfo partitionInfo) {
-        super(schema);
-        this.partitionColumns = partitionColumns;
-        this.paimonTable = paimonTable;
-        this.snapshootId = snapshootId;
+    public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, 
PaimonSnapshot snapshot) {
         this.partitionInfo = partitionInfo;
-    }
-
-    public Table getPaimonTable() {
-        return paimonTable;
-    }
-
-    public List<Column> getPartitionColumns() {
-        return partitionColumns;
+        this.snapshot = snapshot;
     }
 
     public PaimonPartitionInfo getPartitionInfo() {
         return partitionInfo;
     }
 
-    public long getSnapshootId() {
-        return snapshootId;
+    public PaimonSnapshot getSnapshot() {
+        return snapshot;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 8b7017cac29..1f7576dca51 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.hive.HiveUtil;
@@ -30,12 +31,22 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Projection;
 
@@ -48,8 +59,11 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 public class PaimonUtil {
+    private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
+
     public static List<InternalRow> read(
-            Table table, @Nullable int[][] projection, Pair<ConfigOption<?>, 
String>... dynamicOptions)
+            Table table, @Nullable int[][] projection, @Nullable Predicate 
predicate,
+            Pair<ConfigOption<?>, String>... dynamicOptions)
             throws IOException {
         Map<String, String> options = new HashMap<>();
         for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
@@ -60,6 +74,9 @@ public class PaimonUtil {
         if (projection != null) {
             readBuilder.withProjection(projection);
         }
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
         RecordReader<InternalRow> reader =
                 
readBuilder.newRead().createReader(readBuilder.newScan().plan());
         InternalRowSerializer serializer =
@@ -152,4 +169,107 @@ public class PaimonUtil {
         ListPartitionItem listPartitionItem = new 
ListPartitionItem(Lists.newArrayList(key));
         return listPartitionItem;
     }
+
+    private static Type 
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+        int tsScale = 3; // default
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case BIGINT:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case SMALLINT:
+                return Type.SMALLINT;
+            case TINYINT:
+                return Type.TINYINT;
+            case VARCHAR:
+            case BINARY:
+            case CHAR:
+            case VARBINARY:
+                return Type.STRING;
+            case DECIMAL:
+                DecimalType decimal = (DecimalType) dataType;
+                return ScalarType.createDecimalV3Type(decimal.getPrecision(), 
decimal.getScale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                if (dataType instanceof org.apache.paimon.types.TimestampType) 
{
+                    tsScale = ((org.apache.paimon.types.TimestampType) 
dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                } else if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
+                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                }
+                return ScalarType.createDatetimeV2Type(tsScale);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
+                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                }
+                return ScalarType.createDatetimeV2Type(tsScale);
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) dataType;
+                Type innerType = 
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
+                return org.apache.doris.catalog.ArrayType.create(innerType, 
true);
+            case MAP:
+                MapType mapType = (MapType) dataType;
+                return new org.apache.doris.catalog.MapType(
+                        paimonTypeToDorisType(mapType.getKeyType()), 
paimonTypeToDorisType(mapType.getValueType()));
+            case ROW:
+                RowType rowType = (RowType) dataType;
+                List<DataField> fields = rowType.getFields();
+                return new org.apache.doris.catalog.StructType(fields.stream()
+                        .map(field -> new 
org.apache.doris.catalog.StructField(field.name(),
+                                paimonTypeToDorisType(field.type())))
+                        .collect(Collectors.toCollection(ArrayList::new)));
+            case TIME_WITHOUT_TIME_ZONE:
+                return Type.UNSUPPORTED;
+            default:
+                LOG.warn("Cannot transform unknown type: " + 
dataType.getTypeRoot());
+                return Type.UNSUPPORTED;
+        }
+    }
+
+    public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType 
type) {
+        return paimonPrimitiveTypeToDorisType(type);
+    }
+
+    /**
+     * 
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table
+     * demo:
+     * 0
+     * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"},
+     * {"id":1,"name":"item_id","type":"BIGINT"},
+     * {"id":2,"name":"behavior","type":"STRING"},
+     * {"id":3,"name":"dt","type":"STRING NOT NULL"},
+     * {"id":4,"name":"hh","type":"STRING NOT NULL"}]
+     * ["dt"]
+     * ["dt","hh","user_id"]
+     * {"owner":"hadoop","provider":"paimon"}
+     * 2024-12-03 15:38:14.734
+     *
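+     * @param row a row of the paimon schemas system table: column 0 is the schema id,
+     *            column 1 is the fields JSON and column 2 is the partition keys JSON
+     * @return the PaimonSchema built from the schema id, fields and partition keys of the row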
+     */
+    public static PaimonSchema rowToSchema(InternalRow row) {
+        long schemaId = row.getLong(0);
+        String fieldsStr = row.getString(1).toString();
+        String partitionKeysStr = row.getString(2).toString();
+        List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new 
TypeReference<List<DataField>>() {
+        });
+        List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, 
new TypeReference<List<String>>() {
+        });
+        return new PaimonSchema(schemaId, fields, partitionKeys);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 885eba06ed9..a8bb814f1d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.property.constants.PaimonProperties;
 import org.apache.doris.thrift.TFileAttributes;
@@ -36,7 +37,7 @@ public class PaimonSource {
     public PaimonSource(TupleDescriptor desc) {
         this.desc = desc;
         this.paimonExtTable = (PaimonExternalTable) desc.getTable();
-        this.originTable = paimonExtTable.getPaimonTable();
+        this.originTable = 
paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable));
     }
 
     public TupleDescriptor getDesc() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 68ed0cc9b23..353e024ed97 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -29,6 +29,9 @@ import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccTableInfo;
 import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
@@ -70,6 +73,7 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -141,6 +145,8 @@ public class MTMVTask extends AbstractTask {
     private StmtExecutor executor;
     private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
 
+    private final Map<MvccTableInfo, MvccSnapshot> snapshots = 
Maps.newHashMap();
+
     public MTMVTask() {
     }
 
@@ -218,6 +224,9 @@ public class MTMVTask extends AbstractTask {
             throws Exception {
         ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
         StatementContext statementContext = new StatementContext();
+        for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+            statementContext.setSnapshot(entry.getKey(), entry.getValue());
+        }
         ctx.setStatementContext(statementContext);
         TUniqueId queryId = generateQueryId();
         lastQueryId = DebugUtil.printId(queryId);
@@ -305,6 +314,11 @@ public class MTMVTask extends AbstractTask {
                 MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf;
                 baseTableIf.beforeMTMVRefresh(mtmv);
             }
+            if (tableIf instanceof MvccTable) {
+                MvccTable mvccTable = (MvccTable) tableIf;
+                MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot();
+                snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 008a2c8ac70..f21600ac585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -555,7 +555,11 @@ public class StatementContext implements Closeable {
         }
         for (TableIf tableIf : tables.values()) {
             if (tableIf instanceof MvccTable) {
-                snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) 
tableIf).loadSnapshot());
+                MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+                // the snapshot may already have been set by MTMV; do not load it again
+                if (!snapshots.containsKey(mvccTableInfo)) {
+                    snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot());
+                }
             }
         }
     }
@@ -563,11 +567,25 @@ public class StatementContext implements Closeable {
     /**
      * Obtain snapshot information of mvcc
      *
-     * @param mvccTable mvccTable
+     * @param tableIf tableIf
      * @return MvccSnapshot
      */
-    public MvccSnapshot getSnapshot(MvccTable mvccTable) {
-        return snapshots.get(new MvccTableInfo(mvccTable));
+    public Optional<MvccSnapshot> getSnapshot(TableIf tableIf) {
+        if (!(tableIf instanceof MvccTable)) {
+            return Optional.empty();
+        }
+        MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+        return Optional.ofNullable(snapshots.get(mvccTableInfo));
+    }
+
+    /**
+     * Set snapshot information of mvcc
+     *
+     * @param mvccTableInfo mvccTableInfo
+     * @param snapshot snapshot
+     */
+    public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot 
snapshot) {
+        snapshots.put(mvccTableInfo, snapshot);
     }
 
     private static class CloseableResource implements Closeable {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
index ba8b270d1f3..e99906f5e13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
@@ -36,7 +36,6 @@ import org.apache.commons.collections.CollectionUtils;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -74,8 +73,8 @@ public class PruneFileScanPartition extends 
OneRewriteRuleFactory {
     private SelectedPartitions pruneExternalPartitions(ExternalTable 
externalTable,
             LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, 
CascadesContext ctx) {
         Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap();
-        // todo: real snapshotId
-        if 
(CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) {
+        if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(
+                ctx.getStatementContext().getSnapshot(externalTable)))) {
             // non partitioned table, return NOT_PRUNED.
             // non partition table will be handled in HiveScanNode.
             return SelectedPartitions.NOT_PRUNED;
@@ -83,8 +82,8 @@ public class PruneFileScanPartition extends 
OneRewriteRuleFactory {
         Map<String, Slot> scanOutput = scan.getOutput()
                 .stream()
                 .collect(Collectors.toMap(slot -> 
slot.getName().toLowerCase(), Function.identity()));
-        // todo: real snapshotId
-        List<Slot> partitionSlots = 
externalTable.getPartitionColumns(Optional.empty())
+        List<Slot> partitionSlots = externalTable.getPartitionColumns(
+                        ctx.getStatementContext().getSnapshot(externalTable))
                 .stream()
                 .map(column -> scanOutput.get(column.getName().toLowerCase()))
                 .collect(Collectors.toList());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index 869c2d0b38b..b5be72950cd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
@@ -315,10 +316,8 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
                         }
                         if (targetTable instanceof ExternalTable) {
                             // Add filter only when partition has data when 
external table
-                            // TODO: 2024/12/4 real snapshot
-                            partitionHasDataItems.add(
-                                    ((ExternalTable) 
targetTable).getNameToPartitionItems(Optional.empty())
-                                            .get(partitionName));
+                            partitionHasDataItems.add(((ExternalTable) 
targetTable).getNameToPartitionItems(
+                                    
MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName));
                         }
                     }
                     if (partitionHasDataItems.isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index 96b8e032d11..1f5f71f7baf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.logical;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.TableSample;
@@ -60,10 +61,10 @@ public class LogicalFileScan extends LogicalCatalogRelation 
{
     }
 
     public LogicalFileScan(RelationId id, ExternalTable table, List<String> 
qualifier,
-                           Optional<TableSample> tableSample, 
Optional<TableSnapshot> tableSnapshot) {
-        // todo: real snapshotId
+            Optional<TableSample> tableSample, Optional<TableSnapshot> 
tableSnapshot) {
         this(id, table, qualifier, Optional.empty(), Optional.empty(),
-                table.initSelectedPartitions(Optional.empty()), tableSample, 
tableSnapshot);
+                
table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)),
+                tableSample, tableSnapshot);
     }
 
     public SelectedPartitions getSelectedPartitions() {
diff --git a/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out 
b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out
new file mode 100644
index 00000000000..63bda82c1db
--- /dev/null
+++ b/regression-test/data/mtmv_p0/test_paimon_rewrite_mtmv.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !refresh_one_partition --
+a      10
+
+-- !refresh_one_partition_rewrite --
+a      10
+b      10
+
+-- !refresh_auto --
+a      10
+b      10
+
+-- !refresh_all_partition_rewrite --
+a      10
+b      10
+
diff --git a/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..985443875c7
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_paimon_rewrite_mtmv.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_rewrite_mtmv", 
"p0,external,mtmv,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled paimon test")
+        return
+    }
+    String suiteName = "test_paimon_rewrite_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_mv"
+    String dbName = context.config.getDbNameByFile(context.file)
+
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+    String mvSql = "SELECT par,count(*) as num FROM 
${catalogName}.`test_paimon_spark`.test_tb_mix_format group by par;";
+
+    sql """drop catalog if exists ${catalogName}"""
+    sql """CREATE CATALOG ${catalogName} PROPERTIES (
+            'type'='paimon',
+            'warehouse' = 's3://warehouse/wh/',
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+            "s3.region" = "us-east-1"
+        );"""
+
+    sql """analyze table ${catalogName}.`test_paimon_spark`.test_tb_mix_format 
with sync"""
+    sql """alter table ${catalogName}.`test_paimon_spark`.test_tb_mix_format 
modify column par set stats ('row_count'='20');"""
+
+    sql """drop materialized view if exists ${mvName};"""
+
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+             ${mvSql}
+        """
+    def showPartitionsResult = sql """show partitions from ${mvName}"""
+    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+    assertTrue(showPartitionsResult.toString().contains("p_a"))
+    assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+    // refresh one partition
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+    def explainOnePartition = sql """ explain  ${mvSql} """
+    logger.info("explainOnePartition: " + explainOnePartition.toString())
+    assertTrue(explainOnePartition.toString().contains("VUNION"))
+    order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    //refresh auto
+    sql """
+            REFRESH MATERIALIZED VIEW ${mvName} auto
+        """
+    waitingMTMVTaskFinishedByMvName(mvName)
+    order_qt_refresh_auto "SELECT * FROM ${mvName} "
+
+    def explainAllPartition = sql """ explain  ${mvSql}; """
+    logger.info("explainAllPartition: " + explainAllPartition.toString())
+    assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+    order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+    mv_rewrite_success("${mvSql}", "${mvName}")
+
+    sql """drop materialized view if exists ${mvName};"""
+    sql """drop catalog if exists ${catalogName}"""
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to