This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5250cc14b63114fa3976784d6bac39d743e4011a Author: stiga-huang <[email protected]> AuthorDate: Mon Feb 26 09:50:13 2024 +0800 IMPALA-12827: Fix failures in processing AbortTxnEvent due to aborted write id is cleaned up HdfsTable tracks the ValidWriteIdList from HMS. When the table is reloaded, the ValidWriteIdList is updated to the latest state. An ABORT_TXN event that is lagging behind could match to aborted write ids that have already been cleaned up by the HMS housekeeping thread. Such write ids can't be found in the cached ValidWriteIdList as opened or aborted write ids. This hits a Precondition check and fails the event processing. This patch fixes the check to allow this case. Also adds more logs for dealing with write ids. Tests - Add custom-cluster test to start Hive with the housekeeping thread turned on and verified that such ABORT_TXN event is processed correctly. Change-Id: I93b6f684d6e4b94961d804a0c022029249873681 Reviewed-on: http://gerrit.cloudera.org:8080/21071 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- bin/create-test-configuration.sh | 7 ++++ .../org/apache/impala/compat/MetastoreShim.java | 2 + .../java/org/apache/impala/catalog/HdfsTable.java | 9 ++-- .../impala/catalog/events/MetastoreEvents.java | 9 +++- .../hive/common/MutableValidReaderWriteIdList.java | 14 +++++-- fe/src/test/resources/hive-site.xml.py | 5 +++ tests/common/impala_test_suite.py | 1 + tests/custom_cluster/test_events_custom_configs.py | 48 +++++++++++++++++++++- 8 files changed, 84 insertions(+), 11 deletions(-) diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh index 90861eeb1..e8fbde807 100755 --- a/bin/create-test-configuration.sh +++ b/bin/create-test-configuration.sh @@ -151,6 +151,13 @@ mkdir -p hive-site-events-cleanup rm -f hive-site-events-cleanup/hive-site.xml ln -s "${CONFIG_DIR}/hive-site_events_cleanup.xml" hive-site-events-cleanup/hive-site.xml +export 
HIVE_VARIANT=housekeeping_on +$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_housekeeping_on.xml +mkdir -p hive-site-housekeeping-on +rm -f hive-site-housekeeping-on/hive-site.xml +ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \ + hive-site-housekeeping-on/hive-site.xml + export HIVE_VARIANT=ranger_auth HIVE_RANGER_CONF_DIR=hive-site-ranger-auth $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py hive-site_ranger_auth.xml diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 516fe6b7a..2f11da0c3 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -839,6 +839,8 @@ public class MetastoreShim extends Hive3MetastoreShimBase { commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer() .getCommitTxnMessage(event.getMessage()); txnId_ = commitTxnMessage_.getTxnId(); + LOG.info("EventId: {} EventType: COMMIT_TXN transaction id: {}", getEventId(), + txnId_); } @Override diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 05f62814b..f3c3e3132 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -3033,16 +3033,15 @@ public class HdfsTable extends Table implements FeFsTable { throws TableLoadingException { String tblFullName = getFullName(); if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName); - ValidWriteIdList validWriteIds = null; + ValidWriteIdList validWriteIds; try { validWriteIds = MetastoreShim.fetchValidWriteIds(client, tblFullName); - if (LOG.isTraceEnabled()) { - LOG.trace("Valid writeIds: " + validWriteIds.writeToString()); - } + LOG.info("Valid writeIds of table {}: {}", tblFullName, + 
validWriteIds.writeToString()); return validWriteIds; } catch (Exception e) { throw new TableLoadingException(String.format("Error loading ValidWriteIds for " + - "table '%s'", getName()), e); + "table '%s'", tblFullName), e); } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 0f073ffce..cf5650e85 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -2802,7 +2802,7 @@ public class MetastoreEvents { @Override protected void processTableEvent() throws MetastoreNotificationException { if (msTbl_ == null) { - debugLog("Ignoring the event since table {} is not found", + debugLog("Ignoring the event since table {} does not exist or is unloaded", getFullyQualifiedTblName()); return; } @@ -2823,6 +2823,8 @@ public class MetastoreEvents { TableWriteId tableWriteId = new TableWriteId(dbName_, tblName_, tbl_.getCreateEventId(), txnToWriteId.getWriteId()); catalog_.addWriteId(txnToWriteId.getTxnId(), tableWriteId); + infoLog("Added write id {} on table {}.{} for txn {}", + txnToWriteId.getWriteId(), dbName_, tblName_, txnToWriteId.getTxnId()); } } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException("Failed to mark open " @@ -3007,12 +3009,15 @@ public class MetastoreEvents { MetastoreEventsProcessor.getMessageDeserializer().getAbortTxnMessage( event.getMessage()); txnId_ = abortTxnMessage.getTxnId(); + infoLog("Received AbortTxnEvent for transaction " + txnId_); } @Override protected void process() throws MetastoreNotificationException { try { - addAbortedWriteIdsToTables(catalog_.getWriteIds(txnId_)); + Set<TableWriteId> tableWriteIds = catalog_.getWriteIds(txnId_); + infoLog("Adding {} aborted write ids", tableWriteIds.size()); + addAbortedWriteIdsToTables(tableWriteIds); } catch (CatalogException e) { throw new 
MetastoreNotificationNeedsInvalidateException(debugString("Failed to " + "mark aborted write ids to table for txn {}. Event processing cannot " diff --git a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java index b3a82a1f9..571c63afc 100644 --- a/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java +++ b/fe/src/main/java/org/apache/impala/hive/common/MutableValidReaderWriteIdList.java @@ -251,6 +251,8 @@ public class MutableValidReaderWriteIdList implements MutableValidWriteIdList { for (long currentId = highWatermark + 1; currentId <= writeId; currentId++) { exceptions.add(currentId); } + LOG.debug("Added OPEN write id: {}. Old high water mark: {}.", + writeId, highWatermark); highWatermark = writeId; return true; } @@ -263,19 +265,25 @@ public class MutableValidReaderWriteIdList implements MutableValidWriteIdList { boolean added = false; long maxWriteId = Collections.max(writeIds); if (maxWriteId > highWatermark) { - LOG.trace("Current high water mark: {} and max aborted write id: {}, so mark them " + LOG.info("Current high water mark: {} and max aborted write id: {}, so mark them " + "as open first", highWatermark, maxWriteId); addOpenWriteId(maxWriteId); added = true; } for (long writeId : writeIds) { int index = Collections.binarySearch(exceptions, writeId); - // make sure the write id is not committed - Preconditions.checkState(index >= 0); + if (index < 0) { + LOG.info("Not added ABORTED write id {} since it's not opened and might " + + "already be cleaned up. 
minOpenWriteId: {}.", writeId, minOpenWriteId); + continue; + } added = added || !abortedBits.get(index); abortedBits.set(index); } updateMinOpenWriteId(); + if (!added) { + LOG.info("Not added any ABORTED write ids of the given {}", writeIds.size()); + } return added; } diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py index c5f377925..9b9c2dbfd 100644 --- a/fe/src/test/resources/hive-site.xml.py +++ b/fe/src/test/resources/hive-site.xml.py @@ -88,6 +88,11 @@ elif variant == 'events_cleanup': 'hive.metastore.event.db.listener.timetolive': '60s', 'hive.metastore.event.db.listener.clean.interval': '10s' }) +elif variant == 'housekeeping_on': + # HMS configs needed for regression test for IMPALA-12827 + CONFIG.update({ + 'hive.metastore.housekeeping.threads.on': 'true', + }) # HBase-related configs. # Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for HBase. diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 89e0cc4e7..dd0b95175 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1035,6 +1035,7 @@ class ImpalaTestSuite(BaseTestSuite): # out to beeline for better performance def run_stmt_in_hive(self, stmt, username=None): """Run a statement in Hive by Beeline.""" + LOG.info("-- executing in HiveServer2\n\n" + stmt + "\n") url = 'jdbc:hive2://' + pytest.config.option.hive_server2 return self.run_stmt_in_beeline(url, username, stmt) diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 09ab84b58..eafdcda1f 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -17,6 +17,7 @@ from __future__ import absolute_import, division, print_function from builtins import range import logging +from os import getenv import pytest @@ -25,12 +26,16 @@ from hive_metastore.ttypes import FireEventRequestData 
from hive_metastore.ttypes import InsertEventRequestData from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_test_suite import ImpalaTestSuite -from tests.common.skip import SkipIfFS +from tests.common.skip import SkipIf, SkipIfFS +from tests.util.acid_txn import AcidTxn from tests.util.hive_utils import HiveDbWrapper from tests.util.event_processor_utils import EventProcessorUtils from tests.util.filesystem_utils import WAREHOUSE from tests.util.iceberg_util import IcebergCatalogs +HIVE_SITE_HOUSEKEEPING_ON =\ + getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on' + @SkipIfFS.hive class TestEventProcessingCustomConfigs(CustomClusterTestSuite): @@ -1144,3 +1149,44 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): data = int(self.execute_scalar("select count(*) from {0}.{1}".format( unique_database, hive_tbl))) assert data == 0 + + @SkipIf.is_test_jdk + @CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=100", + hive_conf_dir=HIVE_SITE_HOUSEKEEPING_ON) + def test_commit_compaction_with_abort_txn(self, unique_database): + """Use a long enough polling interval to allow Hive statements to finish before + the ABORT_TXN event is processed. In local tests, the Hive statements usually + finish in 60s. + TODO: improve this by adding commands to pause and resume the event-processor.""" + tbl = "part_table" + fq_tbl = unique_database + '.' 
+ tbl
+    acid = AcidTxn(self.hive_client)
+    self.run_stmt_in_hive(
+        "create transactional table {} (i int) partitioned by (p int)".format(fq_tbl))
+
+    # Allocate a write id on this table and abort the txn
+    txn_id = acid.open_txns()
+    acid.allocate_table_write_ids(txn_id, unique_database, tbl)
+    acid.abort_txn(txn_id)
+
+    # Insert some rows and trigger compaction
+    for i in range(2):
+      self.run_stmt_in_hive(
+          "insert into {} partition(p=0) values (1),(2),(3)".format(fq_tbl))
+    self.run_stmt_in_hive(
+        "alter table {} partition(p=0) compact 'major' and wait".format(fq_tbl))
+
+    # The CREATE_TABLE event hasn't been processed yet so we have to explicitly invalidate
+    # the table first.
+    self.client.execute("invalidate metadata " + fq_tbl)
+    # Reload the table so the latest valid writeIdList is loaded
+    self.client.execute("refresh " + fq_tbl)
+    # Process the ABORT_TXN event
+    EventProcessorUtils.wait_for_event_processing(self, timeout=100)
+    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
+    # Uncomment this once we can stop and resume the event-processor using commands.
+    # Currently the test is flaky with it since the Hive statements could take longer to
+    # finish than 100s (e.g. I saw a run of 5mins).
+    # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 since it's "
+    #     + "not opened and might already be cleaned up")
