This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e4364cd24435b3885202a894aeee9221dddf6d25 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Tue Aug 26 16:09:49 2025 +0200 IMPALA-14358: Event processing can invalidate Iceberg tables IMPALA-12829 added extra code to CatalogServiceCatalog's reloadTableIfExists() that can throw a ClassCastException when it reloads an Iceberg table. When it happens during event processing the event processor invalidates the table. This usually happens when another engine updates an Iceberg table. It then causes slow table loading times as the tables need to be fully reloaded instead of just doing an incremental table loading. This patch fixes the ClassCastException by moving the cast into an if statement. Testing * e2e tests added Change-Id: I892cf326a72024674facad6750893352b982c658 Reviewed-on: http://gerrit.cloudera.org:8080/23349 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/CatalogServiceCatalog.java | 44 +++++++++++----------- tests/metadata/test_event_processing.py | 23 +++++++++-- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 9c76e622b..0943212bd 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -3418,28 +3418,30 @@ public class CatalogServiceCatalog extends Catalog { tblName, eventId, message); return false; } - // Transactional table events using this method should reload the table only if the - // incoming committedWriteIds are different from the committedWriteIds cached in - // CatalogD. The if block execution is skipped for non-transactional table events. 
- HdfsTable hdfsTable = (HdfsTable) table; - ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds(); - if (previousWriteIdList != null && committedWriteIds != null - && !committedWriteIds.isEmpty()) { - // get a copy of previous write id list - boolean tableNeedsRefresh = false; - previousWriteIdList = MetastoreShim.getValidWriteIdListFromString( - previousWriteIdList.toString()); - for (Long writeId : committedWriteIds) { - if (!previousWriteIdList.isWriteIdValid(writeId)) { - tableNeedsRefresh = true; - break; + if (table instanceof HdfsTable) { + // Transactional table events using this method should reload the table only if the + // incoming committedWriteIds are different from the committedWriteIds cached in + // CatalogD. The if block execution is skipped for non-transactional table events. + HdfsTable hdfsTable = (HdfsTable) table; + ValidWriteIdList previousWriteIdList = hdfsTable.getValidWriteIds(); + if (previousWriteIdList != null && committedWriteIds != null + && !committedWriteIds.isEmpty()) { + // get a copy of previous write id list + boolean tableNeedsRefresh = false; + previousWriteIdList = MetastoreShim.getValidWriteIdListFromString( + previousWriteIdList.toString()); + for (Long writeId : committedWriteIds) { + if (!previousWriteIdList.isWriteIdValid(writeId)) { + tableNeedsRefresh = true; + break; + } + } + if (!tableNeedsRefresh) { + LOG.info("Not reloading table {} for event {} since the cache is " + + "already up-to-date", table.getFullName(), eventId); + hdfsTable.setLastSyncedEventId(eventId); + return false; } - } - if (!tableNeedsRefresh) { - LOG.info("Not reloading table {} for event {} since the cache is " - + "already up-to-date", table.getFullName(), eventId); - hdfsTable.setLastSyncedEventId(eventId); - return false; } } reloadTable(table, reason, eventId, isSkipFileMetadataReload, diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index beef68922..32c2607f7 100644 --- 
a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -79,23 +79,38 @@ class TestEventProcessing(TestEventProcessingBase): finally: self.execute_query("drop database if exists {0} cascade".format(db_name)) + @pytest.mark.execute_serially def test_hive_impala_iceberg_reloads(self, unique_database): + def get_refresh_count(): + return EventProcessorUtils.get_int_metric('tables-refreshed', 0) + + def run_hive_check_refresh(stmt): + refresh_before = get_refresh_count() + self.run_stmt_in_hive(stmt) + EventProcessorUtils.wait_for_event_processing(self) + refresh_after = get_refresh_count() + assert refresh_after - refresh_before == 1 + test_tbl = unique_database + ".test_events" self.run_stmt_in_hive("create table {} (value string) \ partitioned by (year int) stored by iceberg".format(test_tbl)) EventProcessorUtils.wait_for_event_processing(self) self.execute_query("describe {}".format(test_tbl)) - self.run_stmt_in_hive("insert into {} values ('1', 2025)".format(test_tbl)) - self.run_stmt_in_hive("select * from {}".format(test_tbl)) + run_hive_check_refresh("insert into {} values ('1', 2025)".format(test_tbl)) - EventProcessorUtils.wait_for_event_processing(self) res = self.execute_query("select * from {}".format(test_tbl)) - assert ["1\t2025"] == res.data res = self.execute_query("refresh {}".format(test_tbl)) assert "Iceberg table reload skipped as no change detected" in res.runtime_profile + run_hive_check_refresh("alter table {} add columns (s string)".format(test_tbl)) + + res = self.execute_query("select * from {}".format(test_tbl)) + assert ["1\t2025\tNULL"] == res.data + res = self.execute_query("refresh {}".format(test_tbl)) + assert "Iceberg table reload skipped as no change detected" in res.runtime_profile + @SkipIfHive2.acid def test_empty_partition_events_transactional(self, unique_database): self._run_test_empty_partition_events(unique_database, True)
