This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a870a11e64bcbe14730e2b50cfee7e63eeb2fc0c
Author: Michael Smith <[email protected]>
AuthorDate: Mon Oct 24 14:05:23 2022 -0700

    IMPALA-7098: Re-enable tests under EC

    Re-enables tests under erasure coding, or provides more specific
    exceptions.

    Erasure coding uses multiple data blocks to construct a block group. Our
    tests use RS-3-2-1024k, which includes 3 data blocks per block group.
    Each of these blocks is sized according to `dfs.block.size`, so block
    groups by default hold up to 384MB of data. Impala schedules work to
    executors based on the blocks reported by HDFS, which for EC actually
    represent block groups. So with the default block size, an EC file has
    one third the number of schedulable blocks.

    In the case of tpch.lineitem, this produces 2 parquet files instead of 3
    and reduces the number of executors scheduled to read parquet lineitem:
    1. lineitem.tbl is loaded via Hive. With EC it uses 2 block groups;
       without EC it uses 6 blocks.
    2. parquet lineitem is created by a select/insert from lineitem.tbl.
       Impala schedules reads to executors based on available blocks, so
       with EC the read is scheduled across 2 executors instead of 3, and
       each executor writes a separate parquet file.

    Change-Id: Ib452024993e35d5a8d2854c6b2085115b26e40df
    Reviewed-on: http://gerrit.cloudera.org:8080/19172
    Reviewed-by: Joe McDonnell <[email protected]>
    Tested-by: Joe McDonnell <[email protected]>
---
 tests/common/skip.py                              | 14 ++++++++------
 tests/custom_cluster/test_admission_controller.py |  2 +-
 tests/custom_cluster/test_auto_scaling.py         |  4 ----
 tests/custom_cluster/test_query_retries.py        |  3 ++-
 tests/custom_cluster/test_runtime_profile.py      |  3 +--
 tests/metadata/test_explain.py                    |  4 +++-
 tests/query_test/test_hdfs_caching.py             |  7 ++++---
 tests/query_test/test_insert.py                   |  4 +---
 tests/query_test/test_mt_dop.py                   |  3 +--
 tests/query_test/test_nested_types.py             |  3 +--
 tests/query_test/test_resource_limits.py          |  2 +-
 tests/query_test/test_scanners.py                 | 16 +++++-----------
 tests/util/auto_scaler.py                         |  3 +++
 13 files changed, 31 insertions(+), 37 deletions(-)

diff --git a/tests/common/skip.py b/tests/common/skip.py
index 055030a43..7b8118dca 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -48,7 +48,11 @@ class SkipIfFS:
   hdfs_caching = pytest.mark.skipif(not IS_HDFS, reason="SET CACHED not implemented")
   hdfs_encryption = pytest.mark.skipif(not IS_HDFS,
       reason="HDFS encryption is not supported")
-  hdfs_block_size = pytest.mark.skipif(not IS_HDFS, reason="uses it's own block size")
+  # EC reports block groups of 3 blocks, and the minimum block size is 1MB.
+  hdfs_small_block = pytest.mark.skipif(not IS_HDFS or IS_EC,
+      reason="Requires tables with 1MB block size")
+  hdfs_block_size = pytest.mark.skipif(not IS_HDFS,
+      reason="Size of block reported to Impala is not ~128MB")
   hdfs_acls = pytest.mark.skipif(not IS_HDFS, reason="HDFS acls are not supported")
   # TODO: IMPALA-11584: see if this can be collapsed into SkipIfNotHdfsMinicluster
   always_remote = pytest.mark.skipif(IS_EC or not (IS_HDFS or IS_OZONE)
@@ -66,7 +70,7 @@ class SkipIfFS:
       reason="IMPALA-10562")
   late_filters = pytest.mark.skipif(IS_ISILON, reason="IMPALA-6998")
   read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS, reason="IMPALA-2512")
-  large_block_size = pytest.mark.skipif(IS_OZONE,
+  large_block_size = pytest.mark.skipif(IS_OZONE or IS_EC,
       reason="block size is larger than 128MB")

   # These need test infra work to re-enable.
@@ -137,15 +141,13 @@ class SkipIfBuildType:
       reason="Test depends on running against a local Impala cluster")

 class SkipIfEC:
-  remote_read = pytest.mark.skipif(IS_EC, reason="EC files are read remotely and "
-      "features relying on local read do not work.")
   oom = pytest.mark.skipif(IS_EC, reason="Probably broken by HDFS-13540.")
-  fix_later = pytest.mark.skipif(IS_EC, reason="It should work but doesn't.")
   contain_full_explain = pytest.mark.skipif(IS_EC, reason="Contain full explain output "
       "for hdfs tables.")
-  different_schedule = pytest.mark.skipif(IS_EC, reason="Query is scheduled differently.")
   different_scan_split = pytest.mark.skipif(IS_EC, reason="Scan split of row "
       "groups for Parquet tables created in EC mode is different.")
+  parquet_file_size = pytest.mark.skipif(IS_EC,
+      reason="Fewer parquet files due to large block size, reducing parallelism")


 class SkipIfDockerizedCluster:
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 8fdc8667f..445c4bda4 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -437,7 +437,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         ".* is greater than pool max mem resources 10.00 MB", str(ex))

   @SkipIfFS.hdfs_block_size
-  @SkipIfEC.fix_later
+  @SkipIfEC.parquet_file_size
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 7547d98dc..eb7832660 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -24,7 +24,6 @@ from time import sleep, time
 from tests.util.auto_scaler import AutoScaler
 from tests.util.concurrent_workload import ConcurrentWorkload
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfEC

 LOG = logging.getLogger("test_auto_scaling")
 TOTAL_BACKENDS_METRIC_NAME = "cluster-membership.backends.total"
@@ -66,7 +65,6 @@ class TestAutoScaling(CustomClusterTestSuite):
     LOG.info("Current running queries: %s", running_queries)
     return running_queries

-  @SkipIfEC.fix_later
   def test_single_workload(self):
     """This test exercises the auto-scaling logic in the admission controller. It spins
     up a base cluster (coordinator, catalog, statestore), runs a workload to initiate a
@@ -121,7 +119,6 @@ class TestAutoScaling(CustomClusterTestSuite):
       LOG.info("Stopping auto scaler")
       auto_scaler.stop()

-  @SkipIfEC.fix_later
   def test_single_group_maxed_out(self):
     """This test starts an auto scaler and limits it to a single executor group.
     It then makes sure that the query throughput does not exceed the expected limit."""
@@ -181,7 +178,6 @@ class TestAutoScaling(CustomClusterTestSuite):
       LOG.info("Stopping auto scaler")
       auto_scaler.stop()

-  @SkipIfEC.fix_later
   def test_sequential_startup(self):
     """This test starts an executor group sequentially and observes that no queries are
     admitted until the group has been fully started."""
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 34e83ae92..469bb326f 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -49,13 +49,14 @@ def _get_rpc_fail_action(port):
 def _get_disk_fail_action(port):
   return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)

+
 # All tests in this class have SkipIfEC because all tests run a query and expect
 # the query to be retried when killing a random impalad. On EC this does not always work
 # because many queries that might run on three impalads for HDFS / S3 builds, might only
 # run on two instances on EC builds. The difference is that EC creates smaller tables
 # compared to data stored on HDFS / S3. If the query is only run on two instances, then
 # randomly killing one impalad won't necessarily trigger a retry of the query.
[email protected]_later
[email protected]_file_size
 class TestQueryRetries(CustomClusterTestSuite):

   # A query that shuffles a lot of data. Useful when testing query retries since it
diff --git a/tests/custom_cluster/test_runtime_profile.py b/tests/custom_cluster/test_runtime_profile.py
index 3d669c954..cb5e2963c 100644
--- a/tests/custom_cluster/test_runtime_profile.py
+++ b/tests/custom_cluster/test_runtime_profile.py
@@ -17,7 +17,7 @@

 import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfEC, SkipIfFS
+from tests.common.skip import SkipIfFS


 class TestRuntimeProfile(CustomClusterTestSuite):
@@ -31,7 +31,6 @@ class TestRuntimeProfile(CustomClusterTestSuite):

   # Test depends on block size < 256MiB so larger table is stored in at least 4 blocks.
   @SkipIfFS.large_block_size
-  @SkipIfEC.different_schedule
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args('--gen_experimental_profile=true '
                                     + PERIODIC_COUNTER_UPDATE_FLAG)
diff --git a/tests/metadata/test_explain.py b/tests/metadata/test_explain.py
index f19d68014..9e1e35697 100644
--- a/tests/metadata/test_explain.py
+++ b/tests/metadata/test_explain.py
@@ -26,7 +26,6 @@ from tests.util.filesystem_utils import WAREHOUSE
 # Tests the different explain levels [0-3] on a few queries.
 # TODO: Clean up this test to use an explain level test dimension and appropriate
 # result sub-sections for the expected explain plans.
[email protected]_later
 class TestExplain(ImpalaTestSuite):
   # Value for the num_scanner_threads query option to ensure that the memory estimates of
   # scan nodes are consistent even when run on machines with different numbers of cores.
@@ -53,18 +52,21 @@ class TestExplain(ImpalaTestSuite):
     self.run_test_case('QueryTest/explain-level0', vector)

   @SkipIfNotHdfsMinicluster.plans
+  @SkipIfEC.contain_full_explain
   def test_explain_level1(self, vector):
     vector.get_value('exec_option')['num_scanner_threads'] = self.NUM_SCANNER_THREADS
     vector.get_value('exec_option')['explain_level'] = 1
     self.run_test_case('QueryTest/explain-level1', vector)

   @SkipIfNotHdfsMinicluster.plans
+  @SkipIfEC.contain_full_explain
   def test_explain_level2(self, vector):
     vector.get_value('exec_option')['num_scanner_threads'] = self.NUM_SCANNER_THREADS
     vector.get_value('exec_option')['explain_level'] = 2
     self.run_test_case('QueryTest/explain-level2', vector)

   @SkipIfNotHdfsMinicluster.plans
+  @SkipIfEC.contain_full_explain
   def test_explain_level3(self, vector):
     vector.get_value('exec_option')['num_scanner_threads'] = self.NUM_SCANNER_THREADS
     vector.get_value('exec_option')['explain_level'] = 3
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index 0d65ab4da..4f9f82c4a 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -25,15 +25,14 @@ from subprocess import check_call
 from tests.common.environ import build_flavor_timeout, IS_DOCKERIZED_TEST_CLUSTER
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
-from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfDockerizedCluster
+from tests.common.skip import SkipIfFS, SkipIfDockerizedCluster
 from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.util.filesystem_utils import get_fs_path
+from tests.util.filesystem_utils import get_fs_path, IS_EC
 from tests.util.shell_util import exec_process


 # End to end test that hdfs caching is working.
 @SkipIfFS.hdfs_caching # missing coverage: verify SET CACHED gives error
[email protected]_later
 class TestHdfsCaching(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -83,6 +82,8 @@ class TestHdfsCaching(ImpalaTestSuite):

     if IS_DOCKERIZED_TEST_CLUSTER:
       assert num_metrics_increased == 0, "HDFS caching is disabled in dockerised cluster."
+    elif IS_EC:
+      assert num_metrics_increased == 0, "HDFS caching is disabled with erasure coding."
     elif num_metrics_increased != 1:
       # Test failed, print the metrics
       for i in range(0, len(cached_bytes_before)):
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index b57308add..213be78b7 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -25,7 +25,7 @@ from testdata.common import widetable
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
-from tests.common.skip import (SkipIfFS, SkipIfEC, SkipIfLocal, SkipIfHive2,
+from tests.common.skip import (SkipIfFS, SkipIfLocal, SkipIfHive2,
     SkipIfNotHdfsMinicluster)
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
@@ -133,8 +133,6 @@ class TestInsertQueries(ImpalaTestSuite):
       super(TestInsertQueries, cls).setup_class()

   @UniqueDatabase.parametrize(sync_ddl=True)
-  # Erasure coding doesn't respect memory limit
-  @SkipIfEC.fix_later
   # ABFS partition names cannot end in periods
   @SkipIfFS.file_or_folder_name_ends_with_period
   def test_insert(self, vector, unique_database):
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 4c9421046..58483c9ec 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -120,8 +120,7 @@ class TestMtDopParquet(ImpalaTestSuite):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/mt-dop-parquet-nested', vector)

-  # Impala scans fewer row groups than it should with erasure coding.
-  @SkipIfEC.fix_later
+  @SkipIfEC.parquet_file_size
   def test_parquet_filtering(self, vector):
     """IMPALA-4624: Test that dictionary filtering eliminates row groups correctly."""
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 4401e783f..1603c369b 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -21,7 +21,7 @@ import pytest

 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfHive2, SkipIfNotHdfsMinicluster
+from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (create_exec_option_dimension,
     create_exec_option_dimension_from_dict, create_client_protocol_dimension,
     create_orc_dimension, orc_schema_resolution_constraint)
@@ -300,7 +300,6 @@ class TestNestedTypesNoMtDop(ImpalaTestSuite):
     self.run_test_case('QueryTest/nested-types-tpch-errors', vector,
                        use_db='tpch_nested' + db_suffix)

-  @SkipIfEC.fix_later
   def test_parquet_stats(self, vector):
     """Queries that test evaluation of Parquet row group statistics."""
     if vector.get_value('table_format').file_format == 'orc':
diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py
index 37081e415..7644193c3 100644
--- a/tests/query_test/test_resource_limits.py
+++ b/tests/query_test/test_resource_limits.py
@@ -34,7 +34,7 @@ class TestResourceLimits(ImpalaTestSuite):
         create_parquet_dimension(cls.get_workload()))

   @SkipIfLocal.multiple_impalad
-  @SkipIfEC.fix_later
+  @SkipIfEC.parquet_file_size
   def test_thread_limits(self, vector):
     # Remove option from vector to allow test file to override it per query.
     del vector.get_value('exec_option')['num_nodes']
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 29979929a..6bae1a4ee 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -35,7 +35,6 @@ from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.common.skip import (
     SkipIf,
     SkipIfFS,
-    SkipIfEC,
     SkipIfHive2,
     SkipIfHive3,
     SkipIfLocal,
@@ -721,9 +720,8 @@ class TestParquet(ImpalaTestSuite):
     assert len(result.data) == 1
     assert "AAAAAAAACPKFFAAA" in result.data

-  @SkipIfFS.hdfs_block_size
+  @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
-  @SkipIfEC.fix_later
   def test_misaligned_parquet_row_groups(self, vector):
     """IMPALA-3989: Test that no warnings are issued when misaligned row groups are
     encountered. Make sure that 'NumScannersWithNoReads' counters are set to the number of
@@ -775,9 +773,8 @@ class TestParquet(ImpalaTestSuite):
         total += int(n)
     assert total == num_scanners_with_no_reads

-  @SkipIfFS.hdfs_block_size
+  @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
-  @SkipIfEC.fix_later
   def test_multiple_blocks_mt_dop(self, vector):
     """Sanity check for MT scan nodes to make sure all blocks from the same file are read.
     2 scan ranges per node should be created to read 'lineitem_sixblocks' because
@@ -821,9 +818,8 @@ class TestParquet(ImpalaTestSuite):
     finally:
       self.client.clear_configuration()

-  @SkipIfFS.hdfs_block_size
+  @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
-  @SkipIfEC.fix_later
   def test_multiple_blocks(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
     # each impalad should read 1 scan range.
@@ -834,9 +830,8 @@ class TestParquet(ImpalaTestSuite):
     # there are 6 blocks and 3 scan nodes.
     self._multiple_blocks_helper(table_name, 40000, ranges_per_node=2)

-  @SkipIfFS.hdfs_block_size
+  @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
-  @SkipIfEC.fix_later
   def test_multiple_blocks_one_row_group(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock_one_row_group has
     # 3 blocks but only one row group across these blocks. We test to see that only one
@@ -1573,8 +1568,7 @@ class TestOrc(ImpalaTestSuite):
         lambda v: v.get_value('table_format').file_format == 'orc')
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))

-  @SkipIfFS.hdfs_block_size
-  @SkipIfEC.fix_later
+  @SkipIfFS.hdfs_small_block
   @SkipIfLocal.multiple_impalad
   def test_misaligned_orc_stripes(self, vector, unique_database):
     self._build_lineitem_table_helper(unique_database, 'lineitem_threeblocks',
diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py
index bb6925e7d..e9754beb6 100755
--- a/tests/util/auto_scaler.py
+++ b/tests/util/auto_scaler.py
@@ -24,6 +24,7 @@ import os
 import pipes
 from subprocess import check_call
 from tests.common.impala_cluster import ImpalaCluster
+from tests.util.filesystem_utils import IS_EC
 from threading import Event, Thread

 IMPALA_HOME = os.environ["IMPALA_HOME"]
@@ -255,6 +256,8 @@ class AutoScaler(object):
                     "-vmodule=admission-controller=3,cluster-membership-mgr=3",
                     "-admission_control_slots=%s" % executor_slots,
                     "-shutdown_grace_period_s=2"]
+    if IS_EC:
+      impalad_args.append("--default_query_options=allow_erasure_coded_files=true")
     options += ["--impalad_args=%s" % a for a in impalad_args]
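
For illustration only, not part of the commit: a minimal Python sketch of the block-group
arithmetic described in the commit message above, assuming the default 128MB
dfs.block.size, the 3 data blocks per RS-3-2-1024k block group, and a roughly 720MB
lineitem.tbl. The schedulable_blocks helper and the exact file size are hypothetical;
they only mirror the reasoning about why an EC file exposes about one third as many
schedulable blocks.

import math

DFS_BLOCK_SIZE = 128 * 1024 * 1024  # default dfs.block.size
EC_DATA_BLOCKS = 3                  # data blocks per RS-3-2-1024k block group


def schedulable_blocks(file_size_bytes, erasure_coded):
  """Number of block locations HDFS reports, which Impala uses to schedule scan ranges."""
  # A replicated file splits into dfs.block.size blocks; an EC file splits into block
  # groups that each hold EC_DATA_BLOCKS * dfs.block.size bytes of data.
  unit = DFS_BLOCK_SIZE * EC_DATA_BLOCKS if erasure_coded else DFS_BLOCK_SIZE
  return max(1, int(math.ceil(file_size_bytes / float(unit))))


lineitem_tbl_bytes = 720 * 1024 * 1024  # hypothetical size for tpch lineitem.tbl
print(schedulable_blocks(lineitem_tbl_bytes, erasure_coded=False))  # 6 blocks
print(schedulable_blocks(lineitem_tbl_bytes, erasure_coded=True))   # 2 block groups

Under these assumptions the text file exposes 6 schedulable blocks without EC but only
2 block groups with EC, which is why the insert runs on 2 executors and writes 2
parquet files instead of 3.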
