This is an automated email from the ASF dual-hosted git repository. caiconghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 46ca23f [Feature] Support Changing the bucketing mode of the table from Hash Distribution to Random Distribution (#8259) 46ca23f is described below commit 46ca23f2161767b2d9aff27da7621183175145a8 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Fri Mar 4 09:05:23 2022 +0800 [Feature] Support Changing the bucketing mode of the table from Hash Distribution to Random Distribution (#8259) Co-authored-by: caiconghui1 <caicongh...@jd.com> --- .../sql-statements/Data Definition/ALTER TABLE.md | 4 +- .../sql-statements/Data Definition/CREATE TABLE.md | 2 + .../sql-statements/Data Definition/ALTER TABLE.md | 4 +- .../sql-statements/Data Definition/CREATE TABLE.md | 2 +- .../analysis/ModifyTablePropertiesClause.java | 5 ++- .../java/org/apache/doris/catalog/Catalog.java | 50 +++++++++------------- .../apache/doris/catalog/HashDistributionInfo.java | 4 ++ .../java/org/apache/doris/catalog/OlapTable.java | 14 +++--- .../java/org/apache/doris/catalog/Partition.java | 10 ++--- .../doris/catalog/RandomDistributionInfo.java | 14 ------ .../java/org/apache/doris/persist/EditLog.java | 2 +- .../apache/doris/alter/SchemaChangeJobV2Test.java | 19 ++++++-- .../org/apache/doris/catalog/CatalogTestUtil.java | 11 +++-- .../java/org/apache/doris/catalog/FakeEditLog.java | 6 +++ 14 files changed, 71 insertions(+), 76 deletions(-) diff --git a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md index 2df6a15..984220a 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/ALTER TABLE.md @@ -369,9 +369,9 @@ under the License. ALTER TABLE example_db.my_table set ("colocate_with" = "t1"); - 13. Change the bucketing mode of the table from Random Distribution to Hash Distribution + 13. Change the bucketing mode of the table from Hash Distribution to Random Distribution - ALTER TABLE example_db.my_table set ("distribution_type" = "hash"); + ALTER TABLE example_db.my_table set ("distribution_type" = "random"); 14. Modify the dynamic partition properties of the table (support adding dynamic partition properties to tables without dynamic partition properties) diff --git a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 06ae914..9d68e9a 100644 --- a/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/en/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -276,6 +276,8 @@ Syntax: `DISTRIBUTED BY RANDOM [BUCKETS num]` Explain: Use random numbers for bucketing. + Suggestion: It is recommended to use random bucketing when there is no suitable key for hash bucketing to make the data of the table evenly distributed. + 7. PROPERTIES 1) If ENGINE type is olap. User can specify storage medium, cooldown time and replication number: diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md index 1738b1a..c5a4a37 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/ALTER TABLE.md @@ -369,9 +369,9 @@ under the License. ALTER TABLE example_db.my_table set ("colocate_with" = "t1"); - 13. 将表的分桶方式由 Random Distribution 改为 Hash Distribution + 13. 将表的分桶方式由 Hash Distribution 改为 Random Distribution - ALTER TABLE example_db.my_table set ("distribution_type" = "hash"); + ALTER TABLE example_db.my_table set ("distribution_type" = "random"); 14. 修改表的动态分区属性(支持未添加动态分区属性的表添加动态分区属性) ALTER TABLE example_db.my_table set ("dynamic_partition.enable" = "false"); diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md index 989a355..872a0a2 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Definition/CREATE TABLE.md @@ -292,7 +292,7 @@ under the License. `DISTRIBUTED BY RANDOM [BUCKETS num]` 说明: 使用随机数进行分桶。 - 建议:建议使用Hash分桶方式 + 建议: 当没有合适的key做哈希分桶使得表的数据均匀分布的时候,建议使用RANDOM分桶方式。 7. PROPERTIES 1) 如果 ENGINE 类型为 olap diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index 9d77a9c..da12a87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -55,7 +55,10 @@ public class ModifyTablePropertiesClause extends AlterTableClause { throw new AnalysisException("Can only change storage type to COLUMN"); } } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) { - throw new AnalysisException("Cannot change distribution type for olap table now"); + if (!properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE).equalsIgnoreCase("random")) { + throw new AnalysisException("Can only change distribution type from HASH to RANDOM"); + } + this.needTableStable = false; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK).equalsIgnoreCase("true")) { throw new AnalysisException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index b120610..af17ec5 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -5774,33 +5774,26 @@ public class Catalog { throw new DdlException("Only support change partitioned table's distribution."); } - DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo(); - if (defaultDistributionInfo.getType() != DistributionInfoType.HASH) { - throw new DdlException("Cannot change default bucket number of distribution type " + defaultDistributionInfo.getType()); - } - DistributionDesc distributionDesc = modifyDistributionClause.getDistributionDesc(); - - DistributionInfo distributionInfo = null; - - List<Column> baseSchema = olapTable.getBaseSchema(); - if (distributionDesc != null) { - distributionInfo = distributionDesc.toDistributionInfo(baseSchema); + DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo(); + List<Column> baseSchema = olapTable.getBaseSchema(); + DistributionInfo distributionInfo = distributionDesc.toDistributionInfo(baseSchema); // for now. we only support modify distribution's bucket num - if (distributionInfo.getType() != DistributionInfoType.HASH) { - throw new DdlException("Cannot change distribution type to " + distributionInfo.getType()); + if (distributionInfo.getType() != defaultDistributionInfo.getType()) { + throw new DdlException("Cannot change distribution type when modify default distribution bucket num"); } - - HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; - List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns(); - List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns(); - if (!newDistriCols.equals(defaultDistriCols)) { - throw new DdlException("Cannot assign hash distribution with different distribution cols. " - + "default is: " + defaultDistriCols); + if (distributionInfo.getType() == DistributionInfoType.HASH) { + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns(); + List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns(); + if (!newDistriCols.equals(defaultDistriCols)) { + throw new DdlException("Cannot assign hash distribution with different distribution cols. " + + "default is: " + defaultDistriCols); + } } - int bucketNum = hashDistributionInfo.getBucketNum(); + int bucketNum = distributionInfo.getBucketNum(); if (bucketNum <= 0) { throw new DdlException("Cannot assign hash distribution buckets less than 1"); } @@ -5816,7 +5809,7 @@ public class Catalog { } } - public void replayModifyTableDefaultDistributionBucketNum(short opCode, ModifyTableDefaultDistributionBucketNumOperationLog info) throws MetaNotFoundException { + public void replayModifyTableDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) throws MetaNotFoundException { long dbId = info.getDbId(); long tableId = info.getTableId(); int bucketNum = info.getBucketNum(); @@ -6987,17 +6980,16 @@ public class Catalog { } } - // Convert table's distribution type from random to hash. - // random distribution is no longer supported. + // Convert table's distribution type from hash to random. public void convertDistributionType(Database db, OlapTable tbl) throws DdlException { tbl.writeLockOrDdlException(); try { - if (!tbl.convertRandomDistributionToHashDistribution()) { - throw new DdlException("Table " + tbl.getName() + " is not random distributed"); + if (!tbl.convertHashDistributionToRandomDistribution()) { + throw new DdlException("Table " + tbl.getName() + " is not hash distributed"); } TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId()); editLog.logModifyDistributionType(tableInfo); - LOG.info("finished to modify distribution type of table: " + tbl.getName()); + LOG.info("finished to modify distribution type of table from hash to random : " + tbl.getName()); } finally { tbl.writeUnlock(); } @@ -7008,8 +7000,8 @@ public class Catalog { OlapTable olapTable = db.getTableOrMetaException(info.getTableId(), TableType.OLAP); olapTable.writeLock(); try { - olapTable.convertRandomDistributionToHashDistribution(); - LOG.info("replay modify distribution type of table: " + olapTable.getName()); + olapTable.convertHashDistributionToRandomDistribution(); + LOG.info("replay modify distribution type of table from hash to random : " + olapTable.getName()); } finally { olapTable.writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java index 9fc175c..be9526a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java @@ -146,4 +146,8 @@ public class HashDistributionInfo extends DistributionInfo { return builder.toString(); } + + public RandomDistributionInfo toRandomDistributionInfo() { + return new RandomDistributionInfo(bucketNum); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 649f7fa..13590e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1477,17 +1477,13 @@ public class OlapTable extends Table { return keysNum; } - public boolean convertRandomDistributionToHashDistribution() { + public boolean convertHashDistributionToRandomDistribution() { boolean hasChanged = false; - List<Column> baseSchema = getBaseSchema(); - if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) { - defaultDistributionInfo = ((RandomDistributionInfo) defaultDistributionInfo).toHashDistributionInfo(baseSchema); + if (defaultDistributionInfo.getType() == DistributionInfoType.HASH) { + defaultDistributionInfo = ((HashDistributionInfo) defaultDistributionInfo).toRandomDistributionInfo(); hasChanged = true; - } - - for (Partition partition : idToPartition.values()) { - if (partition.convertRandomDistributionToHashDistribution(baseSchema)) { - hasChanged = true; + for (Partition partition : idToPartition.values()) { + partition.convertHashDistributionToRandomDistribution(); } } return hasChanged; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 8b94e28..045b4cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -423,19 +423,15 @@ public class Partition extends MetaObject implements Writable { } buffer.append("committedVersion: ").append(visibleVersion).append("; "); - buffer.append("distribution_info.type: ").append(distributionInfo.getType().name()).append("; "); buffer.append("distribution_info: ").append(distributionInfo.toString()); return buffer.toString(); } - public boolean convertRandomDistributionToHashDistribution(List<Column> baseSchema) { - boolean hasChanged = false; - if (distributionInfo.getType() == DistributionInfoType.RANDOM) { - distributionInfo = ((RandomDistributionInfo) distributionInfo).toHashDistributionInfo(baseSchema); - hasChanged = true; + public void convertHashDistributionToRandomDistribution() { + if (distributionInfo.getType() == DistributionInfoType.HASH) { + distributionInfo = ((HashDistributionInfo) distributionInfo).toRandomDistributionInfo(); } - return hasChanged; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java index 013d6e3..72fbf06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java @@ -20,12 +20,9 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.RandomDistributionDesc; -import com.google.common.collect.Lists; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.List; /** * Random partition. @@ -90,15 +87,4 @@ public class RandomDistributionInfo extends DistributionInfo { return type == randomDistributionInfo.type && bucketNum == randomDistributionInfo.bucketNum; } - - public HashDistributionInfo toHashDistributionInfo(List<Column> baseSchema) { - List<Column> keyColumns = Lists.newArrayList(); - for (Column column : baseSchema) { - if (column.isKey()) { - keyColumns.add(column); - } - } - HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(bucketNum, keyColumns); - return hashDistributionInfo; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index c0a591a..3ce8460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -722,7 +722,7 @@ public class EditLog { } case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: { ModifyTableDefaultDistributionBucketNumOperationLog modifyTableDefaultDistributionBucketNumOperationLog = (ModifyTableDefaultDistributionBucketNumOperationLog) journal.getData(); - catalog.replayModifyTableDefaultDistributionBucketNum(opCode, modifyTableDefaultDistributionBucketNumOperationLog); + catalog.replayModifyTableDefaultDistributionBucketNum(modifyTableDefaultDistributionBucketNumOperationLog); break; } case OperationType.OP_REPLACE_TEMP_PARTITION: { diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java index b6df42b..6d78eb8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeJobV2Test.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.CatalogTestUtil; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.DynamicPartitionProperty; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.FakeEditLog; @@ -107,15 +108,12 @@ public class SchemaChangeJobV2Test { @Before public void setUp() throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchMethodException, SecurityException, AnalysisException { + FakeCatalog.setMetaVersion(FeMetaVersion.VERSION_CURRENT); fakeEditLog = new FakeEditLog(); fakeCatalog = new FakeCatalog(); fakeTransactionIDGenerator = new FakeTransactionIDGenerator(); masterCatalog = CatalogTestUtil.createTestCatalog(); slaveCatalog = CatalogTestUtil.createTestCatalog(); - MetaContext metaContext = new MetaContext(); - metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT); - metaContext.setThreadLocalInfo(); - masterTransMgr = masterCatalog.getGlobalTransactionMgr(); masterTransMgr.setEditLog(masterCatalog.getEditLog()); slaveTransMgr = slaveCatalog.getGlobalTransactionMgr(); @@ -425,4 +423,17 @@ public class SchemaChangeJobV2Test { Assert.assertEquals(10, map.get(1000L).schemaVersion); Assert.assertEquals(20, map.get(1000L).schemaHash); } + + @Test + public void testModifyTableDistributionType() throws DdlException { + fakeCatalog = new FakeCatalog(); + fakeEditLog = new FakeEditLog(); + FakeCatalog.setCatalog(masterCatalog); + Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get(); + OlapTable olapTable = (OlapTable) db.getTable(CatalogTestUtil.testTableId1).get(); + Catalog.getCurrentCatalog().convertDistributionType(db, olapTable); + Assert.assertTrue(olapTable.getDefaultDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM); + Partition partition1 = olapTable.getPartition(CatalogTestUtil.testPartitionId1); + Assert.assertTrue(partition1.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 77a8bd3..e950d4f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -193,12 +193,6 @@ public class CatalogTestUtil { tablet.addReplica(replica2); tablet.addReplica(replica3); - // partition - RandomDistributionInfo distributionInfo = new RandomDistributionInfo(10); - Partition partition = new Partition(partitionId, testPartition1, index, distributionInfo); - partition.updateVisibleVersion(testStartVersion); - partition.setNextVersion(testStartVersion + 1); - // columns List<Column> columns = new ArrayList<Column>(); Column temp = new Column("k1", PrimitiveType.INT); @@ -217,6 +211,11 @@ public class CatalogTestUtil { temp.setIsKey(true); keysColumn.add(temp); + HashDistributionInfo distributionInfo = new HashDistributionInfo(10, keysColumn); + Partition partition = new Partition(partitionId, testPartition1, index, distributionInfo); + partition.updateVisibleVersion(testStartVersion); + partition.setNextVersion(testStartVersion + 1); + // table PartitionInfo partitionInfo = new SinglePartitionInfo(); partitionInfo.setDataProperty(partitionId, DataProperty.DEFAULT_DATA_PROPERTY); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java index 514dfca..266baa7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -24,6 +24,7 @@ import org.apache.doris.persist.BatchRemoveTransactionsOperation; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; import org.apache.doris.persist.RoutineLoadOperation; +import org.apache.doris.persist.TableInfo; import org.apache.doris.system.Backend; import org.apache.doris.transaction.TransactionState; @@ -95,6 +96,11 @@ public class FakeEditLog extends MockUp<EditLog> { } + @Mock + public void logModifyDistributionType(TableInfo tableInfo) { + + } + public TransactionState getTransaction(long transactionId) { return allTransactionState.get(transactionId); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org