This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6c0c26146d956ad771cee27283c1371b9c23adce
Author: wzhou-code <[email protected]>
AuthorDate: Mon Mar 11 23:28:57 2024 -0700

    IMPALA-12896: Avoid JDBC table to be set as transactional table
    
    In some deployment environments, JDBC tables are set as transactional
    tables by default. This causes catalogd to fail to load the metadata for
    JDBC tables. This patch explicitly adds the table property
    "transactional=false" for JDBC tables to avoid them being set as
    transactional tables.
    
    Operations on JDBC tables are processed only on the coordinator. The
    number of processed rows should be estimated as 0 for DataSourceScanNode by
    planner so that coordinator-only query plans are generated for simple
    queries on JDBC tables and queries could be executed without invoking
    executor nodes. Also adds Preconditions.check to make sure numNodes
    equals 1 for DataSourceScanNode.
    
    Updates the FileSystemUtil.copyFileFromUriToLocal() function to write a
    log message for all types of exceptions.
    
    Testing:
     - Fixed planner tests for data source tables.
     - Ran end-to-end tests of JDBC tables with query option
       'exec_single_node_rows_threshold' as default value 100.
     - Passed core-tests.
    
    Change-Id: I556faeda923a4a11d4bef8c1250c9616f77e6fa6
    Reviewed-on: http://gerrit.cloudera.org:8080/21141
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/catalog/DataSourceTable.java    |  6 ++++++
 .../java/org/apache/impala/common/FileSystemUtil.java |  2 +-
 .../apache/impala/util/MaxRowsProcessedVisitor.java   | 19 ++++++++++++++++---
 .../queries/PlannerTest/resource-requirements.test    |  3 +++
 .../queries/PlannerTest/small-query-opt.test          |  8 --------
 tests/custom_cluster/test_ext_data_sources.py         | 13 +++++++++++++
 tests/query_test/test_ext_data_sources.py             |  4 +++-
 7 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java 
b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index 7b31badf6..642ca481c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -41,6 +41,7 @@ import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.JsonUtil;
 import org.apache.impala.util.TResultRowBuilder;
 import com.google.common.base.Preconditions;
