This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f5a15ad6055dc4cd1baae2a11c56a969c5d30a31
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Thu May 16 18:54:35 2024 -0700

    IMPALA-12277: Fix NullPointerException for partitioned inserts when
    partition list is stale
    
    When the event processor is turned off, inserting values into a
    partitioned table can lead to a NullPointerException if the partition
    is deleted outside Impala (e.g., via HMS). Since the event processor is
    turned off, Impala is unaware of the metadata changes to the table.
    
    Currently in impala, we always load the partitions from cached table.
    This can lead to data inconsistency issue especially in the case
    of the event processor being turned off or lagging behind. This patch
    addresses the issue by always verifying the target partitions from HMS
    without loading the file metadata from HMS, regardless of the state of
    the event processor. This approach ensures that partition metadata is
    always consistent with the metastore.
    
    The issue can be seen with the following steps:
    - Turn off the event processor
    - create a partitioned table and add a partition from impala
    - drop the same partition from hive
    - from impala, insert values into the partition (the expectation is
    that if the partition does not exist, a new one will be created).
    
    Testing:
    - Verified manually that NullPointerException is avoided with this patch
    - Added end-to-end tests to verify the above scenario for external and
    managed tables.
    
    Change-Id: Ide8f1f6bf017e9a040b53bb5d5291ff2ea3e0d18
    Reviewed-on: http://gerrit.cloudera.org:8080/21437
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  97 ++++--
 .../apache/impala/catalog/HdfsTableLoadParams.java | 111 +++++++
 .../impala/catalog/HdfsTableLoadParamsBuilder.java | 126 +++++++
 .../apache/impala/service/CatalogOpExecutor.java   | 363 ++++++++++++---------
 tests/custom_cluster/test_events_custom_configs.py |  30 ++
 5 files changed, 541 insertions(+), 186 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 7bc4edb48..85f4f5ae9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1227,7 +1227,26 @@ public class HdfsTable extends Table implements 
FeFsTable {
         /* partitionsToUpdate*/null, null, null, reason, catalogTimeline);
   }
 
-  /**
+  public void load(boolean reuseMetadata, IMetaStoreClient client,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean loadPartitionFileMetadata, boolean loadTableSchema,
+      boolean refreshUpdatedPartitions, @Nullable Set<String> 
partitionsToUpdate,
+      @Nullable String debugAction, @Nullable Map<String, Long> 
partitionToEventId,
+      String reason, EventSequence catalogTimeline) throws 
TableLoadingException {
+    load(new HdfsTableLoadParamsBuilder(client, msTbl)
+        .reuseMetadata(reuseMetadata)
+        .setLoadPartitionFileMetadata(loadPartitionFileMetadata)
+        .setLoadTableSchema(loadTableSchema)
+        .setRefreshUpdatedPartitions(refreshUpdatedPartitions)
+        .setPartitionsToUpdate(partitionsToUpdate)
+        .setDebugAction(debugAction).setReason(reason)
+        .setPartitionToEventId(partitionToEventId)
+        .setIsPreLoadForInsert(false)
+        .setCatalogTimeline(catalogTimeline).build());
+  }
+
+  /**
+   * See 'HdfsTableLoadParams' class for the argument list passed into this 
method.
    * Loads table metadata from the Hive Metastore.
    *
    * If 'reuseMetadata' is false, performs a full metadata load from the Hive 
Metastore,
@@ -1245,25 +1264,29 @@ public class HdfsTable extends Table implements 
FeFsTable {
    *
    * If 'loadTableSchema' is true, the table schema is loaded from the Hive 
Metastore.
    *
+   * If 'isPreLoadForInsert' is true, then we intend to refresh partitions 
from the Hive
+   * Metastore without reloading the file metadata(this is done in later 
steps) to ensure
+   * consistency while inserting into partitioned tables.
+   *
    * Existing file descriptors might be reused incorrectly if Hdfs rebalancer 
was
    * executed, as it changes the block locations but doesn't update the mtime 
(file
    * modification time).
    * If this occurs, user has to execute "invalidate metadata" to invalidate 
the
    * metadata cache of the table and trigger a fresh load.
    */
-  public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl, boolean 
loadPartitionFileMetadata,
-      boolean loadTableSchema, boolean refreshUpdatedPartitions,
-      @Nullable Set<String> partitionsToUpdate, @Nullable String debugAction,
-      @Nullable Map<String, Long> partitionToEventId, String reason,
-      EventSequence catalogTimeline) throws TableLoadingException {
+  public void load(HdfsTableLoadParams loadParams) throws 
TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
+    IMetaStoreClient msClient = loadParams.getMsClient();
+    org.apache.hadoop.hive.metastore.api.Table msTbl = loadParams.getMsTable();
+    EventSequence catalogTimeline = loadParams.getCatalogTimeline();
+    Set<String> partitionsToUpdate = loadParams.getPartitionsToUpdate();
     String annotation = String.format("%s metadata for %s%s partition(s) of 
%s.%s (%s)",
-        reuseMetadata ? "Reloading" : "Loading",
-        loadTableSchema ? "table definition and " : "",
+        loadParams.getIsReuseMetadata() ? "Reloading" : "Loading",
+        loadParams.getLoadTableSchema() ? "table definition and " : "",
         partitionsToUpdate == null ? "all" : 
String.valueOf(partitionsToUpdate.size()),
-        msTbl.getDbName(), msTbl.getTableName(), reason);
+        msTbl.getDbName(), msTbl.getTableName(),
+        loadParams.getReason());
     LOG.info(annotation);
     final Timer storageLdTimer =
         getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA);
