This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b0f1d4904289869af445ec7171b3af4126d634ca
Author: Peter Rozsa <[email protected]>
AuthorDate: Mon Apr 28 09:36:26 2025 +0200

    IMPALA-14016: Add multi-catalog support for local catalog mode
    
    This patch adds a new MetaProvider called MultiMetaProvider, which is
    capable of handling multiple MetaProviders at once, prioritizing one
    primary provider over multiple secondary providers. The primary
    provider handles some methods exclusively for deterministic behavior.
    In database listings, if one database name occurs multiple times the
    contained tables are merged under that database name; if the two
    separate databases contain a table with the same name, the query
    analyzation fails with an error.
    This change also modifies the local catalog implementation's
    initialization. If catalogd is deployed, then it instantiates the
    CatalogdMetaProvider and checks if the catalog configuration directory
    is set as a backend flag. If it's set, then it tries to load every
    configuration from the folder, and tries to instantiate the
    IcebergMetaProvider from those configs. If the instantiation fails, an
    error is reported to the logs, but the startup is not interrupted.
    
    Tests:
     - E2E tests for multi-catalog behavior
     - Unit test for ConfigLoader
    
    Change-Id: Ifbdd0f7085345e7954d9f6f264202699182dd1e1
    Reviewed-on: http://gerrit.cloudera.org:8080/22878
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 .../impala/authorization/AuthorizationFactory.java |   2 +-
 .../authorization/NoopAuthorizationFactory.java    |   2 +-
 .../ranger/RangerAuthorizationFactory.java         |   2 +-
 .../org/apache/impala/catalog/FeCatalogUtils.java  |   4 +
 .../impala/catalog/iceberg/IcebergRESTCatalog.java |  11 +-
 .../impala/catalog/local/IcebergMetaProvider.java  |   6 +-
 .../impala/catalog/local/MultiMetaProvider.java    | 328 +++++++++++++++++++++
 .../apache/impala/service/FeCatalogManager.java    | 269 -----------------
 .../java/org/apache/impala/service/Frontend.java   |   3 +-
 .../service/catalogmanager/CatalogdImpl.java       |  71 +++++
 .../service/catalogmanager/ConfigLoader.java       |  88 ++++++
 .../service/catalogmanager/FeCatalogManager.java   |  89 ++++++
 .../impala/service/catalogmanager/LocalImpl.java   | 117 ++++++++
 .../impala/service/catalogmanager/TestImpl.java    |  45 +++
 .../org/apache/impala/common/FrontendTestBase.java |   2 +-
 .../service/catalogmanager/ConfigLoaderTest.java   | 127 ++++++++
 java/iceberg-rest-catalog-test/pom.xml             |   6 +
 .../iceberg/rest/IcebergRestCatalogTest.java       | 155 +++++++---
 testdata/bin/run-iceberg-rest-server.sh            |   2 +-
 .../multicatalog_rest_config/rest-1.properties}    |  20 +-
 .../multicatalog_rest_config/rest-2.properties}    |  20 +-
 .../functional/functional_schema_template.sql      |  14 +
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/QueryTest/iceberg-multicatalog.test    |  45 +++
 ...berg-multiple-rest-catalogs-ambiguous-name.test |   6 +
 .../QueryTest/iceberg-multiple-rest-catalogs.test  |  40 +++
 tests/common/iceberg_rest_server.py                |  50 +++-
 tests/custom_cluster/test_iceberg_rest_catalog.py  | 152 ++++++++--
 28 files changed, 1268 insertions(+), 409 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java 
b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
index 9b9dba84b..cbb8a6f51 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationFactory.java
@@ -20,7 +20,7 @@ package org.apache.impala.authorization;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.service.FeCatalogManager;
+import org.apache.impala.service.catalogmanager.FeCatalogManager;
 
 import java.util.function.Supplier;
 
diff --git 
a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
 
b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index 23733f680..8714b3294 100644
--- 
a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ 
b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -24,7 +24,7 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.service.FeCatalogManager;
+import org.apache.impala.service.catalogmanager.FeCatalogManager;
 import org.apache.impala.thrift.TCatalogServiceRequestHeader;
 import org.apache.impala.thrift.TCreateDropRoleParams;
 import org.apache.impala.thrift.TDdlExecResponse;
diff --git 
a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
 
b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
index c0a04e445..c536ec607 100644
--- 
a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
+++ 
b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
@@ -27,7 +27,7 @@ import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.service.BackendConfig;
-import org.apache.impala.service.FeCatalogManager;
+import org.apache.impala.service.catalogmanager.FeCatalogManager;
 
 import java.util.function.Supplier;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java 
b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index b8a892ade..aa62ac352 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -44,6 +44,7 @@ import org.apache.impala.catalog.local.LocalIcebergTable;
 import org.apache.impala.catalog.local.LocalKuduTable;
 import org.apache.impala.catalog.local.LocalView;
 import org.apache.impala.catalog.local.MetaProvider;
