This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 826eef68b0d8e7d3f83227862db9d1d74180cee5
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Thu Aug 29 23:33:05 2024 +0800

    [enhancement](schema-change) Support light schema change on hash columns 
and agg key columns with varchar type to change length (#39319)
    
    ## Proposed changes
    
    1. Schema change should rebuild the distribution info after modifying
    columns, especially distribution columns. Otherwise, dynamic partition
    creation may fail when it checks the equality of distribution columns.
    2. Support light schema change on hash key columns. For unique key or
    duplicate key columns, it cannot be enabled for now due to historical
    issues. See #39798.
---
 .../apache/doris/alter/SchemaChangeHandler.java    |  9 +--
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  1 +
 .../main/java/org/apache/doris/catalog/Column.java |  2 +-
 .../apache/doris/catalog/HashDistributionInfo.java |  4 ++
 .../java/org/apache/doris/catalog/OlapTable.java   | 24 +++++++
 ...t_dynamic_partition_mod_distribution_key.groovy | 75 ++++++++++++++++++++++
 ..._varchar_schema_change_with_distribution.groovy | 52 ---------------
 7 files changed, 107 insertions(+), 60 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 3fbcd3f629b..87d76d8de39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -614,13 +614,7 @@ public class SchemaChangeHandler extends AlterHandler {
                     if (columnPos == null && col.getDataType() == 
PrimitiveType.VARCHAR
                             && modColumn.getDataType() == 
PrimitiveType.VARCHAR) {
                         col.checkSchemaChangeAllowed(modColumn);
-                        // If col and modColumn is not key, it allow light 
schema change,
-                        // of course, olapTable has been enable light schema 
change
-                        if (modColumn.isKey() || col.isKey()) {
-                            lightSchemaChange = false;
-                        } else {
-                            lightSchemaChange = 
olapTable.getEnableLightSchemaChange();
-                        }
+                        lightSchemaChange = 
olapTable.getEnableLightSchemaChange();
                     }
                     if (col.isClusterKey()) {
                         throw new DdlException("Can not modify cluster key 
column: " + col.getName());
@@ -3047,6 +3041,7 @@ public class SchemaChangeHandler extends AlterHandler {
         }
         olapTable.setIndexes(indexes);
         olapTable.rebuildFullSchema();
+        olapTable.rebuildDistributionInfo();
     }
 
     public void 
replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo 
info)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 6e4fee168eb..dc25da17b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -720,6 +720,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
         }
         // rebuild table's full schema
         tbl.rebuildFullSchema();
+        tbl.rebuildDistributionInfo();
 
         // update bloom filter
         if (hasBfChange) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 98acd9453dd..fc69c31f0e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -842,7 +842,7 @@ public class Column implements GsonPostProcessable {
             }
         }
 
