This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 39adf42a30765208e51e970339e950aae8544848 Author: gaurav1086 <[email protected]> AuthorDate: Tue Oct 31 13:24:31 2023 -0700 IMPALA-12470: Support different schemes for jdbc driver url when creating external jdbc table This patch builds on top of IMPALA-5741 to copy the jdbc jar from remote filesystems: Ozone and S3. Currently we only support hdfs. Testing: Commented out "@skipif.not_hdfs" qualifier in files: - tests/query_test/test_ext_data_sources.py - tests/custom_cluster/test_ext_data_sources.py 1) tested locally by running tests: - impala-py.test tests/query_test/test_ext_data_sources.py - impala-py.test tests/custom_cluster/test_ext_data_sources.py 2) tested using jenkins job for ozone and S3 Change-Id: I804fa3d239a4bedcd31569f2b46edb7316d7f004 Reviewed-on: http://gerrit.cloudera.org:8080/20639 Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Wenzhe Zhou <[email protected]> --- .../org/apache/impala/common/FileSystemUtil.java | 29 ++++++++++++++++++++++ .../apache/impala/planner/DataSourceScanNode.java | 21 ++++++++++------ .../jdbc/dao/GenericJdbcDatabaseAccessor.java | 21 ++++++++++------ testdata/bin/create-load-data.sh | 13 ++++++---- tests/custom_cluster/test_ext_data_sources.py | 2 -- tests/query_test/test_ext_data_sources.py | 7 +----- 6 files changed, 64 insertions(+), 29 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index 519449237..39338e00b 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.commons.io.FilenameUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileStatus; @@ -785,6 +786,34 @@ public class FileSystemUtil { fs.copyToLocalFile(source, dest); } + /** + * Copies the source file with the given URI to tmp directory on the local filesystem. + * Returns local file path. + * Throws IOException on failure. + */ + public static String copyFileFromUriToLocal(String srcUri) + throws IOException { + Preconditions.checkNotNull(srcUri); + String localLibPath = BackendConfig.INSTANCE.getBackendCfg().local_library_path; + String fileExt = FilenameUtils.getExtension(srcUri); + String localPath; + if (localLibPath != null && !localLibPath.isEmpty()) { + localPath = localLibPath + "/" + UUID.randomUUID().toString() + "." + fileExt; + } else { + localPath = "/tmp/" + UUID.randomUUID().toString() + "." + fileExt; + } + try { + Path remoteFilePath = new Path(srcUri); + Path localFilePath = new Path("file://" + localPath); + FileSystemUtil.copyToLocal(remoteFilePath, localFilePath); + } catch (IOException e) { + String errorMsg = "Failed to copy " + srcUri + " to local path: " + localPath; + LOG.error(errorMsg, e); + throw new IOException(String.format("%s, %s", errorMsg, e.getMessage())); + } + return localPath; + } + /** * Delete the file at 'path' if it exists. 
*/ diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index 7cafa1840..df95c2268 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -17,12 +17,14 @@ package org.apache.impala.planner; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.Path; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.BoolLiteral; @@ -35,6 +37,7 @@ import org.apache.impala.analysis.StringLiteral; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.catalog.DataSource; import org.apache.impala.catalog.FeDataSourceTable; +import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.extdatasource.ExternalDataSourceExecutor; @@ -163,16 +166,14 @@ public class DataSourceScanNode extends ScanNode { } } - String hdfsLocation = table_.getDataSource().getHdfs_location(); - TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation); - TStatus cacheJarStatus = cacheResult.getStatus(); - if (cacheJarStatus.getStatus_code() != TErrorCode.OK) { + String dsLocation = table_.getDataSource().getHdfs_location(); + String localPath; + try { + localPath = FileSystemUtil.copyFileFromUriToLocal(dsLocation); + } catch (IOException e) { throw new InternalException(String.format( - "Unable to cache data source library at location '%s'. Check that the file " + - "exists and is readable. 
Message: %s", - hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs()))); + "Unable to fetch data source jar from location '%s'.", dsLocation)); } - String localPath = cacheResult.getLocal_path(); String className = table_.getDataSource().getClass_name(); String apiVersion = table_.getDataSource().getApi_version(); TPrepareResult prepareResult; @@ -191,6 +192,10 @@ public class DataSourceScanNode extends ScanNode { throw new InternalException(String.format( "Error calling prepare() on data source %s", DataSource.debugString(table_.getDataSource())), e); + } finally { + // Delete the jar file once its loaded + Path localJarPath = new Path(localPath); + FileSystemUtil.deleteIfExists(localJarPath); } if (prepareStatus.getStatus_code() != TErrorCode.OK) { throw new InternalException(String.format( diff --git a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java index 5f74a2a6e..178a44ed6 100644 --- a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java +++ b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -17,8 +17,8 @@ package org.apache.impala.extdatasource.jdbc.dao; - import java.io.File; +import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.sql.Connection; @@ -36,6 +36,8 @@ import javax.sql.DataSource; import org.apache.commons.dbcp2.BasicDataSource; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.impala.common.FileSystemUtil; import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig; import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager; import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException; @@ -231,15 +233,15 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { BasicDataSourceFactory.createDataSource(props); // Put jdbc driver to cache String driverUrl = props.getProperty("driverUrl"); - TCacheJarResult cacheResult = FeSupport.CacheJar(driverUrl); - TStatus cacheJarStatus = cacheResult.getStatus(); - if (cacheJarStatus.getStatus_code() != TErrorCode.OK) { + String driverLocalPath; + try { + driverLocalPath = + FileSystemUtil.copyFileFromUriToLocal(driverUrl); + } catch (IOException e) { throw new JdbcDatabaseAccessException(String.format( - "Unable to cache jdbc driver jar at location '%s'. " + - "Check that the file exists and is readable. Message: %s", - driverUrl, cacheJarStatus.getError_msgs())); + "Unable to fetch jdbc driver jar from location '%s'. ", + driverUrl)); } - String driverLocalPath = cacheResult.getLocal_path(); // Create class loader for jdbc driver and set it for the // BasicDataSource object so that the driver class could be loaded // from jar file without searching classpath. 
@@ -248,6 +250,9 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { URLClassLoader.newInstance( new URL[] { driverJarUrl }, getClass().getClassLoader()); basicDataSource.setDriverClassLoader(driverLoader); + // Delete the jar file once its loaded + Path localJarPath = new Path(driverLocalPath); + FileSystemUtil.deleteIfExists(localJarPath); return basicDataSource; }); } diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh index 9d87c06cf..541d88b7b 100755 --- a/testdata/bin/create-load-data.sh +++ b/testdata/bin/create-load-data.sh @@ -614,11 +614,6 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then # Caching tables in s3 returns an IllegalArgumentException, see IMPALA-1714 run-step "Caching test tables" cache-test-tables.log cache-test-tables - # TODO: Modify the .sql file that creates the table to take an alternative location into - # account. - run-step "Loading external data sources" load-ext-data-source.log \ - copy-and-load-ext-data-source - run-step "Creating internal HBase table" create-internal-hbase-table.log \ create-internal-hbase-table @@ -628,6 +623,14 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then run-step "Logging created files" created-files.log hdfs dfs -ls -R /test-warehouse fi +if [[ "${TARGET_FILESYSTEM}" = "hdfs" || "${TARGET_FILESYSTEM}" = "ozone" || \ + "${TARGET_FILESYSTEM}" = "s3" ]]; then + # TODO: Modify the .sql file that creates the table to take an alternative location into + # account. + run-step "Loading external data sources" load-ext-data-source.log \ + copy-and-load-ext-data-source +fi + # TODO: Investigate why all stats are not preserved. Theoretically, we only need to # recompute stats for HBase. 
run-step "Computing table stats" compute-table-stats.log \ diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py index b8b465cfb..ec74a78f7 100644 --- a/tests/custom_cluster/test_ext_data_sources.py +++ b/tests/custom_cluster/test_ext_data_sources.py @@ -19,7 +19,6 @@ from __future__ import absolute_import, division, print_function import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from tests.common.skip import SkipIf class TestExtDataSources(CustomClusterTestSuite): @@ -37,7 +36,6 @@ class TestExtDataSources(CustomClusterTestSuite): """Start Impala cluster in LocalCatalog Mode""" self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database) - @SkipIf.not_hdfs @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py index 985e07cb2..094f0cd67 100644 --- a/tests/query_test/test_ext_data_sources.py +++ b/tests/query_test/test_ext_data_sources.py @@ -20,9 +20,7 @@ from __future__ import absolute_import, division, print_function import re from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIf from tests.common.test_dimensions import create_uncompressed_text_dimension -from tests.util.filesystem_utils import FILESYSTEM_PREFIX class TestExtDataSources(ImpalaTestSuite): @@ -62,14 +60,12 @@ class TestExtDataSources(ImpalaTestSuite): properties[fields[1].rstrip()] = fields[2].rstrip() return properties - @SkipIf.not_hdfs def test_verify_jdbc_table_properties(self, vector): jdbc_tbl_name = "functional.alltypes_jdbc_datasource" properties = self._get_tbl_properties(jdbc_tbl_name) # Verify data source related table properties assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource' - expected_location =\ - 
"{0}/test-warehouse/data-sources/jdbc-data-source.jar".format(FILESYSTEM_PREFIX) + expected_location = "/test-warehouse/data-sources/jdbc-data-source.jar" assert re.search(expected_location, properties['__IMPALA_DATA_SOURCE_LOCATION']) assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \ 'org.apache.impala.extdatasource.jdbc.JdbcDataSource' @@ -82,6 +78,5 @@ class TestExtDataSources(ImpalaTestSuite): def test_data_source_tables(self, vector, unique_database): self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database) - @SkipIf.not_hdfs def test_jdbc_data_source(self, vector, unique_database): self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)
