This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1fda3e3d6f8f69802d68f041cb90ac6c5e93a3cd
Author: maxwellguo <[email protected]>
AuthorDate: Wed May 29 22:11:49 2024 +0800

    IMPALA-12771: Impala catalogd events-skipped may mark the wrong number
    
    This patch updates the description of the events-skipped metric, which was
    missing the ALTER events for DB and Table.
    Besides, there are some cases where the events-skipped metric is
    incorrectly marked, mostly for transactional tables:
    1. The metric should increase when processing AddPartitionEvent on a
       transactional table which is not in loaded state or when db is not found.
    2. The metric should increase when processing AlterPartitionEvent on a
       transactional table which is not in loaded state or when db is not
       found.
    3. The metric should increase when processing AlterPartitionEvent if the
       event is a trivial event and can be skipped.
    4. When processing BatchPartitionEvent, the number of skipped events was
       not marked before doing any real work.
    5. The metric should increase when processing DropPartitionEvent on a
       transactional table which is not in loaded state or when db is not
       found.
    
    testing:
    - add test cases for add database, create table, alter table, add
      partition, create transactional table, and alter partition for
      transactional tables, and test the skipped metric number.
    
    Change-Id: I7aeb04e999b82187eb138c0b643ead259da22f1a
    Reviewed-on: http://gerrit.cloudera.org:8080/21045
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/events/MetastoreEvents.java     | 126 +++++++++++++++------
 .../catalog/events/MetastoreEventsProcessor.java   |   5 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  21 ++--
 .../events/MetastoreEventsProcessorTest.java       |  75 +++++++++++-
 4 files changed, 181 insertions(+), 46 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 79592cca6..10d799426 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1067,7 +1067,7 @@ public class MetastoreEvents {
      * Helper function to initiate a table reload on Catalog. Re-throws the 
exception if
      * the catalog operation throws.
      */
-    protected void reloadTableFromCatalog(String operation, boolean 
isTransactional)
+    protected boolean reloadTableFromCatalog(String operation, boolean 
isTransactional)
         throws CatalogException {
       try {
         if (!catalog_.reloadTableIfExists(dbName_, tblName_,
@@ -1076,11 +1076,7 @@ public class MetastoreEvents {
           debugLog("Automatic refresh on table {} failed as the table "
                   + "either does not exist anymore or is not in loaded state.",
               getFullyQualifiedTblName());
-          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-          debugLog("Incremented skipped metric to "
-              + 
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
-                    .getCount());
-          return;
+          return false;
         }
       } catch (TableLoadingException | DatabaseNotFoundException e) {
         // there could be many reasons for receiving a tableLoading exception,
@@ -1089,11 +1085,12 @@ public class MetastoreEvents {
         // we can do here other than log it appropriately.
         debugLog("Table {} was not refreshed due to error {}",
             getFullyQualifiedTblName(), e.getMessage());
-        return;
+        return false;
       }
       String tblStr = isTransactional ? "transactional table" : "table";
       infoLog("Refreshed {} {}", tblStr, getFullyQualifiedTblName());
       
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).inc();
+      return true;
     }
 
     /**
@@ -1103,10 +1100,12 @@ public class MetastoreEvents {
      * @param fileMetadataLoadOpts: describes how to reload file metadata for 
partitions
      * @param reason The reason for reload operation which is used for logging 
by
      *               catalogd.
+     * @param batch flag to show if the function is called by the batch event 
process
      */
     protected void reloadPartitions(List<Partition> partitions,
-        FileMetadataLoadOpts fileMetadataLoadOpts, String reason)
+        FileMetadataLoadOpts fileMetadataLoadOpts, String reason, boolean 
batch)
         throws CatalogException {
+      int skippedEvent = batch ? partitions.size() : 1;
       try {
         int numPartsRefreshed = 
catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
             getEventType().toString(), dbName_, tblName_, partitions, reason,
@@ -1114,13 +1113,23 @@ public class MetastoreEvents {
         if (numPartsRefreshed > 0) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
               .inc(numPartsRefreshed);
+        } else if (numPartsRefreshed == -1) {
+          debugLog("Ignoring the event since table {} is not loadded or " +
+              "table was removed latter in catalog or table is synced."
+              , getFullyQualifiedTblName());
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(skippedEvent);
         }
       } catch (TableNotLoadedException e) {
         debugLog("Ignoring the event since table {} is not loaded",
             getFullyQualifiedTblName());
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .inc(skippedEvent);
       } catch (DatabaseNotFoundException | TableNotFoundException e) {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .inc(skippedEvent);
       }
     }
 
@@ -1139,13 +1148,17 @@ public class MetastoreEvents {
         if (numPartsRefreshed > 0) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
               .inc(numPartsRefreshed);
+        } else if (numPartsRefreshed == -1) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         }
       } catch (TableNotLoadedException e) {
         debugLog("Ignoring the event since table {} is not loaded",
             getFullyQualifiedTblName());
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
       } catch (DatabaseNotFoundException | TableNotFoundException e) {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
       }
     }
 
