This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 148888e3ed4f97292499b2e6ee8d5a756dc648d9 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Wed Feb 8 12:01:41 2023 -0800 IMPALA-11822: Optimize the Refresh/Invalidate event processing by skipping unnecessary events Added a new variable 'lastRefreshEventId' to catalogd's table/partition objects to store the latest event id before loading the table/partition. This value is updated whenever a refresh or invalidate command is run. The event processor can compare it with the current event id to decide whether to process or skip a reload event. It is enough to store the refresh event's id, since an invalidate event flushes the object out of the cache anyway. Note: Two configs must be enabled for this optimization to work: 1) enable_reload_events=true 2) enable_sync_to_latest_event_on_ddls=true Testing: Added a test that fires a few reload events via the HMS API and then verifies that the event processor skips some older events. Change-Id: I905957683a96c3ea01ab4bf043d6658ce37b7574 Reviewed-on: http://gerrit.cloudera.org:8080/19484 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/CatalogServiceCatalog.java | 21 +++++++++++++ .../org/apache/impala/catalog/HdfsPartition.java | 21 +++++++++++-- .../java/org/apache/impala/catalog/HdfsTable.java | 8 +++++ .../main/java/org/apache/impala/catalog/Table.java | 18 ++++++++++++ .../org/apache/impala/catalog/TableLoader.java | 1 + .../impala/catalog/events/MetastoreEvents.java | 19 +++++++++++- .../apache/impala/service/CatalogOpExecutor.java | 26 ++++++++++++----- tests/custom_cluster/test_events_custom_configs.py | 34 ++++++++++++++++++++-- 8 files changed, 135 insertions(+), 13 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index c87cda9a2..53f423f4c 100644 --- 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2567,6 +2567,13 @@ public class CatalogServiceCatalog extends Catalog { } tbl.setCatalogVersion(newCatalogVersion); LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName())); + // Set the last refresh event id as current HMS event id since all the metadata + // until the current HMS event id is refreshed at this point. + if (currentHmsEventId > eventId) { + tbl.setLastRefreshEventId(currentHmsEventId); + } else { + tbl.setLastRefreshEventId(eventId); + } return tbl.toTCatalogObject(resultType); } finally { context.stop(); @@ -3837,4 +3844,18 @@ public class CatalogServiceCatalog extends Catalog { return syncToLatestEventFactory_; } + public boolean isPartitionLoadedAfterEvent(String dbName, String tableName, + Partition msPartition, long eventId) { + try { + HdfsPartition hdfsPartition = getHdfsPartition(dbName, tableName, msPartition); + if (hdfsPartition.getLastRefreshEventId() > eventId) { + return true; + } + } catch (CatalogException ex) { + LOG.warn("Encountered an exception while the partition's last refresh event id: " + + dbName + "." + tableName + ". 
Ignoring further processing and try to " + + "reload the partition.", ex); + } + return false; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 034790fd9..bbd4bb129 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -746,6 +746,10 @@ public class HdfsPartition extends CatalogObjectImpl // -1 means there is no previous compaction event or compaction is not supported private final long lastCompactionId_; + // The last refresh event id of the partition + // -1 means there is no previous refresh event happened + private final long lastRefreshEventId_; + /** * Constructor. Needed for third party extensions that want to use their own builder * to construct the object. @@ -765,7 +769,8 @@ public class HdfsPartition extends CatalogObjectImpl encodedInsertFileDescriptors, encodedDeleteFileDescriptors, location, isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor, partitionStats, hasIncrementalStats, numRows, writeId, - inFlightEvents, /*createEventId=*/-1L, /*lastCompactionId*/-1L); + inFlightEvents, /*createEventId=*/-1L, /*lastCompactionId*/-1L, + /*lastRefreshEventId*/-1L); } protected HdfsPartition(HdfsTable table, long id, long prevId, String partName, @@ -777,7 +782,8 @@ public class HdfsPartition extends CatalogObjectImpl boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters, CachedHmsPartitionDescriptor cachedMsPartitionDescriptor, byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId, - InFlightEvents inFlightEvents, long createEventId, long lastCompactionId) { + InFlightEvents inFlightEvents, long createEventId, long lastCompactionId, + long lastRefreshEventId) { table_ = table; id_ = id; prevId_ = prevId; @@ -798,6 +804,7 @@ public class HdfsPartition extends CatalogObjectImpl inFlightEvents_ = 
inFlightEvents; createEventId_ = createEventId; lastCompactionId_ = lastCompactionId; + lastRefreshEventId_ = lastRefreshEventId; if (partName == null && id_ != CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) { partName_ = FeCatalogUtils.getPartitionName(this); } else { @@ -807,6 +814,8 @@ public class HdfsPartition extends CatalogObjectImpl public long getCreateEventId() { return createEventId_; } + public long getLastRefreshEventId() { return lastRefreshEventId_; } + @Override // FeFsPartition public HdfsStorageDescriptor getInputFormatDescriptor() { return fileFormatDescriptor_; @@ -1251,6 +1260,7 @@ public class HdfsPartition extends CatalogObjectImpl // is not active. private long createEventId_ = -1L; private long lastCompactionId_ = -1L; + private long lastRefreshEventId_ = -1L; private InFlightEvents inFlightEvents_ = new InFlightEvents(); @Nullable @@ -1318,7 +1328,7 @@ public class HdfsPartition extends CatalogObjectImpl encodedDeleteFileDescriptors_, location_, isMarkedCached_, accessLevel_, hmsParameters_, cachedMsPartitionDescriptor_, partitionStats_, hasIncrementalStats_, numRows_, writeId_, inFlightEvents_, createEventId_, - lastCompactionId_); + lastCompactionId_, lastRefreshEventId_); } public Builder setId(long id) { @@ -1331,6 +1341,11 @@ public class HdfsPartition extends CatalogObjectImpl return this; } + public Builder setLastRefreshEventId(long eventId) { + lastRefreshEventId_ = eventId; + return this; + } + public Builder setPrevId(long prevId) { prevId_ = prevId; return this; diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 538e813c6..da83a47ac 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2876,6 +2876,13 @@ public class HdfsTable extends Table implements FeFsTable { FsPermissionCache permissionCache = new FsPermissionCache(); Map<HdfsPartition.Builder, 
HdfsPartition> partBuilderToPartitions = new HashMap<>(); Set<HdfsPartition.Builder> partBuildersFileMetadataRefresh = new HashSet<>(); + long latestEventId = -1L; + try { + latestEventId = client.getCurrentNotificationEventId().getEventId(); + } catch (TException exception) { + LOG.warn(String.format("Unable to fetch latest event id from HMS: %s", + exception.getMessage())); + } for (Map.Entry<Partition, HdfsPartition> entry : hmsPartsToHdfsParts.entrySet()) { Partition hmsPartition = entry.getKey(); HdfsPartition oldPartition = entry.getValue(); @@ -2887,6 +2894,7 @@ public class HdfsTable extends Table implements FeFsTable { if (oldPartition != null) { partBuilder.setFileDescriptors(oldPartition); } + partBuilder.setLastRefreshEventId(latestEventId); switch (fileMetadataLoadOpts) { case FORCE_LOAD: partBuildersFileMetadataRefresh.add(partBuilder); diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index aaad451df..21b766ce8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -197,6 +197,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // not by reading this flag and without acquiring read lock on table object protected volatile long lastSyncedEventId_ = -1; + protected volatile long lastRefreshEventId_ = -1L; + protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, String name, String owner) { msTable_ = msTable; @@ -1028,4 +1030,20 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { * Clears the in-progress modifications in case of failures. 
*/ public void resetInProgressModification() { } + + public long getLastRefreshEventId() { return lastRefreshEventId_; } + + public void setLastRefreshEventId(long eventId) { + if (eventId > lastRefreshEventId_) { + lastRefreshEventId_ = eventId; + } + LOG.debug("last refreshed event id for table: {} set to: {}", getFullName(), + lastRefreshEventId_); + // TODO: Should we reset lastSyncedEvent Id if it is less than event Id? + // If we don't reset it - we may start syncing table from an event id which + // is less than refresh event id + if (lastSyncedEventId_ < eventId) { + setLastSyncedEventId(eventId); + } + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java index bdd97a3f6..6ccae1fe1 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java @@ -153,6 +153,7 @@ public class TableLoader { MetastoreEventsProcessor.syncToLatestEventId(catalog_, table, catalog_.getEventFactoryForSyncToLatestEvent(), metrics_); } + table.setLastRefreshEventId(latestEventId); } catch (TableLoadingException e) { table = IncompleteTable.createFailedMetadataLoadTable(db, tblName, e); } catch (NoSuchObjectException e) { diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 78895d2b4..805f58093 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -2456,6 +2456,9 @@ public class MetastoreEvents { // if isRefresh_ is set to true then it is refresh query, else it is invalidate query private boolean isRefresh_; + + private org.apache.impala.catalog.Table tbl_; + /** * Prevent instantiation from outside should use MetastoreEventFactory instead */ @@ -2471,6 +2474,7 @@ public class MetastoreEvents { 
updatedFields.get("table")); reloadPartition_ = (Partition)updatedFields.get("partition"); isRefresh_ = (boolean)updatedFields.get("isRefresh"); + tbl_ = catalog_.getTable(dbName_, tblName_); } catch (Exception e) { throw new MetastoreNotificationException(debugString("Unable to " + "parse reload message"), e); @@ -2494,7 +2498,7 @@ public class MetastoreEvents { @Override public void process() throws MetastoreNotificationException { - if (isSelfEvent()) { + if (isSelfEvent() || isOlderEvent()) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) .inc(getNumberOfEvents()); infoLog("Incremented events skipped counter to {}", @@ -2514,6 +2518,19 @@ public class MetastoreEvents { } } + private boolean isOlderEvent() { + if (tbl_ instanceof IncompleteTable) { + return false; + } + // Always check the lastRefreshEventId on the table first for table level refresh + if (tbl_.getLastRefreshEventId() > getEventId() || (reloadPartition_ != null && + catalog_.isPartitionLoadedAfterEvent(dbName_, tblName_, reloadPartition_, + getEventId()))) { + return true; + } + return false; + } + /** * Process partition reload */ diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index bc8f1af1a..3830ffa7e 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -6435,8 +6435,8 @@ public class CatalogOpExecutor { } if (BackendConfig.INSTANCE.enableReloadEvents()) { - // fire event for refresh event - fireReloadEventHelper(req, updatedThriftTable, tblName, tbl); + // fire event for refresh event and update the last refresh event id + fireReloadEventAndUpdateRefreshEventId(req, updatedThriftTable, tblName, tbl); } // Return the TCatalogObject in the result to indicate this request can be @@ -6480,12 +6480,13 @@ public class CatalogOpExecutor { /** * Helper class for refresh event. 
* This class invokes metastore shim's fireReloadEvent to fire event to HMS + * and update the last refresh event id in the cache * @param req - request object for TResetMetadataRequest. * @param updatedThriftTable - updated thrift table after refresh query * @param tblName * @param tbl */ - private void fireReloadEventHelper(TResetMetadataRequest req, + private void fireReloadEventAndUpdateRefreshEventId(TResetMetadataRequest req, TCatalogObject updatedThriftTable, TableName tblName, Table tbl) { List<String> partVals = null; if (req.isSetPartition_spec()) { @@ -6500,19 +6501,30 @@ public class CatalogOpExecutor { catalog_.getCatalogServiceId()); tableParams.put(MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), String.valueOf(newCatalogVersion)); - MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), - req.isIs_refresh(), partVals, tblName.getDb(), tblName.getTbl(), - tableParams); + List<Long> eventIds = MetastoreShim.fireReloadEventHelper( + catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(), + tblName.getTbl(), tableParams); if (req.isIs_refresh()) { if (catalog_.tryLock(tbl, true, 600000)) { catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); + if (!eventIds.isEmpty()) { + if (req.isSetPartition_spec()) { + HdfsPartition partition = ((HdfsTable) tbl) + .getPartitionFromThriftPartitionSpec(req.getPartition_spec()); + HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); + partBuilder.setLastRefreshEventId(eventIds.get(0)); + ((HdfsTable) tbl).updatePartition(partBuilder); + } else { + tbl.setLastRefreshEventId(eventIds.get(0)); + } + } } else { LOG.warn(String.format("Couldn't obtain a version lock for the table: %s. 
" + "Self events may go undetected in that case", tbl.getName())); } } - } catch (TException e) { + } catch (TException | CatalogException e) { LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "fireReloadEvent") + e.getMessage()); } finally { diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 63385a325..95fb83e6c 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -18,6 +18,9 @@ from __future__ import print_function import logging import pytest + +from hive_metastore.ttypes import FireEventRequest +from hive_metastore.ttypes import FireEventRequestData from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfFS @@ -227,8 +230,10 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): self.__run_self_events_test(unique_database, True) self.__run_self_events_test(unique_database, False) - @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1" - " --enable_reload_events=true") + @CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=5" + " --enable_reload_events=true" + " --enable_sync_to_latest_event_on_ddls=true") def test_refresh_invalidate_events(self, unique_database): """Test is to verify Impala-11808, refresh/invalidate commands should generate a Reload event in HMS and CatalogD's event processor should process this event. 
@@ -265,6 +270,31 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite): check_self_events("refresh {}.{} partition(year=2022)" .format(unique_database, test_reload_table)) check_self_events("refresh {}.{}".format(unique_database, test_reload_table)) + EventProcessorUtils.wait_for_event_processing(self) + + # Test to verify if older events are being skipped in event processor + data = FireEventRequestData() + data.refreshEvent = True + req = FireEventRequest(True, data) + req.dbName = unique_database + req.tableName = test_reload_table + # table level reload events + tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events() + for i in range(10): + self.hive_client.fire_listener_event(req) + EventProcessorUtils.wait_for_event_processing(self) + tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events() + assert tbl_events_skipped_after > tbl_events_skipped_before + # partition level reload events + EventProcessorUtils.wait_for_event_processing(self) + part_events_skipped_before = EventProcessorUtils.get_num_skipped_events() + req.partitionVals = ["2022"] + for i in range(10): + self.hive_client.fire_listener_event(req) + EventProcessorUtils.wait_for_event_processing(self) + part_events_skipped_after = EventProcessorUtils.get_num_skipped_events() + assert part_events_skipped_after > part_events_skipped_before + @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") def test_commit_compaction_events(self, unique_database):
