This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3d1f94b4c27b23ea10752a499923b81ec527e7a0 Author: zhangdong <493738...@qq.com> AuthorDate: Mon Sep 9 19:06:18 2024 +0800 [fix](mtmv)fix nested mtmv not refresh (#40433) fix nested mtmv not refresh because the partition version remains unchanged after inserting overwrite for the underlying materialized view we add partitionId in snapshot --- .../java/org/apache/doris/catalog/OlapTable.java | 3 +- .../doris/mtmv/MTMVRefreshPartitionSnapshot.java | 45 +++++++++++++- .../org/apache/doris/mtmv/MTMVVersionSnapshot.java | 25 +++++++- .../data/mtmv_p0/test_multi_level_mtmv.out | 11 ++++ .../test_upgrade_downgrade_olap_mtmv.out | 9 +++ .../suites/mtmv_p0/test_multi_level_mtmv.groovy | 16 +++++ .../suites/mtmv_up_down_olap_p0/load.groovy | 71 ++++++++++++++++++++++ .../test_upgrade_downgrade_olap_mtmv.groovy | 32 ++++++++++ 8 files changed, 208 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 78d97e8e48b..47167cc1165 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -3040,9 +3040,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException { Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions(); + long partitionId = getPartitionOrAnalysisException(partitionName).getId(); long visibleVersion = partitionVersions.containsKey(partitionName) ? partitionVersions.get(partitionName) : getPartitionOrAnalysisException(partitionName).getVisibleVersion(); - return new MTMVVersionSnapshot(visibleVersion); + return new MTMVVersionSnapshot(visibleVersion, partitionId); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java index 63bbfc2e037..fa17ed76666 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java @@ -18,6 +18,9 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.AnalysisException; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -74,6 +77,46 @@ public class MTMVRefreshPartitionSnapshot { } public void compatible(MTMV mtmv) { + try { + // snapshot add partitionId resolve problem of insert overwrite + compatiblePartitions(mtmv); + } catch (Throwable e) { + LOG.warn("MTMV compatiblePartitions failed, mtmv: {}", mtmv.getName(), e); + } + try { + // change table id to BaseTableInfo + compatibleTables(mtmv); + } catch (Throwable e) { + LOG.warn("MTMV compatibleTables failed, mtmv: {}", mtmv.getName(), e); + } + } + + private void compatiblePartitions(MTMV mtmv) throws AnalysisException { + if (!checkHasDataWithoutPartitionId()) { + return; + } + OlapTable relatedTable = (OlapTable) mtmv.getMvPartitionInfo().getRelatedTable(); + for (Entry<String, MTMVSnapshotIf> entry : partitions.entrySet()) { + MTMVVersionSnapshot versionSnapshot = (MTMVVersionSnapshot) entry.getValue(); + if (versionSnapshot.getId() == 0) { + Partition partition = relatedTable.getPartition(entry.getKey()); + if (partition != null) { + (versionSnapshot).setId(partition.getId()); + } + } + } + } + + private boolean checkHasDataWithoutPartitionId() { + for (MTMVSnapshotIf snapshot : partitions.values()) { + if (snapshot instanceof MTMVVersionSnapshot && ((MTMVVersionSnapshot) snapshot).getId() == 0) { + return true; + } + } + return false; + } + + private void compatibleTables(MTMV mtmv) { if (tables.size() == tablesInfo.size()) { return; } @@ -87,7 +130,7 @@ public class MTMVRefreshPartitionSnapshot { if (tableInfo.isPresent()) { tablesInfo.put(tableInfo.get(), entry.getValue()); } else { - LOG.warn("MTMV compatible failed, tableId: {}, relationTables: {}", entry.getKey(), + LOG.warn("MTMV compatibleTables failed, tableId: {}, relationTables: {}", entry.getKey(), relation.getBaseTablesOneLevel()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java index 0eb7860bc54..2440649462e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVVersionSnapshot.java @@ -24,10 +24,30 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf { @SerializedName("v") private long version; + // The partition version after insert overwrite is 1, + // which may cause the upper level materialized view to be unaware of changes in the data at the bottom level. + // However, the partition ID after overwrite will change, so the partition ID should be added. + // only for partition, table will always 0 + @SerializedName("id") + private long id; + public MTMVVersionSnapshot(long version) { this.version = version; } + public MTMVVersionSnapshot(long version, long id) { + this.version = version; + this.id = id; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -37,18 +57,19 @@ public class MTMVVersionSnapshot implements MTMVSnapshotIf { return false; } MTMVVersionSnapshot that = (MTMVVersionSnapshot) o; - return version == that.version; + return version == that.version && id == that.id; } @Override public int hashCode() { - return Objects.hashCode(version); + return Objects.hashCode(version, id); } @Override public String toString() { return "MTMVVersionSnapshot{" + "version=" + version + + ", id=" + id + '}'; } } diff --git a/regression-test/data/mtmv_p0/test_multi_level_mtmv.out b/regression-test/data/mtmv_p0/test_multi_level_mtmv.out index 7543b21ffa7..7d44e381cc8 100644 --- a/regression-test/data/mtmv_p0/test_multi_level_mtmv.out +++ b/regression-test/data/mtmv_p0/test_multi_level_mtmv.out @@ -11,6 +11,17 @@ -- !mv2_should_one_partition -- ["p_2"] +-- !mv1_should_one_partition_again -- +["p_2"] + +-- !mv2_should_one_partition_again -- +["p_2"] + +-- !mv2_again -- +1 1 +2 2 +2 3 + -- !status1 -- multi_level_mtmv1 SCHEMA_CHANGE SUCCESS diff --git a/regression-test/data/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.out b/regression-test/data/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.out new file mode 100644 index 00000000000..760e94479a8 --- /dev/null +++ b/regression-test/data/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !refresh_init -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mtmv_sync -- +true + diff --git a/regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy b/regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy index 55689b74148..33a876c46d4 100644 --- a/regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy +++ b/regression-test/suites/mtmv_p0/test_multi_level_mtmv.groovy @@ -87,6 +87,22 @@ suite("test_multi_level_mtmv") { waitingMTMVTaskFinishedByMvName(mv2) order_qt_mv2_should_one_partition "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1" + // insert into p2 again, check partition version if change + sql """ + INSERT INTO ${tableName} VALUES(2,3); + """ + sql """ + REFRESH MATERIALIZED VIEW ${mv1} AUTO + """ + waitingMTMVTaskFinishedByMvName(mv1) + order_qt_mv1_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv1}' order by CreateTime desc limit 1" + sql """ + REFRESH MATERIALIZED VIEW ${mv2} AUTO + """ + waitingMTMVTaskFinishedByMvName(mv2) + order_qt_mv2_should_one_partition_again "select NeedRefreshPartitions from tasks('type'='mv') where MvName = '${mv2}' order by CreateTime desc limit 1" + order_qt_mv2_again "select * from ${mv2}" + // drop table sql """ drop table ${tableName} diff --git a/regression-test/suites/mtmv_up_down_olap_p0/load.groovy b/regression-test/suites/mtmv_up_down_olap_p0/load.groovy new file mode 100644 index 00000000000..f909b33064d --- /dev/null +++ b/regression-test/suites/mtmv_up_down_olap_p0/load.groovy @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_upgrade_downgrade_prepare_olap_mtmv","p0,mtmv,restart_fe") { + String suiteName = "mtmv_up_down_olap" + String mvName = "${suiteName}_mtmv" + String tableName = "${suiteName}_table" + String tableName2 = "${suiteName}_table2" + + sql """drop materialized view if exists ${mvName};""" + sql """drop table if exists `${tableName}`""" + sql """drop table if exists `${tableName2}`""" + + sql """ + CREATE TABLE `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3); + """ + + sql """ + CREATE TABLE `${tableName2}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `age` SMALLINT NOT NULL COMMENT '\"年龄\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `age`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName2} values(1,1),(2,2),(3,3); + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName} + REFRESH AUTO ON MANUAL + partition by(`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT a.* FROM ${tableName} a inner join ${tableName2} b on a.user_id=b.user_id; + """ + waitingMTMVTaskFinishedByMvName(mvName) +} diff --git a/regression-test/suites/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.groovy b/regression-test/suites/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.groovy new file mode 100644 index 00000000000..253908ff4ae --- /dev/null +++ b/regression-test/suites/mtmv_up_down_olap_p0/test_upgrade_downgrade_olap_mtmv.groovy @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_upgrade_downgrade_olap_mtmv","p0,mtmv,restart_fe") { + String suiteName = "mtmv_up_down_olap" + String dbName = context.config.getDbNameByFile(context.file) + String mvName = "${suiteName}_mtmv" + String tableName = "${suiteName}_table" + // test data is normal + order_qt_refresh_init "SELECT * FROM ${mvName}" + // test is sync + order_qt_mtmv_sync "select SyncWithBaseTables from mv_infos('database'='${dbName}') where Name='${mvName}'" + sql """ + REFRESH MATERIALIZED VIEW ${mvName} complete + """ + // test can refresh success + waitingMTMVTaskFinishedByMvName(mvName) +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org