This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 691604b1d IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table
691604b1d is described below
commit 691604b1d1f0e5f0dc95fdb4976cf826135e08fb
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Thu Mar 7 18:17:20 2024 +0100
IMPALA-12835: Fix event processing without hms_event_incremental_refresh_transactional_table
If hms_event_incremental_refresh_transactional_table is false, then
for non-partitioned ACID tables Impala needs to rely on ALTER_TABLE
events to detect INSERTs done in Hive. This patch changes the event
processor so that it does not skip reloading files when processing an
ALTER_TABLE event for this specific type of table (even if the changes
in the table look trivial).
Testing:
- added a simple regression test
Change-Id: I137b289f0e5f7c9c1947e2a3b30258c979f20987
Reviewed-on: http://gerrit.cloudera.org:8080/21116
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../impala/catalog/events/MetastoreEvents.java | 13 +++++++++++
tests/custom_cluster/test_events_custom_configs.py | 25 ++++++++++++++++++++++
2 files changed, 38 insertions(+)
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index c62571912..1aa8b682a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1875,6 +1875,19 @@ public class MetastoreEvents {
if (whitelistedTblProperties.isEmpty()) {
return false;
}
+
+ boolean incrementalAcidRefresh =
+ BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+ boolean unpartitioned = afterTable.getPartitionKeysSize() == 0;
+ if (!incrementalAcidRefresh && unpartitioned
+ && AcidUtils.isTransactionalTable(afterTable.getParameters())) {
+ // In case of ACID tables no INSERT event is generated. If flag
+ // hms_event_incremental_refresh_transactional_table is false, then transaction
+ // related events are ignored (including COMMIT_TXN), so Impala has to rely on
+ // ALTER_TABLE events to detect INSERTs to unpartitioned tables (IMPALA-12835).
+ return false;
+ }
+
// There are lot of other alter statements which doesn't require file metadata
// reload but these are the most common types for alter statements.
if (isFieldSchemaChanged(beforeTable, afterTable) ||
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index dafcb5854..0b943ba23 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1219,3 +1219,28 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
# finish than 100s (e.g. I saw a run of 5mins).
# self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 since it's "
# + "not opened and might already be cleaned up")
+
+ @CustomClusterTestSuite.with_args(
+ catalogd_args="--hms_event_incremental_refresh_transactional_table=false")
+ def test_no_hms_event_incremental_refresh_transactional_table(self, unique_database):
+ """IMPALA-12835: Test that Impala notices inserts to acid tables when
+ hms_event_incremental_refresh_transactional_table is false.
+ """
+ for partitioned in [False, True]:
+ tbl = "part_tbl" if partitioned else "tbl"
+ fq_tbl = unique_database + '.' + tbl
+ part_create = " partitioned by (p int)" if partitioned else ""
+ part_insert = " partition (p = 1)" if partitioned else ""
+
+ self.run_stmt_in_hive(
+ "create transactional table {} (i int){}".format(fq_tbl, part_create))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ # Load the table in Impala before INSERT
+ self.client.execute("refresh " + fq_tbl)
+ self.run_stmt_in_hive(
+ "insert into {}{} values (1),(2),(3)".format(fq_tbl, part_insert))
+ EventProcessorUtils.wait_for_event_processing(self)
+
+ results = self.client.execute("select i from " + fq_tbl)
+ assert results.data == ["1", "2", "3"]