This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6c0c26146d956ad771cee27283c1371b9c23adce Author: wzhou-code <[email protected]> AuthorDate: Mon Mar 11 23:28:57 2024 -0700 IMPALA-12896: Avoid JDBC table to be set as transactional table In some deployment environments, JDBC tables are set as transactional tables by default. This causes catalogd to fail to load the metadata for JDBC tables. This patch explicitly adds the table property "transactional=false" for JDBC tables to avoid a JDBC table being set as a transactional table. The operations on JDBC tables are processed only on the coordinator. The processed rows should be estimated as 0 for DataSourceScanNode by the planner so that coordinator-only query plans are generated for simple queries on JDBC tables and queries can be executed without invoking executor nodes. Also adds a Preconditions check to make sure numNodes equals 1 for DataSourceScanNode. Updates the FileSystemUtil.copyFileFromUriToLocal() function to write a log message for all types of exceptions. Testing: - Fixed planner tests for data source tables. - Ran end-to-end tests of JDBC tables with query option 'exec_single_node_rows_threshold' at its default value 100. - Passed core tests. 
Change-Id: I556faeda923a4a11d4bef8c1250c9616f77e6fa6 Reviewed-on: http://gerrit.cloudera.org:8080/21141 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/catalog/DataSourceTable.java | 6 ++++++ .../java/org/apache/impala/common/FileSystemUtil.java | 2 +- .../apache/impala/util/MaxRowsProcessedVisitor.java | 19 ++++++++++++++++--- .../queries/PlannerTest/resource-requirements.test | 3 +++ .../queries/PlannerTest/small-query-opt.test | 8 -------- tests/custom_cluster/test_ext_data_sources.py | 13 +++++++++++++ tests/query_test/test_ext_data_sources.py | 4 +++- 7 files changed, 42 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java index 7b31badf6..642ca481c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java @@ -41,6 +41,7 @@ import org.apache.impala.thrift.TResultSetMetadata; import org.apache.impala.thrift.TTable; import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; +import org.apache.impala.util.AcidUtils; import org.apache.impala.util.JsonUtil; import org.apache.impala.util.TResultRowBuilder; import com.google.common.base.Preconditions; @@ -182,6 +183,11 @@ public class DataSourceTable extends Table implements FeDataSourceTable { // use an external data source so we need to add the table property with the name // of builtin JDBC DataSource. tblProperties.put(TBL_PROP_DATA_SRC_NAME, IMPALA_BUILTIN_JDBC_DATASOURCE); + // Explicitly add table property "transactional=false" to avoid JDBC table to be + // set as transactional table. 
+ if (!tblPropertyKeys.contains(AcidUtils.TABLE_IS_TRANSACTIONAL)) { + tblProperties.put(AcidUtils.TABLE_IS_TRANSACTIONAL, "false"); + } } /** diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java index 39338e00b..70d9ff59d 100644 --- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java +++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java @@ -806,7 +806,7 @@ public class FileSystemUtil { Path remoteFilePath = new Path(srcUri); Path localFilePath = new Path("file://" + localPath); FileSystemUtil.copyToLocal(remoteFilePath, localFilePath); - } catch (IOException e) { + } catch (Exception e) { String errorMsg = "Failed to copy " + srcUri + " to local path: " + localPath; LOG.error(errorMsg, e); throw new IOException(String.format("%s, %s", errorMsg, e.getMessage())); diff --git a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java index de4592a80..a366aacfd 100644 --- a/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/org/apache/impala/util/MaxRowsProcessedVisitor.java @@ -17,6 +17,7 @@ package org.apache.impala.util; +import org.apache.impala.planner.DataSourceScanNode; import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.PlanNode; import org.apache.impala.planner.ScanNode; @@ -33,10 +34,10 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { private boolean valid_ = true; // Max number of rows processed across all instances of a plan node. - private long maxRowsProcessed_ ; + private long maxRowsProcessed_ = 0; // Max number of rows processed per backend impala daemon for a plan node. 
- private long maxRowsProcessedPerNode_; + private long maxRowsProcessedPerNode_ = 0; @Override public void visit(PlanNode caller) { @@ -44,7 +45,17 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { PlanFragment fragment = caller.getFragment(); int numNodes = fragment == null ? 1 : fragment.getNumNodes(); - if (caller instanceof ScanNode) { + if (caller instanceof DataSourceScanNode) { + // Operations on DataSourceScanNode are processed on coordinator. + if (fragment == null) { + numNodes = ((DataSourceScanNode)caller).getNumNodes(); + } + Preconditions.checkState(numNodes == 1); + if (numNodes != 1) { + valid_ = false; + return; + } + } else if (caller instanceof ScanNode) { long numRows = caller.getInputCardinality(); ScanNode scan = (ScanNode) caller; boolean missingStats = scan.isTableMissingStats() || scan.hasCorruptTableStats(); @@ -70,6 +81,8 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { maxRowsProcessedPerNode_ = Math.max(maxRowsProcessedPerNode_, (long)Math.ceil(numRows / (double)numNodes)); } + Preconditions.checkState(maxRowsProcessed_ >= 0); + Preconditions.checkState(maxRowsProcessedPerNode_ >= 0); } public boolean valid() { return valid_; } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index 6994b3544..e926f1b93 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -1540,6 +1540,7 @@ select * from functional.alltypes_datasource ---- PLAN Max Per-Host Resource Reservation: Memory=4.00MB Threads=1 Per-Host Resource Estimates: Memory=1.00GB +Codegen disabled by planner WARNING: The following tables are missing relevant table and/or column statistics. 
functional.alltypes_datasource Analyzed query: SELECT * FROM functional.alltypes_datasource @@ -1557,6 +1558,7 @@ PLAN-ROOT SINK ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 Per-Host Resource Estimates: Memory=1.01GB +Codegen disabled by planner WARNING: The following tables are missing relevant table and/or column statistics. functional.alltypes_datasource Analyzed query: SELECT * FROM functional.alltypes_datasource @@ -1581,6 +1583,7 @@ Per-Host Resources: mem-estimate=1.00GB mem-reservation=0B thread-reservation=1 ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 Per-Host Resource Estimates: Memory=1.01GB +Codegen disabled by planner WARNING: The following tables are missing relevant table and/or column statistics. functional.alltypes_datasource Analyzed query: SELECT * FROM functional.alltypes_datasource diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test index 9ce15f4ef..358f4f960 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/small-query-opt.test @@ -607,8 +607,6 @@ select * from functional.alltypes_datasource; ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -01:EXCHANGE [UNPARTITIONED] -| 00:SCAN DATA SOURCE [functional.alltypes_datasource] row-size=116B cardinality=5.00K ==== @@ -626,9 +624,6 @@ select * from functional.alltypes_datasource limit 1000; ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -01:EXCHANGE [UNPARTITIONED] -| limit: 1000 -| 00:SCAN DATA SOURCE [functional.alltypes_datasource] limit: 1000 row-size=116B cardinality=1.00K @@ -640,9 +635,6 @@ select * from functional.alltypes_datasource where id = 1 limit 7 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -01:EXCHANGE [UNPARTITIONED] -| limit: 7 -| 00:SCAN DATA SOURCE [functional.alltypes_datasource] data source predicates: id = 1 
limit: 7 diff --git a/tests/custom_cluster/test_ext_data_sources.py b/tests/custom_cluster/test_ext_data_sources.py index 2c7d405ca..67a94f100 100644 --- a/tests/custom_cluster/test_ext_data_sources.py +++ b/tests/custom_cluster/test_ext_data_sources.py @@ -24,6 +24,7 @@ import subprocess from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout from tests.common.skip import SkipIfApacheHive +from tests.common.test_dimensions import create_exec_option_dimension from time import sleep @@ -34,6 +35,12 @@ class TestExtDataSources(CustomClusterTestSuite): def get_workload(self): return 'functional-query' + @classmethod + def add_test_dimensions(cls): + super(TestExtDataSources, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( + exec_single_node_option=[100])) + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", @@ -225,6 +232,12 @@ class TestImpalaExtJdbcTables(CustomClusterTestSuite): def get_workload(cls): return 'functional-query' + @classmethod + def add_test_dimensions(cls): + super(TestImpalaExtJdbcTables, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( + exec_single_node_option=[100])) + @classmethod def _download_impala_jdbc_driver(cls): # Download Impala jdbc driver and copy jdbc driver to HDFS. 
diff --git a/tests/query_test/test_ext_data_sources.py b/tests/query_test/test_ext_data_sources.py index fe727e5c8..ca16578a9 100644 --- a/tests/query_test/test_ext_data_sources.py +++ b/tests/query_test/test_ext_data_sources.py @@ -20,7 +20,8 @@ from __future__ import absolute_import, division, print_function import re from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.test_dimensions import create_uncompressed_text_dimension +from tests.common.test_dimensions import (create_uncompressed_text_dimension, + extend_exec_option_dimension) class TestExtDataSources(ImpalaTestSuite): @@ -35,6 +36,7 @@ class TestExtDataSources(ImpalaTestSuite): super(TestExtDataSources, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension( create_uncompressed_text_dimension(cls.get_workload())) + extend_exec_option_dimension(cls, "exec_single_node_rows_threshold", "100") def _get_tbl_properties(self, table_name): """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
