This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0140a15a044e719e052e2fedfe01efe81f65fe06
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue May 14 18:40:04 2024 -0700

    IMPALA-12680: Fix NullPointerException during AlterTableAddPartitions
    
    When a global INVALIDATE METADATA runs concurrently with an
    ALTER TABLE ADD PARTITION statement, a precondition check in
    addHmsPartitions() can throw a NullPointerException. This happens
    because the Map<String, Long> partitionToEventId is initialized to
    null when the event processor is not active. If the event processor
    becomes active between the creation of the map and the call to
    addHmsPartitions(), the precondition check on the null map fails.
    
    Fix this by always initializing 'partitionToEventId' to an empty hash
    map, regardless of the state of the event processor. If the event
    processor is not active, addHmsPartitions() adds the partitions
    fetched directly from the metastore.
    
    Note: The same issue, which could also occur in
    AlterTableRecoverPartitions, is fixed as well.
    
    Testing:
    - Verified manually that the NullPointerException scenario is avoided.
    - Added a unit test that covers the above scenario.
    
    Change-Id: I730fed311ebc09762dccc152d9583d5394b0b9b3
    Reviewed-on: http://gerrit.cloudera.org:8080/21430
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 28 ++++++++------
 .../java/org/apache/impala/util/DebugUtils.java    |  3 ++
 .../events/MetastoreEventsProcessorTest.java       | 45 +++++++++++++++++++++-
 3 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6b5ffa01d..17210b657 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -145,6 +145,7 @@ import 
org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
+import 
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
 import org.apache.impala.catalog.events.MetastoreNotificationException;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogOperationTracker;
@@ -1270,7 +1271,7 @@ public class CatalogOpExecutor {
             format = params.getSet_file_format_params().file_format;
           }
           alterTableAddPartitions(tbl, params.getAdd_partition_params(), 
format,
-              catalogTimeline, modification);
+              catalogTimeline, modification, debugAction);
           reloadMetadata = false;
           responseSummaryMsg = "New partition has been added to the table.";
           break;