@@ -1273,36 +1296,41 @@ public class HdfsTable extends Table implements 
FeFsTable {
       // turn all exceptions into TableLoadingException
       msTable_ = msTbl;
       try {
-        if (loadTableSchema) {
+        if (loadParams.getLoadTableSchema()) {
             // set nullPartitionKeyValue from the hive conf.
           nullPartitionKeyValue_ =
-            MetaStoreUtil.getNullPartitionKeyValue(client).intern();
+            MetaStoreUtil.getNullPartitionKeyValue(msClient).intern();
           loadSchema(msTbl);
-          loadAllColumnStats(client, catalogTimeline);
-          loadConstraintsInfo(client, msTbl);
+          loadAllColumnStats(msClient, catalogTimeline);
+          loadConstraintsInfo(msClient, msTbl);
           catalogTimeline.markEvent("Loaded table schema");
         }
-        loadValidWriteIdList(client);
+        loadValidWriteIdList(msClient);
         // Set table-level stats first so partition stats can inherit it.
         setTableStats(msTbl);
         // Load partition and file metadata
-        if (reuseMetadata) {
+        if (loadParams.getIsReuseMetadata()) {
           // Incrementally update this table's partitions and file metadata
-          Preconditions.checkState(
-              partitionsToUpdate == null || loadPartitionFileMetadata);
+          if (!loadParams.getIsPreLoadForInsert()) {
+            Preconditions.checkState(partitionsToUpdate == null ||
+                loadParams.isLoadPartitionFileMetadata(), "Conflicts in " +
+                "'partitionsToUpdate' and 'loadPartitionFileMetadata'");
+          }
           storageMetadataLoadTime_ += updateMdFromHmsTable(msTbl);
           if (msTbl.getPartitionKeysSize() == 0) {
-            if (loadPartitionFileMetadata) {
-              storageMetadataLoadTime_ += 
updateUnpartitionedTableFileMd(client,
-                  debugAction, catalogTimeline);
+            if (loadParams.isLoadPartitionFileMetadata()) {
+              storageMetadataLoadTime_ += updateUnpartitionedTableFileMd(
+                  msClient, loadParams.getDebugAction(),
+                  catalogTimeline);
             } else {  // Update the single partition stats in case table stats 
changes.
               updateUnpartitionedTableStats();
             }
           } else {
-            storageMetadataLoadTime_ += updatePartitionsFromHms(
-                client, partitionsToUpdate, loadPartitionFileMetadata,
-                refreshUpdatedPartitions, partitionToEventId, debugAction,
-                catalogTimeline);
+            storageMetadataLoadTime_ += updatePartitionsFromHms(msClient,
+                partitionsToUpdate, loadParams.isLoadPartitionFileMetadata(),
+                loadParams.getRefreshUpdatedPartitions(),
+                loadParams.getPartitionToEventId(), 
loadParams.getDebugAction(),
+                catalogTimeline, loadParams.getIsPreLoadForInsert());
           }
           LOG.info("Incrementally loaded table metadata for: " + 
getFullName());
         } else {
@@ -1311,14 +1339,16 @@ public class HdfsTable extends Table implements 
FeFsTable {
               
getMetrics().getTimer(HdfsTable.LOAD_DURATION_ALL_PARTITIONS).time();
           // Load all partitions from Hive Metastore, including file metadata.
           List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
-              MetaStoreUtil.fetchAllPartitions(
-                  client, msTbl, NUM_PARTITION_FETCH_RETRIES);
+              MetaStoreUtil.fetchAllPartitions(msClient,
+                  msTbl, NUM_PARTITION_FETCH_RETRIES);
           LOG.info("Fetched partition metadata from the Metastore: " + 
getFullName());
-          storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions, 
msTbl,
-              catalogTimeline);
+          storageMetadataLoadTime_ = loadAllPartitions(msClient,
+              msPartitions, msTbl, catalogTimeline);
           allPartitionsLdContext.stop();
         }
-        if (loadTableSchema) setAvroSchema(client, msTbl, catalogTimeline);
+        if (loadParams.getLoadTableSchema()) {
+          setAvroSchema(msClient, msTbl, catalogTimeline);
+        }
         fileMetadataStats_.unset();
         refreshLastUsedTime();
         // Make sure all the partition modifications are done.
@@ -1446,12 +1476,15 @@ public class HdfsTable extends Table implements 
FeFsTable {
   private long updatePartitionsFromHms(IMetaStoreClient client,
       Set<String> partitionsToUpdate, boolean loadPartitionFileMetadata,
       boolean refreshUpdatedPartitions, Map<String, Long> partitionToEventId,
-      String debugAction, EventSequence catalogTimeline) throws Exception {
+      String debugAction, EventSequence catalogTimeline, boolean 
isPreLoadForInsert)
+      throws Exception {
     if (LOG.isTraceEnabled()) LOG.trace("Sync table partitions: " + 
getFullName());
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
     Preconditions.checkState(msTbl.getPartitionKeysSize() != 0);
-    Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate 
== null);
+    if (!isPreLoadForInsert) {
+      Preconditions.checkState(loadPartitionFileMetadata || partitionsToUpdate 
== null);
+    }
     PartitionDeltaUpdater deltaUpdater;
     if (refreshUpdatedPartitions) {
       deltaUpdater = new PartBasedDeltaUpdater(client,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java
new file mode 100644
index 000000000..9beb3bc79
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParams.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.impala.util.EventSequence;
+
+/**
+ * The arguments required to load the HdfsTable object
+ *
+ * Use HdfsTableLoadParamsBuilder to create this instance.
+ */
+public class HdfsTableLoadParams {
+  private final boolean reuseMetadata_;
+  private final IMetaStoreClient client_;
+  private final Table msTbl_;
+  private final boolean loadPartitionFileMetadata_;
+  private final boolean loadTableSchema_;
+  private final boolean refreshUpdatedPartitions_;
+  private final Set<String> partitionsToUpdate_;
+  private final String debugAction_;
+  private final Map<String, Long> partitionToEventId_;
+  private final String reason_;
+  private final EventSequence catalogTimeline_;
+  private final boolean isPreLoadForInsert_;
+
+  public HdfsTableLoadParams(boolean reuseMetadata, IMetaStoreClient client,
+      Table msTbl, boolean loadPartitionFileMetadata, boolean loadTableSchema,
+      boolean refreshUpdatedPartitions, Set<String> partitionsToUpdate,
+      String debugAction, Map<String, Long> partitionToEventId, String reason,
+      EventSequence catalogTimeline, boolean isPreLoadForInsert) {
+    reuseMetadata_ = reuseMetadata;
+    client_ = client;
+    msTbl_ = msTbl;
+    loadPartitionFileMetadata_ = loadPartitionFileMetadata;
+    loadTableSchema_ = loadTableSchema;
+    refreshUpdatedPartitions_ = refreshUpdatedPartitions;
+    partitionsToUpdate_ = partitionsToUpdate;
+    debugAction_ = debugAction;
+    partitionToEventId_ = partitionToEventId;
+    reason_ = reason;
+    catalogTimeline_ = catalogTimeline;
+    isPreLoadForInsert_ = isPreLoadForInsert;
+  }
+
+  public boolean getIsReuseMetadata() {
+    return reuseMetadata_;
+  }
+
+  public IMetaStoreClient getMsClient() {
+    return client_;
+  }
+
+  public Table getMsTable() {
+    return msTbl_;
+  }
+
+  public boolean isLoadPartitionFileMetadata() {
+    return loadPartitionFileMetadata_;
+  }
+
+  public boolean getLoadTableSchema() {
+    return loadTableSchema_;
+  }
+
+  public boolean getRefreshUpdatedPartitions() {
+    return refreshUpdatedPartitions_;
+  }
+
+  public Set<String> getPartitionsToUpdate() {
+    return partitionsToUpdate_;
+  }
+
+  public String getDebugAction() {
+    return debugAction_;
+  }
+
+  public Map<String, Long> getPartitionToEventId() {
+    return partitionToEventId_;
+  }
+
+  public String getReason() {
+    return reason_;
+  }
+
+  public EventSequence getCatalogTimeline() {
+    return catalogTimeline_;
+  }
+
+  public boolean getIsPreLoadForInsert() {
+    return isPreLoadForInsert_;
+  }
+}
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java
new file mode 100644
index 000000000..c4f528982
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTableLoadParamsBuilder.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.impala.util.EventSequence;
+
+/**
+ * Utility class to help set up the various parameters for load() method in 
HdfsTable.
+ */
+public class HdfsTableLoadParamsBuilder {
+  boolean reuseMetadata_;
+  IMetaStoreClient client_;
+  Table msTbl_;
+  boolean loadPartitionFileMetadata_;
+  boolean loadTableSchema_;
+  boolean refreshUpdatedPartitions_;
+  Set<String> partitionsToUpdate_;
+  String debugAction_;
+  Map<String, Long> partitionToEventId_;
+  String reason_;
+  EventSequence catalogTimeline_;
+  boolean isPreLoadForInsert_;
+
+  public HdfsTableLoadParamsBuilder(IMetaStoreClient client, Table msTbl) {
+    client_ = client;
+    msTbl_ = msTbl;
+  }
+
+  public HdfsTableLoadParamsBuilder reuseMetadata(boolean reuseMetadata) {
+    reuseMetadata_ = reuseMetadata;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setMsClient(IMetaStoreClient client) {
+    client_ = client;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setMetastoreTable(Table msTbl) {
+    msTbl_ = msTbl;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setLoadPartitionFileMetadata(
+    boolean loadPartitionFileMetadata) {
+    loadPartitionFileMetadata_ = loadPartitionFileMetadata;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setLoadTableSchema(boolean 
loadTableSchema) {
+    loadTableSchema_ = loadTableSchema;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setRefreshUpdatedPartitions(
+    boolean refreshUpdatedPartitions) {
+    refreshUpdatedPartitions_ = refreshUpdatedPartitions;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setPartitionsToUpdate(
+    Set<String> partitionsToUpdate) {
+    // 'partitionsToUpdate' should be a pass-by-value argument as
+    // HdfsTable#updatePartitionsFromHms() may modify this HashSet if the
+    // partitions already exists is metastore but not in Impala's cache.
+    // Reason: 'partitionsToUpdate' is used by HdfsTable#load() to reload file
+    // metadata of newly created partitions in Impala.
+    if (partitionsToUpdate != null) { //partitionsToUpdate can be null
+      partitionsToUpdate_ = new HashSet<>(partitionsToUpdate);
+    }
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setDebugAction(String debugAction) {
+    debugAction_ = debugAction;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setPartitionToEventId(
+    Map<String, Long> partitionToEventId) {
+    partitionToEventId_ = partitionToEventId;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setReason(String reason) {
+    reason_ = reason;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setCatalogTimeline(EventSequence 
catalogTimeline) {
+    catalogTimeline_ = catalogTimeline;
+    return this;
+  }
+
+  public HdfsTableLoadParamsBuilder setIsPreLoadForInsert(boolean 
isPreLoadForInsert) {
+    isPreLoadForInsert_ = isPreLoadForInsert;
+    return this;
+  }
+
+  public HdfsTableLoadParams build() {
+    return new HdfsTableLoadParams(reuseMetadata_, client_, msTbl_,
+        loadPartitionFileMetadata_, loadTableSchema_, 
refreshUpdatedPartitions_,
+        partitionsToUpdate_, debugAction_, partitionToEventId_, reason_,
+        catalogTimeline_, isPreLoadForInsert_);
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 47f5b07a6..4a7d31b24 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -119,6 +119,7 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.HdfsTableLoadParamsBuilder;
 import org.apache.impala.catalog.HiveStorageDescriptorFactory;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.KuduColumn;
@@ -7229,24 +7230,8 @@ public class CatalogOpExecutor {
       modification = new InProgressTableModification(catalog_, table);
       catalog_.getLock().writeLock().unlock();
 
-      TblTransaction tblTxn = null;
-      if (update.isSetTransaction_id()) {
-        long transactionId = update.getTransaction_id();
-        Preconditions.checkState(transactionId > 0);
-        try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
-          if (DebugUtils.hasDebugAction(update.getDebug_action(),
-              DebugUtils.UPDATE_CATALOG_ABORT_INSERT_TXN)) {
-            MetastoreShim.abortTransaction(msClient.getHiveClient(), 
transactionId);
-            LOG.info("Aborted txn due to the debug action.");
-          }
-          // Setup transactional parameters needed to do alter 
table/partitions later.
-          // TODO: Could be optimized to possibly save some RPCs, as these 
parameters are
-          //       not always needed + the writeId of the INSERT could be 
probably reused.
-          tblTxn = MetastoreShim.createTblTransaction(
-              msClient.getHiveClient(), table.getMetaStoreTable(), 
transactionId);
-          catalogTimeline.markEvent("Created Metastore transaction");
-        }
-      }
+      TblTransaction tblTxn = createTableTransactionIfApplicable(update,
+          catalogTimeline, table);
 
       // Collects the cache directive IDs of any cached table/partitions that 
were
       // targeted. A watch on these cache directives is submitted to the
@@ -7263,6 +7248,27 @@ public class CatalogOpExecutor {
       TableName tblName = new TableName(table.getDb().getName(), 
table.getName());
       List<String> errorMessages = Lists.newArrayList();
       HashSet<String> partsToLoadMetadata = null;
+      modification.addCatalogServiceIdentifiersToTable();
+      org.apache.hadoop.hive.metastore.api.Table msTbl =
+          table.getMetaStoreTable().deepCopy();
+      HashSet<String> partsToCreate = new HashSet<>();
+      if (table.getNumClusteringCols() > 0) {
+        try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
+          partsToCreate = 
Sets.newHashSet(update.getUpdated_partitions().keySet());
+          ((HdfsTable) table).load(
+              new HdfsTableLoadParamsBuilder(msClient.getHiveClient(), msTbl)
+                  .reuseMetadata(true)
+                  .setLoadPartitionFileMetadata(false)
+                  .setLoadTableSchema(false)
+                  .setRefreshUpdatedPartitions(false)
+                  .setPartitionsToUpdate(partsToCreate)
+                  .setDebugAction(update.getDebug_action())
+                  .setReason("Preload for INSERT")
+                  .setIsPreLoadForInsert(true)
+                  .setCatalogTimeline(catalogTimeline)
+                  .build());
+        }
+      }
       Collection<? extends FeFsPartition> parts =
           FeCatalogUtils.loadAllPartitions((FeFsTable)table);
       List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
@@ -7277,129 +7283,22 @@ public class CatalogOpExecutor {
       // partitions in this map. This is used later on when table is reloaded 
to set
       // the createEventId for the partitions.
       Map<String, Long> partitionToEventId = new HashMap<>();
-      modification.addCatalogServiceIdentifiersToTable();
       if (table.getNumClusteringCols() > 0) {
         // Set of all partition names targeted by the insert that need to be 
created
         // in the Metastore (partitions that do not currently exist in the 
catalog).
         // In the BE, we don't currently distinguish between which targeted 
partitions
         // are new and which already exist, so initialize the set with all 
targeted
         // partition names and remove the ones that are found to exist.
-        HashSet<String> partsToCreate =
-            Sets.newHashSet(update.getUpdated_partitions().keySet());
         partsToLoadMetadata = Sets.newHashSet(partsToCreate);
         for (FeFsPartition partition: parts) {
-          String partName = partition.getPartitionName();
-          // Attempt to remove this partition name from partsToCreate. If 
remove
-          // returns true, it indicates the partition already exists.
-          if (partsToCreate.remove(partName)) {
-            affectedExistingPartitions.add(partition);
-            // For existing partitions, we need to unset column_stats_accurate 
to
-            // tell hive the statistics is not accurate any longer.
-            if (partition.getParameters() != null &&  partition.getParameters()
-                .containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
-              org.apache.hadoop.hive.metastore.api.Partition hmsPartition =
-                  ((HdfsPartition) partition).toHmsPartition();
-              
hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-              hmsPartitionsStatsUnset.add(hmsPartition);
-            }
-            if (partition.isMarkedCached()) {
-              // The partition was targeted by the insert and is also cached. 
Since
-              // data was written to the partition, a watch needs to be placed 
on the
-              // cache directive so the TableLoadingMgr can perform an async
-              // refresh once all data becomes cached.
-              cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
-                  partition.getParameters()));
-            }
-          }
-          if (partsToCreate.size() == 0) break;
-        }
-
-        if (!partsToCreate.isEmpty()) {
-          try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
-            org.apache.hadoop.hive.metastore.api.Table msTbl =
-                table.getMetaStoreTable().deepCopy();
-            List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts =
-                Lists.newArrayList();
-            HiveConf hiveConf = new HiveConf(this.getClass());
-            Warehouse warehouse = new Warehouse(hiveConf);
-            for (String partName: partsToCreate) {
-              org.apache.hadoop.hive.metastore.api.Partition partition =
-                  new org.apache.hadoop.hive.metastore.api.Partition();
-              hmsParts.add(partition);
-
-              partition.setDbName(tblName.getDb());
-              partition.setTableName(tblName.getTbl());
-              partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, 
partName));
-              
partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
-              partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" 
+ partName);
-              if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
-                // Self event detection is deprecated for non-transactional 
tables add
-                // partition. So we add catalog service identifiers only for
-                // transactional tables
-                addCatalogServiceIdentifiers(msTbl, partition);
-              }
-              MetastoreShim.updatePartitionStatsFast(partition, msTbl, 
warehouse);
-            }
-
-            // First add_partitions and then alter_partitions the successful 
ones with
-            // caching directives. The reason is that some partitions could 
have been
-            // added concurrently, and we want to avoid caching a partition 
twice and
-            // leaking a caching directive.
-            List<Partition> addedHmsParts = addHmsPartitions(
-                msClient, table, hmsParts, partitionToEventId, true, 
catalogTimeline);
-            for (Partition part: addedHmsParts) {
-              String part_name =
-                  FeCatalogUtils.getPartitionName((FeFsTable)table, 
part.getValues());
-              addedPartitionNames.put(part_name, part.getValues());
-            }
-            if (addedHmsParts.size() > 0) {
-              if (cachePoolName != null) {
-                List<org.apache.hadoop.hive.metastore.api.Partition> 
cachedHmsParts =
-                    Lists.newArrayList();
-                // Submit a new cache directive and update the partition 
metadata with
-                // the directive id.
-                for (org.apache.hadoop.hive.metastore.api.Partition part: 
addedHmsParts) {
-                  try {
-                    
cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
-                        part, cachePoolName, cacheReplication));
-                    StatsSetupConst.setBasicStatsState(part.getParameters(), 
"false");
-                    cachedHmsParts.add(part);
-                  } catch (ImpalaRuntimeException e) {
-                    String msg = String.format("Partition %s.%s(%s): State: 
Not " +
-                        "cached. Action: Cache manully via 'ALTER TABLE'.",
-                        part.getDbName(), part.getTableName(), 
part.getValues());
-                    LOG.error(msg, e);
-                    errorMessages.add(msg);
-                  }
-                }
-                try {
-                  MetastoreShim.alterPartitions(msClient.getHiveClient(), 
tblName.getDb(),
-                      tblName.getTbl(), cachedHmsParts);
-                } catch (Exception e) {
-                  LOG.error("Failed in alter_partitions: ", e);
-                  // Try to uncache the partitions when the alteration in the 
HMS
-                  // failed.
-                  for (org.apache.hadoop.hive.metastore.api.Partition part:
-                      cachedHmsParts) {
-                    try {
-                      
HdfsCachingUtil.removePartitionCacheDirective(part.getParameters());
-                    } catch (ImpalaException e1) {
-                      String msg = String.format(
-                          "Partition %s.%s(%s): State: Leaked caching 
directive. " +
-                          "Action: Manually uncache directory %s via hdfs " +
-                          "cacheAdmin.", part.getDbName(), part.getTableName(),
-                          part.getValues(), part.getSd().getLocation());
-                      LOG.error(msg, e);
-                      errorMessages.add(msg);
-                    }
-                  }
-                }
-              }
-            }
-          } catch (Exception e) {
-            throw new InternalException("Error adding partitions", e);
-          }
+          updatePartitionMetadataAndCacheStatus(partsToCreate,
+              affectedExistingPartitions, partition, hmsPartitionsStatsUnset,
+              cacheDirIds);
+          if (partsToCreate.isEmpty()) break;
         }
+        addPartitionNamesInMetastore(table, tblName, partsToCreate, 
addedPartitionNames,
+            cachePoolName, cacheReplication, catalogTimeline, 
partitionToEventId,
+            cacheDirIds, msTbl, errorMessages);
 
         // Unset COLUMN_STATS_ACCURATE by calling alter partition to hms.
         if (!hmsPartitionsStatsUnset.isEmpty()) {
@@ -7449,28 +7348,7 @@ public class CatalogOpExecutor {
       }
 
       if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) {
-        FeIcebergTable iceTbl = (FeIcebergTable)table;
-        org.apache.iceberg.Transaction iceTxn = 
IcebergUtil.getIcebergTransaction(iceTbl);
-        IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
-            update.getIceberg_operation());
-        catalogTimeline.markEvent("Executed Iceberg operation " +
-            update.getIceberg_operation().getOperation());
-        if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
-          // Add catalog service id and the 'newCatalogVersion' to the table 
parameters.
-          // This way we can avoid reloading the table on self-events (Iceberg 
generates
-          // an ALTER TABLE statement to set the new metadata_location).
-          modification.registerInflightEvent();
-          IcebergCatalogOpExecutor.addCatalogVersionToTxn(
-              iceTxn, catalog_.getCatalogServiceId(), 
modification.newVersionNumber());
-          catalogTimeline.markEvent("Updated table properties");
-        }
-
-        if (update.isSetDebug_action()) {
-          String debugAction = update.getDebug_action();
-          DebugUtils.executeDebugAction(debugAction, 
DebugUtils.ICEBERG_COMMIT);
-        }
-        iceTxn.commitTransaction();
-        modification.markInflightEventRegistrationComplete();
+        insertIntoIcebergTable(table, update, catalogTimeline, modification);
       }
 
       loadTableMetadata(table, modification.newVersionNumber(), true, false,
@@ -7505,6 +7383,183 @@ public class CatalogOpExecutor {
     return response;
   }
 
+  private TblTransaction 
createTableTransactionIfApplicable(TUpdateCatalogRequest update,
+      EventSequence catalogTimeline, Table table) throws ImpalaException {
+    TblTransaction tblTxn = null;
+    if (update.isSetTransaction_id()) {
+      long transactionId = update.getTransaction_id();
+      Preconditions.checkState(transactionId > 0);
+      try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
+        if (DebugUtils.hasDebugAction(update.getDebug_action(),
+            DebugUtils.UPDATE_CATALOG_ABORT_INSERT_TXN)) {
+          MetastoreShim.abortTransaction(msClient.getHiveClient(), 
transactionId);
+          LOG.info("Aborted txn due to the debug action.");
+        }
+        // Setup transactional parameters needed to do alter table/partitions 
later.
+        // TODO: Could be optimized to possibly save some RPCs, as these 
parameters are
+        //       not always needed + the writeId of the INSERT could be 
probably reused.
+        tblTxn = MetastoreShim.createTblTransaction(
+            msClient.getHiveClient(), table.getMetaStoreTable(), 
transactionId);
+        catalogTimeline.markEvent("Created Metastore transaction");
+      }
+    }
+    return tblTxn;
+  }
+
+  /**
+   * This process creates any missing partitions and clears a table property 
related to
+   * COLUMN_STATS_ACCURATE. It also gathers information about the cache 
directory
+   * IDs.
+   */
+  private void updatePartitionMetadataAndCacheStatus(
+      HashSet<String> pickupExistingPartitions,
+      List<FeFsPartition> affectedExistingPartitions, FeFsPartition partition,
+      List<org.apache.hadoop.hive.metastore.api.Partition> 
hmsPartitionsStatsUnset,
+      List<Long> cacheDirIds) throws ImpalaException {
+    String partName = partition.getPartitionName();
+    // Attempt to remove this partition name from pickupExistingPartitions. If 
remove
+    // returns true, it indicates the partition already exists.
+    if (pickupExistingPartitions.remove(partName)) {
+      affectedExistingPartitions.add(partition);
+      // For existing partitions, we need to unset column_stats_accurate to
+      // tell hive the statistics is not accurate any longer.
+      if (partition.getParameters() != null && partition.getParameters()
+          .containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
+        org.apache.hadoop.hive.metastore.api.Partition hmsPartition =
+            ((HdfsPartition) partition).toHmsPartition();
+        
hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+        hmsPartitionsStatsUnset.add(hmsPartition);
+      }
+      if (partition.isMarkedCached()) {
+        // The partition was targeted by the insert and is also cached. Since
+        // data was written to the partition, a watch needs to be placed on the
+        // cache directive so the TableLoadingMgr can perform an async
+        // refresh once all data becomes cached.
+        cacheDirIds.add(HdfsCachingUtil.getCacheDirectiveId(
+            partition.getParameters()));
+      }
+    }
+  }
+
+  private void addPartitionNamesInMetastore(Table table, TableName tblName,
+      HashSet<String> partsToCreate, Map<String, List<String>> 
addedPartitionNames,
+      String cachePoolName, Short cacheReplication, EventSequence 
catalogTimeline,
+      Map<String, Long> partitionToEventId, List<Long> cacheDirIds,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, List<String> 
errorMessages)
+      throws ImpalaException {
+    if (!partsToCreate.isEmpty()) {
+      try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
+        List<org.apache.hadoop.hive.metastore.api.Partition> hmsParts =
+            Lists.newArrayList();
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        Warehouse warehouse = new Warehouse(hiveConf);
+        for (String partName: partsToCreate) {
+          org.apache.hadoop.hive.metastore.api.Partition partition =
+              new org.apache.hadoop.hive.metastore.api.Partition();
+          hmsParts.add(partition);
+
+          partition.setDbName(tblName.getDb());
+          partition.setTableName(tblName.getTbl());
+          partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, 
partName));
+          
partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
+          partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" + 
partName);
+          if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
+            // Self event detection is deprecated for non-transactional tables 
add
+            // partition. So we add catalog service identifiers only for
+            // transactional tables
+            addCatalogServiceIdentifiers(msTbl, partition);
+          }
+          MetastoreShim.updatePartitionStatsFast(partition, msTbl, warehouse);
+        }
+
+        // First add_partitions and then alter_partitions the successful ones 
with
+        // caching directives. The reason is that some partitions could have 
been
+        // added concurrently, and we want to avoid caching a partition twice 
and
+        // leaking a caching directive.
+        List<Partition> addedHmsParts = addHmsPartitions(
+            msClient, table, hmsParts, partitionToEventId, true, 
catalogTimeline);
+        for (Partition part: addedHmsParts) {
+          String part_name =
+              FeCatalogUtils.getPartitionName((FeFsTable)table, 
part.getValues());
+          addedPartitionNames.put(part_name, part.getValues());
+        }
+        if (addedHmsParts.size() > 0) {
+          if (cachePoolName != null) {
+            List<org.apache.hadoop.hive.metastore.api.Partition> 
cachedHmsParts =
+                Lists.newArrayList();
+            // Submit a new cache directive and update the partition metadata 
with
+            // the directive id.
+            for (org.apache.hadoop.hive.metastore.api.Partition part: 
addedHmsParts) {
+              try {
+                cacheDirIds.add(HdfsCachingUtil.submitCachePartitionDirective(
+                    part, cachePoolName, cacheReplication));
+                StatsSetupConst.setBasicStatsState(part.getParameters(), 
"false");
+                cachedHmsParts.add(part);
+              } catch (ImpalaRuntimeException e) {
+                String msg = String.format("Partition %s.%s(%s): State: Not " +
+                        "cached. Action: Cache manully via 'ALTER TABLE'.",
+                    part.getDbName(), part.getTableName(), part.getValues());
+                LOG.error(msg, e);
+                errorMessages.add(msg);
+              }
+            }
+            try {
+              MetastoreShim.alterPartitions(msClient.getHiveClient(), 
tblName.getDb(),
+                  tblName.getTbl(), cachedHmsParts);
+            } catch (Exception e) {
+              LOG.error("Failed in alter_partitions: ", e);
+              // Try to uncache the partitions when the alteration in the HMS
+              // failed.
+              for (org.apache.hadoop.hive.metastore.api.Partition part:
+                  cachedHmsParts) {
+                try {
+                  
HdfsCachingUtil.removePartitionCacheDirective(part.getParameters());
+                } catch (ImpalaException e1) {
+                  String msg = String.format(
+                      "Partition %s.%s(%s): State: Leaked caching directive. " 
+
+                          "Action: Manually uncache directory %s via hdfs " +
+                          "cacheAdmin.", part.getDbName(), part.getTableName(),
+                      part.getValues(), part.getSd().getLocation());
+                  LOG.error(msg, e);
+                  errorMessages.add(msg);
+                }
+              }
+            }
+          }
+        }
+      } catch (Exception e) {
+        throw new InternalException("Error adding partitions", e);
+      }
+    }
+  }
+
+  private void insertIntoIcebergTable(Table table, TUpdateCatalogRequest 
update,
+      EventSequence catalogTimeline, InProgressTableModification modification)
+      throws ImpalaException {
+    FeIcebergTable iceTbl = (FeIcebergTable)table;
+    org.apache.iceberg.Transaction iceTxn = 
IcebergUtil.getIcebergTransaction(iceTbl);
+    IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
+        update.getIceberg_operation());
+    catalogTimeline.markEvent("Executed Iceberg operation " +
+        update.getIceberg_operation().getOperation());
+    if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
+      // Add catalog service id and the 'newCatalogVersion' to the table 
parameters.
+      // This way we can avoid reloading the table on self-events (Iceberg 
generates
+      // an ALTER TABLE statement to set the new metadata_location).
+      modification.registerInflightEvent();
+      IcebergCatalogOpExecutor.addCatalogVersionToTxn(
+          iceTxn, catalog_.getCatalogServiceId(), 
modification.newVersionNumber());
+      catalogTimeline.markEvent("Updated table properties");
+    }
+
+    if (update.isSetDebug_action()) {
+      String debugAction = update.getDebug_action();
+      DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT);
+    }
+    iceTxn.commitTransaction();
+    modification.markInflightEventRegistrationComplete();
+  }
+
   /**
    * Populates insert event data and calls fireInsertEvents() if external event
    * processing is enabled.
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 941bd31e7..4f37ca1b1 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1276,6 +1276,36 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     tables_removed_after = EventProcessorUtils.get_int_metric("tables-removed")
     assert tables_removed_after == tables_removed_before
 
  @CustomClusterTestSuite.with_args(
    catalogd_args="--hms_event_polling_interval_s=0")
  def test_ep_delay_metadata_reload_for_insert(self, unique_database):
    """IMPALA-12277: Verifies that an insert into a partitioned table succeeds when
    the target partition was previously dropped externally (e.g. via Hive) while the
    event processor is lagging or turned off. Impala should create a new partition
    in this case instead of hitting a NullPointerException."""

    def verify_partition(is_transactional, test_table="insert_part_reload"):
      # Create the table (transactional or not) and seed two partitions via Impala.
      query = " ".join(["create table {}.{} (i int)", " partitioned by (year int) ",
          self._get_transactional_tblproperties(is_transactional)])
      self.client.execute(query.format(unique_database, test_table))
      self.client.execute("insert into {}.{} partition(year) values (0,2024), (0,2022)"
          .format(unique_database, test_table))
      # Mutate the table behind Impala's back: with event polling disabled
      # (hms_event_polling_interval_s=0), Impala's cached partition list is now
      # stale -- 2024 was dropped and 2023/2021 were added directly in Hive/HMS.
      self.run_stmt_in_hive("alter table {}.{} drop partition(year=2024)"
          .format(unique_database, test_table))
      self.run_stmt_in_hive("alter table {}.{} add partition(year=2023)"
          .format(unique_database, test_table))
      self.run_stmt_in_hive("insert into {}.{} partition(year=2021) values (1)"
          .format(unique_database, test_table))
      # This insert targets both stale (2024, dropped externally) and existing
      # partitions; pre-IMPALA-12277 it raised a NullPointerException.
      self.client.execute(
        "insert into {}.{} partition(year) values (0,2023), (0,2024), (0,2022)"
        .format(unique_database, test_table))
      EventProcessorUtils.wait_for_event_processing(self)
      # All four partitions must be visible, including the recreated 2024.
      results = self.client.execute(
        "select DISTINCT(year) from {}.{} order by year desc"
        .format(unique_database, test_table))
      assert results.data == ["2024", "2023", "2022", "2021"]
      self.client.execute("drop table {}.{}".format(unique_database, test_table))
    # Exercise both managed (transactional) and external (non-transactional) tables.
    verify_partition(True)
    verify_partition(False)
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):


Reply via email to