This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2205effe0af [fix](catalog) avoid external catalog refresh deadlock
(#61202)
2205effe0af is described below
commit 2205effe0afa2bf717459b6a1270d5fab5825522
Author: Socrates <[email protected]>
AuthorDate: Mon Mar 16 10:50:17 2026 +0800
[fix](catalog) avoid external catalog refresh deadlock (#61202)
### What problem does this PR solve?
Issue Number: #61099
Related PR: N/A
Problem Summary:
- Avoid a deadlock when an external catalog refresh invalidates the cache
while another thread initializes the same catalog through a cache loader.
- Add a unit test that reproduces the lock inversion and verifies that both
worker threads exit cleanly after the fix.
---
.../apache/doris/datasource/ExternalCatalog.java | 16 +-
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 4 +-
.../datasource/ExternalCatalogDeadlockTest.java | 162 +++++++++++++++++++++
3 files changed, 173 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 27a31ec0e1d..4eb40256f6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -577,14 +577,16 @@ public abstract class ExternalCatalog
* @param invalidCache if {@code true}, the catalog cache will be
invalidated
* and reloaded during the refresh process.
*/
- public synchronized void resetToUninitialized(boolean invalidCache) {
- this.objectCreated = false;
- this.initialized = false;
- synchronized (this.confLock) {
- this.cachedConf = null;
+ public void resetToUninitialized(boolean invalidCache) {
+ synchronized (this) {
+ this.objectCreated = false;
+ this.initialized = false;
+ synchronized (this.confLock) {
+ this.cachedConf = null;
+ }
+ this.lowerCaseToDatabaseName.clear();
+ onClose();
}
- this.lowerCaseToDatabaseName.clear();
- onClose();
onRefreshCache(invalidCache);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index c849d779217..a841626bcad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -77,7 +77,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
// Must add "transient" for Gson to ignore this field,
// or Gson will throw exception with HikariCP
private transient JdbcClient jdbcClient;
- private IdentifierMapping identifierMapping;
+ private volatile IdentifierMapping identifierMapping;
private ExternalFunctionRules functionRules;
public JdbcExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props,
@@ -134,7 +134,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
@Override
- public synchronized void resetToUninitialized(boolean invalidCache) {
+ public void resetToUninitialized(boolean invalidCache) {
super.resetToUninitialized(invalidCache);
this.identifierMapping = new JdbcIdentifierMapping(
(Env.isTableNamesCaseInsensitive() ||
Env.isStoredTableNamesLowerCase()),
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java
new file mode 100644
index 00000000000..8a8172f2125
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.datasource.InitCatalogLog.Type;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ExternalCatalogDeadlockTest {
+
+ @Test
+ public void testResetToUninitializedShouldNotDeadlockWithCacheLoader()
throws Exception {
+ DeadlockCatalog catalog = new DeadlockCatalog();
+ CountDownLatch loaderEntered = new CountDownLatch(1);
+ CountDownLatch allowLoaderToTouchCatalog = new CountDownLatch(1);
+ AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();
+
+ // The loader holds Caffeine's per-key lock before it calls back into
the catalog.
+ LoadingCache<String, String> cache = Caffeine.newBuilder().build(key
-> {
+ loaderEntered.countDown();
+ awaitLatch(allowLoaderToTouchCatalog);
+ catalog.makeSureInitialized();
+ return key;
+ });
+
+ Thread queryThread = new Thread(
+ () -> runQuietly(backgroundFailure, () ->
cache.get("deadlock-key")),
+ "deadlock-cache-loader");
+ queryThread.setDaemon(true);
+ queryThread.start();
+ Assertions.assertTrue(loaderEntered.await(5, TimeUnit.SECONDS));
+
+ Thread refreshThread = new Thread(
+ () -> runQuietly(backgroundFailure, () -> {
+ // resetToUninitialized() grabs the catalog monitor before
invalidating the cache.
+ catalog.setInvalidator(() -> {
+ allowLoaderToTouchCatalog.countDown();
+ cache.invalidate("deadlock-key");
+ });
+ catalog.resetToUninitialized(true);
+ }),
+ "deadlock-catalog-refresh");
+ refreshThread.setDaemon(true);
+ refreshThread.start();
+
+ assertNoDeadlock(queryThread, refreshThread, backgroundFailure);
+ }
+
+ private static void assertNoDeadlock(Thread queryThread, Thread
refreshThread,
+ AtomicReference<Throwable> backgroundFailure) throws Exception {
+ long[] deadlockedThreads = waitForDeadlock(queryThread, refreshThread);
+ queryThread.join(TimeUnit.SECONDS.toMillis(5));
+ refreshThread.join(TimeUnit.SECONDS.toMillis(5));
+ Assertions.assertNull(backgroundFailure.get(), "unexpected background
failure: " + backgroundFailure.get());
+ Assertions.assertNull(deadlockedThreads,
+ String.format("detected deadlock between threads %s and %s",
+ queryThread.getName(), refreshThread.getName()));
+ Assertions.assertFalse(queryThread.isAlive(), queryThread.getName() +
" is still running");
+ Assertions.assertFalse(refreshThread.isAlive(),
refreshThread.getName() + " is still running");
+ }
+
+ private static void awaitLatch(CountDownLatch latch) throws
InterruptedException {
+ Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
+
+ private static void runQuietly(AtomicReference<Throwable> failure,
ThrowingRunnable task) {
+ try {
+ task.run();
+ } catch (Throwable t) {
+ failure.compareAndSet(null, t);
+ }
+ }
+
+ private static long[] waitForDeadlock(Thread queryThread, Thread
refreshThread) throws InterruptedException {
+ ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+ for (int i = 0; i < 100; i++) {
+ long[] deadlockedThreads = threadMxBean.findDeadlockedThreads();
+ if (deadlockedThreads != null
+ && contains(deadlockedThreads, queryThread.getId())
+ && contains(deadlockedThreads, refreshThread.getId())) {
+ return deadlockedThreads;
+ }
+ Thread.sleep(50);
+ }
+ return null;
+ }
+
+ private static boolean contains(long[] ids, long targetId) {
+ return Arrays.stream(ids).anyMatch(id -> id == targetId);
+ }
+
+ private static class DeadlockCatalog extends ExternalCatalog {
+ private Runnable invalidator = () -> {
+ };
+
+ DeadlockCatalog() {
+ super(1L, "deadlock-catalog", Type.TEST, "");
+ initialized = true;
+ }
+
+ void setInvalidator(Runnable invalidator) {
+ this.invalidator = invalidator;
+ }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ }
+
+ @Override
+ public void onClose() {
+ }
+
+ @Override
+ public void onRefreshCache(boolean invalidCache) {
+ // Keep the harness catalog usable after refresh so the test only
checks lock ordering.
+ initialized = true;
+ if (invalidCache) {
+ invalidator.run();
+ }
+ }
+
+ @Override
+ protected java.util.List<String>
listTableNamesFromRemote(SessionContext ctx, String dbName) {
+ return java.util.Collections.emptyList();
+ }
+
+ @Override
+ public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
+ return false;
+ }
+ }
+
+ @FunctionalInterface
+ private interface ThrowingRunnable {
+ void run() throws Exception;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]