This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 435147d449c5b1bbab59c882f0a8ae68adc3c845 Author: zhangdong <493738...@qq.com> AuthorDate: Fri May 17 17:31:22 2024 +0800 [enhance](mtmv) MTMV deal partition use name instead of id (#34910) partition id will change when insert overwrite When the materialized view runs a task, if the base table is in insert overwrite, the materialized view task may report an error: partition not found by partitionId Upgrade compatibility: Hive currently does not support automatic refresh, so it has no impact --- .../main/java/org/apache/doris/catalog/MTMV.java | 22 ++-- .../java/org/apache/doris/catalog/OlapTable.java | 27 ++++- .../doris/datasource/hive/HMSExternalTable.java | 36 ++++--- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 41 ++++---- .../doris/mtmv/MTMVMaxTimestampSnapshot.java | 19 ++-- .../org/apache/doris/mtmv/MTMVPartitionUtil.java | 112 ++++++++++----------- ...latedPartitionDescOnePartitionColGenerator.java | 6 +- .../MTMVRelatedPartitionDescRollUpGenerator.java | 20 ++-- ...MTMVRelatedPartitionDescSyncLimitGenerator.java | 6 +- .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 6 +- .../org/apache/doris/mtmv/MTMVRewriteUtil.java | 6 +- .../doris/mtmv/RelatedPartitionDescResult.java | 12 +-- .../plans/commands/UpdateMvByPartitionCommand.java | 26 ++--- .../apache/doris/mtmv/MTMVPartitionUtilTest.java | 18 ++-- ...TMVRelatedPartitionDescRollUpGeneratorTest.java | 28 +++--- .../org/apache/doris/mtmv/MTMVRewriteUtilTest.java | 12 ++- .../java/org/apache/doris/mtmv/MTMVTaskTest.java | 46 ++++----- 17 files changed, 236 insertions(+), 207 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index fb32741afa3..d95c2388a28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -305,12 +305,12 @@ public class MTMV extends OlapTable { /** * generateMvPartitionDescs * - * @return mvPartitionId ==> mvPartitionKeyDesc + * @return mvPartitionName ==> mvPartitionKeyDesc */ - public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() { - Map<Long, PartitionItem> mtmvItems = getAndCopyPartitionItems(); - Map<Long, PartitionKeyDesc> result = Maps.newHashMap(); - for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { + public Map<String, PartitionKeyDesc> generateMvPartitionDescs() { + Map<String, PartitionItem> mtmvItems = getAndCopyPartitionItems(); + Map<String, PartitionKeyDesc> result = Maps.newHashMap(); + for (Entry<String, PartitionItem> entry : mtmvItems.entrySet()) { result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); } return result; @@ -321,19 +321,19 @@ public class MTMV extends OlapTable { * It is the result of real-time comparison calculation, so there may be some costs, * so it should be called with caution * - * @return mvPartitionId ==> relationPartitionIds + * @return mvPartitionName ==> relationPartitionNames * @throws AnalysisException */ - public Map<Long, Set<Long>> calculatePartitionMappings() throws AnalysisException { + public Map<String, Set<String>> calculatePartitionMappings() throws AnalysisException { if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { return Maps.newHashMap(); } long start = System.currentTimeMillis(); - Map<Long, Set<Long>> res = Maps.newHashMap(); - Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = MTMVPartitionUtil + Map<String, Set<String>> res = Maps.newHashMap(); + Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = MTMVPartitionUtil .generateRelatedPartitionDescs(mvPartitionInfo, mvProperties); - Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(); - for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) { + Map<String, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(); + for (Entry<String, PartitionItem> entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 9cf9be00915..0d3477b3a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -95,6 +95,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -1037,6 +1038,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return partition; } + public PartitionItem getPartitionItemOrAnalysisException(String partitionName) throws AnalysisException { + Partition partition = nameToPartition.get(partitionName); + if (partition == null) { + partition = tempPartitions.getPartition(partitionName); + } + if (partition == null) { + throw new AnalysisException("partition not found: " + partitionName); + } + return partitionInfo.getItem(partition.getId()); + } + public Partition getPartitionOrAnalysisException(long partitionId) throws AnalysisException { Partition partition = idToPartition.get(partitionId); if (partition == null) { @@ -2657,10 +2669,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public Map<Long, PartitionItem> getAndCopyPartitionItems() { + public Map<String, PartitionItem> getAndCopyPartitionItems() { readLock(); try { - return Maps.newHashMap(getPartitionInfo().getIdToItem(false)); + Map<String, PartitionItem> res = Maps.newHashMap(); + for (Entry<Long, PartitionItem> entry : getPartitionInfo().getIdToItem(false).entrySet()) { + Partition partition = idToPartition.get(entry.getKey()); + if (partition != null) { + res.put(partition.getName(), entry.getValue()); + } + } + return res; } finally { readUnlock(); } @@ -2672,8 +2691,8 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException { - long visibleVersion = getPartitionOrAnalysisException(partitionId).getVisibleVersion(); + public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { + long visibleVersion = getPartitionOrAnalysisException(partitionName).getVisibleVersion(); return new MTMVVersionSnapshot(visibleVersion); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 57dee5debb0..e5624fb58b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -878,12 +878,22 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public Map<Long, PartitionItem> getAndCopyPartitionItems() { + public Map<String, PartitionItem> getAndCopyPartitionItems() { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( getDbName(), getName(), getPartitionColumnTypes()); - return hivePartitionValues.getIdToPartitionItem(); + Map<String, PartitionItem> res = Maps.newHashMap(); + Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem(); + for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) { + try { + res.put(getPartitionName(entry.getKey()), entry.getValue()); + } catch (AnalysisException e) { + LOG.info("can not get partitionName by: " + entry.getKey()); + } + + } + return res; } @Override @@ -905,35 +915,35 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException { - long partitionLastModifyTime = getPartitionLastModifyTime(partitionId); + public MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException { + long partitionLastModifyTime = getPartitionLastModifyTime(partitionName); return new MTMVTimestampSnapshot(partitionLastModifyTime); } @Override public MTMVSnapshotIf getTableSnapshot() throws AnalysisException { if (getPartitionType() == PartitionType.UNPARTITIONED) { - return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime()); + return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime()); } - long partitionId = 0L; + String partitionName = ""; long maxVersionTime = 0L; long visibleVersionTime; - for (Entry<Long, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) { + for (Entry<String, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) { visibleVersionTime = getPartitionLastModifyTime(entry.getKey()); if (visibleVersionTime > maxVersionTime) { maxVersionTime = visibleVersionTime; - partitionId = entry.getKey(); + partitionName = entry.getKey(); } } - return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime); + return new MTMVMaxTimestampSnapshot(partitionName, maxVersionTime); } - private long getPartitionLastModifyTime(long partitionId) throws AnalysisException { - return getPartitionById(partitionId).getLastModifiedTime(); + private long getPartitionLastModifyTime(String partitionName) throws AnalysisException { + return getPartitionByName(partitionName).getLastModifiedTime(); } - private HivePartition getPartitionById(long partitionId) throws AnalysisException { - PartitionItem item = getAndCopyPartitionItems().get(partitionId); + private HivePartition getPartitionByName(String partitionName) throws AnalysisException { + PartitionItem item = getAndCopyPartitionItems().get(partitionName); List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item); List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList); if (partitions.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 7a4c5277591..71051be3958 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -173,30 +173,28 @@ public class MTMVTask extends AbstractTask { if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVPartitionUtil.alignMvPartition(mtmv); } - Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings(); - List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions(partitionMappings); - this.needRefreshPartitions = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds); - this.refreshMode = generateRefreshMode(needRefreshPartitionIds); + Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings(); + this.needRefreshPartitions = calculateNeedRefreshPartitions(partitionMappings); + this.refreshMode = generateRefreshMode(needRefreshPartitions); if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) { return; } Map<TableIf, String> tableWithPartKey = getIncrementalTableMap(); this.completedPartitions = Lists.newCopyOnWriteArrayList(); int refreshPartitionNum = mtmv.getRefreshPartitionNum(); - long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size() + long execNum = (needRefreshPartitions.size() / refreshPartitionNum) + ((needRefreshPartitions.size() % refreshPartitionNum) > 0 ? 1 : 0); this.partitionSnapshots = Maps.newConcurrentMap(); for (int i = 0; i < execNum; i++) { int start = i * refreshPartitionNum; int end = start + refreshPartitionNum; - Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds - .subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end)); + Set<String> execPartitionNames = Sets.newHashSet(needRefreshPartitions + .subList(start, end > needRefreshPartitions.size() ? needRefreshPartitions.size() : end)); // need get names before exec - List<String> execPartitionNames = MTMVPartitionUtil.getPartitionNamesByIds(mtmv, execPartitionIds); Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil - .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds, + .generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionNames, partitionMappings); - exec(ctx, execPartitionIds, tableWithPartKey); + exec(ctx, execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); } @@ -218,15 +216,15 @@ public class MTMVTask extends AbstractTask { } } - private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, - Map<TableIf, String> tableWithPartKey) + private void exec(ConnectContext ctx, Set<String> refreshPartitionNames, + Map<TableIf, String> tableWithPartKey) throws Exception { TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE - ? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey); + ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); ctx.setExecutor(executor); ctx.setQueryId(queryId); @@ -252,7 +250,7 @@ public class MTMVTask extends AbstractTask { } @Override - protected synchronized void executeCancelLogic() { + protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { executor.cancel(); @@ -407,29 +405,30 @@ public class MTMVTask extends AbstractTask { return tableWithPartKey; } - private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) { + private MTMVTaskRefreshMode generateRefreshMode(List<String> needRefreshPartitionIds) { if (CollectionUtils.isEmpty(needRefreshPartitionIds)) { return MTMVTaskRefreshMode.NOT_REFRESH; - } else if (needRefreshPartitionIds.size() == mtmv.getPartitionIds().size()) { + } else if (needRefreshPartitionIds.size() == mtmv.getPartitionNames().size()) { return MTMVTaskRefreshMode.COMPLETE; } else { return MTMVTaskRefreshMode.PARTIAL; } } - public List<Long> calculateNeedRefreshPartitions(Map<Long, Set<Long>> partitionMappings) throws AnalysisException { + public List<String> calculateNeedRefreshPartitions(Map<String, Set<String>> partitionMappings) + throws AnalysisException { // check whether the user manually triggers it if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) { if (taskContext.isComplete()) { - return mtmv.getPartitionIds(); + return Lists.newArrayList(mtmv.getPartitionNames()); } else if (!CollectionUtils .isEmpty(taskContext.getPartitions())) { - return MTMVPartitionUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions()); + return taskContext.getPartitions(); } } // if refreshMethod is COMPLETE, we must FULL refresh, avoid external table MTMV always not refresh if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) { - return mtmv.getPartitionIds(); + return Lists.newArrayList(mtmv.getPartitionNames()); } // check if data is fresh // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() @@ -441,7 +440,7 @@ public class MTMVTask extends AbstractTask { } // current, if partitionType is SELF_MANAGE, we can only FULL refresh if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) { - return mtmv.getPartitionIds(); + return Lists.newArrayList(mtmv.getPartitionNames()); } // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation() // to avoid rebuilding the baseTable and causing a change in the tableId diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java index 53f9df542cf..0a127857b48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVMaxTimestampSnapshot.java @@ -25,17 +25,17 @@ import com.google.gson.annotations.SerializedName; * so the update time is used instead of the version */ public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf { - // partitionId corresponding to timestamp - // The reason why both timestamp and partitionId are stored is to avoid + // partitionName corresponding to timestamp + // The reason why both timestamp and partitionName are stored is to avoid // deleting the partition corresponding to timestamp - @SerializedName("p") - private long partitionId; + @SerializedName("pn") + private String partitionName; // The maximum modify time in all partitions @SerializedName("t") private long timestamp; - public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) { - this.partitionId = partitionId; + public MTMVMaxTimestampSnapshot(String partitionName, long timestamp) { + this.partitionName = partitionName; this.timestamp = timestamp; } @@ -48,19 +48,18 @@ public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf { return false; } MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o; - return partitionId == that.partitionId - && timestamp == that.timestamp; + return timestamp == that.timestamp && Objects.equal(partitionName, that.partitionName); } @Override public int hashCode() { - return Objects.hashCode(partitionId, timestamp); + return Objects.hashCode(partitionName, timestamp); } @Override public String toString() { return "MTMVMaxTimestampSnapshot{" - + "partitionId=" + partitionId + + "partitionName='" + partitionName + '\'' + ", timestamp=" + timestamp + '}'; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index cd0312c419e..f2b7b146ee5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -73,14 +73,14 @@ public class MTMVPartitionUtil { * Determine whether the partition is sync with retated partition and other baseTables * * @param mtmv - * @param partitionId - * @param relatedPartitionIds + * @param partitionName + * @param relatedPartitionNames * @param tables * @param excludedTriggerTables * @return * @throws AnalysisException */ - public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds, + public static boolean isMTMVPartitionSync(MTMV mtmv, String partitionName, Set<String> relatedPartitionNames, Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { boolean isSyncWithPartition = true; @@ -88,14 +88,14 @@ public class MTMVPartitionUtil { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); // if follow base table, not need compare with related table, only should compare with related partition excludedTriggerTables.add(relatedTable.getName()); - if (CollectionUtils.isEmpty(relatedPartitionIds)) { + if (CollectionUtils.isEmpty(relatedPartitionNames)) { LOG.warn("can not found related partition, partitionId: {}, mtmvName: {}, relatedTableName: {}", - partitionId, mtmv.getName(), relatedTable.getName()); + partitionName, mtmv.getName(), relatedTable.getName()); return false; } - isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, relatedTable, relatedPartitionIds); + isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, relatedTable, relatedPartitionNames); } - return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables); + return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionName, tables, excludedTriggerTables); } @@ -108,11 +108,11 @@ public class MTMVPartitionUtil { */ public static void alignMvPartition(MTMV mtmv) throws DdlException, AnalysisException { - Map<Long, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs(); + Map<String, PartitionKeyDesc> mtmvPartitionDescs = mtmv.generateMvPartitionDescs(); Set<PartitionKeyDesc> relatedPartitionDescs = generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(), mtmv.getMvProperties()).keySet(); // drop partition of mtmv - for (Entry<Long, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) { + for (Entry<String, PartitionKeyDesc> entry : mtmvPartitionDescs.entrySet()) { if (!relatedPartitionDescs.contains(entry.getValue())) { dropPartition(mtmv, entry.getKey()); } @@ -152,7 +152,7 @@ public class MTMVPartitionUtil { return res; } - public static Map<PartitionKeyDesc, Set<Long>> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo, + public static Map<PartitionKeyDesc, Set<String>> generateRelatedPartitionDescs(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties) throws AnalysisException { long start = System.currentTimeMillis(); RelatedPartitionDescResult result = new RelatedPartitionDescResult(); @@ -219,11 +219,11 @@ public class MTMVPartitionUtil { * @throws AnalysisException */ public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, - Map<Long, Set<Long>> partitionMappings) + Map<String, Set<String>> partitionMappings) throws AnalysisException { - List<Long> partitionIds = mtmv.getPartitionIds(); - for (Long partitionId : partitionIds) { - if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), tables, + Set<String> partitionNames = mtmv.getPartitionNames(); + for (String partitionName : partitionNames) { + if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), tables, excludeTables)) { return false; } @@ -236,20 +236,22 @@ public class MTMVPartitionUtil { * * @param mtmv * @param partitionIds - * @return partitionId ==> UnSyncTableNames + * @return partitionName ==> UnSyncTableNames * @throws AnalysisException */ public static Map<Long, List<String>> getPartitionsUnSyncTables(MTMV mtmv, List<Long> partitionIds) throws AnalysisException { Map<Long, List<String>> res = Maps.newHashMap(); - Map<Long, Set<Long>> partitionMappings = mtmv.calculatePartitionMappings(); + Map<String, Set<String>> partitionMappings = mtmv.calculatePartitionMappings(); for (Long partitionId : partitionIds) { - res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionId, partitionMappings.get(partitionId))); + String partitionName = mtmv.getPartitionOrAnalysisException(partitionId).getName(); + res.put(partitionId, getPartitionUnSyncTables(mtmv, partitionName, partitionMappings.get(partitionName))); } return res; } - private static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId, Set<Long> relatedPartitionIds) + private static List<String> getPartitionUnSyncTables(MTMV mtmv, String partitionName, + Set<String> relatedPartitionNames) throws AnalysisException { List<String> res = Lists.newArrayList(); for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) { @@ -263,16 +265,16 @@ public class MTMVPartitionUtil { } if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE && mtmv .getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) { - if (CollectionUtils.isEmpty(relatedPartitionIds)) { + if (CollectionUtils.isEmpty(relatedPartitionNames)) { throw new AnalysisException("can not found related partition"); } - boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionId, mtmvRelatedTableIf, - relatedPartitionIds); + boolean isSyncWithPartition = isSyncWithPartitions(mtmv, partitionName, mtmvRelatedTableIf, + relatedPartitionNames); if (!isSyncWithPartition) { res.add(mtmvRelatedTableIf.getName()); } } else { - if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) { + if (!isSyncWithBaseTable(mtmv, partitionName, baseTableInfo)) { res.add(table.getName()); } } @@ -287,18 +289,18 @@ public class MTMVPartitionUtil { * @param baseTables * @return */ - public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables, - Map<Long, Set<Long>> partitionMappings) { - List<Long> partitionIds = mtmv.getPartitionIds(); - List<Long> res = Lists.newArrayList(); - for (Long partitionId : partitionIds) { + public static List<String> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables, + Map<String, Set<String>> partitionMappings) { + Set<String> partitionNames = mtmv.getPartitionNames(); + List<String> res = Lists.newArrayList(); + for (String partitionName : partitionNames) { try { - if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), baseTables, + if (!isMTMVPartitionSync(mtmv, partitionName, partitionMappings.get(partitionName), baseTables, mtmv.getExcludedTriggerTables())) { - res.add(partitionId); + res.add(partitionName); } } catch (AnalysisException e) { - res.add(partitionId); + res.add(partitionName); LOG.warn("check isMTMVPartitionSync failed", e); } } @@ -309,23 +311,21 @@ public class MTMVPartitionUtil { * Compare the current and last updated partition (or table) snapshot of the associated partition (or table) * * @param mtmv - * @param mtmvPartitionId + * @param mtmvPartitionName * @param relatedTable - * @param relatedPartitionIds + * @param relatedPartitionNames * @return * @throws AnalysisException */ - public static boolean isSyncWithPartitions(MTMV mtmv, Long mtmvPartitionId, + public static boolean isSyncWithPartitions(MTMV mtmv, String mtmvPartitionName, MTMVRelatedTableIf relatedTable, - Set<Long> relatedPartitionIds) throws AnalysisException { + Set<String> relatedPartitionNames) throws AnalysisException { if (!relatedTable.needAutoRefresh()) { return true; } - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); - for (Long relatedPartitionId : relatedPartitionIds) { + for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); - String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId); + .getPartitionSnapshot(relatedPartitionName); if (!mtmv.getRefreshSnapshot() .equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot)) { @@ -355,15 +355,14 @@ public class MTMVPartitionUtil { * drop partition of mtmv * * @param mtmv - * @param partitionId + * @param partitionName */ - private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException { + private static void dropPartition(MTMV mtmv, String partitionName) throws DdlException { if (!mtmv.writeLockIfExist()) { return; } try { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionId); - DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false); + DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partitionName, false, false); Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause); } finally { mtmv.writeUnlock(); @@ -394,12 +393,12 @@ public class MTMVPartitionUtil { /** * Determine is sync, ignoring excludedTriggerTables and non OlapTanle * - * @param mtmvPartitionId + * @param mtmvPartitionName * @param tables * @param excludedTriggerTables * @return */ - private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables, + private static boolean isSyncWithAllBaseTables(MTMV mtmv, String mtmvPartitionName, Set<BaseTableInfo> tables, Set<String> excludedTriggerTables) throws AnalysisException { for (BaseTableInfo baseTableInfo : tables) { TableIf table = null; @@ -412,7 +411,7 @@ public class MTMVPartitionUtil { if (excludedTriggerTables.contains(table.getName())) { continue; } - boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo); + boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionName, baseTableInfo); if (!syncWithBaseTable) { return false; } @@ -420,7 +419,7 @@ public class MTMVPartitionUtil { return true; } - private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo) + private static boolean isSyncWithBaseTable(MTMV mtmv, String mtmvPartitionName, BaseTableInfo baseTableInfo) throws AnalysisException { TableIf table = null; try { @@ -440,7 +439,6 @@ public class MTMVPartitionUtil { return true; } MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(); - String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId); return mtmv.getRefreshSnapshot() .equalsWithBaseTable(mtmvPartitionName, baseTable.getId(), baseTableCurrentSnapshot); } @@ -450,35 +448,35 @@ public class MTMVPartitionUtil { * * @param mtmv * @param baseTables - * @param partitionIds + * @param partitionNames * @param partitionMappings * @return * @throws AnalysisException */ public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv, - Set<BaseTableInfo> baseTables, Set<Long> partitionIds, - Map<Long, Set<Long>> partitionMappings) + Set<BaseTableInfo> baseTables, Set<String> partitionNames, + Map<String, Set<String>> partitionMappings) throws AnalysisException { Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); - for (Long partitionId : partitionIds) { - res.put(mtmv.getPartitionName(partitionId), - generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId))); + for (String partitionName : partitionNames) { + res.put(partitionName, + generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionName))); } return res; } private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv, - Set<BaseTableInfo> baseTables, Set<Long> relatedPartitionIds) + Set<BaseTableInfo> baseTables, Set<String> relatedPartitionNames) throws AnalysisException { MTMVRefreshPartitionSnapshot refreshPartitionSnapshot = new MTMVRefreshPartitionSnapshot(); if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) { MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable(); - for (Long relatedPartitionId : relatedPartitionIds) { + for (String relatedPartitionName : relatedPartitionNames) { MTMVSnapshotIf partitionSnapshot = relatedTable - .getPartitionSnapshot(relatedPartitionId); + .getPartitionSnapshot(relatedPartitionName); refreshPartitionSnapshot.getPartitions() - .put(relatedTable.getPartitionName(relatedPartitionId), partitionSnapshot); + .put(relatedPartitionName, partitionSnapshot); } } for (BaseTableInfo baseTableInfo : baseTables) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java index ab14f302e75..c5dad9bdb41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescOnePartitionColGenerator.java @@ -51,10 +51,10 @@ public class MTMVRelatedPartitionDescOnePartitionColGenerator implements MTMVRel if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) { return; } - Map<PartitionKeyDesc, Set<Long>> res = Maps.newHashMap(); - Map<Long, PartitionItem> relatedPartitionItems = lastResult.getItems(); + Map<PartitionKeyDesc, Set<String>> res = Maps.newHashMap(); + Map<String, PartitionItem> relatedPartitionItems = lastResult.getItems(); int relatedColPos = mvPartitionInfo.getRelatedColPos(); - for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { + for (Entry<String, PartitionItem> entry : relatedPartitionItems.entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); if (res.containsKey(partitionKeyDesc)) { res.get(partitionKeyDesc).add(entry.getKey()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java index e9b4b1fe6a5..76e20ef70f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGenerator.java @@ -69,27 +69,27 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti * @return * @throws AnalysisException */ - public Map<PartitionKeyDesc, Set<Long>> rollUpList(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs, + public Map<PartitionKeyDesc, Set<String>> rollUpList(Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs, MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties) throws AnalysisException { Map<String, Set<String>> identityToValues = Maps.newHashMap(); - Map<String, Set<Long>> identityToPartitionIds = Maps.newHashMap(); + Map<String, Set<String>> identityToPartitionNames = Maps.newHashMap(); MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr()); - for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) { + for (Entry<PartitionKeyDesc, Set<String>> entry : relatedPartitionDescs.entrySet()) { String rollUpIdentity = exprSerice.getRollUpIdentity(entry.getKey(), mvProperties); Preconditions.checkNotNull(rollUpIdentity); if (identityToValues.containsKey(rollUpIdentity)) { identityToValues.get(rollUpIdentity).addAll(getStringValues(entry.getKey())); - identityToPartitionIds.get(rollUpIdentity).addAll(entry.getValue()); + identityToPartitionNames.get(rollUpIdentity).addAll(entry.getValue()); } else { identityToValues.put(rollUpIdentity, getStringValues(entry.getKey())); - identityToPartitionIds.put(rollUpIdentity, entry.getValue()); + identityToPartitionNames.put(rollUpIdentity, entry.getValue()); } } - Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap(); + Map<PartitionKeyDesc, Set<String>> result = Maps.newHashMap(); for (Entry<String, Set<String>> entry : identityToValues.entrySet()) { result.put(PartitionKeyDesc.createIn(getPartitionValues(entry.getValue())), - identityToPartitionIds.get(entry.getKey())); + identityToPartitionNames.get(entry.getKey())); } return result; } @@ -125,11 +125,11 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti * @return * @throws AnalysisException */ - public Map<PartitionKeyDesc, Set<Long>> rollUpRange(Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs, + public Map<PartitionKeyDesc, Set<String>> rollUpRange(Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs, MTMVPartitionInfo mvPartitionInfo) throws AnalysisException { - Map<PartitionKeyDesc, Set<Long>> result = Maps.newHashMap(); + Map<PartitionKeyDesc, Set<String>> result = Maps.newHashMap(); MTMVPartitionExprService exprSerice = MTMVPartitionExprFactory.getExprService(mvPartitionInfo.getExpr()); - for (Entry<PartitionKeyDesc, Set<Long>> entry : relatedPartitionDescs.entrySet()) { + for (Entry<PartitionKeyDesc, Set<String>> entry : relatedPartitionDescs.entrySet()) { PartitionKeyDesc rollUpDesc = exprSerice.generateRollUpPartitionKeyDesc(entry.getKey(), mvPartitionInfo); if (result.containsKey(rollUpDesc)) { result.get(rollUpDesc).addAll(entry.getValue()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java index e031071192e..c6fb331631f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescSyncLimitGenerator.java @@ -43,16 +43,16 @@ public class MTMVRelatedPartitionDescSyncLimitGenerator implements MTMVRelatedPa @Override public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties, RelatedPartitionDescResult lastResult) throws AnalysisException { - Map<Long, PartitionItem> partitionItems = lastResult.getItems(); + Map<String, PartitionItem> partitionItems = lastResult.getItems(); MTMVPartitionSyncConfig config = generateMTMVPartitionSyncConfigByProperties(mvProperties); if (config.getSyncLimit() <= 0) { return; } long nowTruncSubSec = getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit()); Optional<String> dateFormat = config.getDateFormat(); - Map<Long, PartitionItem> res = Maps.newHashMap(); + Map<String, PartitionItem> res = Maps.newHashMap(); int relatedColPos = mvPartitionInfo.getRelatedColPos(); - for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) { + for (Entry<String, PartitionItem> entry : partitionItems.entrySet()) { if (entry.getValue().isGreaterThanSpecifiedTime(relatedColPos, dateFormat, nowTruncSubSec)) { res.put(entry.getKey(), entry.getValue()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index ec99a04d73f..8aee7741cee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf { * * @return partitionId->PartitionItem */ - Map<Long, PartitionItem> getAndCopyPartitionItems(); + Map<String, PartitionItem> getAndCopyPartitionItems(); /** * getPartitionType LIST/RANGE/UNPARTITIONED @@ -65,11 +65,11 @@ public interface MTMVRelatedTableIf extends TableIf { /** * getPartitionSnapshot * - * @param partitionId + * @param partitionName * @return partition snapshot at current time * @throws AnalysisException */ - MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException; + MTMVSnapshotIf getPartitionSnapshot(String partitionName) throws AnalysisException; /** * getTableSnapshot diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java index f0199169859..03a1aefeba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRewriteUtil.java @@ -66,7 +66,7 @@ public class MTMVRewriteUtil { && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { return res; } - Map<Long, Set<Long>> partitionMappings = null; + Map<String, Set<String>> partitionMappings = null; // check gracePeriod long gracePeriodMills = mtmv.getGracePeriod(); for (Partition partition : allPartitions) { @@ -79,8 +79,8 @@ public class MTMVRewriteUtil { if (partitionMappings == null) { partitionMappings = mtmv.calculatePartitionMappings(); } - if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getId(), - partitionMappings.get(partition.getId()), mtmvRelation.getBaseTables(), + if (MTMVPartitionUtil.isMTMVPartitionSync(mtmv, partition.getName(), + partitionMappings.get(partition.getName()), mtmvRelation.getBaseTables(), Sets.newHashSet())) { res.add(partition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java index 068cf1522a7..b349722a76d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/RelatedPartitionDescResult.java @@ -27,27 +27,27 @@ import java.util.Set; public class RelatedPartitionDescResult { // PartitionKeyDesc to relatedTable partition ids(Different partitions may have the same PartitionKeyDesc) - private Map<PartitionKeyDesc, Set<Long>> descs; - private Map<Long, PartitionItem> items; + private Map<PartitionKeyDesc, Set<String>> descs; + private Map<String, PartitionItem> items; public RelatedPartitionDescResult() { this.descs = Maps.newHashMap(); this.items = Maps.newHashMap(); } - public Map<PartitionKeyDesc, Set<Long>> getDescs() { + public Map<PartitionKeyDesc, Set<String>> getDescs() { return descs; } - public void setDescs(Map<PartitionKeyDesc, Set<Long>> descs) { + public void setDescs(Map<PartitionKeyDesc, Set<String>> descs) { this.descs = descs; } - public Map<Long, PartitionItem> getItems() { + public Map<String, PartitionItem> getItems() { return items; } - public void setItems(Map<Long, PartitionItem> items) { + public void setItems(Map<String, PartitionItem> items) { this.items = items; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index ac8db7f9762..7c97b0f881a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; @@ -56,6 +57,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Range; import com.google.common.collect.Sets; @@ -79,16 +81,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { * Construct command * * @param mv materialize view - * @param partitionIds update partitions in mv and tables + * @param partitionNames update partitions in mv and tables * @param tableWithPartKey the partitions key for different table * @return command */ - public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds, + public static UpdateMvByPartitionCommand from(MTMV mv, Set<String> partitionNames, Map<TableIf, String> tableWithPartKey) throws UserException { NereidsParser parser = new NereidsParser(); Map<TableIf, Set<Expression>> predicates = - constructTableWithPredicates(mv, partitionIds, tableWithPartKey); - List<String> parts = constructPartsForMv(mv, partitionIds); + constructTableWithPredicates(mv, partitionNames, tableWithPartKey); + List<String> parts = constructPartsForMv(partitionNames); Plan plan = parser.parseSingle(mv.getQuerySql()); plan = plan.accept(new PredicateAdder(), predicates); if (plan instanceof Sink) { @@ -99,17 +101,17 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { return new UpdateMvByPartitionCommand(sink); } - private static List<String> constructPartsForMv(MTMV mv, Set<Long> partitionIds) { - return partitionIds.stream() - .map(id -> mv.getPartition(id).getName()) - .collect(ImmutableList.toImmutableList()); + private static List<String> constructPartsForMv(Set<String> partitionNames) { + return Lists.newArrayList(partitionNames); } private static Map<TableIf, Set<Expression>> constructTableWithPredicates(MTMV mv, - Set<Long> partitionIds, Map<TableIf, String> tableWithPartKey) { - Set<PartitionItem> items = partitionIds.stream() - .map(id -> mv.getPartitionInfo().getItem(id)) - .collect(ImmutableSet.toImmutableSet()); + Set<String> partitionNames, Map<TableIf, String> tableWithPartKey) throws AnalysisException { + Set<PartitionItem> items = Sets.newHashSet(); + for (String partitionName : partitionNames) { + PartitionItem partitionItem = mv.getPartitionItemOrAnalysisException(partitionName); + items.add(partitionItem); + } ImmutableMap.Builder<TableIf, Set<Expression>> builder = new ImmutableMap.Builder<>(); tableWithPartKey.forEach((table, colName) -> builder.put(table, constructPredicates(items, colName)) diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index bf819553e86..261b750c796 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -71,13 +71,13 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = Lists.newArrayList(p1); - mtmv.getPartitionIds(); + mtmv.getPartitionNames(); minTimes = 0; - result = Lists.newArrayList(1L); + result = Sets.newHashSet("name1"); - p1.getId(); + p1.getName(); minTimes = 0; - result = 1L; + result = "name1"; mtmv.getMvPartitionInfo(); minTimes = 0; @@ -101,7 +101,7 @@ public class MTMVPartitionUtilTest { mtmv.getPartitionName(anyLong); minTimes = 0; - result = "p1"; + result = "name1"; mtmv.getRefreshSnapshot(); minTimes = 0; @@ -119,13 +119,13 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = true; - baseOlapTable.getPartitionSnapshot(anyLong); + baseOlapTable.getPartitionSnapshot(anyString); minTimes = 0; result = baseSnapshotIf; baseOlapTable.getPartitionName(anyLong); minTimes = 0; - result = "p1"; + result = "name1"; refreshSnapshot.equalsWithRelatedPartition(anyString, anyString, (MTMVSnapshotIf) any); minTimes = 0; @@ -156,7 +156,7 @@ public class MTMVPartitionUtilTest { @Test public void testIsSyncWithPartition() throws AnalysisException { boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, 1L, baseOlapTable, Sets.newHashSet(2L)); + .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); Assert.assertTrue(isSyncWithPartition); } @@ -170,7 +170,7 @@ public class MTMVPartitionUtilTest { } }; boolean isSyncWithPartition = MTMVPartitionUtil - .isSyncWithPartitions(mtmv, 1L, baseOlapTable, Sets.newHashSet(2L)); + .isSyncWithPartitions(mtmv, "name1", baseOlapTable, Sets.newHashSet("name2")); Assert.assertFalse(isSyncWithPartition); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java index b866100b63d..15036c5e497 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRelatedPartitionDescRollUpGeneratorTest.java @@ -72,7 +72,7 @@ public class MTMVRelatedPartitionDescRollUpGeneratorTest { } }; MTMVRelatedPartitionDescRollUpGenerator generator = new MTMVRelatedPartitionDescRollUpGenerator(); - Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = Maps.newHashMap(); + Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = Maps.newHashMap(); PartitionKeyDesc desc20200101 = PartitionKeyDesc.createFixed( Lists.newArrayList(new PartitionValue("2020-01-01")), Lists.newArrayList(new PartitionValue("2020-01-02"))); @@ -82,10 +82,10 @@ public class MTMVRelatedPartitionDescRollUpGeneratorTest { PartitionKeyDesc desc20200201 = PartitionKeyDesc.createFixed( Lists.newArrayList(new PartitionValue("2020-02-01")), Lists.newArrayList(new PartitionValue("2020-02-02"))); - relatedPartitionDescs.put(desc20200101, Sets.newHashSet(1L)); - relatedPartitionDescs.put(desc20200102, Sets.newHashSet(2L)); - relatedPartitionDescs.put(desc20200201, Sets.newHashSet(3L)); - Map<PartitionKeyDesc, Set<Long>> res = generator.rollUpRange(relatedPartitionDescs, + relatedPartitionDescs.put(desc20200101, Sets.newHashSet("name1")); + relatedPartitionDescs.put(desc20200102, Sets.newHashSet("name2")); + relatedPartitionDescs.put(desc20200201, Sets.newHashSet("name3")); + Map<PartitionKeyDesc, Set<String>> res = generator.rollUpRange(relatedPartitionDescs, mtmvPartitionInfo); PartitionKeyDesc expectDesc202001 = PartitionKeyDesc.createFixed( @@ -95,8 +95,8 @@ public class MTMVRelatedPartitionDescRollUpGeneratorTest { Lists.newArrayList(new PartitionValue("2020-02-01")), Lists.newArrayList(new PartitionValue("2020-03-01"))); Assert.assertEquals(2, res.size()); - Assert.assertEquals(Sets.newHashSet(1L, 2L), res.get(expectDesc202001)); - Assert.assertEquals(Sets.newHashSet(3L), res.get(expectDesc202002)); + Assert.assertEquals(Sets.newHashSet("name1", "name2"), res.get(expectDesc202001)); + Assert.assertEquals(Sets.newHashSet("name3"), res.get(expectDesc202002)); } @Test @@ -127,18 +127,18 @@ public class MTMVRelatedPartitionDescRollUpGeneratorTest { } }; MTMVRelatedPartitionDescRollUpGenerator generator = new MTMVRelatedPartitionDescRollUpGenerator(); - Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = Maps.newHashMap(); - relatedPartitionDescs.put(generateInDesc("2020-01-01"), Sets.newHashSet(1L)); - relatedPartitionDescs.put(generateInDesc("2020-01-02"), Sets.newHashSet(2L)); - relatedPartitionDescs.put(generateInDesc("2020-02-01"), Sets.newHashSet(3L)); - Map<PartitionKeyDesc, Set<Long>> res = generator.rollUpList(relatedPartitionDescs, + Map<PartitionKeyDesc, Set<String>> relatedPartitionDescs = Maps.newHashMap(); + relatedPartitionDescs.put(generateInDesc("2020-01-01"), Sets.newHashSet("name1")); + relatedPartitionDescs.put(generateInDesc("2020-01-02"), Sets.newHashSet("name2")); + relatedPartitionDescs.put(generateInDesc("2020-02-01"), Sets.newHashSet("name3")); + Map<PartitionKeyDesc, Set<String>> res = generator.rollUpList(relatedPartitionDescs, mtmvPartitionInfo, Maps.newHashMap()); PartitionKeyDesc expectDesc202001 = generateInDesc("2020-01-01", "2020-01-02"); PartitionKeyDesc expectDesc202002 = generateInDesc("2020-02-01"); Assert.assertEquals(2, res.size()); - Assert.assertEquals(Sets.newHashSet(1L, 2L), res.get(expectDesc202001)); - Assert.assertEquals(Sets.newHashSet(3L), res.get(expectDesc202002)); + Assert.assertEquals(Sets.newHashSet("name1", "name2"), res.get(expectDesc202001)); + Assert.assertEquals(Sets.newHashSet("name3"), res.get(expectDesc202002)); } private PartitionKeyDesc generateInDesc(String... values) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java index 8de7ed75ccd..864478933c0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVRewriteUtilTest.java @@ -103,7 +103,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = true; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = true; @@ -130,7 +131,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 2L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = false; @@ -150,7 +152,8 @@ public class MTMVRewriteUtilTest { minTimes = 0; result = 1L; - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = false; @@ -180,7 +183,8 @@ public class MTMVRewriteUtilTest { public void testGetMTMVCanRewritePartitionsNotSync() throws AnalysisException { new Expectations() { { - MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyLong, (Set<Long>) any, (Set<BaseTableInfo>) any, + MTMVPartitionUtil.isMTMVPartitionSync((MTMV) any, anyString, (Set<String>) any, + (Set<BaseTableInfo>) any, (Set<String>) any); minTimes = 0; result = false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java index 3bc60339500..512bd6099f0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVTaskTest.java @@ -42,11 +42,9 @@ import java.util.Map; import java.util.Set; public class MTMVTaskTest { - private long poneId = 1L; private String poneName = "p1"; - private long ptwoId = 2L; private String ptwoName = "p2"; - private List<Long> allPartitionIds = Lists.newArrayList(poneId, ptwoId); + private List<String> allPartitionNames = Lists.newArrayList(poneName, ptwoName); private MTMVRelation relation = new MTMVRelation(Sets.newHashSet(), Sets.newHashSet(), Sets.newHashSet()); @Mocked @@ -70,9 +68,9 @@ public class MTMVTaskTest { minTimes = 0; result = mtmv; - mtmv.getPartitionIds(); + mtmv.getPartitionNames(); minTimes = 0; - result = allPartitionIds; + result = Sets.newHashSet(poneName, ptwoName); mtmv.getMvPartitionInfo(); minTimes = 0; @@ -82,12 +80,12 @@ public class MTMVTaskTest { minTimes = 0; result = MTMVPartitionType.FOLLOW_BASE_TABLE; - mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(poneName)); - minTimes = 0; - result = poneId; + // mtmvPartitionUtil.getPartitionsIdsByNames(mtmv, Lists.newArrayList(poneName)); + // minTimes = 0; + // result = poneId; mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, - (Map<Long, Set<Long>>) any); + (Map<String, Set<String>>) any); minTimes = 0; result = true; @@ -106,16 +104,16 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsManualComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, null, true); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); - Assert.assertEquals(allPartitionIds, result); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + Assert.assertEquals(allPartitionNames, result); } @Test public void testCalculateNeedRefreshPartitionsManualPartitions() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.MANUAL, Lists.newArrayList(poneName), false); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); - Assert.assertEquals(Lists.newArrayList(poneId), result); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + Assert.assertEquals(Lists.newArrayList(poneName), result); } @Test @@ -129,7 +127,7 @@ public class MTMVTaskTest { }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); Assert.assertTrue(CollectionUtils.isEmpty(result)); } @@ -137,8 +135,8 @@ public class MTMVTaskTest { public void testCalculateNeedRefreshPartitionsSystemComplete() throws AnalysisException { MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); - Assert.assertEquals(allPartitionIds, result); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + Assert.assertEquals(allPartitionNames, result); } @Test @@ -146,15 +144,15 @@ public class MTMVTaskTest { new Expectations() { { mtmvPartitionUtil.isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, - (Map<Long, Set<Long>>) any); + (Map<String, Set<String>>) any); minTimes = 0; result = false; } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); - Assert.assertEquals(allPartitionIds, result); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + Assert.assertEquals(allPartitionNames, result); } @Test @@ -162,7 +160,7 @@ public class MTMVTaskTest { new Expectations() { { mtmvPartitionUtil - .isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, (Map<Long, Set<Long>>) any); + .isMTMVSync(mtmv, (Set<BaseTableInfo>) any, (Set<String>) any, (Map<String, Set<String>>) any); minTimes = 0; result = false; @@ -171,14 +169,14 @@ public class MTMVTaskTest { result = RefreshMethod.AUTO; mtmvPartitionUtil - .getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any, (Map<Long, Set<Long>>) any); + .getMTMVNeedRefreshPartitions(mtmv, (Set<BaseTableInfo>) any, (Map<String, Set<String>>) any); minTimes = 0; - result = Lists.newArrayList(ptwoId); + result = Lists.newArrayList(ptwoName); } }; MTMVTaskContext context = new MTMVTaskContext(MTMVTaskTriggerMode.SYSTEM); MTMVTask task = new MTMVTask(mtmv, relation, context); - List<Long> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); - Assert.assertEquals(Lists.newArrayList(ptwoId), result); + List<String> result = task.calculateNeedRefreshPartitions(Maps.newHashMap()); + Assert.assertEquals(Lists.newArrayList(ptwoName), result); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org