This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c77a4575201b23bc44f7c3560a34e3acf2f44b2d
Author: wzhou-code <[email protected]>
AuthorDate: Fri Aug 18 20:30:19 2023 -0700

    IMPALA-7131: Support external data sources in LocalCatalog mode
    
    This patch makes external data sources work in LocalCatalog mode:
     - Add APIs in CatalogdMetaProvider to fetch DataSource from Catalog
       server through RPC.
     - Add getDataSources() and getDataSource() in LocalCatalog.
     - Add LocalDataSourceTable class for loading DataSource table in
       LocalCatalog.
     - Handle request for loading DataSource in CatalogServiceCatalog on
       Catalog server.
     - Enable tests that were skipped by
       SkipIfCatalogV2.data_sources_unsupported(), and remove
       SkipIfCatalogV2.data_sources_unsupported() since it is no longer needed.
     - Add end-to-end tests for LocalCatalog mode.
    
    Testing:
     - Passed core tests
    
    Change-Id: I40841c9be9064ac67771c4d3f5acbb3b552a2e55
    Reviewed-on: http://gerrit.cloudera.org:8080/20574
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 common/thrift/CatalogService.thrift                |   5 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  40 +++++-
 .../impala/catalog/local/CatalogdMetaProvider.java | 100 ++++++++++++++
 .../impala/catalog/local/DirectMetaProvider.java   |  15 +++
 .../apache/impala/catalog/local/LocalCatalog.java  |  55 +++++++-
 .../impala/catalog/local/LocalDataSourceTable.java | 146 +++++++++++++++++++++
 .../apache/impala/catalog/local/LocalTable.java    |   2 +-
 .../apache/impala/catalog/local/MetaProvider.java  |  11 ++
 .../apache/impala/service/CatalogOpExecutor.java   |   2 -
 .../queries/QueryTest/data-source-tables.test      |  56 ++++++++
 .../queries/QueryTest/jdbc-data-source.test        |  94 ++++++++++++-
 tests/common/skip.py                               |   7 -
 tests/custom_cluster/test_ext_data_sources.py      |  47 +++++++
 tests/metadata/test_ddl.py                         |   1 -
 tests/metadata/test_metadata_query_statements.py   |   1 -
 tests/query_test/test_ext_data_sources.py          |  27 ++--
 16 files changed, 573 insertions(+), 36 deletions(-)

diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index f984bbdb5..1f915393f 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -590,7 +590,8 @@ enum CatalogLookupStatus {
   // change over the lifetime of a table with queries like invalidate 
metadata. In such
   // cases this lookup status is set and the caller can retry the fetch.
   // TODO: Fix partition lookup logic to not do it with IDs.
-  PARTITION_NOT_FOUND
+  PARTITION_NOT_FOUND,
+  DATA_SOURCE_NOT_FOUND
 }
 
 // RPC response for GetPartialCatalogObject.
@@ -609,6 +610,8 @@ struct TGetPartialCatalogObjectResponse {
 
   // Functions are small enough that we return them wholesale.
   7: optional list<Types.TFunction> functions
+  // DataSource objects are small enough that we return them wholesale.
+  8: optional list<CatalogObjects.TDataSource> data_srcs
 }
 
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 7f7d74157..fad2f5035 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -97,6 +97,7 @@ import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
+import org.apache.impala.thrift.TDataSource;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TEventProcessorMetrics;
 import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
@@ -874,9 +875,8 @@ public class CatalogServiceCatalog extends Catalog {
         min.setFn(fnObject);
         break;
       case DATA_SOURCE:
-        // These are currently not cached by v2 impalad.
-        // TODO(todd): handle these items.
-        return null;
+        min.setData_source(new TDataSource(obj.data_source));
+        break;
       case HDFS_CACHE_POOL:
         // HdfsCachePools just contain the name strings. Publish them as 
minimal objects.
         return obj;
@@ -3649,6 +3649,38 @@ public class CatalogServiceCatalog extends Catalog {
         versionLock_.readLock().unlock();
       }
     }
