This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 39adf42a30765208e51e970339e950aae8544848
Author: gaurav1086 <[email protected]>
AuthorDate: Tue Oct 31 13:24:31 2023 -0700

    IMPALA-12470: Support different schemes for jdbc driver url when
    creating external jdbc table
    
    This patch builds on top of IMPALA-5741 to copy the jdbc jar from
    remote filesystems: Ozone and S3. Currently we only support HDFS.
    
    Testing:
    Removed the "@SkipIf.not_hdfs" qualifier in files:
      - tests/query_test/test_ext_data_sources.py
      - tests/custom_cluster/test_ext_data_sources.py
    1) tested locally by running tests:
      - impala-py.test tests/query_test/test_ext_data_sources.py
      - impala-py.test tests/custom_cluster/test_ext_data_sources.py
    2) tested using jenkins job for ozone and S3
    
    Change-Id: I804fa3d239a4bedcd31569f2b46edb7316d7f004
    Reviewed-on: http://gerrit.cloudera.org:8080/20639
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Wenzhe Zhou <[email protected]>
---
 .../org/apache/impala/common/FileSystemUtil.java   | 29 ++++++++++++++++++++++
 .../apache/impala/planner/DataSourceScanNode.java  | 21 ++++++++++------
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      | 21 ++++++++++------
 testdata/bin/create-load-data.sh                   | 13 ++++++----
 tests/custom_cluster/test_ext_data_sources.py      |  2 --
 tests/query_test/test_ext_data_sources.py          |  7 +-----
 6 files changed, 64 insertions(+), 29 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java 
b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 519449237..39338e00b 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -785,6 +786,34 @@ public class FileSystemUtil {
     fs.copyToLocalFile(source, dest);
   }
 
