This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f18b21c1ae398bd66783e7a901c8984c186c5874
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue Mar 12 21:52:46 2024 -0700

    IMPALA-12856: Event processor should ignore processing partition
    with empty partition values
    
    While processing partition-related events, the Event Processor (EP)
    hits an IllegalStateException if a partition fetched from HMS has
    empty partition values. Even though this is a bug in HMS, which can
    return partitions with empty values, the EP should ignore such
    partitions instead of throwing an IllegalStateException.
    
    Note: Added a debug action 'mock_empty_partition_values' to inject
    malformed partition objects (with empty values) for testing.
    
    Testing:
    - Manually verified the scenario described in the JIRA in a local
      environment.
    - Added a unit test that makes HMS return empty partition values and
      verifies the event processor stays in the ACTIVE state.
    
    Change-Id: Id2469930ccd74948325f1723bd8b2bd6aad02d09
    Reviewed-on: http://gerrit.cloudera.org:8080/21143
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  43 +++++++--
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   |   6 +-
 .../java/org/apache/impala/util/DebugUtils.java    |   5 +
 .../java/org/apache/impala/util/MetaStoreUtil.java |  24 +++++
 .../events/MetastoreEventsProcessorTest.java       | 102 +++++++++++++++++++++
 6 files changed, 175 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 85f4f5ae9..734276f2f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2663,8 +2663,12 @@ public class HdfsTable extends Table implements 
