This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 134de01a59580854d5050186db9b35a0a54cd8c6
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Jan 3 19:35:55 2025 -0800

    IMPALA-13642: Fix unused test vector in test_scanners.py
    
    Several test vectors were ignored in test_scanners.py. This cause
    repetition of the same test without actually varying the test
    exec_option nor debug_action.
    
    This patch fix it by:
    - Use execute_query() instead of client.execute()
    - Passing vector.get_value('exec_option') when executing test query.
    
    Repurpose ImpalaTestMatrix.embed_independent_exec_options to deepcopy
    'exec_option' dimension during vector generation. Therefore, each test
    execution will have unique copy of 'exec_option' for them self.
    
    This patch also adds flake8-unused-arguments plugin into
    critique-gerrit-review.py and py3-requirements.txt so we can catch this
    issue during code review. impala-flake8 is also updated to use
    impala-python3-common.sh. Adds flake8==3.9.2 in py3-requirements.txt,
    which is the highest version that has compatible dependencies with
    pylint==2.10.2.
    
    Drop unused 'dryrun' parameter in get_catalog_compatibility_comments
    method of critique-gerrit-review.py.
    
    Testing:
    - Run impala-flake8 against test_scanners.py and confirm there is no
      more unused variable.
    - Run and pass test_scanners.py in core exploration.
    
    Change-Id: I3b78736327c71323d10bcd432e162400b7ed1d9d
    Reviewed-on: http://gerrit.cloudera.org:8080/22301
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/impala-flake8                      |   2 +-
 bin/jenkins/critique-gerrit-review.py  |   8 +-
 infra/python/deps/py3-requirements.txt |   4 +
 tests/common/test_vector.py            |  24 +++--
 tests/query_test/test_scanners.py      | 170 ++++++++++++++++++---------------
 5 files changed, 116 insertions(+), 92 deletions(-)

diff --git a/bin/impala-flake8 b/bin/impala-flake8
index 1d321c86d..4ec6a329e 100755
--- a/bin/impala-flake8
+++ b/bin/impala-flake8
@@ -17,5 +17,5 @@
 # specific language governing permissions and limitations
 # under the License.
 
-source "$(dirname "$0")/impala-python-common.sh"
+source "$(dirname "$0")/impala-python3-common.sh"
 exec "$PY_ENV_DIR/bin/flake8" "$@"
diff --git a/bin/jenkins/critique-gerrit-review.py 
b/bin/jenkins/critique-gerrit-review.py
index fbdaa8f5f..56aaf1b6b 100755
--- a/bin/jenkins/critique-gerrit-review.py
+++ b/bin/jenkins/critique-gerrit-review.py
@@ -51,6 +51,7 @@ import venv
 FLAKE8_VERSION = "7.1.1"
 FLAKE8_DIFF_VERSION = "0.2.2"
 PYPARSING_VERSION = "3.1.4"
+FLAKE8_UNUSED_ARG_VERSION = "0.0.13"
 
 VENV_PATH = "gerrit_critic_venv"
 VENV_BIN = os.path.join(VENV_PATH, "bin")
@@ -131,7 +132,8 @@ def setup_virtualenv():
               "wheel",
               f"flake8=={FLAKE8_VERSION}",
               f"flake8-diff=={FLAKE8_DIFF_VERSION}",
-              f"pyparsing=={PYPARSING_VERSION}"])
+              f"pyparsing=={PYPARSING_VERSION}",
+              f"flake8-unused-arguments=={FLAKE8_UNUSED_ARG_VERSION}"])
   # Add the libpath of the installed venv to import pyparsing
   sys.path.append(os.path.join(VENV_PATH, 
f"lib/python{sys.version_info.major}."
                                           
f"{sys.version_info.minor}/site-packages/"))
@@ -346,7 +348,7 @@ def extract_thrift_defs_of_revision(revision, file_name):
   return extract_thrift_defs(contents)
 
 
