This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 75cb520cbc5 [Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#33971) 75cb520cbc5 is described below commit 75cb520cbc5b74a9ba47a4fdcfdaaeadcd33da35 Author: zy-kkk <zhongy...@gmail.com> AuthorDate: Sat Apr 27 15:40:40 2024 +0800 [Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#33971) --- .../datasource/mapping/IdentifierMapping.java | 82 +++++++++++++++------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index cd121f2b630..363ef351152 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; @@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public abstract class IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); private final ObjectMapper mapper = new ObjectMapper(); private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>(); @@ -179,51 +182,59 @@ public abstract class IdentifierMapping { } public String getRemoteDatabaseName(String localDbName) { - if (localDBToRemoteDB.isEmpty() || !localDBToRemoteDB.containsKey(localDbName)) { - loadDatabaseNamesIfNeeded(); - } - return localDBToRemoteDB.get(localDbName); + return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, + localDbName); } public String getRemoteTableName(String localDbName, String localTableName) { String remoteDbName = getRemoteDatabaseName(localDbName); - if (localTableToRemoteTable.isEmpty() - || !localTableToRemoteTable.containsKey(remoteDbName) - || localTableToRemoteTable.get(remoteDbName) == null - || localTableToRemoteTable.get(remoteDbName).isEmpty() - || !localTableToRemoteTable.get(remoteDbName).containsKey(localTableName) - || localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) { - loadTableNamesIfNeeded(localDbName); - } - - return localTableToRemoteTable.get(remoteDbName).get(localTableName); + Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, + k -> new ConcurrentHashMap<>()); + return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), + localTableName); } public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) { String remoteDbName = getRemoteDatabaseName(localDbName); String remoteTableName = getRemoteTableName(localDbName, localTableName); - if (localColumnToRemoteColumn.isEmpty() - || !localColumnToRemoteColumn.containsKey(remoteDbName) - || localColumnToRemoteColumn.get(remoteDbName) == null - || localColumnToRemoteColumn.get(remoteDbName).isEmpty() - || !localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName) - || localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null - || localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) { + ConcurrentHashMap<String, ConcurrentHashMap<String, String>> tableColumnMap + = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); + Map<String, String> columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); + if (columnMap.isEmpty()) { + LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", + localDbName, localTableName); loadColumnNamesIfNeeded(localDbName, localTableName); + columnMap = tableColumnMap.get(remoteTableName); } - return localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName); + if (columnMap.isEmpty()) { + LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); + throw new RuntimeException( + "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); + } + return columnMap; } + private void loadDatabaseNamesIfNeeded() { if (dbNamesLoaded.compareAndSet(false, true)) { - loadDatabaseNames(); + try { + loadDatabaseNames(); + } catch (Exception e) { + dbNamesLoaded.set(false); // Reset on failure + LOG.warn("Error loading database names", e); + } } } private void loadTableNamesIfNeeded(String localDbName) { AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); if (isLoaded.compareAndSet(false, true)) { - loadTableNames(localDbName); + try { + loadTableNames(localDbName); + } catch (Exception e) { + tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure + LOG.warn("Error loading table names for localDbName: {}", localDbName, e); + } } } @@ -232,8 +243,29 @@ public abstract class IdentifierMapping { AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); if (isLoaded.compareAndSet(false, true)) { - loadColumnNames(localDbName, localTableName); + try { + loadColumnNames(localDbName, localTableName); + } catch (Exception e) { + columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure + LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, + localTableName, e); + } + } + } + + private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, Runnable loadIfNeeded, + String entityName) { + if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { + LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); + loadIfNeeded.run(); + } + V value = map.get(key); + if (value == null) { + LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); + throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName + + ". Please refresh this catalog."); } + return value; } // Load the database name from the data source. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org