This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6101c5a2e8e [chore](autobucket) add autobucket test and log (#36874)
6101c5a2e8e is described below

commit 6101c5a2e8eef27776fc22ff796def55ea3a220e
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jun 27 13:44:18 2024 +0800

    [chore](autobucket) add autobucket test and log (#36874)
    
    We met an unexpected autobucket case in the online env. Added logging
    for investigation.
---
 .../doris/clone/DynamicPartitionScheduler.java     | 59 +++++++++++++++++-----
 .../doris/catalog/DynamicPartitionTableTest.java   | 38 ++++++++++++++
 2 files changed, 84 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 8c5f4f669c5..d17af1836fe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.RangeUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.base.Strings;
@@ -71,6 +72,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is used to periodically add or drop partition on an olapTable 
which specify dynamic partition properties
@@ -186,7 +188,7 @@ public class DynamicPartitionScheduler extends MasterDaemon 
{
     }
 
     private static int getBucketsNum(DynamicPartitionProperty property, 
OlapTable table,
-            String nowPartitionName, boolean executeFirstTime) {
+            String partitionName, String nowPartitionName, boolean 
executeFirstTime) {
         // if execute first time, all partitions no contain data
         if (!table.isAutoBucket() || executeFirstTime) {
             return property.getBuckets();
@@ -194,27 +196,56 @@ public class DynamicPartitionScheduler extends 
MasterDaemon {
 
         // auto bucket
         // get all history partitions
-        ArrayList<Long> partitionSizeArray = Lists.newArrayList();
         RangePartitionInfo info = (RangePartitionInfo) 
(table.getPartitionInfo());
         List<Map.Entry<Long, PartitionItem>> idToItems = new 
ArrayList<>(info.getIdToItem(false).entrySet());
         idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) 
o.getValue()).getItems().upperEndpoint()));
-        for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
-            Partition partition = table.getPartition(idToItem.getKey());
-            // exclude current partition because its data isn't enough one 
week/day/hour.
-            if (partition != null && 
!partition.getName().equals(nowPartitionName)
-                    && partition.getVisibleVersion() >= 2) {
-                partitionSizeArray.add(partition.getAllDataSize(true));
+        List<Partition> partitions = idToItems.stream()
+                .map(entry -> table.getPartition(entry.getKey()))
+                .filter(partition -> partition != null && 
!partition.getName().equals(nowPartitionName))
+                .collect(Collectors.toList());
+        List<Long> visibleVersions = null;
+        try {
+            visibleVersions = Partition.getVisibleVersions(partitions);
+        } catch (RpcException e) {
+            LOG.warn("autobucket use property's buckets get visible version 
fail, table: [{}-{}], "
+                    + "partition: {}, buckets num: {}, exception: ",
+                    table.getName(), table.getId(), partitionName, 
property.getBuckets(), e);
+            return property.getBuckets();
+        }
+
+        List<Partition> hasDataPartitions = Lists.newArrayList();
+        for (int i = 0; i < partitions.size(); i++) {
+            if (visibleVersions.get(i) >= 2) {
+                hasDataPartitions.add(partitions.get(i));
             }
         }
 
         // no exist history partition data
-        if (partitionSizeArray.isEmpty()) {
+        if (hasDataPartitions.isEmpty()) {
+            LOG.info("autobucket use property's buckets due to all partitions 
no data, table: [{}-{}], "
+                    + "partition: {}, buckets num: {}",
+                    table.getName(), table.getId(), partitionName, 
property.getBuckets());
             return property.getBuckets();
         }
 
+        ArrayList<Long> partitionSizeArray = hasDataPartitions.stream()
+                .map(partition -> partition.getAllDataSize(true))
+                .collect(Collectors.toCollection(ArrayList::new));
+        long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
         // plus 5 for uncompressed data
-        long uncompressedPartitionSize = 
getNextPartitionSize(partitionSizeArray) * 5;
-        return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, 
Config.autobucket_min_buckets);
+        long uncompressedPartitionSize = estimatePartitionSize * 5;
+        int bucketsNum = 
AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, 
Config.autobucket_min_buckets);
+        LOG.info("autobucket calc with {} history partitions, table: [{}-{}], 
partition: {}, buckets num: {}, "
+                + " estimate partition size: {}, last partitions(partition 
name, local size, remote size): {}",
+                hasDataPartitions.size(), table.getName(), table.getId(), 
partitionName, bucketsNum,
+                estimatePartitionSize,
+                hasDataPartitions.stream()
+                        .skip(Math.max(0, hasDataPartitions.size() - 7))
+                        .map(partition -> "(" + partition.getName() + ", " + 
partition.getDataSize(true)
+                                + ", " + partition.getRemoteDataSize() + ")")
+                        .collect(Collectors.toList()));
+
+        return bucketsNum;
     }
 
     private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, 