+import org.apache.impala.catalog.local.MultiMetaProvider;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.service.BackendConfig;
@@ -396,6 +397,9 @@ public abstract class FeCatalogUtils {
     if (!BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) return;
     Preconditions.checkState(catalog instanceof LocalCatalog);
     MetaProvider provider = ((LocalCatalog) catalog).getMetaProvider();
+    if (provider instanceof MultiMetaProvider) {
+      provider = ((MultiMetaProvider) provider).getPrimaryProvider();
+    }
     if (!(provider instanceof CatalogdMetaProvider)) return;
 
     CacheStats stats = ((CatalogdMetaProvider) provider).getCacheStats();
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java
index d1c5c0786..2440d8e8c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java
@@ -40,18 +40,9 @@ import org.apache.impala.util.IcebergUtil;
 public class IcebergRESTCatalog implements IcebergCatalog {
   private final String REST_URI;
 
-  private static IcebergRESTCatalog instance_;
   private final RESTCatalog restCatalog_;
 
-  public synchronized static IcebergRESTCatalog getInstance(
-      Properties properties) {
-    if (instance_ == null) {
-      instance_ = new IcebergRESTCatalog(properties);
-    }
-    return instance_;
-  }
-
-  private IcebergRESTCatalog(Properties properties) {
+  public IcebergRESTCatalog(Properties properties) {
     setContextClassLoader();
 
     RESTCatalogProperties restConfig = new RESTCatalogProperties(properties);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
index d9de92392..bdbb9cea9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
@@ -106,17 +106,13 @@ public class IcebergMetaProvider implements MetaProvider {
 
   public IcebergMetaProvider(Properties properties) {
     properties_ = properties;
-    iceCatalog_ = initCatalog();
+    iceCatalog_ = new IcebergRESTCatalog(properties);
   }
 
   public String getURI() {
     return "Iceberg REST (" + iceCatalog_.getUri() + ")";
   }
 
-  private IcebergRESTCatalog initCatalog() {
-    return IcebergRESTCatalog.getInstance(properties_);
-  }
-
   public void setAuthzChecker(
       AtomicReference<? extends AuthorizationChecker> authzChecker) {
     authzChecker_ = authzChecker;
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/MultiMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/MultiMetaProvider.java
new file mode 100644
index 000000000..30d5d3035
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MultiMetaProvider.java
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.catalog.local;
+
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.DataSource;
+import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.HdfsCachePool;
+import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TBriefTableMeta;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialTableInfo;
+import org.apache.impala.thrift.TValidWriteIdList;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TException;
+
+/**
+ * MetaProvider implementation that proxies calls through a chain of 
MetaProviders. First,
+ * the primary provider is visited, if it yields exception, secondary 
providers are
+ * visited one-by-one. Some methods resort to the primary provider to keep the 
behavior
+ * concise, for example: authorization policy checking, readiness probe, and 
null
+ * partition key-value fetching is directed to the primary provider.
+ */
+public class MultiMetaProvider implements MetaProvider {
+
+  private final MetaProvider primaryProvider_;
+  private final List<MetaProvider> secondaryProviders_;
+
+  public MultiMetaProvider(MetaProvider primaryProvider,
+      List<MetaProvider> secondaryProviders) {
+    primaryProvider_ = primaryProvider;
+    secondaryProviders_ = secondaryProviders;
+  }
+
+  public MetaProvider getPrimaryProvider() {
+    return primaryProvider_;
+  }
+
+  @Override
+  public String getURI() {
+    StringJoiner joiner = new StringJoiner(", ");
+    joiner.add(primaryProvider_.getURI());
+    for (MetaProvider provider : secondaryProviders_) {
+      joiner.add(provider.getURI());
+    }
+    return joiner.toString();
+  }
+
+  @Override
+  public AuthorizationPolicy getAuthPolicy() {
+    return primaryProvider_.getAuthPolicy();
+  }
+
+  @Override
+  public boolean isReady() {
+    return primaryProvider_.isReady();
+  }
+
+  @Override
+  public void waitForIsReady(long timeoutMs) {
+    primaryProvider_.waitForIsReady(timeoutMs);
+  }
+
+  @Override
+  public ImmutableList<String> loadDbList() throws TException {
+    return collectFromAllProviders(
+        unchecked(MetaProvider::loadDbList)).stream().flatMap(
+        
Collection::stream).distinct().collect(ImmutableList.toImmutableList());
+  }
+
+  @Override
+  public Database loadDb(String dbName) throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadDb(dbName)));
+  }
+
+  @Override
+  public ImmutableCollection<TBriefTableMeta> loadTableList(String dbName)
+      throws TException {
+    ImmutableList<TBriefTableMeta> combinedTableList = collectFromAllProviders(
+        unchecked(provider -> 
provider.loadTableList(dbName))).stream().flatMap(
+        Collection::stream).collect(ImmutableList.toImmutableList());
+    Optional<Entry<String, Integer>> firstDuplicate = 
combinedTableList.stream()
+        .map(tableMeta -> tableMeta.name)
+        .collect(Collectors.toMap(s -> s, s -> 1, 
Integer::sum)).entrySet().stream()
+        .filter(stringIntegerEntry -> stringIntegerEntry.getValue() > 
1).findFirst();
+    if (firstDuplicate.isPresent()) {
+      throw new TException("Ambiguous table name: " + 
firstDuplicate.get().getKey());
+    }
+    return combinedTableList;
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
+      throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadTable(dbName, tableName)));
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String 
tableName) {
+    try {
+      return tryAllProviders(
+          unchecked(provider -> provider.getTableIfPresent(dbName, 
tableName)));
+    } catch (TException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public String loadNullPartitionKeyValue() throws TException {
+    return primaryProvider_.loadNullPartitionKeyValue();
+  }
+
+  @Override
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
+      throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadPartitionList(table)));
+  }
+
+  @Override
+  public SqlConstraints loadConstraints(TableMetaRef table, Table msTbl)
+      throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadConstraints(table, msTbl)));
+  }
+
+  @Override
+  public List<String> loadFunctionNames(String dbName) throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadFunctionNames(dbName)));
+  }
+
+  @Override
+  public ImmutableList<Function> loadFunction(String dbName, String 
functionName)
+      throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadFunction(dbName, functionName)));
+  }
+
+  @Override
+  public ImmutableList<DataSource> loadDataSources() throws TException {
+    return tryAllProviders(unchecked(MetaProvider::loadDataSources));
+  }
+
+  @Override
+  public DataSource loadDataSource(String dsName) throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadDataSource(dsName)));
+  }
+
+  @Override
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef 
table,
+      List<String> partitionColumnNames, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs)
+      throws TException, CatalogException {
+    return tryAllProviders(unchecked(
+        provider -> provider.loadPartitionsByRefs(table, partitionColumnNames,
+            hostIndex, partitionRefs)));
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef 
table,
+      List<String> colNames) throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadTableColumnStatistics(table, 
colNames)));
+  }
+
+  @Override
+  public TPartialTableInfo loadIcebergTable(TableMetaRef table) throws 
TException {
+    return tryAllProviders(
+        unchecked( provider -> provider.loadIcebergTable(table)));
+  }
+
+  @Override
+  public org.apache.iceberg.Table loadIcebergApiTable(TableMetaRef table,
+      TableParams param, Table msTable) throws TException {
+    return tryAllProviders(
+        unchecked(provider -> provider.loadIcebergApiTable(table, param, 
msTable)));
+  }
+
+  @Override
+  public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
+    try {
+      return tryAllProviders(unchecked(
+          provider -> provider.getValidWriteIdList(ref)));
+    } catch (TException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public Iterable<HdfsCachePool> getHdfsCachePools() {
+    try {
+      return tryAllProviders(unchecked(MetaProvider::getHdfsCachePools));
+    } catch (TException e) {
+      return null;
+    }
+  }
+
+  private ImmutableList<MetaProvider> getAllProviders() {
+    return ImmutableList.<MetaProvider>builder()
+        .add(primaryProvider_)
+        .addAll(secondaryProviders_).build();
+  }
+
+  private <R> R tryAllProviders(java.util.function.Function<MetaProvider, R> 
function)
+      throws TException {
+    Map<String, Exception> exceptions = new HashMap<>();
+
+    ImmutableList<MetaProvider> providers = getAllProviders();
+
+    for (MetaProvider provider : providers) {
+      try {
+        return function.apply(provider);
+      } catch (MetaProviderException e) {
+        exceptions.put(provider.getURI(), e);
+      }
+    }
+
+    handleExceptions(exceptions);
+    return null;
+  }
+
+  private <R> Collection<R> collectFromAllProviders(
+      java.util.function.Function<MetaProvider, R> function)
+      throws TException {
+    Map<String, Exception> exceptions = new HashMap<>();
+
+    ImmutableList<MetaProvider> providers = getAllProviders();
+
+    Collection<R> results = new ArrayList<>();
+
+    for (MetaProvider provider : providers) {
+      try {
+        results.add(function.apply(provider));
+      } catch (MetaProviderException e) {
+        exceptions.put(provider.getURI(), e);
+      }
+    }
+
+    if (!results.isEmpty()) {
+      return results;
+    }
+
+    handleExceptions(exceptions);
+    return Collections.emptyList();
+  }
+
+  private void handleExceptions(Map<String, Exception> exceptions) throws 
TException {
+    if (!exceptions.isEmpty()) {
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter printWriter = new PrintWriter(stringWriter);
+      for (Entry<String, Exception> e : exceptions.entrySet()) {
+        printWriter.print(String.format("%s: ", e.getKey()));
+        e.getValue().printStackTrace(printWriter);
+      }
+      String message = String.format(
+          "Every MetaProvider failed with the following exceptions: %s",
+          stringWriter);
+      throw new TException(message);
+    }
+  }
+
+
+  private static <T, R> java.util.function.Function<T, R> unchecked(
+      ThrowingFunction<T, R> tf) {
+    return t -> {
+      try {
+        return tf.apply(t);
+      } catch (Exception e) {
+        throw new MetaProviderException(e);
+      }
+    };
+  }
+
+  /**
+   * Exception class to make exception wrapping/unwrapping clear in
+   * 'collectFromAllProviders' and 'tryAllProviders'.
+   */
+  public static class MetaProviderException extends RuntimeException {
+
+    MetaProviderException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  @FunctionalInterface
+  public interface ThrowingFunction<T, R> {
+
+    R apply(T t) throws Exception;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java 
b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
deleted file mode 100644
index e5ece72ce..000000000
--- a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
+++ /dev/null
@@ -1,269 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.impala.service;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.List;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.Path;
-import org.apache.impala.authorization.AuthorizationChecker;
-import org.apache.impala.catalog.CatalogException;
-import org.apache.impala.catalog.FeCatalog;
-import org.apache.impala.catalog.ImpaladCatalog;
-import org.apache.impala.catalog.local.CatalogdMetaProvider;
-import org.apache.impala.catalog.local.IcebergMetaProvider;
-import org.apache.impala.catalog.local.LocalCatalog;
-import org.apache.impala.thrift.TBackendGflags;
-import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
-import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
-import org.apache.thrift.TException;
-
-/**
- * Manages the Catalog implementation used by the frontend.
- *
- * This class abstracts away the different lifecycles used by the LocalCatalog
- * and the ImpaladCatalog. The former creates a new instance for each request 
or
- * query, whereas the latter only creates a new instance upon receiving a full 
update
- * from the catalogd via the statestore.
- */
-public abstract class FeCatalogManager {
-
-  protected AtomicReference<? extends AuthorizationChecker> authzChecker_;
-
-  /**
-   * @return the appropriate implementation based on the current backend
-   * configuration.
-   */
-  public static FeCatalogManager createFromBackendConfig() {
-    TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
-    if (cfg.use_local_catalog) {
-      if (!cfg.catalogd_deployed) {
-        // Currently Iceberg REST Catalog is the only implementation.
-        return new IcebergRestCatalogImpl();
-      } else {
-        return new LocalImpl();
-      }
-    } else {
-      return new CatalogdImpl();
-    }
-  }
-
-  /**
-   * Create a manager which always returns the same instance and does not 
permit
-   * updates from the statestore.
-   */
-  public static FeCatalogManager createForTests(FeCatalog testCatalog) {
-    return new TestImpl(testCatalog);
-  }
-
-  public void setAuthzChecker(
-      AtomicReference<? extends AuthorizationChecker> authzChecker) {
-    authzChecker_ = Preconditions.checkNotNull(authzChecker);
-  }
-
-  /**
-   * @return a Catalog instance to be used for a request or query. Depending
-   * on the catalog implementation this may either be a reused instance or a
-   * fresh one for each query.
-   */
-  public abstract FeCatalog getOrCreateCatalog();
-
-  /**
-   * Update the Catalog based on an update from the state store.
-   *
-   * This can be called either in response to a DDL statement (in which case 
the update
-   * may include just the changed objects related to that DDL) or due to data 
being
-   * published by the state store.
-   *
-   * In the case of the DDL-triggered update, the return value is ignored. In 
the case
-   * of the statestore update, the return value is passed back to the C++ code 
to
-   * indicate the last applied catalog update and used to implement SYNC_DDL.
-   */
-  abstract TUpdateCatalogCacheResponse updateCatalogCache(
-      TUpdateCatalogCacheRequest req) throws CatalogException, TException;
-
-  /**
-   * Implementation which creates ImpaladCatalog instances and expects to 
receive
-   * updates via the statestore. New instances are created only when full 
updates
-   * are received.
-   */
-  private static class CatalogdImpl extends FeCatalogManager {
-    private final AtomicReference<ImpaladCatalog> catalog_ =
-        new AtomicReference<>();
-
-    private CatalogdImpl() {
-      catalog_.set(createNewCatalog());
-    }
-
-    @Override
-    public FeCatalog getOrCreateCatalog() {
-      return catalog_.get();
-    }
-
-    @Override
-    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest 
req)
-        throws CatalogException, TException {
-      ImpaladCatalog catalog = catalog_.get();
-      if (req.is_delta) return catalog.updateCatalog(req);
-
-      // If this is not a delta, this update should replace the current
-      // Catalog contents so create a new catalog and populate it.
-      ImpaladCatalog oldCatalog = catalog;
-      catalog = createNewCatalog();
-
-      TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
-
-      // Now that the catalog has been updated, replace the reference to
-      // catalog_. This ensures that clients don't see the catalog
-      // disappear. The catalog is guaranteed to be ready since 
updateCatalog() has a
-      // postcondition of isReady() == true.
-      catalog_.set(catalog);
-      if (oldCatalog != null) oldCatalog.release();
-
-      return response;
-    }
-
-    private ImpaladCatalog createNewCatalog() {
-      return new ImpaladCatalog(authzChecker_);
-    }
-  }
-
-  /**
-   * Implementation which creates LocalCatalog instances. A new instance is
-   * created for each request or query.
-   */
-  private static class LocalImpl extends FeCatalogManager {
-    private static CatalogdMetaProvider PROVIDER = new CatalogdMetaProvider(
-        BackendConfig.INSTANCE.getBackendCfg());
-
-    @Override
-    public FeCatalog getOrCreateCatalog() {
-      PROVIDER.setAuthzChecker(authzChecker_);
-      return new LocalCatalog(PROVIDER);
-    }
-
-    @Override
-    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest 
req) {
-      return PROVIDER.updateCatalogCache(req);
-    }
-  }
-
-  /**
-   * Implementation which creates LocalCatalog instances and uses an Iceberg 
REST
-   * Catalog.
-   * TODO(boroknagyz): merge with LocalImpl
-   */
-  private static class IcebergRestCatalogImpl extends FeCatalogManager {
-    private static IcebergMetaProvider PROVIDER;
-
-    @Override
-    public synchronized FeCatalog getOrCreateCatalog() {
-      if (PROVIDER == null) {
-        try {
-          PROVIDER = initProvider();
-        } catch (IOException e) {
-          throw new IllegalStateException("Create IcebergMetaProvider failed", 
e);
-        }
-      }
-      return new LocalCatalog(PROVIDER);
-    }
-
-    IcebergMetaProvider initProvider() throws IOException {
-      TBackendGflags flags = BackendConfig.INSTANCE.getBackendCfg();
-      String catalogConfigDir = flags.catalog_config_dir;
-      Preconditions.checkState(catalogConfigDir != null &&
-          !catalogConfigDir.isEmpty());
-      List<String> files = listFiles(catalogConfigDir);
-      Preconditions.checkState(files.size() == 1,
-          String.format("Expected number of files in directory %s is one, 
found %d files",
-              catalogConfigDir, files.size()));
-      String configFile = catalogConfigDir + Path.SEPARATOR + files.get(0);
-      Properties props = readPropertiesFile(configFile);
-      // In the future we can expect different catalog types, but currently we 
only
-      // support Iceberg REST Catalogs.
-      checkPropertyValue(configFile, props, "connector.name", "iceberg");
-      checkPropertyValue(configFile, props, "iceberg.catalog.type", "rest");
-      return new IcebergMetaProvider(props);
-    }
-
-    private List<String> listFiles(String dirPath) {
-      File dir = new File(dirPath);
-      Preconditions.checkState(dir.exists() && dir.isDirectory());
-      return Stream.of(dir.listFiles())
-          .filter(file -> !file.isDirectory())
-          .map(File::getName)
-          .collect(Collectors.toList());
-    }
-
-    private Properties readPropertiesFile(String file) throws IOException {
-      Properties props = new Properties();
-      props.load(new FileInputStream(file));
-      return props;
-    }
-
-    private void checkPropertyValue(String configFile, Properties props, 
String key,
-        String expectedValue) {
-      if (!props.containsKey(key)) {
-        throw new IllegalStateException(String.format(
-            "Expected property %s was not specified in config file %s.", key,
-            configFile));
-      }
-      String actualValue = props.getProperty(key);
-      if (!Objects.equals(actualValue, expectedValue)) {
-        throw new IllegalStateException(String.format(
-            "Expected value of '%s' is '%s', but found '%s' in config file %s",
-            key, expectedValue, actualValue, configFile));
-      }
-    }
-
-    @Override
-    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest 
req) {
-      return null;
-    }
-  }
-
-  /**
-   * Implementation which returns a provided catalog instance, used by tests.
-   * No updates from the statestore are permitted.
-   */
-  private static class TestImpl extends FeCatalogManager {
-    private final FeCatalog catalog_;
-
-    TestImpl(FeCatalog catalog) {
-      catalog_ = catalog;
-    }
-
-    @Override
-    public FeCatalog getOrCreateCatalog() {
-      return catalog_;
-    }
-
-    @Override
-    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest 
req) {
-      throw new IllegalStateException(
-          "Unexpected call to updateCatalogCache() with a test catalog 
instance");
-    }
-  }
-}
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index d37e357d7..5f16a8819 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -142,7 +142,6 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.StructType;
-import org.apache.impala.catalog.SystemTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
@@ -170,7 +169,7 @@ import org.apache.impala.planner.HdfsScanNode;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.Planner;
 import org.apache.impala.planner.ScanNode;
