This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 967ad8300f4 [refactor](stats) Remove useless async loader code. (#31380) (#31464) 967ad8300f4 is described below commit 967ad8300f42abee1582d4352ea7e8a8f9cbbd17 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Tue Feb 27 19:47:46 2024 +0800 [refactor](stats) Remove useless async loader code. (#31380) (#31464) --- .../statistics/ColumnStatisticsCacheLoader.java | 9 ---- .../apache/doris/statistics/StatisticsCache.java | 16 ------- .../doris/statistics/StatisticsCacheLoader.java | 51 +--------------------- .../org/apache/doris/statistics/CacheTest.java | 2 +- 4 files changed, 2 insertions(+), 76 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index eda3645fd00..0b66fa5e7b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -18,7 +18,6 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.qe.InternalQueryExecutionException; import org.apache.doris.statistics.util.StatisticsUtil; @@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Optional; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> { private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class); - private static final ThreadPoolExecutor singleThreadPool = ThreadPoolManager.newDaemonFixedThreadPool( - StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE, - StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD", - true, - new DiscardOldestPolicy()); - @Override protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) { Optional<ColumnStatistic> columnStatistic = Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 20f25eb3e96..62e11f5c9d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class StatisticsCache { @@ -75,21 +74,6 @@ public class StatisticsCache { .executor(threadPool) .buildAsync(histogramCacheLoader); - { - threadPool.submit(() -> { - while (true) { - try { - columnStatisticsCacheLoader.removeExpiredInProgressing(); - histogramCacheLoader.removeExpiredInProgressing(); - } catch (Throwable t) { - // IGNORE - } - Thread.sleep(TimeUnit.MINUTES.toMillis(15)); - } - - }); - } - public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().internalSession) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index c212851a284..8b49c57f1bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -22,77 +22,28 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.NonNull; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<StatisticsCacheKey, V> { private static final Logger LOG = LogManager.getLogger(StatisticsCacheLoader.class); - private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>(); - @Override public @NonNull CompletableFuture<V> asyncLoad( @NonNull StatisticsCacheKey key, @NonNull Executor executor) { - CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key); - if (cfWrapper != null) { - return cfWrapper.cf; - } CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> { long startTime = System.currentTimeMillis(); try { return doLoad(key); } finally { long endTime = System.currentTimeMillis(); - LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName, - endTime, endTime - startTime); - removeFromIProgressing(key); + LOG.info("Load statistic cache [{}] cost time ms:{}", key, endTime - startTime); } }, executor); - putIntoIProgressing(key, - new CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future)); return future; } protected abstract V doLoad(StatisticsCacheKey k); - - private static class CompletableFutureWithCreateTime<V> extends CompletableFuture<V> { - - public final long startTime; - public final CompletableFuture<V> cf; - private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30); - - public CompletableFutureWithCreateTime(long startTime, CompletableFuture<V> cf) { - this.startTime = startTime; - this.cf = cf; - } - - public boolean isExpired() { - return System.currentTimeMillis() - startTime > expiredTimeMilli; - } - } - - private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime<V> v) { - synchronized (inProgressing) { - inProgressing.put(k, v); - } - } - - private void removeFromIProgressing(StatisticsCacheKey k) { - synchronized (inProgressing) { - inProgressing.remove(k); - } - } - - public void removeExpiredInProgressing() { - // Quite simple logic that would complete very fast. - // Lock on object to avoid ConcurrentModificationException. - synchronized (inProgressing) { - inProgressing.entrySet().removeIf(e -> e.getValue().isExpired()); - } - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index e3a14ecfc5f..bba6cd0c590 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -289,7 +289,7 @@ public class CacheTest extends TestWithFeService { Assertions.assertEquals(6, columnStatistic.minValue); Assertions.assertEquals(7, columnStatistic.maxValue); } else { - System.out.println("Cached is not loaded, skip test."); + System.out.println("Cache is not loaded, skip test."); } } catch (Throwable t) { t.printStackTrace(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org