+    case DATA_SOURCE: {
+      TDataSource dsDesc = 
Preconditions.checkNotNull(req.object_desc.data_source);
+      versionLock_.readLock().lock();
+      try {
+        TGetPartialCatalogObjectResponse resp = new 
TGetPartialCatalogObjectResponse();
+        if (dsDesc.getName() == null || dsDesc.getName().isEmpty()) {
+          // Return all DataSource objects.
+          List<DataSource> data_srcs = getDataSources();
+          if (data_srcs == null || data_srcs.isEmpty()) {
+            resp.setData_srcs(Collections.emptyList());
+          } else {
+            List<TDataSource> thriftDataSrcs =
+                Lists.newArrayListWithCapacity(data_srcs.size());
+            for (DataSource ds : data_srcs) thriftDataSrcs.add(ds.toThrift());
+            resp.setData_srcs(thriftDataSrcs);
+          }
+        } else {
+          // Return the DataSource for the given name.
+          DataSource ds = getDataSource(dsDesc.getName());
+          if (ds == null) {
+            resp.setData_srcs(Collections.emptyList());
+          } else {
+            List<TDataSource> thriftDataSrcs = 
Lists.newArrayListWithCapacity(1);
+            thriftDataSrcs.add(ds.toThrift());
+            resp.setData_srcs(thriftDataSrcs);
+          }
+        }
+        return resp;
+      } finally {
+        versionLock_.readLock().unlock();
+      }
+    }
     default:
       throw new CatalogException("Unable to fetch partial info for type: " +
           req.object_desc.type);
