This is an automated email from the ASF dual-hosted git repository. laszlog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b47955df1fa6cf89a8a987821ff5ebcf38970aff Author: Riza Suminto <[email protected]> AuthorDate: Mon Jan 27 11:20:39 2025 -0800 IMPALA-13701: Rewrite test_invalidate_stale_partition_on_reload Test test_invalidate_stale_partition_on_reload continues to be flaky. It is suspected that the test order is not deterministic enough to ensure the expected Coordinator log lines are printed consistently. This patch rewrites the test and makes it more deterministic by: 1. Setting shorter hms_event_polling_interval_s and statestore_heartbeat_frequency_ms. 2. Running the "select *" query twice: before and after the Hive insert query. 3. Waiting for a full statestore_heartbeat_frequency_ms before inspecting logs and running the second query. 4. Validating that the second 'select *' query uses a higher table version number than the first one. Testing: - Looped the test 100 times; it passed on a local machine. Change-Id: Id578de1d6c9f809178d0b612068c449cf6c5f653 Reviewed-on: http://gerrit.cloudera.org:8080/22400 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- tests/custom_cluster/test_events_custom_configs.py | 54 ++++++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 31a6b7b33..f8c3d0752 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -17,8 +17,10 @@ from __future__ import absolute_import, division, print_function from builtins import range import logging -from os import getenv import pytest +import re +from os import getenv +from time import sleep from beeswaxd.BeeswaxService import QueryState @@ -38,6 +40,17 @@ from tests.util.iceberg_util import IcebergCatalogs HIVE_SITE_HOUSEKEEPING_ON =\ getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on' TRUNCATE_TBL_STMT = 'truncate table' +# The statestore 
heartbeat and topic update frequency (ms). Set low for testing. +STATESTORE_RPC_FREQUENCY_MS = 100 +STATESTORED_ARGS = ( + "-statestore_heartbeat_frequency_ms={freq_ms} " + "-statestore_priority_update_frequency_ms={freq_ms}").format( + freq_ms=STATESTORE_RPC_FREQUENCY_MS) + + +def wait_single_statestore_heartbeat(): + """Wait for state sync across impalads.""" + sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0) class TestEventProcessingCustomConfigsBase(CustomClusterTestSuite): @@ -1307,22 +1320,51 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): verify_partition(True) verify_partition(False) + def __search_table_version(self, profile, full_table_name): + """Seach version of the first table mentioned in runtime protile.""" + version_regex = r"Original Table Versions: {}, (\d+),".format( + full_table_name) + m = re.search(version_regex, profile) + assert m is not None, 'Searching for "{}" in profile but found none\n{}'.format( + version_regex, profile + ) + return int(m.group(1)) + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", - catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=5", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1", + statestored_args=STATESTORED_ARGS, disable_log_buffering=True, cluster_size=1) def test_invalidate_stale_partition_on_reload(self, unique_database): + """Test that metadata change from external system will eventually propagate + to Coordinator in local catalog mode.""" + # Create table with 3 partitions. 
test_tbl = unique_database + ".test_invalidate_table" self.client.execute("create table {} (id int) partitioned by (p int)" .format(test_tbl)) self.client.execute("alter table {} add partition (p=0)".format(test_tbl)) self.client.execute("alter table {} add partition (p=1)".format(test_tbl)) self.client.execute("alter table {} add partition (p=2)".format(test_tbl)) + + # Run Impala query to make sure all partitions metadata are loaded. + result = self.client.execute("select * from {}".format(test_tbl)) + assert len(result.data) == 0 + first_version = self.__search_table_version(result.runtime_profile, test_tbl) + + # Run Hive query that triggers metadata change. self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; " "insert into {} partition(p) values (0,0),(1,1),(2,2)".format(test_tbl)) - self.client.execute("select * from {}".format(test_tbl)) + + # Let CatalogD hear about the new event. EventProcessorUtils.wait_for_event_processing(self) + + # Wait for StatestoreD to propagate the update. + wait_single_statestore_heartbeat() + + # Validate that relevant log lines are printed in Coordinator. + # This should be instantaneous after statestore update received, but set explicit + # long timeout as a precaution. log_regex = r"Invalidated objects in cache: \[partition {}:p={}"\ .format(test_tbl, '%d') self.assert_impalad_log_contains('INFO', log_regex % 0, expected_count=1, @@ -1332,6 +1374,12 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): self.assert_impalad_log_contains('INFO', log_regex % 2, expected_count=1, timeout_s=20) + # Rerun Impala query and confirm that Coordinator use a newer table version. 
+ result = self.client.execute("select * from {}".format(test_tbl)) + assert len(result.data) == 3 + second_version = self.__search_table_version(result.runtime_profile, test_tbl) + assert first_version < second_version + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true",
