This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bd3486c05101b68e759d30d75e0902c40f73b08e
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Dec 20 14:48:11 2024 +0100

    IMPALA-13586: Initial support for Iceberg REST Catalogs
    
    This patch adds initial support for Iceberg REST Catalogs. This means
    now it's possible to run an Impala cluster without the Hive Metastore,
    and without the Impala CatalogD. Impala Coordinators can directly
    connect to an Iceberg REST server and fetch metadata for databases and
    tables from there. The support is read-only, i.e. DDL and DML statements
    are not supported yet.
    
    This was initially developed in the context of a company Hackathon
    program, i.e. it was a team effort that I squashed into a single commit
    and polished the code a bit.
    
    The Hackathon team members were:
    * Daniel Becker
    * Gabor Kaszab
    * Kurt Deschler
    * Peter Rozsa
    * Zoltan Borok-Nagy
    
    The Iceberg REST Catalog support can be configured via a Java properties
    file, whose location can be specified via:
     --catalog_config_dir: Directory of configuration files
    
    Currently only one configuration file can be in the directory as we only
    support a single Catalog at a time. The following properties are mandatory
    in the config file:
    * connector.name=iceberg
    * iceberg.catalog.type=rest
    * iceberg.rest-catalog.uri
    
    The first two properties can only be 'iceberg' and 'rest' for now, they
    are needed for extensibility in the future.
    
    Moreover, Impala Daemons need to specify the following flags to connect
    to an Iceberg REST Catalog:
     --use_local_catalog=true
     --catalogd_deployed=false
    
    Testing
    * e2e tests added to test basic functionality against a custom-built
      Iceberg REST server that delegates to HadoopCatalog under the hood
    * Further testing, e.g. Ranger tests are expected in subsequent
      commits
    
    TODO:
    * manual testing against Polaris / Lakekeeper, we could add automated
      tests in a later patch
    
    Change-Id: I1722b898b568d2f5689002f2b9bef59320cb088c
    Reviewed-on: http://gerrit.cloudera.org:8080/22353
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/common/global-flags.cc                      |   3 +
 be/src/service/frontend.cc                         |   5 +
 be/src/service/frontend.h                          |   4 +
 be/src/service/impala-http-handler.cc              |  18 +
 be/src/service/impala-server.cc                    |  19 +-
 be/src/util/backend-gflag-util.cc                  |   4 +
 bin/start-impala-cluster.py                        |  10 +-
 common/thrift/BackendGflags.thrift                 |   4 +
 common/thrift/Frontend.thrift                      |   5 +
 .../org/apache/impala/catalog/FeIcebergTable.java  |   4 +-
 .../impala/catalog/IcebergContentFileStore.java    |   2 +-
 .../catalog/iceberg/GroupedContentFiles.java       |   2 +-
 .../impala/catalog/iceberg/IcebergRESTCatalog.java | 161 ++++++
 .../impala/catalog/local/CatalogdMetaProvider.java |   4 +
 .../impala/catalog/local/DirectMetaProvider.java   |   4 +
 .../impala/catalog/local/IcebergMetaProvider.java  | 600 +++++++++++++++++++++
 .../apache/impala/catalog/local/LocalCatalog.java  |   4 +
 .../apache/impala/catalog/local/MetaProvider.java  |   1 +
 .../apache/impala/service/FeCatalogManager.java    |  95 +++-
 .../java/org/apache/impala/service/Frontend.java   |  32 +-
 .../org/apache/impala/service/JniFrontend.java     |  20 +-
 .../java/org/apache/impala/util/IcebergUtil.java   |  10 +-
 java/iceberg-rest-catalog-test/pom.xml             | 107 ++++
 .../iceberg/rest/IcebergRestCatalogTest.java       | 136 +++++
 java/pom.xml                                       |   1 +
 testdata/bin/minicluster_trino/Dockerfile          |   2 +-
 .../{Dockerfile => iceberg_rest.properties}        |  21 +-
 .../Dockerfile => run-iceberg-rest-server.sh}      |  23 +-
 .../iceberg_rest_config/rest.properties}           |  18 +-
 .../queries/QueryTest/iceberg-rest-catalog.test    |  90 ++++
 tests/common/custom_cluster_test_suite.py          |   2 +
 tests/common/impala_cluster.py                     |   8 +-
 tests/custom_cluster/test_iceberg_rest_catalog.py  |  90 ++++
 www/catalog.tmpl                                   |  18 +
 34 files changed, 1454 insertions(+), 73 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 0f10aa687..07a4697d2 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -430,6 +430,9 @@ DEFINE_bool(iceberg_always_allow_merge_on_read_operations, 
false, "Impala can on
     "executing DELETE, UPDATE and MERGE operations on Iceberg tables even if 
the table "
     "property is 'copy-on-write'.");
 
