This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f5a15ad6055dc4cd1baae2a11c56a969c5d30a31 Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Thu May 16 18:54:35 2024 -0700 IMPALA-12277: Fix NullPointerException for partitioned inserts when partition list is stale When event processor is turned off, inserting values into partitioned table can lead to NullPointerException if the partition is deleted outside impala (eg: HMS). Since event processor is turned off, impala is unaware of the metadata changes to the table. Currently in impala, we always load the partitions from cached table. This can lead to data inconsistency issue especially in the case of event processor being turned off or lagged behind. This patch addresses this issue by always verifying the target partitions from HMS without loading the file metadata from HMS regardless of state of event processor. This approach will ensure that partition metadata is always consistent with metastore. The issue can be seen with the following steps: - Turn off the event processor - create a partitioned table and add a partition from impala - drop the same partition from hive - from impala, insert values into the partition (expectation is that if the partition didn't exist, it will create a new one). Testing: - Verified manually that NullPointerException is avoided with this patch - Added end-to-end tests to verify the above scenario for external and managed tables. 
Change-Id: Ide8f1f6bf017e9a040b53bb5d5291ff2ea3e0d18 Reviewed-on: http://gerrit.cloudera.org:8080/21437 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../java/org/apache/impala/catalog/HdfsTable.java | 97 ++++-- .../apache/impala/catalog/HdfsTableLoadParams.java | 111 +++++++ .../impala/catalog/HdfsTableLoadParamsBuilder.java | 126 +++++++ .../apache/impala/service/CatalogOpExecutor.java | 363 ++++++++++++--------- tests/custom_cluster/test_events_custom_configs.py | 30 ++ 5 files changed, 541 insertions(+), 186 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 7bc4edb48..85f4f5ae9 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -1227,7 +1227,26 @@ public class HdfsTable extends Table implements FeFsTable { /* partitionsToUpdate*/null, null, null, reason, catalogTimeline); } - /** + public void load(boolean reuseMetadata, IMetaStoreClient client, + org.apache.hadoop.hive.metastore.api.Table msTbl, + boolean loadPartitionFileMetadata, boolean loadTableSchema, + boolean refreshUpdatedPartitions, @Nullable Set<String> partitionsToUpdate, + @Nullable String debugAction, @Nullable Map<String, Long> partitionToEventId, + String reason, EventSequence catalogTimeline) throws TableLoadingException { + load(new HdfsTableLoadParamsBuilder(client, msTbl) + .reuseMetadata(reuseMetadata) + .setLoadPartitionFileMetadata(loadPartitionFileMetadata) + .setLoadTableSchema(loadTableSchema) + .setRefreshUpdatedPartitions(refreshUpdatedPartitions) + .setPartitionsToUpdate(partitionsToUpdate) + .setDebugAction(debugAction).setReason(reason) + .setPartitionToEventId(partitionToEventId) + .setIsPreLoadForInsert(false) + .setCatalogTimeline(catalogTimeline).build()); + } + + /** + * See 'HdfsTableLoadParams' class for the argument list 
passed into this method. * Loads table metadata from the Hive Metastore. * * If 'reuseMetadata' is false, performs a full metadata load from the Hive Metastore, @@ -1245,25 +1264,29 @@ public class HdfsTable extends Table implements FeFsTable { * * If 'loadTableSchema' is true, the table schema is loaded from the Hive Metastore. * + * If 'isPreLoadForInsert' is true, then we intend to refresh partitions from the Hive + * Metastore without reloading the file metadata(this is done in later steps) to ensure + * consistency while inserting into partitioned tables. + * * Existing file descriptors might be reused incorrectly if Hdfs rebalancer was * executed, as it changes the block locations but doesn't update the mtime (file * modification time). * If this occurs, user has to execute "invalidate metadata" to invalidate the * metadata cache of the table and trigger a fresh load. */ - public void load(boolean reuseMetadata, IMetaStoreClient client, - org.apache.hadoop.hive.metastore.api.Table msTbl, boolean loadPartitionFileMetadata, - boolean loadTableSchema, boolean refreshUpdatedPartitions, - @Nullable Set<String> partitionsToUpdate, @Nullable String debugAction, - @Nullable Map<String, Long> partitionToEventId, String reason, - EventSequence catalogTimeline) throws TableLoadingException { + public void load(HdfsTableLoadParams loadParams) throws TableLoadingException { final Timer.Context context = getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time(); + IMetaStoreClient msClient = loadParams.getMsClient(); + org.apache.hadoop.hive.metastore.api.Table msTbl = loadParams.getMsTable(); + EventSequence catalogTimeline = loadParams.getCatalogTimeline(); + Set<String> partitionsToUpdate = loadParams.getPartitionsToUpdate(); String annotation = String.format("%s metadata for %s%s partition(s) of %s.%s (%s)", - reuseMetadata ? "Reloading" : "Loading", - loadTableSchema ? "table definition and " : "", + loadParams.getIsReuseMetadata() ? 
"Reloading" : "Loading", + loadParams.getLoadTableSchema() ? "table definition and " : "", partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()), - msTbl.getDbName(), msTbl.getTableName(), reason); + msTbl.getDbName(), msTbl.getTableName(), + loadParams.getReason()); LOG.info(annotation); final Timer storageLdTimer = getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA); @@ -1273,36 +1296,41 @@ public class HdfsTable extends Table implements FeFsTable { // turn all exceptions into TableLoadingException msTable_ = msTbl; try { - if (loadTableSchema) { + if (loadParams.getLoadTableSchema()) { // set nullPartitionKeyValue from the hive conf. nullPartitionKeyValue_ = - MetaStoreUtil.getNullPartitionKeyValue(client).intern(); + MetaStoreUtil.getNullPartitionKeyValue(msClient).intern(); loadSchema(msTbl); - loadAllColumnStats(client, catalogTimeline); - loadConstraintsInfo(client, msTbl); + loadAllColumnStats(msClient, catalogTimeline); + loadConstraintsInfo(msClient, msTbl); catalogTimeline.markEvent("Loaded table schema"); } - loadValidWriteIdList(client); + loadValidWriteIdList(msClient); // Set table-level stats first so partition stats can inherit it. 
setTableStats(msTbl); // Load partition and file metadata - if (reuseMetadata) { + if (loadParams.getIsReuseMetadata()) { // Incrementally update this table's partitions and file metadata - Preconditions.checkState( - partitionsToUpdate == null || loadPartitionFileMetadata); + if (!loadParams.getIsPreLoadForInsert()) { + Preconditions.checkState(partitionsToUpdate == null || + loadParams.isLoadPartitionFileMetadata(), "Conflicts in " + + "'partitionsToUpdate' and 'loadPartitionFileMetadata'"); + } storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl); if (msTbl.getPartitionKeysSize() == 0) { - if (loadPartitionFileMetadata) { - storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(client, - debugAction, catalogTimeline); + if (loadParams.isLoadPartitionFileMetadata()) { + storageMetadataLoadTime_ += updateUnpartitionedTableFileMd( + msClient, loadParams.getDebugAction(), + catalogTimeline); } else { // Update the single partition stats in case table stats changes. updateUnpartitionedTableStats(); } } else { - storageMetadataLoadTime_ += updatePartitionsFromHms( - client, partitionsToUpdate, loadPartitionFileMetadata, - refreshUpdatedPartitions, partitionToEventId, debugAction, - catalogTimeline); + storageMetadataLoadTime_ += updatePartitionsFromHms(msClient, + partitionsToUpdate, loadParams.isLoadPartitionFileMetadata(), + loadParams.getRefreshUpdatedPartitions(), + loadParams.getPartitionToEventId(), loadParams.getDebugAction(), + catalogTimeline, loadParams.getIsPreLoadForInsert()); } LOG.info("Incrementally loaded table metadata for: " + getFullName()); } else { @@ -1311,14 +1339,16 @@ public class HdfsTable extends Table implements FeFsTable { getMetrics().getTimer(HdfsTable.LOAD_DURATION_ALL_PARTITIONS).time(); // Load all partitions from Hive Metastore, including file metadata. 
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions = - MetaStoreUtil.fetchAllPartitions( - client, msTbl, NUM_PARTITION_FETCH_RETRIES); + MetaStoreUtil.fetchAllPartitions(msClient, + msTbl, NUM_PARTITION_FETCH_RETRIES); LOG.info("Fetched partition metadata from the Metastore: " + getFullName()); - storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions, msTbl, - catalogTimeline); + storageMetadataLoadTime_ = loadAllPartitions(msClient, + msPartitions, msTbl, catalogTimeline); allPartitionsLdContext.stop(); } - if (loadTableSchema) setAvroSchema(client, msTbl, catalogTimeline); + if (loadParams.getLoadTableSchema()) { + setAvroSchema(msClient, msTbl, catalogTimeline); + } fileMetadataStats_.unset(); refreshLastUsedTime(); // Make sure all the partition modifications are done. @@ -1446,12 +1476,15 @@ public class HdfsTable extends Table implements FeFsTable { private long updatePartitionsFromHms(IMetaStoreClient client, Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata, boolean refreshUpdatedPartitions, Map<String, Long> partitionToEventId, - String debugAction, EventSequence catalogTimeline) throws Exception { + String debugAction, EventSequence catalogTimeline, boolean isPreLoadForInsert) + throws Exception { if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + getFullName()); org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable(); Preconditions.checkNotNull(msTbl); Preconditions.checkState(msTbl.getPartitionKeysSize() != 0); - Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null); + if (!isPreLoadForInsert) { + Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate == null); + } PartitionDeltaUpdater deltaUpdater; if (refreshUpdatedPartitions) { deltaUpdater = new PartBasedDeltaUpdater(client, diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java new file 
mode 100644 index 000000000..9beb3bc79 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.impala.util.EventSequence; + +/** + * The arguments required to load the HdfsTable object + * + * Use HdfsTableLoadParamsBuilder to create this instance. 
+ */ +public class HdfsTableLoadParams { + private final boolean reuseMetadata_; + private final IMetaStoreClient client_; + private final Table msTbl_; + private final boolean loadPartitionFileMetadata_; + private final boolean loadTableSchema_; + private final boolean refreshUpdatedPartitions_; + private final Set<String> partitionsToUpdate_; + private final String debugAction_; + private final Map<String, Long> partitionToEventId_; + private final String reason_; + private final EventSequence catalogTimeline_; + private final boolean isPreLoadForInsert_; + + public HdfsTableLoadParams(boolean reuseMetadata, IMetaStoreClient client, + Table msTbl, boolean loadPartitionFileMetadata, boolean loadTableSchema, + boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate, + String debugAction, Map<String, Long> partitionToEventId, String reason, + EventSequence catalogTimeline, boolean isPreLoadForInsert) { + reuseMetadata_ = reuseMetadata; + client_ = client; + msTbl_ = msTbl; + loadPartitionFileMetadata_ = loadPartitionFileMetadata; + loadTableSchema_ = loadTableSchema; + refreshUpdatedPartitions_ = refreshUpdatedPartitions; + partitionsToUpdate_ = partitionsToUpdate; + debugAction_ = debugAction; + partitionToEventId_ = partitionToEventId; + reason_ = reason; + catalogTimeline_ = catalogTimeline; + isPreLoadForInsert_ = isPreLoadForInsert; + } + + public boolean getIsReuseMetadata() { + return reuseMetadata_; + } + + public IMetaStoreClient getMsClient() { + return client_; + } + + public Table getMsTable() { + return msTbl_; + } + + public boolean isLoadPartitionFileMetadata() { + return loadPartitionFileMetadata_; + } + + public boolean getLoadTableSchema() { + return loadTableSchema_; + } + + public boolean getRefreshUpdatedPartitions() { + return refreshUpdatedPartitions_; + } + + public Set<String> getPartitionsToUpdate() { + return partitionsToUpdate_; + } + + public String getDebugAction() { + return debugAction_; + } + + public Map<String, Long> 
getPartitionToEventId() { + return partitionToEventId_; + } + + public String getReason() { + return reason_; + } + + public EventSequence getCatalogTimeline() { + return catalogTimeline_; + } + + public boolean getIsPreLoadForInsert() { + return isPreLoadForInsert_; + } +} diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java new file mode 100644 index 000000000..c4f528982 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.impala.util.EventSequence; + +/** + * Utility class to help set up the various parameters for load() method in HdfsTable. 
+ */ +public class HdfsTableLoadParamsBuilder { + boolean reuseMetadata_; + IMetaStoreClient client_; + Table msTbl_; + boolean loadPartitionFileMetadata_; + boolean loadTableSchema_; + boolean refreshUpdatedPartitions_; + Set<String> partitionsToUpdate_; + String debugAction_; + Map<String, Long> partitionToEventId_; + String reason_; + EventSequence catalogTimeline_; + boolean isPreLoadForInsert_; + + public HdfsTableLoadParamsBuilder(IMetaStoreClient client, Table msTbl) { + client_ = client; + msTbl_ = msTbl; + } + + public HdfsTableLoadParamsBuilder reuseMetadata(boolean reuseMetadata) { + reuseMetadata_ = reuseMetadata; + return this; + } + + public HdfsTableLoadParamsBuilder setMsClient(IMetaStoreClient client) { + client_ = client; + return this; + } + + public HdfsTableLoadParamsBuilder setMetastoreTable(Table msTbl) { + msTbl_ = msTbl; + return this; + } + + public HdfsTableLoadParamsBuilder setLoadPartitionFileMetadata( + boolean loadPartitionFileMetadata) { + loadPartitionFileMetadata_ = loadPartitionFileMetadata; + return this; + } + + public HdfsTableLoadParamsBuilder setLoadTableSchema(boolean loadTableSchema) { + loadTableSchema_ = loadTableSchema; + return this; + } + + public HdfsTableLoadParamsBuilder setRefreshUpdatedPartitions( + boolean refreshUpdatedPartitions) { + refreshUpdatedPartitions_ = refreshUpdatedPartitions; + return this; + } + + public HdfsTableLoadParamsBuilder setPartitionsToUpdate( + Set<String> partitionsToUpdate) { + // 'partitionsToUpdate' should be a pass-by-value argument as + // HdfsTable#updatePartitionsFromHms() may modify this HashSet if the + // partitions already exists is metastore but not in Impala's cache. + // Reason: 'partitionsToUpdate' is used by HdfsTable#load() to reload file + // metadata of newly created partitions in Impala. 
+ if (partitionsToUpdate != null) { //partitionsToUpdate can be null + partitionsToUpdate_ = new HashSet<>(partitionsToUpdate); + } + return this; + } + + public HdfsTableLoadParamsBuilder setDebugAction(String debugAction) { + debugAction_ = debugAction; + return this; + } + + public HdfsTableLoadParamsBuilder setPartitionToEventId( + Map<String, Long> partitionToEventId) { + partitionToEventId_ = partitionToEventId; + return this; + } + + public HdfsTableLoadParamsBuilder setReason(String reason) { + reason_ = reason; + return this; + } + + public HdfsTableLoadParamsBuilder setCatalogTimeline(EventSequence catalogTimeline) { + catalogTimeline_ = catalogTimeline; + return this; + } + + public HdfsTableLoadParamsBuilder setIsPreLoadForInsert(boolean isPreLoadForInsert) { + isPreLoadForInsert_ = isPreLoadForInsert; + return this; + } + + public HdfsTableLoadParams build() { + return new HdfsTableLoadParams(reuseMetadata_, client_, msTbl_, + loadPartitionFileMetadata_, loadTableSchema_, refreshUpdatedPartitions_, + partitionsToUpdate_, debugAction_, partitionToEventId_, reason_, + catalogTimeline_, isPreLoadForInsert_); + } +} \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 47f5b07a6..4a7d31b24 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -119,6 +119,7 @@ import org.apache.impala.catalog.Function; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsTable; +import org.apache.impala.catalog.HdfsTableLoadParamsBuilder; import org.apache.impala.catalog.HiveStorageDescriptorFactory; import org.apache.impala.catalog.IncompleteTable; import org.apache.impala.catalog.KuduColumn; @@ -7229,24 +7230,8 @@ public class CatalogOpExecutor { modification = new 
InProgressTableModification(catalog_, table); catalog_.getLock().writeLock().unlock(); - TblTransaction tblTxn = null; - if (update.isSetTransaction_id()) { - long transactionId = update.getTransaction_id(); - Preconditions.checkState(transactionId > 0); - try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { - if (DebugUtils.hasDebugAction(update.getDebug_action(), - DebugUtils.UPDATE_CATALOG_ABORT_INSERT_TXN)) { - MetastoreShim.abortTransaction(msClient.getHiveClient(), transactionId); - LOG.info("Aborted txn due to the debug action."); - } - // Setup transactional parameters needed to do alter table/partitions later. - // TODO: Could be optimized to possibly save some RPCs, as these parameters are - // not always needed + the writeId of the INSERT could be probably reused. - tblTxn = MetastoreShim.createTblTransaction( - msClient.getHiveClient(), table.getMetaStoreTable(), transactionId); - catalogTimeline.markEvent("Created Metastore transaction"); - } - } + TblTransaction tblTxn = createTableTransactionIfApplicable(update, + catalogTimeline, table); // Collects the cache directive IDs of any cached table/partitions that were // targeted. 
A watch on these cache directives is submitted to the @@ -7263,6 +7248,27 @@ public class CatalogOpExecutor { TableName tblName = new TableName(table.getDb().getName(), table.getName()); List<String> errorMessages = Lists.newArrayList(); HashSet<String> partsToLoadMetadata = null; + modification.addCatalogServiceIdentifiersToTable(); + org.apache.hadoop.hive.metastore.api.Table msTbl = + table.getMetaStoreTable().deepCopy(); + HashSet<String> partsToCreate = new HashSet<>(); + if (table.getNumClusteringCols() > 0) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { + partsToCreate = Sets.newHashSet(update.getUpdated_partitions().keySet()); + ((HdfsTable) table).load( + new HdfsTableLoadParamsBuilder(msClient.getHiveClient(), msTbl) + .reuseMetadata(true) + .setLoadPartitionFileMetadata(false) + .setLoadTableSchema(false) + .setRefreshUpdatedPartitions(false) + .setPartitionsToUpdate(partsToCreate) + .setDebugAction(update.getDebug_action()) + .setReason("Preload for INSERT") + .setIsPreLoadForInsert(true) + .setCatalogTimeline(catalogTimeline) + .build()); + } + } Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions((FeFsTable)table); List<FeFsPartition> affectedExistingPartitions = new ArrayList<>(); @@ -7277,129 +7283,22 @@ public class CatalogOpExecutor { // partitions in this map. This is used later on when table is reloaded to set // the createEventId for the partitions. Map<String, Long> partitionToEventId = new HashMap<>(); - modification.addCatalogServiceIdentifiersToTable(); if (table.getNumClusteringCols() > 0) { // Set of all partition names targeted by the insert that need to be created // in the Metastore (partitions that do not currently exist in the catalog). // In the BE, we don't currently distinguish between which targeted partitions // are new and which already exist, so initialize the set with all targeted // partition names and remove the ones that are found to exist. 
- HashSet<String> partsToCreate = - Sets.newHashSet(update.getUpdated_partitions().keySet()); partsToLoadMetadata = Sets.newHashSet(partsToCreate); for (FeFsPartition partition: parts) { - String partName = partition.getPartitionName(); - // Attempt to remove this partition name from partsToCreate. If remove - // returns true, it indicates the partition already exists. - if (partsToCreate.remove(partName)) { - affectedExistingPartitions.add(partition); - // For existing partitions, we need to unset column_stats_accurate to - // tell hive the statistics is not accurate any longer. - if (partition.getParameters() != null && partition.getParameters() - .containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) { - org.apache.hadoop.hive.metastore.api.Partition hmsPartition = - ((HdfsPartition) partition).toHmsPartition(); - hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); - hmsPartitionsStatsUnset.add(hmsPartition); - } - if (partition.isMarkedCached()) { - // The partition was targeted by the insert and is also cached. Since - // data was written to the partition, a watch needs to be placed on the - // cache directive so the TableLoadingMgr can perform an async - // refresh once all data becomes cached. 
- cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId( - partition.getParameters())); - } - } - if (partsToCreate.size() == 0) break; - } - - if (!partsToCreate.isEmpty()) { - try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { - org.apache.hadoop.hive.metastore.api.Table msTbl = - table.getMetaStoreTable().deepCopy(); - List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts = - Lists.newArrayList(); - HiveConf hiveConf = new HiveConf(this.getClass()); - Warehouse warehouse = new Warehouse(hiveConf); - for (String partName: partsToCreate) { - org.apache.hadoop.hive.metastore.api.Partition partition = - new org.apache.hadoop.hive.metastore.api.Partition(); - hmsParts.add(partition); - - partition.setDbName(tblName.getDb()); - partition.setTableName(tblName.getTbl()); - partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, partName)); - partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd())); - partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" + partName); - if (AcidUtils.isTransactionalTable(msTbl.getParameters())) { - // Self event detection is deprecated for non-transactional tables add - // partition. So we add catalog service identifiers only for - // transactional tables - addCatalogServiceIdentifiers(msTbl, partition); - } - MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse); - } - - // First add_partitions and then alter_partitions the successful ones with - // caching directives. The reason is that some partitions could have been - // added concurrently, and we want to avoid caching a partition twice and - // leaking a caching directive. 
- List<Partition> addedHmsParts = addHmsPartitions( - msClient, table, hmsParts, partitionToEventId, true, catalogTimeline); - for (Partition part: addedHmsParts) { - String part_name = - FeCatalogUtils.getPartitionName((FeFsTable)table, part.getValues()); - addedPartitionNames.put(part_name, part.getValues()); - } - if (addedHmsParts.size() > 0) { - if (cachePoolName != null) { - List<org.apache.hadoop.hive.metastore.api.Partition> cachedHmsParts = - Lists.newArrayList(); - // Submit a new cache directive and update the partition metadata with - // the directive id. - for (org.apache.hadoop.hive.metastore.api.Partition part: addedHmsParts) { - try { - cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective( - part, cachePoolName, cacheReplication)); - StatsSetupConst.setBasicStatsState(part.getParameters(), "false"); - cachedHmsParts.add(part); - } catch (ImpalaRuntimeException e) { - String msg = String.format("Partition %s.%s(%s): State: Not " + - "cached. Action: Cache manully via 'ALTER TABLE'.", - part.getDbName(), part.getTableName(), part.getValues()); - LOG.error(msg, e); - errorMessages.add(msg); - } - } - try { - MetastoreShim.alterPartitions(msClient.getHiveClient(), tblName.getDb(), - tblName.getTbl(), cachedHmsParts); - } catch (Exception e) { - LOG.error("Failed in alter_partitions: ", e); - // Try to uncache the partitions when the alteration in the HMS - // failed. - for (org.apache.hadoop.hive.metastore.api.Partition part: - cachedHmsParts) { - try { - HdfsCachingUtil.removePartitionCacheDirective(part.getParameters()); - } catch (ImpalaException e1) { - String msg = String.format( - "Partition %s.%s(%s): State: Leaked caching directive. 
" + - "Action: Manually uncache directory %s via hdfs " + - "cacheAdmin.", part.getDbName(), part.getTableName(), - part.getValues(), part.getSd().getLocation()); - LOG.error(msg, e); - errorMessages.add(msg); - } - } - } - } - } - } catch (Exception e) { - throw new InternalException("Error adding partitions", e); - } + updatePartitionMetadataAndCacheStatus(partsToCreate, + affectedExistingPartitions, partition, hmsPartitionsStatsUnset, + cacheDirIds); + if (partsToCreate.isEmpty()) break; } + addPartitionNamesInMetastore(table, tblName, partsToCreate, addedPartitionNames, + cachePoolName, cacheReplication, catalogTimeline, partitionToEventId, + cacheDirIds, msTbl, errorMessages); // Unset COLUMN_STATS_ACCURATE by calling alter partition to hms. if (!hmsPartitionsStatsUnset.isEmpty()) { @@ -7449,28 +7348,7 @@ public class CatalogOpExecutor { } if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) { - FeIcebergTable iceTbl = (FeIcebergTable)table; - org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl); - IcebergCatalogOpExecutor.execute(iceTbl, iceTxn, - update.getIceberg_operation()); - catalogTimeline.markEvent("Executed Iceberg operation " + - update.getIceberg_operation().getOperation()); - if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) { - // Add catalog service id and the 'newCatalogVersion' to the table parameters. - // This way we can avoid reloading the table on self-events (Iceberg generates - // an ALTER TABLE statement to set the new metadata_location). 
- modification.registerInflightEvent(); - IcebergCatalogOpExecutor.addCatalogVersionToTxn( - iceTxn, catalog_.getCatalogServiceId(), modification.newVersionNumber()); - catalogTimeline.markEvent("Updated table properties"); - } - - if (update.isSetDebug_action()) { - String debugAction = update.getDebug_action(); - DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT); - } - iceTxn.commitTransaction(); - modification.markInflightEventRegistrationComplete(); + insertIntoIcebergTable(table, update, catalogTimeline, modification); } loadTableMetadata(table, modification.newVersionNumber(), true, false, @@ -7505,6 +7383,183 @@ public class CatalogOpExecutor { return response; } + private TblTransaction createTableTransactionIfApplicable(TUpdateCatalogRequest update, + EventSequence catalogTimeline, Table table) throws ImpalaException { + TblTransaction tblTxn = null; + if (update.isSetTransaction_id()) { + long transactionId = update.getTransaction_id(); + Preconditions.checkState(transactionId > 0); + try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { + if (DebugUtils.hasDebugAction(update.getDebug_action(), + DebugUtils.UPDATE_CATALOG_ABORT_INSERT_TXN)) { + MetastoreShim.abortTransaction(msClient.getHiveClient(), transactionId); + LOG.info("Aborted txn due to the debug action."); + } + // Setup transactional parameters needed to do alter table/partitions later. + // TODO: Could be optimized to possibly save some RPCs, as these parameters are + // not always needed + the writeId of the INSERT could be probably reused. + tblTxn = MetastoreShim.createTblTransaction( + msClient.getHiveClient(), table.getMetaStoreTable(), transactionId); + catalogTimeline.markEvent("Created Metastore transaction"); + } + } + return tblTxn; + } + + /** + * This process creates any missing partitions and clears a table property related to + * COLUMN_STATS_ACCURATE. It also gathers information about the cache directory + * IDs. 
+ */ + private void updatePartitionMetadataAndCacheStatus( + HashSet<String> pickupExistingPartitions, + List<FeFsPartition> affectedExistingPartitions, FeFsPartition partition, + List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset, + List<Long> cacheDirIds) throws ImpalaException { + String partName = partition.getPartitionName(); + // Attempt to remove this partition name from pickupExistingPartitions. If remove + // returns true, it indicates the partition already exists. + if (pickupExistingPartitions.remove(partName)) { + affectedExistingPartitions.add(partition); + // For existing partitions, we need to unset column_stats_accurate to + // tell hive the statistics is not accurate any longer. + if (partition.getParameters() != null && partition.getParameters() + .containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) { + org.apache.hadoop.hive.metastore.api.Partition hmsPartition = + ((HdfsPartition) partition).toHmsPartition(); + hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); + hmsPartitionsStatsUnset.add(hmsPartition); + } + if (partition.isMarkedCached()) { + // The partition was targeted by the insert and is also cached. Since + // data was written to the partition, a watch needs to be placed on the + // cache directive so the TableLoadingMgr can perform an async + // refresh once all data becomes cached. 
+ cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId( + partition.getParameters())); + } + } + } + + private void addPartitionNamesInMetastore(Table table, TableName tblName, + HashSet<String> partsToCreate, Map<String, List<String>> addedPartitionNames, + String cachePoolName, Short cacheReplication, EventSequence catalogTimeline, + Map<String, Long> partitionToEventId, List<Long> cacheDirIds, + org.apache.hadoop.hive.metastore.api.Table msTbl, List<String> errorMessages) + throws ImpalaException { + if (!partsToCreate.isEmpty()) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { + List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts = + Lists.newArrayList(); + HiveConf hiveConf = new HiveConf(this.getClass()); + Warehouse warehouse = new Warehouse(hiveConf); + for (String partName: partsToCreate) { + org.apache.hadoop.hive.metastore.api.Partition partition = + new org.apache.hadoop.hive.metastore.api.Partition(); + hmsParts.add(partition); + + partition.setDbName(tblName.getDb()); + partition.setTableName(tblName.getTbl()); + partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, partName)); + partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd())); + partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" + partName); + if (AcidUtils.isTransactionalTable(msTbl.getParameters())) { + // Self event detection is deprecated for non-transactional tables add + // partition. So we add catalog service identifiers only for + // transactional tables + addCatalogServiceIdentifiers(msTbl, partition); + } + MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse); + } + + // First add_partitions and then alter_partitions the successful ones with + // caching directives. The reason is that some partitions could have been + // added concurrently, and we want to avoid caching a partition twice and + // leaking a caching directive. 
+ List<Partition> addedHmsParts = addHmsPartitions( + msClient, table, hmsParts, partitionToEventId, true, catalogTimeline); + for (Partition part: addedHmsParts) { + String part_name = + FeCatalogUtils.getPartitionName((FeFsTable)table, part.getValues()); + addedPartitionNames.put(part_name, part.getValues()); + } + if (addedHmsParts.size() > 0) { + if (cachePoolName != null) { + List<org.apache.hadoop.hive.metastore.api.Partition> cachedHmsParts = + Lists.newArrayList(); + // Submit a new cache directive and update the partition metadata with + // the directive id. + for (org.apache.hadoop.hive.metastore.api.Partition part: addedHmsParts) { + try { + cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective( + part, cachePoolName, cacheReplication)); + StatsSetupConst.setBasicStatsState(part.getParameters(), "false"); + cachedHmsParts.add(part); + } catch (ImpalaRuntimeException e) { + String msg = String.format("Partition %s.%s(%s): State: Not " + + "cached. Action: Cache manully via 'ALTER TABLE'.", + part.getDbName(), part.getTableName(), part.getValues()); + LOG.error(msg, e); + errorMessages.add(msg); + } + } + try { + MetastoreShim.alterPartitions(msClient.getHiveClient(), tblName.getDb(), + tblName.getTbl(), cachedHmsParts); + } catch (Exception e) { + LOG.error("Failed in alter_partitions: ", e); + // Try to uncache the partitions when the alteration in the HMS + // failed. + for (org.apache.hadoop.hive.metastore.api.Partition part: + cachedHmsParts) { + try { + HdfsCachingUtil.removePartitionCacheDirective(part.getParameters()); + } catch (ImpalaException e1) { + String msg = String.format( + "Partition %s.%s(%s): State: Leaked caching directive. 
" + + "Action: Manually uncache directory %s via hdfs " + + "cacheAdmin.", part.getDbName(), part.getTableName(), + part.getValues(), part.getSd().getLocation()); + LOG.error(msg, e); + errorMessages.add(msg); + } + } + } + } + } + } catch (Exception e) { + throw new InternalException("Error adding partitions", e); + } + } + } + + private void insertIntoIcebergTable(Table table, TUpdateCatalogRequest update, + EventSequence catalogTimeline, InProgressTableModification modification) + throws ImpalaException { + FeIcebergTable iceTbl = (FeIcebergTable)table; + org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl); + IcebergCatalogOpExecutor.execute(iceTbl, iceTxn, + update.getIceberg_operation()); + catalogTimeline.markEvent("Executed Iceberg operation " + + update.getIceberg_operation().getOperation()); + if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) { + // Add catalog service id and the 'newCatalogVersion' to the table parameters. + // This way we can avoid reloading the table on self-events (Iceberg generates + // an ALTER TABLE statement to set the new metadata_location). + modification.registerInflightEvent(); + IcebergCatalogOpExecutor.addCatalogVersionToTxn( + iceTxn, catalog_.getCatalogServiceId(), modification.newVersionNumber()); + catalogTimeline.markEvent("Updated table properties"); + } + + if (update.isSetDebug_action()) { + String debugAction = update.getDebug_action(); + DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT); + } + iceTxn.commitTransaction(); + modification.markInflightEventRegistrationComplete(); + } + /** * Populates insert event data and calls fireInsertEvents() if external event * processing is enabled. 
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 941bd31e7..4f37ca1b1 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -1276,6 +1276,36 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): tables_removed_after = EventProcessorUtils.get_int_metric("tables-removed") assert tables_removed_after == tables_removed_before + @CustomClusterTestSuite.with_args( + catalogd_args="--hms_event_polling_interval_s=0") + def test_ep_delay_metadata_reload_for_insert(self, unique_database): + """IMPALA-12277: This test verifies that insert operation on a partitioned table + succeeds if the partition is dropped previously externally with the event processor + being lagging or not active. Impala should create a new partition in this case.""" + + def verify_partition(is_transactional, test_table="insert_part_reload"): + query = " ".join(["create table {}.{} (i int)", " partitioned by (year int) ", + self._get_transactional_tblproperties(is_transactional)]) + self.client.execute(query.format(unique_database, test_table)) + self.client.execute("insert into {}.{} partition(year) values (0,2024), (0,2022)" + .format(unique_database, test_table)) + self.run_stmt_in_hive("alter table {}.{} drop partition(year=2024)" + .format(unique_database, test_table)) + self.run_stmt_in_hive("alter table {}.{} add partition(year=2023)" + .format(unique_database, test_table)) + self.run_stmt_in_hive("insert into {}.{} partition(year=2021) values (1)" + .format(unique_database, test_table)) + self.client.execute( + "insert into {}.{} partition(year) values (0,2023), (0,2024), (0,2022)" + .format(unique_database, test_table)) + EventProcessorUtils.wait_for_event_processing(self) + results = self.client.execute( + "select DISTINCT(year) from {}.{} order by year desc" + .format(unique_database, test_table)) + assert results.data == 
["2024", "2023", "2022", "2021"] + self.client.execute("drop table {}.{}".format(unique_database, test_table)) + verify_partition(True) + verify_partition(False) @SkipIfFS.hive class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):