-def get_catalog_compatibility_comments(base_revision, revision, dryrun=False):
+def get_catalog_compatibility_comments(base_revision, revision):
   """Get comments on Thrift/FlatBuffers changes that might break the 
communication
   between impalad and catalogd/statestore"""
   comments = defaultdict(lambda: [])
@@ -451,7 +453,7 @@ if __name__ == "__main__":
   comments = get_flake8_comments(base_revision, revision)
   merge_comments(comments, get_misc_comments(base_revision, revision, 
args.dryrun))
   merge_comments(
-    comments, get_catalog_compatibility_comments(base_revision, revision, 
args.dryrun))
+    comments, get_catalog_compatibility_comments(base_revision, revision))
   merge_comments(comments, get_planner_tests_comments())
   review_input = {"comments": comments}
   if len(comments) > 0:
diff --git a/infra/python/deps/py3-requirements.txt 
b/infra/python/deps/py3-requirements.txt
index b61bc461c..c21bb3e92 100644
--- a/infra/python/deps/py3-requirements.txt
+++ b/infra/python/deps/py3-requirements.txt
@@ -30,3 +30,7 @@ pylint == 2.10.2
   platformdirs == 2.4.1
   typing-extensions == 3.10.0.2
 k5test==0.10.3
+
+# Below are needed only for dev
+flake8==3.9.2
+flake8-unused-arguments==0.0.13
diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py
index 3ccaa2dc2..bdab6cf5b 100644
--- a/tests/common/test_vector.py
+++ b/tests/common/test_vector.py
@@ -199,28 +199,32 @@ class ImpalaTestMatrix(object):
     else:
       raise ValueError('Unknown exploration strategy: %s' % 
exploration_strategy)
 
-  def embed_independent_exec_options(self, vector_values):
-    if not self.independent_exec_option_names:
-      return vector_values
+  def __deepcopy_vector_values(self, vector_values):
+    """Return a deepcopy of vector_values and merge exec options declared 
through
+    add_exec_option_dimension() into 'exec_option' dimension."""
     values = []
     exec_values = []
     exec_option = None
     for val in vector_values:
       if val.name == EXEC_OPTION_KEY:
+        # 'exec_option' is a map. We need to deepcopy the value for each 
vector.
         exec_option = deepcopy(val.value)
       elif val.name in self.independent_exec_option_names:
+        # save this to merge into exec_option later.
         exec_values.append(val)
       else:
         values.append(val)
-    assert exec_option is not None, (
-      "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
-    for val in exec_values:
-      exec_option[val.name] = val.value
-    values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
+    if self.independent_exec_option_names:
+      assert exec_option is not None, (
+        "Must have '" + EXEC_OPTION_KEY + "' dimension previously declared!")
+      for val in exec_values:
+        exec_option[val.name] = val.value
+    if exec_option:
+      values.append(ImpalaTestVector.Value(EXEC_OPTION_KEY, exec_option))
     return values
 
   def __generate_exhaustive_combinations(self):
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
       for vec in product(*self.__extract_vector_values()) if 
self.is_valid(vec)]
 
   def __generate_pairwise_combinations(self):
@@ -231,7 +235,7 @@ class ImpalaTestMatrix(object):
     # results will be the same.
     if len(self.dimensions) == 1:
       return self.__generate_exhaustive_combinations()
-    return [ImpalaTestVector(self.embed_independent_exec_options(vec))
+    return [ImpalaTestVector(self.__deepcopy_vector_values(vec))
       for vec in all_pairs(self.__extract_vector_values(), 
filter_func=self.is_valid)]
 
   def add_constraint(self, constraint_func):
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 4da8977da..c8b703381 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -398,7 +398,7 @@ class TestWideTable(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension("num_cols", 
*cls.NUM_COLS))
     # To cut down on test execution time, only run in exhaustive.
     if cls.exploration_strategy() != 'exhaustive':
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")
 
   def test_wide_table(self, vector):
     if vector.get_value('table_format').file_format == 'kudu':
@@ -437,6 +437,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestHdfsScannerSkew, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'mt_dop', 2)
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ('text') and
         v.get_value('table_format').compression_codec == 'none')