+DEFINE_string(catalog_config_dir, "", "Directory of configuration files of 
external "
+    "catalogs, e.g. Iceberg REST Catalog.");
+
 // Host and port of Statestore Service
 DEFINE_string(state_store_host, "localhost",
     "hostname where StatestoreService is running");
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 22f3cdb77..50c701270 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -121,6 +121,7 @@ Frontend::Frontend() {
     {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
     {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
     {"updateExecutorMembership", "([B)V", &update_membership_id_},
+    {"getCatalogInfo", "()[B", &get_catalog_info_},
     {"getCatalogMetrics", "()[B", &get_catalog_metrics_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
     {"getMetadataTableNames", "([B)[B", &get_metadata_table_names_id_},
@@ -258,6 +259,10 @@ Status Frontend::GetMetadataTableNames(const string& db, 
const string& table_nam
       metadata_table_names);
 }
 
+Status Frontend::GetCatalogInfo(TGetCatalogInfoResult* catalog_info) {
+  return JniUtil::CallJniMethod(fe_, get_catalog_info_, catalog_info);
+}
+
 Status Frontend::GetDbs(const string* pattern, const TSessionState* session,
     TGetDbsResult* dbs) {
   TGetDbsParams params;
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index e2f0b551f..e5780305d 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -89,6 +89,9 @@ class Frontend {
       const string* pattern, const TSessionState* session,
       TGetTablesResult* metadata_table_names);
 
+  /// Return list of catalog info strings
+  Status GetCatalogInfo(TGetCatalogInfoResult* catalog_info);
+
   /// Return all databases matching the optional argument 'pattern'.
   /// If pattern is NULL, match all databases otherwise match only those 
databases that
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
@@ -264,6 +267,7 @@ class Frontend {
   jmethodID check_config_id_; // JniFrontend.checkConfiguration()
   jmethodID update_catalog_cache_id_; // 
JniFrontend.updateCatalogCache(byte[][])
   jmethodID update_membership_id_; // JniFrontend.updateExecutorMembership()
+  jmethodID get_catalog_info_; // JniFrontend.getCatalogInfo()
   jmethodID get_catalog_metrics_id_; // JniFrontend.getCatalogMetrics()
   jmethodID get_table_names_id_; // JniFrontend.getTableNames
   jmethodID get_metadata_table_names_id_; // JniFrontend.getMetadataTableNames
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index cacfa0e89..71fe8e4da 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -993,6 +993,24 @@ void ImpalaHttpHandler::CatalogHandler(const 
Webserver::WebRequest& req,
     return;
   }
 
+  TGetCatalogInfoResult catalog_info;
+  status = server_->exec_env_->frontend()->GetCatalogInfo(&catalog_info);
+
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+
+  Value info(kArrayType);
+  for (const string& str: catalog_info.info) {
+    Value str_val(str.c_str(), document->GetAllocator());
+    Value value(kObjectType);
+    value.AddMember("value", str_val, document->GetAllocator());
+    info.PushBack(value, document->GetAllocator());
+  }
+  document->AddMember("info", info, document->GetAllocator());
+
   Value databases(kArrayType);
   for (const TDatabase& db: get_dbs_result.dbs) {
     Value database(kObjectType);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ad0f7463f..ffee7dfc3 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -291,6 +291,10 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala 
daemon can accept and co
     "queries from clients. If false, it will refuse client connections.");
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query 
"
     "fragments.");
+
+DEFINE_bool(catalogd_deployed, true, "If false, Impala daemon doesn't expect 
Catalog "
+    "Daemon to be present.");
+
 DEFINE_string(executor_groups, "",
     "List of executor groups, separated by comma. Each executor group 
specification can "
     "optionally contain a minimum size, separated by a ':', e.g. 
--executor_groups "
@@ -576,7 +580,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env_->metrics()));
 
   // Register the catalog update callback if running in a real cluster as a 
coordinator.
-  if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && 
FLAGS_is_coordinator) {
+  if ((!TestInfo::is_test() || TestInfo::is_be_cluster_test()) && 
FLAGS_is_coordinator &&
+      FLAGS_catalogd_deployed) {
     auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
         vector<TTopicDelta>* topic_updates) {
       CatalogUpdateCallback(state, topic_updates);
@@ -1695,6 +1700,7 @@ void ImpalaServer::CloseClientRequestState(const 
QueryHandle& query_handle) {
 }
 
 Status ImpalaServer::UpdateCatalogMetrics() {
+  if (!FLAGS_catalogd_deployed) return Status::OK();
   TGetCatalogMetricsResult metrics;
   RETURN_IF_ERROR(exec_env_->frontend()->GetCatalogMetrics(&metrics));
   ImpaladMetrics::CATALOG_NUM_DBS->SetValue(metrics.num_dbs);
@@ -2233,8 +2239,7 @@ bool ImpalaServer::IsAuthorizedProxyUser(const string& 
user) {
       != authorized_proxy_group_config_.end();
 }
 
-void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
-{
+void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics() {
   ImpaladMetrics::CATALOG_VERSION->SetValue(catalog_version);
   ImpaladMetrics::CATALOG_OBJECT_VERSION_LOWER_BOUND->SetValue(
       catalog_object_version_lower_bound);
@@ -2245,6 +2250,7 @@ void 
ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
 void ImpalaServer::CatalogUpdateCallback(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
+  DCHECK(FLAGS_catalogd_deployed);
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
   if (topic == incoming_topic_deltas.end()) return;
@@ -2304,6 +2310,7 @@ static inline void 
MarkTimelineEvent(RuntimeProfile::EventSequence* timeline,
 
 void ImpalaServer::WaitForCatalogUpdate(const int64_t catalog_update_version,
     const TUniqueId& catalog_service_id, RuntimeProfile::EventSequence* 
timeline) {
+  DCHECK(FLAGS_catalogd_deployed);
   unique_lock<mutex> unique_lock(catalog_version_lock_);
   // Wait for the update to be processed locally.
   VLOG_QUERY << "Waiting for catalog version: " << catalog_update_version
@@ -3138,6 +3145,12 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
   RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
 
   kudu::Version target_schema_version;
+
+  if (FLAGS_enable_workload_mgmt && !FLAGS_catalogd_deployed) {
+    // TODO(IMPALA-13830): Enable workload management in lightweight 
deployments.
+    return Status("Workload management needs CatalogD to be deployed.");
+  }
+
   if (FLAGS_is_coordinator) {
     if (FLAGS_enable_workload_mgmt) {
       
ABORT_IF_ERROR(workloadmgmt::ParseSchemaVersionFlag(&target_schema_version));
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index cff239fa7..8b8d631fd 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -114,6 +114,8 @@ DECLARE_bool(enable_json_scanner);
 DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
 DECLARE_bool(iceberg_always_allow_merge_on_read_operations);
 DECLARE_bool(enable_reading_puffin_stats);
+DECLARE_bool(catalogd_deployed);
+DECLARE_string(catalog_config_dir);
 DECLARE_int32(catalog_operation_log_size);
 DECLARE_string(hostname);
 DECLARE_bool(allow_catalog_cache_op_from_masked_users);
@@ -478,6 +480,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
       FLAGS_iceberg_always_allow_merge_on_read_operations);
   cfg.__set_enable_reading_puffin_stats(
       FLAGS_enable_reading_puffin_stats);
+  cfg.__set_catalogd_deployed(FLAGS_catalogd_deployed);
+  cfg.__set_catalog_config_dir(FLAGS_catalog_config_dir);
   cfg.__set_max_filter_error_rate_from_full_scan(
       FLAGS_max_filter_error_rate_from_full_scan);
   cfg.__set_catalog_operation_log_size(FLAGS_catalog_operation_log_size);
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 292604b3f..414c40910 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -169,6 +169,9 @@ parser.add_option("--enable_catalogd_ha", 
dest="enable_catalogd_ha",
                   action="store_true", default=False,
                   help="If true, enables CatalogD HA - the cluster will be 
launched "
                   "with two catalogd instances as Active-Passive HA pair.")
+parser.add_option("--no_catalogd", dest="no_catalogd",
+                  action="store_true", default=False,
+                  help="If true, there will be no CatalogD.")
 parser.add_option("--jni_frontend_class", dest="jni_frontend_class",
                   action="store", default="",
                   help="Use a custom java frontend interface.")
@@ -753,7 +756,8 @@ class MiniClusterOperations(object):
   """
   def get_cluster(self):
     """Return an ImpalaCluster instance."""
-    return 
ImpalaCluster(use_admission_service=options.enable_admission_service)
+    return 
ImpalaCluster(use_admission_service=options.enable_admission_service,
+        deploy_catalogd=not options.no_catalogd)
 
   def kill_all_daemons(self, force=False):
     kill_matching_processes(["catalogd", "impalad", "statestored", 
"admissiond"], force)
@@ -789,7 +793,9 @@ class MiniClusterOperations(object):
                            " for more details.")
 
   def start_catalogd(self):
-    if options.enable_catalogd_ha:
+    if options.no_catalogd:
+      num_catalogd = 0
+    elif options.enable_catalogd_ha:
       num_catalogd = 2
     else:
       num_catalogd = 1
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index c04e08754..dd2fc423d 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -318,4 +318,8 @@ struct TBackendGflags {
   143: required string injected_group_members_debug_only
 
   144: required i32 hms_event_sync_sleep_interval_ms
+
+  145: required bool catalogd_deployed
+
+  146: required string catalog_config_dir
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 15007ae2a..770017505 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -104,6 +104,11 @@ struct TGetTablesResult {
   1: list<string> tables
 }
 
+// getCatalogInfo returns a list of catalog info strings
+struct TGetCatalogInfoResult {
+  1: list<string> info
+}
+
 // Arguments to getTableMetrics, which returns the metrics of a specific table.
 struct TGetTableMetricsParams {
   1: required CatalogObjects.TTableName table_name
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index d66bc3e5a..7049e0e1e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -376,7 +376,9 @@ public interface FeIcebergTable extends FeFsTable {
     if (getTTableStats().getNum_rows() < 0) {
       getTTableStats().setNum_rows(Utils.calculateNumRows(this));
     }
-    getTTableStats().setTotal_file_bytes(Utils.calculateFileSizeInBytes(this));
+    if (getTTableStats().getTotal_file_bytes() <= 0) {
+      
getTTableStats().setTotal_file_bytes(Utils.calculateFileSizeInBytes(this));
+    }
   }
 
   static void setIcebergStorageDescriptor(
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java 
b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
index 10345f57a..f7b381d43 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergContentFileStore.java
@@ -363,4 +363,4 @@ public class IcebergContentFileStore {
         new HashSet<>(tFileStore.getMissing_files()) : Collections.emptySet();
     return ret;
   }
-}
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
index 2a3d6c544..67f03a6f6 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/GroupedContentFiles.java
@@ -81,4 +81,4 @@ public class GroupedContentFiles {
   public boolean isEmpty() {
     return Iterables.isEmpty(getAllContentFiles());
   }
-}
+}
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java
new file mode 100644
index 000000000..5bfa7a51c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergRESTCatalog.java
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.IcebergTableLoadingException;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.util.IcebergUtil;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Implementation of IcebergCatalog for tables accessed via an Iceberg REST Catalog.
+ */
+public class IcebergRESTCatalog implements IcebergCatalog {
+  private static final String KEY_URI = "iceberg.rest-catalog.uri";
+  private static final String KEY_NAME = "iceberg.rest-catalog.name";
+  private static final String KEY_CLIENT_ID = "iceberg.rest-catalog.client-id";
+  private static final String KEY_CLIENT_SECRET = 
"iceberg.rest-catalog.client-secret";
+  private static final String KEY_WAREHOUSE = "iceberg.rest-catalog.warehouse";
+
+  private final String REST_URI;
+
+  private static IcebergRESTCatalog instance_;
+  private final RESTCatalog restCatalog_;
+
+  public synchronized static IcebergRESTCatalog getInstance(
+      Properties properties) {
+    if (instance_ == null) {
+      instance_ = new IcebergRESTCatalog(properties);
+    }
+    return instance_;
+  }
+
+  private IcebergRESTCatalog(Properties properties) {
+    setContextClassLoader();
+
+    REST_URI = getRequiredProperty(properties, KEY_URI);
+    final String CATALOG_NAME = properties.getProperty(KEY_NAME, "");
+    final String CLIENT_ID = properties.getProperty(KEY_CLIENT_ID, "impala");
+    final String CLIENT_SECRET = properties.getProperty(KEY_CLIENT_SECRET, "");
+    final String CLIENT_CREDS = CLIENT_ID + ":" + CLIENT_SECRET;
+    final String WAREHOUSE_LOCATION = properties.getProperty(KEY_WAREHOUSE, 
"");
+
+    SessionCatalog.SessionContext context =
+        new SessionCatalog.SessionContext(
+            UUID.randomUUID().toString(),
+            "user",
+            ImmutableMap.of("credential", CLIENT_CREDS),
+            ImmutableMap.of());
+
+    restCatalog_ = new RESTCatalog(context,
+        (config) -> HTTPClient.builder(config).uri(REST_URI).build());
+    HiveConf conf = new HiveConf(IcebergRESTCatalog.class);
+    restCatalog_.setConf(conf);
+    restCatalog_.initialize(
+        CATALOG_NAME,
+        ImmutableMap.of(
+            CatalogProperties.URI, REST_URI,
+            "credential", CLIENT_CREDS,
+            CatalogProperties.WAREHOUSE_LOCATION, WAREHOUSE_LOCATION)
+    );
+  }
+
+  private String getRequiredProperty(Properties properties, String key) {
+    String value = properties.getProperty(key);
+    if (value == null) {
+      throw new IllegalStateException(
+          String.format("Missing property of IcebergRESTCatalog: %s", key));
+    }
+    return value;
+  }
+
+  public String getUri() {
+    return REST_URI;
+  }
+
+  @Override
+  public Table createTable(
+      TableIdentifier identifier,
+      Schema schema,
+      PartitionSpec spec,
+      String location,
+      Map<String, String> properties) {
+    throw new UnsupportedOperationException(
+        "CREATE TABLE is not implemented for REST catalog");
+  }
+
+  public ImmutableList<String> listNamespaces() {
+    ImmutableList.Builder<String> ret = ImmutableList.builder();
+    for (Namespace ns : restCatalog_.listNamespaces()) {
+      ret.add(ns.toString());
+    }
+    return ret.build();
+  }
+
+  public List<TableIdentifier> listTables(String namespace) {
+    return restCatalog_.listTables(Namespace.of(namespace));
+  }
+
+  @Override
+  public Table loadTable(FeIcebergTable feTable) throws TableLoadingException {
+    TableIdentifier tableId = IcebergUtil.getIcebergTableIdentifier(feTable);
+    return loadTable(tableId, null, null);
+  }
+
+  @Override
+  public Table loadTable(TableIdentifier tableId, String tableLocation,
+      Map<String, String> properties) throws IcebergTableLoadingException {
+    return restCatalog_.loadTable(tableId);
+  }
+
+  @Override
+  public boolean dropTable(FeIcebergTable feTable, boolean purge) {
+    throw new UnsupportedOperationException(
+        "DROP TABLE is not implemented for REST catalog");
+  }
+
+  @Override
+  public boolean dropTable(String dbName, String tblName, boolean purge) {
+    throw new UnsupportedOperationException(
+        "DROP TABLE is not implemented for REST catalog");
+  }
+
+  @Override
+  public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
+    throw new UnsupportedOperationException(
+        "RENAME TABLE is not implemented for REST catalog");
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 596c8517b..02cb9aaa7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -396,6 +396,10 @@ public class CatalogdMetaProvider implements MetaProvider {
         .build();
   }
 
+  public String getURI() {
+    return "Catalogd (URI TODO)";
+  }
+
   public CacheStats getCacheStats() {
     return cache_.stats();
   }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index e52b98203..626879f11 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -85,6 +85,10 @@ class DirectMetaProvider implements MetaProvider {
     initMsClientPool();
   }
 
+  public String getURI() {
+    return "HMS (URI TODO)";
+  }
+
   private static synchronized void initMsClientPool() {
     // Lazy-init the metastore client pool based on the backend configuration.
     // TODO(todd): this should probably be a process-wide singleton.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
new file mode 100644
index 000000000..cb8d6624b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/IcebergMetaProvider.java
@@ -0,0 +1,600 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.authorization.AuthorizationChecker;
+import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.DataSource;
+import org.apache.impala.catalog.FileDescriptor;
+import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.HdfsCachePool;
+import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
+import org.apache.impala.catalog.HdfsStorageDescriptor;
+import org.apache.impala.catalog.IcebergContentFileStore;
+import org.apache.impala.catalog.IcebergFileMetadataLoader;
+import org.apache.impala.catalog.IcebergTableLoadingException;
+import org.apache.impala.catalog.PuffinStatsLoader;
+import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.VirtualColumn;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.catalog.iceberg.IcebergRESTCatalog;
+import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TBriefTableMeta;
+import org.apache.impala.thrift.TIcebergContentFileStore;
+import org.apache.impala.thrift.TIcebergTable;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialTableInfo;
+import org.apache.impala.thrift.TValidWriteIdList;
+import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.IcebergSchemaConverter;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.errorprone.annotations.Immutable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.SnapshotSummary.TOTAL_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
+import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READ;
+
+public class IcebergMetaProvider implements MetaProvider {
+  private final static Logger LOG = 
LoggerFactory.getLogger(IcebergMetaProvider.class);
+
+  private AtomicReference<? extends AuthorizationChecker> authzChecker_;
+  private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
+
+  private IcebergRESTCatalog iceCatalog_;
+
+  Properties properties_;
+
+  /**
+   * Creates a provider from the given catalog configuration properties. The
+   * properties are stored and then used to obtain the IcebergRESTCatalog instance.
+   */
+  public IcebergMetaProvider(Properties properties) {
+    properties_ = properties;
+    // Must run after properties_ is assigned: initCatalog() reads that field.
+    iceCatalog_ = initCatalog();
+  }
+
+  /**
+   * Returns a human-readable description of the metadata source behind this
+   * provider, including the URI reported by the REST catalog.
+   */
+  @Override
+  public String getURI() {
+    return "Iceberg REST (" + iceCatalog_.getUri() + ")";
+  }
+
+  /**
+   * Obtains the IcebergRESTCatalog instance for the stored configuration
+   * properties.
+   */
+  private IcebergRESTCatalog initCatalog() {
+    return IcebergRESTCatalog.getInstance(properties_);
+  }
+
+  /**
+   * Stores the reference to the authorization checker. The reference is only
+   * recorded here; nothing in this method dereferences it.
+   */
+  public void setAuthzChecker(
+      AtomicReference<? extends AuthorizationChecker> authzChecker) {
+    authzChecker_ = authzChecker;
+  }
+
+  /**
+   * HDFS cache pools are not available through this provider; always throws
+   * UnsupportedOperationException.
+   */
+  @Override
+  public Iterable<HdfsCachePool> getHdfsCachePools() {
+    throw new UnsupportedOperationException(
+        "HDFSCachePools are not supported in IcebergMetaProvider");
+  }
+
+  /**
+   * Returns the AuthorizationPolicy instance owned by this provider. It is created
+   * empty together with the provider.
+   */
+  @Override
+  public AuthorizationPolicy getAuthPolicy() {
+    return authPolicy_;
+  }
+
+  /**
+   * Always reports the provider as ready.
+   */
+  @Override
+  public boolean isReady() {
+    // This provider is always ready since we don't need to wait for
+    // an update from any external process.
+    return true;
+  }
+
+  /**
+   * Returns immediately; the timeout is ignored because isReady() is always true.
+   */
+  @Override
+  public void waitForIsReady(long timeoutMs) {
+    // NOOP
+  }
+
+  /**
+   * Returns the database names, which are the namespaces listed by the REST
+   * catalog.
+   */
+  @Override
+  public ImmutableList<String> loadDbList() throws TException {
+    return iceCatalog_.listNamespaces();
+  }
+
+  /**
+   * Returns a Database object that carries only the given name. The REST catalog
+   * is not contacted here, so no other attribute is populated.
+   */
+  @Override
+  public Database loadDb(String dbName) throws TException {
+    Database db = new Database();
+    db.setName(dbName);
+    return db;
+  }
+
+  /**
+   * Lists the tables of the namespace 'dbName'. Each listed table is loaded through
+   * the REST catalog; tables that fail to load are logged and left out of the
+   * result instead of failing the whole listing.
+   */
+  @Override
+  public ImmutableCollection<TBriefTableMeta> loadTableList(String dbName)
+      throws TException {
+    ImmutableList.Builder<TBriefTableMeta> ret = ImmutableList.builder();
+    Namespace ns = Namespace.of(dbName);
+    for (TableIdentifier tid : iceCatalog_.listTables(ns.toString())) {
+      try {
+        org.apache.iceberg.Table tbl = iceCatalog_.loadTable(tid, null, null);
+        TBriefTableMeta briefMeta = new TBriefTableMeta(getIcebergTableName(tbl));
+        briefMeta.setMsType("TABLE");
+        ret.add(briefMeta);
+      } catch (NoSuchTableException | IcebergTableLoadingException e) {
+        // Ignore tables that cannot be loaded, but keep the table identifier and
+        // the stack trace in the log so the failure can be diagnosed.
+        LOG.error("Failed to load Iceberg table {}, skipping it.", tid, e);
+      }
+    }
+    return ret.build();
+  }
+
+  /**
+   * Returns the part of the table's name that follows the last '.'; the whole name
+   * is returned when it contains no '.'.
+   */
+  String getIcebergTableName(org.apache.iceberg.Table tbl) {
+    String qualifiedName = tbl.name();
+    int lastDot = qualifiedName.lastIndexOf('.');
+    return qualifiedName.substring(lastDot + 1);
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String 
tblName) {
+    try {
+      return loadTable(dbName, tblName);
+    } catch (TException e) {
+      LOG.error("Failed to load table", e);
+      return null;
+    }
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
+      throws TException {
+    try {
+      Table msTable = new Table();
+      msTable.setDbName(dbName);
+      Namespace ns = Namespace.of(dbName);
+      org.apache.iceberg.Table tbl = iceCatalog_.loadTable(
+          TableIdentifier.of(ns, tableName), null, null);
+      msTable.setTableName(getIcebergTableName(tbl));
+      msTable.setSd(createStorageDescriptor(tbl));
+      // Iceberg partitioning is not stored in HMS.
+      msTable.setPartitionKeys(Collections.emptyList());
+      msTable.setParameters(createTableProps(tbl));
+      msTable.setTableType(TableType.EXTERNAL_TABLE.toString());
+      // Only allow READONLY operations.
+      MetastoreShim.setTableAccessType(msTable, ACCESSTYPE_READ);
+      long loadingTime = System.currentTimeMillis();
+      TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, msTable, tbl,
+          loadingTime);
+      return Pair.create(msTable, ref);
+    } catch (ImpalaRuntimeException|IcebergTableLoadingException e) {
+      throw new IllegalStateException(
+          String.format("Error loading Iceberg table %s.%s", dbName, 
tableName), e);
+    }
+  }
+
+  /**
+   * Builds an HMS StorageDescriptor for the Iceberg table: Iceberg input/output
+   * formats and serialization library, columns converted from the Iceberg schema,
+   * no sort columns, and the fully qualified table location.
+   */
+  private StorageDescriptor createStorageDescriptor(org.apache.iceberg.Table tbl)
+      throws ImpalaRuntimeException {
+    SerDeInfo serDeInfo = new SerDeInfo();
+    serDeInfo.setSerializationLib(HdfsFileFormat.ICEBERG.serializationLib());
+    serDeInfo.setParameters(Collections.emptyMap());
+
+    StorageDescriptor descriptor = new StorageDescriptor();
+    descriptor.setInputFormat(HdfsFileFormat.ICEBERG.inputFormat());
+    descriptor.setOutputFormat(HdfsFileFormat.ICEBERG.outputFormat());
+    descriptor.setSortCols(Collections.emptyList());
+    descriptor.setSerdeInfo(serDeInfo);
+    descriptor.setCols(IcebergSchemaConverter.convertToHiveSchema(tbl.schema()));
+
+    Path qualifiedLocation =
+        FileSystemUtil.createFullyQualifiedPath(new Path(tbl.location()));
+    descriptor.setLocation(qualifiedLocation.toString());
+    return descriptor;
+  }
+
+  /**
+   * Builds the table parameters: a copy of the Iceberg table properties, plus the
+   * row count and total size taken from the current snapshot's summary when the
+   * properties do not already contain them. A statistic that is missing from the
+   * summary is left unset rather than stored as the string "null".
+   */
+  private Map<String, String> createTableProps(org.apache.iceberg.Table tbl) {
+    Map<String, String> props = new HashMap<>(tbl.properties());
+    Snapshot currentSnapshot = tbl.currentSnapshot();
+    if (currentSnapshot == null) return props;
+    Map<String, String> summary = currentSnapshot.summary();
+    if (summary == null) return props;
+    setStatFromSummary(props, StatsSetupConst.ROW_COUNT, summary, TOTAL_RECORDS_PROP);
+    setStatFromSummary(props, StatsSetupConst.TOTAL_SIZE, summary, TOTAL_FILE_SIZE_PROP);
+    return props;
+  }
+
+  /**
+   * Copies summary[summaryKey] into props[propKey] unless props already has a value
+   * for propKey or the summary has no value for summaryKey.
+   */
+  private static void setStatFromSummary(Map<String, String> props, String propKey,
+      Map<String, String> summary, String summaryKey) {
+    if (props.get(propKey) != null) return;
+    String value = summary.get(summaryKey);
+    if (value != null) props.put(propKey, value);
+  }
+
+  /**
+   * Returns the fixed string "__HIVE_DEFAULT_PARTITION__"; no remote configuration
+   * is consulted.
+   */
+  @Override
+  public String loadNullPartitionKeyValue() throws MetaException, TException {
+    return "__HIVE_DEFAULT_PARTITION__";
+  }
+
+  @Override
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
+      throws MetaException, TException {
+    TableMetaRefImpl ref = (TableMetaRefImpl)table;
+    Preconditions.checkState(!ref.isPartitioned());
+    return ImmutableList.of(new 
PartitionRefImpl(PartitionRefImpl.UNPARTITIONED_NAME));
+  }
+
+  /**
+   * No SQL constraints are loaded by this provider; always returns null.
+   */
+  @Override
+  public SqlConstraints loadConstraints(
+      TableMetaRef table, Table msTbl) throws TException {
+    return null;
+  }
+
+  /**
+   * Returns a map with one entry, keyed by the empty partition name, whose metadata
+   * is backed by the table's HMS object. The requested partition refs, column
+   * names and host index are not consulted.
+   */
+  @Override
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(
+      TableMetaRef table, List<String> partitionColumnNames,
+      ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs) throws CatalogException, TException {
+    Map<String, PartitionMetadata> partitions = new HashMap<>();
+    Table msTable = ((TableMetaRefImpl) table).msTable_;
+    partitions.put("", new PartitionMetadataImpl(msTable));
+    return partitions;
+  }
+
+  /**
+   * We model partitions slightly differently to Hive. So, in the case of an
+   * unpartitioned table, we have to create a fake Partition object which has the
+   * metadata of the table. The result maps the empty partition name to that
+   * fake partition.
+   */
+  private Map<String, PartitionMetadata> loadUnpartitionedPartition(
+      TableMetaRefImpl table, List<PartitionRef> partitionRefs,
+      ListMap<TNetworkAddress> hostIndex) throws CatalogException {
+    Map<String, PartitionMetadata> partitions = new HashMap<>();
+    PartitionMetadata fakePartition = new PartitionMetadataImpl(table.msTable_);
+    partitions.put("", fakePartition);
+    return partitions;
+  }
+
+  /**
+   * Functions are not available through this provider; always throws
+   * UnsupportedOperationException.
+   */
+  @Override
+  public List<String> loadFunctionNames(String dbName) throws TException {
+    throw new UnsupportedOperationException(
+        "Functions not supported by IcebergMetaProvider");
+  }
+
+
+  /**
+   * Functions are not available through this provider; always throws
+   * UnsupportedOperationException.
+   */
+  @Override
+  public ImmutableList<Function> loadFunction(String dbName, String 
functionName)
+      throws TException {
+    throw new UnsupportedOperationException(
+        "Functions not supported by IcebergMetaProvider");
+  }
+
+  /**
+   * Data sources are not available through this provider; always throws
+   * UnsupportedOperationException.
+   */
+  @Override
+  public ImmutableList<DataSource> loadDataSources() throws TException {
+    throw new UnsupportedOperationException(
+        "DataSource not supported by IcebergMetaProvider");
+  }
+
+  /**
+   * Data sources are not available through this provider; always throws
+   * UnsupportedOperationException.
+   */
+  @Override
+  public DataSource loadDataSource(String dsName) throws TException {
+    throw new UnsupportedOperationException(
+        "DataSource not supported by IcebergMetaProvider");
+  }
+
+  /**
+   * Returns column statistics for the requested columns based on the table's Puffin
+   * stats. Only the NDV is filled in. Columns that are not found in the Iceberg
+   * schema, or that have no Puffin stats, are left out of the result.
+   */
+  @Override
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl tblImpl = (TableMetaRefImpl) table;
+    org.apache.iceberg.Table iceTbl = tblImpl.iceApiTbl_;
+    Map<Integer, PuffinStatsLoader.PuffinStatsRecord> puffinStats =
+        PuffinStatsLoader.loadPuffinStats(iceTbl, tblImpl.fullName(),
+            -1, Collections.emptySet());
+
+    List<ColumnStatisticsObj> res = new ArrayList<>();
+    for (String colName : colNames) {
+      Types.NestedField field = iceTbl.schema().findField(colName);
+      // findField() returns null for a name that is not in the schema; there is
+      // nothing to report for such a column.
+      if (field == null) continue;
+      PuffinStatsLoader.PuffinStatsRecord stats = puffinStats.get(field.fieldId());
+      if (stats == null) continue;
+      ColumnStatisticsData ndvColStatsData =
+          createNdvColStatsData(field.type(), stats.ndv);
+      res.add(new ColumnStatisticsObj(colName,
+          "" /* should be the type */, ndvColStatsData));
+    }
+    return res;
+  }
+
+  /**
+   * Wraps 'ndv' into the ColumnStatisticsData variant that matches the Iceberg
+   * column type. Boolean and binary stats objects are returned without an NDV.
+   * For any type not handled below, a ColumnStatisticsData with no stats set is
+   * returned.
+   */
+  private ColumnStatisticsData createNdvColStatsData(
+      org.apache.iceberg.types.Type type, long ndv) {
+    if (type instanceof Types.BooleanType) {
+      // No NDV can be set for BooleanColumnStatsData.
+      BooleanColumnStatsData ndvData = new BooleanColumnStatsData();
+      return ColumnStatisticsData.booleanStats(ndvData);
+    } else if (type instanceof Types.IntegerType
+        || type instanceof Types.LongType
+        || type instanceof Types.TimestampType) {
+      // Timestamps share the long stats representation with integer types.
+      LongColumnStatsData ndvData = new LongColumnStatsData();
+      ndvData.setNumDVs(ndv);
+      return ColumnStatisticsData.longStats(ndvData);
+    } else if (type instanceof Types.DateType) {
+      DateColumnStatsData ndvData = new DateColumnStatsData();
+      ndvData.setNumDVs(ndv);
+      return ColumnStatisticsData.dateStats(ndvData);
+    } else if (type instanceof Types.FloatType || type instanceof 
Types.DoubleType) {
+      DoubleColumnStatsData ndvData = new DoubleColumnStatsData();
+      ndvData.setNumDVs(ndv);
+      return ColumnStatisticsData.doubleStats(ndvData);
+    } else if (type instanceof Types.StringType) {
+      StringColumnStatsData ndvData = new StringColumnStatsData();
+      ndvData.setNumDVs(ndv);
+      return ColumnStatisticsData.stringStats(ndvData);
+    } else if (type instanceof Types.BinaryType) {
+      // No NDV can be set for BinaryColumnStatsData.
+      BinaryColumnStatsData ndvData = new BinaryColumnStatsData();
+      return ColumnStatisticsData.binaryStats(ndvData);
+    } else if (type instanceof Types.DecimalType) {
+      // DecimalColumnStatsData does carry an NDV, so it is set here.
+      DecimalColumnStatsData ndvData = new DecimalColumnStatsData();
+      ndvData.setNumDVs(ndv);
+      return ColumnStatisticsData.decimalStats(ndvData);
+    } else {
+      // Unhandled type: return a stats object with no field set.
+      return new ColumnStatisticsData();
+    }
+  }
+
+  /**
+   * Immutable partition reference that is identified only by its name.
+   */
+  @Immutable
+  private static class PartitionRefImpl implements PartitionRef {
+    // Name used for the single partition of an unpartitioned table.
+    private static final String UNPARTITIONED_NAME = "";
+    private final String name_;
+
+    public PartitionRefImpl(String name) {
+      this.name_ = name;
+    }
+
+    @Override
+    public String getName() {
+      return name_;
+    }
+  }
+
+  /**
+   * Partition metadata that is derived entirely from the table's HMS object. It
+   * carries no file descriptors, no HMS parameters and no partition stats; only the
+   * storage descriptor and the location are taken from the table.
+   */
+  private static class PartitionMetadataImpl implements PartitionMetadata {
+    private final Table msTable_;
+
+    public PartitionMetadataImpl(Table msTable) {
+      this.msTable_ = msTable;
+    }
+
+    @Override
+    public Map<String, String> getHmsParameters() { return 
Collections.emptyMap(); }
+
+    @Override
+    public long getWriteId() {
+      return -1;
+    }
+
+    /**
+     * Converts the table's storage descriptor; an invalid descriptor is reported
+     * as a LocalCatalogException.
+     */
+    @Override
+    public HdfsStorageDescriptor getInputFormatDescriptor() {
+      String tblName = msTable_.getDbName() + "." + msTable_.getTableName();
+      try {
+        return HdfsStorageDescriptor.fromStorageDescriptor(tblName, 
msTable_.getSd());
+      } catch (HdfsStorageDescriptor.InvalidStorageDescriptorException e) {
+        throw new LocalCatalogException(String.format(
+            "Invalid input format descriptor for table %s", tblName), e);
+      }
+    }
+
+    /**
+     * Returns the table location wrapped in a Location of a freshly created
+     * compressor.
+     */
+    @Override
+    public HdfsPartitionLocationCompressor.Location getLocation() {
+      return new HdfsPartitionLocationCompressor(0).new Location(
+          msTable_.getSd().getLocation());
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getFileDescriptors() {
+      return ImmutableList.of();
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getInsertFileDescriptors() {
+      return ImmutableList.of();
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getDeleteFileDescriptors() {
+      return ImmutableList.of();
+    }
+
+    @Override
+    public boolean hasIncrementalStats() {
+      return false; // Incremental stats not supported for Iceberg tables.
+    }
+
+    @Override
+    public byte[] getPartitionStats() {
+      return null;
+    }
+
+    @Override
+    public boolean isMarkedCached() {
+      return false;
+    }
+
+    @Override
+    public long getLastCompactionId() {
+      throw new UnsupportedOperationException("Compaction id is not provided 
with " +
+          "IcebergMetaProvider implementation");
+    }
+  }
+
+  private class TableMetaRefImpl implements TableMetaRef {
+
+    private final String dbName_;
+    private final String tableName_;
+    private final Table msTable_;
+    private final long loadingTimeMs_;
+    private final HdfsPartitionLocationCompressor partitionLocationCompressor_;
+    private final org.apache.iceberg.Table iceApiTbl_;
+
+    public TableMetaRefImpl(String dbName, String tableName, Table msTable,
+                            org.apache.iceberg.Table iceApiTbl, long 
loadingTimeMs) {
+      this.dbName_ = dbName;
+      this.tableName_ = tableName;
+      this.msTable_ = msTable;
+      this.iceApiTbl_ = iceApiTbl;
+      this.loadingTimeMs_ = loadingTimeMs;
+      this.partitionLocationCompressor_ = new HdfsPartitionLocationCompressor(
+          msTable.getPartitionKeysSize(),
+          Lists.newArrayList(msTable.getSd().getLocation()));
+    }
+
+    @Override
+    public boolean isPartitioned() {
+      return msTable_.getPartitionKeysSize() != 0;
+    }
+
+    @Override
+    public boolean isMarkedCached() {
+      return false;
+    }
+
+    @Override
+    public List<String> getPartitionPrefixes() {
+      return partitionLocationCompressor_.getPrefixes();
+    }
+
+    @Override
+    public boolean isTransactional() {
+      return AcidUtils.isTransactionalTable(msTable_.getParameters());
+    }
+
+    public String fullName() { return dbName_ + "." + tableName_; }
+
+    @Override
+    public List<VirtualColumn> getVirtualColumns() {
+      List<VirtualColumn> ret = new ArrayList<>();
+      ret.add(VirtualColumn.INPUT_FILE_NAME);
+      ret.add(VirtualColumn.FILE_POSITION);
+      ret.add(VirtualColumn.PARTITION_SPEC_ID);
+      ret.add(VirtualColumn.ICEBERG_PARTITION_SERIALIZED);
+      ret.add(VirtualColumn.ICEBERG_DATA_SEQUENCE_NUMBER);
+      return ret;
+    }
+
+    @Override
+    public long getCatalogVersion() {
+      return 0;
+    }
+
+    @Override
+    public long getLoadedTimeMs() {
+      return loadingTimeMs_;
+    }
+  }
+
+  /**
+   * No write id list is provided by this provider; always returns null.
+   */
+  @Override
+  public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
+    return null;
+  }
+
+  /**
+   * This provider performs no compaction check: nothing is fetched, 'metas' is
+   * left untouched, and an empty list is always returned.
+   */
+  public List<PartitionRef> checkLatestCompaction(String dbName, String 
tableName,
+      TableMetaRef table, Map<PartitionRef, PartitionMetadata> metas) throws 
TException {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Assembles the partial table info of an Iceberg table: the current snapshot id
+   * (when the table has a snapshot), the default partition spec id, the content
+   * files and the network addresses collected while loading the file metadata.
+   * Partition stats are always empty.
+   */
+  @Override
+  public TPartialTableInfo loadIcebergTable(final TableMetaRef table)
+      throws TException {
+    TableMetaRefImpl refImpl = (TableMetaRefImpl) table;
+    TableParams params = new TableParams(refImpl.msTable_);
+    org.apache.iceberg.Table iceApiTable =
+        loadIcebergApiTable(table, params, refImpl.msTable_);
+
+    TPartialTableInfo partialInfo = new TPartialTableInfo();
+    TIcebergTable thriftIceTable = new TIcebergTable();
+    Snapshot snapshot = iceApiTable.currentSnapshot();
+    if (snapshot != null) {
+      thriftIceTable.setCatalog_snapshot_id(snapshot.snapshotId());
+    }
+    thriftIceTable.setDefault_partition_spec_id(iceApiTable.spec().specId());
+
+    // The host index is filled in as a side effect of loading the content files.
+    ListMap<TNetworkAddress> hosts = new ListMap<>();
+    thriftIceTable.setContent_files(getTContentFileStore(table, iceApiTable, hosts));
+    thriftIceTable.setPartition_stats(Collections.emptyMap());
+
+    partialInfo.setIceberg_table(thriftIceTable);
+    partialInfo.setNetwork_addresses(hosts.getList());
+    return partialInfo;
+  }
+
+  /**
+   * Returns the Iceberg API table that is already held by the table reference;
+   * 'params' and 'msTable' are not used.
+   */
+  @Override
+  public org.apache.iceberg.Table loadIcebergApiTable(final TableMetaRef table,
+      TableParams params, Table msTable) throws TException {
+    return ((TableMetaRefImpl)table).iceApiTbl_;
+  }
+
+  /**
+   * Returns the location stored in the storage descriptor of the table's HMS
+   * object.
+   */
+  public String getLocation(final TableMetaRef table) {
+    return ((TableMetaRefImpl)table).msTable_.getSd().getLocation();
+  }
+
+  /**
+   * Plans a scan of the table, loads the file metadata of the resulting content
+   * files and returns them in thrift form. 'hostIndex' is passed on to the file
+   * metadata loader. Any failure is rethrown as IllegalStateException.
+   */
+  private TIcebergContentFileStore getTContentFileStore(final TableMetaRef table,
+      org.apache.iceberg.Table apiTable, ListMap<TNetworkAddress> hostIndex) {
+    try {
+      TableScan tableScan = apiTable.newScan();
+      GroupedContentFiles contentFiles = new GroupedContentFiles(tableScan.planFiles());
+      IcebergFileMetadataLoader loader = new IcebergFileMetadataLoader(
+          apiTable, Collections.emptyList(), hostIndex, contentFiles, false);
+      loader.load();
+      IcebergContentFileStore fileStore = new IcebergContentFileStore(
+          apiTable, loader.getLoadedIcebergFds(), contentFiles);
+      return fileStore.toThrift();
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          "Exception occurred during loading Iceberg file metadata", e);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 1973498a7..c35bd72af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -89,6 +89,10 @@ public class LocalCatalog implements FeCatalog {
     defaultKuduMasterHosts_ = defaultKuduMasterHosts;
   }
 
+  /**
+   * Returns the description of the metadata source as reported by the underlying
+   * MetaProvider's getURI().
+   */
+  public String getProviderURI() {
+    return metaProvider_.getURI();
+  }
+
   @Override
   public List<? extends FeDb> getDbs(PatternMatcher matcher) {
     loadDbs();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 15ade59f7..992e7993b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -57,6 +57,7 @@ import com.google.errorprone.annotations.Immutable;
  */
 public interface MetaProvider {
 
+  /**
+   * Returns a human-readable description of the metadata source this provider
+   * reads from.
+   */
+  String getURI();
   /**
    * Get the authorization policy. This acts as a repository of authorization
    * metadata.
diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java 
b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
index b4359a8cb..ad639835a 100644
--- a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
+++ b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
@@ -16,15 +16,26 @@
 // under the License.
 package org.apache.impala.service;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.authorization.AuthorizationChecker;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FeCatalog;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.local.CatalogdMetaProvider;
+import org.apache.impala.catalog.local.IcebergMetaProvider;
 import org.apache.impala.catalog.local.LocalCatalog;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.thrift.TException;
@@ -48,8 +59,14 @@ public abstract class FeCatalogManager {
    * configuration.
    */
   public static FeCatalogManager createFromBackendConfig() {
-    if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
-      return new LocalImpl();
+    TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+    if (cfg.use_local_catalog) {
+      if (!cfg.catalogd_deployed) {
+        // Currently Iceberg REST Catalog is the only implementation.
+        return new IcebergRestCatalogImpl();
+      } else {
+        return new LocalImpl();
+      }
     } else {
       return new CatalogdImpl();
     }
@@ -155,6 +172,80 @@ public abstract class FeCatalogManager {
     }
   }
 
+  /**
+   * Implementation which creates LocalCatalog instances and uses an Iceberg 
REST
+   * Catalog.
+   * TODO(boroknagyz): merge with LocalImpl
+   */
+  private static class IcebergRestCatalogImpl extends FeCatalogManager {
+    private static IcebergMetaProvider PROVIDER;
+
+    /**
+     * Creates the shared IcebergMetaProvider on the first call and returns a new
+     * LocalCatalog backed by it on every call. A failure to read the catalog
+     * configuration is rethrown as IllegalStateException.
+     * NOTE(review): PROVIDER is static while this method synchronizes on the
+     * instance; this relies on there being a single manager instance - confirm.
+     */
+    @Override
+    public synchronized FeCatalog getOrCreateCatalog() {
+      if (PROVIDER == null) {
+        try {
+          PROVIDER = initProvider();
+        } catch (IOException e) {
+          throw new IllegalStateException("Create IcebergMetaProvider failed", 
e);
+        }
+      }
+      return new LocalCatalog(PROVIDER, null);
+    }
+
+    /**
+     * Creates the IcebergMetaProvider from the single properties file found in the
+     * directory given by the 'catalog_config_dir' backend flag. Throws
+     * IllegalStateException if the flag is unset, if the directory does not contain
+     * exactly one file, or if the file does not describe an Iceberg REST catalog.
+     */
+    IcebergMetaProvider initProvider() throws IOException {
+      TBackendGflags flags = BackendConfig.INSTANCE.getBackendCfg();
+      String catalogConfigDir = flags.catalog_config_dir;
+      // Fail with an actionable message instead of a bare IllegalStateException.
+      Preconditions.checkState(catalogConfigDir != null && !catalogConfigDir.isEmpty(),
+          "catalog_config_dir must be set to use an Iceberg REST Catalog");
+      List<String> files = listFiles(catalogConfigDir);
+      Preconditions.checkState(files.size() == 1,
+          String.format(
+              "Expected number of files in directory %s is one, found %d files",
+              catalogConfigDir, files.size()));
+      String configFile = catalogConfigDir + Path.SEPARATOR + files.get(0);
+      Properties props = readPropertiesFile(configFile);
+      // In the future we can expect different catalog types, but currently we only
+      // support Iceberg REST Catalogs.
+      checkPropertyValue(configFile, props, "connector.name", "iceberg");
+      checkPropertyValue(configFile, props, "iceberg.catalog.type", "rest");
+      return new IcebergMetaProvider(props);
+    }
+
+    private List<String> listFiles(String dirPath) {
+      File dir = new File(dirPath);
+      Preconditions.checkState(dir.exists() && dir.isDirectory());
+      return Stream.of(dir.listFiles())
+          .filter(file -> !file.isDirectory())
+          .map(File::getName)
+          .collect(Collectors.toList());
+    }
+
+    /**
+     * Reads the Java properties file at path 'file'. The input stream is closed
+     * before returning, also when loading fails.
+     */
+    private Properties readPropertiesFile(String file) throws IOException {
+      Properties props = new Properties();
+      try (FileInputStream in = new FileInputStream(file)) {
+        props.load(in);
+      }
+      return props;
+    }
+
+    /**
+     * Verifies that 'props' contains 'key' and that its value equals
+     * 'expectedValue'. Throws IllegalStateException naming 'configFile' otherwise.
+     */
+    private void checkPropertyValue(String configFile, Properties props, String key,
+        String expectedValue) {
+      boolean specified = props.containsKey(key);
+      if (!specified) {
+        String missingMsg = String.format(
+            "Expected property %s was not specified in config file %s.", key,
+            configFile);
+        throw new IllegalStateException(missingMsg);
+      }
+      String foundValue = props.getProperty(key);
+      if (Objects.equals(foundValue, expectedValue)) return;
+      String mismatchMsg = String.format(
+          "Expected value of '%s' is '%s', but found '%s' in config file %s",
+          key, expectedValue, foundValue, configFile);
+      throw new IllegalStateException(mismatchMsg);
+    }
+
+    /**
+     * Catalog cache updates are not processed by this implementation; the request
+     * is ignored and null is returned.
+     */
+    @Override
+    TUpdateCatalogCacheResponse updateCatalogCache(TUpdateCatalogCacheRequest 
req) {
+      return null;
+    }
+  }
+
   /**
    * Implementation which returns a provided catalog instance, used by tests.
    * No updates from the statestore are permitted.
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 63a1151ea..4c86fa3b4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -130,6 +130,7 @@ import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeSystemTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.local.LocalCatalog;
 import org.apache.impala.catalog.IcebergPositionDeleteTable;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.ImpaladTableUsageTracker;
@@ -174,6 +175,7 @@ import org.apache.impala.thrift.TClientRequest;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TCommentOnParams;
+import org.apache.impala.thrift.TConvertTableRequest;
 import org.apache.impala.thrift.TCopyTestCaseReq;
 import org.apache.impala.thrift.TCounter;
 import org.apache.impala.thrift.TCreateDropRoleParams;
@@ -185,28 +187,28 @@ import org.apache.impala.thrift.TDescribeHistoryParams;
 import org.apache.impala.thrift.TDescribeOutputStyle;
 import org.apache.impala.thrift.TDescribeResult;
 import org.apache.impala.thrift.TErrorCode;
-import org.apache.impala.thrift.TImpalaTableType;
 import org.apache.impala.thrift.TDescribeTableParams;
-import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
-import org.apache.impala.thrift.TIcebergOperation;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExecutorGroupSet;
 import org.apache.impala.thrift.TExplainResult;
 import org.apache.impala.thrift.TFinalizeParams;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.thrift.TGetCatalogInfoResult;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.TGetTableHistoryResult;
 import org.apache.impala.thrift.TGetTableHistoryResultItem;
 import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
+import org.apache.impala.thrift.TIcebergDmlFinalizeParams;
+import org.apache.impala.thrift.TIcebergOperation;
+import org.apache.impala.thrift.TIcebergOptimizationMode;
+import org.apache.impala.thrift.TIcebergOptimizeParams;
 import org.apache.impala.thrift.TImpalaQueryOptions;
+import org.apache.impala.thrift.TImpalaTableType;
 import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
 import org.apache.impala.thrift.TMetadataOpRequest;
-import org.apache.impala.thrift.TConvertTableRequest;
-import org.apache.impala.thrift.TIcebergOptimizationMode;
-import org.apache.impala.thrift.TIcebergOptimizeParams;
 import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPoolConfig;
@@ -549,8 +551,8 @@ public class Frontend {
     impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
         BackendConfig.INSTANCE);
     queryHookManager_ = 
QueryEventHookManager.createFromConfig(BackendConfig.INSTANCE);
-    if (!isBackendTest) {
-      TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+    TBackendGflags cfg = BackendConfig.INSTANCE.getBackendCfg();
+    if (!isBackendTest && cfg.catalogd_deployed) {
       metaStoreClientPool_ = new MetaStoreClientPool(1, 
cfg.initial_hms_cnxn_timeout_s);
       if (MetastoreShim.getMajorVersion() > 2) {
         transactionKeepalive_ = new TransactionKeepalive(metaStoreClientPool_);
@@ -1121,6 +1123,20 @@ public class Frontend {
     return resp;
   }
 
+  /**
+   * Builds the catalog information lines for this coordinator. The returned result
+   * always carries a non-null (possibly empty) 'info' list. Entries are only added
+   * when the 'use_local_catalog' backend flag is set: the catalog type, followed by
+   * the provider URI reported by the LocalCatalog if that URI is non-empty.
+   */
+  public TGetCatalogInfoResult getCatalogInfo() {
+    TGetCatalogInfoResult result = new TGetCatalogInfoResult();
+    result.setInfo(Lists.newArrayList());
+    if (!BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) return result;
+
+    result.info.add("Catalog Type: Local");
+    LocalCatalog localCatalog = (LocalCatalog) getCatalog();
+    String providerUri = localCatalog.getProviderURI();
+    if (providerUri == null || providerUri.isEmpty()) return result;
+
+    result.info.add("Provider: " + providerUri);
+    return result;
+  }
+
   /**
    * Keeps track of retries when handling InconsistentMetadataFetchExceptions.
    * Whenever a Catalog object is acquired (e.g., getCatalog), operations that 
access
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java 
b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 1a797d162..6e1390cb2 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -57,12 +57,14 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCivilTime;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDescribeDbParams;
+import org.apache.impala.thrift.TDescribeHistoryParams;
 import org.apache.impala.thrift.TDescribeResult;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.apache.impala.thrift.TGetAllHadoopConfigsResponse;
+import org.apache.impala.thrift.TGetCatalogInfoResult;
 import org.apache.impala.thrift.TGetCatalogMetricsResult;
 import org.apache.impala.thrift.TGetDataSrcsParams;
 import org.apache.impala.thrift.TGetDataSrcsResult;
@@ -72,10 +74,8 @@ import org.apache.impala.thrift.TGetFunctionsParams;
 import org.apache.impala.thrift.TGetFunctionsResult;
 import org.apache.impala.thrift.TGetHadoopConfigRequest;
 import org.apache.impala.thrift.TGetHadoopConfigResponse;
-import org.apache.impala.thrift.TGetHadoopGroupsRequest;
-import org.apache.impala.thrift.TGetHadoopGroupsResponse;
-import org.apache.impala.thrift.TGetTableHistoryResult;
 import org.apache.impala.thrift.TGetMetadataTablesParams;
+import org.apache.impala.thrift.TGetTableHistoryResult;
 import org.apache.impala.thrift.TGetTablesParams;
 import org.apache.impala.thrift.TGetTablesResult;
 import org.apache.impala.thrift.TLoadDataReq;
@@ -85,14 +85,13 @@ import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TQueryCompleteContext;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TSessionState;
 import org.apache.impala.thrift.TShowFilesParams;
 import org.apache.impala.thrift.TShowGrantPrincipalParams;
 import org.apache.impala.thrift.TShowRolesParams;
 import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TShowStatsParams;
 import org.apache.impala.thrift.TStringLiteral;
-import org.apache.impala.thrift.TDescribeHistoryParams;
-import org.apache.impala.thrift.TSessionState;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
@@ -260,6 +259,17 @@ public class JniFrontend {
     }
   }
 
+  /**
+   * Returns the Thrift-serialized TGetCatalogInfoResult produced by
+   * Frontend.getCatalogInfo(). A TException raised during serialization is rethrown
+   * as an InternalException carrying the same message.
+   */
+  public byte[] getCatalogInfo() throws ImpalaException {
+    Preconditions.checkNotNull(frontend_);
+    TGetCatalogInfoResult catalogInfo = frontend_.getCatalogInfo();
+    try {
+      return new TSerializer(protocolFactory_).serialize(catalogInfo);
+    } catch (TException serializationError) {
+      throw new InternalException(serializationError.getMessage());
+    }
+  }
+
   /**
    * Returns a list of table names matching an optional pattern.
    *
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java 
b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index d299b7107..6f3f0fc5e 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -131,6 +131,12 @@ public class IcebergUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(IcebergUtil.class);
 
+  public static final String ICEBERG_REST_URI = "iceberg_rest_uri";
+  public static final String ICEBERG_REST_USER_ID = "iceberg_rest_user_id";
+  public static final String ICEBERG_REST_USER_SECRET = 
"iceberg_rest_user_secret";
+  public static final String ICEBERG_REST_WAREHOUSE_LOCATION =
+      "iceberg_rest_warehouse_location";
+
   /**
    * Returns the corresponding catalog implementation for 'feTable'.
    */
@@ -1227,7 +1233,9 @@ public class IcebergUtil {
         CatalogProperties.FILE_IO_IMPL, 
CatalogProperties.IO_MANIFEST_CACHE_ENABLED,
         CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
         CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES,
-        CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH));
+        CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
+        ICEBERG_REST_URI, ICEBERG_REST_USER_ID, ICEBERG_REST_USER_SECRET,
+        ICEBERG_REST_WAREHOUSE_LOCATION));
 
     for (String key : configKeys) {
       String val = conf.get("iceberg." + key);
diff --git a/java/iceberg-rest-catalog-test/pom.xml 
b/java/iceberg-rest-catalog-test/pom.xml
new file mode 100644
index 000000000..ea3278a1a
--- /dev/null
+++ b/java/iceberg-rest-catalog-test/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <parent>
+    <groupId>org.apache.impala</groupId>
+    <artifactId>impala-parent</artifactId>
+    <version>5.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>impala-iceberg-rest-catalog-test</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Iceberg REST Catalog Test</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+          <!-- IMPALA-9468: Avoid pulling in netty for security reasons -->
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-servlet</artifactId>
+        </exclusion>
+      </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs-client</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-api</artifactId>
+        <version>${iceberg.version}</version>
+      </dependency>
+
+
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-core</artifactId>
+        <version>${iceberg.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-core</artifactId>
+        <version>${iceberg.version}</version>
+        <classifier>tests</classifier>
+      </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>3.0.0</version>
+        <configuration>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git 
a/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
 
b/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
new file mode 100644
index 000000000..bce4dd553
--- /dev/null
+++ 
b/java/iceberg-rest-catalog-test/src/main/java/org/apache/iceberg/rest/IcebergRestCatalogTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// We use the org.apache.iceberg.rest package because some classes
+// are package-private. This means this code is more likely to
+// break on Iceberg version updates. In the long term we might
+// switch to an open-source Iceberg REST Catalog.
+package org.apache.iceberg.rest;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stand-alone Iceberg REST Catalog server used by tests. It wraps a HadoopCatalog in
+ * Iceberg's RESTCatalogAdapter / RESTCatalogServlet and serves it with Jetty on
+ * REST_PORT. Every request body and response is round-tripped through JSON before
+ * being passed on.
+ */
+public class IcebergRestCatalogTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IcebergRestCatalogTest.class);
+  private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+
+  static final int REST_PORT = 9084;
+
+  private Server httpServer;
+
+  public IcebergRestCatalogTest() {}
+
+  /**
+   * Returns the warehouse location of the backing HadoopCatalog. The location is
+   * prefixed with the FILESYSTEM_PREFIX environment variable when that is set and
+   * non-empty, otherwise with DEFAULT_FS.
+   *
+   * @throws IllegalStateException if FILESYSTEM_PREFIX is unset/empty and DEFAULT_FS
+   *     is not defined at all; previously this silently produced a location starting
+   *     with the literal string "null".
+   */
+  private static String getWarehouseLocation() {
+    String hadoopCatalogLocation = "/test-warehouse/iceberg_test/hadoop_catalog";
+    String filesystemPrefix = System.getenv("FILESYSTEM_PREFIX");
+    if (filesystemPrefix != null && !filesystemPrefix.isEmpty()) {
+      return filesystemPrefix + hadoopCatalogLocation;
+    }
+    String defaultFs = System.getenv("DEFAULT_FS");
+    if (defaultFs == null) {
+      throw new IllegalStateException(
+          "Neither FILESYSTEM_PREFIX nor DEFAULT_FS is set, cannot determine the " +
+          "warehouse location of the Iceberg REST Catalog.");
+    }
+    return defaultFs + hadoopCatalogLocation;
+  }
+
+  /**
+   * Creates the HadoopCatalog that the REST server delegates to.
+   */
+  private Catalog initializeBackendCatalog() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    return new HadoopCatalog(conf, getWarehouseLocation());
+  }
+
+  /**
+   * Starts the HTTP server on REST_PORT. If 'join' is true, the call blocks by
+   * joining the server after it has been started; otherwise it returns right away.
+   */
+  public void start(boolean join) throws Exception {
+    Catalog catalog = initializeBackendCatalog();
+    RESTCatalogAdapter adapter = new RESTCatalogAdapter(catalog) {
+      @Override
+      public <T extends RESTResponse> T execute(
+          RESTCatalogAdapter.HTTPMethod method,
+          String path,
+          Map<String, String> queryParams,
+          Object body,
+          Class<T> responseType,
+          Map<String, String> headers,
+          Consumer<ErrorResponse> errorHandler) {
+        // Round-trip both directions through JSON before handing them on.
+        Object request = roundTripSerialize(body, "request");
+        T response = super.execute(
+            method, path, queryParams, request, responseType, headers, errorHandler);
+        return roundTripSerialize(response, "response");
+      }
+    };
+
+    RESTCatalogServlet servlet = new RESTCatalogServlet(adapter);
+    ServletContextHandler context = new ServletContextHandler(
+        ServletContextHandler.NO_SESSIONS);
+    ServletHolder servletHolder = new ServletHolder(servlet);
+    context.addServlet(servletHolder, "/*");
+    context.insertHandler(new GzipHandler());
+
+    this.httpServer = new Server(REST_PORT);
+    httpServer.setHandler(context);
+    httpServer.start();
+
+    if (join) {
+      httpServer.join();
+    }
+  }
+
+  /**
+   * Stops the HTTP server if start() has created one; otherwise does nothing.
+   */
+  public void stop() throws Exception {
+    if (httpServer != null) {
+      httpServer.stop();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new IcebergRestCatalogTest().start(true);
+  }
+
+  /**
+   * Serializes 'payload' to JSON and reads it back. RESTMessage payloads are read
+   * back as their own class, anything else as a Map. Returns null for a null
+   * payload.
+   *
+   * @param description label for the payload, used only in the error message
+   * @throws RuntimeException if serialization or deserialization fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T roundTripSerialize(T payload, String description) {
+    if (payload == null) return null;
+    LOG.trace(payload.toString());
+    try {
+      String json = MAPPER.writeValueAsString(payload);
+      if (payload instanceof RESTMessage) {
+        return (T) MAPPER.readValue(json, payload.getClass());
+      }
+      // use Map so that Jackson doesn't try to instantiate ImmutableMap
+      // from payload.getClass()
+      return (T) MAPPER.readValue(json, Map.class);
+    } catch (Exception e) {
+      LOG.warn(e.toString());
+      throw new RuntimeException(
+          String.format("Failed to serialize and deserialize %s: %s",
+              description, payload), e);
+    }
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 705928955..f98edd3b8 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -408,6 +408,7 @@ under the License.
   <modules>
     <module>datagenerator</module>
     <module>puffin-data-generator</module>
+    <module>iceberg-rest-catalog-test</module>
     <module>executor-deps</module>
     <module>ext-data-source</module>
     <module>../fe</module>
diff --git a/testdata/bin/minicluster_trino/Dockerfile 
b/testdata/bin/minicluster_trino/Dockerfile
index fa88a68c6..041f03b34 100644
--- a/testdata/bin/minicluster_trino/Dockerfile
+++ b/testdata/bin/minicluster_trino/Dockerfile
@@ -27,7 +27,7 @@ RUN \
     echo "-DHADOOP_USER_NAME=$USERNAME" >> /etc/trino/jvm.config
 
 COPY hive-site.xml core-site.xml hdfs-site.xml /etc/
-COPY iceberg.properties hive.properties /etc/trino/catalog/
+COPY iceberg_rest.properties iceberg.properties hive.properties 
/etc/trino/catalog/
 
 # Expose the Trino port
 EXPOSE 9091
diff --git a/testdata/bin/minicluster_trino/Dockerfile 
b/testdata/bin/minicluster_trino/iceberg_rest.properties
similarity index 58%
copy from testdata/bin/minicluster_trino/Dockerfile
copy to testdata/bin/minicluster_trino/iceberg_rest.properties
index fa88a68c6..8a1f4803c 100644
--- a/testdata/bin/minicluster_trino/Dockerfile
+++ b/testdata/bin/minicluster_trino/iceberg_rest.properties
@@ -15,19 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Use an official Trino image as the base
-FROM trinodb/trino:latest
-
-# Use the developer username, so Trino will have write access to HDFS
-ARG USERNAME
-
-RUN \
-    sed -i 's/http-server.http.port=8080/http-server.http.port=9091/' 
/etc/trino/config.properties && \
-    sed -i 's/localhost:8080/localhost:9091/' /etc/trino/config.properties && \
-    echo "-DHADOOP_USER_NAME=$USERNAME" >> /etc/trino/jvm.config
-
-COPY hive-site.xml core-site.xml hdfs-site.xml /etc/
-COPY iceberg.properties hive.properties /etc/trino/catalog/
-
-# Expose the Trino port
-EXPOSE 9091
+connector.name=iceberg
+iceberg.catalog.type=rest
+iceberg.rest-catalog.uri=http://localhost:9084
+iceberg.rest-catalog.warehouse=hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog
+hive.config.resources=/etc/hive-site.xml,/etc/hdfs-site.xml,/etc/core-site.xml
diff --git a/testdata/bin/minicluster_trino/Dockerfile 
b/testdata/bin/run-iceberg-rest-server.sh
old mode 100644
new mode 100755
similarity index 58%
copy from testdata/bin/minicluster_trino/Dockerfile
copy to testdata/bin/run-iceberg-rest-server.sh
index fa88a68c6..1de003508
--- a/testdata/bin/minicluster_trino/Dockerfile
+++ b/testdata/bin/run-iceberg-rest-server.sh
@@ -1,3 +1,5 @@
+#!/bin/bash
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,19 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Use an official Trino image as the base
-FROM trinodb/trino:latest
-
-# Use the developer username, so Trino will have write access to HDFS
-ARG USERNAME
-
-RUN \
-    sed -i 's/http-server.http.port=8080/http-server.http.port=9091/' 
/etc/trino/config.properties && \
-    sed -i 's/localhost:8080/localhost:9091/' /etc/trino/config.properties && \
-    echo "-DHADOOP_USER_NAME=$USERNAME" >> /etc/trino/jvm.config
+# We can expect that mvn can be found in /usr/local/bin,
+# see bin/bootstrap_build.sh and bin/bootstrap_system.sh
+PATH=${PATH}:/usr/local/bin
 
-COPY hive-site.xml core-site.xml hdfs-site.xml /etc/
-COPY iceberg.properties hive.properties /etc/trino/catalog/
+# Abort if IMPALA_HOME is unset/empty or the directory cannot be entered, otherwise
+# the relative paths below would be resolved against the wrong directory.
+cd "${IMPALA_HOME:?IMPALA_HOME must be set}" || exit 1
+. bin/impala-config.sh
 
-# Expose the Trino port
-EXPOSE 9091
+mvn -f "java/iceberg-rest-catalog-test/pom.xml" exec:java \
+    -Dexec.mainClass="org.apache.iceberg.rest.IcebergRestCatalogTest"
diff --git a/testdata/bin/minicluster_trino/Dockerfile 
b/testdata/configs/catalog_configs/iceberg_rest_config/rest.properties
similarity index 58%
copy from testdata/bin/minicluster_trino/Dockerfile
copy to testdata/configs/catalog_configs/iceberg_rest_config/rest.properties
index fa88a68c6..3723f2fe6 100644
--- a/testdata/bin/minicluster_trino/Dockerfile
+++ b/testdata/configs/catalog_configs/iceberg_rest_config/rest.properties
@@ -15,19 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# Use an official Trino image as the base
-FROM trinodb/trino:latest
+connector.name=iceberg
+iceberg.catalog.type=rest
+iceberg.rest-catalog.uri=http://localhost:9084
 
-# Use the developer username, so Trino will have write access to HDFS
-ARG USERNAME
-
-RUN \
-    sed -i 's/http-server.http.port=8080/http-server.http.port=9091/' 
/etc/trino/config.properties && \
-    sed -i 's/localhost:8080/localhost:9091/' /etc/trino/config.properties && \
-    echo "-DHADOOP_USER_NAME=$USERNAME" >> /etc/trino/jvm.config
-
-COPY hive-site.xml core-site.xml hdfs-site.xml /etc/
-COPY iceberg.properties hive.properties /etc/trino/catalog/
-
-# Expose the Trino port
-EXPOSE 9091
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-rest-catalog.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rest-catalog.test
new file mode 100644
index 000000000..3c5325235
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-rest-catalog.test
@@ -0,0 +1,90 @@
+====
+---- QUERY
+SHOW DATABASES;
+---- RESULTS: VERIFY_IS_SUBSET
+'_impala_builtins','System database for Impala builtin functions'
+'hadoop_catalog_test',''
+'ice',''
+'iceberg_partitioned_orc',''
+'iceberg_resolution_test',''
+---- TYPES
+STRING, STRING
+====
+---- QUERY
+USE ice;
+====
+---- QUERY
+SELECT lat FROM airports_parquet WHERE iata = '00R';
+---- RESULTS
+30.68586111
+---- TYPES
+DOUBLE
+====
+---- QUERY
+SELECT * from ice.airports_parquet.history;
+---- RESULTS
+2021-10-18 16:53:23.865000000,2304960110511088609,NULL,true
+---- TYPES
+TIMESTAMP, BIGINT, BIGINT, BOOLEAN
+====
+---- QUERY
+DESCRIBE ice.airports_parquet
+---- RESULTS
+'iata','string','','true'
+'airport','string','','true'
+'city','string','','true'
+'state','double','','true'
+'country','string','','true'
+'lat','double','','true'
+'lon','double','','true'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+DESCRIBE FORMATTED ice.airports_parquet;
+---- RESULTS: VERIFY_IS_SUBSET
+'# col_name            ','data_type           ','comment             '
+'','NULL','NULL'
+'iata','string','NULL'
+'airport','string','NULL'
+'city','string','NULL'
+'state','double','NULL'
+'country','string','NULL'
+'lat','double','NULL'
+'lon','double','NULL'
+'','NULL','NULL'
+'# Detailed Table Information','NULL','NULL'
+'Database:           ','ice                 ','NULL'
+'OwnerType:          ','USER                ','NULL'
+'Owner:              ','null                ','NULL'
+'Location:           
','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/airports_parquet','NULL'
+'Erasure Coding Policy:','NONE                ','NULL'
+'Table Type:         ','EXTERNAL_TABLE      ','NULL'
+'Table Parameters:','NULL','NULL'
+'','EXTERNAL            ','TRUE                '
+'','bucketing_version   ','2                   '
+'','engine.hive.enabled ','true                '
+'','gc.enabled          ','TRUE                '
+'','numFiles            ','1                   '
+'','storage_handler     
','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','table_type          ','ICEBERG             '
+'','write.format.default','parquet             '
+'','NULL','NULL'
+'# Storage Information','NULL','NULL'
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        
','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       
','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+'Compressed:         ','No                  ','NULL'
+'Sort Columns:       ','[]                  ','NULL'
+'','NULL','NULL'
+'# Constraints','NULL','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+show table stats ice.airports_parquet;
+---- RESULTS
+row_regex:0,1,'.+KB','NOT CACHED','NOT 
CACHED','PARQUET','false','.*/test-warehouse/iceberg_test/hadoop_catalog/ice/airports_parquet','$ERASURECODE_POLICY'
+---- TYPES
+BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index e6a1957ef..4b7e9654b 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -603,6 +603,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         expected_subscribers += 1
       if "--enable_catalogd_ha" in options:
         expected_subscribers += 1
+      elif "--no_catalogd" in options:
+        expected_subscribers -= 1
 
     if wait_for_backends:
       statestored.service.wait_for_live_subscribers(expected_subscribers,
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 142cf123e..2d032e78f 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -93,9 +93,11 @@ def post_data(url, data):
 # * The docker minicluster with one container per process connected to a 
user-defined
 #   bridge network.
 class ImpalaCluster(object):
-  def __init__(self, docker_network=None, use_admission_service=False):
+  def __init__(self, docker_network=None, use_admission_service=False,
+      deploy_catalogd=True):
     self.docker_network = docker_network
     self.use_admission_service = use_admission_service
+    self.deploy_catalogd = deploy_catalogd
     self.refresh()
 
   @classmethod
@@ -231,7 +233,7 @@ class ImpalaCluster(object):
       # process to write a minidump.
       assert len(self.impalads) >= expected_num_impalads
       assert self.statestored is not None
-      assert self.catalogd is not None
+      if (self.deploy_catalogd): assert self.catalogd is not None
 
     sleep_interval = 0.5
     # Wait for each webserver to be ready.
@@ -274,7 +276,7 @@ class ImpalaCluster(object):
           expected_num=num_impalads, actual_num=len(self.impalads))
     if not self.statestored:
       msg += "statestored failed to start.\n"
-    if not self.catalogd:
+    if self.deploy_catalogd and not self.catalogd:
       msg += "catalogd failed to start.\n"
     if msg:
       raise RuntimeError(msg)
diff --git a/tests/custom_cluster/test_iceberg_rest_catalog.py 
b/tests/custom_cluster/test_iceberg_rest_catalog.py
new file mode 100644
index 000000000..067682601
--- /dev/null
+++ b/tests/custom_cluster/test_iceberg_rest_catalog.py
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import os
+import socket
+import time
+import pytest
+import signal
+import subprocess
+import sys
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+REST_SERVER_PORT = 9084
+IMPALA_HOME = os.environ['IMPALA_HOME']
+START_ARGS = 'start_args'
+IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
+    
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config"""\
+        .format(IMPALA_HOME)
+
+
+class TestIcebergRestCatalog(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def setup_method(self, method):
+    # Invoke start-impala-cluster.py with '--no_catalogd'
+    start_args = "--no_catalogd"
+    if START_ARGS in method.__dict__:
+      start_args = method.__dict__[START_ARGS] + " " + start_args
+    method.__dict__[START_ARGS] = start_args
+
+    try:
+      self._start_rest_server()
+      self._wait_for_rest_server(300)
+      super(TestIcebergRestCatalog, self).setup_method(method)
+    except Exception as e:
+      self._stop_rest_server()
+      raise e
+
+  def teardown_method(self, method):
+    self._stop_rest_server()
+    super(TestIcebergRestCatalog, self).teardown_method(method)
+
+  def _start_rest_server(self):
+    self.process = subprocess.Popen(
+        'testdata/bin/run-iceberg-rest-server.sh',
+        stdout=sys.stdout, stderr=sys.stderr, shell=True,
+        preexec_fn=os.setsid, cwd=IMPALA_HOME)
+
+  def _stop_rest_server(self):
+    if self.process:
+      os.killpg(self.process.pid, signal.SIGTERM)
+
+  def _wait_for_rest_server(self, timeout_s):
+    sleep_interval_s = 0.5
+    start_time = time.time()
+    while time.time() - start_time < timeout_s:
+      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      try:
+        if s.connect_ex(('localhost', REST_SERVER_PORT)) == 0:
+          print("Iceberg REST server is available.")
+          return
+      finally:
+        s.close()
+      time.sleep(sleep_interval_s)
+    raise Exception(
+        "Webserver did not become available within {} 
seconds.".format(timeout_s))
+
+  @CustomClusterTestSuite.with_args(
+     impalad_args=IMPALAD_ARGS)
+  @pytest.mark.execute_serially
+  def test_rest_catalog_basic(self, vector):
+    self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")
diff --git a/www/catalog.tmpl b/www/catalog.tmpl
index c444bb130..32a8eaf49 100644
--- a/www/catalog.tmpl
+++ b/www/catalog.tmpl
@@ -20,6 +20,24 @@ under the License.
 
 <h2>Catalog</h2>
 
+{{?info}}
+<h3>Info</h3>
+<table class='table table-bordered table-hover'>
+  <tr>
+    <th>Value</th>
+  </tr>
+{{/info}}
+{{#info}}
+  <tr>
+    <td><tt>{{value}}</tt></td>
+  </tr>
+{{/info}}
+{{?info}}
+</table>
+{{/info}}
+
+
+
 {{?has_large_tables}}
 <div class="card">
   <div class="card-header">

Reply via email to