@@ -182,6 +183,11 @@ public class DataSourceTable extends Table implements 
FeDataSourceTable {
     // use an external data source so we need to add the table property with 
the name
     // of builtin JDBC DataSource.
     tblProperties.put(TBL_PROP_DATA_SRC_NAME, IMPALA_BUILTIN_JDBC_DATASOURCE);
+    // Explicitly add table property "transactional=false" to avoid JDBC table 
to be
+    // set as transactional table.
+    if (!tblPropertyKeys.contains(AcidUtils.TABLE_IS_TRANSACTIONAL)) {
+      tblProperties.put(AcidUtils.TABLE_IS_TRANSACTIONAL, "false");
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java 
b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 39338e00b..70d9ff59d 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -806,7 +806,7 @@ public class FileSystemUtil {
       Path remoteFilePath = new Path(srcUri);
       Path localFilePath = new Path("file://" + localPath);
       FileSystemUtil.copyToLocal(remoteFilePath, localFilePath);
-    } catch (IOException e) {
+    } catch (Exception e) {
       String errorMsg = "Failed to copy " + srcUri + " to local path: " + 
localPath;
       LOG.error(errorMsg, e);
       throw new IOException(String.format("%s, %s", errorMsg, e.getMessage()));
diff --git 
a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java 
b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
index de4592a80..a366aacfd 100644
--- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
+++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.util;
 
+import org.apache.impala.planner.DataSourceScanNode;
 import org.apache.impala.planner.PlanFragment;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.planner.ScanNode;
@@ -33,10 +34,10 @@ public class MaxRowsProcessedVisitor implements 
Visitor<PlanNode> {
   private boolean valid_ = true;
 
   // Max number of rows processed across all instances of a plan node.
-  private long maxRowsProcessed_ ;
+  private long maxRowsProcessed_ = 0;
 
   // Max number of rows processed per backend impala daemon for a plan node.
-  private long maxRowsProcessedPerNode_;
+  private long maxRowsProcessedPerNode_ = 0;
 
   @Override
   public void visit(PlanNode caller) {
@@ -44,7 +45,17 @@ public class MaxRowsProcessedVisitor implements 
Visitor<PlanNode> {
 
     PlanFragment fragment = caller.getFragment();
     int numNodes = fragment == null ? 1 : fragment.getNumNodes();
-    if (caller instanceof ScanNode) {
+    if (caller instanceof DataSourceScanNode) {
+      // Operations on DataSourceScanNode are processed on coordinator.
+      if (fragment == null) {
+        numNodes = ((DataSourceScanNode)caller).getNumNodes();
+      }
+      Preconditions.checkState(numNodes == 1);
+      if (numNodes != 1) {
+        valid_ = false;
+        return;
+      }
+    } else if (caller instanceof ScanNode) {
       long numRows = caller.getInputCardinality();
       ScanNode scan = (ScanNode) caller;
       boolean missingStats = scan.isTableMissingStats() || 
scan.hasCorruptTableStats();
@@ -70,6 +81,8 @@ public class MaxRowsProcessedVisitor implements 
Visitor<PlanNode> {
       maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_,
           (long)Math.ceil(numRows / (double)numNodes));
     }
+    Preconditions.checkState(maxRowsProcessed_ >= 0);
+    Preconditions.checkState(maxRowsProcessedPerNode_ >= 0);
   }
 
   public boolean valid() { return valid_; }
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 6994b3544..e926f1b93 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1540,6 +1540,7 @@ select * from functional.alltypes_datasource
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=1
 Per-Host Resource Estimates: Memory=1.00GB
+Codegen disabled by planner
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional.alltypes_datasource
 Analyzed query: SELECT * FROM functional.alltypes_datasource
@@ -1557,6 +1558,7 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
 Per-Host Resource Estimates: Memory=1.01GB
+Codegen disabled by planner
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional.alltypes_datasource
 Analyzed query: SELECT * FROM functional.alltypes_datasource
@@ -1581,6 +1583,7 @@ Per-Host Resources: mem-estimate=1.00GB 
mem-reservation=0B thread-reservation=1
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
 Per-Host Resource Estimates: Memory=1.01GB
+Codegen disabled by planner
 WARNING: The following tables are missing relevant table and/or column 
statistics.
 functional.alltypes_datasource
 Analyzed query: SELECT * FROM functional.alltypes_datasource
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
index 9ce15f4ef..358f4f960 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test
@@ -607,8 +607,6 @@ select * from functional.alltypes_datasource;
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-01:EXCHANGE [UNPARTITIONED]
-|
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
    row-size=116B cardinality=5.00K
 ====
@@ -626,9 +624,6 @@ select * from functional.alltypes_datasource limit 1000;
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-01:EXCHANGE [UNPARTITIONED]
-|  limit: 1000
-|
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
    limit: 1000
    row-size=116B cardinality=1.00K
@@ -640,9 +635,6 @@ select * from functional.alltypes_datasource where id = 1 
limit 7
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
-01:EXCHANGE [UNPARTITIONED]
-|  limit: 7
-|
 00:SCAN DATA SOURCE [functional.alltypes_datasource]
 data source predicates: id = 1
    limit: 7
diff --git a/tests/custom_cluster/test_ext_data_sources.py 
b/tests/custom_cluster/test_ext_data_sources.py
index 2c7d405ca..67a94f100 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -24,6 +24,7 @@ import subprocess
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout
 from tests.common.skip import SkipIfApacheHive
+from tests.common.test_dimensions import create_exec_option_dimension
 from time import sleep
 
 
@@ -34,6 +35,12 @@ class TestExtDataSources(CustomClusterTestSuite):
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExtDataSources, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        exec_single_node_option=[100]))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--use_local_catalog=true",
@@ -225,6 +232,12 @@ class TestImpalaExtJdbcTables(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestImpalaExtJdbcTables, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
+        exec_single_node_option=[100]))
+
   @classmethod
   def _download_impala_jdbc_driver(cls):
     # Download Impala jdbc driver and copy jdbc driver to HDFS.
diff --git a/tests/query_test/test_ext_data_sources.py 
b/tests/query_test/test_ext_data_sources.py
index fe727e5c8..ca16578a9 100644
--- a/tests/query_test/test_ext_data_sources.py
+++ b/tests/query_test/test_ext_data_sources.py
@@ -20,7 +20,8 @@ from __future__ import absolute_import, division, 
print_function
 import re
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.common.test_dimensions import (create_uncompressed_text_dimension,
+    extend_exec_option_dimension)
 
 
 class TestExtDataSources(ImpalaTestSuite):
@@ -35,6 +36,7 @@ class TestExtDataSources(ImpalaTestSuite):
     super(TestExtDataSources, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
+    extend_exec_option_dimension(cls, "exec_single_node_rows_threshold", "100")
 
   def _get_tbl_properties(self, table_name):
     """Extracts the table properties mapping from the output of DESCRIBE 
FORMATTED"""

Reply via email to