This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f18b21c1ae398bd66783e7a901c8984c186c5874 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Tue Mar 12 21:52:46 2024 -0700 IMPALA-12856: Event processor should ignore processing partition with empty partition values While processing partition related events, Event Processor (EP) is facing IllegalStateException if the partition fetched from HMS has empty partition values. Even though this is a bug in HMS which returns partitions with empty values, EP should ignore such partitions instead of throwing IllegalStateException. Note: Added a debug option 'mock_empty_partition_values' to add malformed partition objects. Testing: - Manually verified the test provided in jira details in local env. - Added unit test to return empty partition values and verify EP state. Change-Id: Id2469930ccd74948325f1723bd8b2bd6aad02d09 Reviewed-on: http://gerrit.cloudera.org:8080/21143 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/catalog/HdfsTable.java | 43 +++++++-- .../org/apache/impala/service/BackendConfig.java | 4 + .../apache/impala/service/CatalogOpExecutor.java | 6 +- .../java/org/apache/impala/util/DebugUtils.java | 5 + .../java/org/apache/impala/util/MetaStoreUtil.java | 24 +++++ .../events/MetastoreEventsProcessorTest.java | 102 +++++++++++++++++++++ 6 files changed, 175 insertions(+), 9 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 85f4f5ae9..734276f2f 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2663,8 +2663,12 @@ public class HdfsTable extends Table implements FeFsTable { public List<LiteralExpr> getTypeCompatiblePartValues(List<String> values) { List<LiteralExpr> result = new ArrayList<>(); List<Column> partitionColumns = getClusteringColumns(); - Preconditions.checkState(partitionColumns.size() == values.size()); - for (int i=0; i<partitionColumns.size(); ++i) { + if (partitionColumns.size() != values.size()) { + LOG.error("Unmatched numbers of partition values: expected={}, actual={} for " + + "table: {}." , partitionColumns.size(), values.size(), getFullName()); + return null; + } + for (int i = 0; i < partitionColumns.size(); ++i) { Pair<String, LiteralExpr> pair = getPartitionExprFromValue(values.get(i), partitionColumns.get(i).getType()); if (pair == null) { @@ -2894,12 +2898,23 @@ public class HdfsTable extends Table implements FeFsTable { return result; } + /** + * Reloads the HdfsPartitions which correspond to the given partNames. Returns the + * number of partitions which were reloaded. This method also reloads file metadata + * of all the partitions for the given partNames + */ + public int reloadPartitionsFromNames(IMetaStoreClient client, + List<String> partNames, String reason) throws CatalogException { + return reloadPartitionsFromNames(-1L, client, partNames, reason, + FileMetadataLoadOpts.FORCE_LOAD); + } + /** * Reloads the HdfsPartitions which correspond to the given partNames. Returns the * number of partitions which were reloaded. * fileMetadataLoadOpts: decides how to reload file metadata for the partitions */ - public int reloadPartitionsFromNames(IMetaStoreClient client, + public int reloadPartitionsFromNames(long eventId, IMetaStoreClient client, List<String> partNames, String reason, FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException { Preconditions.checkState(partNames != null && !partNames.isEmpty()); @@ -2910,6 +2925,14 @@ public class HdfsTable extends Table implements FeFsTable { try { hmsPartitions = MetaStoreUtil.fetchPartitionsByName(client, partNames, msTable_); for (Partition partition : hmsPartitions) { + if (partition.getValues().isEmpty()) { + LOG.error("EventId: {}, EventType: {}, Received partition with empty values:" + + " {}.\nThis table will be invalidated.", eventId, reason, partition); + throw new InvalidObjectException(String.format("Unmatched numbers of " + + "partition values: expected=%d, actual=%d for table:%s", + getClusteringColumns().size(), partition.getValues().size(), + getFullName())); + } List<LiteralExpr> partExprs = getTypeCompatiblePartValues(partition.getValues()); HdfsPartition hdfsPartition = getPartition(partExprs); if (hdfsPartition != null) { @@ -2934,13 +2957,14 @@ public class HdfsTable extends Table implements FeFsTable { /** * Reload the HdfsPartitions which correspond to the given partitions. * + * @param eventId of the event from metastore * @param client is the HMS client to be used. * @param partsFromEvent Partition objects from the event. * @param loadFileMetadata If true, file metadata will be reloaded. * @param reason Reason for reloading the partitions for logging purposes. * @return the number of partitions which were reloaded. */ - public int reloadPartitionsFromEvent(IMetaStoreClient client, + public int reloadPartitionsFromEvent(long eventId, IMetaStoreClient client, List<Partition> partsFromEvent, boolean loadFileMetadata, String reason) throws CatalogException { Preconditions.checkArgument(partsFromEvent != null @@ -2950,13 +2974,20 @@ public class HdfsTable extends Table implements FeFsTable { LOG.info("Reloading partition metadata for table: {} ({})", getFullName(), reason); Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>(); for (Partition partition : partsFromEvent) { + // If the partition values are empty, ignore the event as partition cannot be + // refetched + if (partition.getValues().isEmpty()) { + LOG.error("EventId: {}, EventType: {}, Received partition with empty values: " + + "{}.\nIgnoring reloading the partition.", eventId, reason, partition); + continue; + } List<LiteralExpr> partExprs = getTypeCompatiblePartValues(partition.getValues()); HdfsPartition hdfsPartition = getPartition(partExprs); // only reload partitions that have more recent write id if (hdfsPartition != null && (!AcidUtils.isTransactionalTable(msTable_.getParameters()) - || hdfsPartition.getWriteId() - <= MetastoreShim.getWriteIdFromMSPartition(partition))) { + || hdfsPartition.getWriteId() + <= MetastoreShim.getWriteIdFromMSPartition(partition))) { hmsPartToHdfsPart.put(partition, hdfsPartition); } } diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index e00985df8..29eeaf3e4 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -467,6 +467,10 @@ public class BackendConfig { public String debugActions() { return backendCfg_.debug_actions; } + public void setDebugActions(String debugActions) { + backendCfg_.debug_actions = debugActions; + } + public boolean isInvalidateMetadataOnEventProcessFailureEnabled() { return backendCfg_.invalidate_metadata_on_event_processing_failure; } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index c7de42284..954f87b4a 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -5068,7 +5068,7 @@ public class CatalogOpExecutor { // the partitions from HMS. int numOfPartsReloaded; try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { - numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames( + numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(eventId, metaStoreClient.getHiveClient(), partNames, reason, fileMetadataLoadOpts); } modification.updateTableCatalogVersion(); @@ -5151,7 +5151,7 @@ public class CatalogOpExecutor { HdfsTable hdfsTable = (HdfsTable) table; int numOfPartsReloaded; try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { - numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent( + numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(eventId, metaStoreClient.getHiveClient(), partsFromEvent, false, reason); } modification.updateTableCatalogVersion(); @@ -5265,7 +5265,7 @@ public class CatalogOpExecutor { MutableValidWriteIdList.WriteIdStatus.COMMITTED); int numOfPartsReloaded; try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { - numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent( + numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(eventId, metaStoreClient.getHiveClient(), partsToRefresh, true, reason); } modification.updateTableCatalogVersion(); diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 141faf80e..95bc7f390 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -90,6 +90,11 @@ public class DebugUtils { public static final String WAIT_SYNC_DDL_VER_DELAY = "catalogd_wait_sync_ddl_version_delay"; + // debug action label for mock that metastore returns partitions with empty values + // This action is required for repro in the unit test for IMPALA-12856 to mimic the + // behavior of metastore returning partitions with empty values + public static final String MOCK_EMPTY_PARTITION_VALUES = "mock_empty_partition_values"; + /** * Returns true if the label of action is set in the debugActions */ diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java index 0e667b4b4..03d4756b5 100644 --- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java +++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java @@ -37,6 +37,7 @@ import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.compat.MetastoreShim; +import org.apache.impala.service.BackendConfig; import org.apache.thrift.TException; import org.apache.impala.thrift.TColumn; @@ -202,12 +203,35 @@ public class MetaStoreUtil { // Fetch these partitions from the metastore. List<Partition> partitions = client.getPartitionsByNames( msTbl.getDbName(), msTbl.getTableName(), partsToFetch); + boolean foundEmptyPartitionVals = false; + for (Partition partition : partitions) { + if (partition.getValues().isEmpty()) { + LOG.error("Received partition with empty values: {}.\nRefetching the " + + "partition.", partition); + foundEmptyPartitionVals = true; + break; + } + } + if (foundEmptyPartitionVals) { + // Refetch these partitions from metastore because of empty values list + partitions = client.getPartitionsByNames(msTbl.getDbName(), + msTbl.getTableName(), partsToFetch); + } replaceSchemaFromTable(partitions, msTbl); fetchedPartitions.addAll(partitions); numDone += partitions.size(); LOG.info("Fetched {}/{} partitions for table {}.{}", numDone, partNames.size(), msTbl.getDbName(), msTbl.getTableName()); } + // This action is required for repro in the unit test (MetastoreEventsProcessorTest) + // for IMPALA-12856 to mimic the behavior of metastore returning partitions with + // empty values + if (DebugUtils.hasDebugAction(BackendConfig.INSTANCE.debugActions(), + DebugUtils.MOCK_EMPTY_PARTITION_VALUES)) { + for (org.apache.hadoop.hive.metastore.api.Partition msPart : fetchedPartitions) { + msPart.getValues().clear(); + } + } return fetchedPartitions; } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 2b55c3548..db35349d3 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -163,6 +163,7 @@ import org.apache.impala.thrift.TTypeNode; import org.apache.impala.thrift.TTypeNodeType; import org.apache.impala.thrift.TUpdateCatalogRequest; import org.apache.impala.thrift.TUpdatedPartition; +import org.apache.impala.util.DebugUtils; import org.apache.impala.util.EventSequence; import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.NoOpEventSequence; @@ -4008,6 +4009,107 @@ public class MetastoreEventsProcessorTest { } } + /** + * testEmptyPartitionValues() is a regression test for IMPALA-12856 to mimic the + * behavior of metastore returning partitions with empty values + * @throws Exception + */ + @Test + public void testEmptyPartitionValues() throws Exception { + String prevFlag = BackendConfig.INSTANCE.debugActions(); + try { + String tblName = "test_empty"; + String managedTbl = "managedTblEmptyPartVals"; + createDatabase(TEST_DB_NAME, null); + eventsProcessor_.processEvents(); + createTable(tblName, true); + createTransactionalTable(TEST_DB_NAME, managedTbl, true); + eventsProcessor_.processEvents(); + loadTable(tblName); + loadTable(managedTbl); + List<List<String>> partVals = new ArrayList<>(1); + partVals.add(Arrays.asList("1")); + addPartitions(TEST_DB_NAME, tblName, partVals); + addPartitions(TEST_DB_NAME, managedTbl, partVals); + eventsProcessor_.processEvents(); + // Fire a reload event and process partition with empty values + MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true, + Arrays.asList("1"), TEST_DB_NAME, tblName, Collections.emptyMap()); + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + processEventsAndVerifyStatus(prevFlag); + // insert partition event + simulateFiringInsertEvent(tblName, false); + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + processEventsAndVerifyStatus(prevFlag); + // Alter partition event, managed and external table + String location = "/path/to/partition"; + alterPartitions(tblName, partVals, location); + alterPartitions(managedTbl, partVals, location); + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + processEventsAndVerifyStatus(prevFlag); + // Batch partition event + partVals.clear(); + partVals.add(Arrays.asList("2")); + partVals.add(Arrays.asList("3")); + addPartitions(TEST_DB_NAME, tblName, partVals); + eventsProcessor_.processEvents(); + simulateFiringInsertEvent(tblName, false); + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + processEventsAndVerifyStatus(prevFlag); + // commit compaction event + addPartitions(TEST_DB_NAME, managedTbl, partVals); + simulateFiringInsertEvent(managedTbl, true); + simulateFiringInsertEvent(managedTbl, true); + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + eventsProcessor_.processEvents(); + // Run hive query to trigger compaction + try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1); + HiveJdbcClientPool.HiveJdbcClient hiveClient = jdbcClientPool.getClient()) { + hiveClient.executeSql( + "alter table " + TEST_DB_NAME + '.' + managedTbl + + " partition(p1=1) compact 'minor' and wait"); + } + BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); + processEventsAndVerifyStatus(prevFlag); + } finally { + BackendConfig.INSTANCE.setDebugActions(prevFlag); + } + } + + private void simulateFiringInsertEvent(String tblName, boolean isTransactional) + throws Exception { + org.apache.hadoop.hive.metastore.api.Table msTbl; + List<Partition> partitions; + try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { + msTbl = metaStoreClient.getHiveClient().getTable(TEST_DB_NAME, tblName); + partitions = MetastoreShim + .getPartitions(metaStoreClient.getHiveClient(), TEST_DB_NAME, tblName); + } + assertNotNull(msTbl); + assertNotNull(partitions); + if (isTransactional) { + for (Partition part : partitions) { + try (MetaStoreClient client = catalog_.getMetaStoreClient()) { + long txnId = MetastoreShim.openTransaction(client.getHiveClient()); + long writeId = MetastoreShim.allocateTableWriteId( + client.getHiveClient(), txnId, TEST_DB_NAME, tblName); + simulateInsertIntoTransactionalTableFromFS(msTbl, part, 1, txnId, writeId); + MetastoreShim.commitTransaction(client.getHiveClient(), txnId); + } + } + } else { + for (Partition part : partitions) { + simulateInsertIntoTableFromFS(msTbl, 1, part, false); + } + } + } + + private void processEventsAndVerifyStatus(String prevFlag) { + eventsProcessor_.processEvents(); + assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus()); + BackendConfig.INSTANCE.setDebugActions(prevFlag); + } + public void testAllocWriteIdEvent(String tblName, boolean isPartitioned, boolean isLoadTable) throws TException, TransactionException, CatalogException { createDatabase(TEST_DB_NAME, null);
