This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1fda3e3d6f8f69802d68f041cb90ac6c5e93a3cd Author: maxwellguo <[email protected]> AuthorDate: Wed May 29 22:11:49 2024 +0800 IMPALA-12771: Impala catalogd events-skipped may mark the wrong number This patch modified the description of events-skipped metric which missing the description of ATLER event for DB and Table. Besides there are some cases where event-skipped metric is incorrectly marked, most are for transactional table: 1. The metric should increase when processing AddPartitionEvent on the transactional table which is not in loaded state or when db is not found. 2. The metric should increase when processing AlterPartitionEvent on the transactional table which is not in loaded status or when db is not found. 3. The metric should increase when processing AlterPartitionEvent if the event is a trivial event and can be skipped. 4. When processing BatchPartitionEvent, the number of skipped event is not marked before doing any real work. 5. The metric should increase when processing DropPartitionEvent on the transactional table which is not in loaded status or when db is not found. testing: - add test cases for add database, creata table, alter table, add partition , create transactional table,alter partition for transactional and test the skipped metric number. Change-Id: I7aeb04e999b82187eb138c0b643ead259da22f1a Reviewed-on: http://gerrit.cloudera.org:8080/21045 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/events/MetastoreEvents.java | 126 +++++++++++++++------ .../catalog/events/MetastoreEventsProcessor.java | 5 +- .../apache/impala/service/CatalogOpExecutor.java | 21 ++-- .../events/MetastoreEventsProcessorTest.java | 75 +++++++++++- 4 files changed, 181 insertions(+), 46 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 79592cca6..10d799426 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -1067,7 +1067,7 @@ public class MetastoreEvents { * Helper function to initiate a table reload on Catalog. Re-throws the exception if * the catalog operation throws. */ - protected void reloadTableFromCatalog(String operation, boolean isTransactional) + protected boolean reloadTableFromCatalog(String operation, boolean isTransactional) throws CatalogException { try { if (!catalog_.reloadTableIfExists(dbName_, tblName_, @@ -1076,11 +1076,7 @@ public class MetastoreEvents { debugLog("Automatic refresh on table {} failed as the table " + "either does not exist anymore or is not in loaded state.", getFullyQualifiedTblName()); - metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); - debugLog("Incremented skipped metric to " - + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) - .getCount()); - return; + return false; } } catch (TableLoadingException | DatabaseNotFoundException e) { // there could be many reasons for receiving a tableLoading exception, @@ -1089,11 +1085,12 @@ public class MetastoreEvents { // we can do here other than log it appropriately. debugLog("Table {} was not refreshed due to error {}", getFullyQualifiedTblName(), e.getMessage()); - return; + return false; } String tblStr = isTransactional ? "transactional table" : "table"; infoLog("Refreshed {} {}", tblStr, getFullyQualifiedTblName()); metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLE_REFRESHES).inc(); + return true; } /** @@ -1103,10 +1100,12 @@ public class MetastoreEvents { * @param fileMetadataLoadOpts: describes how to reload file metadata for partitions * @param reason The reason for reload operation which is used for logging by * catalogd. + * @param batch flag to show if the function is called by the batch event process */ protected void reloadPartitions(List<Partition> partitions, - FileMetadataLoadOpts fileMetadataLoadOpts, String reason) + FileMetadataLoadOpts fileMetadataLoadOpts, String reason, boolean batch) throws CatalogException { + int skippedEvent = batch ? partitions.size() : 1; try { int numPartsRefreshed = catalogOpExecutor_.reloadPartitionsIfExist(getEventId(), getEventType().toString(), dbName_, tblName_, partitions, reason, @@ -1114,13 +1113,23 @@ public class MetastoreEvents { if (numPartsRefreshed > 0) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES) .inc(numPartsRefreshed); + } else if (numPartsRefreshed == -1) { + debugLog("Ignoring the event since table {} is not loadded or " + + "table was removed latter in catalog or table is synced." + , getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(skippedEvent); } } catch (TableNotLoadedException e) { debugLog("Ignoring the event since table {} is not loaded", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(skippedEvent); } catch (DatabaseNotFoundException | TableNotFoundException e) { debugLog("Ignoring the event since table {} is not found", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(skippedEvent); } } @@ -1139,13 +1148,17 @@ public class MetastoreEvents { if (numPartsRefreshed > 0) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES) .inc(numPartsRefreshed); + } else if (numPartsRefreshed == -1) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } } catch (TableNotLoadedException e) { debugLog("Ignoring the event since table {} is not loaded", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } catch (DatabaseNotFoundException | TableNotFoundException e) { debugLog("Ignoring the event since table {} is not found", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } } @@ -1158,13 +1171,17 @@ public class MetastoreEvents { if (numPartsRefreshed > 0) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES) .inc(numPartsRefreshed); + } else if (numPartsRefreshed == -1) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } } catch (TableNotLoadedException e) { debugLog("Ignoring the event since table {} is not loaded", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } catch (DatabaseNotFoundException | TableNotFoundException e) { debugLog("Ignoring the event since table {} is not found", getFullyQualifiedTblName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } } @@ -1640,7 +1657,7 @@ public class MetastoreEvents { // forcing file metadata reload so that new files (due to insert) are reflected // HdfsPartition reloadPartitions(Arrays.asList(insertPartition_), - FileMetadataLoadOpts.FORCE_LOAD, "INSERT event"); + FileMetadataLoadOpts.FORCE_LOAD, "INSERT event", false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh " + "partition on table {} partition {} failed. Event processing cannot " @@ -1657,7 +1674,10 @@ public class MetastoreEvents { // For non-partitioned tables, refresh the whole table. Preconditions.checkState(insertPartition_ == null); try { - reloadTableFromCatalog("INSERT event", false); + boolean notSkipped = reloadTableFromCatalog("INSERT event", false); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException( debugString("Refresh table {} failed. Event processing " @@ -1737,16 +1757,20 @@ public class MetastoreEvents { .renameTableFromEvent(getEventId(), tableBefore_, tableAfter_, oldTblRemoved, newTblAdded); - if (oldTblRemoved.getRef()) { - metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc(); - } - if (newTblAdded.getRef()) { - metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc(); - } - if (!oldTblRemoved.getRef() || !newTblAdded.getRef()) { + // Only bump the skipped metric if the old table is not removed and the new table + // is not added. Not doing this in other cases since we need to either remove the + // old table or add the new table, which is processing the event. + if (!oldTblRemoved.getRef() && !newTblAdded.getRef()) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); debugLog("Incremented skipped metric to " + metrics_ - .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount()); + .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount()); + } else { + if (oldTblRemoved.getRef()) { + metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_REMOVED).inc(); + } + if (newTblAdded.getRef()) { + metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_TABLES_ADDED).inc(); + } } } @@ -1794,6 +1818,7 @@ public class MetastoreEvents { // Ignore the event if this is a trivial event. See javadoc for // canBeSkipped() for examples. if (canBeSkipped()) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); infoLog("Not processing this event as it only modifies some table parameters " + "which can be ignored."); return; @@ -1808,7 +1833,10 @@ public class MetastoreEvents { // refresh, eg. this could be due to as simple as adding a new parameter or a // full blown adding or changing column type // rename is already handled above - reloadTableFromCatalog("ALTER_TABLE", false); + boolean notSkipped = reloadTableFromCatalog("ALTER_TABLE", false); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } long durationNs = System.nanoTime() - startNs; // Log event details for those triggered slow reload. @@ -2476,7 +2504,10 @@ public class MetastoreEvents { BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !isSelfEvent() && !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { - reloadTableFromCatalog("ADD_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog("ADD_PARTITION", true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } else { // HMS adds partitions in a transactional way. This means there may be multiple // HMS partition objects in an add_partition event. We try to do the same here @@ -2493,8 +2524,9 @@ public class MetastoreEvents { .inc(numPartsAdded); } else { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); - debugLog("Incremented skipped metric to " + metrics_ - .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount()); + debugLog("Incremented skipped metric to {} since no partitions were added.", + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .getCount()); } } } catch (CatalogException e) { @@ -2615,6 +2647,7 @@ public class MetastoreEvents { // Ignore the event if this is a trivial event. See javadoc for // isTrivialAlterPartitionEvent() for examples. if (canBeSkipped()) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); infoLog("Not processing this event as it only modifies some partition " + "parameters which can be ignored."); return; @@ -2640,7 +2673,7 @@ public class MetastoreEvents { isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD : FileMetadataLoadOpts.LOAD_IF_SD_CHANGED; reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts, - "ALTER_PARTITION event"); + "ALTER_PARTITION event", false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException( debugString("Refresh partition on table {} partition {} failed. Event " + @@ -2685,7 +2718,10 @@ public class MetastoreEvents { reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_), "ALTER_PARTITION EVENT FOR TRANSACTIONAL TABLE"); } else { - reloadTableFromCatalog("ALTER_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog("ALTER_PARTITION", true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } } } @@ -2782,6 +2818,12 @@ public class MetastoreEvents { eventsToProcess.add(event); } } + int notSkippedNum = eventsToProcess.size() + partitionEventsToForceReload.size(); + int skippedNum = batchedEvents_.size() - notSkippedNum; + if (skippedNum > 0) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(skippedNum); + } if (eventsToProcess.isEmpty() && partitionEventsToForceReload.isEmpty()) { LOG.info( "Ignoring {} events between event id {} and {} since they modify parameters" @@ -2792,7 +2834,11 @@ public class MetastoreEvents { // Reload the whole table if it's a transactional table. if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) { - reloadTableFromCatalog(getEventType().toString(), true); + boolean notSkipped = reloadTableFromCatalog(getEventType().toString(), true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) + .inc(eventsToProcess.size() + partitionEventsToForceReload.size()); + } } else { // Reload the partitions from the batch. List<Partition> partitions = new ArrayList<>(); @@ -2804,18 +2850,20 @@ public class MetastoreEvents { // for insert event, always reload file metadata so that new files // are reflected in HdfsPartition reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, - getEventType().toString() + " event"); + getEventType().toString() + " event", true); } else { if (!partitionEventsToForceReload.isEmpty()) { // force reload truncated partitions reloadPartitions(partitionEventsToForceReload, - FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + " event"); + FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() + + " event", true); } if (!partitions.isEmpty()) { // alter partition event. Reload file metadata of only those partitions // for which sd has changed - reloadPartitions(partitions, FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, - getEventType().toString() + " event"); + reloadPartitions(partitions, + FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, getEventType().toString() + + " event", true); } } } catch (CatalogException e) { @@ -2915,7 +2963,10 @@ public class MetastoreEvents { BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { - reloadTableFromCatalog("DROP_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog("DROP_PARTITION", true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } else { int numPartsRemoved = catalogOpExecutor_ .removePartitionsIfNotAddedLater(getEventId(), dbName_, tblName_, @@ -3102,7 +3153,7 @@ public class MetastoreEvents { // forcing file metadata reload so that new files (due to refresh) are reflected // HdfsPartition reloadPartitions(Arrays.asList(reloadPartition_), - FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event"); + FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event", false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh " + "partition on table {} partition {} failed. Event processing cannot " @@ -3120,7 +3171,10 @@ public class MetastoreEvents { Preconditions.checkState(reloadPartition_ == null); try { // we always treat the table as non-transactional so all the files are reloaded - reloadTableFromCatalog("RELOAD event", false); + boolean notSkipped = reloadTableFromCatalog("RELOAD event", false); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException( debugString("Refresh table {} failed. Event processing " @@ -3139,15 +3193,18 @@ public class MetastoreEvents { if (tbl == null) { infoLog("Skipping on table {}.{} since it does not exist in cache", dbName_, tblName_); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); return ; } if (tbl instanceof IncompleteTable) { infoLog("Skipping on an incomplete table {}", tbl.getFullName()); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); return ; } } catch (DatabaseNotFoundException e) { infoLog("Skipping on table {} because db {} not found in cache", tblName_, dbName_); + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); return ; } catalog_.invalidateTable(tbl.getTableName().toThrift(), @@ -3267,7 +3324,10 @@ public class MetastoreEvents { protected void processTableEvent() throws MetastoreNotificationException { try { if (partitionName_ == null) { - reloadTableFromCatalog("Commit Compaction event", true); + boolean notSkipped = reloadTableFromCatalog("Commit Compaction event", true); + if (!notSkipped) { + metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); + } } else { reloadPartitionsFromNames(Arrays.asList(partitionName_), "Commit compaction event", FileMetadataLoadOpts.FORCE_LOAD); diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 0d0c858b7..40408ff44 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -237,8 +237,9 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { // rate of events received per unit time public static final String EVENTS_RECEIVED_METRIC = "events-received"; // total number of events which are skipped because of the flag setting or - // in case of [CREATE|DROP] events on [DATABASE|TABLE|PARTITION] which were ignored - // because the [DATABASE|TABLE|PARTITION] was already [PRESENT|ABSENT] in the catalogd. + // in case of [CREATE|DROP|ALTER] events on [DATABASE|TABLE|PARTITION] which were + // ignored because the [DATABASE|TABLE|PARTITION] was already [PRESENT|ABSENT] in + // the catalogd. public static final String EVENTS_SKIPPED_METRIC = "events-skipped"; // name of the event processor status metric public static final String STATUS_METRIC = "status"; diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 4a7d31b24..620e739f0 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -5017,8 +5017,9 @@ public class CatalogOpExecutor { * @param partNames List of partition names from the events to be reloaded. * @param reason Reason for reloading the partitions for logging purposes. * @param fileMetadataLoadOpts describes how to reload file metadata for partsFromEvent - * @return the number of partitions which were reloaded. If the table does not exist, - * returns 0. Some partitions could be skipped if they don't exist anymore. + * @return the number of partitions which were reloaded. If the table does not exist, or + * if the table is IncompleteTable or if the table if already synced then returns -1. + * Some partitions could be skipped if they don't exist anymore. */ public int reloadPartitionsFromNamesIfExists(long eventId, String eventType, String dbName, String tblName, List<String> partNames, String reason, @@ -5031,7 +5032,7 @@ public class CatalogOpExecutor { .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, tblName))) { LOG.info("EventId: {} EventType: {} Not reloading the partition of table {}.{} " + "since it was removed later in catalog", eventId, eventType, dbName, tblName); - return 0; + return -1; } else { throw new TableNotFoundException( "Table " + dbName + "." + tblName + " not found"); @@ -5104,7 +5105,8 @@ public class CatalogOpExecutor { * reloaded. * @param reason Reason for reloading the partitions for logging purposes. * @return the number of partitions which were reloaded. If the table does not exist, - * returns 0. Some partitions could be skipped if they don't exist anymore. + * or if the table is IncompleteTable or if the table is recreated then returns -1. + * Some partitions could be skipped if they don't exist anymore. */ public int reloadPartitionsFromEvent(long eventId, String dbName, String tblName, List<Partition> partsFromEvent, String reason) @@ -5118,7 +5120,10 @@ public class CatalogOpExecutor { LOG.info( "Not reloading the partition of table {} since it was removed " + "later in catalog", new TableName(dbName, tblName)); - return 0; + // as the numOfPartsReloaded can be 0 which means no partition is reloaded, so + // we just return -1, which means the event can be skipped and we do not execute + // hdfsTable's reloadPartitionsFromNames function. + return -1; } else { throw new TableNotFoundException( "Table " + dbName + "." + tblName + " not found"); @@ -5127,7 +5132,7 @@ public class CatalogOpExecutor { if (table instanceof IncompleteTable) { LOG.info("Table {} is not loaded. Skipping drop partition event {}", table.getFullName(), eventId); - return 0; + return -1; } if (!(table instanceof HdfsTable)) { throw new CatalogException("Partition event received on a non-hdfs table"); @@ -5135,7 +5140,7 @@ public class CatalogOpExecutor { if (eventId > 0 && eventId <= table.getCreateEventId()) { LOG.debug("Not reloading partitions of table {}.{} for event {} since it is " + "recreated at event {}.", dbName, tblName, eventId, table.getCreateEventId()); - return 0; + return -1; } try { tryWriteLock(table, reason, NoOpEventSequence.INSTANCE); @@ -5194,7 +5199,7 @@ public class CatalogOpExecutor { if (table instanceof IncompleteTable) { LOG.info("Table {} is not loaded. Skipping partition event {}", table.getFullName(), eventId); - return 0; + return -1; } if (!(table instanceof HdfsTable)) { throw new CatalogException("Partition event received on a non-hdfs table"); diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 5a800ca60..2b55c3548 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -1990,9 +1990,10 @@ public class MetastoreEventsProcessorTest { assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus()); assertTrue("Atleast 5 events should have been received", response.getEvents_received() >= numEventsReceivedBefore + 5); - // two events on tbl which is skipped - assertTrue("Atleast 2 events should have been skipped", - response.getEvents_skipped() >= numEventsSkippedBefore + 2); + // The create table and add partition events of table tbl_should_skipped and + // the add partition event of table testEventProcessorMetrics are all skipped + assertTrue("3 events should be skipped", + response.getEvents_skipped() == numEventsSkippedBefore + 3); assertTrue("Event fetch duration should be greater than zero", response.getEvents_fetch_duration_mean() > 0); assertTrue("Event process duration should be greater than zero", @@ -2003,6 +2004,74 @@ public class MetastoreEventsProcessorTest { assertTrue(response.getLast_synced_event_id() > lastEventSyncId); } + @Test + public void testEventProcessorMetricsForSkippedMetric() throws TException { + TEventProcessorMetrics responseBefore = eventsProcessor_.getEventProcessorMetrics(); + long numEventsReceivedBefore = responseBefore.getEvents_received(); + long numEventsSkippedBefore = responseBefore.getEvents_skipped(); + long lastEventSyncId = responseBefore.getLast_synced_event_id(); + final String testTblName1 = "testEventProcessorMetrics1"; + final String testTblName2 = "testEventProcessorMetrics2"; + // event 1 + createDatabase(TEST_DB_NAME, null); + // event 2 + createTable(null, TEST_DB_NAME, testTblName1, null, true, null); + List<List<String>> partitionVals = new ArrayList<>(); + partitionVals.add(Arrays.asList("1")); + partitionVals.add(Arrays.asList("2")); + partitionVals.add(Arrays.asList("3")); + // event 3 + addPartitions(TEST_DB_NAME, testTblName1, partitionVals); + eventsProcessor_.processEvents(); + TEventProcessorMetrics response = eventsProcessor_.getEventProcessorMetrics(); + assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus()); + assertTrue("Atleast 3 events should have been received", + response.getEvents_received() >= numEventsReceivedBefore + 3); + assertTrue("we do not turn off disableHmsSync for table testTblName1", + response.getEvents_skipped() >= numEventsSkippedBefore); + TEventProcessorMetricsSummaryResponse summaryResponse = + catalog_.getEventProcessorSummary(); + assertNotNull(summaryResponse); + assertTrue(response.getLast_synced_event_id() > lastEventSyncId); + + // invalidate the table and the table will be IncompleteTable, then + // ALTER events will be skipped. + catalog_.invalidateTableIfExists(TEST_DB_NAME, testTblName1); + //event 4 alter table + alterTableAddCol(testTblName1, "newCol", "int", "no decription"); + // event 5 alter partition + partitionVals.clear(); + partitionVals.add(Arrays.asList("1")); + partitionVals.add(Arrays.asList("2")); + partitionVals.add(Arrays.asList("3")); + String newLocation = "/path/to/new_location/"; + alterPartitions(testTblName1, partitionVals, newLocation); + eventsProcessor_.processEvents(); + response = eventsProcessor_.getEventProcessorMetrics(); + assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus()); + assertTrue("two more events should have been received", + response.getEvents_received() >= numEventsReceivedBefore + 5); + assertTrue("we do not turn off disableHmsSync for table testTblName1", + response.getEvents_skipped() >= numEventsSkippedBefore); + + // create with transaction table + //event 6 + createTransactionalTable(TEST_DB_NAME, testTblName2, true); + partitionVals.clear(); + partitionVals.add(Arrays.asList("1")); + partitionVals.add(Arrays.asList("2")); + partitionVals.add(Arrays.asList("3")); + String anotherNewLocation = "/path/to/another_new_location/"; + //event 7 + alterPartitions(testTblName1, partitionVals, anotherNewLocation); + eventsProcessor_.processEvents(); + response = eventsProcessor_.getEventProcessorMetrics(); + assertEquals(EventProcessorStatus.ACTIVE.toString(), response.getStatus()); + assertTrue("two more events should have been received", + response.getEvents_received() >= numEventsReceivedBefore + 7); + assertTrue("we do not turn off disableHmsSync for table testTblName1", + response.getEvents_skipped() >= numEventsSkippedBefore); + } /** * Test makes sure that the event metrics are not set when event processor is not active */
