This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.1.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit c90d088c6f9cd37b3db463b038ad32cc130c02e9 Author: Csaba Ringhofer <[email protected]> AuthorDate: Thu Jul 21 14:53:42 2022 +0200 IMPALA-11447: Fix crash when fetching arrays/structs with result caching Some parts of HS2ColumnarResultSet were not prepared for returning non-scalar types. This code only runs if impala.resultset.cache.size is set, which is not the case in most tests. The issue was caught with Hue, which uses result caching. Testing: - Added a regression test in test_fetch_first.py, which contained other tests that used result caching. - It turned out that some tests in the file did not run at all, as @needs_session() needs the parentheses at the end. For this reason, some test fixes were added to run them correctly, though these changes are totally unrelated to the current issue. Backport issue: - Test fails due to STRUCT in SelectList not supported on Parquet. Changed to use the corresponding ORC table. Change-Id: Ia4dd8f76187dc3555207e2d30d46d811e0a7a126 Reviewed-on: http://gerrit.cloudera.org:8080/18768 Reviewed-by: Impala Public Jenkins <[email protected]> Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Reviewed-on: http://gerrit.cloudera.org:8080/18889 Tested-by: Quanlong Huang <[email protected]> --- be/src/service/query-result-set.cc | 8 ++++- tests/hs2/test_fetch_first.py | 66 ++++++++++++++++++++++++-------------- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc index 7ae6dc613..9d0300954 100644 --- a/be/src/service/query-result-set.cc +++ b/be/src/service/query-result-set.cc @@ -351,7 +351,13 @@ int HS2ColumnarResultSet::AddRows( for (int j = 0; j < metadata_.columns.size(); ++j) { ThriftTColumn* from = &o->result_set_->columns[j]; ThriftTColumn* to = &result_set_->columns[j]; - switch (metadata_.columns[j].columnType.types[0].scalar_type.type) { + const TColumnType& colType = 
metadata_.columns[j].columnType; + TPrimitiveType::type primitiveType = colType.types[0].scalar_type.type; + if (colType.types[0].type != TTypeNodeType::SCALAR) { + DCHECK(from->__isset.stringVal); + primitiveType = TPrimitiveType::STRING; + } + switch (primitiveType) { case TPrimitiveType::NULL_TYPE: case TPrimitiveType::BOOLEAN: StitchNulls( diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py index b925c66cb..3bd1b3835 100644 --- a/tests/hs2/test_fetch_first.py +++ b/tests/hs2/test_fetch_first.py @@ -128,8 +128,18 @@ class TestFetchFirst(HS2TestSuite): def test_query_stmts_v1_with_result_spooling(self): self.run_query_stmts_test({'spool_query_results': 'true'}) + def run_query_expect_success(self, query, options): + """Executes a query and returns its handle.""" + execute_statement_req = TCLIService.TExecuteStatementReq() + execute_statement_req.sessionHandle = self.session_handle + execute_statement_req.confOverlay = options + execute_statement_req.statement = query + execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) + HS2TestSuite.check_response(execute_statement_resp) + return execute_statement_resp.operationHandle + @pytest.mark.execute_serially - @needs_session + @needs_session() def test_rows_materialized_counters(self): """Test that NumRowsFetched is updated even when a fetch request is served by the results cache, and that RowsMaterialized is only updated when rows are first created @@ -140,29 +150,25 @@ class TestFetchFirst(HS2TestSuite): num_rows_fetched_from_cache = "NumRowsFetchedFromCache: {0} ({0})" # Execute the query with the results cache enabled. 
- execute_statement_req = TCLIService.TExecuteStatementReq() - execute_statement_req.confOverlay[self.IMPALA_RESULT_CACHING_OPT] = str(num_rows) - execute_statement_req.statement = statement - execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) - HS2TestSuite.check_response(execute_statement_resp) + options = {self.IMPALA_RESULT_CACHING_OPT: str(num_rows)} + handle = self.run_query_expect_success(statement, options) # Fetch all rows from the query and verify they have been cached. - self.fetch_until(execute_statement_resp.operationHandle, - TCLIService.TFetchOrientation.FETCH_NEXT, num_rows) + self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, num_rows) self.__verify_num_cached_rows(num_rows) # Get the runtime profile and validate that NumRowsFetched and RowsMaterialized both # equal the number of rows fetched by the query. - profile = self.__get_runtime_profile(execute_statement_resp.operationHandle) + profile = self.__get_runtime_profile(handle) assert num_rows_fetched.format(num_rows) in profile # Fetch all rows again and confirm that RowsMaterialized is unchanged, but # NumRowsFetched is double the number of rows returned by the query. 
- self.fetch_until(execute_statement_resp.operationHandle, - TCLIService.TFetchOrientation.FETCH_FIRST, num_rows) - profile = self.__get_runtime_profile(execute_statement_resp.operationHandle) + self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_FIRST, num_rows) + profile = self.__get_runtime_profile(handle) assert num_rows_fetched.format(num_rows) in profile assert num_rows_fetched_from_cache.format(num_rows) in profile + self.close(handle) def __get_runtime_profile(self, op_handle): """Helper method to get the runtime profile from a given operation handle.""" @@ -319,7 +325,7 @@ class TestFetchFirst(HS2TestSuite): self.close(execute_statement_resp.operationHandle) @pytest.mark.execute_serially - @needs_session + @needs_session() def test_constant_query_stmts(self): """Tests query stmts that return a constant result set. These queries are handled somewhat specially by Impala, therefore, we test them separately. We expect @@ -363,7 +369,7 @@ class TestFetchFirst(HS2TestSuite): self.close(execute_statement_resp.operationHandle) @pytest.mark.execute_serially - @needs_session + @needs_session() def test_non_query_stmts(self): """Tests Impala's limited support for the FETCH_FIRST fetch orientation for non-query stmts that return a result set, such as SHOW, COMPUTE STATS, etc. @@ -434,9 +440,10 @@ class TestFetchFirst(HS2TestSuite): # FETCH_NEXT asking for 100 rows. There are only 20 remaining rows. self.fetch_until(execute_statement_resp.operationHandle, TCLIService.TFetchOrientation.FETCH_NEXT, 100, 20) + self.close(execute_statement_resp.operationHandle) @pytest.mark.execute_serially - @needs_session + @needs_session() def test_parallel_insert(self): """Tests parallel inserts with result set caching on. 
Parallel inserts have a coordinator instance but no coordinator @@ -447,12 +454,23 @@ class TestFetchFirst(HS2TestSuite): self.client.set_configuration({'sync_ddl': 1}) self.client.execute("create database %s" % self.TEST_DB) self.client.execute("create table %s.orderclone like tpch.orders" % self.TEST_DB) - execute_statement_req = TCLIService.TExecuteStatementReq() - execute_statement_req.sessionHandle = self.session_handle - execute_statement_req.confOverlay = dict() - execute_statement_req.confOverlay[self.IMPALA_RESULT_CACHING_OPT] = "10" - execute_statement_req.statement = ("insert overwrite %s.orderclone " - "select * from tpch.orders " - "where o_orderkey < 0" % self.TEST_DB) - execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req) - HS2TestSuite.check_response(execute_statement_resp) + options = {self.IMPALA_RESULT_CACHING_OPT: "10"} + handle = self.run_query_expect_success("insert overwrite %s.orderclone " + "select * from tpch.orders " + "where o_orderkey < 0" % self.TEST_DB, options) + self.close(handle) + + @pytest.mark.execute_serially + @needs_session() + def test_complex_types_result_caching(self): + """Regression test for IMPALA-11447. Returning complex types in select list + was crashing in hs2 if result caching was enabled. + """ + options = {self.IMPALA_RESULT_CACHING_OPT: "1024", "disable_codegen": "true"} + handle = self.run_query_expect_success( + "select int_array from functional_orc_def.complextypestbl", options) + self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, 10, 8) + handle = self.run_query_expect_success( + "select alltypes from functional_orc_def.complextypes_structs", options) + self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, 10, 6) + self.close(handle)
