This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ccbbc0acbeaf01e9636991726d4f0ddb2c9feb05
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Wed Aug 7 16:12:09 2024 -0700

    IMPALA-12758: Fix catalogd not setting prev_id for reloaded partitions
    
    Cache modification via partitioned table events should retain the
    prev_id of the partitions while reloading the partitions. Currently,
    prev_id is set to -1, so when the catalogDelta is sent from the
    statestore to the Impala daemons, coordinators in local catalog mode
    will not know whether to invalidate the current partition, because
    prev_id is not valid.
    
    This patch addresses this issue by setting the prev_id of the reloaded
    partition to the id of the partition it replaces.
    
    Note: This is only an issue in local catalog mode for
    REFRESH <partition> or updating partitions based on HMS events.
    
    Testing:
    - Added end-to-end tests to verify the fix
    
    Change-Id: Ia7d53f601b63d83e99cbd2d1f58076a0dc78c610
    Reviewed-on: http://gerrit.cloudera.org:8080/21662
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  1 +
 tests/custom_cluster/test_events_custom_configs.py | 22 ++++++++++++++++++++++
 tests/custom_cluster/test_local_catalog.py         | 17 +++++++++++++++++
 3 files changed, 40 insertions(+)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 9e6a2047d..009e702f9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -3053,6 +3053,7 @@ public class HdfsTable extends Table implements FeFsTable {
           || HdfsPartition.comparePartitionKeyValues(
           oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) 
== 0);
       if (oldPartition != null) {
+        partBuilder.setPrevId(oldPartition.getId());
         partBuilder.setFileDescriptors(oldPartition);
         partBuilder.setCreateEventId(oldPartition.getCreateEventId());
         partBuilder.setLastCompactionId(oldPartition.getLastCompactionId());
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 4f37ca1b1..b3695e297 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1307,6 +1307,28 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     verify_partition(True)
     verify_partition(False)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=5",
+    cluster_size=1)
+  def test_invalidate_stale_partition_on_reload(self, unique_database):
+    test_tbl = unique_database + ".test_invalidate_table"
+    self.client.execute("create table {} (id int) partitioned by (p int)"
+        .format(test_tbl))
+    self.client.execute("alter table {} add partition (p=0)".format(test_tbl))
+    self.client.execute("alter table {} add partition (p=1)".format(test_tbl))
+    self.client.execute("alter table {} add partition (p=2)".format(test_tbl))
+    self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
+        "insert into {} partition(p) values 
(0,0),(1,1),(2,2)".format(test_tbl))
+    self.client.execute("select * from {}".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    log_regex = r"Invalidated objects in cache: \[partition %s:p=\d 
\(id=%%d\)\]" \
+        % test_tbl
+    self.assert_impalad_log_contains('INFO', log_regex % 0)
+    self.assert_impalad_log_contains('INFO', log_regex % 1)
+    self.assert_impalad_log_contains('INFO', log_regex % 2)
+
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
   """This class contains tests that exercise the event processing mechanism in 
the
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 93a587a9d..140920d04 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -574,6 +574,23 @@ class TestLocalCatalogObservability(CustomClusterTestSuite):
     assert 1 == impalad.get_metric_value(
         "catalog.server.client-cache.total-clients-for-lightweight-rpc")
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=0",
+    cluster_size=1)
+  def test_invalidate_stale_partition_on_reload(self, unique_database):
+    test_tbl = unique_database + ".test_invalidate_table"
+    self.client.execute(
+        "create table {} (id int) partitioned by (p int)".format(test_tbl))
+    self.client.execute("alter table {} add partition (p=0)".format(test_tbl))
+    # Make the partition loaded in the coordinator, so that one instance needs 
to be
+    # invalidated when the partition is reloaded.
+    self.client.execute("show partitions {}".format(test_tbl))
+    self.client.execute("refresh {} partition(p=0)".format(test_tbl))
+    log_regex = r"Invalidated objects in cache: \[partition %s:p=0 \(id=0\)\]" 
\
+        % test_tbl
+    self.assert_impalad_log_contains('INFO', log_regex)
 
 class TestFullAcid(CustomClusterTestSuite):
   @classmethod

Reply via email to