FeFsTable {
   public List<LiteralExpr> getTypeCompatiblePartValues(List<String> values) {
     List<LiteralExpr> result = new ArrayList<>();
     List<Column> partitionColumns = getClusteringColumns();
-    Preconditions.checkState(partitionColumns.size() == values.size());
-    for (int i=0; i<partitionColumns.size(); ++i) {
+    if (partitionColumns.size() != values.size()) {
+      LOG.error("Unmatched numbers of partition values: expected={}, actual={} 
for " +
+          "table: {}." , partitionColumns.size(), values.size(), 
getFullName());
+      return null;
+    }
+    for (int i = 0; i < partitionColumns.size(); ++i) {
       Pair<String, LiteralExpr> pair = getPartitionExprFromValue(values.get(i),
           partitionColumns.get(i).getType());
       if (pair == null) {
@@ -2894,12 +2898,23 @@ public class HdfsTable extends Table implements 
FeFsTable {
     return result;
   }
 
+  /**
+   * Reloads the HdfsPartitions which correspond to the given partNames. 
Returns the
+   * number of partitions which were reloaded. This method also reloads file 
metadata
+   * of all the partitions for the given partNames
+   */
+  public int reloadPartitionsFromNames(IMetaStoreClient client,
+      List<String> partNames, String reason) throws CatalogException {
+    return reloadPartitionsFromNames(-1L, client, partNames, reason,
+        FileMetadataLoadOpts.FORCE_LOAD);
+  }
+
   /**
    * Reloads the HdfsPartitions which correspond to the given partNames. 
Returns the
    * number of partitions which were reloaded.
    * fileMetadataLoadOpts: decides how to reload file metadata for the 
partitions
    */
-  public int reloadPartitionsFromNames(IMetaStoreClient client,
+  public int reloadPartitionsFromNames(long eventId, IMetaStoreClient client,
       List<String> partNames, String reason, FileMetadataLoadOpts 
fileMetadataLoadOpts)
       throws CatalogException {
     Preconditions.checkState(partNames != null && !partNames.isEmpty());
@@ -2910,6 +2925,14 @@ public class HdfsTable extends Table implements 
FeFsTable {
     try {
       hmsPartitions = MetaStoreUtil.fetchPartitionsByName(client, partNames, 
msTable_);
       for (Partition partition : hmsPartitions) {
+        if (partition.getValues().isEmpty()) {
+          LOG.error("EventId: {}, EventType: {}, Received partition with empty 
values:" +
+              " {}.\nThis table will be invalidated.", eventId, reason, 
partition);
+          throw new InvalidObjectException(String.format("Unmatched numbers of 
" +
+              "partition values: expected=%d, actual=%d for table:%s",
+              getClusteringColumns().size(), partition.getValues().size(),
+              getFullName()));
+        }
         List<LiteralExpr> partExprs = 
getTypeCompatiblePartValues(partition.getValues());
         HdfsPartition hdfsPartition = getPartition(partExprs);
         if (hdfsPartition != null) {
@@ -2934,13 +2957,14 @@ public class HdfsTable extends Table implements 
FeFsTable {
   /**
    * Reload the HdfsPartitions which correspond to the given partitions.
    *
+   * @param eventId of the event from metastore
    * @param client is the HMS client to be used.
    * @param partsFromEvent Partition objects from the event.
    * @param loadFileMetadata If true, file metadata will be reloaded.
    * @param reason Reason for reloading the partitions for logging purposes.
    * @return the number of partitions which were reloaded.
    */
-  public int reloadPartitionsFromEvent(IMetaStoreClient client,
+  public int reloadPartitionsFromEvent(long eventId, IMetaStoreClient client,
       List<Partition> partsFromEvent, boolean loadFileMetadata, String reason)
       throws CatalogException {
     Preconditions.checkArgument(partsFromEvent != null
@@ -2950,13 +2974,20 @@ public class HdfsTable extends Table implements 
FeFsTable {
     LOG.info("Reloading partition metadata for table: {} ({})", getFullName(), 
reason);
     Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
     for (Partition partition : partsFromEvent) {
+      // If the partition values are empty, ignore the event as partition 
cannot be
+      // refetched
+      if (partition.getValues().isEmpty()) {
+        LOG.error("EventId: {}, EventType: {}, Received partition with empty 
values: " +
+            "{}.\nIgnoring reloading the partition.", eventId, reason, 
partition);
+        continue;
+      }
       List<LiteralExpr> partExprs = 
getTypeCompatiblePartValues(partition.getValues());
       HdfsPartition hdfsPartition = getPartition(partExprs);
       // only reload partitions that have more recent write id
       if (hdfsPartition != null
           && (!AcidUtils.isTransactionalTable(msTable_.getParameters())
-                 || hdfsPartition.getWriteId()
-                     <= MetastoreShim.getWriteIdFromMSPartition(partition))) {
+              || hdfsPartition.getWriteId()
+                  <= MetastoreShim.getWriteIdFromMSPartition(partition))) {
         hmsPartToHdfsPart.put(partition, hdfsPartition);
       }
     }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index e00985df8..29eeaf3e4 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -467,6 +467,10 @@ public class BackendConfig {
 
   public String debugActions() { return backendCfg_.debug_actions; }
 
+  public void setDebugActions(String debugActions) {
+    backendCfg_.debug_actions = debugActions;
+  }
+
   public boolean isInvalidateMetadataOnEventProcessFailureEnabled() {
     return backendCfg_.invalidate_metadata_on_event_processing_failure;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index c7de42284..954f87b4a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -5068,7 +5068,7 @@ public class CatalogOpExecutor {
       // the partitions from HMS.
       int numOfPartsReloaded;
       try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-        numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromNames(eventId,
             metaStoreClient.getHiveClient(), partNames, reason, 
fileMetadataLoadOpts);
       }
       modification.updateTableCatalogVersion();
@@ -5151,7 +5151,7 @@ public class CatalogOpExecutor {
       HdfsTable hdfsTable = (HdfsTable) table;
       int numOfPartsReloaded;
       try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(eventId,
             metaStoreClient.getHiveClient(), partsFromEvent, false, reason);
       }
       modification.updateTableCatalogVersion();
@@ -5265,7 +5265,7 @@ public class CatalogOpExecutor {
           MutableValidWriteIdList.WriteIdStatus.COMMITTED);
       int numOfPartsReloaded;
       try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(
+        numOfPartsReloaded = hdfsTable.reloadPartitionsFromEvent(eventId,
             metaStoreClient.getHiveClient(), partsToRefresh, true, reason);
       }
       modification.updateTableCatalogVersion();
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 141faf80e..95bc7f390 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -90,6 +90,11 @@ public class DebugUtils {
   public static final String WAIT_SYNC_DDL_VER_DELAY =
       "catalogd_wait_sync_ddl_version_delay";
 
+  // debug action label for mock that metastore returns partitions with empty 
values
+  // This action is required for repro in the unit test for IMPALA-12856 to 
mimic the
+  // behavior of metastore returning partitions with empty values
+  public static final String MOCK_EMPTY_PARTITION_VALUES = 
"mock_empty_partition_values";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java 
b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 0e667b4b4..03d4756b5 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -37,6 +37,7 @@ import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.service.BackendConfig;
 import org.apache.thrift.TException;
 
 import org.apache.impala.thrift.TColumn;
@@ -202,12 +203,35 @@ public class MetaStoreUtil {
       // Fetch these partitions from the metastore.
       List<Partition> partitions = client.getPartitionsByNames(
           msTbl.getDbName(), msTbl.getTableName(), partsToFetch);
+      boolean foundEmptyPartitionVals = false;
+      for (Partition partition : partitions) {
+        if (partition.getValues().isEmpty()) {
+          LOG.error("Received partition with empty values: {}.\nRefetching the 
" +
+              "partition.", partition);
+          foundEmptyPartitionVals = true;
+          break;
+        }
+      }
+      if (foundEmptyPartitionVals) {
+        // Refetch these partitions from metastore because of empty values list
+        partitions = client.getPartitionsByNames(msTbl.getDbName(),
+            msTbl.getTableName(), partsToFetch);
+      }
       replaceSchemaFromTable(partitions, msTbl);
       fetchedPartitions.addAll(partitions);
       numDone += partitions.size();
       LOG.info("Fetched {}/{} partitions for table {}.{}", numDone, 
partNames.size(),
           msTbl.getDbName(), msTbl.getTableName());
     }
+    // This action is required for repro in the unit test 
(MetastoreEventsProcessorTest)
+    // for IMPALA-12856 to mimic the behavior of metastore returning 
partitions with
+    // empty values
+    if (DebugUtils.hasDebugAction(BackendConfig.INSTANCE.debugActions(),
+        DebugUtils.MOCK_EMPTY_PARTITION_VALUES)) {
+      for (org.apache.hadoop.hive.metastore.api.Partition msPart : 
fetchedPartitions) {
+        msPart.getValues().clear();
+      }
+    }
     return fetchedPartitions;
   }
 
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 2b55c3548..db35349d3 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -163,6 +163,7 @@ import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdatedPartition;
+import org.apache.impala.util.DebugUtils;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.NoOpEventSequence;
@@ -4008,6 +4009,107 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  /**
+   * testEmptyPartitionValues() is a regression test for IMPALA-12856 to mimic 
the
+   * behavior of metastore returning partitions with empty values
+   * @throws Exception
+   */
+  @Test
+  public void testEmptyPartitionValues() throws Exception {
+    String prevFlag = BackendConfig.INSTANCE.debugActions();
+    try {
+      String tblName = "test_empty";
+      String managedTbl = "managedTblEmptyPartVals";
+      createDatabase(TEST_DB_NAME, null);
+      eventsProcessor_.processEvents();
+      createTable(tblName, true);
+      createTransactionalTable(TEST_DB_NAME, managedTbl, true);
+      eventsProcessor_.processEvents();
+      loadTable(tblName);
+      loadTable(managedTbl);
+      List<List<String>> partVals = new ArrayList<>(1);
+      partVals.add(Arrays.asList("1"));
+      addPartitions(TEST_DB_NAME, tblName, partVals);
+      addPartitions(TEST_DB_NAME, managedTbl, partVals);
+      eventsProcessor_.processEvents();
+      // Fire a reload event and process partition with empty values
+      MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true,
+          Arrays.asList("1"), TEST_DB_NAME, tblName, Collections.emptyMap());
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      processEventsAndVerifyStatus(prevFlag);
+      // insert partition event
+      simulateFiringInsertEvent(tblName, false);
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      processEventsAndVerifyStatus(prevFlag);
+      // Alter partition event, managed and external table
+      String location = "/path/to/partition";
+      alterPartitions(tblName, partVals, location);
+      alterPartitions(managedTbl, partVals, location);
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      processEventsAndVerifyStatus(prevFlag);
+      // Batch partition event
+      partVals.clear();
+      partVals.add(Arrays.asList("2"));
+      partVals.add(Arrays.asList("3"));
+      addPartitions(TEST_DB_NAME, tblName, partVals);
+      eventsProcessor_.processEvents();
+      simulateFiringInsertEvent(tblName, false);
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      processEventsAndVerifyStatus(prevFlag);
+      // commit compaction event
+      addPartitions(TEST_DB_NAME, managedTbl, partVals);
+      simulateFiringInsertEvent(managedTbl, true);
+      simulateFiringInsertEvent(managedTbl, true);
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      eventsProcessor_.processEvents();
+      // Run hive query to trigger compaction
+      try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1);
+           HiveJdbcClientPool.HiveJdbcClient hiveClient = 
jdbcClientPool.getClient()) {
+        hiveClient.executeSql(
+            "alter table " + TEST_DB_NAME + '.' + managedTbl +
+                " partition(p1=1) compact 'minor' and wait");
+      }
+      
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
+      processEventsAndVerifyStatus(prevFlag);
+    } finally {
+      BackendConfig.INSTANCE.setDebugActions(prevFlag);
+    }
+  }
+
+  private void simulateFiringInsertEvent(String tblName, boolean 
isTransactional)
+      throws Exception {
+    org.apache.hadoop.hive.metastore.api.Table msTbl;
+    List<Partition> partitions;
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      msTbl = metaStoreClient.getHiveClient().getTable(TEST_DB_NAME, tblName);
+      partitions = MetastoreShim
+          .getPartitions(metaStoreClient.getHiveClient(), TEST_DB_NAME, 
tblName);
+    }
+    assertNotNull(msTbl);
+    assertNotNull(partitions);
+    if (isTransactional) {
+      for (Partition part : partitions) {
+        try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+          long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+          long writeId = MetastoreShim.allocateTableWriteId(
+              client.getHiveClient(), txnId, TEST_DB_NAME, tblName);
+          simulateInsertIntoTransactionalTableFromFS(msTbl, part, 1, txnId, 
writeId);
+          MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+        }
+      }
+    } else {
+      for (Partition part : partitions) {
+        simulateInsertIntoTableFromFS(msTbl, 1, part, false);
+      }
+    }
+  }
+
+  private void processEventsAndVerifyStatus(String prevFlag) {
+    eventsProcessor_.processEvents();
+    assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+    BackendConfig.INSTANCE.setDebugActions(prevFlag);
+  }
+
   public void testAllocWriteIdEvent(String tblName, boolean isPartitioned,
       boolean isLoadTable) throws TException, TransactionException, 
CatalogException {
     createDatabase(TEST_DB_NAME, null);

Reply via email to