This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 6e021dcef9e [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43771) 6e021dcef9e is described below commit 6e021dcef9e335f040cb6f428efb606c5801134a Author: James <lijib...@selectdb.com> AuthorDate: Tue Nov 12 21:39:30 2024 +0800 [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) (#43771) backport: https://github.com/apache/doris/pull/43009 --- .../java/org/apache/doris/catalog/OlapTable.java | 8 +- .../main/java/org/apache/doris/catalog/Table.java | 2 +- .../java/org/apache/doris/catalog/TableIf.java | 2 + .../doris/datasource/ExternalRowCountCache.java | 6 +- .../org/apache/doris/datasource/ExternalTable.java | 4 +- .../doris/datasource/hive/HMSExternalTable.java | 18 ++-- .../datasource/iceberg/IcebergExternalTable.java | 3 +- .../doris/datasource/iceberg/IcebergUtils.java | 5 +- .../datasource/paimon/PaimonExternalTable.java | 6 +- .../doris/statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/util/StatisticsUtil.java | 8 +- .../datasource/ExternalRowCountCacheTest.java | 102 +++++++++++++++++++++ 12 files changed, 135 insertions(+), 31 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 37e5f265bd6..7ddc51224b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -131,8 +131,6 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { WAITING_STABLE } - public static long ROW_COUNT_BEFORE_REPORT = -1; - @SerializedName(value = "state") private volatile OlapTableState state; @@ -1519,12 +1517,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { if (index == null) { LOG.warn("Index {} not exist in partition {}, table {}, {}", indexId, entry.getValue().getName(), id, name); - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } if (strict && !index.getRowCountReported()) { - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } - rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount(); + rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount(); } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 862d6c1878e..8d648df3356 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -586,7 +586,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { @Override public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d42a32ef8d2..8f9594e82c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -56,6 +56,8 @@ import java.util.stream.Collectors; public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); + long UNKNOWN_ROW_COUNT = -1; + default void readLock() { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index faf01a49384..0826187317a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -94,7 +94,7 @@ public class ExternalRowCountCache { } /** - * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. + * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. * @param catalogId * @param dbId @@ -106,13 +106,13 @@ public class ExternalRowCountCache { try { CompletableFuture<Optional<Long>> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT); } LOG.info("Row count for table {}.{}.{} is still processing.", catalogId, dbId, tableId); } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index 71ac00e48e6..590a4cbe046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -200,7 +200,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -226,7 +226,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * This is called by ExternalRowCountCache to load row count cache. */ public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 5f2c8cbddf3..5df44fda476 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -344,7 +344,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } private long getRowCountFromExternalSource() { - long rowCount; + long rowCount = UNKNOWN_ROW_COUNT; switch (dlaType) { case HIVE: rowCount = StatisticsUtil.getHiveRowCount(this); @@ -358,7 +358,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } rowCount = -1; } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } @Override @@ -532,7 +532,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. - if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { + if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.info("Will estimate row count for table {} from file list.", name); rowCount = getRowCountFromFileList(); } @@ -838,11 +838,11 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI */ private long getRowCountFromFileList() { if (!GlobalVariable.enable_get_row_count_from_file_list) { - return -1; + return UNKNOWN_ROW_COUNT; } if (isView()) { - LOG.info("Table {} is view, return 0.", name); - return 0; + LOG.info("Table {} is view, return -1.", name); + return UNKNOWN_ROW_COUNT; } HiveMetaStoreCache.HivePartitionValues partitionValues = getAllPartitionValues(); @@ -869,8 +869,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - LOG.warn("Table {} estimated size is 0, return 0.", name); - return 0; + LOG.warn("Table {} estimated size is 0, return -1.", name); + return UNKNOWN_ROW_COUNT; } int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); @@ -882,7 +882,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI long rows = totalSize / estimatedRowSize; LOG.info("Table {} rows {}, total size is {}, estimatedRowSize is {}", name, rows, totalSize, estimatedRowSize); - return rows; + return rows > 0 ? rows : UNKNOWN_ROW_COUNT; } // Get all partition values from cache. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index d4361a47797..feded88ea32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -83,7 +83,8 @@ public class IcebergExternalTable extends ExternalTable { @Override public long fetchRowCount() { makeSureInitialized(); - return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + long rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } public Table getIcebergTable() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 62d260dacaf..58519d92636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -41,6 +41,7 @@ import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; import org.apache.doris.common.info.SimpleTableInfo; @@ -604,7 +605,7 @@ public class IcebergUtils { if (snapshot == null) { LOG.info("Iceberg table {}.{}.{} is empty, return row count 0.", catalog.getName(), dbName, tbName); // empty table - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } Map<String, String> summary = snapshot.summary(); long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) @@ -614,7 +615,7 @@ public class IcebergUtils { } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); } - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index 618c51caea1..196b01efe2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -194,16 +194,16 @@ public class PaimonExternalTable extends ExternalTable { Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) .orElse(null); if (paimonTable == null) { - return -1; + return UNKNOWN_ROW_COUNT; } List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); } - return -1; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index a7c2fc6365b..62d3a5b2946 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -181,7 +181,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) { + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 775138480d9..288eb88e95f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -560,19 +560,19 @@ public class StatisticsUtil { public static long getHiveRowCount(HMSExternalTable table) { Map<String, String> parameters = table.getRemoteTable().getParameters(); if (parameters == null) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters contains row count, simply get and return it. if (parameters.containsKey(NUM_ROWS)) { long rows = Long.parseLong(parameters.get(NUM_ROWS)); // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0. - if (rows != 0) { + if (rows > 0) { LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName()); return rows; } } if (!parameters.containsKey(TOTAL_SIZE)) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); @@ -582,7 +582,7 @@ public class StatisticsUtil { } if (estimatedRowSize == 0) { LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize); - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } long rows = totalSize / estimatedRowSize; LOG.info("Get row count {} for hive table {} by total size {} and row size {}", diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java new file mode 100644 index 00000000000..81605f93dcd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ThreadPoolManager; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +public class ExternalRowCountCacheTest { + @Test + public void testLoadWithException() throws Exception { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "TEST", true); + AtomicInteger counter = new AtomicInteger(0); + + new MockUp<ExternalRowCountCache.RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return null; + } + }; + ExternalRowCountCache cache = new ExternalRowCountCache(executor); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 1) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(1, counter.get()); + + new MockUp<ExternalRowCountCache.RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return Optional.of(100L); + } + }; + cache.getCachedRowCount(1, 1, 1); + for (int i = 0; i < 60; i++) { + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { + Assertions.assertEquals(100, cachedRowCount); + break; + } + Thread.sleep(1000); + } + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(100, cachedRowCount); + Assertions.assertEquals(2, counter.get()); + + new MockUp<ExternalRowCountCache.RowCountCacheLoader>() { + @Mock + protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Optional.of(100L); + } + }; + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + Thread.sleep(1000); + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 3) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(3, counter.get()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org