@@ -481,7 +482,7 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
 
     tbl_name = unique_database + ".lineitem_skew"
     with self.create_impala_client() as imp_client:
-      imp_client.set_configuration_option('mt_dop', '2')
+      imp_client.set_configuration(vector.get_value('exec_option'))
       imp_client.execute("""create table {} like 
tpch.lineitem""".format(tbl_name))
       # Create a couple of small data files
       for i in range(1, 11):
@@ -661,16 +662,18 @@ class TestParquet(ImpalaTestSuite):
        repetetion level is set to REPEATED succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, 
"repeated_root_schema")
 
-    result = self.client.execute(
-        "select * from %s.repeated_root_schema" % unique_database)
+    result = self.execute_query(
+        "select * from %s.repeated_root_schema" % unique_database,
+        vector.get_value('exec_option'))
     assert len(result.data) == 300
 
   def test_huge_num_rows(self, vector, unique_database):
     """IMPALA-5021: Tests that a zero-slot scan on a file with a huge num_rows 
in the
     footer succeeds without errors."""
     create_table_from_parquet(self.client, unique_database, "huge_num_rows")
-    result = self.client.execute("select count(*) from %s.huge_num_rows"
-      % unique_database)
+    result = self.execute_query(
+      "select count(*) from %s.huge_num_rows" % unique_database,
+      vector.get_value('exec_option'))
     assert len(result.data) == 1
     assert "4294967294" in result.data
 
@@ -811,30 +814,31 @@ class TestParquet(ImpalaTestSuite):
     # functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' 
counters are
     # set to 0.
     table_name = 'functional_parquet.alltypes'
-    self._misaligned_parquet_row_groups_helper(table_name, 7300)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 7300)
     # lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner 
reads some
     # row groups. 'NumScannersWithNoReads' counters are set to 0.
     table_name = 'functional_parquet.lineitem_multiblock'
-    self._misaligned_parquet_row_groups_helper(table_name, 20000)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 20000)
     # lineitem_sixblocks.parquet is ill-formatted but every scanner reads some 
row groups.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = 'functional_parquet.lineitem_sixblocks'
-    self._misaligned_parquet_row_groups_helper(table_name, 40000)
+    self._misaligned_parquet_row_groups_helper(vector, table_name, 40000)
     # Scanning lineitem_one_row_group.parquet finds two scan ranges that end 
up doing no
     # reads because the file is poorly formatted.
     table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
     self._misaligned_parquet_row_groups_helper(
-        table_name, 40000, num_scanners_with_no_reads=2)
+        vector, table_name, 40000, num_scanners_with_no_reads=2)
 
   def _misaligned_parquet_row_groups_helper(
-      self, table_name, rows_in_table, num_scanners_with_no_reads=0, 
log_prefix=None):
+      self, vector, table_name, rows_in_table, num_scanners_with_no_reads=0,
+      log_prefix=None):
     """Checks if executing a query logs any warnings and if there are any 
scanners that
     end up doing no reads. 'log_prefix' specifies the prefix of the expected 
warning.
     'num_scanners_with_no_reads' indicates the expected number of scanners 
that don't read
     anything because the underlying file is poorly formatted
     """
     query = 'select * from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, vector.get_value('exec_option'))
     assert len(result.data) == rows_in_table
     assert (not result.log and not log_prefix) or \
         (log_prefix and result.log.startswith(log_prefix))