-        if (this.aggregationType != other.aggregationType) {
+        if (!Objects.equals(this.aggregationType, other.aggregationType)) {
             throw new DdlException("Can not change aggregation type");
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
index 2de384961dd..7d1ab1db8d1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
@@ -169,4 +169,8 @@ public class HashDistributionInfo extends DistributionInfo {
     public RandomDistributionInfo toRandomDistributionInfo() {
         return new RandomDistributionInfo(bucketNum);
     }
+
+    public void setDistributionColumns(List<Column> column) {
+        this.distributionColumns = column;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 563d348e3ca..78d97e8e48b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -473,6 +473,30 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         }
     }
 
+    public void rebuildDistributionInfo() {
+        if (!Objects.equals(defaultDistributionInfo.getType(), 
DistributionInfoType.HASH)) {
+            return;
+        }
+        HashDistributionInfo distributionInfo = (HashDistributionInfo) 
defaultDistributionInfo;
+        Set<String> originalColumnsNames =
+                distributionInfo.getDistributionColumns()
+                        .stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toSet());
+
+        List<Column> newDistributionColumns = getBaseSchema()
+                .stream()
+                .filter(column -> 
originalColumnsNames.contains(column.getName()))
+                .map(Column::new)
+                .collect(Collectors.toList());
+        distributionInfo.setDistributionColumns(newDistributionColumns);
+
+        getPartitions()
+                .stream()
+                .map(Partition::getDistributionInfo)
+                .forEach(info -> ((HashDistributionInfo) 
info).setDistributionColumns(newDistributionColumns));
+    }
+
     public boolean deleteIndexInfo(String indexName) {
         if (!indexNameToId.containsKey(indexName)) {
             return false;
diff --git 
a/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy
 
b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy
new file mode 100644
index 00000000000..db44f59216b
--- /dev/null
+++ 
b/regression-test/suites/partition_p0/dynamic_partition/test_dynamic_partition_mod_distribution_key.groovy
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_dynamic_partition_mod_distribution_key") {
+    def options = new ClusterOptions()
+    options.setFeNum(2)
+    docker(options) {
+        // FIXME: for historical bugs, this case will fail if adding k2 as dup 
key or unique key
+        // see in https://github.com/apache/doris/issues/39798
+        def keys = ["DUPLICATE KEY (k1)", "UNIQUE KEY (k1)", "AGGREGATE KEY 
(k1, k2)"]
+        def aggTypes = ["", "", "REPLACE"]
+        for (i in 0..<3) {
+            def key = keys.get(i)
+            def aggType = aggTypes.get(i)
+            def tableName = "test_dynamic_partition_mod_distribution_key"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+
+            sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                k1 DATE NOT NULL,
+                k2 VARCHAR(20) NOT NULL,
+                v INT ${aggType}
+            ) ${key} 
+            PARTITION BY RANGE(k1) ()
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "dynamic_partition.enable"="true",
+                "dynamic_partition.end"="3",
+                "dynamic_partition.buckets"="1",
+                "dynamic_partition.start"="-3",
+                "dynamic_partition.prefix"="p",
+                "dynamic_partition.time_unit"="DAY",
+                "dynamic_partition.create_history_partition"="true",
+                "dynamic_partition.replication_allocation" = 
"tag.location.default: 1")
+            """
+
+            sql """ alter table ${tableName} modify column k1 comment 
'new_comment_for_k1' """
+            sql """ alter table ${tableName} modify column k2 varchar(255) """
+
+            cluster.restartFrontends()
+            sleep(30000)
+            context.reconnectFe()
+
+            sql """ ADMIN SET FRONTEND CONFIG 
('dynamic_partition_check_interval_seconds' = '1') """
+            sql """ alter table ${tableName} set('dynamic_partition.end'='5') 
"""
+            result = sql "show partitions from ${tableName}"
+            for (def retry = 0; retry < 10; retry++) { // at most wait 120s
+                if (result.size() == 9) {
+                    break;
+                }
+                logger.info("wait dynamic partition scheduler, sleep 1s")
+                sleep(1000)  // sleep 1s
+                result = sql "show partitions from ${tableName}"
+            }
+            assertEquals(9, result.size())
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy
 
b/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy
deleted file mode 100644
index 5068f0aec43..00000000000
--- 
a/regression-test/suites/schema_change_p0/test_varchar_schema_change_with_distribution.groovy
+++ /dev/null
@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite ("test_varchar_schema_change_with_distribution") {
-    def tableName = "test_varchar_schema_change_with_distribution"
-    sql """ DROP TABLE IF EXISTS ${tableName} FORCE;"""
-
-    sql """
-        CREATE TABLE ${tableName}
-        (
-            dt datetime NOT NULL COMMENT 'εˆ†εŒΊζ—₯期',
-            citycode SMALLINT,
-            watts_range VARCHAR(20),
-            pv BIGINT SUM DEFAULT '0'
-        )
-        AGGREGATE KEY(dt, citycode, watts_range)
-        PARTITION BY RANGE(dt)()
-        DISTRIBUTED BY HASH(dt, watts_range) BUCKETS 1
-        PROPERTIES (
-            "dynamic_partition.enable"="true",
-            "dynamic_partition.end"="3",
-            "dynamic_partition.buckets"="1",
-            "dynamic_partition.start"="-3",
-            "dynamic_partition.prefix"="p",
-            "dynamic_partition.time_unit"="HOUR",
-            "dynamic_partition.create_history_partition"="true",
-            "dynamic_partition.replication_allocation" = 
"tag.location.default: 1"
-        );
-    """
-
-    test {
-        sql """ alter table ${tableName} modify column watts_range varchar(30) 
"""
-        exception "Can not modify distribution column"
-    }
-
-    sql """ DROP TABLE IF EXISTS ${tableName} FORCE;"""
-
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to