This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c90d088c6f9cd37b3db463b038ad32cc130c02e9
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Thu Jul 21 14:53:42 2022 +0200

    IMPALA-11447: Fix crash when fetching arrays/structs with result caching
    
    Some parts of HS2ColumnarResultSet were not prepared for returning
    non-scalar types. This code only runs if impala.resultset.cache.size
    is set, which is not the case in most tests. The issue was caught
    with Hue, which uses result caching.
    
    Testing:
    - Added a regression test in test_fetch_first.py, which contained
      other tests that used result caching.
    - It turned out that some tests in the file did not run at all,
      as @needs_session() needs the parentheses at the end. For this
      reason some test fixes were added to run them correctly, though
      these changes are totally unrelated to the current issue.
    
    Backport issue:
    - The test fails because STRUCT in SelectList is not supported on Parquet.
      Changed to use the corresponding ORC table.
    
    Change-Id: Ia4dd8f76187dc3555207e2d30d46d811e0a7a126
    Reviewed-on: http://gerrit.cloudera.org:8080/18768
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-on: http://gerrit.cloudera.org:8080/18889
    Tested-by: Quanlong Huang <[email protected]>
---
 be/src/service/query-result-set.cc |  8 ++++-
 tests/hs2/test_fetch_first.py      | 66 ++++++++++++++++++++++++--------------
 2 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc
index 7ae6dc613..9d0300954 100644
--- a/be/src/service/query-result-set.cc
+++ b/be/src/service/query-result-set.cc
@@ -351,7 +351,13 @@ int HS2ColumnarResultSet::AddRows(
   for (int j = 0; j < metadata_.columns.size(); ++j) {
     ThriftTColumn* from = &o->result_set_->columns[j];
     ThriftTColumn* to = &result_set_->columns[j];
-    switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
+    const TColumnType& colType = metadata_.columns[j].columnType;
+    TPrimitiveType::type primitiveType = colType.types[0].scalar_type.type;
+    if (colType.types[0].type != TTypeNodeType::SCALAR) {
+      DCHECK(from->__isset.stringVal);
+      primitiveType = TPrimitiveType::STRING;
+    }
+    switch (primitiveType) {
       case TPrimitiveType::NULL_TYPE:
       case TPrimitiveType::BOOLEAN:
         StitchNulls(
diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py
index b925c66cb..3bd1b3835 100644
--- a/tests/hs2/test_fetch_first.py
+++ b/tests/hs2/test_fetch_first.py
@@ -128,8 +128,18 @@ class TestFetchFirst(HS2TestSuite):
   def test_query_stmts_v1_with_result_spooling(self):
     self.run_query_stmts_test({'spool_query_results': 'true'})
 
+  def run_query_expect_success(self, query, options):
+    """Executes a query and returns its handle."""
+    execute_statement_req = TCLIService.TExecuteStatementReq()
+    execute_statement_req.sessionHandle = self.session_handle
+    execute_statement_req.confOverlay = options
+    execute_statement_req.statement = query
+    execute_statement_resp = 
self.hs2_client.ExecuteStatement(execute_statement_req)
+    HS2TestSuite.check_response(execute_statement_resp)
+    return execute_statement_resp.operationHandle
+
   @pytest.mark.execute_serially
-  @needs_session
+  @needs_session()
   def test_rows_materialized_counters(self):
     """Test that NumRowsFetched is updated even when a fetch request is served 
by the
     results cache, and that RowsMaterialized is only updated when rows are 
first created
@@ -140,29 +150,25 @@ class TestFetchFirst(HS2TestSuite):
     num_rows_fetched_from_cache = "NumRowsFetchedFromCache: {0} ({0})"
 
     # Execute the query with the results cache enabled.
-    execute_statement_req = TCLIService.TExecuteStatementReq()
-    execute_statement_req.confOverlay[self.IMPALA_RESULT_CACHING_OPT] = 
str(num_rows)
-    execute_statement_req.statement = statement
-    execute_statement_resp = 
self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
+    options = {self.IMPALA_RESULT_CACHING_OPT: str(num_rows)}
+    handle = self.run_query_expect_success(statement, options)
 
     # Fetch all rows from the query and verify they have been cached.
-    self.fetch_until(execute_statement_resp.operationHandle,
-        TCLIService.TFetchOrientation.FETCH_NEXT, num_rows)
+    self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, 
num_rows)
     self.__verify_num_cached_rows(num_rows)
 
     # Get the runtime profile and validate that NumRowsFetched and 
RowsMaterialized both
     # equal the number of rows fetched by the query.
-    profile = 
self.__get_runtime_profile(execute_statement_resp.operationHandle)
+    profile = self.__get_runtime_profile(handle)
     assert num_rows_fetched.format(num_rows) in profile
 
     # Fetch all rows again and confirm that RowsMaterialized is unchanged, but
     # NumRowsFetched is double the number of rows returned by the query.
-    self.fetch_until(execute_statement_resp.operationHandle,
-        TCLIService.TFetchOrientation.FETCH_FIRST, num_rows)
-    profile = 
self.__get_runtime_profile(execute_statement_resp.operationHandle)
+    self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_FIRST, 
num_rows)
+    profile = self.__get_runtime_profile(handle)
     assert num_rows_fetched.format(num_rows) in profile
     assert num_rows_fetched_from_cache.format(num_rows) in profile
+    self.close(handle)
 
   def __get_runtime_profile(self, op_handle):
     """Helper method to get the runtime profile from a given operation 
handle."""
@@ -319,7 +325,7 @@ class TestFetchFirst(HS2TestSuite):
     self.close(execute_statement_resp.operationHandle)
 
   @pytest.mark.execute_serially
-  @needs_session
+  @needs_session()
   def test_constant_query_stmts(self):
     """Tests query stmts that return a constant result set. These queries are 
handled
     somewhat specially by Impala, therefore, we test them separately. We expect
@@ -363,7 +369,7 @@ class TestFetchFirst(HS2TestSuite):
     self.close(execute_statement_resp.operationHandle)
 
   @pytest.mark.execute_serially
-  @needs_session
+  @needs_session()
   def test_non_query_stmts(self):
     """Tests Impala's limited support for the FETCH_FIRST fetch orientation for
     non-query stmts that return a result set, such as SHOW, COMPUTE STATS, etc.
@@ -434,9 +440,10 @@ class TestFetchFirst(HS2TestSuite):
     # FETCH_NEXT asking for 100 rows. There are only 20 remaining rows.
     self.fetch_until(execute_statement_resp.operationHandle,
                      TCLIService.TFetchOrientation.FETCH_NEXT, 100, 20)
+    self.close(execute_statement_resp.operationHandle)
 
   @pytest.mark.execute_serially
-  @needs_session
+  @needs_session()
   def test_parallel_insert(self):
     """Tests parallel inserts with result set caching on.
     Parallel inserts have a coordinator instance but no coordinator
@@ -447,12 +454,23 @@ class TestFetchFirst(HS2TestSuite):
     self.client.set_configuration({'sync_ddl': 1})
     self.client.execute("create database %s" % self.TEST_DB)
     self.client.execute("create table %s.orderclone like tpch.orders" % 
self.TEST_DB)
-    execute_statement_req = TCLIService.TExecuteStatementReq()
-    execute_statement_req.sessionHandle = self.session_handle
-    execute_statement_req.confOverlay = dict()
-    execute_statement_req.confOverlay[self.IMPALA_RESULT_CACHING_OPT] = "10"
-    execute_statement_req.statement = ("insert overwrite %s.orderclone "
-                                      "select * from tpch.orders "
-                                      "where o_orderkey < 0" % self.TEST_DB)
-    execute_statement_resp = 
self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
+    options = {self.IMPALA_RESULT_CACHING_OPT: "10"}
+    handle = self.run_query_expect_success("insert overwrite %s.orderclone "
+                                 "select * from tpch.orders "
+                                 "where o_orderkey < 0" % self.TEST_DB, 
options)
+    self.close(handle)
+
+  @pytest.mark.execute_serially
+  @needs_session()
+  def test_complex_types_result_caching(self):
+    """Regression test for IMPALA-11447. Returning complex types in select list
+    was crashing in hs2 if result caching was enabled.
+    """
+    options = {self.IMPALA_RESULT_CACHING_OPT: "1024", "disable_codegen": 
"true"}
+    handle = self.run_query_expect_success(
+        "select int_array from functional_orc_def.complextypestbl", options)
+    self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, 10, 8)
+    handle = self.run_query_expect_success(
+        "select alltypes from functional_orc_def.complextypes_structs", 
options)
+    self.fetch_until(handle, TCLIService.TFetchOrientation.FETCH_NEXT, 10, 6)
+    self.close(handle)

Reply via email to