@@ -864,8 +868,9 @@ class TestParquet(ImpalaTestSuite):
     instead we verify sum of ranges read on a backend is 2."""
     query = 'select count(l_orderkey) from 
functional_parquet.lineitem_sixblocks'
     try:
-      self.client.set_configuration_option('mt_dop', '2')
-      result = self.client.execute(query)
+      options = vector.get_value('exec_option')
+      options['mt_dop'] = 2
+      result = self.execute_query(query, options)
       TOTAL_ROWS = 40000
       ranges_complete_list = re.findall(r'ScanRangesComplete: ([0-9]*)',
         result.runtime_profile)
@@ -905,11 +910,11 @@ class TestParquet(ImpalaTestSuite):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 
blocks, so
     # each impalad should read 1 scan range.
     table_name = 'functional_parquet.lineitem_multiblock'
-    self._multiple_blocks_helper(table_name, 20000, ranges_per_node=1)
+    self._multiple_blocks_helper(vector, table_name, 20000, ranges_per_node=1)
     table_name = 'functional_parquet.lineitem_sixblocks'
     # 2 scan ranges per node should be created to read 'lineitem_sixblocks' 
because
     # there are 6 blocks and 3 scan nodes.
-    self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)
+    self._multiple_blocks_helper(vector, table_name, 40000, ranges_per_node=2)
 
   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
@@ -919,10 +924,10 @@ class TestParquet(ImpalaTestSuite):
     # scan range reads everything from this row group.
     table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
     self._multiple_blocks_helper(
-        table_name, 40000, one_row_group=True, ranges_per_node=1)
+        vector, table_name, 40000, one_row_group=True, ranges_per_node=1)
 
   def _multiple_blocks_helper(
-      self, table_name, rows_in_table, one_row_group=False, ranges_per_node=1):
+      self, vector, table_name, rows_in_table, one_row_group=False, 
ranges_per_node=1):
     """ This function executes a simple SELECT query on a multiblock parquet 
table and
     verifies the number of ranges issued per node and verifies that at least 
one row group
     was read. If 'one_row_group' is True, then one scan range is expected to 
read the data
@@ -930,7 +935,7 @@ class TestParquet(ImpalaTestSuite):
     how many scan ranges we expect to be issued per node. """
 
     query = 'select count(l_orderkey) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, vector.get_value('exec_option'))
     assert len(result.data) == 1
     assert result.data[0] == str(rows_in_table)
 
@@ -984,17 +989,19 @@ class TestParquet(ImpalaTestSuite):
     if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in 
exhaustive")
 
     # Create table
+    options = vector.get_value('exec_option')
     TABLE_NAME = "parquet_annotate_utf8_test"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     query = 'create table %s (a string, b char(10), c varchar(10), d string) ' 
\
             'stored as parquet' % qualified_table_name
-    self.client.execute(query)
+    self.execute_query(query, options)
 
     # Insert data that should have UTF8 annotation
     query = 'insert overwrite table %s '\
             'values("a", cast("b" as char(10)), cast("c" as varchar(10)), 
"d")' \
             % qualified_table_name
-    self.execute_query(query, {'parquet_annotate_strings_utf8': True})
+    options['parquet_annotate_strings_utf8'] = True
+    self.execute_query(query, options)
 
     def get_schema_elements():
       # Copy the created file to the local filesystem and parse metadata
@@ -1026,7 +1033,8 @@ class TestParquet(ImpalaTestSuite):
     assert d_schema_elt.converted_type == ConvertedType.UTF8
 
     # Create table and insert data that should not have UTF8 annotation for 
strings
-    self.execute_query(query, {'parquet_annotate_strings_utf8': False})
+    options['parquet_annotate_strings_utf8'] = False
+    self.execute_query(query, options)
 
     # Check that the schema does not use the UTF8 annotation except for CHAR 
and VARCHAR
     # columns
@@ -1071,7 +1079,8 @@ class TestParquet(ImpalaTestSuite):
     TABLE_NAME = "dict_encoding_with_large_bit_width"
     create_table_from_parquet(self.client, unique_database, TABLE_NAME)
     result = self.execute_query(
-        "select * from {0}.{1}".format(unique_database, TABLE_NAME))
+        "select * from {0}.{1}".format(unique_database, TABLE_NAME),
+        vector.get_value('exec_option'))
     assert(len(result.data) == 33)
 
   def test_rle_dictionary_encoding(self, vector, unique_database):
@@ -1168,12 +1177,13 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter Parquet[Un]compressedPageSize is 
updated
        when reading [un]compressed Parquet files, and that the counter
        Parquet[Un]compressedPageSize is not updated."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so ParquetCompressedPageSize should 
be empty,
     # but ParquetUncompressedPageSize should have been updated. Query needs an 
order by
     # so that all rows are read. Only access a couple of columns to reduce 
query runtime.
-    result = self.client.execute("select l_orderkey"
-                                 " from functional_parquet.lineitem_sixblocks"
-                                 " order by l_orderkey limit 10")
+    result = self.execute_query("select l_orderkey"
+                                " from functional_parquet.lineitem_sixblocks"
+                                " order by l_orderkey limit 10", options)
 
     compressed_page_size_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedPageSize", result.runtime_profile)
@@ -1195,8 +1205,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedPageSize and
     # ParquetUncompressedPageSize should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select int_col from 
functional_parquet.alltypestiny"
-                                 " order by int_col limit 10")
+    result = self.execute_query("select int_col from 
functional_parquet.alltypestiny"
+                                " order by int_col limit 10", options)
 
     for summary_name in ("ParquetCompressedPageSize", 
"ParquetUncompressedPageSize"):
       page_size_summaries = get_bytes_summary_stats_counter(
@@ -1211,13 +1221,14 @@ class TestParquet(ImpalaTestSuite):
     """IMPALA-6964: Test that the counter 
