This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6101c5a2e8e [chore](autobucket) add autobucket test and log (#36874)
6101c5a2e8e is described below

commit 6101c5a2e8eef27776fc22ff796def55ea3a220e
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 27 13:44:18 2024 +0800

    [chore](autobucket) add autobucket test and log (#36874)

    We met an unexpected autobucket case in an online environment. Add
    logging for investigation.
---
 .../doris/clone/DynamicPartitionScheduler.java     | 59 +++++++++++++++++-----
 .../doris/catalog/DynamicPartitionTableTest.java   | 38 ++++++++++++++
 2 files changed, 84 insertions(+), 13 deletions(-)
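For reference, the estimation this patch adds logging around works roughly like this: take the
history partitions that already contain data, estimate the next partition's size from them,
multiply by 5 to approximate the uncompressed size, and turn that volume into a bucket count
with a configured minimum. Below is a minimal standalone sketch of that flow, not the actual
Doris implementation: AutoBucketSketch, estimateNextPartitionSize, estimateBuckets and
BYTES_PER_BUCKET are hypothetical stand-ins for getNextPartitionSize,
AutoBucketUtils.getBucketsNum and the real sizing constants.

    import java.util.List;

    // Hypothetical, simplified sketch of the auto-bucket estimation flow touched by this patch.
    public class AutoBucketSketch {
        // Assumed ~1 GiB of uncompressed data per bucket; illustrative constant only.
        private static final long BYTES_PER_BUCKET = 1L << 30;

        // Stand-in for getNextPartitionSize: use the size of the newest history partition.
        static long estimateNextPartitionSize(List<Long> historyPartitionSizes) {
            return historyPartitionSizes.get(historyPartitionSizes.size() - 1);
        }

        // Stand-in for AutoBucketUtils.getBucketsNum: scale with data volume, clamp to a floor.
        static int estimateBuckets(long uncompressedBytes, int minBuckets) {
            int byVolume = (int) Math.max(1, uncompressedBytes / BYTES_PER_BUCKET);
            return Math.max(minBuckets, byVolume);
        }

        public static void main(String[] args) {
            // Compressed sizes of history partitions that already hold data (visible version >= 2).
            List<Long> sizes = List.of(40L << 30, 45L << 30, 50L << 30);
            long estimate = estimateNextPartitionSize(sizes);
            long uncompressed = estimate * 5; // the patch multiplies by 5 for uncompressed data
            int buckets = estimateBuckets(uncompressed, 1);
            System.out.println("estimated buckets for next partition: " + buckets);
        }
    }

With tiny history partitions (as in the new test, where replica sizes are set to 1 byte), the
volume-based estimate collapses and the minimum-bucket floor wins, which is presumably why the
new test expects the newly created partition to get a single bucket.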
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 8c5f4f669c5..d17af1836fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Strings;
@@ -71,6 +72,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is used to periodically add or drop partition on an olapTable which specify dynamic partition properties
@@ -186,7 +188,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
     }
 
     private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table,
-            String nowPartitionName, boolean executeFirstTime) {
+            String partitionName, String nowPartitionName, boolean executeFirstTime) {
         // if execute first time, all partitions no contain data
         if (!table.isAutoBucket() || executeFirstTime) {
             return property.getBuckets();
@@ -194,27 +196,56 @@
 
         // auto bucket
         // get all history partitions
-        ArrayList<Long> partitionSizeArray = Lists.newArrayList();
         RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
         List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
         idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
-        for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
-            Partition partition = table.getPartition(idToItem.getKey());
-            // exclude current partition because its data isn't enough one week/day/hour.
-            if (partition != null && !partition.getName().equals(nowPartitionName)
-                    && partition.getVisibleVersion() >= 2) {
-                partitionSizeArray.add(partition.getAllDataSize(true));
+        List<Partition> partitions = idToItems.stream()
+                .map(entry -> table.getPartition(entry.getKey()))
+                .filter(partition -> partition != null && !partition.getName().equals(nowPartitionName))
+                .collect(Collectors.toList());
+        List<Long> visibleVersions = null;
+        try {
+            visibleVersions = Partition.getVisibleVersions(partitions);
+        } catch (RpcException e) {
+            LOG.warn("autobucket use property's buckets get visible version fail, table: [{}-{}], "
+                    + "partition: {}, buckets num: {}, exception: ",
+                    table.getName(), table.getId(), partitionName, property.getBuckets(), e);
+            return property.getBuckets();
+        }
+
+        List<Partition> hasDataPartitions = Lists.newArrayList();
+        for (int i = 0; i < partitions.size(); i++) {
+            if (visibleVersions.get(i) >= 2) {
+                hasDataPartitions.add(partitions.get(i));
             }
         }
 
         // no exist history partition data
-        if (partitionSizeArray.isEmpty()) {
+        if (hasDataPartitions.isEmpty()) {
+            LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], "
+                    + "partition: {}, buckets num: {}",
+                    table.getName(), table.getId(), partitionName, property.getBuckets());
             return property.getBuckets();
         }
 
+        ArrayList<Long> partitionSizeArray = hasDataPartitions.stream()
+                .map(partition -> partition.getAllDataSize(true))
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
         // plus 5 for uncompressed data
-        long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5;
-        return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
+        long uncompressedPartitionSize = estimatePartitionSize * 5;
+        int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
+        LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, "
+                + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}",
+                hasDataPartitions.size(), table.getName(), table.getId(), partitionName, bucketsNum,
+                estimatePartitionSize,
+                hasDataPartitions.stream()
+                        .skip(Math.max(0, hasDataPartitions.size() - 7))
+                        .map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true)
+                                + ", " + partition.getRemoteDataSize() + ")")
+                        .collect(Collectors.toList()));
+
+        return bucketsNum;
     }
 
     private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
@@ -320,7 +351,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
 
         DistributionDesc distributionDesc = null;
         DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
-        int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName, executeFirstTime);
+        int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName,
+                nowPartitionName, executeFirstTime);
         if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
             HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
             List<String> distColumnNames = new ArrayList<>();
@@ -488,7 +520,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
         return dropPartitionClauses;
     }
 
-    private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
+    // make public just for fe ut
+    public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
             boolean executeFirstTime) throws DdlException {
         Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfoCol.iterator();
         while (iterator.hasNext()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index d457be0324f..c2f13837329 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -22,11 +22,13 @@ import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.clone.RebalancerTestUtil;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TStorageMedium;
@@ -46,6 +48,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Iterator;
@@ -1719,4 +1722,39 @@ public class DynamicPartitionTableTest {
                 + ");";
         ExceptionChecker.expectThrowsNoException(() -> createTable(createOlapTblStmt2));
     }
+
+    @Test
+    public void testAutoBuckets() throws Exception {
+        String createOlapTblStmt = " CREATE TABLE test.test_autobucket_dynamic_partition \n"
+                + " (k1 DATETIME)\n"
+                + " PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH (k1) BUCKETS AUTO\n"
+                + " PROPERTIES (\n"
+                + " \"dynamic_partition.enable\" = \"true\",\n"
+                + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+                + " \"dynamic_partition.end\" = \"1\",\n"
+                + " \"dynamic_partition.prefix\" = \"p\",\n"
+                + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
+                + ")";
+        ExceptionChecker.expectThrowsNoException(() -> createTable(createOlapTblStmt));
+        Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
+        OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
+        List<Partition> partitions = Lists.newArrayList(table.getAllPartitions());
+        Assert.assertEquals(2, partitions.size());
+        for (Partition partition : partitions) {
+            Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum());
+            partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
+        }
+        RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
+
+        String alterStmt =
+                "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')";
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+        List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId()));
+        Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false);
+
+        partitions = Lists.newArrayList(table.getAllPartitions());
+        partitions.sort(Comparator.comparing(Partition::getId));
+        Assert.assertEquals(3, partitions.size());
+        Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum());
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org