This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 75cb520cbc5 [Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#33971)
75cb520cbc5 is described below

commit 75cb520cbc5b74a9ba47a4fdcfdaaeadcd33da35
Author: zy-kkk <zhongy...@gmail.com>
AuthorDate: Sat Apr 27 15:40:40 2024 +0800

    [Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#33971)
---
 .../datasource/mapping/IdentifierMapping.java      | 82 +++++++++++++++-------
 1 file changed, 57 insertions(+), 25 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
index cd121f2b630..363ef351152 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java
@@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public abstract class IdentifierMapping {
+    private static final Logger LOG = 
LogManager.getLogger(IdentifierMapping.class);
 
     private final ObjectMapper mapper = new ObjectMapper();
     private final ConcurrentHashMap<String, String> localDBToRemoteDB = new 
ConcurrentHashMap<>();
@@ -179,51 +182,59 @@ public abstract class IdentifierMapping {
     }
 
     public String getRemoteDatabaseName(String localDbName) {
-        if (localDBToRemoteDB.isEmpty() || 
!localDBToRemoteDB.containsKey(localDbName)) {
-            loadDatabaseNamesIfNeeded();
-        }
-        return localDBToRemoteDB.get(localDbName);
+        return getRequiredMapping(localDBToRemoteDB, localDbName, "database", 
this::loadDatabaseNamesIfNeeded,
+                localDbName);
     }
 
     public String getRemoteTableName(String localDbName, String 
localTableName) {
         String remoteDbName = getRemoteDatabaseName(localDbName);
-        if (localTableToRemoteTable.isEmpty()
-                || !localTableToRemoteTable.containsKey(remoteDbName)
-                || localTableToRemoteTable.get(remoteDbName) == null
-                || localTableToRemoteTable.get(remoteDbName).isEmpty()
-                || 
!localTableToRemoteTable.get(remoteDbName).containsKey(localTableName)
-                || 
localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) {
-            loadTableNamesIfNeeded(localDbName);
-        }
-
-        return localTableToRemoteTable.get(remoteDbName).get(localTableName);
+        Map<String, String> tableMap = 
localTableToRemoteTable.computeIfAbsent(remoteDbName,
+                k -> new ConcurrentHashMap<>());
+        return getRequiredMapping(tableMap, localTableName, "table", () -> 
loadTableNamesIfNeeded(localDbName),
+                localTableName);
     }
 
     public Map<String, String> getRemoteColumnNames(String localDbName, String 
localTableName) {
         String remoteDbName = getRemoteDatabaseName(localDbName);
         String remoteTableName = getRemoteTableName(localDbName, 
localTableName);
-        if (localColumnToRemoteColumn.isEmpty()
-                || !localColumnToRemoteColumn.containsKey(remoteDbName)
-                || localColumnToRemoteColumn.get(remoteDbName) == null
-                || localColumnToRemoteColumn.get(remoteDbName).isEmpty()
-                || 
!localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName)
-                || 
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null
-                || 
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) {
+        ConcurrentHashMap<String, ConcurrentHashMap<String, String>> 
tableColumnMap
+                = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> 
new ConcurrentHashMap<>());
+        Map<String, String> columnMap = 
tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
+        if (columnMap.isEmpty()) {
+            LOG.info("Column name mapping missing, loading column names for 
localDbName: {}, localTableName: {}",
+                    localDbName, localTableName);
             loadColumnNamesIfNeeded(localDbName, localTableName);
+            columnMap = tableColumnMap.get(remoteTableName);
         }
-        return 
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
+        if (columnMap.isEmpty()) {
+            LOG.warn("No remote column found for localTableName: {}. Please 
refresh this catalog.", localTableName);
+            throw new RuntimeException(
+                    "No remote column found for localTableName: " + 
localTableName + ". Please refresh this catalog.");
+        }
+        return columnMap;
     }
 
+
     private void loadDatabaseNamesIfNeeded() {
         if (dbNamesLoaded.compareAndSet(false, true)) {
-            loadDatabaseNames();
+            try {
+                loadDatabaseNames();
+            } catch (Exception e) {
+                dbNamesLoaded.set(false); // Reset on failure
+                LOG.warn("Error loading database names", e);
+            }
         }
     }
 
     private void loadTableNamesIfNeeded(String localDbName) {
         AtomicBoolean isLoaded = 
tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
         if (isLoaded.compareAndSet(false, true)) {
-            loadTableNames(localDbName);
+            try {
+                loadTableNames(localDbName);
+            } catch (Exception e) {
+                tableNamesLoadedMap.get(localDbName).set(false); // Reset on 
failure
+                LOG.warn("Error loading table names for localDbName: {}", 
localDbName, e);
+            }
         }
     }
 
@@ -232,8 +243,29 @@ public abstract class IdentifierMapping {
         AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
                 .computeIfAbsent(localTableName, k -> new 
AtomicBoolean(false));
         if (isLoaded.compareAndSet(false, true)) {
-            loadColumnNames(localDbName, localTableName);
+            try {
+                loadColumnNames(localDbName, localTableName);
+            } catch (Exception e) {
+                
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset 
on failure
+                LOG.warn("Error loading column names for localDbName: {}, 
localTableName: {}", localDbName,
+                        localTableName, e);
+            }
+        }
+    }
+
+    private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, 
Runnable loadIfNeeded,
+            String entityName) {
+        if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
+            LOG.info("{} mapping missing, loading for {}: {}", typeName, 
typeName, entityName);
+            loadIfNeeded.run();
+        }
+        V value = map.get(key);
+        if (value == null) {
+            LOG.warn("No remote {} found for {}: {}. Please refresh this 
catalog.", typeName, typeName, entityName);
+            throw new RuntimeException("No remote " + typeName + " found for " 
+ typeName + ": " + entityName
+                    + ". Please refresh this catalog.");
         }
+        return value;
     }
 
     // Load the database name from the data source.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to