-import org.apache.impala.service.Frontend;
+import org.apache.impala.service.catalogmanager.FeCatalogManager;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TAlterDbParams;
 import org.apache.impala.thrift.TBackendGflags;
diff --git 
a/fe/src/main/java/org/apache/impala/service/catalogmanager/CatalogdImpl.java 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/CatalogdImpl.java
new file mode 100644
index 000000000..448495609
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/CatalogdImpl.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.catalog.ImpaladCatalog;
+import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
+import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.thrift.TException;
+
+/**
+ * Implementation which creates ImpaladCatalog instances and expects to 
receive updates
+ * via the statestore. New instances are created only when full updates are 
received.
+ */
+class CatalogdImpl extends FeCatalogManager {
+
+  private final AtomicReference<ImpaladCatalog> catalog_ =
+      new AtomicReference<>();
+
+  CatalogdImpl() {
+    catalog_.set(createNewCatalog());
+  }
+
+  @Override
+  public FeCatalog getOrCreateCatalog() {
+    return catalog_.get();
+  }
+
+  @Override
+  public TUpdateCatalogCacheResponse 
updateCatalogCache(TUpdateCatalogCacheRequest req)
+      throws CatalogException, TException {
+    ImpaladCatalog catalog = catalog_.get();
+    if (req.is_delta) return catalog.updateCatalog(req);
+
+    // If this is not a delta, this update should replace the current
+    // Catalog contents so create a new catalog and populate it.
+    ImpaladCatalog oldCatalog = catalog;
+    catalog = createNewCatalog();
+
+    TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);
+
+    // Now that the catalog has been updated, replace the reference to
+    // catalog_. This ensures that clients don't see the catalog
+    // disappear. The catalog is guaranteed to be ready since updateCatalog() 
has a
+    // postcondition of isReady() == true.
+    catalog_.set(catalog);
+    if (oldCatalog != null) oldCatalog.release();
+
+    return response;
+  }
+
+  private ImpaladCatalog createNewCatalog() {
+    return new ImpaladCatalog(authzChecker_);
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/service/catalogmanager/ConfigLoader.java 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/ConfigLoader.java
new file mode 100644
index 000000000..0a7bf1283
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/ConfigLoader.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.impala.common.ImpalaRuntimeException;
+
+public class ConfigLoader {
+
+  private final File configFolder_;
+
+  public ConfigLoader(File configFolder) {
+    this.configFolder_ = configFolder;
+  }
+
+  public List<Properties> loadConfigs() throws ImpalaRuntimeException {
+    List<File> files = listFiles(configFolder_);
+    List<Properties> propertiesList = new ArrayList<>();
+    for (File configFile : files) {
+      String fileName = configFile.getName();
+      try {
+        Properties props = readPropertiesFile(configFile);
+        checkPropertyValue(fileName, props, "connector.name", "iceberg");
+        checkPropertyValue(fileName, props, "iceberg.catalog.type", "rest");
+        propertiesList.add(props);
+      } catch (IOException e) {
+        throw new ImpalaRuntimeException(
+            String.format("Unable to read file %s from configuration 
directory: %s",
+                fileName, configFolder_.getAbsolutePath()), e);
+      }
+    }
+    return propertiesList;
+  }
+
+  List<File> listFiles(File configDirectory) {
+    Preconditions.checkState(configDirectory.exists() && 
configDirectory.isDirectory());
+    return Stream.of(configDirectory.listFiles())
+        .filter(file -> !file.isDirectory())
+        .collect(Collectors.toList());
+  }
+
+  Properties readPropertiesFile(File file) throws IOException {
+    Properties props = new Properties();
+    try (InputStream in = Files.newInputStream(file.toPath())) {
+      props.load(in);
+    }
+    return props;
+  }
+
+  private void checkPropertyValue(String configFile, Properties props, String 
key,
+      String expectedValue) {
+    if (!props.containsKey(key)) {
+      throw new IllegalStateException(String.format(
+          "Expected property %s was not specified in config file %s.", key,
+          configFile));
+    }
+    String actualValue = props.getProperty(key);
+    if (!Objects.equals(actualValue, expectedValue)) {
+      throw new IllegalStateException(String.format(
+          "Expected value of '%s' is '%s', but found '%s' in config file %s",
+          key, expectedValue, actualValue, configFile));
+    }
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/service/catalogmanager/FeCatalogManager.java
 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/FeCatalogManager.java
new file mode 100644
index 000000000..481ae45d0
--- /dev/null
+++ 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/FeCatalogManager.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.impala.authorization.AuthorizationChecker;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
+import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.thrift.TException;
+
+/**
+ * Manages the Catalog implementation used by the frontend.
+ *
+ * This class abstracts away the different lifecycles used by the LocalCatalog
+ * and the ImpaladCatalog. The former creates a new instance for each request 
or
+ * query, whereas the latter only creates a new instance upon receiving a full 
update
+ * from the catalogd via the statestore.
+ */
+public abstract class FeCatalogManager {
+
+  protected AtomicReference<? extends AuthorizationChecker> authzChecker_;
+
+  /**
+   * @return the appropriate implementation based on the current backend
+   * configuration.
+   */
+  public static FeCatalogManager createFromBackendConfig() throws 
ImpalaRuntimeException {
+    TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+    if (cfg.use_local_catalog) {
+      return new LocalImpl();
+    } else {
+      return new CatalogdImpl();
+    }
+  }
+
+  /**
+   * Create a manager which always returns the same instance and does not 
permit
+   * updates from the statestore.
+   */
+  public static FeCatalogManager createForTests(FeCatalog testCatalog) {
+    return new TestImpl(testCatalog);
+  }
+
+  public void setAuthzChecker(
+      AtomicReference<? extends AuthorizationChecker> authzChecker) {
+    authzChecker_ = Preconditions.checkNotNull(authzChecker);
+  }
+
+  /**
+   * @return a Catalog instance to be used for a request or query. Depending
+   * on the catalog implementation this may either be a reused instance or a
+   * fresh one for each query.
+   */
+  public abstract FeCatalog getOrCreateCatalog();
+
+  /**
+   * Update the Catalog based on an update from the state store.
+   *
+   * This can be called either in response to a DDL statement (in which case 
the update
+   * may include just the changed objects related to that DDL) or due to data 
being
+   * published by the state store.
+   *
+   * In the case of the DDL-triggered update, the return value is ignored. In 
the case
+   * of the statestore update, the return value is passed back to the C++ code 
to
+   * indicate the last applied catalog update and used to implement SYNC_DDL.
+   */
+  public abstract TUpdateCatalogCacheResponse updateCatalogCache(
+      TUpdateCatalogCacheRequest req) throws CatalogException, TException;
+}
diff --git 
a/fe/src/main/java/org/apache/impala/service/catalogmanager/LocalImpl.java 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/LocalImpl.java
new file mode 100644
index 000000000..de53c13dd
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/catalogmanager/LocalImpl.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.catalog.local.CatalogdMetaProvider;
+import org.apache.impala.catalog.local.IcebergMetaProvider;
+import org.apache.impala.catalog.local.LocalCatalog;
+import org.apache.impala.catalog.local.MetaProvider;
+import org.apache.impala.catalog.local.MultiMetaProvider;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
+import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation which creates LocalCatalog instances. A new instance is 
created for each
+ * request or query.
+ */
+class LocalImpl extends FeCatalogManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocalImpl.class);
+  private final MetaProvider provider_;
+
+  public LocalImpl() throws ImpalaRuntimeException {
+    provider_ = getMetaProvider();
+  }
+
+  private MetaProvider getMetaProvider() throws ImpalaRuntimeException {
+    TBackendGflags backendCfg = BackendConfig.INSTANCE.getBackendCfg();
+    String catalogConfigDir = backendCfg.catalog_config_dir;
+    List<MetaProvider> providers = new ArrayList<>();
+    if (backendCfg.catalogd_deployed) {
+      providers.add(new CatalogdMetaProvider(backendCfg));
+    }
+    if (catalogConfigDir != null && !catalogConfigDir.isEmpty()) {
+      File configDir = new File(catalogConfigDir);
+      try {
+        LOG.info("Loading catalog config from {}", configDir);
+        List<MetaProvider> secondaryProviders = 
getSecondaryProviders(configDir);
+        providers.addAll(secondaryProviders);
+      } catch (ImpalaRuntimeException e) {
+        LOG.warn("Unable to load secondary providers from catalog config 
file", e);
+      }
+    }
+    if (providers.isEmpty()) {
+      throw new ImpalaRuntimeException("No metadata providers available");
+    }
+    if (providers.size() == 1) {
+      return providers.get(0);
+    }
+    return new MultiMetaProvider(providers.get(0),
+        providers.subList(1, providers.size()));
+  }
+
+  @Override
+  public FeCatalog getOrCreateCatalog() {
+    if (provider_ instanceof CatalogdMetaProvider) {
+      ((CatalogdMetaProvider) provider_).setAuthzChecker(authzChecker_);
+    }
+    return new LocalCatalog(provider_);
+  }
+
+  @Override
+  public TUpdateCatalogCacheResponse 
updateCatalogCache(TUpdateCatalogCacheRequest req) {
+    if (provider_ instanceof CatalogdMetaProvider) {
+      return ((CatalogdMetaProvider) provider_).updateCatalogCache(req);
+    }
+    if (provider_ instanceof MultiMetaProvider) {
+      MetaProvider primaryProvider = ((MultiMetaProvider) 
provider_).getPrimaryProvider();
+      if (primaryProvider instanceof CatalogdMetaProvider) {
+        return ((CatalogdMetaProvider) 
primaryProvider).updateCatalogCache(req);
+      }
+    }
+    return null;
+  }
+
+  private static List<MetaProvider> getSecondaryProviders(File 
catalogConfigDir)
+      throws ImpalaRuntimeException {
+    ConfigLoader loader = new ConfigLoader(catalogConfigDir);
+    List<MetaProvider> list = new ArrayList<>();
+    for (Properties properties : loader.loadConfigs()) {
+      try {
+        IcebergMetaProvider icebergMetaProvider = new 
IcebergMetaProvider(properties);
+        list.add(icebergMetaProvider);
+      } catch (RESTException e) {
+        LOG.error(String.format(
+            "Unable to instantiate IcebergMetaProvider from the following "
+                + "properties: %s", properties), e);
+      }
+    }
+    return list;
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/service/catalogmanager/TestImpl.java 
b/fe/src/main/java/org/apache/impala/service/catalogmanager/TestImpl.java
new file mode 100644
index 000000000..745d1b47b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/catalogmanager/TestImpl.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
+import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+
+/**
+ * Implementation which returns a provided catalog instance, used by tests. No 
updates
+ * from the statestore are permitted.
+ */
+class TestImpl extends FeCatalogManager {
+
+  private final FeCatalog catalog_;
+
+  TestImpl(FeCatalog catalog) {
+    catalog_ = catalog;
+  }
+
+  @Override
+  public FeCatalog getOrCreateCatalog() {
+    return catalog_;
+  }
+
+  @Override
+  public TUpdateCatalogCacheResponse 
updateCatalogCache(TUpdateCatalogCacheRequest req) {
+    throw new IllegalStateException(
+        "Unexpected call to updateCatalogCache() with a test catalog 
instance");
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java 
b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index d544b2f65..6a80c7068 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -59,7 +59,7 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.service.CompilerFactory;
 import org.apache.impala.service.CompilerFactoryImpl;
-import org.apache.impala.service.FeCatalogManager;
+import org.apache.impala.service.catalogmanager.FeCatalogManager;
 import org.apache.impala.service.Frontend;
 import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.testutil.ImpaladTestCatalog;
diff --git 
a/fe/src/test/java/org/apache/impala/service/catalogmanager/ConfigLoaderTest.java
 
b/fe/src/test/java/org/apache/impala/service/catalogmanager/ConfigLoaderTest.java
new file mode 100644
index 000000000..a1ef153b8
--- /dev/null
+++ 
b/fe/src/test/java/org/apache/impala/service/catalogmanager/ConfigLoaderTest.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service.catalogmanager;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class ConfigLoaderTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private File tempDir;
+
+  @Before
+  public void setUp() throws Exception {
+    tempDir = tempFolder.newFolder("config");
+  }
+
+  private File createConfigFile(String fileName, String content) throws 
IOException {
+    File configFile = new File(tempDir, fileName);
+    try (FileWriter writer = new FileWriter(configFile)) {
+      writer.write(content);
+    }
+    return configFile;
+  }
+
+  @Test
+  public void testLoadValidConfigs() throws Exception {
+    createConfigFile("valid1.properties",
+        "connector.name=iceberg\niceberg.catalog.type=rest\n");
+    createConfigFile("valid2.properties",
+        "connector.name=iceberg\niceberg.catalog.type=rest\n");
+
+    ConfigLoader loader = new ConfigLoader(tempDir);
+    List<Properties> configs = loader.loadConfigs();
+
+    assertEquals(2, configs.size());
+    assertEquals("iceberg", configs.get(0).getProperty("connector.name"));
+    assertEquals("rest", configs.get(0).getProperty("iceberg.catalog.type"));
+  }
+
+  @Test
+  public void testMissingConnectorNameThrows() throws IOException {
+    createConfigFile("bad.properties", "iceberg.catalog.type=rest\n");
+
+    ConfigLoader loader = new ConfigLoader(tempDir);
+
+    try {
+      loader.loadConfigs();
+      fail("Expected IllegalStateException");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("connector.name"));
+    } catch (ImpalaRuntimeException e) {
+      fail("Expected IllegalStateException");
+    }
+  }
+
+  @Test
+  public void testIncorrectCatalogTypeThrows() throws IOException {
+    createConfigFile("bad.properties",
+        "connector.name=iceberg\niceberg.catalog.type=hive\n");
+
+    ConfigLoader loader = new ConfigLoader(tempDir);
+
+    try {
+      loader.loadConfigs();
+      fail("Expected IllegalStateException");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Expected value of 
'iceberg.catalog.type'"));
+    } catch (ImpalaRuntimeException e) {
+      fail("Expected IllegalStateException");
+    }
+  }
+
+  @Test
+  public void testUnreadableFileThrows() throws IOException {
+    File unreadableFile = createConfigFile("unreadable.properties",
+        "connector.name=iceberg\niceberg.catalog.type=rest\n");
+
+    // Make the file unreadable (on Unix-like systems)
+    assertTrue(unreadableFile.setReadable(false));
+
+    ConfigLoader loader = new ConfigLoader(tempDir);
+
+    try {
+      loader.loadConfigs();
+      fail("Expected ImpalaRuntimeException");
+    } catch (ImpalaRuntimeException e) {
+      assertTrue(e.getMessage().contains("Unable to read file"));
+    }
+  }
+
+  @Test
+  public void testEmptyDirectoryReturnsEmptyList() throws Exception {
+    ConfigLoader loader = new ConfigLoader(tempDir);
+    List<Properties> configs = loader.loadConfigs();
+    assertTrue(configs.isEmpty());
+  }
+}
\ No newline at end of file
diff --git a/java/iceberg-rest-catalog-test/pom.xml 
b/java/iceberg-rest-catalog-test/pom.xml
index 66e6c44cb..b3d70b9db 100644
--- a/java/iceberg-rest-catalog-test/pom.xml
+++ b/java/iceberg-rest-catalog-test/pom.xml
@@ -85,6 +85,12 @@ under the License.
         <version>${iceberg.version}</version>
         <classifier>tests</classifier>
       </dependency>
+
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <version>1.2</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
 
b/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
index 5ff996f1a..3073adeb8 100644
--- 
a/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
+++ 
b/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
@@ -23,14 +23,19 @@
 // switch to an open-source Iceberg REST Catalog.
 package org.apache.iceberg.rest;
 
-import java.io.IOException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Map;
 import java.util.function.Consumer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -40,31 +45,46 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class IcebergRestCatalogTest {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergRestCatalogTest.class);
   private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
 
-  static final int REST_PORT = 9084;
+  private static final int DEFAULT_REST_PORT = 9084;
+  private static final String DEFAULT_HADOOP_CATALOG_LOCATION =
+      "/test-warehouse/iceberg_test/hadoop_catalog";
+  private static final String USAGE_PREFIX = "java -jar 
your-iceberg-rest-catalog.jar";
+
+  private static final String CATALOG_LOCATION_LONGOPT = "catalog-location";
+  private static final String PORT_LONGOPT = "port";
+  private static final String HELP_LONGOPT = "help";
 
   private Server httpServer;
+  private final int port;
+  private final String warehouseLocation;
 
-  public IcebergRestCatalogTest() {}
+  public IcebergRestCatalogTest(int port, String warehouseLocation) {
+    this.port = port;
+    this.warehouseLocation = warehouseLocation;
+  }
 
-  private static String getWarehouseLocation() {
-    String FILESYSTEM_PREFIX = System.getenv("FILESYSTEM_PREFIX");
-    String HADOOP_CATALOG_LOCATION = 
"/test-warehouse/iceberg_test/hadoop_catalog";
-    if (FILESYSTEM_PREFIX != null && !FILESYSTEM_PREFIX.isEmpty()) {
-      return FILESYSTEM_PREFIX + HADOOP_CATALOG_LOCATION;
+  private String getWarehouseLocation() {
+    String filesystemPrefix = System.getenv("FILESYSTEM_PREFIX");
+    if (filesystemPrefix != null && !filesystemPrefix.isEmpty()) {
+      return filesystemPrefix + warehouseLocation;
     }
-    String DEFAULT_FS = System.getenv("DEFAULT_FS");
-    return DEFAULT_FS + HADOOP_CATALOG_LOCATION;
+    String defaultFs = System.getenv("DEFAULT_FS");
+    return defaultFs + warehouseLocation;
   }
 
-  private Catalog initializeBackendCatalog() throws IOException {
+  private Catalog initializeBackendCatalog() {
     Configuration conf = new Configuration();
     conf.set("io-impl", "org.apache.iceberg.hadoop.HadoopFileIO");
-    LOG.info("Default filesystem configured for this Iceberg REST Catalog is " 
+
-        conf.get("fs.defaultFS"));
-    return new HadoopCatalog(conf, getWarehouseLocation());
+    String actualWarehouseLocation = getWarehouseLocation();
+    LOG.info("Initializing Hadoop Catalog at: {}", actualWarehouseLocation);
+    String defaultFs = conf.get("fs.defaultFS");
+    LOG.info("Default filesystem configured for this Iceberg REST Catalog is 
{}",
+        defaultFs);
+    return new HadoopCatalog(conf, actualWarehouseLocation);
   }
 
   public void start(boolean join) throws Exception {
@@ -83,8 +103,7 @@ public class IcebergRestCatalogTest {
         T response =
             super.execute(
                 method, path, queryParams, request, responseType, headers, 
errorHandler);
-        T responseAfterSerialization = roundTripSerialize(response, 
"response");
-        return responseAfterSerialization;
+        return roundTripSerialize(response, "response");
       }
     };
 
@@ -95,9 +114,10 @@ public class IcebergRestCatalogTest {
     context.addServlet(servletHolder, "/*");
     context.insertHandler(new GzipHandler());
 
-    this.httpServer = new Server(REST_PORT);
+    this.httpServer = new Server(port);
     httpServer.setHandler(context);
     httpServer.start();
+    LOG.info("Iceberg REST Catalog started on port: {}", port);
 
     if (join) {
       httpServer.join();
@@ -107,33 +127,84 @@ public class IcebergRestCatalogTest {
   public void stop() throws Exception {
     if (httpServer != null) {
       httpServer.stop();
+      LOG.info("Iceberg REST Catalog stopped.");
     }
   }
 
   public static void main(String[] args) throws Exception {
-    new IcebergRestCatalogTest().start(true);
+    Options options = new Options();
+
+    options.addOption(new Option(null, PORT_LONGOPT, true,
+        "Port for the REST catalog server (default: " + DEFAULT_REST_PORT + 
")"));
+
+    options.addOption(new Option(null, CATALOG_LOCATION_LONGOPT, true,
+        "Base location for the Hadoop catalog (default: "
+            + DEFAULT_HADOOP_CATALOG_LOCATION + ")"));
+
+    options.addOption(new Option(null, HELP_LONGOPT, false, "Display this help 
message"));
+
+    CommandLineParser parser = new BasicParser();
+    HelpFormatter formatter = new HelpFormatter();
+    CommandLine cmd;
+
+    int port = DEFAULT_REST_PORT;
+    String catalogLocation = DEFAULT_HADOOP_CATALOG_LOCATION;
+
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOG.error("Error: {}", e.getMessage());
+      formatter.printHelp(USAGE_PREFIX, options);
+      System.exit(1);
+      return;
+    }
+
+    if (cmd.hasOption(HELP_LONGOPT)) {
+      formatter.printHelp(USAGE_PREFIX, options);
+      System.exit(0);
+    }
+
+    if (cmd.hasOption(PORT_LONGOPT)) {
+      try {
+        port = Integer.parseInt(cmd.getOptionValue(PORT_LONGOPT));
+      } catch (NumberFormatException e) {
+        LOG.error("Error: --port requires a valid integer value. Got: {}",
+            cmd.getOptionValue(PORT_LONGOPT));
+        formatter.printHelp(USAGE_PREFIX, options);
+        System.exit(1);
+      }
+    }
+
+    if (cmd.hasOption(CATALOG_LOCATION_LONGOPT)) {
+      catalogLocation = cmd.getOptionValue(CATALOG_LOCATION_LONGOPT);
+    }
+
+    new IcebergRestCatalogTest(port, catalogLocation).start(true);
   }
 
+
   public static <T> T roundTripSerialize(T payload, String description) {
-   if (payload != null) {
-     LOG.trace(payload.toString());
-     try {
-       if (payload instanceof RESTMessage) {
-         return (T) MAPPER.readValue(
-             MAPPER.writeValueAsString(payload), payload.getClass());
-       } else {
-         // use Map so that Jackson doesn't try to instantiate ImmutableMap
-         // from payload.getClass()
-         return (T) MAPPER.readValue(
-             MAPPER.writeValueAsString(payload), Map.class);
-       }
-     } catch (Exception e) {
-       LOG.warn(e.toString());
-       throw new RuntimeException(
-          String.format("Failed to serialize and deserialize %s: %s",
-              description, payload), e);
-     }
-   }
-   return null;
- }
+    if (payload != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(payload.toString());
+      }
+      try {
+        if (payload instanceof RESTMessage) {
+          return (T) MAPPER.readValue(
+              MAPPER.writeValueAsString(payload), payload.getClass());
+        } else {
+          // use Map so that Jackson doesn't try to instantiate ImmutableMap
+          // from payload.getClass()
+          return (T) MAPPER.readValue(
+              MAPPER.writeValueAsString(payload), Map.class);
+        }
+      } catch (Exception e) {
+        LOG.warn(e.toString());
+        throw new RuntimeException(
+            String.format("Failed to serialize and deserialize %s: %s",
+                description, payload), e);
+      }
+    }
+    return null;
+  }
 }
diff --git a/testdata/bin/run-iceberg-rest-server.sh 
b/testdata/bin/run-iceberg-rest-server.sh
index 49347df96..08bde260f 100755
--- a/testdata/bin/run-iceberg-rest-server.sh
+++ b/testdata/bin/run-iceberg-rest-server.sh
@@ -31,5 +31,5 @@ fi
 CLASSPATH=$(cat $CP_FILE):"$CLASSPATH"
 
 java -cp 
java/iceberg-rest-catalog-test/target/impala-iceberg-rest-catalog-test-${IMPALA_VERSION}.jar:$CLASSPATH
 \
-    org.apache.iceberg.rest.IcebergRestCatalogTest
+    org.apache.iceberg.rest.IcebergRestCatalogTest $@
 
diff --git a/testdata/bin/run-iceberg-rest-server.sh 
b/testdata/configs/catalog_configs/multicatalog_rest_config/rest-1.properties
old mode 100755
new mode 100644
similarity index 60%
copy from testdata/bin/run-iceberg-rest-server.sh
copy to 
testdata/configs/catalog_configs/multicatalog_rest_config/rest-1.properties
index 49347df96..3723f2fe6
--- a/testdata/bin/run-iceberg-rest-server.sh
+++ 
b/testdata/configs/catalog_configs/multicatalog_rest_config/rest-1.properties
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,19 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-cd $IMPALA_HOME
-. bin/impala-config.sh
-
-CP_FILE="$IMPALA_HOME/java/iceberg-rest-catalog-test/target/build-classpath.txt"
-
-if [ ! -s "$CP_FILE" ]; then
-  >&2 echo Iceberg REST Catalog classpath file $CP_FILE missing.
-  >&2 echo Build java/iceberg-rest-catalog-test first.
-  return 1
-fi
-
-CLASSPATH=$(cat $CP_FILE):"$CLASSPATH"
-
-java -cp 
java/iceberg-rest-catalog-test/target/impala-iceberg-rest-catalog-test-${IMPALA_VERSION}.jar:$CLASSPATH
 \
-    org.apache.iceberg.rest.IcebergRestCatalogTest
+connector.name=iceberg
+iceberg.catalog.type=rest
+iceberg.rest-catalog.uri=http://localhost:9084
 
diff --git a/testdata/bin/run-iceberg-rest-server.sh 
b/testdata/configs/catalog_configs/multicatalog_rest_config/rest-2.properties
old mode 100755
new mode 100644
similarity index 60%
copy from testdata/bin/run-iceberg-rest-server.sh
copy to 
testdata/configs/catalog_configs/multicatalog_rest_config/rest-2.properties
index 49347df96..75d37e200
--- a/testdata/bin/run-iceberg-rest-server.sh
+++ 
b/testdata/configs/catalog_configs/multicatalog_rest_config/rest-2.properties
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,19 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-cd $IMPALA_HOME
-. bin/impala-config.sh
-
-CP_FILE="$IMPALA_HOME/java/iceberg-rest-catalog-test/target/build-classpath.txt"
-
-if [ ! -s "$CP_FILE" ]; then
-  >&2 echo Iceberg REST Catalog classpath file $CP_FILE missing.
-  >&2 echo Build java/iceberg-rest-catalog-test first.
-  return 1
-fi
-
-CLASSPATH=$(cat $CP_FILE):"$CLASSPATH"
-
-java -cp 
java/iceberg-rest-catalog-test/target/impala-iceberg-rest-catalog-test-${IMPALA_VERSION}.jar:$CLASSPATH
 \
-    org.apache.iceberg.rest.IcebergRestCatalogTest
+connector.name=iceberg
+iceberg.catalog.type=rest
+iceberg.rest-catalog.uri=http://localhost:9085
 
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 2fb4af44e..b372fcce0 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3390,6 +3390,20 @@ hadoop fs -put -f 
${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/a
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+airports_parquet_alternative
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+STORED AS ICEBERG
+TBLPROPERTIES('write.format.default'='parquet', 
'iceberg.catalog'='hadoop.catalog',
+              
'iceberg.catalog_location'='/test-warehouse/iceberg_test/secondary_hadoop_catalog',
+              'iceberg.table_identifier'='berg.airports_parquet');
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p 
/test-warehouse/iceberg_test/secondary_hadoop_catalog/berg && \
+hadoop fs -put -f 
${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/airports_parquet 
/test-warehouse/iceberg_test/secondary_hadoop_catalog/berg
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 iceberg_resolution_test_external
 ---- CREATE
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index ba384ebf1..ec1d997e0 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -66,6 +66,7 @@ table_name:hudi_as_parquet, constraint:restrict_to, 
table_format:parquet/none/no
 # Iceberg tests are executed in the PARQUET file format dimension
 table_name:airports_orc, constraint:restrict_to, table_format:parquet/none/none
 table_name:airports_parquet, constraint:restrict_to, 
table_format:parquet/none/none
+table_name:airports_parquet_alternative, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:complextypestbl_iceberg_orc, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:hadoop_catalog_test_external, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:iceberg_int_partitioned, constraint:restrict_to, 
table_format:parquet/none/none
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multicatalog.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multicatalog.test
new file mode 100644
index 000000000..9012a8957
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multicatalog.test
@@ -0,0 +1,45 @@
+====
+---- QUERY
+SHOW DATABASES like "ice";
+---- RESULTS: VERIFY_IS_SUBSET
+'ice',''
+---- TYPES
+STRING, STRING
+====
+---- QUERY
+SHOW DATABASES like "functional_parquet";
+---- RESULTS: VERIFY_IS_SUBSET
+'functional_parquet',''
+---- TYPES
+STRING, STRING
+====
+---- QUERY
+USE ice;
+====
+---- QUERY
+SELECT lat FROM airports_parquet WHERE iata = '00R';
+---- RESULTS
+30.68586111
+---- TYPES
+DOUBLE
+====
+---- QUERY
+SELECT * FROM functional_parquet.alltypes WHERE id = 3650
+---- RESULTS
+3650,true,0,0,0,0,0,0,'01/01/10','0',2010-01-01 00:00:00,2010,1
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP, INT, INT
+====
+---- QUERY
+SELECT p_float, string_col from ice.iceberg_alltypes_part ice_alltypes INNER 
JOIN functional_parquet.alltypes alltypes ON alltypes.id = ice_alltypes.i
+---- RESULTS
+1.100000023841858,'2'
+1.100000023841858,'1'
+---- TYPES
+FLOAT, STRING
+====
+---- QUERY
+INSERT INTO ice.iceberg_alltypes_part(i) VALUES (567)
+---- CATCH
+AnalysisException: Write not supported. Table ice.iceberg_alltypes_part  
access type is: READONLY
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs-ambiguous-name.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs-ambiguous-name.test
new file mode 100644
index 000000000..8b51515af
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs-ambiguous-name.test
@@ -0,0 +1,6 @@
+====
+---- QUERY
+select * from ice.iceberg_v2_positional_not_all_data_files_have_delete_files;
+---- CATCH
+TException: Ambiguous table name
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs.test
new file mode 100644
index 000000000..bdc8da05a
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-rest-catalogs.test
@@ -0,0 +1,40 @@
+====
+---- QUERY
+SHOW DATABASES LIKE "ice";
+---- RESULTS: VERIFY_IS_SUBSET
+'ice',''
+---- TYPES
+STRING, STRING
+====
+---- QUERY
+SHOW DATABASES LIKE "berg";
+---- RESULTS: VERIFY_IS_SUBSET
+'berg',''
+---- TYPES
+STRING, STRING
+====
+---- QUERY
+USE ice;
+====
+---- QUERY
+SELECT lat FROM berg.airports_parquet WHERE iata = '00R';
+---- RESULTS
+30.68586111
+---- TYPES
+DOUBLE
+====
+---- QUERY
+SELECT ice_air.iata, ice_air.lat, berg_air.lon
+FROM ice.airports_parquet ice_air
+INNER JOIN berg.airports_parquet berg_air
+ON berg_air.iata = ice_air.iata AND berg_air.iata = "00M"
+---- RESULTS
+'00M',31.95376472,-89.2345047
+---- TYPES
+STRING, DOUBLE, DOUBLE
+====
+---- QUERY
+INSERT INTO berg.airports_parquet(lat) VALUES (32.1)
+---- CATCH
+AnalysisException: Write not supported. Table berg.airports_parquet  access 
type is: READONLY
+====
\ No newline at end of file
diff --git a/tests/common/iceberg_rest_server.py 
b/tests/common/iceberg_rest_server.py
index b82ab0d54..84bbad290 100644
--- a/tests/common/iceberg_rest_server.py
+++ b/tests/common/iceberg_rest_server.py
@@ -24,8 +24,8 @@ import socket
 import sys
 import time
 
-REST_SERVER_PORT = 9084
 IMPALA_HOME = os.environ['IMPALA_HOME']
+IMPALA_CLUSTER_LOGS = os.environ['IMPALA_CLUSTER_LOGS_DIR']
 LOG = logging.getLogger('impala_test_suite')
 
 
@@ -34,36 +34,62 @@ class IcebergRestServer(object):
   Utility class for starting and stopping our minimal Iceberg REST server.
   """
 
-  def start_rest_server(self, timeout_s):
-    self.process = subprocess.Popen('testdata/bin/run-iceberg-rest-server.sh',
-        stdout=sys.stdout, stderr=sys.stderr, shell=True,
+  DEFAULT_REST_SERVER_PORT = 9084
+  DEFAULT_CATALOG_LOCATION = "/test-warehouse/iceberg_test/hadoop_catalog"
+  LOG_PATTERN = "%s/iceberg-rest-server.%d.%s"
+
+  def __init__(self, port=DEFAULT_REST_SERVER_PORT,
+              catalog_location=DEFAULT_CATALOG_LOCATION):
+    self.port = port
+    self.catalog_location = catalog_location
+    self.process = None
+    self.stdout = None
+    self.stderr = None
+
+  def start_rest_server(self, timeout_s=60):
+    start_time = time.strftime("%Y%m%d-%H%M%S")
+    log_pattern = self.LOG_PATTERN % (IMPALA_CLUSTER_LOGS, self.port, 
start_time)
+    self.stdout = open(log_pattern + '.stdout', 'w')
+    self.stderr = open(log_pattern + '.stderr', 'w')
+    self.process = subprocess.Popen(
+        ['testdata/bin/run-iceberg-rest-server.sh', "--port",
+          str(self.port), "--catalog-location", self.catalog_location],
+        stdout=self.stdout, stderr=self.stderr,
         preexec_fn=os.setsid, cwd=IMPALA_HOME)
     self._wait_for_rest_server_to_start(timeout_s)
 
-  def stop_rest_server(self, timeout_s):
-    if self.process:
-      os.killpg(self.process.pid, signal.SIGTERM)
-      self._wait_for_rest_server_to_be_killed(timeout_s)
+  def stop_rest_server(self, timeout_s=60):
+    try:
+      if self.process:
+        os.killpg(self.process.pid, signal.SIGTERM)
+        self._wait_for_rest_server_to_be_killed(timeout_s)
+    except Exception as e:
+      LOG.error("An error occurred while stopping the Iceberg REST server: 
%s", e)
+    finally:
+      if self.stdout:
+        self.stdout.close()
+      if self.stderr:
+        self.stderr.close()
 
   def _wait_for_rest_server_to_start(self, timeout_s):
     sleep_interval_s = 0.5
     start_time = time.time()
     while time.time() - start_time < timeout_s:
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      if s.connect_ex(('localhost', REST_SERVER_PORT)) == 0:
+      if s.connect_ex(('localhost', self.port)) == 0:
         LOG.info("Iceberg REST server is available.")
         return
       s.close()
       time.sleep(sleep_interval_s)
-    raise Exception(
-        "Webserver did not become available within {} 
seconds.".format(timeout_s))
+    raise Exception("Webserver did not become available within {} "
+        "seconds.".format(timeout_s))
 
   def _wait_for_rest_server_to_be_killed(self, timeout_s):
     sleep_interval_s = 0.5
     start_time = time.time()
     while time.time() - start_time < timeout_s:
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      if s.connect_ex(('localhost', REST_SERVER_PORT)) != 0:
+      if s.connect_ex(('localhost', self.port)) != 0:
         LOG.info("Iceberg REST server has stopped.")
         return
       s.close()
diff --git a/tests/custom_cluster/test_iceberg_rest_catalog.py 
b/tests/custom_cluster/test_iceberg_rest_catalog.py
index 26fa484aa..59d672aaa 100644
--- a/tests/custom_cluster/test_iceberg_rest_catalog.py
+++ b/tests/custom_cluster/test_iceberg_rest_catalog.py
@@ -22,16 +22,102 @@ import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite, 
HIVE_CONF_DIR
 from tests.common.iceberg_rest_server import IcebergRestServer
 
-REST_SERVER_PORT = 9084
+
 IMPALA_HOME = os.environ['IMPALA_HOME']
-START_ARGS = 'start_args'
-IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
+NO_CATALOGD_STARTARGS = '--no_catalogd'
+REST_STANDALONE_IMPALAD_ARGS = """--use_local_catalog=true 
--catalogd_deployed=false
+    
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
+        .format(IMPALA_HOME)
+MULTICATALOG_IMPALAD_ARGS = """--use_local_catalog=true
     
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
         .format(IMPALA_HOME)
+MULTIPLE_REST_IMPALAD_ARGS = """--use_local_catalog=true
+    
--catalog_config_dir={}/testdata/configs/catalog_configs/multicatalog_rest_config"""\
+        .format(IMPALA_HOME)
+MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS = """--use_local_catalog=true
+    --catalogd_deployed=false \
+    
--catalog_config_dir={}/testdata/configs/catalog_configs/multicatalog_rest_config"""\
+        .format(IMPALA_HOME)
+MULTICATALOG_CATALOGD_ARGS = "--catalog_topic_mode=minimal"
+
+
+def RestServerProperties(*server_configs):
+  """
+  Annotation to specify configurations for multiple REST servers to be started.
+  Each argument should be a dictionary with 'port' and 'catalog_location' keys.
+  Example:
+  @RestServerProperties({'port': 9085}, {'port': 9086, 'catalog_location': 
'/tmp/cat2'})
+  """
+  def decorator(func):
+    func.rest_server_configs = list(server_configs)
+    return func
+  return decorator
+
+
+class IcebergRestCatalogTests(CustomClusterTestSuite):
+  """Base class for Iceberg REST Catalog tests."""
+  def setup_method(self, method):
+    args = method.__dict__
+    if HIVE_CONF_DIR in args:
+      raise Exception("Cannot specify HIVE_CONF_DIR because the tests of this 
class are "
+          "running without Hive.")
+    self.servers = []
+
+    server_configs = getattr(method, 'rest_server_configs', None)
+
+    if server_configs:
+      for config in server_configs:
+        port = config.get('port', IcebergRestServer.DEFAULT_REST_SERVER_PORT)
+        catalog_location = config.get('catalog_location',
+            IcebergRestServer.DEFAULT_CATALOG_LOCATION)
+        print("Starting REST server with annotation properties: "
+              "Port=%s, Catalog Location=%s" % (port, catalog_location))
+        self.servers.append(IcebergRestServer(port, catalog_location))
+
+    try:
+      for server in self.servers:
+        server.start_rest_server(300)
+      super(IcebergRestCatalogTests, self).setup_method(method)
+      # At this point we can create the Impala clients that we will need.
+      self.create_impala_clients()
+    except Exception as e:
+      for server in self.servers:
+        server.stop_rest_server(10)
+      raise e
 
+  def teardown_method(self, method):
+    for server in self.servers:
+      server.stop_rest_server()
+    super(IcebergRestCatalogTests, self).teardown_method(method)
 
-class TestIcebergRestCatalog(CustomClusterTestSuite):
-  """Test suite for Iceberg REST Catalog."""
+
+class TestIcebergRestCatalogWithHms(IcebergRestCatalogTests):
+  """Test suite for Iceberg REST Catalog. HMS running while tests are 
running"""
+  @RestServerProperties({'port': 9084})
+  @CustomClusterTestSuite.with_args(
+     impalad_args=MULTICATALOG_IMPALAD_ARGS,
+     catalogd_args=MULTICATALOG_CATALOGD_ARGS)
+  @pytest.mark.execute_serially
+  def test_rest_catalog_multicatalog(self, vector):
+    self.run_test_case('QueryTest/iceberg-multicatalog',
+                       vector, use_db="ice")
+
+  @RestServerProperties(
+    {'port': 9084},
+    {'port': 9085,
+     'catalog_location': 
'/test-warehouse/iceberg_test/secondary_hadoop_catalog'}
+  )
+  @CustomClusterTestSuite.with_args(
+     impalad_args=MULTIPLE_REST_IMPALAD_ARGS,
+     catalogd_args=MULTICATALOG_CATALOGD_ARGS)
+  @pytest.mark.execute_serially
+  def test_multiple_rest_catalogs(self, vector):
+    self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs',
+                       vector, use_db="ice")
+
+
+class TestIcebergRestCatalogNoHms(IcebergRestCatalogTests):
+  """Test suite for Iceberg REST Catalog. HMS is stopped while tests are 
running"""
 
   @classmethod
   def need_default_clients(cls):
@@ -40,13 +126,7 @@ class TestIcebergRestCatalog(CustomClusterTestSuite):
 
   @classmethod
   def setup_class(cls):
-    super(TestIcebergRestCatalog, cls).setup_class()
-    try:
-      cls.iceberg_rest_server = IcebergRestServer()
-      cls.iceberg_rest_server.start_rest_server(300)
-    except Exception as e:
-      cls.iceberg_rest_server.stop_rest_server(10)
-      raise e
+    super(TestIcebergRestCatalogNoHms, cls).setup_class()
 
     try:
       cls._stop_hive_service()
@@ -57,29 +137,41 @@ class TestIcebergRestCatalog(CustomClusterTestSuite):
   @classmethod
   def teardown_class(cls):
     cls.cleanup_infra_services()
-    return super(TestIcebergRestCatalog, cls).teardown_class()
+    return super(TestIcebergRestCatalogNoHms, cls).teardown_class()
 
   @classmethod
   def cleanup_infra_services(cls):
-    cls.iceberg_rest_server.stop_rest_server(10)
     cls._start_hive_service(None)
 
-  def setup_method(self, method):
-    args = method.__dict__
-    if HIVE_CONF_DIR in args:
-      raise Exception("Cannot specify HIVE_CONF_DIR because the tests of this 
class are "
-          "running without Hive.")
-    # Invoke start-impala-cluster.py with '--no_catalogd'
-    start_args = "--no_catalogd"
-    if START_ARGS in args:
-      start_args = args[START_ARGS] + " " + start_args
-    args[START_ARGS] = start_args
-
-    super(TestIcebergRestCatalog, self).setup_method(method)
-    # At this point we can create the Impala clients that we will need.
-    self.create_impala_clients()
-
-  @CustomClusterTestSuite.with_args(impalad_args=IMPALAD_ARGS)
+  @RestServerProperties({'port': 9084})
+  @CustomClusterTestSuite.with_args(
+     impalad_args=REST_STANDALONE_IMPALAD_ARGS,
+     start_args=NO_CATALOGD_STARTARGS)
   @pytest.mark.execute_serially
   def test_rest_catalog_basic(self, vector):
     self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")
+
+  @RestServerProperties(
+    {'port': 9084},
+    {'port': 9085,
+     'catalog_location': 
'/test-warehouse/iceberg_test/secondary_hadoop_catalog'}
+  )
+  @CustomClusterTestSuite.with_args(
+     impalad_args=MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS,
+     start_args=NO_CATALOGD_STARTARGS)
+  @pytest.mark.execute_serially
+  def test_multiple_rest_catalogs_without_catalogd(self, vector):
+    self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs',
+                       vector, use_db="ice")
+
+  @RestServerProperties(
+    {'port': 9084},
+    {'port': 9085}
+  )
+  @CustomClusterTestSuite.with_args(
+     impalad_args=MULTIPLE_REST_WITHOUT_CATALOGD_IMPALAD_ARGS,
+     start_args=NO_CATALOGD_STARTARGS)
+  @pytest.mark.execute_serially
+  def test_multiple_rest_catalogs_with_ambiguous_tables(self, vector):
+    
self.run_test_case('QueryTest/iceberg-multiple-rest-catalogs-ambiguous-name',
+                       vector, use_db="ice")

Reply via email to