This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a68f71645 IMPALA-14307: Correctly update createEventId and
DeleteEventLog in AlterTableRename
a68f71645 is described below
commit a68f7164589b5d554d7f9a948badbc414e6b944e
Author: stiga-huang <[email protected]>
AuthorDate: Wed Aug 13 22:06:24 2025 +0800
IMPALA-14307: Correctly update createEventId and DeleteEventLog in
AlterTableRename
When EventProcessor is paused, e.g. due to a global INVALIDATE METADATA
operation, in alterTableOrViewRename() we don't fetch the event id of
the ALTER_TABLE event. This causes the createEventId of the new table
to be -1 and the DeleteEventLog entry of the old table to be missing. So
stale ALTER_TABLE RENAME events could incorrectly remove the new table
or add the old table.
The other case is in the fallback invalidation added in IMPALA-13989
that handles a rename that fails inside the catalog (but succeeds in HMS).
The createEventId is also set to -1 there.
This patch fixes these by always setting a correct/meaningful
createEventId. When fetching the ALTER_TABLE event fails, we fall back to
the event id fetched before the HMS operation. It could be slightly stale
but is much better than -1.
Modified CatalogServiceCatalog#isEventProcessingActive() to just check
if event processing is enabled and renamed it to
isEventProcessingEnabled(). Note that this method is only used in DDLs
that check for their own self-events. We should allow these checks even
when EventProcessor is not in the ACTIVE state, so that when EventProcessor
recovers, fields like createEventId in tables are still correct.
Removed the code that tracks in-flight events at the end of the rename,
since the new table is in an unloaded state and only its createEventId is
useful. The catalog version used there was also incorrect: it is not used
in CatalogServiceCatalog#renameTable(), so it doesn't make sense to use it.
Removed the InProgressTableModification parameter of
alterTableOrViewRename() since it's not used anymore.
This patch also fixes a bug in getRenamedTableFromEvents() where it
always returned the id of the first event in the list. It should use the
id of the rename event it finds.
Tests
- Added an e2e test and ran it 40 times.
Change-Id: Ie7c305e5aaafc8bbdb85830978182394619fad08
Reviewed-on: http://gerrit.cloudera.org:8080/23291
Reviewed-by: Riza Suminto <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../impala/catalog/CatalogServiceCatalog.java | 14 ++---
.../org/apache/impala/catalog/TableLoader.java | 2 +-
.../impala/catalog/events/MetastoreEvents.java | 4 ++
.../apache/impala/service/CatalogOpExecutor.java | 60 +++++++++++++---------
tests/metadata/test_event_processing.py | 45 ++++++++++++++++
5 files changed, 92 insertions(+), 33 deletions(-)
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 5b3f26fa2..02257b4c9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -601,10 +601,8 @@ public class CatalogServiceCatalog extends Catalog {
return metastoreEventProcessor_;
}
- public boolean isEventProcessingActive() {
- return metastoreEventProcessor_ instanceof MetastoreEventsProcessor
- && EventProcessorStatus.ACTIVE
- .equals(((MetastoreEventsProcessor)
metastoreEventProcessor_).getStatus());
+ public boolean isEventProcessingEnabled() {
+ return metastoreEventProcessor_ instanceof MetastoreEventsProcessor;
}
/**
@@ -1282,7 +1280,7 @@ public class CatalogServiceCatalog extends Catalog {
* @return true if given event information evaluates to a self-event, false
otherwise
*/
public boolean evaluateSelfEvent(SelfEventContext ctx) throws
CatalogException {
- Preconditions.checkState(isEventProcessingActive(),
+ Preconditions.checkState(isEventProcessingEnabled(),
"Event processing should be enabled when calling this method");
boolean isInsertEvent = ctx.isInsertEventContext();
long versionNumber =
@@ -1416,7 +1414,7 @@ public class CatalogServiceCatalog extends Catalog {
*/
public boolean addVersionsForInflightEvents(
boolean isInsertEvent, Table tbl, long versionNumber) {
- if (!isEventProcessingActive()) return false;
+ if (!isEventProcessingEnabled()) return false;
boolean added = tbl.addToVersionsForInflightEvents(isInsertEvent,
versionNumber);
if (added) {
LOG.info("Added {} {} in table's {} in-flight events",
@@ -1435,7 +1433,7 @@ public class CatalogServiceCatalog extends Catalog {
* @return true if versionNumber is added to in-flight list. Otherwise,
return false.
*/
public boolean addVersionsForInflightEvents(Db db, long versionNumber) {
- if (!isEventProcessingActive()) return false;
+ if (!isEventProcessingEnabled()) return false;
boolean added = db.addToVersionsForInflightEvents(versionNumber);
if (added) {
LOG.info("Added catalog version {} in database's {} in-flight events",
@@ -3268,6 +3266,8 @@ public class CatalogServiceCatalog extends Catalog {
* removed from the catalog cache.
* Sets dbWasAdded to true if both a new database and table were added to
the catalog
* cache.
+ * 'eventId' is used to update createEventId of the table which avoids the
table being
+ * dropped in processing older events.
*/
public TCatalogObject invalidateTable(TTableName tableName,
Reference<Boolean> tblWasRemoved, Reference<Boolean> dbWasAdded,
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index f625ca100..39db68ea1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -93,7 +93,7 @@ public class TableLoader {
msTbl = msClient.getHiveClient().getTable(db.getName(), tblName);
catalogTimeline.markEvent(FETCHED_HMS_TABLE);
}
- if (eventId != -1 && catalog_.isEventProcessingActive()) {
+ if (eventId != -1 && catalog_.isEventProcessingEnabled()) {
// If the eventId is not -1 it means this table was likely created by
Impala.
// However, since the load operation of the table can happen much
later, it is
// possible that the table was recreated outside Impala and hence the
eventId
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index cb4fe1d3a..b5a8ce139 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1821,9 +1821,13 @@ public class MetastoreEvents {
} else {
if (oldTblRemoved.getRef()) {
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
+ infoLog("Removed table {}.{}", tableBefore_.getDbName(),
+ tableBefore_.getTableName());
}
if (newTblAdded.getRef()) {
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
+ infoLog("Added table {}.{}", tableAfter_.getDbName(),
+ tableAfter_.getTableName());
}
}
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4814403f4..312101c97 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1162,7 +1162,7 @@ public class CatalogOpExecutor {
* parameters. No-op if event processing is disabled.
*/
private void addCatalogServiceIdentifiersToTable() {
- if (!catalog_.isEventProcessingActive()) return;
+ if (!catalog_.isEventProcessingEnabled()) return;
org.apache.hadoop.hive.metastore.api.Table msTbl =
table_.getMetaStoreTable();
msTbl.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalog_.getCatalogServiceId());
@@ -1236,7 +1236,7 @@ public class CatalogOpExecutor {
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
alterTableOrViewRename(tbl,
TableName.fromThrift(params.getRename_params().getNew_table_name()),
- modification, wantMinimalResult, response, catalogTimeline,
debugAction);
+ wantMinimalResult, response, catalogTimeline, debugAction);
modification.validateInProgressModificationComplete();
return;
}
@@ -1938,7 +1938,7 @@ public class CatalogOpExecutor {
private void addCatalogServiceIdentifiers(
org.apache.hadoop.hive.metastore.api.Table msTbl, String
catalogServiceId,
long catalogVersion) {
- if (!catalog_.isEventProcessingActive()) return;
+ if (!catalog_.isEventProcessingEnabled()) return;
msTbl.putToParameters(
MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
@@ -2395,7 +2395,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreEventsForTableIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName, String
tblName,
String eventType) throws MetastoreNotificationException {
- if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+ if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<NotificationEvent> events = MetastoreEventsProcessor
.getNextMetastoreEventsInBatchesForTable(catalog_, eventId, dbName,
tblName,
eventType);
@@ -2412,7 +2412,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreEventsForDbIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName, String
eventType)
throws MetastoreNotificationException {
- if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+ if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<NotificationEvent> events = MetastoreEventsProcessor
.getNextMetastoreEventsInBatchesForDb(catalog_, eventId, dbName,
eventType);
catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
@@ -2426,7 +2426,7 @@ public class CatalogOpExecutor {
private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled(
EventSequence catalogTimeline, long eventId, String dbName)
throws MetastoreNotificationException {
- if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+ if (!catalog_.isEventProcessingEnabled()) return Collections.emptyList();
List<String> eventTypes = Lists.newArrayList(
DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
@@ -2527,8 +2527,10 @@ public class CatalogOpExecutor {
.getMetastoreEventProcessor().getEventsFactory().get(notificationEvent, null);
Preconditions.checkState(event instanceof AlterTableEvent);
AlterTableEvent alterEvent = (AlterTableEvent) event;
+ // Skip other alter events in case there are concurrent modification
from other
+ // engines.
if (!alterEvent.isRename()) continue;
- return new Pair<>(events.get(0).getEventId(),
+ return new Pair<>(alterEvent.getEventId(),
new Pair<>(alterEvent.getBeforeTable(),
alterEvent.getAfterTable()));
} catch (MetastoreNotificationException e) {
throw new CatalogException("Unable to create a metastore event", e);
@@ -5754,8 +5756,7 @@ public class CatalogOpExecutor {
* reloaded on the next access.
*/
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
- InProgressTableModification modification, boolean wantMinimalResult,
- TDdlExecResponse response, EventSequence catalogTimeline,
+ boolean wantMinimalResult, TDdlExecResponse response, EventSequence
catalogTimeline,
@Nullable String debugAction) throws ImpalaException {
Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
@@ -5763,10 +5764,14 @@ public class CatalogOpExecutor {
oldTbl.getMetaStoreTable().deepCopy();
msTbl.setDbName(newTableName.getDb());
msTbl.setTableName(newTableName.getTbl());
+ // Gets the latest event id before we trigger the alter_table HMS RPC. We
then use
+ // this id to find the triggered ALTER_TABLE event for self-event
detection based on
+ // createEventId and EventDeleteLog.
long eventId = -1;
- try (MetaStoreClient msClient =
catalog_.getMetaStoreClient(catalogTimeline)) {
- eventId = getCurrentEventId(msClient);
- catalogTimeline.markEvent(FETCHED_LATEST_HMS_EVENT_ID + eventId);
+ if (catalog_.isEventProcessingEnabled()) {
+ try (MetaStoreClient msClient =
catalog_.getMetaStoreClient(catalogTimeline)) {
+ eventId = getCurrentEventId(msClient, catalogTimeline);
+ }
}
// If oldTbl is a synchronized Kudu table, rename the underlying Kudu
table.
boolean isSynchronizedKuduTable = (oldTbl instanceof KuduTable) &&
@@ -5817,11 +5822,19 @@ public class CatalogOpExecutor {
catalog_.renameTable(tableName.toThrift(), newTableName.toThrift());
Preconditions.checkNotNull(result);
if (renamedTable != null) {
- org.apache.hadoop.hive.metastore.api.Table tblBefore =
renamedTable.second.first;
- addToDeleteEventLog(renamedTable.first, DeleteEventLog
- .getTblKey(tblBefore.getDbName(), tblBefore.getTableName()));
+ eventId = renamedTable.first;
+ LOG.info("Got ALTER_TABLE RENAME event id {}.", eventId);
+ } else if (catalog_.isEventProcessingEnabled()) {
+ // Using the eventId at the beginning of the operation is better than
nothing.
+ LOG.warn("ALTER_TABLE RENAME event not found. Using {} in createEventId
of {} " +
+ "and DeleteEventLog for {}.",
+ eventId, newTableName, tableName);
+ }
+ if (catalog_.isEventProcessingEnabled()) {
+ addToDeleteEventLog(eventId, DeleteEventLog
+ .getTblKey(tableName.getDb(), tableName.getTbl()));
if (result.second != null) {
- result.second.setCreateEventId(renamedTable.first);
+ result.second.setCreateEventId(eventId);
}
}
TCatalogObject oldTblDesc = null, newTblDesc = null;
@@ -5846,18 +5859,15 @@ public class CatalogOpExecutor {
// The rename succeeded in HMS but failed in the catalog cache. The
cache is in an
// inconsistent state, so invalidate the new table to reload it.
newTblDesc = catalog_.invalidateTable(newTableName.toThrift(),
- new Reference<>(), new Reference<>(), catalogTimeline);
+ new Reference<>(), new Reference<>(), catalogTimeline, eventId);
if (newTblDesc == null) {
throw new ImpalaRuntimeException(String.format(
"The new table/view %s was concurrently removed during rename.",
newTableName));
}
+ LOG.info("Invalidated {} to recover from catalog rename failure",
newTableName);
} else {
Preconditions.checkNotNull(result.first);
- // TODO: call addVersionsForInflightEvents using
InProgressTableModification object
- // that is passed into catalog_.renameTable()
- catalog_.addVersionsForInflightEvents(
- false, result.second, modification.newVersionNumber());
newTblDesc = wantMinimalResult ?
result.second.toInvalidationObject() :
result.second.toTCatalogObject();
}
@@ -5875,7 +5885,7 @@ public class CatalogOpExecutor {
* collected.
*/
public void addToDeleteEventLog(long eventId, String objectKey) {
- if (!catalog_.isEventProcessingActive()) {
+ if (!catalog_.isEventProcessingEnabled()) {
LOG.trace("Not adding event {}:{} since events processing is not
active", eventId,
objectKey);
return;
@@ -6591,7 +6601,7 @@ public class CatalogOpExecutor {
*/
private void addCatalogServiceIdentifiers(
org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition) {
- if (!catalog_.isEventProcessingActive()) return;
+ if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkState(msTbl.isSetParameters());
Preconditions.checkNotNull(partition, "Partition is null");
Map<String, String> tblParams = msTbl.getParameters();
@@ -6625,7 +6635,7 @@ public class CatalogOpExecutor {
*/
private void addToInflightVersionsOfPartition(
Map<String, String> partitionParams, HdfsPartition.Builder partBuilder) {
- if (!catalog_.isEventProcessingActive()) return;
+ if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkState(partitionParams != null);
String version = partitionParams
.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey());
@@ -8203,7 +8213,7 @@ public class CatalogOpExecutor {
*/
private void addCatalogServiceIdentifiers(
Database msDb, String catalogServiceId, long newCatalogVersion) {
- if (!catalog_.isEventProcessingActive()) return;
+ if (!catalog_.isEventProcessingEnabled()) return;
Preconditions.checkNotNull(msDb);
msDb.putToParameters(MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(),
catalogServiceId);
diff --git a/tests/metadata/test_event_processing.py
b/tests/metadata/test_event_processing.py
index 767833304..beef68922 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -25,6 +25,7 @@ import threading
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
add_mandatory_exec_option)
+from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
from tests.common.test_vector import HS2
@@ -744,3 +745,47 @@ class TestEventSyncWaiting(ImpalaTestSuite):
t.close()
self.execute_query("drop database if exists {} cascade".format(db))
self.execute_query("drop database if exists {}_2 cascade".format(db))
+
+
+class TestSelfRenameEvent(ImpalaTestSuite):
+ @pytest.mark.execute_serially
+ def test_self_rename_events(self, unique_database):
+ """Regression test for IMPALA-14307"""
+ try:
+ catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd
+ self.execute_query("create table {}.tbl_a(i
int)".format(unique_database))
+ # Wait until the CREATE_DATABASE and CREATE_TABLE events are skipped.
+ EventProcessorUtils.wait_for_event_processing(self)
+ self.execute_query(
+ "alter table {0}.tbl_a rename to {0}.tbl_b".format(unique_database))
+ self.execute_query(":event_processor('pause')")
+
+ with self.create_impala_client() as alter_client:
+ version_after_create = catalogd.service.get_catalog_version()
+ alter_client.set_configuration(
+ {"debug_action": "catalogd_table_rename_delay:SLEEP@6000"})
+ alter_handle = alter_client.execute_async(
+ "alter table {0}.tbl_b rename to
{0}.tbl_a".format(unique_database))
+ alter_client.wait_for_admission_control(alter_handle, timeout_s=10)
+ # Wait for at most 10 second until catalogd increase the version for
rename
+ # operation. This indicates the rename starts.
+ start_time = time.time()
+ while (time.time() - start_time < 10.0
+ and catalogd.service.get_catalog_version() <=
version_after_create):
+ time.sleep(0.05)
+ # Sleep to let catalogd sends the alter_table HMS RPC
+ time.sleep(1)
+ # Invalidate tbl_b to remove it in catalog
+ self.execute_query("invalidate metadata
{}.tbl_b".format(unique_database))
+ alter_client.wait_for_finished_timeout(alter_handle, timeout=10)
+ alter_client.close_query(alter_handle)
+
+ # Resume event processing. The first ALTER_TABLE RENAME events should be
skipped.
+ events_skipped_before =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ self.execute_query(":event_processor('start')")
+ EventProcessorUtils.wait_for_event_processing(self)
+ events_skipped_after =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ assert events_skipped_after == events_skipped_before + 2
+ finally:
+ # Recover event processing to avoid impacting other tests
+ self.execute_query(":event_processor('start')")