This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f409595b32 [fix](Hudi-mtmv) Support asynchronous materialized view
partition refresh feature for Hudi external tables. (#49956)
7f409595b32 is described below
commit 7f409595b322798493ef771a9dccb2569885ead2
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Apr 23 13:52:11 2025 +0800
[fix](Hudi-mtmv) Support asynchronous materialized view partition refresh
feature for Hudi external tables. (#49956)
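Problem Summary:
Support the asynchronous materialized view (MTMV) partition refresh feature
for Hudi external tables.

To do this, HMSExternalTable now delegates its MTMV partition and snapshot
queries to a new HMSDlaTable abstraction:
- HiveDlaTable handles Hive tables and Iceberg tables registered in HMS.
- HudiDlaTable handles Hudi tables.

HMSExternalTable also now implements MvccTable, so a Hudi table can be bound to
a HudiMvccSnapshot. In addition:
- Hudi schemas are cached per timestamp via HudiSchemaCacheKey.
- TablePartitionValues now records each partition's last-modified timestamp.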
---
.../doris/datasource/ExternalMetaCacheMgr.java | 3 +-
.../org/apache/doris/datasource/ExternalTable.java | 2 +-
.../doris/datasource/TablePartitionValues.java | 32 +--
.../apache/doris/datasource/hive/HMSDlaTable.java | 79 +++++++
.../doris/datasource/hive/HMSExternalTable.java | 160 ++++++-------
.../apache/doris/datasource/hive/HiveDlaTable.java | 141 ++++++++++++
.../datasource/hive/HiveMetaStoreClientHelper.java | 5 +-
.../apache/doris/datasource/hive/HudiDlaTable.java | 123 ++++++++++
.../doris/datasource/hive/source/HiveScanNode.java | 3 +-
.../doris/datasource/hudi/HudiMvccSnapshot.java | 74 ++++++
.../doris/datasource/hudi/HudiSchemaCacheKey.java | 82 +++++++
.../apache/doris/datasource/hudi/HudiUtils.java | 14 +-
.../hudi/source/HudiCachedPartitionProcessor.java | 30 ++-
.../doris/datasource/hudi/source/HudiScanNode.java | 19 +-
.../datasource/iceberg/IcebergExternalTable.java | 3 +-
.../maxcompute/MaxComputeExternalTable.java | 2 +-
.../{MvccTable.java => EmptyMvccSnapshot.java} | 14 +-
.../apache/doris/datasource/mvcc/MvccTable.java | 5 +-
.../datasource/paimon/PaimonExternalTable.java | 3 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 3 +-
.../org/apache/doris/nereids/NereidsPlanner.java | 1 -
.../org/apache/doris/nereids/StatementContext.java | 5 +-
.../doris/nereids/rules/analysis/BindRelation.java | 1 +
.../doris/tablefunction/MetadataGenerator.java | 3 +-
.../doris/datasource/hudi/HudiUtilsTest.java | 4 +-
.../apache/doris/external/hms/HmsCatalogTest.java | 5 +-
.../org/apache/doris/qe/HmsQueryCacheTest.java | 12 +-
.../hudi/hudi_mtmv/test_hudi_mtmv.out | Bin 0 -> 8110 bytes
.../hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out | Bin 0 -> 5610 bytes
.../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out | Bin 0 -> 250 bytes
.../hudi/hudi_mtmv/test_hudi_mtmv.groovy | 256 +++++++++++++++++++++
.../hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy | 108 +++++++++
.../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 91 ++++++++
33 files changed, 1111 insertions(+), 172 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index c2f50f929f8..3e0c4d90859 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -34,6 +34,7 @@ import
org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonMetadataCache;
import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
@@ -292,7 +293,7 @@ public class ExternalMetaCacheMgr {
if (metaCache != null) {
List<Type> partitionColumnTypes;
try {
- partitionColumnTypes = table.getPartitionColumnTypes();
+ partitionColumnTypes =
table.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(table));
} catch (NotSupportedException e) {
LOG.warn("Ignore not supported hms table, message: {} ",
e.getMessage());
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 30bf48c3d8b..60a1f172978 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -391,7 +391,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
throw new NotImplementedException("getChunkSized not implemented");
}
- protected Optional<SchemaCacheValue> getSchemaCacheValue() {
+ public Optional<SchemaCacheValue> getSchemaCacheValue() {
ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchemaValue(dbName, name);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
index c7f2ce6f712..e928d3c739e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/TablePartitionValues.java
@@ -51,6 +51,7 @@ public class TablePartitionValues {
private long nextPartitionId;
private final Map<Long, PartitionItem> idToPartitionItem;
private final Map<String, Long> partitionNameToIdMap;
+ private Map<String, Long> partitionNameToLastModifiedMap;
private final Map<Long, String> partitionIdToNameMap;
private Map<Long, List<UniqueId>> idToUniqueIdsMap;
@@ -68,15 +69,12 @@ public class TablePartitionValues {
nextPartitionId = 0;
idToPartitionItem = new HashMap<>();
partitionNameToIdMap = new HashMap<>();
+ partitionNameToLastModifiedMap = new HashMap<>();
partitionIdToNameMap = new HashMap<>();
}
- public TablePartitionValues(List<String> partitionNames,
List<List<String>> partitionValues, List<Type> types) {
- this();
- addPartitions(partitionNames, partitionValues, types);
- }
-
- public void addPartitions(List<String> partitionNames, List<List<String>>
partitionValues, List<Type> types) {
+ public void addPartitions(List<String> partitionNames, List<List<String>>
partitionValues, List<Type> types,
+ List<Long> partitionLastUpdateTimestamp) {
Preconditions.checkState(partitionNames.size() ==
partitionValues.size());
List<String> addPartitionNames = new ArrayList<>();
List<PartitionItem> addPartitionItems = new ArrayList<>();
@@ -90,6 +88,7 @@ public class TablePartitionValues {
addPartitionNames.add(partitionNames.get(i));
addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types));
}
+ partitionNameToLastModifiedMap.put(partitionNames.get(i),
partitionLastUpdateTimestamp.get(i));
}
cleanPartitions();
@@ -123,23 +122,6 @@ public class TablePartitionValues {
partitionValuesMap =
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
}
- public void dropPartitions(List<String> partitionNames, List<Type> types) {
- partitionNames.forEach(p -> {
- Long removedPartition = partitionNameToIdMap.get(p);
- if (removedPartition != null) {
- idToPartitionItem.remove(removedPartition);
- }
- });
- List<String> remainingPartitionNames = new ArrayList<>();
- List<PartitionItem> remainingPartitionItems = new ArrayList<>();
- partitionNameToIdMap.forEach((partitionName, partitionId) -> {
- remainingPartitionNames.add(partitionName);
- remainingPartitionItems.add(idToPartitionItem.get(partitionId));
- });
- cleanPartitions();
- addPartitionItems(remainingPartitionNames, remainingPartitionItems,
types);
- }
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
@@ -148,6 +130,10 @@ public class TablePartitionValues {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
+ public Map<String, Long> getPartitionNameToLastModifiedMap() {
+ return partitionNameToLastModifiedMap;
+ }
+
public Lock readLock() {
return readWriteLock.readLock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
new file mode 100644
index 00000000000..1710646ce3d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVBaseTableIf;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This abstract class represents a Hive Metastore (HMS) Dla Table and
provides a blueprint for
+ * various operations related to metastore tables in Doris.
+ *
+ * Purpose:
+ * - To encapsulate common functionalities that HMS Dla tables should have for
implementing other interfaces
+ *
+ * Why needed:
+ * - To provide a unified way to manage and interact with different kinds of
Dla Table
+ * - To facilitate the implementation of multi-table materialized views (MTMV)
by providing necessary
+ * methods for snapshot and partition management.
+ * - To abstract out the specific details of HMS table operations, making the
code more modular and maintainable.
+ */
+public abstract class HMSDlaTable implements MTMVBaseTableIf {
+ protected HMSExternalTable hmsTable;
+
+ public HMSDlaTable(HMSExternalTable table) {
+ this.hmsTable = table;
+ }
+
+ abstract Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
+ throws AnalysisException;
+
+ abstract PartitionType getPartitionType(Optional<MvccSnapshot> snapshot);
+
+ abstract Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot);
+
+ abstract List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot);
+
+ abstract MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
+ Optional<MvccSnapshot> snapshot) throws AnalysisException;
+
+ abstract MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
+ throws AnalysisException;
+
+ abstract boolean isPartitionColumnAllowNull();
+
+ @Override
+ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+ Env.getCurrentEnv().getRefreshManager()
+ .refreshTable(hmsTable.getCatalog().getName(),
hmsTable.getDbName(), hmsTable.getName(), true);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 77c7ce942aa..1d520fc4178 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -31,21 +31,25 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.FileSystemDirectoryLister;
import org.apache.doris.mtmv.MTMVBaseTableIf;
-import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
-import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
@@ -106,7 +110,7 @@ import java.util.stream.Collectors;
/**
* Hive metastore external table.
*/
-public class HMSExternalTable extends ExternalTable implements
MTMVRelatedTableIf, MTMVBaseTableIf {
+public class HMSExternalTable extends ExternalTable implements
MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
private static final Logger LOG =
LogManager.getLogger(HMSExternalTable.class);
public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
@@ -168,6 +172,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
private DLAType dlaType = DLAType.UNKNOWN;
+ private HMSDlaTable dlaTable;
+
// record the event update time when enable hms event listener
protected volatile long eventUpdateTime;
@@ -205,10 +211,13 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
} else {
if (supportedIcebergTable()) {
dlaType = DLAType.ICEBERG;
+ dlaTable = new HiveDlaTable(this);
} else if (supportedHoodieTable()) {
dlaType = DLAType.HUDI;
+ dlaTable = new HudiDlaTable(this);
} else if (supportedHiveTable()) {
dlaType = DLAType.HIVE;
+ dlaTable = new HiveDlaTable(this);
} else {
// Should not reach here. Because `supportedHiveTable`
will throw exception if not return true.
throw new NotSupportedException("Unsupported dlaType for
table: " + getNameWithFullQualifiers());
@@ -299,23 +308,45 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return remoteTable;
}
- public List<Type> getPartitionColumnTypes() {
+ @Override
+ public List<Column> getFullSchema() {
makeSureInitialized();
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ if (getDlaType() == DLAType.HUDI) {
+ return ((HudiDlaTable)
dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
+ .getSchema();
+ }
+ Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(dbName, name);
+ return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
+ }
+
+ public List<Type> getPartitionColumnTypes(Optional<MvccSnapshot> snapshot)
{
+ makeSureInitialized();
+ if (getDlaType() == DLAType.HUDI) {
+ return ((HudiDlaTable)
dlaTable).getHudiSchemaCacheValue(snapshot).getPartitionColTypes();
+ }
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColTypes())
.orElse(Collections.emptyList());
}
- public List<Column> getPartitionColumns() {
+ public List<Type> getHudiPartitionColumnTypes(long timestamp) {
makeSureInitialized();
- Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
- return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColumns())
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+ new HudiSchemaCacheKey(dbName, name, timestamp));
+ return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColTypes())
.orElse(Collections.emptyList());
}
+ public List<Column> getPartitionColumns() {
+ return getPartitionColumns(MvccUtil.getSnapshotFromContext(this));
+ }
+
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
- return getPartitionColumns();
+ makeSureInitialized();
+ return dlaTable.getPartitionColumns(snapshot);
}
@Override
@@ -355,7 +386,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) this.getCatalog());
- List<Type> partitionColumnTypes = this.getPartitionColumnTypes();
+ List<Type> partitionColumnTypes =
this.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this));
HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
this.getDbName(), this.getName(), partitionColumnTypes);
Map<Long, PartitionItem> idToPartitionItem =
hivePartitionValues.getIdToPartitionItem();
@@ -564,10 +595,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
@Override
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey
key) {
- return initSchemaAndUpdateTime();
- }
-
- public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table =
((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
// try to use transient_lastDdlTime from hms client
@@ -576,7 +603,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
?
Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) *
1000
// use current timestamp if lastDdlTime does not exist (hive
views don't have this prop)
: System.currentTimeMillis();
- return initSchema();
+ return initSchema(key);
}
public long getLastDdlTime() {
@@ -589,12 +616,12 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- public Optional<SchemaCacheValue> initSchema() {
+ public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
makeSureInitialized();
if (dlaType.equals(DLAType.ICEBERG)) {
return getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
- return getHudiSchema();
+ return getHudiSchema(key);
} else {
return getHiveSchema();
}
@@ -606,11 +633,12 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
- private Optional<SchemaCacheValue> getHudiSchema() {
+ private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) {
boolean[] enableSchemaEvolution = {false};
- InternalSchema hudiInternalSchema =
HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution);
+ HudiSchemaCacheKey hudiSchemaCacheKey = (HudiSchemaCacheKey) key;
+ InternalSchema hudiInternalSchema =
HiveMetaStoreClientHelper.getHudiTableSchema(this, enableSchemaEvolution,
+ Long.toString(hudiSchemaCacheKey.getTimestamp()));
org.apache.avro.Schema hudiSchema =
AvroInternalSchemaConverter.convert(hudiInternalSchema, name);
-
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
List<String> colTypes = Lists.newArrayList();
for (int i = 0; i < hudiSchema.getFields().size(); i++) {
@@ -874,96 +902,46 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
- return getPartitionType();
+ makeSureInitialized();
+ return dlaTable.getPartitionType(snapshot);
}
- public PartitionType getPartitionType() {
- return getPartitionColumns().size() > 0 ? PartitionType.LIST :
PartitionType.UNPARTITIONED;
+ public Set<String> getPartitionColumnNames() {
+ return getPartitionColumnNames(MvccUtil.getSnapshotFromContext(this));
}
@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot) {
- return getPartitionColumnNames();
- }
-
- public Set<String> getPartitionColumnNames() {
- return getPartitionColumns().stream()
- .map(c ->
c.getName().toLowerCase()).collect(Collectors.toSet());
+ makeSureInitialized();
+ return dlaTable.getPartitionColumnNames(snapshot);
}
@Override
- public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
- return getNameToPartitionItems();
+ public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
+ throws AnalysisException {
+ makeSureInitialized();
+ return dlaTable.getAndCopyPartitionItems(snapshot);
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) getCatalog());
- HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
- getDbName(), getName(), getPartitionColumnTypes());
- Long partitionId =
getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
- HivePartition hivePartition =
getHivePartitionByIdOrAnalysisException(partitionId,
- hivePartitionValues, cache);
- return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
+ makeSureInitialized();
+ return dlaTable.getPartitionSnapshot(partitionName, context, snapshot);
}
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
- if (getPartitionType() == PartitionType.UNPARTITIONED) {
- return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
- }
- HivePartition maxPartition = null;
- long maxVersionTime = 0L;
- long visibleVersionTime;
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog) getCatalog());
- HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
- getDbName(), getName(), getPartitionColumnTypes());
- List<HivePartition> partitionList =
cache.getAllPartitionsWithCache(getDbName(), getName(),
-
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
- if (CollectionUtils.isEmpty(partitionList)) {
- throw new AnalysisException("partitionList is empty, table name: "
+ getName());
- }
- for (HivePartition hivePartition : partitionList) {
- visibleVersionTime = hivePartition.getLastModifiedTime();
- if (visibleVersionTime > maxVersionTime) {
- maxVersionTime = visibleVersionTime;
- maxPartition = hivePartition;
- }
- }
- return new
MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(getPartitionColumns()),
maxVersionTime);
- }
-
- private Long getPartitionIdByNameOrAnalysisException(String partitionName,
- HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
- throws AnalysisException {
- Long partitionId =
hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
- if (partitionId == null) {
- throw new AnalysisException("can not find partition: " +
partitionName);
- }
- return partitionId;
+ makeSureInitialized();
+ return dlaTable.getTableSnapshot(context, snapshot);
}
- private HivePartition getHivePartitionByIdOrAnalysisException(Long
partitionId,
- HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
- HiveMetaStoreCache cache) throws AnalysisException {
- List<String> partitionValues =
hivePartitionValues.getPartitionValuesMap().get(partitionId);
- if (CollectionUtils.isEmpty(partitionValues)) {
- throw new AnalysisException("can not find partitionValues: " +
partitionId);
- }
- HivePartition partition = cache.getHivePartition(getDbName(),
getName(), partitionValues);
- if (partition == null) {
- throw new AnalysisException("can not find partition: " +
partitionId);
- }
- return partition;
- }
@Override
public boolean isPartitionColumnAllowNull() {
- return true;
+ makeSureInitialized();
+ return dlaTable.isPartitionColumnAllowNull();
}
/**
@@ -1025,7 +1003,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) catalog);
- List<Type> partitionColumnTypes = getPartitionColumnTypes();
+ List<Type> partitionColumnTypes =
getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(this));
HiveMetaStoreCache.HivePartitionValues partitionValues = null;
// Get table partitions from cache.
if (!partitionColumnTypes.isEmpty()) {
@@ -1099,6 +1077,16 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+ makeSureInitialized();
+ dlaTable.beforeMTMVRefresh(mtmv);
+ }
+
+ @Override
+ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
+ if (getDlaType() == DLAType.HUDI) {
+ return new
HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+ }
+ return new EmptyMvccSnapshot();
}
public boolean firstColumnIsString() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java
new file mode 100644
index 00000000000..296b2f3667a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDlaTable.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HiveDlaTable extends HMSDlaTable {
+
+ public HiveDlaTable(HMSExternalTable table) {
+ super(table);
+ }
+
+ @Override
+ public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+ return getPartitionColumns(snapshot).size() > 0 ? PartitionType.LIST :
PartitionType.UNPARTITIONED;
+ }
+
+ @Override
+ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot) {
+ return getPartitionColumns(snapshot).stream()
+ .map(c ->
c.getName().toLowerCase()).collect(Collectors.toSet());
+ }
+
+ @Override
+ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+ Optional<SchemaCacheValue> schemaCacheValue =
hmsTable.getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColumns())
+ .orElse(Collections.emptyList());
+ }
+
+ @Override
+ public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+ return hmsTable.getNameToPartitionItems();
+ }
+
+ @Override
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
+ Optional<MvccSnapshot> snapshot) throws AnalysisException {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
+ hmsTable.getDbName(), hmsTable.getName(),
hmsTable.getPartitionColumnTypes(snapshot));
+ Long partitionId =
getPartitionIdByNameOrAnalysisException(partitionName, hivePartitionValues);
+ HivePartition hivePartition =
getHivePartitionByIdOrAnalysisException(partitionId,
+ hivePartitionValues, cache);
+ return new MTMVTimestampSnapshot(hivePartition.getLastModifiedTime());
+ }
+
+ @Override
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
+ throws AnalysisException {
+ if (hmsTable.getPartitionType(snapshot) ==
PartitionType.UNPARTITIONED) {
+ return new MTMVMaxTimestampSnapshot(hmsTable.getName(),
hmsTable.getLastDdlTime());
+ }
+ HivePartition maxPartition = null;
+ long maxVersionTime = 0L;
+ long visibleVersionTime;
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
+ hmsTable.getDbName(), hmsTable.getName(),
hmsTable.getPartitionColumnTypes(snapshot));
+ List<HivePartition> partitionList =
cache.getAllPartitionsWithCache(hmsTable.getDbName(), hmsTable.getName(),
+
Lists.newArrayList(hivePartitionValues.getPartitionValuesMap().values()));
+ if (CollectionUtils.isEmpty(partitionList)) {
+ throw new AnalysisException("partitionList is empty, table name: "
+ hmsTable.getName());
+ }
+ for (HivePartition hivePartition : partitionList) {
+ visibleVersionTime = hivePartition.getLastModifiedTime();
+ if (visibleVersionTime > maxVersionTime) {
+ maxVersionTime = visibleVersionTime;
+ maxPartition = hivePartition;
+ }
+ }
+ return new MTMVMaxTimestampSnapshot(maxPartition.getPartitionName(
+ hmsTable.getPartitionColumns()), maxVersionTime);
+ }
+
+ private Long getPartitionIdByNameOrAnalysisException(String partitionName,
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues)
+ throws AnalysisException {
+ Long partitionId =
hivePartitionValues.getPartitionNameToIdMap().get(partitionName);
+ if (partitionId == null) {
+ throw new AnalysisException("can not find partition: " +
partitionName);
+ }
+ return partitionId;
+ }
+
+ private HivePartition getHivePartitionByIdOrAnalysisException(Long
partitionId,
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues,
+ HiveMetaStoreCache cache) throws AnalysisException {
+ List<String> partitionValues =
hivePartitionValues.getPartitionValuesMap().get(partitionId);
+ if (CollectionUtils.isEmpty(partitionValues)) {
+ throw new AnalysisException("can not find partitionValues: " +
partitionId);
+ }
+ HivePartition partition = cache.getHivePartition(hmsTable.getDbName(),
hmsTable.getName(), partitionValues);
+ if (partition == null) {
+ throw new AnalysisException("can not find partition: " +
partitionId);
+ }
+ return partition;
+ }
+
+ @Override
+ public boolean isPartitionColumnAllowNull() {
+ return true;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
index d98bf8227e1..96749b7c316 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java
@@ -803,7 +803,8 @@ public class HiveMetaStoreClientHelper {
return output.toString();
}
- public static InternalSchema getHudiTableSchema(HMSExternalTable table,
boolean[] enableSchemaEvolution) {
+ public static InternalSchema getHudiTableSchema(HMSExternalTable table,
boolean[] enableSchemaEvolution,
+ String timestamp) {
HoodieTableMetaClient metaClient = table.getHudiClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
@@ -815,7 +816,7 @@ public class HiveMetaStoreClientHelper {
// So, we should reload timeline so that we can read the latest commit
files.
metaClient.reloadActiveTimeline();
- Option<InternalSchema> internalSchemaOption =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
+ Option<InternalSchema> internalSchemaOption =
schemaUtil.getTableInternalSchemaFromCommitMetadata(timestamp);
if (internalSchemaOption.isPresent()) {
enableSchemaEvolution[0] = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
new file mode 100644
index 00000000000..24963ec10c5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HudiDlaTable.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
+import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
+import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+import org.apache.doris.mtmv.MTMVTimestampSnapshot;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class HudiDlaTable extends HMSDlaTable {
+
+ public HudiDlaTable(HMSExternalTable table) {
+ super(table);
+ }
+
+ @Override
+ public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+ return getPartitionColumns(snapshot).size() > 0 ? PartitionType.LIST :
PartitionType.UNPARTITIONED;
+ }
+
+ @Override
+ public Set<String> getPartitionColumnNames(Optional<MvccSnapshot>
snapshot) {
+ return getPartitionColumns(snapshot).stream()
+ .map(c ->
c.getName().toLowerCase()).collect(Collectors.toSet());
+ }
+
+ @Override
+ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+ return getHudiSchemaCacheValue(snapshot).getPartitionColumns();
+ }
+
+ @Override
+ public Map<String, PartitionItem>
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+ TablePartitionValues tablePartitionValues =
getOrFetchHudiSnapshotCacheValue(snapshot);
+ Map<Long, PartitionItem> idToPartitionItem =
tablePartitionValues.getIdToPartitionItem();
+ Map<Long, String> partitionIdToNameMap =
tablePartitionValues.getPartitionIdToNameMap();
+ Map<String, PartitionItem> copiedPartitionItems = Maps.newHashMap();
+ for (Long key : partitionIdToNameMap.keySet()) {
+ copiedPartitionItems.put(partitionIdToNameMap.get(key),
idToPartitionItem.get(key));
+ }
+ return copiedPartitionItems;
+ }
+
+ @Override
+ public MTMVSnapshotIf getPartitionSnapshot(String partitionName,
MTMVRefreshContext context,
+ Optional<MvccSnapshot> snapshot) throws AnalysisException {
+ // Map<String, Long> partitionNameToLastModifiedMap =
getOrFetchHudiSnapshotCacheValue(
+ // snapshot).getPartitionNameToLastModifiedMap();
+ // return new
MTMVTimestampSnapshot(partitionNameToLastModifiedMap.get(partitionName));
+ return new MTMVTimestampSnapshot(0L);
+ }
+
+ @Override
+ public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
+ throws AnalysisException {
+ // return new
MTMVTimestampSnapshot(getOrFetchHudiSnapshotCacheValue(snapshot).getLastUpdateTimestamp());
+ return new MTMVTimestampSnapshot(0L);
+ }
+
+ @Override
+ public boolean isPartitionColumnAllowNull() {
+ return true;
+ }
+
+ public HMSSchemaCacheValue getHudiSchemaCacheValue(Optional<MvccSnapshot>
snapshot) {
+ TablePartitionValues snapshotCacheValue =
getOrFetchHudiSnapshotCacheValue(snapshot);
+ return
getHudiSchemaCacheValue(snapshotCacheValue.getLastUpdateTimestamp());
+ }
+
+ private TablePartitionValues
getOrFetchHudiSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+ if (snapshot.isPresent()) {
+ return ((HudiMvccSnapshot)
snapshot.get()).getTablePartitionValues();
+ } else {
+ return HudiUtils.getPartitionValues(Optional.empty(), hmsTable);
+ }
+ }
+
+ private HMSSchemaCacheValue getHudiSchemaCacheValue(long timestamp) {
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog());
+ Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+ new HudiSchemaCacheKey(hmsTable.getDbName(),
hmsTable.getName(), timestamp));
+ if (!schemaCacheValue.isPresent()) {
+ throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+ null, hmsTable.getCatalog().getName(),
hmsTable.getDbName(), hmsTable.getName(), timestamp);
+ }
+ return (HMSSchemaCacheValue) schemaCacheValue.get();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index a65d057b8f3..5bcc65bb81f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -42,6 +42,7 @@ import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
@@ -133,7 +134,7 @@ public class HiveScanNode extends FileQueryScanNode {
List<HivePartition> resPartitions = Lists.newArrayList();
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
- List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
+ List<Type> partitionColumnTypes =
hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
if (!partitionColumnTypes.isEmpty()) {
// partitioned table
Collection<PartitionItem> partitionItems;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
new file mode 100644
index 00000000000..0f01291e54c
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiMvccSnapshot.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.TablePartitionValues;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+
+/**
+ * Implementation of MvccSnapshot for Hudi tables that maintains partition
values
+ * for MVCC (Multiversion Concurrency Control) operations.
+ * This class is immutable to ensure thread safety.
+ */
+public class HudiMvccSnapshot implements MvccSnapshot {
+ private final TablePartitionValues tablePartitionValues;
+
+ /**
+ * Creates a new HudiMvccSnapshot with the specified partition values.
+ *
+ * @param tablePartitionValues The partition values for the snapshot
+ * @throws IllegalArgumentException if tablePartitionValues is null
+ */
+ public HudiMvccSnapshot(TablePartitionValues tablePartitionValues) {
+ if (tablePartitionValues == null) {
+ throw new IllegalArgumentException("TablePartitionValues cannot be
null");
+ }
+ this.tablePartitionValues = tablePartitionValues;
+ }
+
+ /**
+ * Gets the table partition values associated with this snapshot.
+ *
+ * @return The immutable TablePartitionValues object
+ */
+ public TablePartitionValues getTablePartitionValues() {
+ return tablePartitionValues;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HudiMvccSnapshot that = (HudiMvccSnapshot) o;
+ return tablePartitionValues.equals(that.tablePartitionValues);
+ }
+
+ @Override
+ public int hashCode() {
+ return tablePartitionValues.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("HudiMvccSnapshot{tablePartitionValues=%s}",
tablePartitionValues);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
new file mode 100644
index 00000000000..5a5b0dc044e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiSchemaCacheKey.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+/**
+ * Cache key for Hudi table schemas that includes timestamp information.
+ * This allows for time-travel queries and ensures proper schema versioning.
+ */
+public class HudiSchemaCacheKey extends SchemaCacheKey {
+ private final long timestamp;
+
+ /**
+ * Creates a new cache key for Hudi table schemas.
+ *
+ * @param dbName The database name
+ * @param tableName The table name
+ * @param timestamp The timestamp for schema version
+ * @throws IllegalArgumentException if dbName or tableName is null or empty
+ */
+ public HudiSchemaCacheKey(String dbName, String tableName, long timestamp)
{
+ super(dbName, tableName);
+ if (timestamp < 0) {
+ throw new IllegalArgumentException("Timestamp cannot be negative");
+ }
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Gets the timestamp associated with this schema version.
+ *
+ * @return the timestamp value
+ */
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ HudiSchemaCacheKey that = (HudiSchemaCacheKey) o;
+ return timestamp == that.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("HudiSchemaCacheKey{dbName='%s', tableName='%s',
timestamp=%d}",
+ getDbName(), getTblName(), timestamp);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index 894103afe2c..c0e2a783ac3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.hive.HMSExternalTable;
@@ -257,10 +258,6 @@ public class HudiUtils {
public static TablePartitionValues
getPartitionValues(Optional<TableSnapshot> tableSnapshot,
HMSExternalTable hmsTable) {
TablePartitionValues partitionValues = new TablePartitionValues();
- if (hmsTable.getPartitionColumns().isEmpty()) {
- //isn't partition table.
- return partitionValues;
- }
HoodieTableMetaClient hudiClient = hmsTable.getHudiClient();
HudiCachedPartitionProcessor processor =
(HudiCachedPartitionProcessor) Env.getCurrentEnv()
@@ -312,10 +309,11 @@ public class HudiUtils {
return schemaInfo;
}
- public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable
hmsTable) {
- ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getSchemaCache(hmsTable.getCatalog());
- Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(hmsTable.getDbName(), hmsTable.getName());
+ public static HudiSchemaCacheValue getSchemaCacheValue(HMSExternalTable
hmsTable, String queryInstant) {
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(hmsTable.getCatalog());
+ SchemaCacheKey key = new HudiSchemaCacheKey(hmsTable.getDbName(),
hmsTable.getName(),
+ Long.parseLong(queryInstant));
+ Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(key);
return (HudiSchemaCacheValue) schemaCacheValue.get();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index 62094b21c2b..ef921cbfa47 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -85,14 +86,15 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
public TablePartitionValues getSnapshotPartitionValues(HMSExternalTable
table,
HoodieTableMetaClient tableMetaClient, String timestamp, boolean
useHiveSyncPartition) {
Preconditions.checkState(catalogId == table.getCatalog().getId());
+ TablePartitionValues partitionValues = new TablePartitionValues();
Option<String[]> partitionColumns =
tableMetaClient.getTableConfig().getPartitionFields();
- if (!partitionColumns.isPresent()) {
- return null;
+ if (!partitionColumns.isPresent() || partitionColumns.get().length ==
0) {
+ return partitionValues;
}
HoodieTimeline timeline =
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> lastInstant = timeline.lastInstant();
if (!lastInstant.isPresent()) {
- return null;
+ return partitionValues;
}
long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp());
if (Long.parseLong(timestamp) == lastTimestamp) {
@@ -100,10 +102,13 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
}
List<String> partitionNameAndValues =
getPartitionNamesBeforeOrEquals(timeline, timestamp);
List<String> partitionNames = Arrays.asList(partitionColumns.get());
- TablePartitionValues partitionValues = new TablePartitionValues();
+ // we don't support auto refresh hudi mtmv currently,
+ // so the list `partitionLastUpdateTimestamp` is full of 0L.
partitionValues.addPartitions(partitionNameAndValues,
partitionNameAndValues.stream().map(p ->
parsePartitionValues(partitionNames, p))
- .collect(Collectors.toList()),
table.getPartitionColumnTypes());
+ .collect(Collectors.toList()),
table.getHudiPartitionColumnTypes(Long.parseLong(timestamp)),
+ Collections.nCopies(partitionNameAndValues.size(), 0L));
+ partitionValues.setLastUpdateTimestamp(Long.parseLong(timestamp));
return partitionValues;
}
@@ -111,19 +116,21 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
boolean
useHiveSyncPartition)
throws CacheException {
Preconditions.checkState(catalogId == table.getCatalog().getId());
+ TablePartitionValues partitionValues = new TablePartitionValues();
Option<String[]> partitionColumns =
tableMetaClient.getTableConfig().getPartitionFields();
- if (!partitionColumns.isPresent()) {
- return null;
+ if (!partitionColumns.isPresent() || partitionColumns.get().length ==
0) {
+ return partitionValues;
}
HoodieTimeline timeline =
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
Option<HoodieInstant> lastInstant = timeline.lastInstant();
if (!lastInstant.isPresent()) {
- return null;
+ return partitionValues;
}
try {
long lastTimestamp =
Long.parseLong(lastInstant.get().getTimestamp());
- TablePartitionValues partitionValues = partitionCache.get(
- new TablePartitionKey(table.getDbName(), table.getName(),
table.getPartitionColumnTypes()));
+ partitionValues = partitionCache.get(
+ new TablePartitionKey(table.getDbName(), table.getName(),
+ table.getHudiPartitionColumnTypes(lastTimestamp)));
partitionValues.readLock().lock();
try {
long lastUpdateTimestamp =
partitionValues.getLastUpdateTimestamp();
@@ -159,7 +166,8 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
partitionValues.cleanPartitions();
partitionValues.addPartitions(partitionNames,
partitionNames.stream().map(p ->
parsePartitionValues(partitionColumnsList, p))
- .collect(Collectors.toList()),
table.getPartitionColumnTypes());
+ .collect(Collectors.toList()),
table.getHudiPartitionColumnTypes(lastTimestamp),
+ Collections.nCopies(partitionNames.size(), 0L));
partitionValues.setLastUpdateTimestamp(lastTimestamp);
return partitionValues;
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 17543ec3949..00697a5469f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -29,14 +29,13 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.fs.DirectoryLister;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
@@ -166,11 +165,6 @@ public class HudiScanNode extends HiveScanNode {
basePath = hmsTable.getRemoteTable().getSd().getLocation();
inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
serdeLib =
hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
- ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(table.getCatalog());
- Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(table.getDbName(), table.getName());
- HudiSchemaCacheValue hudiSchemaCacheValue = (HudiSchemaCacheValue)
schemaCacheValue.get();
- columnNames =
hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
- columnTypes = hudiSchemaCacheValue.getColTypes();
if (scanParams != null && !scanParams.incrementalRead()) {
// Only support incremental read
@@ -211,11 +205,16 @@ public class HudiScanNode extends HiveScanNode {
}
queryInstant = snapshotInstant.get().getTimestamp();
}
+
+ HudiSchemaCacheValue hudiSchemaCacheValue =
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
+ columnNames =
hudiSchemaCacheValue.getSchema().stream().map(Column::getName).collect(Collectors.toList());
+ columnTypes = hudiSchemaCacheValue.getColTypes();
+
fsView = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getFsViewProcessor(hmsTable.getCatalog())
.getFsView(hmsTable.getDbName(), hmsTable.getName(), hudiClient);
- if (HudiUtils.getSchemaCacheValue(hmsTable).isEnableSchemaEvolution())
{
+ if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
params.setHistorySchemaInfo(new ConcurrentHashMap<>());
}
}
@@ -270,7 +269,7 @@ public class HudiScanNode extends HiveScanNode {
// fileDesc.setNestedFields(hudiSplit.getNestedFields());
fileDesc.setHudiJniScanner(hudiSplit.getHudiJniScanner());
} else {
- HudiSchemaCacheValue hudiSchemaCacheValue =
HudiUtils.getSchemaCacheValue(hmsTable);
+ HudiSchemaCacheValue hudiSchemaCacheValue =
HudiUtils.getSchemaCacheValue(hmsTable, queryInstant);
if (hudiSchemaCacheValue.isEnableSchemaEvolution()) {
long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(
new File(hudiSplit.getPath().get()).getName()));
@@ -291,7 +290,7 @@ public class HudiScanNode extends HiveScanNode {
}
private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient
metaClient) {
- List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
+ List<Type> partitionColumnTypes =
hmsTable.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(hmsTable));
if (!partitionColumnTypes.isEmpty()) {
this.totalPartitionNum = selectedPartitions.totalPartitionNum;
Map<String, PartitionItem> prunedPartitions =
selectedPartitions.selectedPartitions;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index fefd6b76b0f..16964ead1d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -18,6 +18,7 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
@@ -278,7 +279,7 @@ public class IcebergExternalTable extends ExternalTable
implements MTMVRelatedTa
}
@Override
- public MvccSnapshot loadSnapshot() {
+ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 1136fb079f3..a598c68703f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -134,7 +134,7 @@ public class MaxComputeExternalTable extends ExternalTable {
partitionSpecs.stream()
.map(p -> parsePartitionValues(partitionColumnNames,
p))
.collect(Collectors.toList()),
- partitionTypes);
+ partitionTypes, Collections.nCopies(partitionSpecs.size(),
0L));
return partitionValues;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
similarity index 67%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
index d69e0f3114d..35f63291a25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/EmptyMvccSnapshot.java
@@ -17,17 +17,5 @@
package org.apache.doris.datasource.mvcc;
-import org.apache.doris.catalog.TableIf;
-
-/**
- * The table that needs to query data based on the version needs to implement
this interface.
- */
-public interface MvccTable extends TableIf {
- /**
- * Retrieve the current snapshot information of the table,
- * and the returned result will be used for the entire process of this
query
- *
- * @return MvccSnapshot
- */
- MvccSnapshot loadSnapshot();
+public class EmptyMvccSnapshot implements MvccSnapshot {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
index d69e0f3114d..89b1d6e9b07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccTable.java
@@ -17,8 +17,11 @@
package org.apache.doris.datasource.mvcc;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.TableIf;
+import java.util.Optional;
+
/**
* The table that needs to query data based on the version needs to implement
this interface.
*/
@@ -29,5 +32,5 @@ public interface MvccTable extends TableIf {
*
* @return MvccSnapshot
*/
- MvccSnapshot loadSnapshot();
+ MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 5d3e7dc3f5b..8a97aa856a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.paimon;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
@@ -212,7 +213,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
}
@Override
- public MvccSnapshot loadSnapshot() {
+ public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 2721bc88b74..fc9b2ab0e6a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -80,6 +80,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -410,7 +411,7 @@ public class MTMVTask extends AbstractTask {
}
if (tableIf instanceof MvccTable) {
MvccTable mvccTable = (MvccTable) tableIf;
- MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot();
+ MvccSnapshot mvccSnapshot =
mvccTable.loadSnapshot(Optional.empty());
snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 50b87eef516..ad5f4f3d0e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -229,7 +229,6 @@ public class NereidsPlanner extends Planner {
// collect table and lock them in the order of table id
collectAndLockTable(showAnalyzeProcess(explainLevel,
showPlanProcess));
// after table collector, we should use a new context.
- statementContext.loadSnapshots();
Plan resultPlan = planWithoutLock(plan, requireProperties,
explainLevel, showPlanProcess);
lockCallback.accept(resultPlan);
if (statementContext.getConnectContext().getExecutor() != null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 815ebcb6d03..2fe0c4486f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids;
import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@@ -660,13 +661,13 @@ public class StatementContext implements Closeable {
/**
* Load snapshot information of mvcc
*/
- public void loadSnapshots() {
+ public void loadSnapshots(Optional<TableSnapshot> tableSnapshot) {
for (TableIf tableIf : tables.values()) {
if (tableIf instanceof MvccTable) {
MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
// may be set by MTMV, we can not load again
if (!snapshots.containsKey(mvccTableInfo)) {
- snapshots.put(mvccTableInfo, ((MvccTable)
tableIf).loadSnapshot());
+ snapshots.put(mvccTableInfo, ((MvccTable)
tableIf).loadSnapshot(tableSnapshot));
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 82dddc9575a..dae2a8438d1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -378,6 +378,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
List<String> qualifierWithoutTableName = Lists.newArrayList();
qualifierWithoutTableName.addAll(qualifiedTableName.subList(0,
qualifiedTableName.size() - 1));
+
cascadesContext.getStatementContext().loadSnapshots(unboundRelation.getTableSnapshot());
boolean isView = false;
try {
switch (table.getType()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 373e3d415bb..4e7cd6b5116 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -60,6 +60,7 @@ import
org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
@@ -1610,7 +1611,7 @@ public class MetadataGenerator {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) tbl.getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues =
cache.getPartitionValues(
- tbl.getDbName(), tbl.getName(), tbl.getPartitionColumnTypes());
+ tbl.getDbName(), tbl.getName(),
tbl.getPartitionColumnTypes(MvccUtil.getSnapshotFromContext(tbl)));
Map<Long, List<String>> valuesMap =
hivePartitionValues.getPartitionValuesMap();
List<TRow> dataBatch = Lists.newArrayList();
for (Map.Entry<Long, List<String>> entry : valuesMap.entrySet()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
index 016b6616f0b..409fc1daf72 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hudi/HudiUtilsTest.java
@@ -175,7 +175,7 @@ public class HudiUtilsTest {
HMSExternalCatalog catalog = new HMSExternalCatalog();
HMSExternalDatabase db = new HMSExternalDatabase(catalog, 1, "db",
"db");
HMSExternalTable hmsExternalTable = new HMSExternalTable(2, "tb",
"tb", catalog, db);
- HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new
boolean[] {false});
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new
boolean[] {false}, "20241219214518880");
// 4. delete the commit file,
// this operation is used to imitate the clean operation in hudi
@@ -189,7 +189,7 @@ public class HudiUtilsTest {
// because we will refresh timeline in this `getHudiTableSchema`
method,
// and we can get the latest commit.
// so that this error: `Could not read commit details from file
<table_path>/.hoodie/1.commit` will be not reported.
- HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new
boolean[] {false});
+ HiveMetaStoreClientHelper.getHudiTableSchema(hmsExternalTable, new
boolean[] {false}, "20241219214518880");
// 7. clean up
Assert.assertTrue(commit2.delete());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
index 30a233dd1a9..4abed2ee708 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
@@ -29,11 +29,13 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.CatalogMgr;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
+import org.apache.doris.datasource.hive.HiveDlaTable;
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
import org.apache.doris.qe.SessionVariable;
@@ -107,6 +109,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
Deencapsulation.setField(tbl, "catalog", hmsCatalog);
Deencapsulation.setField(tbl, "dbName", "hms_db");
Deencapsulation.setField(tbl, "name", "hms_tbl");
+ Deencapsulation.setField(tbl, "dlaTable", new HiveDlaTable(tbl));
new Expectations(tbl) {
{
tbl.getId();
@@ -138,7 +141,7 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
result = TableIf.TableType.HMS_EXTERNAL_TABLE;
// mock initSchemaAndUpdateTime and do nothing
- tbl.initSchemaAndUpdateTime();
+ tbl.initSchemaAndUpdateTime(new
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl"));
minTimes = 0;
tbl.getDatabase();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 0a981dab8a9..376f8cba4e8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -31,10 +31,12 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.CatalogMgr;
+import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
+import org.apache.doris.datasource.hive.HiveDlaTable;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
import org.apache.doris.planner.OlapScanNode;
@@ -123,6 +125,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
Deencapsulation.setField(tbl, "catalog", hmsCatalog);
Deencapsulation.setField(tbl, "dbName", "hms_db");
Deencapsulation.setField(tbl, "name", "hms_tbl");
+ Deencapsulation.setField(tbl, "dlaTable", new HiveDlaTable(tbl));
new Expectations(tbl) {
{
tbl.getId();
@@ -158,7 +161,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
result = DLAType.HIVE;
// mock initSchemaAndUpdateTime and do nothing
- tbl.initSchemaAndUpdateTime();
+ tbl.initSchemaAndUpdateTime(new
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl"));
minTimes = 0;
tbl.getDatabase();
@@ -173,6 +176,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
Deencapsulation.setField(tbl2, "catalog", hmsCatalog);
Deencapsulation.setField(tbl2, "dbName", "hms_db");
Deencapsulation.setField(tbl2, "name", "hms_tbl2");
+ Deencapsulation.setField(tbl2, "dlaTable", new HiveDlaTable(tbl2));
new Expectations(tbl2) {
{
tbl2.getId();
@@ -208,7 +212,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
result = DLAType.HIVE;
// mock initSchemaAndUpdateTime and do nothing
- tbl2.initSchemaAndUpdateTime();
+ tbl2.initSchemaAndUpdateTime(new
ExternalSchemaCache.SchemaCacheKey("hms_db", "hms_tbl2"));
minTimes = 0;
tbl2.getDatabase();
@@ -386,7 +390,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4);
// invoke initSchemaAndUpdateTime first and init schemaUpdateTime
- tbl2.initSchemaAndUpdateTime();
+ tbl2.initSchemaAndUpdateTime(new
ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName()));
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt,
scanNodes);
ca.checkCacheMode(System.currentTimeMillis() +
Config.cache_last_version_interval_second * 1000L * 2);
@@ -434,7 +438,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase
{
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode4);
// invoke initSchemaAndUpdateTime first and init schemaUpdateTime
- tbl2.initSchemaAndUpdateTime();
+ tbl2.initSchemaAndUpdateTime(new
ExternalSchemaCache.SchemaCacheKey(tbl2.getDbName(), tbl2.getName()));
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt,
scanNodes);
ca.checkCacheModeForNereids(System.currentTimeMillis() +
Config.cache_last_version_interval_second * 1000L * 2);
diff --git
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out
new file mode 100644
index 00000000000..f8870d0b287
Binary files /dev/null and
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.out
differ
diff --git
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
new file mode 100644
index 00000000000..30b64a98ad8
Binary files /dev/null and
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.out
differ
diff --git
a/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
new file mode 100644
index 00000000000..77597631587
Binary files /dev/null and
b/regression-test/data/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.out
differ
diff --git
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy
new file mode 100644
index 00000000000..5bfcea11d67
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_mtmv.groovy
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_mtmv",
"p2,external,hudi,external_remote,external_remote_hudi") {
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disabled hudi test")
+ return
+ }
+ String suiteName = "test_hudi_mtmv"
+ String catalogName = "${suiteName}_catalog"
+ String mvName = "${suiteName}_mv"
+ String dbName = context.config.getDbNameByFile(context.file)
+ String otherDbName = "${suiteName}_otherdb"
+ String tableName = "${suiteName}_table"
+
+ sql """drop database if exists ${otherDbName}"""
+ sql """create database ${otherDbName}"""
+ sql """
+ CREATE TABLE ${otherDbName}.${tableName} (
+ `user_id` INT,
+ `num` INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+
+ sql """
+ insert into ${otherDbName}.${tableName} values(1,2);
+ """
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+ ${props}
+ );"""
+
+ order_qt_base_table """ select * from
${catalogName}.hudi_mtmv_regression_test.hudi_table_1; """
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`par`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_a"))
+ assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+ // refresh one partitions
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+ //refresh auto
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_auto "SELECT * FROM ${mvName} "
+ order_qt_is_sync_before_rebuild "select SyncWithBaseTables from
mv_infos('database'='${dbName}') where Name='${mvName}'"
+
+ // rebuild catalog, should not Affects MTMV
+ sql """drop catalog if exists ${catalogName}"""
+ sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+ ${props}
+ );"""
+ order_qt_is_sync_after_rebuild "select SyncWithBaseTables from
mv_infos('database'='${dbName}') where Name='${mvName}'"
+
+ // should refresh normal after catalog rebuild
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} complete
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_complete_rebuild "SELECT * FROM ${mvName} "
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ // not have partition
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ KEY(`id`)
+ COMMENT "comment1"
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1',"grace_period"="333")
+ AS
+ SELECT id,age,par FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+ """
+ order_qt_not_partition_before "select SyncWithBaseTables from
mv_infos('database'='${dbName}') where Name='${mvName}'"
+ //should can refresh auto
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_not_partition "SELECT * FROM ${mvName} "
+ order_qt_not_partition_after "select SyncWithBaseTables from
mv_infos('database'='${dbName}') where Name='${mvName}'"
+ sql """drop materialized view if exists ${mvName};"""
+
+ // refresh on schedule
+ // sql """
+ // CREATE MATERIALIZED VIEW ${mvName}
+ // BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 SECOND STARTS
"9999-12-13 21:07:09"
+ // KEY(`id`)
+ // COMMENT "comment1"
+ // DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ // PROPERTIES ('replication_num' = '1',"grace_period"="333")
+ // AS
+ // SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+ // """
+ // waitingMTMVTaskFinishedByMvName(mvName)
+ // sql """drop materialized view if exists ${mvName};"""
+
+ // refresh on schedule
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD IMMEDIATE REFRESH AUTO ON commit
+ KEY(`id`)
+ COMMENT "comment1"
+ DISTRIBUTED BY HASH(`id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1',"grace_period"="333")
+ AS
+ SELECT id,age,par FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1;
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ sql """drop materialized view if exists ${mvName};"""
+
+ // cross db and join internal table
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`par`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join
internal.${otherDbName}.${tableName} b on a.id=b.user_id;
+ """
+ def showJoinPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showJoinPartitionsResult: " +
showJoinPartitionsResult.toString())
+ assertTrue(showJoinPartitionsResult.toString().contains("p_a"))
+ assertTrue(showJoinPartitionsResult.toString().contains("p_b"))
+
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_join_one_partition "SELECT * FROM ${mvName} "
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`create_date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+ """
+ def showTwoPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showTwoPartitionsResult: " +
showTwoPartitionsResult.toString())
+ assertTrue(showTwoPartitionsResult.toString().contains("p_20200101"))
+ assertTrue(showTwoPartitionsResult.toString().contains("p_20380101"))
+ assertTrue(showTwoPartitionsResult.toString().contains("p_20380102"))
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto;
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_two_partition "SELECT * FROM ${mvName} "
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`create_date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' =
'1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d',
+ 'partition_sync_time_unit'='MONTH')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+ """
+ def showLimitPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showLimitPartitionsResult: " +
showLimitPartitionsResult.toString())
+ assertFalse(showLimitPartitionsResult.toString().contains("p_20200101"))
+ assertTrue(showLimitPartitionsResult.toString().contains("p_20380101"))
+ assertTrue(showLimitPartitionsResult.toString().contains("p_20380102"))
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto;
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_limit_partition "SELECT * FROM ${mvName} "
+ sql """drop materialized view if exists ${mvName};"""
+
+ // not allow date trunc
+ test {
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by (date_trunc(`create_date`,'month'))
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' =
'1','partition_sync_limit'='2','partition_date_format'='%Y-%m-%d',
+ 'partition_sync_time_unit'='MONTH')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_two_partitions;
+ """
+ exception "only support"
+ }
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`region`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_null_partition;
+ """
+ def showNullPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showNullPartitionsResult: " +
showNullPartitionsResult.toString())
+ // assertTrue(showNullPartitionsResult.toString().contains("p_null"))
+ assertTrue(showNullPartitionsResult.toString().contains("p_NULL"))
+ assertTrue(showNullPartitionsResult.toString().contains("p_bj"))
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto;
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ // Will lose null data
+ order_qt_null_partition "SELECT * FROM ${mvName} "
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """drop catalog if exists ${catalogName}"""
+
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..a0ac9b0783f
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_olap_rewrite_mtmv.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_olap_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remote_hudi") {
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disabled hudi test")
+ return
+ }
+ String suiteName = "test_hudi_olap_rewrite_mtmv"
+ String catalogName = "${suiteName}_catalog"
+ String mvName = "${suiteName}_mv"
+ String dbName = context.config.getDbNameByFile(context.file)
+ String tableName = "${suiteName}_table"
+ sql """drop table if exists ${tableName}"""
+ sql """
+ CREATE TABLE ${tableName} (
+ `user_id` INT,
+ `num` INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName} values(1,2);
+ """
+
+ sql """analyze table internal.`${dbName}`. ${tableName} with sync"""
+ sql """alter table internal.`${dbName}`. ${tableName} modify column
user_id set stats ('row_count'='1');"""
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+
+ sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+ String mvSql = "SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join
${tableName} b on a.id=b.user_id;";
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+ ${props}
+ );"""
+
+ sql """analyze table
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync"""
+ sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1
modify column par set stats ('row_count'='10');"""
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`par`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ ${mvSql}
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_a"))
+ assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+ // refresh one partitions
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+ def explainOnePartition = sql """ explain ${mvSql} """
+ logger.info("explainOnePartition: " + explainOnePartition.toString())
+ assertTrue(explainOnePartition.toString().contains("VUNION"))
+ order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+ mv_rewrite_success("${mvSql}", "${mvName}")
+
+ // select p_b should not rewrite
+ mv_rewrite_fail("SELECT * FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 a left join
${tableName} b on a.id=b.user_id where a.par='b';", "${mvName}")
+
+ //refresh auto
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_auto "SELECT * FROM ${mvName} "
+
+ def explainAllPartition = sql """ explain ${mvSql}; """
+ logger.info("explainAllPartition: " + explainAllPartition.toString())
+ assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+ order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+ mv_rewrite_success("${mvSql}", "${mvName}")
+
+ sql """drop materialized view if exists ${mvName};"""
+ sql """drop catalog if exists ${catalogName}"""
+}
diff --git
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
new file mode 100644
index 00000000000..95c71c48475
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hudi_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remote_hudi") {
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disabled hudi test")
+ return
+ }
+ String suiteName = "test_hudi_rewrite_mtmv"
+ String catalogName = "${suiteName}_catalog"
+ String mvName = "${suiteName}_mv"
+ String dbName = context.config.getDbNameByFile(context.file)
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+
+ sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+ String mvSql = "SELECT par,count(*) as num FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 group by par;";
+
+ sql """drop catalog if exists ${catalogName}"""
+ sql """CREATE CATALOG if not exists ${catalogName} PROPERTIES (
+ ${props}
+ );"""
+
+ sql """analyze table
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 with sync"""
+ sql """alter table ${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1
modify column par set stats ('row_count'='10');"""
+
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ BUILD DEFERRED REFRESH AUTO ON MANUAL
+ partition by(`par`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ ${mvSql}
+ """
+ def showPartitionsResult = sql """show partitions from ${mvName}"""
+ logger.info("showPartitionsResult: " + showPartitionsResult.toString())
+ assertTrue(showPartitionsResult.toString().contains("p_a"))
+ assertTrue(showPartitionsResult.toString().contains("p_b"))
+
+ // refresh one partitions
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} partitions(p_a);
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+
+ def explainOnePartition = sql """ explain ${mvSql} """
+ logger.info("explainOnePartition: " + explainOnePartition.toString())
+ assertTrue(explainOnePartition.toString().contains("VUNION"))
+ order_qt_refresh_one_partition_rewrite "${mvSql}"
+
+ mv_rewrite_success("${mvSql}", "${mvName}")
+
+ // select p_b should not rewrite
+ mv_rewrite_fail("SELECT par,count(*) as num FROM
${catalogName}.`hudi_mtmv_regression_test`.hudi_table_1 where par='b' group by
par;", "${mvName}")
+
+ //refresh auto
+ sql """
+ REFRESH MATERIALIZED VIEW ${mvName} auto
+ """
+ waitingMTMVTaskFinishedByMvName(mvName)
+ order_qt_refresh_auto "SELECT * FROM ${mvName} "
+
+ def explainAllPartition = sql """ explain ${mvSql}; """
+ logger.info("explainAllPartition: " + explainAllPartition.toString())
+ assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+ order_qt_refresh_all_partition_rewrite "${mvSql}"
+
+ mv_rewrite_success("${mvSql}", "${mvName}")
+
+ sql """drop materialized view if exists ${mvName};"""
+ sql """drop catalog if exists ${catalogName}"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]