@@ -4600,8 +4601,8 @@ public class CatalogOpExecutor {
    */
   private void alterTableAddPartitions(Table tbl,
       TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat,
-      EventSequence catalogTimeline, InProgressTableModification modification)
-      throws ImpalaException {
+      EventSequence catalogTimeline, InProgressTableModification modification,
+      String debugAction) throws ImpalaException {
     Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
 
     TableName tableName = tbl.getTableName();
@@ -4639,10 +4640,10 @@ public class CatalogOpExecutor {
 
     List<Partition> difference = null;
     try (MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
-      Map<String, Long> partitionToEventId = 
catalog_.isEventProcessingActive() ?
-          Maps.newHashMap() : null;
+      Map<String, Long> partitionToEventId = Maps.newHashMap();
       List<Partition> addedHmsPartitions = 
addHmsPartitionsInTransaction(msClient,
-          tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, 
catalogTimeline);
+          tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, 
catalogTimeline,
+          debugAction);
       // Handle HDFS cache. This is done in a separate round bacause we have 
to apply
       // caching only to newly added partitions.
       alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
@@ -5299,7 +5300,7 @@ public class CatalogOpExecutor {
    */
   private List<Partition> addHmsPartitions(MetaStoreClient msClient,
       Table tbl, List<Partition> allHmsPartitionsToAdd,
-      @Nullable Map<String, Long> partitionToEventId, boolean ifNotExists,
+      Map<String, Long> partitionToEventId, boolean ifNotExists,
       EventSequence catalogTimeline) throws ImpalaRuntimeException, 
CatalogException {
     long eventId = getCurrentEventId(msClient, catalogTimeline);
     List<Partition> addedHmsPartitions = Lists
@@ -5332,7 +5333,6 @@ public class CatalogOpExecutor {
           // add_partitions call above.
           addedHmsPartitions.addAll(addedPartitions);
         } else {
-          Preconditions.checkNotNull(partitionToEventId);
           addedHmsPartitions.addAll(partitionToEventSubMap.keySet());
           // we cannot keep a mapping of Partition to event ids because the
           // partition objects are changed later in the cachePartitions code 
path.
@@ -5361,8 +5361,12 @@ public class CatalogOpExecutor {
    */
   private List<Partition> addHmsPartitionsInTransaction(MetaStoreClient 
msClient,
       Table tbl, List<Partition> partitions, Map<String, Long> 
partitionToEventId,
-      boolean ifNotExists, EventSequence catalogTimeline) throws 
ImpalaException {
+      boolean ifNotExists, EventSequence catalogTimeline, String debugAction)
+      throws ImpalaException {
     if 
(!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
+      if (DebugUtils.hasDebugAction(debugAction, 
DebugUtils.ENABLE_EVENT_PROCESSOR)) {
+        catalog_.startEventsProcessor();
+      }
       return addHmsPartitions(msClient, tbl, partitions, partitionToEventId,
           ifNotExists, catalogTimeline);
     }
@@ -6302,10 +6306,12 @@ public class CatalogOpExecutor {
     }
 
     // Add partitions to metastore.
-    Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ?
-        Maps.newHashMap() : null;
+    Map<String, Long> partitionToEventId = Maps.newHashMap();
     String annotation = String.format("Recovering %d partitions for %s",
         hmsPartitions.size(), tbl.getFullName());
+    if (DebugUtils.hasDebugAction(debugAction, 
DebugUtils.ENABLE_EVENT_PROCESSOR)) {
+      catalog_.startEventsProcessor();
+    }
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
         MetaStoreClient msClient = 
catalog_.getMetaStoreClient(catalogTimeline)) {
       List<Partition> addedPartitions = addHmsPartitions(msClient, tbl, 
hmsPartitions,
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index f1b150fbc..67b05e4e4 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -83,6 +83,9 @@ public class DebugUtils {
   // debug action label for introducing delay in loading table metadata.
   public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
 
+  // debug action to enable eventProcessor
+  public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
+
   /**
    * Returns true if the label of action is set in the debugActions
    */
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 69b8a084d..22fd09fb5 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -3962,6 +3962,40 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testAlterTableWithEpDisabled() throws Exception {
+    try {
+      createDatabaseFromImpala(TEST_DB_NAME, null);
+      String testTable = "testAlterTableNoError";
+      createTableFromImpala(TEST_DB_NAME, testTable, true);
+      eventsProcessor_.processEvents();
+      // set EP to paused state and execute Alter table add partition query
+      eventsProcessor_.pause();
+      long numberOfSelfEventsBefore =
+          eventsProcessor_.getMetrics()
+              
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
+      TPartitionDef partitionDef = new TPartitionDef();
+      partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "100"));
+      partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
+      alterTableAddPartition(TEST_DB_NAME, testTable, partitionDef,
+          "enable_event_processor");
+      eventsProcessor_.processEvents();
+      assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+      long numberOfSelfEventsAfter =
+          eventsProcessor_.getMetrics()
+              
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
+      // expect ADD_PARTITION event to be skipped as self-event
+      assertEquals("Unexpected self events skipped: ", numberOfSelfEventsAfter,
+          numberOfSelfEventsBefore + 1);
+    } catch (NullPointerException ex) {
+      throw new CatalogException("Exception occured while applying 
AlterTableEvent", ex);
+    } finally {
+      if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) {
+        eventsProcessor_.start();
+      }
+    }
+  }
+
   private void createDatabase(String catName, String dbName,
       Map<String, String> params) throws TException {
     try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -4259,8 +4293,17 @@ public class MetastoreEventsProcessorTest {
    */
   private void alterTableAddPartition(
       String dbName, String tblName, TPartitionDef partitionDef) throws 
ImpalaException {
+    alterTableAddPartition(dbName, tblName,partitionDef, null);
+  }
+
+  private void alterTableAddPartition(String dbName, String tblName,
+      TPartitionDef partitionDef, String debugActions) throws ImpalaException {
     TDdlExecRequest req = new TDdlExecRequest();
-    req.setQuery_options(new TDdlQueryOptions());
+    TDdlQueryOptions queryOptions = new TDdlQueryOptions();
+    if (debugActions != null) {
+      queryOptions.setDebug_action(debugActions);
+    }
+    req.setQuery_options(queryOptions);
     req.setDdl_type(TDdlType.ALTER_TABLE);
     TAlterTableParams alterTableParams = new TAlterTableParams();
     alterTableParams.setTable_name(new TTableName(dbName, tblName));

Reply via email to