This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 46115e9a8ec1c6016cad7a1f3af7f7c260c170e3
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Mon Aug 12 14:40:39 2024 -0700

    IMPALA-13126: Obtain table read lock in EP to process partitioned event
    
    For a partition-level event, isOlderEvent() in catalogd needs to check
    whether the corresponding partition was reloaded after the event. This
    check must be done while holding the table read lock. Otherwise, the
    EventProcessor could hit a ConcurrentModificationException when
    there are concurrent DDLs/DMLs modifying the partition list.
    
    Note: Created IMPALA-13650 for a cleaner solution to clear the in-flight
    events list for partitioned table events.
    
    Testing:
    - Added an end-to-end stress test to verify the above scenario
    
    Change-Id: I26933f98556736f66df986f9440ebb64be395bc1
    Reviewed-on: http://gerrit.cloudera.org:8080/21663
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/events/MetastoreEvents.java     | 78 +++++++++++++---------
 tests/custom_cluster/test_events_custom_configs.py | 25 +++++++
 2 files changed, 72 insertions(+), 31 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index c2612a878..43ec31549 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1297,19 +1297,29 @@ public class MetastoreEvents {
           return true;
         }
         // Always check the lastRefreshEventId on the table first for table 
level refresh
-        if (tbl.getLastRefreshEventId() > getEventId() || (partitionEventObj 
!= null &&
-            catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
-                partitionEventObj, getEventId()))) {
-          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
-              .inc(getNumberOfEvents());
-          String messageStr = partitionEventObj == null ? "Skipping the event 
since the" +
-              " table " + dbName_+ "." + tblName_ + " has last refresh id as " 
+
-              tbl.getLastRefreshEventId() + ". Comparing it with current event 
" +
-              getEventId() + ". " : "";
-          infoLog("{}Incremented events skipped counter to {}", messageStr,
-              
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
-                  .getCount());
-          return true;
+        boolean canSkip = tbl.getLastRefreshEventId() >= getEventId();
+        try {
+          if (!canSkip && partitionEventObj != null) {
+            tbl.takeReadLock();
+            canSkip = catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
+                partitionEventObj, getEventId());
+          }
+          if (canSkip) {
+            metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                .inc(getNumberOfEvents());
+            String messageStr = partitionEventObj == null ? "Skipping the 
event since " +
+                "the table " + dbName_ + "." + tblName_ + " has last refresh 
id as " +
+                tbl.getLastRefreshEventId() + ". Comparing it with current 
event " +
+                getEventId() + ". " : "";
+            infoLog("{}Incremented events skipped counter to {}", messageStr,
+                
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                    .getCount());
+            return true;
+          }
+        } finally {
+          if (tbl.isReadLockedByCurrentThread()) {
+            tbl.releaseReadLock();
+          }
         }
       } catch (CatalogException e) {
         debugLog("ignoring exception while checking if it is an older event "
@@ -1805,12 +1815,6 @@ public class MetastoreEvents {
         return;
       }
 
-      if (isOlderEvent(null)) {
-        infoLog("Not processing the alter table event {} as it is an older 
event",
-            getEventId());
-        return;
-      }
-
       // Determine whether this is an event which we have already seen or if 
it is a new
       // event
       if (isSelfEvent()) {
@@ -1825,6 +1829,12 @@ public class MetastoreEvents {
             + "which can be ignored.");
         return;
       }
+
+      if (isOlderEvent(null)) {
+        infoLog("Not processing the alter table event {} as it is an older 
event",
+            getEventId());
+        return;
+      }
       skipFileMetadataReload_ = !isTruncateOp_ && 
canSkipFileMetadataReload(tableBefore_,
           tableAfter_);
       long startNs = System.nanoTime();
@@ -2640,12 +2650,6 @@ public class MetastoreEvents {
         return;
       }
 
-      if (isOlderEvent(partitionBefore_)) {
-        infoLog("Not processing the alter partition event {} as it is an older 
event",
-            getEventId());
-        return;
-      }
-
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       if (canBeSkipped()) {
@@ -2654,6 +2658,12 @@ public class MetastoreEvents {
             + "parameters which can be ignored.");
         return;
       }
+
+      if (isOlderEvent(partitionBefore_)) {
+        infoLog("Not processing the alter partition event {} as it is an older 
event",
+            getEventId());
+        return;
+      }
       // Reload the whole table if it's a transactional table or materialized 
view.
       // Materialized views are treated as a special case because it's 
possible to
       // receive partition event on MVs, but they are regular views in Impala. 
That
@@ -3133,13 +3143,19 @@ public class MetastoreEvents {
       org.apache.impala.catalog.Table tbl = catalog_.getTableNoThrow(dbName_, 
tblName_);
       if (tbl == null || tbl instanceof IncompleteTable) { return false; }
       // Always check the lastRefreshEventId on the table first for table 
level refresh
-      if (tbl.getLastRefreshEventId() >= getEventId()
-          || (reloadPartition_ != null
-                 && catalog_.isPartitionLoadedAfterEvent(
-                        dbName_, tblName_, reloadPartition_, getEventId()))) {
-        return true;
+      boolean canSkip = tbl.getLastRefreshEventId() >= getEventId();
+      try {
+        if (!canSkip && reloadPartition_ != null) {
+          tbl.takeReadLock();
+          canSkip = catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_,
+              reloadPartition_, getEventId());
+        }
+        return canSkip;
+      } finally {
+        if (tbl.isReadLockedByCurrentThread()) {
+          tbl.releaseReadLock();
+        }
       }
-      return false;
     }
 
     /**
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index b23861622..1deaf24c8 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1357,6 +1357,31 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     EventProcessorUtils.wait_for_event_processing(self)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--enable_reload_events=true "
+                  "--invalidate_metadata_on_event_processing_failure=false")
+  def test_reload_events_modified_concurrently(self):
+    """IMPALA-13126: This test verifies that the event processor successfully 
consumes or
+    ignores the RELOAD event triggered by refresh operation on a partitioned 
table if the
+    partitions are modified concurrently on the table."""
+    tbl = "scale_db.num_partitions_1234_blocks_per_partition_1"
+    refresh_stmt = "refresh {} partition(j=0)".format(tbl)
+    for _ in range(32):
+      self.client.execute_async(refresh_stmt)
+    for _ in range(100):
+      self.client.execute(
+        "alter table {} add if not exists partition(j=-1)".format(tbl))
+      self.client.execute(
+        "alter table {} drop partition(j=-1)".format(tbl))
+
+    try:
+      EventProcessorUtils.wait_for_event_processing(self, 1000)  # bigger 
timeout required
+      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    finally:
+      # Make sure the table doesn't change after this test
+      self.execute_query("alter table {} drop if exists 
partition(j=-1)".format(tbl))
+
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):

Reply via email to