Parquet[Un]compressedBytesReadPerColumn is
        updated when reading [un]compressed Parquet files, and that the counter
        Parquet[Un]CompressedBytesReadPerColumn is not updated."""
+    options = vector.get_value('exec_option')
     # lineitem_sixblocks is not compressed so 
ParquetCompressedBytesReadPerColumn should
     # be empty, but ParquetUncompressedBytesReadPerColumn should have been 
updated
     # Query needs an order by so that all rows are read. Only access a couple 
of
     # columns to reduce query runtime.
-    result = self.client.execute("select l_orderkey, l_partkey "
-                                 "from functional_parquet.lineitem_sixblocks "
-                                 " order by l_orderkey limit 10")
+    result = self.execute_query("select l_orderkey, l_partkey "
+                                "from functional_parquet.lineitem_sixblocks "
+                                " order by l_orderkey limit 10", options)
 
     compressed_bytes_read_per_col_summaries = get_bytes_summary_stats_counter(
         "ParquetCompressedBytesReadPerColumn", result.runtime_profile)
@@ -1244,8 +1255,8 @@ class TestParquet(ImpalaTestSuite):
     # alltypestiny is compressed so both ParquetCompressedBytesReadPerColumn 
and
     # ParquetUncompressedBytesReadPerColumn should have been updated
     # Query needs an order by so that all rows are read.
-    result = self.client.execute("select * from 
functional_parquet.alltypestiny"
-                                 " order by int_col limit 10")
+    result = self.execute_query("select * from functional_parquet.alltypestiny"
+                                " order by int_col limit 10", options)
 
     for summary_name in ("ParquetCompressedBytesReadPerColumn",
                          "ParquetUncompressedBytesReadPerColumn"):
@@ -1433,7 +1444,7 @@ class TestParquetScanRangeAssigment(ImpalaTestSuite):
     result = self.execute_query("select sum(l_extendedprice * l_discount) as 
revenue "
         "from tpch_parquet.lineitem where l_shipdate >= '1994-01-01' and "
         "l_shipdate < '1995-01-01' and l_discount between 0.05 and 0.07 and "
-        "l_quantity < 24")
+        "l_quantity < 24", vector.get_value('exec_option'))
 
     # NumRowGroups tracks the number of row groups actually read, not 
necessarily the
     # number assigned. Assert that each fragment processed exactly one row 
group.
@@ -1523,11 +1534,10 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
             - materializes (no)
     """
     DATA = "abc\r\nde\r\nfg\rhij\rklm\r\nno\r"
