This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 893e5a3a2eb [fix](fe) Avoid blocking external meta cache refresh on slow miss load (#64705)
893e5a3a2eb is described below

commit 893e5a3a2eb47d903cca626fbd8f1c4a51b4ef7c
Author: Wen Zhenghu <[email protected]>
AuthorDate: Wed Jun 24 18:52:10 2026 +0800

    [fix](fe) Avoid blocking external meta cache refresh on slow miss load (#64705)
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Related PR: N/A
    
    Problem Summary:
    This PR avoids blocking external meta cache invalidation on slow miss
    loads in FE. Previously, `MetaCacheEntry` relied on Caffeine's
    synchronous loading path for cache misses. When an external metadata
    loader became slow, operations that invalidate the same cache, such as
    `REFRESH CATALOG` and the corresponding replay path, could wait on the
    slow load and block the replay-related invalidation flow.
    
    Implementation summary:
    - Keep the existing `LoadingCache` to preserve current hit-path behavior
    and `refreshAfterWrite` support.
    - Add a manual miss-load path behind a new FE config switch, using
    `getIfPresent()` instead of synchronous `LoadingCache.get()` for misses.
    - Deduplicate concurrent miss loads with striped locks inside
    `MetaCacheEntry`.
    - Add an entry-level `invalidateGeneration` counter. Each invalidate
    increments the generation before clearing cache state.
    - Record the generation before a manual miss load, check it once before
    `put()`, and check it again after `put()`. If invalidation happens
    during the race window, the just-loaded value is removed so stale data
    is not kept in cache.
    - Keep null miss-load results uncached so the manual path does not
    attempt to put null into Caffeine.
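    
    A minimal sketch of the manual miss-load flow summarized above, not the
    actual `MetaCacheEntry` code. It assumes a plain `ConcurrentHashMap` in
    place of Caffeine, and the class name `GenerationGuardedCache` is
    hypothetical:
    
    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Function;

    // Hypothetical stand-in for MetaCacheEntry; a ConcurrentHashMap replaces Caffeine.
    class GenerationGuardedCache<K, V> {
        private static final int STRIPES = 128;
        private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();
        private final Object[] locks = new Object[STRIPES];
        private final AtomicLong generation = new AtomicLong();

        GenerationGuardedCache() {
            for (int i = 0; i < locks.length; i++) {
                locks[i] = new Object();
            }
        }

        V getIfPresent(K key) {
            return data.get(key);
        }

        // Bump the generation first so in-flight loads can detect the invalidation.
        // Note that this never takes a stripe lock, so it cannot wait on a slow load.
        void invalidateKey(K key) {
            generation.incrementAndGet();
            data.remove(key);
        }

        V get(K key, Function<K, V> loader) {
            V value = data.get(key);
            if (value != null) {
                return value;
            }
            synchronized (lockFor(key)) {
                // Re-check under the stripe lock to deduplicate concurrent misses.
                value = data.get(key);
                if (value != null) {
                    return value;
                }
                long before = generation.get();
                V loaded = loader.apply(key);
                // Skip caching for null results and for loads that raced with an invalidate.
                if (loaded == null || before != generation.get()) {
                    return loaded;
                }
                data.put(key, loaded);
                if (before != generation.get()) {
                    // Remove only the instance written here; keep any newer replacement.
                    data.computeIfPresent(key, (k, current) -> current == loaded ? null : current);
                }
                return loaded;
            }
        }

        private Object lockFor(K key) {
            return locks[(key.hashCode() & Integer.MAX_VALUE) % locks.length];
        }

        public static void main(String[] args) {
            GenerationGuardedCache<String, Integer> cache = new GenerationGuardedCache<>();
            AtomicInteger loads = new AtomicInteger();
            System.out.println(cache.get("k", k -> loads.incrementAndGet())); // prints 1
            System.out.println(cache.get("k", k -> loads.incrementAndGet())); // prints 1 (cache hit)
            cache.invalidateKey("k");
            System.out.println(cache.getIfPresent("k")); // prints null
        }
    }
    ```
    
    In this sketch the caller that started the load still receives the loaded
    value when an invalidation races with it; the value is only kept out of
    the cache, so the next `get` reloads it.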
    
    Configuration:
    - Add FE config `enable_external_meta_cache_manual_miss_load`, default
    `false`.
    - When it is `false`, `MetaCacheEntry` keeps the original synchronous
    Caffeine miss-load behavior.
    - When it is `true`, `MetaCacheEntry` uses the manual miss-load path
    plus `invalidateGeneration` protection.
    
    Scope and limitations:
    - This change applies to `MetaCacheEntry` used by external metadata
    cache paths in FE. It does not cover the legacy `MetaCache`.
    - `LegacyMetaCacheFactory` is intentionally not refactored in this PR. A
    follow-up PR will rework that path with `MetaCache`, and the legacy
    factory changes are left to that dedicated refactor.
    - The protection is designed for manual miss loads. It does not make
    Caffeine's asynchronous `refreshAfterWrite` reload generation-aware.
    - As a result, `refreshAfterWrite` is still preserved, but an async
    refresh result may still write back after an invalidate. That is an
    intentional trade-off in this version.
    - The new regression case is valuable as a reference and for suitable
    environments, but it may be skipped in standard CI because it depends on
    JDBC regression setup, FE debug points, and an external MySQL/JDBC
    environment.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test
        - [ ] Regression test
        - [x] Unit Test
        - [x] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason
    
    Manual test:
    1. Reproduced the blocking path with `REFRESH CATALOG` against a JDBC
    external catalog and a debug point that sleeps in
    `PluginDrivenExternalTable.initSchema`.
    2. Repeated the baseline scenario 5 times with
    `enable_external_meta_cache_manual_miss_load=false` and observed
    `REFRESH CATALOG` blocked for about 14s while `DESC` stayed slow.
    3. Repeated the optimized scenario 5 times with
    `enable_external_meta_cache_manual_miss_load=true` and observed `REFRESH
    CATALOG` return within about 1s while `DESC` remained slow.
    4. Added a regression case as a manual-test reference because its
    execution depends on JDBC regression environment and FE debug-point
    availability.
    
    Unit test:
    - `FE_UT_PARALLEL=1 ./run-fe-ut.sh --run
    org.apache.doris.datasource.metacache.MetaCacheEntryTest`
    
    - Behavior changed:
        - [x] Yes.
    
    Behavior change:
    - `REFRESH CATALOG` and the corresponding FE invalidation path are no
    longer blocked by slow external metadata miss loads in this
    `MetaCacheEntry` implementation.
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes.
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../doris/datasource/metacache/MetaCacheEntry.java | 129 ++++++++++-
 .../datasource/metacache/MetaCacheEntryTest.java   | 197 ++++++++++++++++-
 ...st_jdbc_refresh_catalog_manual_miss_load.groovy | 243 +++++++++++++++++++++
 4 files changed, 563 insertions(+), 11 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index ff1b7502f23..3fab9c36023 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2193,6 +2193,11 @@ public class Config extends ConfigBase {
     @ConfField(description = {"The auto-refresh interval of the external meta cache."})
     public static long external_cache_refresh_time_minutes = 10; // 10 mins
 
+    // Enable manual miss load for external meta cache to avoid blocking replayer on slow loaders.
+    @ConfField(mutable = true, masterOnly = false,
+            description = {"Whether external meta cache uses manual miss load instead of Caffeine sync load."})
+    public static boolean enable_external_meta_cache_manual_miss_load = true;
+
     /**
      * Github workflow test type, for setting some session variables
      * only for certain test type. E.g. only settting batch_size to small
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
index 8913cd8f4ca..30668163539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCacheEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.metacache;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
 
+import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.github.benmanes.caffeine.cache.stats.CacheStats;
 
@@ -39,14 +40,28 @@ import javax.annotation.Nullable;
  * key/predicate/full invalidation, and lightweight runtime stats.
  */
 public class MetaCacheEntry<K, V> {
+    // Use striped locks to deduplicate slow external loads without managing per-key lock lifecycle.
+    private static final int LOAD_LOCK_STRIPES = 128;
+
     private final String name;
     @Nullable
     private final Function<K, V> loader;
     private final CacheSpec cacheSpec;
     private final boolean effectiveEnabled;
     private final boolean autoRefresh;
-    private final LoadingCache<K, V> data;
+    // Keep the loading cache for refreshAfterWrite and the legacy sync-load path when the feature is disabled.
+    private final LoadingCache<K, V> loadingData;
+    // Use the plain cache view for manual miss load so slow I/O does not happen in Caffeine's sync load path.
+    private final Cache<K, V> data;
+    // Protect one key stripe at a time to deduplicate concurrent miss loads with bounded lock count.
+    private final Object[] loadLocks = new Object[LOAD_LOCK_STRIPES];
     private final AtomicLong invalidateCount = new AtomicLong(0);
+    // Bump generation before invalidation so in-flight manual loads do not repopulate stale values.
+    private final AtomicLong invalidateGeneration = new AtomicLong(0);
+    // Track load statistics outside Caffeine because manual miss loads bypass the built-in load counters.
+    private final AtomicLong loadSuccessCount = new AtomicLong(0);
+    private final AtomicLong loadFailureCount = new AtomicLong(0);
+    private final AtomicLong totalLoadTimeNanos = new AtomicLong(0);
     private final AtomicLong lastLoadSuccessTimeMs = new AtomicLong(-1L);
     private final AtomicLong lastLoadFailureTimeMs = new AtomicLong(-1L);
     private final AtomicReference<String> lastError = new AtomicReference<>("");
@@ -92,7 +107,12 @@ public class MetaCacheEntry<K, V> {
                 maxSize,
                 true,
                 null);
-        this.data = cacheFactory.buildCache(this::loadFromDefaultLoader, refreshExecutor);
+        this.loadingData = cacheFactory.buildCache(this::loadFromDefaultLoader, refreshExecutor);
+        this.data = loadingData;
+        // Initialize striped locks eagerly to keep the hot path allocation-free.
+        for (int i = 0; i < loadLocks.length; i++) {
+            loadLocks[i] = new Object();
+        }
     }
 
     public String name() {
@@ -100,29 +120,43 @@ public class MetaCacheEntry<K, V> {
     }
 
     public V get(K key) {
-        return data.get(key);
+        if (!isManualMissLoadEnabled()) {
+            return loadingData.get(key);
+        }
+        return getWithManualLoad(key, this::applyDefaultLoader);
     }
 
     public V get(K key, Function<K, V> missLoader) {
         Function<K, V> loadFunction = Objects.requireNonNull(missLoader, "missLoader can not be null");
-        return data.get(key, typedKey -> loadAndTrack(typedKey, loadFunction));
+        if (!isManualMissLoadEnabled()) {
+            return loadingData.get(key, typedKey -> loadAndTrack(typedKey, loadFunction));
+        }
+        return getWithManualLoad(key, loadFunction);
     }
 
     public V getIfPresent(K key) {
+        if (!effectiveEnabled) {
+            return null;
+        }
         return data.getIfPresent(key);
     }
 
     public void put(K key, V value) {
+        if (!effectiveEnabled) {
+            return;
+        }
         data.put(key, value);
     }
 
     public void invalidateKey(K key) {
+        invalidateGeneration.incrementAndGet();
         if (data.asMap().remove(key) != null) {
             invalidateCount.incrementAndGet();
         }
     }
 
     public void invalidateIf(Predicate<K> predicate) {
+        invalidateGeneration.incrementAndGet();
         data.asMap().keySet().removeIf(key -> {
             if (predicate.test(key)) {
                 invalidateCount.incrementAndGet();
@@ -133,6 +167,7 @@ public class MetaCacheEntry<K, V> {
     }
 
     public void invalidateAll() {
+        invalidateGeneration.incrementAndGet();
         long size = data.estimatedSize();
         data.invalidateAll();
         invalidateCount.addAndGet(size);
@@ -143,7 +178,11 @@ public class MetaCacheEntry<K, V> {
     }
 
     public MetaCacheEntryStats stats() {
-        CacheStats cacheStats = data.stats();
+        CacheStats cacheStats = loadingData.stats();
+        long successCount = loadSuccessCount.get();
+        long failureCount = loadFailureCount.get();
+        long totalLoadTime = totalLoadTimeNanos.get();
+        long totalLoadCount = successCount + failureCount;
         return new MetaCacheEntryStats(
                 cacheSpec.isEnable(),
                 effectiveEnabled,
@@ -155,10 +194,10 @@ public class MetaCacheEntry<K, V> {
                 cacheStats.hitCount(),
                 cacheStats.missCount(),
                 cacheStats.hitRate(),
-                cacheStats.loadSuccessCount(),
-                cacheStats.loadFailureCount(),
-                cacheStats.totalLoadTime(),
-                cacheStats.averageLoadPenalty(),
+                successCount,
+                failureCount,
+                totalLoadTime,
+                totalLoadCount == 0 ? 0D : (double) totalLoadTime / totalLoadCount,
                 cacheStats.evictionCount(),
                 invalidateCount.get(),
                 lastLoadSuccessTimeMs.get(),
@@ -166,20 +205,90 @@ public class MetaCacheEntry<K, V> {
                 lastError.get());
     }
 
+    // Read the config dynamically so existing cache entries follow runtime config updates.
+    private boolean isManualMissLoadEnabled() {
+        return Config.enable_external_meta_cache_manual_miss_load;
+    }
+
+    // Execute slow miss loads outside Caffeine's sync load path and suppress stale write-back after invalidation.
+    private V getWithManualLoad(K key, Function<K, V> loadFunction) {
+        if (!effectiveEnabled) {
+            // Bypass cache entirely when the entry is disabled so manual miss load does not relax disable semantics.
+            return loadAndTrack(key, loadFunction);
+        }
+
+        V value = data.getIfPresent(key);
+        if (value != null) {
+            return value;
+        }
+
+        synchronized (loadLock(key)) {
+            value = data.asMap().get(key);
+            if (value != null) {
+                return value;
+            }
+
+            long generation = invalidateGeneration.get();
+            V loaded = loadAndTrack(key, loadFunction);
+            if (generation != invalidateGeneration.get()) {
+                return loaded;
+            }
+
+            // Keep null results uncached so manual miss load matches LoadingCache null-return behavior.
+            if (loaded == null) {
+                return null;
+            }
+
+            // Leave a narrow hook for tests to pause exactly before the cache put race window.
+            beforeManualCachePutForTest(key, loaded);
+            data.put(key, loaded);
+            if (generation != invalidateGeneration.get()) {
+                removeLoadedValue(key, loaded);
+            }
+            return loaded;
+        }
+    }
+
+    // Remove only the value loaded by the current request and keep newer replacements intact.
+    private void removeLoadedValue(K key, V loaded) {
+        data.asMap().computeIfPresent(key, (ignored, currentValue) -> currentValue == loaded ? null : currentValue);
+    }
+
+    // Map keys to a fixed lock stripe set to bound memory usage while keeping same-key deduplication.
+    private Object loadLock(K key) {
+        int hash = key == null ? 0 : key.hashCode();
+        return loadLocks[(hash & Integer.MAX_VALUE) % loadLocks.length];
+    }
+
+    // Let tests pause between the first generation check and data.put without affecting production behavior.
+    void beforeManualCachePutForTest(K key, V loaded) {
+    }
+
     private V loadFromDefaultLoader(K key) {
+        return loadAndTrack(key, this::applyDefaultLoader);
+    }
+
+    // Resolve the default loader separately so the manual path can share tracking without double counting.
+    private V applyDefaultLoader(K key) {
         if (loader == null) {
             throw new UnsupportedOperationException(
                     String.format("Entry '%s' requires a contextual miss loader.", name));
         }
-        return loadAndTrack(key, loader);
+        return loader.apply(key);
     }
 
+    // Track load outcomes locally because manual miss loads do not contribute to Caffeine load statistics.
     private V loadAndTrack(K key, Function<K, V> loadFunction) {
+        long startNanos = System.nanoTime();
         try {
             V value = loadFunction.apply(key);
+            loadSuccessCount.incrementAndGet();
+            totalLoadTimeNanos.addAndGet(System.nanoTime() - startNanos);
             lastLoadSuccessTimeMs.set(System.currentTimeMillis());
             return value;
         } catch (RuntimeException | Error e) {
+            loadFailureCount.incrementAndGet();
+            totalLoadTimeNanos.addAndGet(System.nanoTime() - startNanos);
             lastLoadFailureTimeMs.set(System.currentTimeMillis());
             lastError.set(e.toString());
             throw e;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
index 290eed874ed..7583ac3b075 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/metacache/MetaCacheEntryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.datasource.metacache;
 
+import org.apache.doris.common.Config;
+
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import org.junit.Assert;
@@ -24,14 +26,19 @@ import org.junit.Test;
 
 import java.lang.reflect.Field;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MetaCacheEntryTest {
 
     @Test
     public void testRefreshUsesConfiguredLoader() throws Exception {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
         ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
         try {
             Map<String, String> properties = Maps.newHashMap();
@@ -58,6 +65,7 @@ public class MetaCacheEntryTest {
             }
             Assert.assertTrue("refresh should trigger loader invocation", loadCounter.get() >= 2);
         } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
             refreshExecutor.shutdownNow();
         }
     }
@@ -211,9 +219,196 @@ public class MetaCacheEntryTest {
         }
     }
 
+    @Test
+    public void testManualMissLoadDeduplicatesSameKey() throws Exception {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
+        ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+        ExecutorService queryExecutor = Executors.newFixedThreadPool(2);
+        try {
+            CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+            CountDownLatch loaderStarted = new CountDownLatch(1);
+            CountDownLatch releaseLoader = new CountDownLatch(1);
+            AtomicInteger loadCounter = new AtomicInteger();
+            MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+                    "test",
+                    key -> {
+                        loaderStarted.countDown();
+                        awaitLatch(releaseLoader);
+                        return loadCounter.incrementAndGet();
+                    },
+                    cacheSpec,
+                    refreshExecutor,
+                    false);
+
+            Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+            Assert.assertTrue(loaderStarted.await(3L, TimeUnit.SECONDS));
+            Future<Integer> second = queryExecutor.submit(() -> entry.get("k"));
+            releaseLoader.countDown();
+
+            Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+            Assert.assertEquals(Integer.valueOf(1), second.get(3L, TimeUnit.SECONDS));
+            Assert.assertEquals(1, loadCounter.get());
+        } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+            queryExecutor.shutdownNow();
+            refreshExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testManualMissLoadDoesNotPutAfterInvalidate() throws Exception {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
+        ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+        ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
+        try {
+            CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+            CountDownLatch loaderStarted = new CountDownLatch(1);
+            CountDownLatch releaseLoader = new CountDownLatch(1);
+            AtomicInteger loadCounter = new AtomicInteger();
+            MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+                    "test",
+                    key -> {
+                        loaderStarted.countDown();
+                        awaitLatch(releaseLoader);
+                        return loadCounter.incrementAndGet();
+                    },
+                    cacheSpec,
+                    refreshExecutor,
+                    false);
+
+            Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+            Assert.assertTrue(loaderStarted.await(3L, TimeUnit.SECONDS));
+            entry.invalidateKey("k");
+            releaseLoader.countDown();
+
+            Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+            Assert.assertNull(entry.getIfPresent("k"));
+            Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+            Assert.assertEquals(2, loadCounter.get());
+        } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+            queryExecutor.shutdownNow();
+            refreshExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testManualMissLoadRemovesValueWhenInvalidateHappensBeforePut() throws Exception {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
+        ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+        ExecutorService queryExecutor = Executors.newSingleThreadExecutor();
+        try {
+            CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+            CountDownLatch beforePutStarted = new CountDownLatch(1);
+            CountDownLatch releaseBeforePut = new CountDownLatch(1);
+            AtomicInteger loadCounter = new AtomicInteger();
+            MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<String, Integer>(
+                    "test",
+                    key -> loadCounter.incrementAndGet(),
+                    cacheSpec,
+                    refreshExecutor,
+                    false) {
+                @Override
+                void beforeManualCachePutForTest(String key, Integer loaded) {
+                    beforePutStarted.countDown();
+                    awaitLatch(releaseBeforePut);
+                }
+            };
+
+            Future<Integer> first = queryExecutor.submit(() -> entry.get("k"));
+            Assert.assertTrue(beforePutStarted.await(3L, TimeUnit.SECONDS));
+            entry.invalidateKey("k");
+            releaseBeforePut.countDown();
+
+            Assert.assertEquals(Integer.valueOf(1), first.get(3L, TimeUnit.SECONDS));
+            Assert.assertNull(entry.getIfPresent("k"));
+            Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+            Assert.assertEquals(2, loadCounter.get());
+        } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+            queryExecutor.shutdownNow();
+            refreshExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testManualMissLoadAllowsNullWithoutCaching() {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
+        ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+        try {
+            CacheSpec cacheSpec = CacheSpec.of(true, CacheSpec.CACHE_NO_TTL, 10L);
+            MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+                    "test",
+                    String::length,
+                    cacheSpec,
+                    refreshExecutor,
+                    false);
+            AtomicInteger missLoaderCounter = new AtomicInteger();
+
+            // Verify manual miss load returns null directly and retries because null values are not cached.
+            Assert.assertNull(entry.get("missing", key -> {
+                missLoaderCounter.incrementAndGet();
+                return null;
+            }));
+            Assert.assertNull(entry.getIfPresent("missing"));
+            Assert.assertNull(entry.get("missing", key -> {
+                missLoaderCounter.incrementAndGet();
+                return null;
+            }));
+            Assert.assertEquals(2, missLoaderCounter.get());
+        } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+            refreshExecutor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testManualMissLoadDoesNotCacheWhenEntryDisabled() {
+        boolean originalManualMissLoad = Config.enable_external_meta_cache_manual_miss_load;
+        Config.enable_external_meta_cache_manual_miss_load = true;
+        ExecutorService refreshExecutor = Executors.newSingleThreadExecutor();
+        try {
+            CacheSpec cacheSpec = CacheSpec.of(false, CacheSpec.CACHE_NO_TTL, 10L);
+            AtomicInteger loadCounter = new AtomicInteger();
+            MetaCacheEntry<String, Integer> entry = new MetaCacheEntry<>(
+                    "test",
+                    key -> loadCounter.incrementAndGet(),
+                    cacheSpec,
+                    refreshExecutor,
+                    false);
+
+            // Verify disabled entries bypass cache entirely even when manual miss load is enabled by config.
+            Assert.assertEquals(Integer.valueOf(1), entry.get("k"));
+            Assert.assertNull(entry.getIfPresent("k"));
+            Assert.assertEquals(Integer.valueOf(2), entry.get("k"));
+            Assert.assertNull(entry.getIfPresent("k"));
+            Assert.assertEquals(2, loadCounter.get());
+
+            entry.put("k", 100);
+            Assert.assertNull(entry.getIfPresent("k"));
+        } finally {
+            Config.enable_external_meta_cache_manual_miss_load = originalManualMissLoad;
+            refreshExecutor.shutdownNow();
+        }
+    }
+
+    // Keep the loader blocking helper in one place so concurrent tests stay readable.
+    private void awaitLatch(CountDownLatch latch) {
+        try {
+            Assert.assertTrue(latch.await(3L, TimeUnit.SECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private LoadingCache<String, Integer> extractLoadingCache(MetaCacheEntry<String, Integer> entry) throws Exception {
-        Field dataField = MetaCacheEntry.class.getDeclaredField("data");
+        Field dataField = MetaCacheEntry.class.getDeclaredField("loadingData");
         dataField.setAccessible(true);
         Object raw = dataField.get(entry);
         Assert.assertTrue(raw instanceof LoadingCache);
diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy
new file mode 100644
index 00000000000..f9e942c277a
--- /dev/null
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_refresh_catalog_manual_miss_load.groovy
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.UUID
+
+// This case depends on JDBC regression configs, FE debug points, and a reachable
+// external MySQL environment. Those prerequisites may be unavailable in the
+// default Apache Doris CI pipeline, so the case can be skipped or may not run
+// end-to-end there. That is expected. The case still serves as a valuable
+// reference for manual validation of the MetaCacheEntry refresh blocking fix.
+suite("test_jdbc_refresh_catalog_manual_miss_load", "p0,external") {
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        return
+    }
+
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String mysqlPort = context.config.otherConfigs.get("mysql_57_port")
+    String s3Endpoint = getS3Endpoint()
+    String bucket = getS3BucketName()
+    String driverUrl = "https://${bucket}.${s3Endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+    String nameSuffix = UUID.randomUUID().toString().replace("-", "")
+    String catalogName = "jdbc_manual_miss_load_" + nameSuffix
+    String remoteDbName = "jdbc_manual_miss_load_db_" + nameSuffix.substring(0, 12)
+    String slowTableName = "slow_probe_000000"
+    int collisionTableCount = 80
+    int hashMod = 512
+    int slowSleepMs = 15000
+    int refreshDelayMs = 5000
+
+    def mysqlJdbcUrl = "jdbc:mysql://${externalEnvIp}:${mysqlPort}"
+    def configRows = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'enable_external_meta_cache_manual_miss_load';"""
+    String originalManualMissLoadValue = configRows[0][1].toString()
+    def debugPointRows = sql """ADMIN SHOW FRONTEND CONFIG LIKE 'enable_debug_points';"""
+    // Skip the case when FE debug points are not enabled in the regression environment.
+    if (!"true".equalsIgnoreCase(debugPointRows[0][1].toString())) {
+        logger.info("skip ${catalogName} because enable_debug_points is not true")
+        return
+    }
+
+    def jint = { long value ->
+        long result = value & 0xffffffffL
+        if (result >= 0x80000000L) {
+            result -= 0x100000000L
+        }
+        return (int) result
+    }
+
+    def javaStringHash = { String value ->
+        int hash = 0
+        for (int i = 0; i < value.length(); i++) {
+            hash = jint(31L * hash + (int) value.charAt(i))
+        }
+        return hash
+    }
+
+    def javaLongHash = { long value ->
+        return jint(value ^ (value >>> 32))
+    }
+
+    def objectsHash = { List<Object> values ->
+        int hash = 1
+        values.each { value ->
+            int elementHash
+            if (value instanceof Number) {
+                elementHash = javaLongHash(((Number) value).longValue())
+            } else {
+                elementHash = javaStringHash(value.toString())
+            }
+            hash = jint(31L * hash + elementHash)
+        }
+        return hash
+    }
+
+    def spread = { int hash ->
+        return jint(hash ^ (((long) hash & 0xffffffffL) >>> 16))
+    }
+
+    // Compute collision table names that land in the same Caffeine hash bin as the slow key.
+    def findCollisionTableNames = { long catalogId ->
+        def calcIndex = { String tableName ->
+            int hash = objectsHash([catalogId, remoteDbName, tableName, remoteDbName, tableName])
+            return spread(hash) & (hashMod - 1)
+        }
+        int targetIndex = calcIndex(slowTableName)
+        def tableNames = []
+        for (int i = 0; i < 300000; i++) {
+            String candidate = String.format("collide_%06d", i)
+            if (calcIndex(candidate) == targetIndex) {
+                tableNames.add(candidate)
+                if (tableNames.size() >= collisionTableCount) {
+                    break
+                }
+            }
+        }
+        assertEquals(collisionTableCount, tableNames.size())
+        return tableNames
+    }
+
+    def executeOnRemoteMysql = { String statement ->
+        connect("root", "123456", mysqlJdbcUrl) {
+            sql statement
+        }
+    }
+
+    // Recreate the remote schema so every run starts from the same cache and metadata state.
+    def recreateRemoteObjects = { List<String> collisionTableNames ->
+        executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+        executeOnRemoteMysql("""CREATE DATABASE ${remoteDbName}""")
+        executeOnRemoteMysql("""CREATE TABLE ${remoteDbName}.${slowTableName} (k INT)""")
+        collisionTableNames.each { tableName ->
+            executeOnRemoteMysql("""CREATE TABLE ${remoteDbName}.${tableName} (k INT)""")
+        }
+    }
+
+    // Read the current schema cache size for this catalog from information_schema.
+    def getSchemaCacheSize = {
+        def statRows = sql """
+                SELECT ESTIMATED_SIZE
+                FROM information_schema.catalog_meta_cache_statistics
+                WHERE CATALOG_NAME = '${catalogName}'
+                  AND ENTRY_NAME = 'schema'
+            """
+        return ((Number) statRows[0][0]).intValue()
+    }
+
+    // Warm the schema cache with the colliding tables; relies on getSchemaCacheSize above.
+    def preheatSchemaCache = { List<String> collisionTableNames ->
+        sql """REFRESH CATALOG ${catalogName}"""
+        collisionTableNames.each { tableName ->
+            sql """DESC ${catalogName}.${remoteDbName}.${tableName}"""
+        }
+        assertEquals(collisionTableCount, getSchemaCacheSize())
+    }
+
+    // Run the blocking reproduction once and assert the refresh latency profile.
+    def runRefreshRace = { boolean manualMissLoadEnabled, long minRefreshMs, long maxRefreshMs ->
+        sql """ADMIN SET FRONTEND CONFIG ('enable_external_meta_cache_manual_miss_load' = '${manualMissLoadEnabled}')"""
+        try {
+            GetDebugPoint().enableDebugPointForAllFEs(
+                    "PluginDrivenExternalTable.initSchema.sleep",
+                    ["sleepMs": String.valueOf(slowSleepMs)])
+
+            def descElapsedMs = -1L
+            def descFailure = null
+            def descThread = Thread.start {
+                try {
+                    long descStart = System.currentTimeMillis()
+                    connect(context.config.jdbcUser, context.config.jdbcPassword, context.config.jdbcUrl) {
+                        sql """DESC ${catalogName}.${remoteDbName}.${slowTableName}"""
+                    }
+                    descElapsedMs = System.currentTimeMillis() - descStart
+                } catch (Throwable t) {
+                    descFailure = t
+                }
+            }
+
+            // Delay the refresh long enough so the DESC path can enter the injected slow schema load.
+            Thread.sleep(refreshDelayMs)
+
+            long refreshStart = System.currentTimeMillis()
+            sql """REFRESH CATALOG ${catalogName}"""
+            long refreshElapsedMs = System.currentTimeMillis() - refreshStart
+
+            descThread.join(slowSleepMs + 15000)
+            if (descThread.isAlive()) {
+                throw new IllegalStateException("desc thread does not finish in time")
+            }
+            if (descFailure != null) {
+                throw new IllegalStateException("desc thread failed", descFailure)
+            }
+
+            logger.info("manualMissLoadEnabled=${manualMissLoadEnabled}, refreshElapsedMs=${refreshElapsedMs}, descElapsedMs=${descElapsedMs}")
+            assertTrue(descElapsedMs >= slowSleepMs - 1000)
+            assertTrue(refreshElapsedMs >= minRefreshMs)
+            assertTrue(refreshElapsedMs <= maxRefreshMs)
+            assertTrue(refreshElapsedMs < descElapsedMs)
+            return [refreshElapsedMs, descElapsedMs]
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("PluginDrivenExternalTable.initSchema.sleep")
+        }
+    }
+
+    try {
+        executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+        executeOnRemoteMysql("""CREATE DATABASE ${remoteDbName}""")
+        sql """DROP CATALOG IF EXISTS ${catalogName}"""
+        sql """CREATE CATALOG ${catalogName} PROPERTIES(
+                "type" = "jdbc",
+                "user" = "root",
+                "password" = "123456",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysqlPort}/${remoteDbName}?useSSL=false",
+                "driver_url" = "${driverUrl}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "only_specified_database" = "true",
+                "include_database_list" = "${remoteDbName}"
+            )"""
+
+        def catalogRows = sql """SHOW CATALOGS LIKE '${catalogName}'"""
+        long catalogId = ((Number) catalogRows[0][0]).longValue()
+        def collisionTableNames = findCollisionTableNames(catalogId)
+
+        recreateRemoteObjects(collisionTableNames)
+        preheatSchemaCache(collisionTableNames)
+        def blockedResult = runRefreshRace(false, 8000L, slowSleepMs + 5000L)
+
+        recreateRemoteObjects(collisionTableNames)
+        preheatSchemaCache(collisionTableNames)
+        def manualLoadResult = runRefreshRace(true, 0L, 5000L)
+
+        // Verify the invalidated slow load did not write back the stale key into schema cache.
+        assertEquals(0, getSchemaCacheSize())
+        sql """DESC ${catalogName}.${remoteDbName}.${slowTableName}"""
+        assertEquals(1, getSchemaCacheSize())
+        assertTrue(((Number) blockedResult[0]).longValue() > ((Number) manualLoadResult[0]).longValue() + 5000L)
+    } finally {
+        try_sql("""ADMIN SET FRONTEND CONFIG ('enable_external_meta_cache_manual_miss_load' = '${originalManualMissLoadValue}')""")
+        try {
+            GetDebugPoint().disableDebugPointForAllFEs("PluginDrivenExternalTable.initSchema.sleep")
+        } catch (Throwable t) {
+            logger.warn("failed to disable debug point during cleanup", t)
+        }
+        try_sql("""DROP CATALOG IF EXISTS ${catalogName}""")
+        try {
+            executeOnRemoteMysql("""DROP DATABASE IF EXISTS ${remoteDbName}""")
+        } catch (Throwable t) {
+            logger.warn("failed to drop remote database during cleanup", t)
+        }
+    }
+}
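
Note on the collision helpers in the test above: jint, javaStringHash, javaLongHash, objectsHash and spread re-implement Java's 32-bit hashing arithmetic in Groovy so the test can predict which bin a schema cache key lands in. The following is a minimal Java sketch of the same calculation using the JDK's own Objects.hash. It assumes, as the test does, that the key hash covers (catalogId, dbName, tableName, dbName, tableName), that the table has 512 bins, and that the index is derived with a HashMap-style spread; none of this is confirmed here against Caffeine's internals. The class name BinIndexSketch and the sample catalog id and database name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class BinIndexSketch {
    // Assumed bin count, mirroring hashMod = 512 in the test (must be a power of two).
    static final int HASH_MOD = 512;

    // HashMap-style spreading step: fold the high 16 bits into the low 16 bits.
    static int spread(int hash) {
        return hash ^ (hash >>> 16);
    }

    // Bin index for a key hashed over the same five fields the test uses.
    // Objects.hash boxes catalogId to Long, whose hashCode is (int) (v ^ (v >>> 32)).
    static int binIndex(long catalogId, String dbName, String tableName) {
        int hash = Objects.hash(catalogId, dbName, tableName, dbName, tableName);
        return spread(hash) & (HASH_MOD - 1);
    }

    // Scan candidates "collide_000000", "collide_000001", ... and keep those
    // that share the target table's bin, stopping once enough are found.
    static List<String> findCollisions(long catalogId, String dbName, String target,
                                       int wanted, int maxCandidates) {
        int targetIndex = binIndex(catalogId, dbName, target);
        List<String> names = new ArrayList<>();
        for (int i = 0; i < maxCandidates && names.size() < wanted; i++) {
            String candidate = String.format("collide_%06d", i);
            if (binIndex(catalogId, dbName, candidate) == targetIndex) {
                names.add(candidate);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Hypothetical catalog id and database name, for illustration only.
        List<String> names = findCollisions(10001L, "demo_db", "slow_probe_000000", 5, 300000);
        System.out.println(names);
    }
}
```

With 512 bins, roughly one candidate in 512 matches, so a scan of 300000 names comfortably yields the 80 collisions the test asks for.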

