This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 74dfdc00dc [nerids](statistics) remove lock in statistics cache loader #17833 74dfdc00dc is described below commit 74dfdc00dcc1ee7066dcdea22bc482116ca57404 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Sun Mar 19 21:30:21 2023 +0900 [nerids](statistics) remove lock in statistics cache loader #17833 Remove the redundant lock in the CacheLoader, since it uses the ForkJoinPool by default. Add an execution-time log for collecting stats. Avoid submitting a duplicate task when a task is already loading the same column. --- .../doris/statistics/StatisticsCacheLoader.java | 108 ++++++++++----------- 1 file changed, 54 insertions(+), 54 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index 73d38b7db8..c592a9b4eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> { @@ -47,70 +49,68 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE " + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')"; - private static int CUR_RUNNING_LOAD = 0; + // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists. 
- private static final Object LOCK = new Object(); + private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>> + inProgressing = new ConcurrentHashMap<>(); - // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists. @Override public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - synchronized (LOCK) { - if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) { - try { - LOCK.wait(); - } catch (InterruptedException e) { - LOG.warn("Ignore interruption", e); - } - } - CUR_RUNNING_LOAD++; - return CompletableFuture.supplyAsync(() -> { + + CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key); + if (future != null) { + return future; + } + future = CompletableFuture.supplyAsync(() -> { + long startTime = System.currentTimeMillis(); + try { + LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName, + startTime); ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache(); + Map<String, String> params = new HashMap<>(); + params.put("tblId", String.valueOf(key.tableId)); + params.put("idxId", String.valueOf(key.idxId)); + params.put("colId", String.valueOf(key.colName)); + List<ColumnStatistic> columnStatistics; + List<ResultRow> columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_COLUMN_STATISTICS)); try { - Map<String, String> params = new HashMap<>(); - params.put("tblId", String.valueOf(key.tableId)); - params.put("idxId", String.valueOf(key.idxId)); - params.put("colId", String.valueOf(key.colName)); - - List<ColumnStatistic> columnStatistics; - List<ResultRow> columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_COLUMN_STATISTICS)); - try { - columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); - } catch (Exception e) { - 
LOG.warn("Failed to deserialize column statistics", e); - throw new CompletionException(e); - } - if (CollectionUtils.isEmpty(columnStatistics)) { - statistic.setColumnStatistic(ColumnStatistic.UNKNOWN); - } else { - statistic.setColumnStatistic(columnStatistics.get(0)); - } - - List<Histogram> histogramStatistics; - List<ResultRow> histogramResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_HISTOGRAM_STATISTICS)); - try { - histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); - } catch (Exception e) { - LOG.warn("Failed to deserialize histogram statistics", e); - throw new CompletionException(e); - } - if (!CollectionUtils.isEmpty(histogramStatistics)) { - statistic.setHistogram(histogramStatistics.get(0)); - } - } finally { - synchronized (LOCK) { - CUR_RUNNING_LOAD--; - LOCK.notify(); - } + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize column statistics", e); + throw new CompletionException(e); + } + if (CollectionUtils.isEmpty(columnStatistics)) { + statistic.setColumnStatistic(ColumnStatistic.UNKNOWN); + } else { + statistic.setColumnStatistic(columnStatistics.get(0)); } + List<Histogram> histogramStatistics; + List<ResultRow> histogramResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_HISTOGRAM_STATISTICS)); + try { + histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize histogram statistics", e); + throw new CompletionException(e); + } + if (!CollectionUtils.isEmpty(histogramStatistics)) { + statistic.setHistogram(histogramStatistics.get(0)); + } return statistic; - }); - } + } finally { + long endTime = System.currentTimeMillis(); + LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName, + endTime, endTime - startTime); 
+ inProgressing.remove(key); + } + }); + inProgressing.put(key, future); + return future; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org