@@ -1158,13 +1171,17 @@ public class MetastoreEvents {
         if (numPartsRefreshed > 0) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
                   .inc(numPartsRefreshed);
+        } else if (numPartsRefreshed == -1) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         }
       } catch (TableNotLoadedException e) {
         debugLog("Ignoring the event since table {} is not loaded",
             getFullyQualifiedTblName());
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
       } catch (DatabaseNotFoundException | TableNotFoundException e) {
         debugLog("Ignoring the event since table {} is not found",
             getFullyQualifiedTblName());
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
       }
     }
 
@@ -1640,7 +1657,7 @@ public class MetastoreEvents {
         // forcing file metadata reload so that new files (due to insert) are 
reflected
         // HdfsPartition
         reloadPartitions(Arrays.asList(insertPartition_),
-            FileMetadataLoadOpts.FORCE_LOAD, "INSERT event");
+            FileMetadataLoadOpts.FORCE_LOAD, "INSERT event", false);
       } catch (CatalogException e) {
         throw new 
MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
                 + "partition on table {} partition {} failed. Event processing 
cannot "
@@ -1657,7 +1674,10 @@ public class MetastoreEvents {
       // For non-partitioned tables, refresh the whole table.
       Preconditions.checkState(insertPartition_ == null);
       try {
-        reloadTableFromCatalog("INSERT event", false);
+        boolean notSkipped = reloadTableFromCatalog("INSERT event", false);
+        if (!notSkipped) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        }
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(
             debugString("Refresh table {} failed. Event processing "
@@ -1737,16 +1757,20 @@ public class MetastoreEvents {
           .renameTableFromEvent(getEventId(), tableBefore_, tableAfter_, 
oldTblRemoved,
               newTblAdded);
 
-      if (oldTblRemoved.getRef()) {
-        
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
-      }
-      if (newTblAdded.getRef()) {
-        
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
-      }
-      if (!oldTblRemoved.getRef() || !newTblAdded.getRef()) {
+      // Only bump the skipped metric if the old table is not removed and the 
new table
+      // is not added. Not doing this in other cases since we need to either 
remove the
+      // old table or add the new table, which is processing the event.
+      if (!oldTblRemoved.getRef() && !newTblAdded.getRef()) {
         
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         debugLog("Incremented skipped metric to " + metrics_
-            
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
+                
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
+      } else {
+        if (oldTblRemoved.getRef()) {
+          
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc();
+        }
+        if (newTblAdded.getRef()) {
+          
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc();
+        }
       }
     }
 
@@ -1794,6 +1818,7 @@ public class MetastoreEvents {
       // Ignore the event if this is a trivial event. See javadoc for
       // canBeSkipped() for examples.
       if (canBeSkipped()) {
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Not processing this event as it only modifies some table 
parameters "
             + "which can be ignored.");
         return;
@@ -1808,7 +1833,10 @@ public class MetastoreEvents {
         // refresh, eg. this could be due to as simple as adding a new 
parameter or a
         // full blown adding or changing column type
         // rename is already handled above
-        reloadTableFromCatalog("ALTER_TABLE", false);
+        boolean notSkipped = reloadTableFromCatalog("ALTER_TABLE", false);
+        if (!notSkipped) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        }
       }
       long durationNs = System.nanoTime() - startNs;
       // Log event details for those triggered slow reload.
@@ -2476,7 +2504,10 @@ public class MetastoreEvents {
             
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
         if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && 
!isSelfEvent() &&
             !incrementalRefresh) || 
MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
-          reloadTableFromCatalog("ADD_PARTITION", true);
+           boolean notSkipped = reloadTableFromCatalog("ADD_PARTITION", true);
+           if (!notSkipped) {
+             
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+           }
         } else {
           // HMS adds partitions in a transactional way. This means there may 
be multiple
           // HMS partition objects in an add_partition event. We try to do the 
same here
@@ -2493,8 +2524,9 @@ public class MetastoreEvents {
                 .inc(numPartsAdded);
           } else {
             
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-            debugLog("Incremented skipped metric to " + metrics_
-                
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
+            debugLog("Incremented skipped metric to {} since no partitions 
were added.",
+            metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+                .getCount());
           }
         }
       } catch (CatalogException e) {
@@ -2615,6 +2647,7 @@ public class MetastoreEvents {
       // Ignore the event if this is a trivial event. See javadoc for
       // isTrivialAlterPartitionEvent() for examples.
       if (canBeSkipped()) {
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         infoLog("Not processing this event as it only modifies some partition "
             + "parameters which can be ignored.");
         return;
@@ -2640,7 +2673,7 @@ public class MetastoreEvents {
               isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
                   FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
           reloadPartitions(Arrays.asList(partitionAfter_), 
fileMetadataLoadOpts,
-              "ALTER_PARTITION event");
+              "ALTER_PARTITION event", false);
         } catch (CatalogException e) {
           throw new MetastoreNotificationNeedsInvalidateException(
               debugString("Refresh partition on table {} partition {} failed. 
Event " +
@@ -2685,7 +2718,10 @@ public class MetastoreEvents {
         reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_),
             "ALTER_PARTITION EVENT FOR TRANSACTIONAL TABLE");
       } else {
-        reloadTableFromCatalog("ALTER_PARTITION", true);
+        boolean notSkipped = reloadTableFromCatalog("ALTER_PARTITION", true);
+        if (!notSkipped) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        }
       }
     }
   }
@@ -2782,6 +2818,12 @@ public class MetastoreEvents {
           eventsToProcess.add(event);
         }
       }
+      int notSkippedNum = eventsToProcess.size() + 
partitionEventsToForceReload.size();
+      int skippedNum = batchedEvents_.size() - notSkippedNum;
+      if (skippedNum > 0) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+            .inc(skippedNum);
+      }
       if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) 
{
         LOG.info(
             "Ignoring {} events between event id {} and {} since they modify 
parameters"
@@ -2792,7 +2834,11 @@ public class MetastoreEvents {
 
       // Reload the whole table if it's a transactional table.
       if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) {
-        reloadTableFromCatalog(getEventType().toString(), true);
+        boolean notSkipped = reloadTableFromCatalog(getEventType().toString(), 
true);
+        if (!notSkipped) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC)
+              .inc(eventsToProcess.size() + 
partitionEventsToForceReload.size());
+        }
       } else {
         // Reload the partitions from the batch.
         List<Partition> partitions = new ArrayList<>();
@@ -2804,18 +2850,20 @@ public class MetastoreEvents {
             // for insert event, always reload file metadata so that new files
             // are reflected in HdfsPartition
             reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD,
-                getEventType().toString() + " event");
+                getEventType().toString() + " event", true);
           } else {
             if (!partitionEventsToForceReload.isEmpty()) {
               // force reload truncated partitions
               reloadPartitions(partitionEventsToForceReload,
-                  FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + 
" event");
+                  FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString()
+                  + " event", true);
             }
             if (!partitions.isEmpty()) {
               // alter partition event. Reload file metadata of only those 
partitions
               // for which sd has changed
-              reloadPartitions(partitions, 
FileMetadataLoadOpts.LOAD_IF_SD_CHANGED,
-                  getEventType().toString() + " event");
+              reloadPartitions(partitions,
+                  FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, 
getEventType().toString()
+                  + " event", true);
             }
           }
         } catch (CatalogException e) {
@@ -2915,7 +2963,10 @@ public class MetastoreEvents {
             
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
         if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) &&
             !incrementalRefresh) || 
MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
-          reloadTableFromCatalog("DROP_PARTITION", true);
+          boolean notSkipped = reloadTableFromCatalog("DROP_PARTITION", true);
+          if (!notSkipped) {
+            
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+          }
         } else {
           int numPartsRemoved = catalogOpExecutor_
               .removePartitionsIfNotAddedLater(getEventId(), dbName_, tblName_,
@@ -3102,7 +3153,7 @@ public class MetastoreEvents {
         // forcing file metadata reload so that new files (due to refresh) are 
reflected
         // HdfsPartition
         reloadPartitions(Arrays.asList(reloadPartition_),
-            FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event");
+            FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event", false);
       } catch (CatalogException e) {
         throw new 
MetastoreNotificationNeedsInvalidateException(debugString("Refresh "
             + "partition on table {} partition {} failed. Event processing 
cannot "
@@ -3120,7 +3171,10 @@ public class MetastoreEvents {
       Preconditions.checkState(reloadPartition_ == null);
       try {
         // we always treat the table as non-transactional so all the files are 
reloaded
-        reloadTableFromCatalog("RELOAD event", false);
+        boolean notSkipped = reloadTableFromCatalog("RELOAD event", false);
+        if (!notSkipped) {
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        }
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(
             debugString("Refresh table {} failed. Event processing "
@@ -3139,15 +3193,18 @@ public class MetastoreEvents {
         if (tbl == null) {
           infoLog("Skipping on table {}.{} since it does not exist in cache", 
dbName_,
               tblName_);
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
           return ;
         }
         if (tbl instanceof IncompleteTable) {
           infoLog("Skipping on an incomplete table {}", tbl.getFullName());
+          
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
           return ;
         }
       } catch (DatabaseNotFoundException e) {
         infoLog("Skipping on table {} because db {} not found in cache", 
tblName_,
             dbName_);
+        
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
         return ;
       }
       catalog_.invalidateTable(tbl.getTableName().toThrift(),
@@ -3267,7 +3324,10 @@ public class MetastoreEvents {
     protected void processTableEvent() throws MetastoreNotificationException {
       try {
         if (partitionName_ == null) {
-          reloadTableFromCatalog("Commit Compaction event", true);
+          boolean notSkipped = reloadTableFromCatalog("Commit Compaction 
event", true);
+          if (!notSkipped) {
+            
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+          }
         } else {
           reloadPartitionsFromNames(Arrays.asList(partitionName_),
                   "Commit compaction event", FileMetadataLoadOpts.FORCE_LOAD);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 0d0c858b7..40408ff44 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -237,8 +237,9 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   // rate of events received per unit time
   public static final String EVENTS_RECEIVED_METRIC = "events-received";
   // total number of events which are skipped because of the flag setting or
-  // in case of [CREATE|DROP] events on [DATABASE|TABLE|PARTITION] which were 
ignored
-  // because the [DATABASE|TABLE|PARTITION] was already [PRESENT|ABSENT] in 
the catalogd.
+  // in case of [CREATE|DROP|ALTER] events on [DATABASE|TABLE|PARTITION] which 
were
+  // ignored because the [DATABASE|TABLE|PARTITION] was already 
[PRESENT|ABSENT] in
+  // the catalogd.
   public static final String EVENTS_SKIPPED_METRIC = "events-skipped";
   // name of the event processor status metric
   public static final String STATUS_METRIC = "status";
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4a7d31b24..620e739f0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -5017,8 +5017,9 @@ public class CatalogOpExecutor {
    * @param partNames List of partition names from the events to be reloaded.
    * @param reason Reason for reloading the partitions for logging purposes.
    * @param fileMetadataLoadOpts describes how to reload file metadata for 
partsFromEvent
-   * @return the number of partitions which were reloaded. If the table does 
not exist,
-   * returns 0. Some partitions could be skipped if they don't exist anymore.
+   * @return the number of partitions which were reloaded. If the table does 
not exist, or
+   * if the table is IncompleteTable or if the table is already synced then 
returns -1.
+   * Some partitions could be skipped if they don't exist anymore.
    */
   public int reloadPartitionsFromNamesIfExists(long eventId, String eventType,
       String dbName, String tblName, List<String> partNames, String reason,
@@ -5031,7 +5032,7 @@ public class CatalogOpExecutor {
           .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, 
tblName))) {
         LOG.info("EventId: {} EventType: {} Not reloading the partition of 
table {}.{} " +
             "since it was removed later in catalog", eventId, eventType, 
dbName, tblName);
-        return 0;
+        return -1;
       } else {
         throw new TableNotFoundException(
             "Table " + dbName + "." + tblName + " not found");
@@ -5104,7 +5105,8 @@ public class CatalogOpExecutor {
    *                       reloaded.
    * @param reason Reason for reloading the partitions for logging purposes.
    * @return the number of partitions which were reloaded. If the table does 
not exist,
-   * returns 0. Some partitions could be skipped if they don't exist anymore.
+   * or if the table is IncompleteTable or if the table is recreated then 
returns -1.
+   * Some partitions could be skipped if they don't exist anymore.
    */
   public int reloadPartitionsFromEvent(long eventId, String dbName, String 
tblName,
       List<Partition> partsFromEvent, String reason)
@@ -5118,7 +5120,10 @@ public class CatalogOpExecutor {
         LOG.info(
             "Not reloading the partition of table {} since it was removed "
                 + "later in catalog", new TableName(dbName, tblName));
-        return 0;
+        // as the numOfPartsReloaded can be 0 which means no partition is 
reloaded, so
+        // we just return -1, which means the event can be skipped and we do 
not execute
+        // hdfsTable's reloadPartitionsFromNames function.
+        return -1;
       } else {
         throw new TableNotFoundException(
             "Table " + dbName + "." + tblName + " not found");
@@ -5127,7 +5132,7 @@ public class CatalogOpExecutor {
     if (table instanceof IncompleteTable) {
       LOG.info("Table {} is not loaded. Skipping drop partition event {}",
           table.getFullName(), eventId);
-      return 0;
+      return -1;
     }
     if (!(table instanceof HdfsTable)) {
       throw new CatalogException("Partition event received on a non-hdfs 
table");
@@ -5135,7 +5140,7 @@ public class CatalogOpExecutor {
     if (eventId > 0 && eventId <= table.getCreateEventId()) {
       LOG.debug("Not reloading partitions of table {}.{} for event {} since it 
is " +
           "recreated at event {}.", dbName, tblName, eventId, 
table.getCreateEventId());
-      return 0;
+      return -1;
     }
     try {
       tryWriteLock(table, reason, NoOpEventSequence.INSTANCE);
@@ -5194,7 +5199,7 @@ public class CatalogOpExecutor {
     if (table instanceof IncompleteTable) {
       LOG.info("Table {} is not loaded. Skipping partition event {}",
           table.getFullName(), eventId);
-      return 0;
+      return -1;
     }
     if (!(table instanceof HdfsTable)) {
       throw new CatalogException("Partition event received on a non-hdfs 
table");
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 5a800ca60..2b55c3548 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -1990,9 +1990,10 @@ public class MetastoreEventsProcessorTest {
     assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus());
     assertTrue("Atleast 5 events should have been received",
         response.getEvents_received() >= numEventsReceivedBefore + 5);
-    // two events on tbl which is skipped
-    assertTrue("Atleast 2 events should have been skipped",
-        response.getEvents_skipped() >= numEventsSkippedBefore + 2);
+    // The create table and add partition events of table tbl_should_skipped 
and
+    // the add partition event of table testEventProcessorMetrics are all 
skipped
+    assertTrue("3 events should be skipped",
+        response.getEvents_skipped() == numEventsSkippedBefore + 3);
     assertTrue("Event fetch duration should be greater than zero",
         response.getEvents_fetch_duration_mean() > 0);
     assertTrue("Event process duration should be greater than zero",
@@ -2003,6 +2004,74 @@ public class MetastoreEventsProcessorTest {
     assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
   }
 
+  @Test
+  public void testEventProcessorMetricsForSkippedMetric() throws TException {
+    TEventProcessorMetrics responseBefore = 
eventsProcessor_.getEventProcessorMetrics();
+    long numEventsReceivedBefore = responseBefore.getEvents_received();
+    long numEventsSkippedBefore = responseBefore.getEvents_skipped();
+    long lastEventSyncId = responseBefore.getLast_synced_event_id();
+    final String testTblName1 = "testEventProcessorMetrics1";
+    final String testTblName2 = "testEventProcessorMetrics2";
+    // event 1
+    createDatabase(TEST_DB_NAME, null);
+    // event 2
+    createTable(null, TEST_DB_NAME, testTblName1, null, true, null);
+    List<List<String>> partitionVals = new ArrayList<>();
+    partitionVals.add(Arrays.asList("1"));
+    partitionVals.add(Arrays.asList("2"));
+    partitionVals.add(Arrays.asList("3"));
+    // event 3
+    addPartitions(TEST_DB_NAME, testTblName1, partitionVals);
+    eventsProcessor_.processEvents();
+    TEventProcessorMetrics response = 
eventsProcessor_.getEventProcessorMetrics();
+    assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus());
+    assertTrue("Atleast 3 events should have been received",
+            response.getEvents_received() >= numEventsReceivedBefore + 3);
+    assertTrue("we do not turn off disableHmsSync for table testTblName1",
+            response.getEvents_skipped() >= numEventsSkippedBefore);
+    TEventProcessorMetricsSummaryResponse summaryResponse =
+            catalog_.getEventProcessorSummary();
+    assertNotNull(summaryResponse);
+    assertTrue(response.getLast_synced_event_id() > lastEventSyncId);
+
+    // invalidate the table and the table will be IncompleteTable, then
+    // ALTER events will be skipped.
+    catalog_.invalidateTableIfExists(TEST_DB_NAME, testTblName1);
+    //event 4 alter table
+    alterTableAddCol(testTblName1, "newCol", "int", "no decription");
+    // event 5 alter partition
+    partitionVals.clear();
+    partitionVals.add(Arrays.asList("1"));
+    partitionVals.add(Arrays.asList("2"));
+    partitionVals.add(Arrays.asList("3"));
+    String newLocation = "/path/to/new_location/";
+    alterPartitions(testTblName1, partitionVals, newLocation);
+    eventsProcessor_.processEvents();
+    response = eventsProcessor_.getEventProcessorMetrics();
+    assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus());
+    assertTrue("two more events should have been received",
+            response.getEvents_received() >= numEventsReceivedBefore + 5);
+    assertTrue("we do not turn off disableHmsSync for table testTblName1",
+            response.getEvents_skipped() >= numEventsSkippedBefore);
+
+    // create with transaction table
+    //event 6
+    createTransactionalTable(TEST_DB_NAME, testTblName2, true);
+    partitionVals.clear();
+    partitionVals.add(Arrays.asList("1"));
+    partitionVals.add(Arrays.asList("2"));
+    partitionVals.add(Arrays.asList("3"));
+    String anotherNewLocation = "/path/to/another_new_location/";
+    //event 7
+    alterPartitions(testTblName1, partitionVals, anotherNewLocation);
+    eventsProcessor_.processEvents();
+    response = eventsProcessor_.getEventProcessorMetrics();
+    assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus());
+    assertTrue("two more events should have been received",
+            response.getEvents_received() >= numEventsReceivedBefore + 7);
+    assertTrue("we do not turn off disableHmsSync for table testTblName1",
+            response.getEvents_skipped() >= numEventsSkippedBefore);
+  }
   /**
    * Test makes sure that the event metrics are not set when event processor 
is not active
    */

Reply via email to