This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e4364cd24435b3885202a894aeee9221dddf6d25
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Aug 26 16:09:49 2025 +0200

    IMPALA-14358: Event processing can invalidate Iceberg tables
    
    IMPALA-12829 added extra code to CatalogServiceCatalog's
    reloadTableIfExists() that can throw a ClassCastException when it
    reloads an Iceberg table. When it happens during event processing
    the event processor invalidates the table.
    
    This usually happens when another engine updates an Iceberg table. It
    then causes slow table loading times as the tables need to be fully
    reloaded instead of just doing an incremental table loading.
    
    This patch fixes the ClassCastException by moving the cast into an
    if statement.
    
    Testing
     * e2e tests added
    
    Change-Id: I892cf326a72024674facad6750893352b982c658
    Reviewed-on: http://gerrit.cloudera.org:8080/23349
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/CatalogServiceCatalog.java      | 44 +++++++++++-----------
 tests/metadata/test_event_processing.py            | 23 +++++++++--
 2 files changed, 42 insertions(+), 25 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 9c76e622b..0943212bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3418,28 +3418,30 @@ public class CatalogServiceCatalog extends Catalog {
             tblName, eventId, message);
         return false;
       }
-      // Transactional table events using this method should reload the table 
only if the
-      // incoming committedWriteIds are different from the committedWriteIds 
cached in
-      // CatalogD. The if block execution is skipped for non-transactional 
table events.
-      HdfsTable hdfsTable = (HdfsTable) table;
-      ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds();
-      if (previousWriteIdList != null && committedWriteIds != null
-          && !committedWriteIds.isEmpty()) {
-        // get a copy of previous write id list
-        boolean tableNeedsRefresh = false;
-        previousWriteIdList = MetastoreShim.getValidWriteIdListFromString(
-            previousWriteIdList.toString());
-        for (Long writeId : committedWriteIds) {
-          if (!previousWriteIdList.isWriteIdValid(writeId)) {
-            tableNeedsRefresh = true;
-            break;
+      if (table instanceof HdfsTable) {
+        // Transactional table events using this method should reload the 
table only if the
+        // incoming committedWriteIds are different from the committedWriteIds 
cached in
+        // CatalogD. The if block execution is skipped for non-transactional 
table events.
+        HdfsTable hdfsTable = (HdfsTable) table;
+        ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds();
+        if (previousWriteIdList != null && committedWriteIds != null
+            && !committedWriteIds.isEmpty()) {
+          // get a copy of previous write id list
+          boolean tableNeedsRefresh = false;
+          previousWriteIdList = MetastoreShim.getValidWriteIdListFromString(
+              previousWriteIdList.toString());
+          for (Long writeId : committedWriteIds) {
+            if (!previousWriteIdList.isWriteIdValid(writeId)) {
+              tableNeedsRefresh = true;
+              break;
+            }
+          }
+          if (!tableNeedsRefresh) {
+            LOG.info("Not reloading table {} for event {} since the cache is "
+                + "already up-to-date", table.getFullName(), eventId);
+            hdfsTable.setLastSyncedEventId(eventId);
+            return false;
           }
-        }
-        if (!tableNeedsRefresh) {
-          LOG.info("Not reloading table {} for event {} since the cache is "
-              + "already up-to-date", table.getFullName(), eventId);
-          hdfsTable.setLastSyncedEventId(eventId);
-          return false;
         }
       }
       reloadTable(table, reason, eventId, isSkipFileMetadataReload,
diff --git a/tests/metadata/test_event_processing.py 
b/tests/metadata/test_event_processing.py
index beef68922..32c2607f7 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -79,23 +79,38 @@ class TestEventProcessing(TestEventProcessingBase):
     finally:
       self.execute_query("drop database if exists {0} cascade".format(db_name))
 
+  @pytest.mark.execute_serially
   def test_hive_impala_iceberg_reloads(self, unique_database):
+    def get_refresh_count():
+      return EventProcessorUtils.get_int_metric('tables-refreshed', 0)
+
+    def run_hive_check_refresh(stmt):
+      refresh_before = get_refresh_count()
+      self.run_stmt_in_hive(stmt)
+      EventProcessorUtils.wait_for_event_processing(self)
+      refresh_after = get_refresh_count()
+      assert refresh_after - refresh_before == 1
+
     test_tbl = unique_database + ".test_events"
     self.run_stmt_in_hive("create table {} (value string) \
         partitioned by (year int) stored by iceberg".format(test_tbl))
     EventProcessorUtils.wait_for_event_processing(self)
     self.execute_query("describe {}".format(test_tbl))
 
-    self.run_stmt_in_hive("insert into {} values ('1', 2025)".format(test_tbl))
-    self.run_stmt_in_hive("select * from {}".format(test_tbl))
+    run_hive_check_refresh("insert into {} values ('1', 
2025)".format(test_tbl))
 
-    EventProcessorUtils.wait_for_event_processing(self)
     res = self.execute_query("select * from {}".format(test_tbl))
-
     assert ["1\t2025"] == res.data
     res = self.execute_query("refresh {}".format(test_tbl))
     assert "Iceberg table reload skipped as no change detected" in 
res.runtime_profile
 
+    run_hive_check_refresh("alter table {} add columns (s 
string)".format(test_tbl))
+
+    res = self.execute_query("select * from {}".format(test_tbl))
+    assert ["1\t2025\tNULL"] == res.data
+    res = self.execute_query("refresh {}".format(test_tbl))
+    assert "Iceberg table reload skipped as no change detected" in 
res.runtime_profile
+
   @SkipIfHive2.acid
   def test_empty_partition_events_transactional(self, unique_database):
     self._run_test_empty_partition_events(unique_database, True)

Reply via email to