This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 826eef68b0d8e7d3f83227862db9d1d74180cee5 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Thu Aug 29 23:33:05 2024 +0800 [enhancement](schema-change) Support light schema change on hash columns and agg key columns with varchar type to change length (#39319) ## Proposed changes 1. Schema change should rebuild distribution info after modifying columns, especially distribution columns. Otherwise, dynamic partition creation may fail when checking the equality of distribution columns. 2. Support light schema change on hash key columns. For unique key or dup key columns, this cannot be enabled for now due to historical reasons. See #39798. --- .../apache/doris/alter/SchemaChangeHandler.java | 9 +-- .../org/apache/doris/alter/SchemaChangeJobV2.java | 1 + .../main/java/org/apache/doris/catalog/Column.java | 2 +- .../apache/doris/catalog/HashDistributionInfo.java | 4 ++ .../java/org/apache/doris/catalog/OlapTable.java | 24 +++++++ ...t_dynamic_partition_mod_distribution_key.groovy | 75 ++++++++++++++++++++++ ..._varchar_schema_change_with_distribution.groovy | 52 --------------- 7 files changed, 107 insertions(+), 60 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 3fbcd3f629b..87d76d8de39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -614,13 +614,7 @@ public class SchemaChangeHandler extends AlterHandler { if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR && modColumn.getDataType() == PrimitiveType.VARCHAR) { col.checkSchemaChangeAllowed(modColumn); - // If col and modColumn is not key, it allow light schema change, - // of course, olapTable has been enable light schema change - if (modColumn.isKey() || col.isKey()) { - lightSchemaChange = false; - } else { - 
lightSchemaChange = olapTable.getEnableLightSchemaChange(); - } + lightSchemaChange = olapTable.getEnableLightSchemaChange(); } if (col.isClusterKey()) { throw new DdlException("Can not modify cluster key column: " + col.getName()); @@ -3047,6 +3041,7 @@ public class SchemaChangeHandler extends AlterHandler { } olapTable.setIndexes(indexes); olapTable.rebuildFullSchema(); + olapTable.rebuildDistributionInfo(); } public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 6e4fee168eb..dc25da17b8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -720,6 +720,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } // rebuild table's full schema tbl.rebuildFullSchema(); + tbl.rebuildDistributionInfo(); // update bloom filter if (hasBfChange) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index 98acd9453dd..fc69c31f0e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -842,7 +842,7 @@ public class Column implements GsonPostProcessable { } } - if (this.aggregationType != other.aggregationType) { + if (!Objects.equals(this.aggregationType, other.aggregationType)) { throw new DdlException("Can not change aggregation type"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index 2de384961dd..7d1ab1db8d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -169,4 +169,8 @@ public class HashDistributionInfo extends DistributionInfo { public RandomDistributionInfo toRandomDistributionInfo() { return new RandomDistributionInfo(bucketNum); } + + public void setDistributionColumns(List<Column> column) { + this.distributionColumns = column; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 563d348e3ca..78d97e8e48b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -473,6 +473,30 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc } } + public void rebuildDistributionInfo() { + if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) { + return; + } + HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo; + Set<String> originalColumnsNames = + distributionInfo.getDistributionColumns() + .stream() + .map(Column::getName) + .collect(Collectors.toSet()); + + List<Column> newDistributionColumns = getBaseSchema() + .stream() + .filter(column -> originalColumnsNames.contains(column.getName())) + .map(Column::new) + .collect(Collectors.toList()); + distributionInfo.setDistributionColumns(newDistributionColumns); + + getPartitions() + .stream() + .map(Partition::getDistributionInfo) + .forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns)); + } + public boolean deleteIndexInfo(String indexName) { if (!indexNameToId.containsKey(indexName)) { return false; diff --git a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy new file mode 100644 index 
00000000000..db44f59216b --- /dev/null +++ b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.apache.doris.regression.suite.ClusterOptions + +suite("test_dynamic_partition_mod_distribution_key") { + def options = new ClusterOptions() + options.setFeNum(2) + docker(options) { + // FIXME: for historical bugs, this case will fail if adding k2 as dup key or unique key + // see in https://github.com/apache/doris/issues/39798 + def keys = ["DUPLICATE KEY (k1)", "UNIQUE KEY (k1)", "AGGREGATE KEY (k1, k2)"] + def aggTypes = ["", "", "REPLACE"] + for (i in 0..<3) { + def key = keys.get(i) + def aggType = aggTypes.get(i) + def tableName = "test_dynamic_partition_mod_distribution_key" + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + k1 DATE NOT NULL, + k2 VARCHAR(20) NOT NULL, + v INT ${aggType} + ) ${key} + PARTITION BY RANGE(k1) () + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "dynamic_partition.enable"="true", + "dynamic_partition.end"="3", + "dynamic_partition.buckets"="1", + "dynamic_partition.start"="-3", + 
"dynamic_partition.prefix"="p", + "dynamic_partition.time_unit"="DAY", + "dynamic_partition.create_history_partition"="true", + "dynamic_partition.replication_allocation" = "tag.location.default: 1") + """ + + sql """ alter table ${tableName} modify column k1 comment 'new_comment_for_k1' """ + sql """ alter table ${tableName} modify column k2 varchar(255) """ + + cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """ + sql """ alter table ${tableName} set('dynamic_partition.end'='5') """ + result = sql "show partitions from ${tableName}" + for (def retry = 0; retry < 10; retry++) { // at most wait 120s + if (result.size() == 9) { + break; + } + logger.info("wait dynamic partition scheduler, sleep 1s") + sleep(1000) // sleep 1s + result = sql "show partitions from ${tableName}" + } + assertEquals(9, result.size()) + } + } +} \ No newline at end of file diff --git a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy b/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy deleted file mode 100644 index 5068f0aec43..00000000000 --- a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite ("test_varchar_schema_change_with_distribution") { - def tableName = "test_varchar_schema_change_with_distribution" - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - - sql """ - CREATE TABLE ${tableName} - ( - dt datetime NOT NULL COMMENT 'εεΊζ₯ζ', - citycode SMALLINT, - watts_range VARCHAR(20), - pv BIGINT SUM DEFAULT '0' - ) - AGGREGATE KEY(dt, citycode, watts_range) - PARTITION BY RANGE(dt)() - DISTRIBUTED BY HASH(dt, watts_range) BUCKETS 1 - PROPERTIES ( - "dynamic_partition.enable"="true", - "dynamic_partition.end"="3", - "dynamic_partition.buckets"="1", - "dynamic_partition.start"="-3", - "dynamic_partition.prefix"="p", - "dynamic_partition.time_unit"="HOUR", - "dynamic_partition.create_history_partition"="true", - "dynamic_partition.replication_allocation" = "tag.location.default: 1" - ); - """ - - test { - sql """ alter table ${tableName} modify column watts_range varchar(30) """ - exception "Can not modify distribution column" - } - - sql """ DROP TABLE IF EXISTS ${tableName} FORCE;""" - -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org