-    max_scan_range_length = 4
+    vector.get_value('exec_option')['max_scan_range_length'] = 4
     expected_result = ['abc', 'de', 'fg', 'hij', 'klm', 'no']
 
-    self._create_and_query_test_table(
-      vector, unique_database, DATA, max_scan_range_length, expected_result)
+    self._create_and_query_test_table(vector, unique_database, DATA, 
expected_result)
 
   def test_text_split_across_buffers_delimiter(self, vector, unique_database):
     """Creates and queries a datafile that exercises a split "\r\n" across io 
buffers (but
@@ -1545,31 +1555,29 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
     expected_result = data.split("\r\n")
 
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
+    vector.get_value('exec_option')['max_row_size'] = 9 * 1024 * 1024
+    vector.get_value('exec_option')['max_scan_range_length'] = 
max_scan_range_length
 
-    self._create_and_query_test_table(
-      new_vector, unique_database, data, max_scan_range_length, 
expected_result)
+    self._create_and_query_test_table(vector, unique_database, data, 
expected_result)
 
-  def _create_and_query_test_table(self, vector, unique_database, data,
-        max_scan_range_length, expected_result):
+  def _create_and_query_test_table(self, vector, unique_database, data, 
expected_result):
+    options = vector.get_value('exec_option')
     TABLE_NAME = "test_text_split_delimiters"
     qualified_table_name = "%s.%s" % (unique_database, TABLE_NAME)
     location = get_fs_path("/test-warehouse/%s_%s" % (unique_database, 
TABLE_NAME))
     query = "create table %s (s string) location '%s'" % 
(qualified_table_name, location)
-    self.client.execute(query)
+    self.execute_query(query, options)
 
     # Passing "w+" to NamedTemporaryFile prevents it from opening the file in 
bytes mode
     with tempfile.NamedTemporaryFile(mode="w+") as f:
       f.write(data)
       f.flush()
       self.filesystem_client.copy_from_local(f.name, location)
-    self.client.execute("refresh %s" % qualified_table_name)
+    self.execute_query("refresh %s" % qualified_table_name, options)
 
-    vector.get_value('exec_option')['max_scan_range_length'] = 
max_scan_range_length
     query = "select * from %s" % qualified_table_name
     result = self.execute_query_expect_success(
-      self.client, query, vector.get_value('exec_option'))
+      self.client, query, options)
 
     assert sorted(result.data) == sorted(expected_result)
 
@@ -1627,22 +1635,23 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
       cls.ImpalaTestMatrix.add_dimension(
           create_uncompressed_text_dimension(cls.get_workload()))
     else:
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+      pytest.skip("Only run in exhaustive")
 
   def test_scan_truncated_file_empty(self, vector, unique_database):
-    self.scan_truncated_file(0, unique_database)
+    self.scan_truncated_file(vector, 0, unique_database)
 
   def test_scan_truncated_file(self, vector, unique_database):
-    self.scan_truncated_file(10, unique_database)
+    self.scan_truncated_file(vector, 10, unique_database)
 
-  def scan_truncated_file(self, num_rows, db_name):
+  def scan_truncated_file(self, vector, num_rows, db_name):
     fq_tbl_name = db_name + ".truncated_file_test"
-    self.execute_query("create table %s (s string)" % fq_tbl_name)
+    options = vector.get_value('exec_option')
+    self.execute_query("create table %s (s string)" % fq_tbl_name, options)
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
         "functional.alltypes" % fq_tbl_name)
 
     # Update the Impala metadata
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)
 
     # Insert overwrite with a truncated file
     self.run_stmt_in_hive("insert overwrite table %s select string_col from "
@@ -1650,9 +1659,9 @@ class TestScanTruncatedFiles(ImpalaTestSuite):
 
     # The file will not exist if the table is empty and the insert is done by 
Hive 3, so
     # another refresh is needed.
-    self.execute_query("refresh %s" % fq_tbl_name)
+    self.execute_query("refresh %s" % fq_tbl_name, options)
 
-    result = self.execute_query("select count(*) from %s" % fq_tbl_name)
+    result = self.execute_query("select count(*) from %s" % fq_tbl_name, 
options)
     assert(len(result.data) == 1)
     assert(result.data[0] == str(num_rows))
 
@@ -1691,49 +1700,52 @@ class TestOrc(ImpalaTestSuite):
   @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
   def test_misaligned_orc_stripes(self, vector, unique_database):
-    self._build_lineitem_table_helper(unique_database, 'lineitem_threeblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 
'lineitem_threeblocks',
         'lineitem_threeblocks.orc')
-    self._build_lineitem_table_helper(unique_database, 'lineitem_sixblocks',
+    self._build_lineitem_table_helper(vector, unique_database, 
'lineitem_sixblocks',
         'lineitem_sixblocks.orc')
-    self._build_lineitem_table_helper(unique_database,
+    self._build_lineitem_table_helper(vector, unique_database,
         'lineitem_orc_multiblock_one_stripe',
         'lineitem_orc_multiblock_one_stripe.orc')
 
     # functional_orc.alltypes is well-formatted. 'NumScannersWithNoReads' 
counters are
     # set to 0.
     table_name = 'functional_orc_def.alltypes'
-    self._misaligned_orc_stripes_helper(table_name, 7300)
+    self._misaligned_orc_stripes_helper(vector, table_name, 7300)
     # lineitem_threeblock.orc is ill-formatted but every scanner reads some 
stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_threeblocks'
-    self._misaligned_orc_stripes_helper(table_name, 16000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 16000)
     # lineitem_sixblocks.orc is ill-formatted but every scanner reads some 
stripes.
     # 'NumScannersWithNoReads' counters are set to 0.
     table_name = unique_database + '.lineitem_sixblocks'
-    self._misaligned_orc_stripes_helper(table_name, 30000)
+    self._misaligned_orc_stripes_helper(vector, table_name, 30000)
     # Scanning lineitem_orc_multiblock_one_stripe.orc finds two scan ranges 
that end up
     # doing no reads because the file is poorly formatted.
     table_name = unique_database + '.lineitem_orc_multiblock_one_stripe'
     self._misaligned_orc_stripes_helper(
-      table_name, 16000, num_scanners_with_no_reads=2)
+      vector, table_name, 16000, num_scanners_with_no_reads=2)
 
-  def _build_lineitem_table_helper(self, db, tbl, file):
-    self.client.execute("create table %s.%s like tpch.lineitem stored as orc" 
% (db, tbl))
+  def _build_lineitem_table_helper(self, vector, db, tbl, file):
+    options = vector.get_value('exec_option')
+    self.execute_query(
+      "create table %s.%s like tpch.lineitem stored as orc" % (db, tbl), 
options)
     tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
     # set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
     # lineitem_sixblocks.orc occupies 6 blocks.
     check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', 
'-d', '-f',
         os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, 
tbl_loc])
-    self.client.execute("refresh %s.%s" % (db, tbl))
+    self.execute_query("refresh %s.%s" % (db, tbl), options)
 
   def _misaligned_orc_stripes_helper(
-          self, table_name, rows_in_table, num_scanners_with_no_reads=0):
+          self, vector, table_name, rows_in_table, 
num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of 
scanners
     that don't read anything because the underlying file is poorly formatted.
     Additionally, test that select count(star) match with expected number of 
rows.
     """
