This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit bb7ad61c6c381ac25cd3fa198944230799fb8be8 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Wed May 3 11:45:55 2023 -0700 IMPALA-11534: Skip reloading file metadata for some ALTER_TABLE events Reloading file metadata for medium to wide tables is a heavyweight operation in general. So it would be ideal, from the event processor's perspective, to minimize file metadata reloading, especially for ALTER_TABLE statements, which are quite common in metastore events. This patch implements the above optimization by comparing the before and after table objects of an alter event to see whether it corresponds to ALTER TABLE add/change/replace column, set owner, or set table properties. If any of these are changed, the file metadata reloading can be skipped. For interoperability, this patch introduces a new start-up flag 'file_metadata_reload_properties', which can be used to define which table properties require file metadata to be reloaded when they change. If this value is set to an empty string, this optimization is not in effect and the file metadata is always reloaded. Testing: Added a unit test to confirm that, for certain alter table statements, the file metadata isn't reloaded. 
Change-Id: Ia66b96a7c4b7f50fbf46b2e02296cd29a47347b6 Reviewed-on: http://gerrit.cloudera.org:8080/19838 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 6 ++ be/src/util/backend-gflag-util.cc | 2 + common/thrift/BackendGflags.thrift | 2 + .../org/apache/impala/compat/MetastoreShim.java | 2 +- .../impala/catalog/CatalogServiceCatalog.java | 50 ++++++++--- .../impala/catalog/events/MetastoreEvents.java | 86 ++++++++++++++++++- .../org/apache/impala/service/BackendConfig.java | 9 ++ .../events/MetastoreEventsProcessorTest.java | 96 ++++++++++++++++++++++ 8 files changed, 241 insertions(+), 12 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 3e639737e..23a6fd012 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -132,6 +132,12 @@ DEFINE_bool(enable_reload_events, false, "This configuration is used to fire a " "This config only affects the firing of the reload event. Processing of reload " "event will always happen"); +DEFINE_string(file_metadata_reload_properties, "EXTERNAL, metadata_location," + "transactional, transactional_properties, TRANSLATED_TO_EXTERNAL, repl.last.id", + "This configuration is used to whitelist the table properties that are supposed to" + "refresh file metadata when these properties are changed. 
To skip this optimization," + "set the value to empty string"); + DECLARE_string(state_store_host); DECLARE_int32(state_store_subscriber_port); DECLARE_int32(state_store_port); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index e4449575a..ec5f317ec 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -105,6 +105,7 @@ DECLARE_bool(pull_table_types_and_comments); DECLARE_bool(enable_reload_events); DECLARE_string(geospatial_library); DECLARE_int32(thrift_rpc_max_message_size); +DECLARE_string(file_metadata_reload_properties); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused issues in global-flags.cc @@ -408,6 +409,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_min_processing_per_thread(FLAGS_min_processing_per_thread); cfg.__set_skip_resource_checking_on_last_executor_group_set( FLAGS_skip_resource_checking_on_last_executor_group_set); + cfg.__set_file_metadata_reload_properties(FLAGS_file_metadata_reload_properties); cfg.__set_thrift_rpc_max_message_size(FLAGS_thrift_rpc_max_message_size); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index ba92d239e..878733ec2 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -252,4 +252,6 @@ struct TBackendGflags { 110: required bool skip_resource_checking_on_last_executor_group_set 111: required i32 thrift_rpc_max_message_size + + 112: required string file_metadata_reload_properties } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index e7b075e74..71bac5179 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -893,7 +893,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { } } else { 
catalog_.reloadTableIfExists(entry.getKey().getDb(), entry.getKey().getTbl(), - "CommitTxnEvent", getEventId()); + "CommitTxnEvent", getEventId(), /*isSkipFileMetadataReload*/false); } } } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index c16ba3ac7..bdaff8a31 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -133,6 +134,7 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -319,6 +321,10 @@ public class CatalogServiceCatalog extends Catalog { private final Set<String> blacklistedDbs_; // Tables that will be skipped in loading. private final Set<TableName> blacklistedTables_; + + // Table properties that require file metadata reload + private final Set<String> whitelistedTblProperties_; + /** * Initialize the CatalogServiceCatalog using a given MetastoreClientPool impl. 
* @@ -364,6 +370,12 @@ public class CatalogServiceCatalog extends Catalog { catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this, BackendConfig.INSTANCE); Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0); + String whitelist = BackendConfig.INSTANCE.getFileMetadataReloadProperties(); + whitelistedTblProperties_ = Sets.newHashSet(); + for (String tblProps: Splitter.on(',').trimResults().omitEmptyStrings().split( + whitelist)) { + whitelistedTblProperties_.add(tblProps); + } } public void startEventsProcessor() { @@ -2477,7 +2489,7 @@ public class CatalogServiceCatalog extends Catalog { * and ignore the result. */ public void reloadTable(Table tbl, String reason) throws CatalogException { - reloadTable(tbl, reason, -1); + reloadTable(tbl, reason, -1, false); } /** @@ -2486,10 +2498,22 @@ public class CatalogServiceCatalog extends Catalog { * and ignore the result. * eventId: HMS event id which triggered reload */ - public void reloadTable(Table tbl, String reason, long eventId) - throws CatalogException { + public void reloadTable(Table tbl, String reason, long eventId, + boolean isSkipFileMetadataReload) throws CatalogException { reloadTable(tbl, new TResetMetadataRequest(), CatalogObject.ThriftObjectType.NONE, - reason, eventId); + reason, eventId, isSkipFileMetadataReload); + } + + /** + * Wrapper around {@link #reloadTable(Table, boolean, CatalogObject.ThriftObjectType, + * String, long)} which passes false for {@code refreshUpdatedPartitions} argument. + * eventId: HMS event id which triggered reload + */ + public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request, + CatalogObject.ThriftObjectType resultType, String reason, long eventId) + throws CatalogException { + return reloadTable(tbl, request, resultType, reason, eventId, + /*isSkipFileMetadataReload*/false); } /** @@ -2507,8 +2531,8 @@ public class CatalogServiceCatalog extends Catalog { * table. 
Otherwise, set the eventId as table's last synced id after reload is done */ public TCatalogObject reloadTable(Table tbl, TResetMetadataRequest request, - CatalogObject.ThriftObjectType resultType, String reason, long eventId) - throws CatalogException { + CatalogObject.ThriftObjectType resultType, String reason, long eventId, + boolean isSkipFileMetadataReload) throws CatalogException { LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName())); Preconditions.checkState(!(tbl instanceof IncompleteTable)); String dbName = tbl.getDb().getName(); @@ -2550,8 +2574,10 @@ public class CatalogServiceCatalog extends Catalog { } if (tbl instanceof HdfsTable) { ((HdfsTable) tbl) - .load(true, msClient.getHiveClient(), msTbl, - request.refresh_updated_hms_partitions, request.debug_action, reason); + .load(true, msClient.getHiveClient(), msTbl, !isSkipFileMetadataReload, + /* loadTableSchema*/true, request.refresh_updated_hms_partitions, + /* partitionsToUpdate*/null, request.debug_action, + /*partitionToEventId*/null, reason); } else { tbl.load(true, msClient.getHiveClient(), msTbl, reason); } @@ -2757,7 +2783,7 @@ public class CatalogServiceCatalog extends Catalog { * otherwise. 
*/ public boolean reloadTableIfExists(String dbName, String tblName, String reason, - long eventId) throws CatalogException { + long eventId, boolean isSkipFileMetadataReload) throws CatalogException { try { Table table = getTable(dbName, tblName); if (table == null || table instanceof IncompleteTable) return false; @@ -2766,7 +2792,7 @@ public class CatalogServiceCatalog extends Catalog { + "event {}.", dbName, tblName, eventId, table.getCreateEventId()); return false; } - reloadTable(table, reason, eventId); + reloadTable(table, reason, eventId, isSkipFileMetadataReload); } catch (DatabaseNotFoundException | TableLoadingException e) { LOG.info(String.format("Reload table if exists failed with: %s", e.getMessage())); return false; @@ -3860,4 +3886,8 @@ public class CatalogServiceCatalog extends Catalog { } return false; } + + public Set<String> getWhitelistedTblProperties() { + return whitelistedTblProperties_; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index aae2122c4..b9d1334e9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -749,6 +750,10 @@ public class MetastoreEvents { // case of alter events protected org.apache.hadoop.hive.metastore.api.Table msTbl_; + // A boolean flag used in alter table event to record if the file metadata reload + // can be skipped for certain type of alter table statements + protected boolean skipFileMetadataReload_ = false; + // in case of partition 
batch events, this method can be overridden to return // the partition object from the events which are batched together. protected Partition getPartitionForBatching() { return null; } @@ -850,7 +855,8 @@ public class MetastoreEvents { throws CatalogException { try { if (!catalog_.reloadTableIfExists(dbName_, tblName_, - "Processing " + operation + " event from HMS", getEventId())) { + "Processing " + operation + " event from HMS", getEventId(), + skipFileMetadataReload_)) { debugLog("Automatic refresh on table {} failed as the table " + "either does not exist anymore or is not in loaded state.", getFullyQualifiedTblName()); @@ -1465,6 +1471,7 @@ public class MetastoreEvents { + "which can be ignored."); return; } + skipFileMetadataReload_ = canSkipFileMetadataReload(tableBefore_, tableAfter_); // in case of table level alters from external systems it is better to do a full // refresh eg. this could be due to as simple as adding a new parameter or a // full blown adding or changing column type @@ -1495,6 +1502,83 @@ public class MetastoreEvents { } } + + /** + * This method checks if the reloading of file metadata can be skipped for an alter + * statement. This method accepts two arguments, 1) pre-modified HMS table instance + * 2) post-modified HMS table instance and compare what really changed in the alter + * event. + */ + private boolean canSkipFileMetadataReload( + org.apache.hadoop.hive.metastore.api.Table beforeTable, + org.apache.hadoop.hive.metastore.api.Table afterTable) { + Set<String> whitelistedTblProperties = catalog_.getWhitelistedTblProperties(); + // If the whitelisted table properties are empty, then we skip this optimization + if (whitelistedTblProperties.isEmpty()) { + return false; + } + // There are lot of other alter statements which doesn't require file metadata + // reload but these are the most common types for alter statements. 
+ if (isFieldSchemaChanged(beforeTable, afterTable) || + isTableOwnerChanged(beforeTable.getOwner(), afterTable.getOwner()) || + !isCustomTblPropsChanged(whitelistedTblProperties, beforeTable, afterTable)) { + return true; + } + return false; + } + + private boolean isFieldSchemaChanged( + org.apache.hadoop.hive.metastore.api.Table beforeTable, + org.apache.hadoop.hive.metastore.api.Table afterTable) { + List<FieldSchema> beforeCols = beforeTable.getSd().getCols(); + List<FieldSchema> afterCols = afterTable.getSd().getCols(); + // check if columns are added or removed + if (beforeCols.size() != afterCols.size()) { + infoLog("Change in number of columns detected for table {}.{} from {} to {}", + dbName_, tblName_, beforeCols.size(), afterCols.size()); + return true; + } + // check if columns are replaced or column definition is changed + // Field schema's comment is rarely used, so it'll be ignored + for (int i = 0; i < beforeCols.size(); i++) { + if (!beforeCols.get(i).getName().equals(afterCols.get(i).getName()) || + !beforeCols.get(i).getType().equals(afterCols.get(i).getType())) { + infoLog("Change in table schema detected for table {}.{} from {} to {} ", + tblName_, dbName_, beforeCols.get(i).getName() + " (" + + beforeCols.get(i).getType() +")", afterCols.get(i).getName() + " (" + + afterCols.get(i).getType() + ")"); + return true; + } + } + return false; + } + + private boolean isTableOwnerChanged(String ownerBefore, String ownerAfter) { + if (!Objects.equals(ownerBefore, ownerAfter)) { + infoLog("Change in Ownership detected for table {}.{}, oldOwner: {}, " + + "newOwner: {}", dbName_, tblName_, ownerBefore, ownerAfter); + return true; + } + return false; + } + + // Check if the whitelisted properties are changed during the alter statement + private boolean isCustomTblPropsChanged(Set<String> whitelistedTblProperties, + org.apache.hadoop.hive.metastore.api.Table beforeTable, + org.apache.hadoop.hive.metastore.api.Table afterTable) { + for (String 
whitelistConfig : whitelistedTblProperties) { + String configBefore = beforeTable.getParameters().get(whitelistConfig); + String configAfter = afterTable.getParameters().get(whitelistConfig); + if (!Objects.equals(configBefore, configAfter)) { + infoLog("Change in whitelisted table properties detected for table {}.{} " + + "whitelisted config: {}, value before: {}, value after: {}", dbName_, + tblName_, whitelistConfig, configBefore, configAfter); + return true; + } + } + return false; + } + /** * Detects a event sync flag was turned on in this event */ diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 4031e8483..68c136ceb 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -398,4 +398,13 @@ public class BackendConfig { public int getThriftRpcMaxMessageSize() { return backendCfg_.thrift_rpc_max_message_size; } + + public String getFileMetadataReloadProperties() { + return backendCfg_.file_metadata_reload_properties; + } + + @VisibleForTesting + public void setFileMetadataReloadProperties(String newPropertiesConfig) { + backendCfg_.file_metadata_reload_properties = newPropertiesConfig; + } } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 3d87a7de5..37d9203ed 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -18,6 +18,7 @@ package org.apache.impala.catalog.events; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -2784,6 
+2785,84 @@ public class MetastoreEventsProcessorTest { } } + /** + * Some of the the alter table events doesn't require to reload file metadata, this + * test asserts that file metadata is not reloaded for such alter table events + */ + @Test + public void testAlterTableNoFileMetadataReload() throws Exception { + createDatabase(TEST_DB_NAME, null); + final String testTblName = "testAlterTableNoFileMetadataReload"; + createTable(testTblName, true); + eventsProcessor_.processEvents(); + // load the table first + loadTable(testTblName); + HdfsTable tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + // get file metadata load counter before altering the partition + long fileMetadataLoadBefore = + tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + // Test 1: alter table add cols + alterTableAddCol(testTblName, "newCol", "string", ""); + eventsProcessor_.processEvents(); + + // Test 2: alter table change column type + altertableChangeCol(testTblName, "newCol", "int", null); + eventsProcessor_.processEvents(); + + // Test 3: alter table set owner + alterTableSetOwner(testTblName, "testOwner"); + eventsProcessor_.processEvents(); + + // Test 4: alter table set non-whitelisted tbl properties + alterTableAddParameter(testTblName, "dummyKey1", "dummyValue1"); + eventsProcessor_.processEvents(); + + // Verify that the file metadata isn't reloaded + tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + long fileMetadataLoadAfter = + tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + assertEquals(fileMetadataLoadAfter, fileMetadataLoadBefore); + + // Test 5: alter table add whitelisted property + alterTableAddParameter(testTblName, "EXTERNAL", "true"); + eventsProcessor_.processEvents(); + // Verify that the file metadata is reloaded + tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + fileMetadataLoadAfter = + 
tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + assertNotEquals(fileMetadataLoadAfter, fileMetadataLoadBefore); + fileMetadataLoadBefore = fileMetadataLoadAfter; + + // Test 6: alter table change whitelisted property + alterTableAddParameter(testTblName, "EXTERNAL", "false"); + eventsProcessor_.processEvents(); + // Verify that the file metadata is reloaded + tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + fileMetadataLoadAfter = + tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + assertNotEquals(fileMetadataLoadAfter, fileMetadataLoadBefore); + fileMetadataLoadBefore = fileMetadataLoadAfter; + + // Test 6: alter table set the same value for whitelisted property + alterTableAddParameter(testTblName, "EXTERNAL", "false"); + eventsProcessor_.processEvents(); + // Verify that the file metadata isn't reloaded + tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + fileMetadataLoadAfter = + tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + assertEquals(fileMetadataLoadAfter, fileMetadataLoadBefore); + + // Test 7: alter table remove whitelisted property + alterTableAddParameter(testTblName, "EXTERNAL", null); // sending null value will + // unset the parameter + eventsProcessor_.processEvents(); + // Verify that the file metadata is reloaded + tbl = (HdfsTable) catalog_.getTable(TEST_DB_NAME, testTblName); + fileMetadataLoadAfter = + tbl.getMetrics().getCounter(HdfsTable.NUM_LOAD_FILEMETADATA_METRIC).getCount(); + assertNotEquals(fileMetadataLoadAfter, fileMetadataLoadBefore); + } + /** * For an external table, the test asserts file metadata is not reloaded during * AlterPartitionEvent processing if only partition parameters are changed @@ -3828,6 +3907,23 @@ public class MetastoreEventsProcessorTest { } } + /** + * Sets the owner info from HMS + * @param tblName + * @param newOwner + * @throws TException + */ + private void 
alterTableSetOwner(String tblName, String newOwner) throws TException { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + org.apache.hadoop.hive.metastore.api.Table msTable = + msClient.getHiveClient().getTable(TEST_DB_NAME, tblName); + msTable.setOwner(newOwner); + msTable.setOwnerType(PrincipalType.USER); + msClient.getHiveClient().alter_table_with_environmentContext( + TEST_DB_NAME, tblName, msTable, null); + } + } + /** * Removes the partition by values from HMS * @param tblName
