This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ccbbc0acbeaf01e9636991726d4f0ddb2c9feb05 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Wed Aug 7 16:12:09 2024 -0700 IMPALA-12758: Fix catalogd not setting prev_id for reloaded partitions Cache modification via partitioned table events should retain the prev_id of the partitions while reloading the partitions. Currently, prev_id is set to -1, so when the catalogDelta is sent from the statestore to the Impala daemons, coordinators in local catalog mode cannot tell whether to invalidate the current partition, because prev_id is not valid. This patch addresses this issue by setting the prev_id of the reloaded partition. Note: This is only an issue in local catalog mode for REFRESH <partition> or updating partitions based on HMS events. Testing: - Added end-to-end tests (in test_events_custom_configs.py and test_local_catalog.py) to verify the fix Change-Id: Ia7d53f601b63d83e99cbd2d1f58076a0dc78c610 Reviewed-on: http://gerrit.cloudera.org:8080/21662 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/catalog/HdfsTable.java | 1 + tests/custom_cluster/test_events_custom_configs.py | 22 ++++++++++++++++++++++ tests/custom_cluster/test_local_catalog.py | 17 +++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 9e6a2047d..009e702f9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -3053,6 +3053,7 @@ public class HdfsTable extends Table implements FeFsTable { || HdfsPartition.comparePartitionKeyValues( oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0); if (oldPartition != null) { + partBuilder.setPrevId(oldPartition.getId()); partBuilder.setFileDescriptors(oldPartition); partBuilder.setCreateEventId(oldPartition.getCreateEventId());
partBuilder.setLastCompactionId(oldPartition.getLastCompactionId()); diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 4f37ca1b1..b3695e297 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1307,6 +1307,28 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): verify_partition(True) verify_partition(False) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=5", + cluster_size=1) + def test_invalidate_stale_partition_on_reload(self, unique_database): + test_tbl = unique_database + ".test_invalidate_table" + self.client.execute("create table {} (id int) partitioned by (p int)" + .format(test_tbl)) + self.client.execute("alter table {} add partition (p=0)".format(test_tbl)) + self.client.execute("alter table {} add partition (p=1)".format(test_tbl)) + self.client.execute("alter table {} add partition (p=2)".format(test_tbl)) + self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; " + "insert into {} partition(p) values (0,0),(1,1),(2,2)".format(test_tbl)) + self.client.execute("select * from {}".format(test_tbl)) + EventProcessorUtils.wait_for_event_processing(self) + log_regex = r"Invalidated objects in cache: \[partition %s:p=\d \(id=%%d\)\]" \ + % test_tbl + self.assert_impalad_log_contains('INFO', log_regex % 0) + self.assert_impalad_log_contains('INFO', log_regex % 1) + self.assert_impalad_log_contains('INFO', log_regex % 2) + @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase): """This class contains tests that exercise the event processing mechanism in the diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py index 93a587a9d..140920d04 100644 --- 
a/tests/custom_cluster/test_local_catalog.py +++ b/tests/custom_cluster/test_local_catalog.py @@ -574,6 +574,23 @@ class TestLocalCatalogObservability(CustomClusterTestSuite): assert 1 == impalad.get_metric_value( "catalog.server.client-cache.total-clients-for-lightweight-rpc") + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0", + cluster_size=1) + def test_invalidate_stale_partition_on_reload(self, unique_database): + test_tbl = unique_database + ".test_invalidate_table" + self.client.execute( + "create table {} (id int) partitioned by (p int)".format(test_tbl)) + self.client.execute("alter table {} add partition (p=0)".format(test_tbl)) + # Make the partition loaded in the coordinator, so that one instance needs to be + # invalidated when the partition is reloaded. + self.client.execute("show partitions {}".format(test_tbl)) + self.client.execute("refresh {} partition(p=0)".format(test_tbl)) + log_regex = r"Invalidated objects in cache: \[partition %s:p=0 \(id=0\)\]" \ + % test_tbl + self.assert_impalad_log_contains('INFO', log_regex) class TestFullAcid(CustomClusterTestSuite): @classmethod