+  /**
+   * Copies the source file with the given URI to tmp directory on the local 
filesystem.
+   * Returns local file path.
+   * Throws IOException on failure.
+   */
+  public static String copyFileFromUriToLocal(String srcUri)
+      throws IOException {
+    Preconditions.checkNotNull(srcUri);
+    String localLibPath = 
BackendConfig.INSTANCE.getBackendCfg().local_library_path;
+    String fileExt = FilenameUtils.getExtension(srcUri);
+    String localPath;
+    if (localLibPath != null && !localLibPath.isEmpty()) {
+      localPath = localLibPath + "/" + UUID.randomUUID().toString() + "." + 
fileExt;
+    } else {
+      localPath = "/tmp/" + UUID.randomUUID().toString() + "." + fileExt;
+    }
+    try {
+      Path remoteFilePath = new Path(srcUri);
+      Path localFilePath = new Path("file://" + localPath);
+      FileSystemUtil.copyToLocal(remoteFilePath, localFilePath);
+    } catch (IOException e) {
+      String errorMsg = "Failed to copy " + srcUri + " to local path: " + 
localPath;
+      LOG.error(errorMsg, e);
+      throw new IOException(String.format("%s, %s", errorMsg, e.getMessage()));
+    }
+    return localPath;
+  }
+
   /**
    * Delete the file at 'path' if it exists.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 7cafa1840..df95c2268 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -17,12 +17,14 @@
 
 package org.apache.impala.planner;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.BoolLiteral;
@@ -35,6 +37,7 @@ import org.apache.impala.analysis.StringLiteral;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.FeDataSourceTable;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.extdatasource.ExternalDataSourceExecutor;
@@ -163,16 +166,14 @@ public class DataSourceScanNode extends ScanNode {
       }
     }
 
-    String hdfsLocation = table_.getDataSource().getHdfs_location();
-    TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation);
-    TStatus cacheJarStatus = cacheResult.getStatus();
-    if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
+    String dsLocation = table_.getDataSource().getHdfs_location();
+    String localPath;
+    try {
+      localPath = FileSystemUtil.copyFileFromUriToLocal(dsLocation);
+    } catch (IOException e) {
       throw new InternalException(String.format(
-          "Unable to cache data source library at location '%s'. Check that 
the file " +
-          "exists and is readable. Message: %s",
-          hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs())));
+          "Unable to fetch data source jar from location '%s'.", dsLocation));
     }
-    String localPath = cacheResult.getLocal_path();
     String className = table_.getDataSource().getClass_name();
     String apiVersion = table_.getDataSource().getApi_version();
     TPrepareResult prepareResult;
@@ -191,6 +192,10 @@ public class DataSourceScanNode extends ScanNode {
       throw new InternalException(String.format(
           "Error calling prepare() on data source %s",
           DataSource.debugString(table_.getDataSource())), e);
+    } finally {
+      // Delete the jar file once its loaded
+      Path localJarPath = new Path(localPath);
+      FileSystemUtil.deleteIfExists(localJarPath);
     }
     if (prepareStatus.getStatus_code() != TErrorCode.OK) {
       throw new InternalException(String.format(
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index 5f74a2a6e..178a44ed6 100644
--- 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -17,8 +17,8 @@
 
 package org.apache.impala.extdatasource.jdbc.dao;
 
-
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.sql.Connection;
@@ -36,6 +36,8 @@ import javax.sql.DataSource;
 import org.apache.commons.dbcp2.BasicDataSource;
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
 import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
 import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
@@ -231,15 +233,15 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
                     BasicDataSourceFactory.createDataSource(props);
                 // Put jdbc driver to cache
                 String driverUrl = props.getProperty("driverUrl");
-                TCacheJarResult cacheResult = FeSupport.CacheJar(driverUrl);
-                TStatus cacheJarStatus = cacheResult.getStatus();
-                if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
+                String driverLocalPath;
+                try {
+                  driverLocalPath =
+                    FileSystemUtil.copyFileFromUriToLocal(driverUrl);
+                } catch (IOException e) {
                   throw new JdbcDatabaseAccessException(String.format(
-                      "Unable to cache jdbc driver jar at location '%s'. " +
-                      "Check that the file exists and is readable. Message: 
%s",
-                      driverUrl, cacheJarStatus.getError_msgs()));
+                      "Unable to fetch jdbc driver jar from location '%s'. ",
+                      driverUrl));
                 }
-                String driverLocalPath = cacheResult.getLocal_path();
                 // Create class loader for jdbc driver and set it for the
                 // BasicDataSource object so that the driver class could be 
loaded
                 // from jar file without searching classpath.
@@ -248,6 +250,9 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
                     URLClassLoader.newInstance( new URL[] { driverJarUrl },
                         getClass().getClassLoader());
                 basicDataSource.setDriverClassLoader(driverLoader);
+                // Delete the jar file once its loaded
+                Path localJarPath = new Path(driverLocalPath);
+                FileSystemUtil.deleteIfExists(localJarPath);
                 return basicDataSource;
               });
         }
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 9d87c06cf..541d88b7b 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -614,11 +614,6 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
   # Caching tables in s3 returns an IllegalArgumentException, see IMPALA-1714
   run-step "Caching test tables" cache-test-tables.log cache-test-tables
 
-  # TODO: Modify the .sql file that creates the table to take an alternative 
location into
-  # account.
-  run-step "Loading external data sources" load-ext-data-source.log \
-      copy-and-load-ext-data-source
-
   run-step "Creating internal HBase table" create-internal-hbase-table.log \
       create-internal-hbase-table
 
@@ -628,6 +623,14 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
   run-step "Logging created files" created-files.log hdfs dfs -ls -R 
/test-warehouse
 fi
 
+if [[ "${TARGET_FILESYSTEM}" = "hdfs" || "${TARGET_FILESYSTEM}" = "ozone" || \
+      "${TARGET_FILESYSTEM}" = "s3" ]]; then
+  # TODO: Modify the .sql file that creates the table to take an alternative 
location into
+  # account.
+  run-step "Loading external data sources" load-ext-data-source.log \
+      copy-and-load-ext-data-source
+fi
+
 # TODO: Investigate why all stats are not preserved. Theoretically, we only 
need to
 # recompute stats for HBase.
 run-step "Computing table stats" compute-table-stats.log \
diff --git a/tests/custom_cluster/test_ext_data_sources.py 
b/tests/custom_cluster/test_ext_data_sources.py
index b8b465cfb..ec74a78f7 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -19,7 +19,6 @@ from __future__ import absolute_import, division, 
print_function
 import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIf
 
 
 class TestExtDataSources(CustomClusterTestSuite):
@@ -37,7 +36,6 @@ class TestExtDataSources(CustomClusterTestSuite):
     """Start Impala cluster in LocalCatalog Mode"""
     self.run_test_case('QueryTest/data-source-tables', vector, 
use_db=unique_database)
 
-  @SkipIf.not_hdfs
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true",
diff --git a/tests/query_test/test_ext_data_sources.py 
b/tests/query_test/test_ext_data_sources.py
index 985e07cb2..094f0cd67 100644
--- a/tests/query_test/test_ext_data_sources.py
+++ b/tests/query_test/test_ext_data_sources.py
@@ -20,9 +20,7 @@ from __future__ import absolute_import, division, 
print_function
 import re
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf
 from tests.common.test_dimensions import create_uncompressed_text_dimension
-from tests.util.filesystem_utils import FILESYSTEM_PREFIX
 
 
 class TestExtDataSources(ImpalaTestSuite):
@@ -62,14 +60,12 @@ class TestExtDataSources(ImpalaTestSuite):
         properties[fields[1].rstrip()] = fields[2].rstrip()
     return properties
 
-  @SkipIf.not_hdfs
   def test_verify_jdbc_table_properties(self, vector):
     jdbc_tbl_name = "functional.alltypes_jdbc_datasource"
     properties = self._get_tbl_properties(jdbc_tbl_name)
     # Verify data source related table properties
     assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource'
-    expected_location =\
-        
"{0}/test-warehouse/data-sources/jdbc-data-source.jar".format(FILESYSTEM_PREFIX)
+    expected_location = "/test-warehouse/data-sources/jdbc-data-source.jar"
     assert re.search(expected_location, 
properties['__IMPALA_DATA_SOURCE_LOCATION'])
     assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \
         'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
@@ -82,6 +78,5 @@ class TestExtDataSources(ImpalaTestSuite):
   def test_data_source_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/data-source-tables', vector, 
use_db=unique_database)
 
-  @SkipIf.not_hdfs
   def test_jdbc_data_source(self, vector, unique_database):
     self.run_test_case('QueryTest/jdbc-data-source', vector, 
use_db=unique_database)

Reply via email to