This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e7c97439d12644af2a59bcd893aa7b8bdcb83b36
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Mon Nov 18 22:02:12 2024 -0800

    IMPALA-12141: EP shouldn't fail while releasing write lock if the lock
    is not held previously
    
    Without IMPALA-12832, the Event Processor (EP) goes into an error state
    when there is an issue while obtaining a table write lock, because
    releaseWriteLock() in the finally-clause is always invoked even if the
    lock is not held by the current thread. This patch addresses the problem
    by checking whether the current thread holds the table's write lock
    before releasing it.
    
    Note: With IMPALA-12832, the EP invalidates the table when an error is
    encountered, which is still an overhead. With this patch, the EP neither
    goes into an error state nor invalidates the table when this issue is
    encountered.
    
    Testing:
    - Added an end-to-end test to verify the same.
    
    Change-Id: Ib2e4c965796dd515ab8549efa616f72510ca447f
    Reviewed-on: http://gerrit.cloudera.org:8080/22080
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 25 +++++++++++++++++-----
 .../java/org/apache/impala/util/DebugUtils.java    |  5 +++++
 tests/custom_cluster/test_events_custom_configs.py | 25 ++++++++++++++++++++++
 3 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 20937c57b..923894cd9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4958,7 +4958,9 @@ public class CatalogOpExecutor {
         table.setLastSyncedEventId(eventId);
       }
       UnlockWriteLockIfErronouslyLocked();
-      table.releaseWriteLock();
+      if (table.isWriteLockedByCurrentThread()) {
+        table.releaseWriteLock();
+      }
     }
   }
 
@@ -5063,7 +5065,14 @@ public class CatalogOpExecutor {
 
     boolean errorOccured = false;
     try {
-      tryWriteLock(table, reason, NoOpEventSequence.INSTANCE);
+      if (!DebugUtils.hasDebugAction(BackendConfig.INSTANCE.debugActions(),
+          DebugUtils.MOCK_WRITE_LOCK_FAILURE)) {
+        tryWriteLock(table, reason, NoOpEventSequence.INSTANCE);
+      } else {
+        // Mock the debug action that there is a failure in obtaining write 
lock.
+        // We don't want to throw InternalException to fail EP for test 
purpose.
+        return 0;
+      }
       InProgressTableModification modification =
           new InProgressTableModification(catalog_, table);
       catalog_.getLock().writeLock().unlock();
@@ -5100,7 +5109,9 @@ public class CatalogOpExecutor {
         table.setLastSyncedEventId(eventId);
       }
       UnlockWriteLockIfErronouslyLocked();
-      table.releaseWriteLock();
+      if (table.isWriteLockedByCurrentThread()) {
+        table.releaseWriteLock();
+      }
     }
     return 0;
   }
@@ -5173,7 +5184,9 @@ public class CatalogOpExecutor {
           "Could not acquire lock on the table " + table.getFullName(), e);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
-      table.releaseWriteLock();
+      if (table.isWriteLockedByCurrentThread()) {
+        table.releaseWriteLock();
+      }
     }
   }
 
@@ -5296,7 +5309,9 @@ public class CatalogOpExecutor {
       throw e;
     } finally {
       UnlockWriteLockIfErronouslyLocked();
-      table.releaseWriteLock();
+      if (table.isWriteLockedByCurrentThread()) {
+        table.releaseWriteLock();
+      }
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 28502d871..781acd766 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -95,6 +95,11 @@ public class DebugUtils {
   // behavior of metastore returning partitions with empty values
   public static final String MOCK_EMPTY_PARTITION_VALUES = 
"mock_empty_partition_values";
 
+  // debug action label to mock catalogD to mimick that there was failure while
+  // obtaining write lock while reloading partitions. This action is required 
for repro
+  // test failure for IMPALA-13126.
+  public static final String MOCK_WRITE_LOCK_FAILURE = 
"mock_write_lock_failure";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index adf30b18c..b23861622 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1332,6 +1332,31 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     self.assert_impalad_log_contains('INFO', log_regex % 2, expected_count=1,
         timeout_s=20)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal "
+                  "--hms_event_polling_interval_s=1 "
+                  "--invalidate_metadata_on_event_processing_failure=false "
+                  "--debug_actions=mock_write_lock_failure:true",
+    disable_log_buffering=True, cluster_size=1)
+  def test_write_lock_on_partitioned_events(self, unique_database):
+    """IMPALA-12277: This test verifies that CommitCompactionEvent on a 
partitioned table
+    succeeds if the write lock is not held by the table while processing the 
event by the
+    event processor. 'mock_write_lock_failure' mocks that there is a failure 
while
+    acquiring write lock for CommitCompactionEvent"""
+    test_tbl = unique_database + ".test_invalidate_table"
+    acid_props = self._get_transactional_tblproperties(True)
+    self.client.execute("create table {} (id int) partitioned by (p int) {}"
+                        .format(test_tbl, acid_props))
+    for _ in range(10):
+      self.client.execute(
+        "insert into {} partition(p=0) values (1),(2),(3)".format(test_tbl))
+    self.run_stmt_in_hive(
+      "alter table {} partition(p=0) compact 'major' and 
wait".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):

Reply via email to