This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b47955df1fa6cf89a8a987821ff5ebcf38970aff
Author: Riza Suminto <[email protected]>
AuthorDate: Mon Jan 27 11:20:39 2025 -0800

    IMPALA-13701: Rewrite test_invalidate_stale_partition_on_reload
    
    Test test_invalidate_stale_partition_on_reload continues to be flaky. It
    is suspected that the test order is not deterministic enough to ensure
    expected Coordinator log lines are printed consistently.
    
    This patch rewrites the test and makes it more deterministic by:
    1. Setting shorter hms_event_polling_interval_s and
       statestore_heartbeat_frequency_ms.
    2. Running the "select *" query twice: before, and after the Hive insert
       query.
    3. Waiting for a full statestore_heartbeat_frequency_ms before inspecting
       logs and running the second query.
    4. Validating that the second 'select *' query uses a higher table version
       number than the first one.
    
    Testing:
    - Looped the test 100 times on a local machine; all runs passed.
    
    Change-Id: Id578de1d6c9f809178d0b612068c449cf6c5f653
    Reviewed-on: http://gerrit.cloudera.org:8080/22400
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/custom_cluster/test_events_custom_configs.py | 54 ++++++++++++++++++++--
 1 file changed, 51 insertions(+), 3 deletions(-)

diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 31a6b7b33..f8c3d0752 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -17,8 +17,10 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import logging
-from os import getenv
 import pytest
+import re
+from os import getenv
+from time import sleep
 
 
 from beeswaxd.BeeswaxService import QueryState
@@ -38,6 +40,17 @@ from tests.util.iceberg_util import IcebergCatalogs
 HIVE_SITE_HOUSEKEEPING_ON =\
     getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
 TRUNCATE_TBL_STMT = 'truncate table'
+# The statestore heartbeat and topic update frequency (ms). Set low for testing.
+STATESTORE_RPC_FREQUENCY_MS = 100
+STATESTORED_ARGS = (
+  "-statestore_heartbeat_frequency_ms={freq_ms} "
+  "-statestore_priority_update_frequency_ms={freq_ms}").format(
+    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+
+
+def wait_single_statestore_heartbeat():
+  """Wait for state sync across impalads."""
+  sleep(STATESTORE_RPC_FREQUENCY_MS / 1000.0)
 
 
 class TestEventProcessingCustomConfigsBase(CustomClusterTestSuite):
@@ -1307,22 +1320,51 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     verify_partition(True)
     verify_partition(False)
 
+  def __search_table_version(self, profile, full_table_name):
+    """Search version of the first table mentioned in runtime profile."""
+    version_regex = r"Original Table Versions: {}, (\d+),".format(
+      full_table_name)
+    m = re.search(version_regex, profile)
+    assert m is not None, 'Searching for "{}" in profile but found 
none\n{}'.format(
+      version_regex, profile
+    )
+    return int(m.group(1))
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",
-    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=5",
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=1",
+    statestored_args=STATESTORED_ARGS,
     disable_log_buffering=True, cluster_size=1)
   def test_invalidate_stale_partition_on_reload(self, unique_database):
+    """Test that metadata change from external system will eventually propagate
+    to Coordinator in local catalog mode."""
+    # Create table with 3 partitions.
     test_tbl = unique_database + ".test_invalidate_table"
     self.client.execute("create table {} (id int) partitioned by (p int)"
         .format(test_tbl))
     self.client.execute("alter table {} add partition (p=0)".format(test_tbl))
     self.client.execute("alter table {} add partition (p=1)".format(test_tbl))
     self.client.execute("alter table {} add partition (p=2)".format(test_tbl))
+
+    # Run Impala query to make sure all partitions metadata are loaded.
+    result = self.client.execute("select * from {}".format(test_tbl))
+    assert len(result.data) == 0
+    first_version = self.__search_table_version(result.runtime_profile, 
test_tbl)
+
+    # Run Hive query that triggers metadata change.
     self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
         "insert into {} partition(p) values 
(0,0),(1,1),(2,2)".format(test_tbl))
-    self.client.execute("select * from {}".format(test_tbl))
+
+    # Let CatalogD hear about the new event.
     EventProcessorUtils.wait_for_event_processing(self)
+
+    # Wait for StatestoreD to propagate the update.
+    wait_single_statestore_heartbeat()
+
+    # Validate that relevant log lines are printed in Coordinator.
+    # This should be instantaneous after statestore update received, but set explicit
+    # long timeout as a precaution.
     log_regex = r"Invalidated objects in cache: \[partition {}:p={}"\
       .format(test_tbl, '%d')
     self.assert_impalad_log_contains('INFO', log_regex % 0, expected_count=1,
@@ -1332,6 +1374,12 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     self.assert_impalad_log_contains('INFO', log_regex % 2, expected_count=1,
         timeout_s=20)
 
+    # Rerun Impala query and confirm that Coordinator uses a newer table version.
+    result = self.client.execute("select * from {}".format(test_tbl))
+    assert len(result.data) == 3
+    second_version = self.__search_table_version(result.runtime_profile, 
test_tbl)
+    assert first_version < second_version
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args="--use_local_catalog=true",

Reply via email to