+    options = vector.get_value('exec_option')
     query = 'select * from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert len(result.data) == rows_in_table
 
     num_scanners_with_no_reads_list = re.findall(
@@ -1753,7 +1765,7 @@ class TestOrc(ImpalaTestSuite):
 
     # Test that select count(star) match with expected number of rows.
     query = 'select count(*) from %s' % table_name
-    result = self.client.execute(query)
+    result = self.execute_query(query, options)
     assert int(result.data[0]) == rows_in_table
 
   # Skip this test on non-HDFS filesystems, because orc-type-check.test 
contains Hive
@@ -1886,7 +1898,7 @@ class TestOrc(ImpalaTestSuite):
       self.run_test_case("QueryTest/orc_timestamp_with_local_timezone", vector,
           unique_database)
 
-  def _run_invalid_schema_test(self, unique_database, test_name, 
expected_error):
+  def _run_invalid_schema_test(self, vector, unique_database, test_name, 
expected_error):
     """Copies 'test_name'.orc to a table and runs a simple query. These tests 
should
        cause an error during the processing of the ORC schema, so the file's 
columns do
        not have to match with the table's columns.
@@ -1896,14 +1908,15 @@ class TestOrc(ImpalaTestSuite):
         "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
         unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(id) from {0}.{1}".format(unique_database, test_name))
+        "select count(id) from {0}.{1}".format(unique_database, test_name),
+        vector.get_value('exec_option'))
     assert expected_error in str(err)
 
   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
-    self._run_invalid_schema_test(unique_database, "corrupt_schema",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_schema",
         "Encountered parse error in tail of ORC file")
-    self._run_invalid_schema_test(unique_database, "corrupt_root_type",
+    self._run_invalid_schema_test(vector, unique_database, "corrupt_root_type",
         "Root of the selected type returned by the ORC lib is not STRUCT: 
boolean.")
 
   def test_date_out_of_range_orc(self, vector, unique_database):
@@ -2076,29 +2089,30 @@ class TestSingleFileTable(ImpalaTestSuite):
 
   def test_single_file_table(self, vector, unique_database):
     # Create a simple table with one column.
+    options = vector.get_value('exec_option')
     params = {"db": unique_database, "tbl": "single_file_table"}
     create_tbl_ddl = ("create external table {db}.{tbl} (c1 int) "
                       "stored as textfile").format(**params)
-    self.execute_query_expect_success(self.client, create_tbl_ddl)
+    self.execute_query_expect_success(self.client, create_tbl_ddl, options)
 
     # Insert one value to the table.
     insert_stmt = "insert into {db}.{tbl} values (1)".format(**params)
-    self.execute_query_expect_success(self.client, insert_stmt)
+    self.execute_query_expect_success(self.client, insert_stmt, options)
 
     # Show files and get the path to the first data file.
     show_files_stmt = "show files in {db}.{tbl}".format(**params)
-    res = self.execute_query_expect_success(self.client, show_files_stmt)
+    res = self.execute_query_expect_success(self.client, show_files_stmt, 
options)
     assert len(res.data) == 1
     hdfs_file_path = res.data[0].split("\t")[0]
     params['new_location'] = hdfs_file_path
 
     # Alter location to point a data file.
     alter_stmt = "alter table {db}.{tbl} set location 
'{new_location}'".format(**params)
-    self.execute_query_expect_success(self.client, alter_stmt)
+    self.execute_query_expect_success(self.client, alter_stmt, options)
 
     # Show files and count star should still work.
-    res = self.execute_query_expect_success(self.client, show_files_stmt)
+    res = self.execute_query_expect_success(self.client, show_files_stmt, 
options)
     assert res.data[0].split("\t")[0] == (hdfs_file_path + '/')
     select_stmt = "select count(*) from {db}.{tbl}".format(**params)
-    res = self.execute_query_expect_success(self.client, select_stmt)
+    res = self.execute_query_expect_success(self.client, select_stmt, options)
     assert res.data[0].split("\t")[0] == '1'

Reply via email to