This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c77a4575201b23bc44f7c3560a34e3acf2f44b2d Author: wzhou-code <[email protected]> AuthorDate: Fri Aug 18 20:30:19 2023 -0700 IMPALA-7131: Support external data sources in LocalCatalog mode This patch makes external data sources work in LocalCatalog mode: - Add APIs in CatalogdMetaProvider to fetch DataSource from Catalog server through RPC. - Add getDataSources() and getDataSource() in LocalCatalog. - Add LocalDataSourceTable class for loading DataSource table in LocalCatalog. - Handle request for loading DataSource in CatalogServiceCatalog on Catalog server. - Enable tests which are skipped by SkipIfCatalogV2.data_sources_unsupported(). Remove SkipIfCatalogV2.data_sources_unsupported(). - Add end-to-end tests for LocalCatalog mode. Testing: - Passed core tests Change-Id: I40841c9be9064ac67771c4d3f5acbb3b552a2e55 Reviewed-on: http://gerrit.cloudera.org:8080/20574 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Wenzhe Zhou <[email protected]> --- common/thrift/CatalogService.thrift | 5 +- .../impala/catalog/CatalogServiceCatalog.java | 40 +++++- .../impala/catalog/local/CatalogdMetaProvider.java | 100 ++++++++++++++ .../impala/catalog/local/DirectMetaProvider.java | 15 +++ .../apache/impala/catalog/local/LocalCatalog.java | 55 +++++++- .../impala/catalog/local/LocalDataSourceTable.java | 146 +++++++++++++++++++++ .../apache/impala/catalog/local/LocalTable.java | 2 +- .../apache/impala/catalog/local/MetaProvider.java | 11 ++ .../apache/impala/service/CatalogOpExecutor.java | 2 - .../queries/QueryTest/data-source-tables.test | 56 ++++++++ .../queries/QueryTest/jdbc-data-source.test | 94 ++++++++++++- tests/common/skip.py | 7 - tests/custom_cluster/test_ext_data_sources.py | 47 +++++++ tests/metadata/test_ddl.py | 1 - tests/metadata/test_metadata_query_statements.py | 1 - tests/query_test/test_ext_data_sources.py | 27 ++-- 16 files changed, 573 insertions(+), 36 deletions(-) diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift index f984bbdb5..1f915393f 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -590,7 +590,8 @@ enum CatalogLookupStatus { // change over the lifetime of a table with queries like invalidate metadata. In such // cases this lookup status is set and the caller can retry the fetch. // TODO: Fix partition lookup logic to not do it with IDs. - PARTITION_NOT_FOUND + PARTITION_NOT_FOUND, + DATA_SOURCE_NOT_FOUND } // RPC response for GetPartialCatalogObject. @@ -609,6 +610,8 @@ struct TGetPartialCatalogObjectResponse { // Functions are small enough that we return them wholesale. 7: optional list<Types.TFunction> functions + // DataSource objects are small enough that we return them wholesale. + 8: optional list<CatalogObjects.TDataSource> data_srcs } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 7f7d74157..fad2f5035 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -97,6 +97,7 @@ import org.apache.impala.thrift.TCatalogInfoSelector; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TCatalogUpdateResult; +import org.apache.impala.thrift.TDataSource; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TEventProcessorMetrics; import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse; @@ -874,9 +875,8 @@ public class CatalogServiceCatalog extends Catalog { min.setFn(fnObject); break; case DATA_SOURCE: - // These are currently not cached by v2 impalad. - // TODO(todd): handle these items. - return null; + min.setData_source(new TDataSource(obj.data_source)); + break; case HDFS_CACHE_POOL: // HdfsCachePools just contain the name strings. 
Publish them as minimal objects. return obj; @@ -3649,6 +3649,38 @@ public class CatalogServiceCatalog extends Catalog { versionLock_.readLock().unlock(); } } + case DATA_SOURCE: { + TDataSource dsDesc = Preconditions.checkNotNull(req.object_desc.data_source); + versionLock_.readLock().lock(); + try { + TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse(); + if (dsDesc.getName() == null || dsDesc.getName().isEmpty()) { + // Return all DataSource objects. + List<DataSource> data_srcs = getDataSources(); + if (data_srcs == null || data_srcs.isEmpty()) { + resp.setData_srcs(Collections.emptyList()); + } else { + List<TDataSource> thriftDataSrcs = + Lists.newArrayListWithCapacity(data_srcs.size()); + for (DataSource ds : data_srcs) thriftDataSrcs.add(ds.toThrift()); + resp.setData_srcs(thriftDataSrcs); + } + } else { + // Return the DataSource for the given name. + DataSource ds = getDataSource(dsDesc.getName()); + if (ds == null) { + resp.setData_srcs(Collections.emptyList()); + } else { + List<TDataSource> thriftDataSrcs = Lists.newArrayListWithCapacity(1); + thriftDataSrcs.add(ds.toThrift()); + resp.setData_srcs(thriftDataSrcs); + } + } + return resp; + } finally { + versionLock_.readLock().unlock(); + } + } default: throw new CatalogException("Unable to fetch partial info for type: " + req.object_desc.type); @@ -3756,7 +3788,7 @@ public class CatalogServiceCatalog extends Catalog { /** * Return a partial view of information about global parts of the catalog (eg - * the list of tables, etc). + * the list of database names, etc). 
*/ private TGetPartialCatalogObjectResponse getPartialCatalogInfo( TGetPartialCatalogObjectRequest req) { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 218ffc4d5..1e10169c0 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -57,6 +57,7 @@ import org.apache.impala.catalog.Catalog; import org.apache.impala.catalog.CatalogDeltaLog; import org.apache.impala.catalog.CatalogException; import org.apache.impala.catalog.CatalogObjectCache; +import org.apache.impala.catalog.DataSource; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsCachePool; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; @@ -81,6 +82,7 @@ import org.apache.impala.thrift.TCatalogInfoSelector; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TDataSource; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDbInfoSelector; import org.apache.impala.thrift.TErrorCode; @@ -236,6 +238,8 @@ public class CatalogdMetaProvider implements MetaProvider { private static final String FUNCTIONS_STATS_CATEGORY = "Functions"; private static final String RPC_STATS_CATEGORY = "RPCs"; private static final String STORAGE_METADATA_LOAD_CATEGORY = "StorageLoad"; + private static final String DATA_SOURCE_LIST_STATS_CATEGORY = "DataSourceLists"; + private static final Object DS_OBJ_LIST_CACHE_KEY = new Object(); private static final String RPC_REQUESTS = CATALOG_FETCH_PREFIX + "." 
+ RPC_STATS_CATEGORY + ".Requests"; private static final String RPC_BYTES = @@ -455,6 +459,7 @@ public class CatalogdMetaProvider implements MetaProvider { case TABLE_NOT_FOUND: case TABLE_NOT_LOADED: case PARTITION_NOT_FOUND: + case DATA_SOURCE_NOT_FOUND: invalidateCacheForObject(req.object_desc); throw new InconsistentMetadataFetchException( String.format("Fetching %s failed. Could not find %s", @@ -665,6 +670,14 @@ public class CatalogdMetaProvider implements MetaProvider { return req; } + private TGetPartialCatalogObjectRequest newReqForDataSource(String dsName) { + TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest(); + req.object_desc = new TCatalogObject(); + req.object_desc.setType(TCatalogObjectType.DATA_SOURCE); + if (dsName == null) dsName = ""; + req.object_desc.setData_source(new TDataSource(dsName, "", "", "")); + return req; + } @Override public Database loadDb(final String dbName) throws TException { @@ -1213,6 +1226,51 @@ public class CatalogdMetaProvider implements MetaProvider { return funcs.build(); } + /** + * Get all DataSource objects from catalogd. 
+ */ + public ImmutableList<DataSource> loadDataSources() throws TException { + ImmutableList<TDataSource> thriftDataSrcs = loadWithCaching( + "DataSource object list", DATA_SOURCE_LIST_STATS_CATEGORY, + DS_OBJ_LIST_CACHE_KEY, new Callable<ImmutableList<TDataSource>>() { + @Override + public ImmutableList<TDataSource> call() throws Exception { + TGetPartialCatalogObjectRequest req = newReqForDataSource(null); + TGetPartialCatalogObjectResponse resp = sendRequest(req); + checkResponse(resp.data_srcs != null, req, "no existing DataSource"); + return ImmutableList.copyOf(resp.data_srcs); + } + }); + ImmutableList.Builder<DataSource> dataSrcBld = ImmutableList.builder(); + for (TDataSource thriftDataSrc : thriftDataSrcs) { + dataSrcBld.add(DataSource.fromThrift(thriftDataSrc)); + } + return dataSrcBld.build(); + } + + /** + * Get the DataSource object from catalogd for the given DataSource name. + */ + public DataSource loadDataSource(String dsName) throws TException { + Preconditions.checkState(dsName != null && !dsName.isEmpty()); + return loadWithCaching("DataSource object for " + dsName, + DATA_SOURCE_LIST_STATS_CATEGORY, + new DataSourceCacheKey(dsName.toLowerCase()), new Callable<DataSource>() { + @Override + public DataSource call() throws Exception { + TGetPartialCatalogObjectRequest req = newReqForDataSource(dsName); + TGetPartialCatalogObjectResponse resp = sendRequest(req); + checkResponse(resp.data_srcs != null, req, "missing expected DataSource"); + if (resp.data_srcs.size() == 1) { + return DataSource.fromThrift(resp.data_srcs.get(0)); + } else { + Preconditions.checkState(resp.data_srcs.size() == 0); + return null; + } + } + }); + } + /** * Invalidate portions of the cache as indicated by the provided request. 
* @@ -1519,6 +1577,12 @@ public class CatalogdMetaProvider implements MetaProvider { DbCacheKey.DbInfoType.HMS_METADATA, DbCacheKey.DbInfoType.FUNCTION_NAMES), invalidated); break; + case DATA_SOURCE: + if (cache_.asMap().remove(DS_OBJ_LIST_CACHE_KEY) != null) { + invalidated.add("list of DataSource objects"); + } + invalidateCacheForDataSource(obj.data_source.name, invalidated); + break; default: break; } @@ -1580,6 +1644,18 @@ public class CatalogdMetaProvider implements MetaProvider { } } + /** + * Invalidate cached DataSource for the given DataSource name. If anything was + * invalidated, adds a human-readable string to 'invalidated' indicating the + * invalidated DataSource. + */ + private void invalidateCacheForDataSource(String dsName, List<String> invalidated) { + DataSourceCacheKey key = new DataSourceCacheKey(dsName.toLowerCase()); + if (cache_.asMap().remove(key) != null) { + invalidated.add("DataSource " + dsName); + } + } + /** * Reference to a partition within a table. We remember the partition's ID and pass * that back to the catalog in subsequent requests back to fetch the details of the @@ -2016,6 +2092,30 @@ public class CatalogdMetaProvider implements MetaProvider { } } + /** + * Cache key for a DataSource object. + */ + @Immutable + private static class DataSourceCacheKey { + private final String dsName_; + + DataSourceCacheKey(String dsName) { + dsName_ = dsName; + } + + @Override + public int hashCode() { + return Objects.hashCode(dsName_, getClass()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof DataSourceCacheKey)) return false; + DataSourceCacheKey other = (DataSourceCacheKey)obj; + return dsName_.equals(other.dsName_); + } + } + @VisibleForTesting static class SizeOfWeigher implements Weigher<Object, Object> { // Bypass flyweight objects like small boxed integers, Boolean.TRUE, enums, etc. 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 90bdba126..6fa3f098e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.impala.authorization.AuthorizationPolicy; import org.apache.impala.catalog.CatalogException; +import org.apache.impala.catalog.DataSource; import org.apache.impala.catalog.FileMetadataLoader; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsCachePool; @@ -335,6 +336,20 @@ class DirectMetaProvider implements MetaProvider { "Functions not supported by DirectMetaProvider"); } + @Override + public ImmutableList<DataSource> loadDataSources() throws TException { + // See above. + throw new UnsupportedOperationException( + "DataSource not supported by DirectMetaProvider"); + } + + @Override + public DataSource loadDataSource(String dsName) throws TException { + // See above. 
+ throw new UnsupportedOperationException( + "DataSource not supported by DirectMetaProvider"); + } + @Override public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table, List<String> colNames) throws TException { diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java index c194e5aae..26f86823e 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java @@ -27,6 +27,7 @@ import org.apache.impala.authorization.AuthorizationPolicy; import org.apache.impala.catalog.BuiltinsDb; import org.apache.impala.catalog.Catalog; import org.apache.impala.catalog.CatalogException; +import org.apache.impala.catalog.DataSource; import org.apache.impala.catalog.DatabaseNotFoundException; import org.apache.impala.catalog.Db; import org.apache.impala.catalog.FeCatalog; @@ -43,14 +44,19 @@ import org.apache.impala.catalog.PartitionNotFoundException; import org.apache.impala.catalog.PrunablePartition; import org.apache.impala.common.InternalException; import org.apache.impala.thrift.TCatalogObject; +import org.apache.impala.thrift.TCatalogObjectType; +import org.apache.impala.thrift.TDataSource; import org.apache.impala.thrift.TGetPartitionStatsResponse; import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TUniqueId; import org.apache.impala.util.PatternMatcher; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * Implementation of FeCatalog which runs within the impalad and fetches metadata @@ -68,6 +74,8 @@ import com.google.common.base.Preconditions; * returned from its methods. 
*/ public class LocalCatalog implements FeCatalog { + private final static Logger LOG = LoggerFactory.getLogger(LocalCatalog.class); + private final MetaProvider metaProvider_; private Map<String, FeDb> dbs_ = new HashMap<>(); private Map<String, HdfsCachePool> hdfsCachePools_ = null; @@ -108,7 +116,6 @@ public class LocalCatalog implements FeCatalog { dbs_ = dbs; } - @Override public List<String> getTableNames(String dbName, PatternMatcher matcher) throws DatabaseNotFoundException { @@ -152,7 +159,29 @@ public class LocalCatalog implements FeCatalog { throws CatalogException { // TODO(todd): this probably makes the /catalog page not load with an error. // We should probably disable that page in local-catalog mode. - throw new UnsupportedOperationException("LocalCatalog.getTCatalogObject"); + Preconditions.checkNotNull(objectDesc, "invalid objectDesc"); + if (objectDesc.type == TCatalogObjectType.DATA_SOURCE) { + // This function could be called by backend function + // CatalogOpExecutor::HandleDropDataSource() when cleaning jar file of data source. + TDataSource dsDesc = Preconditions.checkNotNull(objectDesc.data_source); + String dsName = dsDesc.getName(); + if (dsName != null && !dsName.isEmpty()) { + try { + DataSource ds = metaProvider_.loadDataSource(dsName); + if (ds != null) { + TCatalogObject resultObj = new TCatalogObject(); + resultObj.setType(TCatalogObjectType.DATA_SOURCE); + resultObj.setData_source(ds.toThrift()); + return resultObj; + } + } catch (Exception e) { + LOG.info("Data source not found: " + dsName + ", " + e.getMessage()); + } + } + throw new CatalogException("Data source not found: " + dsName); + } else { + throw new UnsupportedOperationException("LocalCatalog.getTCatalogObject"); + } } @Override @@ -195,14 +224,26 @@ public class LocalCatalog implements FeCatalog { } @Override - public List<? 
extends FeDataSource> getDataSources( - PatternMatcher createHivePatternMatcher) { - throw new UnsupportedOperationException("TODO"); + public List<? extends FeDataSource> getDataSources(PatternMatcher matcher) { + try { + List<DataSource> dataSrcs = metaProvider_.loadDataSources(); + return Catalog.filterCatalogObjectsByPattern(dataSrcs, matcher); + } catch (Exception e) { + LOG.info("Unable to load DataSource objects, ", e); + // Return empty list. + return Lists.newArrayList(); + } } @Override - public FeDataSource getDataSource(String dataSourceName) { - throw new UnsupportedOperationException("TODO"); + public FeDataSource getDataSource(String dsName) { + Preconditions.checkNotNull(dsName); + try { + return metaProvider_.loadDataSource(dsName); + } catch (Exception e) { + LOG.info("DataSource not found: " + dsName, e); + return null; + } } @Override diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java new file mode 100644 index 000000000..93a2d6924 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDataSourceTable.java @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog.local; + +import java.util.Set; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.impala.catalog.DataSourceTable; +import org.apache.impala.catalog.FeCatalogUtils; +import org.apache.impala.catalog.FeDataSourceTable; +import org.apache.impala.catalog.local.MetaProvider.TableMetaRef; +import org.apache.impala.catalog.TableLoadingException; +import org.apache.impala.catalog.Type; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TDataSource; +import org.apache.impala.thrift.TDataSourceTable; +import org.apache.impala.thrift.TResultSet; +import org.apache.impala.thrift.TResultSetMetadata; +import org.apache.impala.thrift.TTableDescriptor; +import org.apache.impala.thrift.TTableType; +import org.apache.impala.util.TResultRowBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * DataSource table instance loaded from {@link LocalCatalog}. + * + * All DataSource properties are stored as table properties (persisted in the + * metastore). Tables that contain the TBL_PROP_DATA_SRC_NAME table parameter are + * assumed to be backed by an external data source. + */ +public class LocalDataSourceTable extends LocalTable implements FeDataSourceTable { + private final static Logger LOG = LoggerFactory.getLogger(LocalDataSourceTable.class); + + private String initString_; + private TDataSource dataSource_; + + public static LocalDataSourceTable load(LocalDb db, Table msTbl, TableMetaRef ref) + throws TableLoadingException { + Preconditions.checkNotNull(db); + Preconditions.checkNotNull(msTbl); + if (LOG.isTraceEnabled()) { + LOG.trace("load table: " + msTbl.getDbName() + "." 
+ msTbl.getTableName()); + } + if (msTbl.getPartitionKeysSize() > 0) { + throw new TableLoadingException("Data source table cannot contain clustering " + + "columns: " + msTbl.getTableName()); + } + return new LocalDataSourceTable(db, msTbl, ref); + } + + private LocalDataSourceTable(LocalDb db, Table msTbl, TableMetaRef ref) + throws TableLoadingException { + super(db, msTbl, ref); + + String dataSourceName = getRequiredTableProperty( + msTbl, DataSourceTable.TBL_PROP_DATA_SRC_NAME, null); + String location = getRequiredTableProperty( + msTbl, DataSourceTable.TBL_PROP_LOCATION, dataSourceName); + String className = getRequiredTableProperty( + msTbl, DataSourceTable.TBL_PROP_CLASS, dataSourceName); + String apiVersionString = getRequiredTableProperty( + msTbl, DataSourceTable.TBL_PROP_API_VER, dataSourceName); + dataSource_ = new TDataSource(dataSourceName, location, className, apiVersionString); + initString_ = getRequiredTableProperty( + msTbl, DataSourceTable.TBL_PROP_INIT_STRING, dataSourceName); + } + + private String getRequiredTableProperty(Table msTbl, String key, String dataSourceName) + throws TableLoadingException { + String val = msTbl.getParameters().get(key); + if (val == null) { + throw new TableLoadingException(String.format("Failed to load table %s " + + "produced by external data source %s. Missing required metadata: %s", + msTbl.getTableName(), + dataSourceName == null ? "<unknown>" : dataSourceName, key)); + } + return val; + } + + /** + * Gets the DataSource object. + */ + @Override // FeDataSourceTable + public TDataSource getDataSource() { return dataSource_; } + + /** + * Gets the table init string passed to the DataSource. + */ + @Override // FeDataSourceTable + public String getInitString() { return initString_; } + + @Override // FeDataSourceTable + public int getNumNodes() { return 1; } + + /** + * Returns statistics on this table as a tabular result set. Used for the + * SHOW TABLE STATS statement. 
The schema of the returned TResultSet is set + * inside this method. + */ + @Override // FeDataSourceTable + public TResultSet getTableStats() { + TResultSet result = new TResultSet(); + TResultSetMetadata resultSchema = new TResultSetMetadata(); + resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift())); + result.setSchema(resultSchema); + TResultRowBuilder rowBuilder = new TResultRowBuilder(); + rowBuilder.add(getNumRows()); + result.addToRows(rowBuilder.get()); + return result; + } + + @Override + public TTableDescriptor toThriftDescriptor( + int tableId, Set<Long> referencedPartitions) { + TTableDescriptor tableDesc = new TTableDescriptor(tableId, + TTableType.DATA_SOURCE_TABLE, FeCatalogUtils.getTColumnDescriptors(this), + getNumClusteringCols(), getName(), getDb().getName()); + tableDesc.setDataSourceTable(getDataSourceTable()); + return tableDesc; + } + + /** + * Returns a thrift {@link TDataSourceTable} structure for this DataSource table. + */ + private TDataSourceTable getDataSourceTable() { + return new TDataSourceTable(dataSource_, initString_); + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java index 5e56cd9e9..62731ec4b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java @@ -115,7 +115,7 @@ abstract class LocalTable implements FeTable { } else if (IcebergTable.isIcebergTable(msTbl)) { t = LocalIcebergTable.loadIcebergTableViaMetaProvider(db, msTbl, ref); } else if (DataSourceTable.isDataSourceTable(msTbl)) { - // TODO(todd) support datasource table + t = LocalDataSourceTable.load(db, msTbl, ref); } else if (HdfsFileFormat.isHdfsInputFormatClass( msTbl.getSd().getInputFormat())) { t = LocalFsTable.load(db, msTbl, ref); diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index 444ff0556..a9d8c6671 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.impala.authorization.AuthorizationPolicy; import org.apache.impala.catalog.CatalogException; +import org.apache.impala.catalog.DataSource; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsCachePool; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; @@ -106,6 +107,16 @@ public interface MetaProvider { ImmutableList<Function> loadFunction(String dbName, String functionName) throws TException; + /** + * Retrieve all DataSource objects. + */ + ImmutableList<DataSource> loadDataSources() throws TException; + + /** + * Retrieve the DataSource object for the given DataSource name. + */ + DataSource loadDataSource(String dsName) throws TException; + /** * Load the given partitions from the specified table. * diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 2ef60d740..b958c352c 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -2453,7 +2453,6 @@ public class CatalogOpExecutor { private void createDataSource(TCreateDataSourceParams params, TDdlExecResponse resp) throws ImpalaException { - // TODO(IMPALA-7131): support data sources with LocalCatalog. 
if (LOG.isTraceEnabled()) { LOG.trace("Adding DATA SOURCE: " + params.toString()); } DataSource dataSource = DataSource.fromThrift(params.getData_source()); DataSource existingDataSource = catalog_.getDataSource(dataSource.getName()); @@ -2475,7 +2474,6 @@ public class CatalogOpExecutor { private void dropDataSource(TDropDataSourceParams params, TDdlExecResponse resp) throws ImpalaException { - // TODO(IMPALA-7131): support data sources with LocalCatalog. if (LOG.isTraceEnabled()) LOG.trace("Drop DATA SOURCE: " + params.toString()); DataSource dataSource = catalog_.removeDataSource(params.getData_source()); if (dataSource == null) { diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test index d1469e81e..47de8f97f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test +++ b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test @@ -1,5 +1,49 @@ ==== ---- QUERY +# Create DataSource +DROP DATA SOURCE IF EXISTS TestGenericDataSource; +CREATE DATA SOURCE TestGenericDataSource +LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/test-data-source.jar' +CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' +API_VERSION 'V1'; +---- RESULTS +'Data source has been created.' 
+==== +---- QUERY +# Show created DataSource +SHOW DATA SOURCES LIKE 'testgenericdatasource'; +---- LABELS +NAME,LOCATION,CLASS NAME,API VERSION +---- RESULTS +'testgenericdatasource',regex:'.*/test-warehouse/data-sources/test-data-source.jar','org.apache.impala.extdatasource.AllTypesDataSource','V1' +---- TYPES +STRING,STRING,STRING,STRING +==== +---- QUERY +# Create DataSource table +DROP TABLE IF EXISTS alltypes_datasource; +CREATE TABLE alltypes_datasource ( + id INT, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + timestamp_col TIMESTAMP, + string_col STRING, + dec_col1 DECIMAL(9,0), + dec_col2 DECIMAL(10,0), + dec_col3 DECIMAL(20,10), + dec_col4 DECIMAL(38,37), + dec_col5 DECIMAL(10,5), + date_col DATE) +PRODUCED BY DATA SOURCE TestGenericDataSource("TestInitString"); +---- RESULTS +'Table has been created.' +==== +---- QUERY # Gets all types including a row with a NULL value. The predicate pushed to # the data source is not actually used, but the second predicate is # evaluated by Impala. @@ -129,3 +173,15 @@ where smallint_col IS DISTINCT FROM 11 and tinyint_col IS DISTINCT FROM 1) ---- TYPES BIGINT ==== +---- QUERY +# Drop table +DROP TABLE alltypes_datasource; +---- RESULTS +'Table has been dropped.' +==== +---- QUERY +# Drop DataSource +DROP DATA SOURCE TestGenericDataSource; +---- RESULTS +'Data source has been dropped.' 
+==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test index 2bd068234..efc4a77d6 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test +++ b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test @@ -1,7 +1,79 @@ ==== ---- QUERY -# Test the jdbc data source +# Create DataSource +DROP DATA SOURCE IF EXISTS TestJdbcDataSource; +CREATE DATA SOURCE TestJdbcDataSource +LOCATION '$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-data-source.jar' +CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' +API_VERSION 'V1'; +---- RESULTS +'Data source has been created.' +==== +---- QUERY +# Show created DataSource +SHOW DATA SOURCES LIKE 'testjdbcdatasource'; +---- LABELS +NAME,LOCATION,CLASS NAME,API VERSION +---- RESULTS +'testjdbcdatasource',regex:'.*/test-warehouse/data-sources/jdbc-data-source.jar','org.apache.impala.extdatasource.jdbc.JdbcDataSource','V1' +---- TYPES +STRING,STRING,STRING,STRING +==== ---- QUERY +# Create external JDBC DataSource table +DROP TABLE IF EXISTS alltypes_jdbc_datasource; +CREATE TABLE alltypes_jdbc_datasource ( + id INT, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col STRING, + string_col STRING, + timestamp_col TIMESTAMP) +PRODUCED BY DATA SOURCE TestJdbcDataSource( +'{"database.type":"POSTGRES", +"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional", +"jdbc.driver":"org.postgresql.Driver", +"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar", +"dbcp.username":"hiveuser", +"dbcp.password":"password", +"table":"alltypes"}'); +---- RESULTS +'Table has been created.' 
+==== +---- QUERY +# Create external JDBC DataSource table +DROP TABLE IF EXISTS alltypes_jdbc_datasource_2; +CREATE TABLE alltypes_jdbc_datasource_2 ( + id INT, + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + date_string_col STRING, + string_col STRING, + timestamp_col TIMESTAMP) +PRODUCED BY DATA SOURCE TestJdbcDataSource( +'{"database.type":"POSTGRES", +"jdbc.url":"jdbc:postgresql://$INTERNAL_LISTEN_HOST:5432/functional", +"jdbc.driver":"org.postgresql.Driver", +"driver.url":"$FILESYSTEM_PREFIX/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar", +"dbcp.username":"hiveuser", +"dbcp.password":"password", +"table":"AllTypesWithQuote", +"column.mapping":"id=id, bool_col=Bool_col, tinyint_col=Tinyint_col, smallint_col=Smallint_col, int_col=Int_col, bigint_col=Bigint_col, float_col=Float_col, double_col=Double_col, date_string_col=Date_string_col, string_col=String_col, timestamp=Timestamp"}'); +---- RESULTS +'Table has been created.' +==== +---- QUERY +# Test the jdbc DataSource # count(*) with a predicate evaluated by Impala select count(*) from alltypes_jdbc_datasource where float_col = 0 and string_col is not NULL @@ -20,7 +92,7 @@ BIGINT ==== ---- QUERY # Gets all types including a row with a NULL value. The predicate pushed to -# the data source. +# the DataSource. select * from alltypes_jdbc_datasource where id > 10 and int_col< 5 limit 5 @@ -103,3 +175,21 @@ order by a.id, b.id limit 10 ---- TYPES INT, INT ==== +---- QUERY +# Drop table +DROP TABLE alltypes_jdbc_datasource; +---- RESULTS +'Table has been dropped.' +==== +---- QUERY +# Drop table +DROP TABLE alltypes_jdbc_datasource_2; +---- RESULTS +'Table has been dropped.' +==== +---- QUERY +# Drop DataSource +DROP DATA SOURCE TestJdbcDataSource; +---- RESULTS +'Data source has been dropped.' 
+==== diff --git a/tests/common/skip.py b/tests/common/skip.py index 2b1ac76cb..a5980d5e9 100644 --- a/tests/common/skip.py +++ b/tests/common/skip.py @@ -220,13 +220,6 @@ class SkipIfCatalogV2: IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(), reason="Test is specific to old implementation of catalog.") - # TODO: IMPALA-7131: add support or update tests to reflect expected behaviour. - @classmethod - def data_sources_unsupported(self): - return pytest.mark.skipif( - IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(), - reason="IMPALA-7131: data sources not supported.") - # TODO: IMPALA-8489: fix this bug. @classmethod def impala_8489(self): diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py new file mode 100644 index 000000000..b8b465cfb --- /dev/null +++ b/tests/custom_cluster/test_ext_data_sources.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import absolute_import, division, print_function +import pytest + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.skip import SkipIf + + +class TestExtDataSources(CustomClusterTestSuite): + """Impala query tests for external data sources.""" + + @classmethod + def get_workload(self): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal") + def test_data_source_tables(self, vector, unique_database): + """Start Impala cluster in LocalCatalog Mode""" + self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database) + + @SkipIf.not_hdfs + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal") + def test_jdbc_data_source(self, vector, unique_database): + """Start Impala cluster in LocalCatalog Mode""" + self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database) diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py index 308ed1e1c..1cb6fa3dd 100644 --- a/tests/metadata/test_ddl.py +++ b/tests/metadata/test_ddl.py @@ -1168,7 +1168,6 @@ class TestLibCache(TestDdlBase): # Run serially because this test inspects global impalad metrics. # TODO: The metrics checks could be relaxed to enable running this test in # parallel, but that might need a more general wait_for_metric_value(). 
- @SkipIfCatalogV2.data_sources_unsupported() @pytest.mark.execute_serially def test_create_drop_data_src(self, vector, unique_database): """This will create, run, and drop the same data source repeatedly, exercising diff --git a/tests/metadata/test_metadata_query_statements.py b/tests/metadata/test_metadata_query_statements.py index 071339f37..aa3608a62 100644 --- a/tests/metadata/test_metadata_query_statements.py +++ b/tests/metadata/test_metadata_query_statements.py @@ -146,7 +146,6 @@ class TestMetadataQueryStatements(ImpalaTestSuite): compare=compare_describe_formatted) @pytest.mark.execute_serially # due to data src setup/teardown - @SkipIfCatalogV2.data_sources_unsupported() def test_show_data_sources(self, vector): try: self.__create_data_sources() diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py index 17587bc91..b3a3a4260 100644 --- a/tests/query_test/test_ext_data_sources.py +++ b/tests/query_test/test_ext_data_sources.py @@ -17,9 +17,12 @@ from __future__ import absolute_import, division, print_function +import re + from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfCatalogV2, SkipIf +from tests.common.skip import SkipIf, SkipIfDockerizedCluster from tests.common.test_dimensions import create_uncompressed_text_dimension +from tests.util.filesystem_utils import FILESYSTEM_PREFIX class TestExtDataSources(ImpalaTestSuite): @@ -59,15 +62,15 @@ class TestExtDataSources(ImpalaTestSuite): properties[fields[1].rstrip()] = fields[2].rstrip() return properties - @SkipIfCatalogV2.data_sources_unsupported() @SkipIf.not_hdfs def test_verify_jdbc_table_properties(self, vector): jdbc_tbl_name = "functional.alltypes_jdbc_datasource" properties = self._get_tbl_properties(jdbc_tbl_name) # Verify data source related table properties assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource' - assert properties['__IMPALA_DATA_SOURCE_LOCATION'] == \ - 
'hdfs://localhost:20500/test-warehouse/data-sources/jdbc-data-source.jar' + expected_location =\ + "{0}/test-warehouse/data-sources/jdbc-data-source.jar".format(FILESYSTEM_PREFIX) + assert re.search(expected_location, properties['__IMPALA_DATA_SOURCE_LOCATION']) assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \ 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' assert properties['__IMPALA_DATA_SOURCE_API_VERSION'] == 'V1' @@ -76,11 +79,15 @@ class TestExtDataSources(ImpalaTestSuite): assert 'table\\":\\"alltypes' \ in properties['__IMPALA_DATA_SOURCE_INIT_STRING'] - @SkipIfCatalogV2.data_sources_unsupported() - def test_data_source_tables(self, vector): - self.run_test_case('QueryTest/data-source-tables', vector) + def test_data_source_tables(self, vector, unique_database): + self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database) - @SkipIfCatalogV2.data_sources_unsupported() + # The test uses pre-written jdbc external tables whose jdbc.url refers to the Postgres + # server via a full URI, i.e. the url starts with 'jdbc:postgresql://hostname:5432/'. + # In the dockerised environment, the Postgres server runs on a different host and is + # configured to accept only local connections, so this test must be skipped on a + # dockerised cluster because the Postgres server is not accessible from impalad. + @SkipIfDockerizedCluster.internal_hostname @SkipIf.not_hdfs - def test_jdbc_data_source(self, vector): - self.run_test_case('QueryTest/jdbc-data-source', vector) + def test_jdbc_data_source(self, vector, unique_database): + self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)