OlapTable olapTable,
@@ -320,7 +351,8 @@ public class DynamicPartitionScheduler extends MasterDaemon 
{
 
             DistributionDesc distributionDesc = null;
             DistributionInfo distributionInfo = 
olapTable.getDefaultDistributionInfo();
-            int bucketsNum = getBucketsNum(dynamicPartitionProperty, 
olapTable, nowPartitionName, executeFirstTime);
+            int bucketsNum = getBucketsNum(dynamicPartitionProperty, 
olapTable, partitionName,
+                    nowPartitionName, executeFirstTime);
             if (distributionInfo.getType() == 
DistributionInfo.DistributionInfoType.HASH) {
                 HashDistributionInfo hashDistributionInfo = 
(HashDistributionInfo) distributionInfo;
                 List<String> distColumnNames = new ArrayList<>();
@@ -488,7 +520,8 @@ public class DynamicPartitionScheduler extends MasterDaemon 
{
         return dropPartitionClauses;
     }
 
-    private void executeDynamicPartition(Collection<Pair<Long, Long>> 
dynamicPartitionTableInfoCol,
+    // make public just for fe ut
+    public void executeDynamicPartition(Collection<Pair<Long, Long>> 
dynamicPartitionTableInfoCol,
             boolean executeFirstTime) throws DdlException {
         Iterator<Pair<Long, Long>> iterator = 
dynamicPartitionTableInfoCol.iterator();
         while (iterator.hasNext()) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index d457be0324f..c2f13837329 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -22,11 +22,13 @@ import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.clone.RebalancerTestUtil;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TStorageMedium;
@@ -46,6 +48,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Iterator;
@@ -1719,4 +1722,39 @@ public class DynamicPartitionTableTest {
                 + ");";
         ExceptionChecker.expectThrowsNoException(() -> 
createTable(createOlapTblStmt2));
     }
+
+    @Test
+    public void testAutoBuckets() throws Exception {
+        String createOlapTblStmt = " CREATE TABLE 
test.test_autobucket_dynamic_partition \n"
+                + " (k1 DATETIME)\n"
+                + " PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH (k1) 
BUCKETS AUTO\n"
+                + " PROPERTIES (\n"
+                + " \"dynamic_partition.enable\" = \"true\",\n"
+                + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+                + " \"dynamic_partition.end\" = \"1\",\n"
+                + " \"dynamic_partition.prefix\" = \"p\",\n"
+                + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
+                + ")";
+        ExceptionChecker.expectThrowsNoException(() -> 
createTable(createOlapTblStmt));
+        Database db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
+        OlapTable table = (OlapTable) 
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
+        List<Partition> partitions = 
Lists.newArrayList(table.getAllPartitions());
+        Assert.assertEquals(2, partitions.size());
+        for (Partition partition : partitions) {
+            Assert.assertEquals(FeConstants.default_bucket_num, 
partition.getDistributionInfo().getBucketNum());
+            partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
+        }
+        RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
+
+        String alterStmt =
+                "alter table test.test_autobucket_dynamic_partition set 
('dynamic_partition.end' = '2')";
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+        List<Pair<Long, Long>> tempDynamicPartitionTableInfo = 
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
+        
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
 false);
+
+        partitions = Lists.newArrayList(table.getAllPartitions());
+        partitions.sort(Comparator.comparing(Partition::getId));
+        Assert.assertEquals(3, partitions.size());
+        Assert.assertEquals(1, 
partitions.get(2).getDistributionInfo().getBucketNum());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to