@@ -3756,7 +3788,7 @@ public class CatalogServiceCatalog extends Catalog {
 
   /**
    * Return a partial view of information about global parts of the catalog (eg
-   * the list of tables, etc).
+   * the list of database names, etc).
    */
   private TGetPartialCatalogObjectResponse getPartialCatalogInfo(
       TGetPartialCatalogObjectRequest req) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 218ffc4d5..1e10169c0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -57,6 +57,7 @@ import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogDeltaLog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObjectCache;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -81,6 +82,7 @@ import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TDataSource;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TErrorCode;
@@ -236,6 +238,8 @@ public class CatalogdMetaProvider implements MetaProvider {
   private static final String FUNCTIONS_STATS_CATEGORY = "Functions";
   private static final String RPC_STATS_CATEGORY = "RPCs";
   private static final String STORAGE_METADATA_LOAD_CATEGORY = "StorageLoad";
+  private static final String DATA_SOURCE_LIST_STATS_CATEGORY = 
"DataSourceLists";
+  private static final Object DS_OBJ_LIST_CACHE_KEY = new Object();
   private static final String RPC_REQUESTS =
       CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Requests";
   private static final String RPC_BYTES =
@@ -455,6 +459,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       case TABLE_NOT_FOUND:
       case TABLE_NOT_LOADED:
       case PARTITION_NOT_FOUND:
+      case DATA_SOURCE_NOT_FOUND:
         invalidateCacheForObject(req.object_desc);
         throw new InconsistentMetadataFetchException(
             String.format("Fetching %s failed. Could not find %s",
@@ -665,6 +670,14 @@ public class CatalogdMetaProvider implements MetaProvider {
     return req;
   }
 
+  private TGetPartialCatalogObjectRequest newReqForDataSource(String dsName) {
+    TGetPartialCatalogObjectRequest req = new 
TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.DATA_SOURCE);
+    if (dsName == null) dsName = "";
+    req.object_desc.setData_source(new TDataSource(dsName, "", "", ""));
+    return req;
+  }
 
   @Override
   public Database loadDb(final String dbName) throws TException {
@@ -1213,6 +1226,51 @@ public class CatalogdMetaProvider implements 
MetaProvider {
     return funcs.build();
   }
 
+  /**
+   * Get all DataSource objects from catalogd.
+   */
+  public ImmutableList<DataSource> loadDataSources() throws TException {
+    ImmutableList<TDataSource> thriftDataSrcs = loadWithCaching(
+        "DataSource object list", DATA_SOURCE_LIST_STATS_CATEGORY,
+        DS_OBJ_LIST_CACHE_KEY, new Callable<ImmutableList<TDataSource>>() {
+          @Override
+          public ImmutableList<TDataSource> call() throws Exception {
+            TGetPartialCatalogObjectRequest req = newReqForDataSource(null);
+            TGetPartialCatalogObjectResponse resp = sendRequest(req);
+            checkResponse(resp.data_srcs != null, req, "no existing 
DataSource");
+            return ImmutableList.copyOf(resp.data_srcs);
+          }
+      });
+    ImmutableList.Builder<DataSource> dataSrcBld = ImmutableList.builder();
+    for (TDataSource thriftDataSrc : thriftDataSrcs) {
+      dataSrcBld.add(DataSource.fromThrift(thriftDataSrc));
+    }
+    return dataSrcBld.build();
+  }
+
+  /**
+   * Get the DataSource object from catalogd for the given DataSource name.
+   */
+  public DataSource loadDataSource(String dsName) throws TException {
+    Preconditions.checkState(dsName != null && !dsName.isEmpty());
+    return loadWithCaching("DataSource object for " + dsName,
+        DATA_SOURCE_LIST_STATS_CATEGORY,
+        new DataSourceCacheKey(dsName.toLowerCase()), new 
Callable<DataSource>() {
+          @Override
+          public DataSource call() throws Exception {
+            TGetPartialCatalogObjectRequest req = newReqForDataSource(dsName);
+            TGetPartialCatalogObjectResponse resp = sendRequest(req);
+            checkResponse(resp.data_srcs != null, req, "missing expected 
DataSource");
+            if (resp.data_srcs.size() == 1) {
+              return DataSource.fromThrift(resp.data_srcs.get(0));
+            } else {
+              Preconditions.checkState(resp.data_srcs.size() == 0);
+              return null;
+            }
+          }
+      });
+  }
+
   /**
    * Invalidate portions of the cache as indicated by the provided request.
    *
@@ -1519,6 +1577,12 @@ public class CatalogdMetaProvider implements 
MetaProvider {
           DbCacheKey.DbInfoType.HMS_METADATA,
           DbCacheKey.DbInfoType.FUNCTION_NAMES), invalidated);
       break;
+    case DATA_SOURCE:
+      if (cache_.asMap().remove(DS_OBJ_LIST_CACHE_KEY) != null) {
+        invalidated.add("list of DataSource objects");
+      }
+      invalidateCacheForDataSource(obj.data_source.name, invalidated);
+      break;
     default:
       break;
     }
@@ -1580,6 +1644,18 @@ public class CatalogdMetaProvider implements 
MetaProvider {
     }
   }
 
+  /**
+   * Invalidate cached DataSource for the given DataSource name. If anything 
was
+   * invalidated, adds a human-readable string to 'invalidated' indicating the
+   * invalidated DataSource.
+   */
+  private void invalidateCacheForDataSource(String dsName, List<String> 
invalidated) {
+    DataSourceCacheKey key = new DataSourceCacheKey(dsName.toLowerCase());
+    if (cache_.asMap().remove(key) != null) {
+      invalidated.add("DataSource " + dsName);
+    }
+  }
+
   /**
    * Reference to a partition within a table. We remember the partition's ID 
and pass
    * that back to the catalog in subsequent requests back to fetch the details 
of the
@@ -2016,6 +2092,30 @@ public class CatalogdMetaProvider implements 
MetaProvider {
     }
   }
 
+  /**
+   * Cache key for a DataSource object.
+   */
+  @Immutable
+  private static class DataSourceCacheKey {
+    private final String dsName_;
+
+    DataSourceCacheKey(String dsName) {
+      dsName_ = dsName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(dsName_, getClass());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof DataSourceCacheKey)) return false;
+      DataSourceCacheKey other = (DataSourceCacheKey)obj;
+      return dsName_.equals(other.dsName_);
+    }
+  }
+
   @VisibleForTesting
   static class SizeOfWeigher implements Weigher<Object, Object> {
     // Bypass flyweight objects like small boxed integers, Boolean.TRUE, 
enums, etc.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 90bdba126..6fa3f098e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
@@ -335,6 +336,20 @@ class DirectMetaProvider implements MetaProvider {
         "Functions not supported by DirectMetaProvider");
   }
 
+  @Override
+  public ImmutableList<DataSource> loadDataSources() throws TException {
+    // See above.
+    throw new UnsupportedOperationException(
+        "DataSource not supported by DirectMetaProvider");
+  }
+
+  @Override
+  public DataSource loadDataSource(String dsName) throws TException {
+    // See above.
+    throw new UnsupportedOperationException(
+        "DataSource not supported by DirectMetaProvider");
+  }
+
   @Override
   public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef 
table,
       List<String> colNames) throws TException {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index c194e5aae..26f86823e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.BuiltinsDb;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeCatalog;
@@ -43,14 +44,19 @@ import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDataSource;
 import org.apache.impala.thrift.TGetPartitionStatsResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * Implementation of FeCatalog which runs within the impalad and fetches 
metadata
@@ -68,6 +74,8 @@ import com.google.common.base.Preconditions;
  * returned from its methods.
  */
 public class LocalCatalog implements FeCatalog {
+  private final static Logger LOG = 
LoggerFactory.getLogger(LocalCatalog.class);
+
   private final MetaProvider metaProvider_;
   private Map<String, FeDb> dbs_ = new HashMap<>();
   private Map<String, HdfsCachePool> hdfsCachePools_ = null;
@@ -108,7 +116,6 @@ public class LocalCatalog implements FeCatalog {
     dbs_ = dbs;
   }
 
-
   @Override
   public List<String> getTableNames(String dbName, PatternMatcher matcher)
       throws DatabaseNotFoundException {
@@ -152,7 +159,29 @@ public class LocalCatalog implements FeCatalog {
       throws CatalogException {
     // TODO(todd): this probably makes the /catalog page not load with an 
error.
     // We should probably disable that page in local-catalog mode.
-    throw new UnsupportedOperationException("LocalCatalog.getTCatalogObject");
+    Preconditions.checkNotNull(objectDesc, "invalid objectDesc");
+    if (objectDesc.type == TCatalogObjectType.DATA_SOURCE) {
+      // This function could be called by the backend function
+      // CatalogOpExecutor::HandleDropDataSource() when cleaning up the jar file of a data source.
+      TDataSource dsDesc = Preconditions.checkNotNull(objectDesc.data_source);
+      String dsName = dsDesc.getName();
+      if (dsName != null && !dsName.isEmpty()) {
+        try {
+          DataSource ds = metaProvider_.loadDataSource(dsName);
+          if (ds != null) {
+            TCatalogObject resultObj = new TCatalogObject();
+            resultObj.setType(TCatalogObjectType.DATA_SOURCE);
+            resultObj.setData_source(ds.toThrift());
+            return resultObj;
+          }
+        } catch (Exception e) {
+          LOG.info("Data source not found: " + dsName + ", " + e.getMessage());
+        }
+      }
+      throw new CatalogException("Data source not found: " + dsName);
+    } else {
+      throw new 
UnsupportedOperationException("LocalCatalog.getTCatalogObject");
+    }
   }
 
   @Override
@@ -195,14 +224,26 @@ public class LocalCatalog implements FeCatalog {
   }
 
   @Override
-  public List<? extends FeDataSource> getDataSources(
-      PatternMatcher createHivePatternMatcher) {
-    throw new UnsupportedOperationException("TODO");
+  public List<? extends FeDataSource> getDataSources(PatternMatcher matcher) {
+    try {
+      List<DataSource> dataSrcs = metaProvider_.loadDataSources();
+      return Catalog.filterCatalogObjectsByPattern(dataSrcs, matcher);
+    } catch (Exception e) {
+      LOG.info("Unable to load DataSource objects, ", e);
+      // Return empty list.
+      return Lists.newArrayList();
+    }
   }
 
   @Override
-  public FeDataSource getDataSource(String dataSourceName) {
-    throw new UnsupportedOperationException("TODO");
+  public FeDataSource getDataSource(String dsName) {
+    Preconditions.checkNotNull(dsName);
+    try {
+      return metaProvider_.loadDataSource(dsName);
+    } catch (Exception e) {
+      LOG.info("DataSource not found: " + dsName, e);
+      return null;
+    }
   }
 
   @Override
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java
new file mode 100644
index 000000000..93a2d6924
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import java.util.Set;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.DataSourceTable;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeDataSourceTable;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TDataSource;
+import org.apache.impala.thrift.TDataSourceTable;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.TResultRowBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * DataSource table instance loaded from {@link LocalCatalog}.
+ *
+ * All DataSource properties are stored as table properties (persisted in the
+ * metastore). Tables that contain the TBL_PROP_DATA_SRC_NAME table parameter 
are
+ * assumed to be backed by an external data source.
+ */
+public class LocalDataSourceTable extends LocalTable implements 
FeDataSourceTable {
+  private final static Logger LOG = 
LoggerFactory.getLogger(LocalDataSourceTable.class);
+
+  private String initString_;
+  private TDataSource dataSource_;
+
+  public static LocalDataSourceTable load(LocalDb db, Table msTbl, 
TableMetaRef ref)
+      throws TableLoadingException {
+    Preconditions.checkNotNull(db);
+    Preconditions.checkNotNull(msTbl);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("load table: " + msTbl.getDbName() + "." + 
msTbl.getTableName());
+    }
+    if (msTbl.getPartitionKeysSize() > 0) {
+      throw new TableLoadingException("Data source table cannot contain 
clustering " +
+          "columns: " + msTbl.getTableName());
+    }
+    return new LocalDataSourceTable(db, msTbl, ref);
+  }
+
+  private LocalDataSourceTable(LocalDb db, Table msTbl, TableMetaRef ref)
+      throws TableLoadingException {
+    super(db, msTbl, ref);
+
+    String dataSourceName = getRequiredTableProperty(
+        msTbl, DataSourceTable.TBL_PROP_DATA_SRC_NAME, null);
+    String location = getRequiredTableProperty(
+        msTbl, DataSourceTable.TBL_PROP_LOCATION, dataSourceName);
+    String className = getRequiredTableProperty(
+        msTbl, DataSourceTable.TBL_PROP_CLASS, dataSourceName);
+    String apiVersionString = getRequiredTableProperty(
+        msTbl, DataSourceTable.TBL_PROP_API_VER, dataSourceName);
+    dataSource_ = new TDataSource(dataSourceName, location, className, 
apiVersionString);
+    initString_ = getRequiredTableProperty(
+        msTbl, DataSourceTable.TBL_PROP_INIT_STRING, dataSourceName);
+  }
+
+  private String getRequiredTableProperty(Table msTbl, String key, String 
dataSourceName)
+      throws TableLoadingException {
+    String val = msTbl.getParameters().get(key);
+    if (val == null) {
+      throw new TableLoadingException(String.format("Failed to load table %s " 
+
+          "produced by external data source %s. Missing required metadata: %s",
+          msTbl.getTableName(),
+          dataSourceName == null ? "<unknown>" : dataSourceName, key));
+    }
+    return val;
+  }
+
+  /**
+   * Gets the DataSource object.
+   */
+  @Override // FeDataSourceTable
+  public TDataSource getDataSource() { return dataSource_; }
+
+  /**
+   * Gets the table init string passed to the DataSource.
+   */
+  @Override // FeDataSourceTable
+  public String getInitString() { return initString_; }
+
+  @Override // FeDataSourceTable
+  public int getNumNodes() { return 1; }
+
+  /**
+   * Returns statistics on this table as a tabular result set. Used for the
+   * SHOW TABLE STATS statement. The schema of the returned TResultSet is set
+   * inside this method.
+   */
+  @Override // FeDataSourceTable
+  public TResultSet getTableStats() {
+    TResultSet result = new TResultSet();
+    TResultSetMetadata resultSchema = new TResultSetMetadata();
+    resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
+    result.setSchema(resultSchema);
+    TResultRowBuilder rowBuilder = new TResultRowBuilder();
+    rowBuilder.add(getNumRows());
+    result.addToRows(rowBuilder.get());
+    return result;
+  }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(
+      int tableId, Set<Long> referencedPartitions) {
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId,
+        TTableType.DATA_SOURCE_TABLE, 
FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(), getName(), getDb().getName());
+    tableDesc.setDataSourceTable(getDataSourceTable());
+    return tableDesc;
+  }
+
+  /**
+   * Returns a thrift {@link TDataSourceTable} structure for this DataSource 
table.
+   */
+  private TDataSourceTable getDataSourceTable() {
+    return new TDataSourceTable(dataSource_, initString_);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java 
b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 5e56cd9e9..62731ec4b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -115,7 +115,7 @@ abstract class LocalTable implements FeTable {
     } else if (IcebergTable.isIcebergTable(msTbl)) {
       t = LocalIcebergTable.loadIcebergTableViaMetaProvider(db, msTbl, ref);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
-      // TODO(todd) support datasource table
+      t = LocalDataSourceTable.load(db, msTbl, ref);
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
         msTbl.getSd().getInputFormat())) {
       t = LocalFsTable.load(db, msTbl, ref);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 444ff0556..a9d8c6671 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -106,6 +107,16 @@ public interface MetaProvider {
   ImmutableList<Function> loadFunction(String dbName, String functionName)
       throws TException;
 
+  /**
+   * Retrieve all DataSource objects.
+   */
+  ImmutableList<DataSource> loadDataSources() throws TException;
+
+  /**
+   * Retrieve the DataSource object for the given DataSource name.
+   */
+  DataSource loadDataSource(String dsName) throws TException;
+
   /**
    * Load the given partitions from the specified table.
    *
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 2ef60d740..b958c352c 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2453,7 +2453,6 @@ public class CatalogOpExecutor {
 
   private void createDataSource(TCreateDataSourceParams params, 
TDdlExecResponse resp)
       throws ImpalaException {
-    // TODO(IMPALA-7131): support data sources with LocalCatalog.
     if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + 
params.toString()); }
     DataSource dataSource = DataSource.fromThrift(params.getData_source());
     DataSource existingDataSource = 
catalog_.getDataSource(dataSource.getName());
@@ -2475,7 +2474,6 @@ public class CatalogOpExecutor {
 
   private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse 
resp)
       throws ImpalaException {
-    // TODO(IMPALA-7131): support data sources with LocalCatalog.
     if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + 
params.toString());
     DataSource dataSource = catalog_.removeDataSource(params.getData_source());
     if (dataSource == null) {
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test 
b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
index d1469e81e..47de8f97f 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
@@ -1,5 +1,49 @@
 ====
 ---- QUERY
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestGenericDataSource;
+CREATE DATA SOURCE TestGenericDataSource
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/test-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.AllTypesDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testgenericdatasource';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testgenericdatasource',regex:'.*/test-warehouse/data-sources/test-data-source.jar','org.apache.impala.extdatasource.AllTypesDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
+---- QUERY
+# Create DataSource table
+DROP TABLE IF EXISTS alltypes_datasource;
+CREATE TABLE alltypes_datasource (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  timestamp_col TIMESTAMP,
+  string_col STRING,
+  dec_col1 DECIMAL(9,0),
+  dec_col2 DECIMAL(10,0),
+  dec_col3 DECIMAL(20,10),
+  dec_col4 DECIMAL(38,37),
+  dec_col5 DECIMAL(10,5),
+  date_col DATE)
+PRODUCED BY DATA SOURCE TestGenericDataSource("TestInitString");
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
 # Gets all types including a row with a NULL value. The predicate pushed to
 # the data source is not actually used, but the second predicate is
 # evaluated by Impala.
@@ -129,3 +173,15 @@ where smallint_col IS DISTINCT FROM 11 and tinyint_col IS 
DISTINCT FROM 1)
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_datasource;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestGenericDataSource;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test 
b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
index 2bd068234..efc4a77d6 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
@@ -1,7 +1,79 @@
 ====
 ---- QUERY
-# Test the jdbc data source
+# Create DataSource
+DROP DATA SOURCE IF EXISTS TestJdbcDataSource;
+CREATE DATA SOURCE TestJdbcDataSource
+LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+---- RESULTS
+'Data source has been created.'
+====
+---- QUERY
+# Show created DataSource
+SHOW DATA SOURCES LIKE 'testjdbcdatasource';
+---- LABELS
+NAME,LOCATION,CLASS NAME,API VERSION
+---- RESULTS
+'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1'
+---- TYPES
+STRING,STRING,STRING,STRING
+====
 ---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"alltypes"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Create external JDBC DataSource table
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE TestJdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"AllTypesWithQuote",
+"column.mapping":"id=id, bool_col=Bool_col, tinyint_col=Tinyint_col, smallint_col=Smallint_col, int_col=Int_col, bigint_col=Bigint_col, float_col=Float_col, double_col=Double_col, date_string_col=Date_string_col, string_col=String_col, timestamp=Timestamp"}');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+# Test the jdbc DataSource
 # count(*) with a predicate evaluated by Impala
 select count(*) from alltypes_jdbc_datasource
 where float_col = 0 and string_col is not NULL
@@ -20,7 +92,7 @@ BIGINT
 ====
 ---- QUERY
 # Gets all types including a row with a NULL value. The predicate pushed to
-# the data source.
+# the DataSource.
 select *
 from alltypes_jdbc_datasource
 where id > 10 and int_col< 5 limit 5
@@ -103,3 +175,21 @@ order by a.id, b.id limit 10
 ---- TYPES
 INT, INT
 ====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop table
+DROP TABLE alltypes_jdbc_datasource_2;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+# Drop DataSource
+DROP DATA SOURCE TestJdbcDataSource;
+---- RESULTS
+'Data source has been dropped.'
+====
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 2b1ac76cb..a5980d5e9 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -220,13 +220,6 @@ class SkipIfCatalogV2:
       IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
       reason="Test is specific to old implementation of catalog.")
 
-  # TODO: IMPALA-7131: add support or update tests to reflect expected behaviour.
-  @classmethod
-  def data_sources_unsupported(self):
-    return pytest.mark.skipif(
-      IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
-      reason="IMPALA-7131: data sources not supported.")
-
   # TODO: IMPALA-8489: fix this bug.
   @classmethod
   def impala_8489(self):
diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py
new file mode 100644
index 000000000..b8b465cfb
--- /dev/null
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf
+
+
+class TestExtDataSources(CustomClusterTestSuite):
+  """Impala query tests for external data sources."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_data_source_tables(self, vector, unique_database):
+    """Start Impala cluster in LocalCatalog Mode"""
+    self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database)
+
+  @SkipIf.not_hdfs
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--use_local_catalog=true",
+      catalogd_args="--catalog_topic_mode=minimal")
+  def test_jdbc_data_source(self, vector, unique_database):
+    """Start Impala cluster in LocalCatalog Mode"""
+    self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 308ed1e1c..1cb6fa3dd 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -1168,7 +1168,6 @@ class TestLibCache(TestDdlBase):
   # Run serially because this test inspects global impalad metrics.
   # TODO: The metrics checks could be relaxed to enable running this test in
   # parallel, but that might need a more general wait_for_metric_value().
-  @SkipIfCatalogV2.data_sources_unsupported()
   @pytest.mark.execute_serially
   def test_create_drop_data_src(self, vector, unique_database):
    """This will create, run, and drop the same data source repeatedly, exercising
diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py
index 071339f37..aa3608a62 100644
--- a/tests/metadata/test_metadata_query_statements.py
+++ b/tests/metadata/test_metadata_query_statements.py
@@ -146,7 +146,6 @@ class TestMetadataQueryStatements(ImpalaTestSuite):
         compare=compare_describe_formatted)
 
   @pytest.mark.execute_serially # due to data src setup/teardown
-  @SkipIfCatalogV2.data_sources_unsupported()
   def test_show_data_sources(self, vector):
     try:
       self.__create_data_sources()
diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py
index 17587bc91..b3a3a4260 100644
--- a/tests/query_test/test_ext_data_sources.py
+++ b/tests/query_test/test_ext_data_sources.py
@@ -17,9 +17,12 @@
 
 from __future__ import absolute_import, division, print_function
 
+import re
+
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfCatalogV2, SkipIf
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster
 from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.util.filesystem_utils import FILESYSTEM_PREFIX
 
 
 class TestExtDataSources(ImpalaTestSuite):
@@ -59,15 +62,15 @@ class TestExtDataSources(ImpalaTestSuite):
         properties[fields[1].rstrip()] = fields[2].rstrip()
     return properties
 
-  @SkipIfCatalogV2.data_sources_unsupported()
   @SkipIf.not_hdfs
   def test_verify_jdbc_table_properties(self, vector):
     jdbc_tbl_name = "functional.alltypes_jdbc_datasource"
     properties = self._get_tbl_properties(jdbc_tbl_name)
     # Verify data source related table properties
     assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource'
-    assert properties['__IMPALA_DATA_SOURCE_LOCATION'] == \
-        'hdfs://localhost:20500/test-warehouse/data-sources/jdbc-data-source.jar'
+    expected_location =\
+        "{0}/test-warehouse/data-sources/jdbc-data-source.jar".format(FILESYSTEM_PREFIX)
+    assert re.search(expected_location, properties['__IMPALA_DATA_SOURCE_LOCATION'])
     assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \
         'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
     assert properties['__IMPALA_DATA_SOURCE_API_VERSION'] == 'V1'
@@ -76,11 +79,15 @@ class TestExtDataSources(ImpalaTestSuite):
     assert 'table\\":\\"alltypes' \
         in properties['__IMPALA_DATA_SOURCE_INIT_STRING']
 
-  @SkipIfCatalogV2.data_sources_unsupported()
-  def test_data_source_tables(self, vector):
-    self.run_test_case('QueryTest/data-source-tables', vector)
+  def test_data_source_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database)
 
-  @SkipIfCatalogV2.data_sources_unsupported()
+  # The test uses pre-written jdbc external tables where the jdbc.url refers to Postgres
+  # server via full URI, i.e. url starts with 'jdbc:postgresql://hostname:5432/'. In the
+  # dockerised environment, the Postgres server is running on a different host. It is
+  # configured to accept only local connection. Have to skip this test for dockerised
+  # cluster since Postgres server is not accessible from impalad.
+  @SkipIfDockerizedCluster.internal_hostname
   @SkipIf.not_hdfs
-  def test_jdbc_data_source(self, vector):
-    self.run_test_case('QueryTest/jdbc-data-source', vector)
+  def test_jdbc_data_source(self, vector, unique_database):
+    self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)


Reply via email to