This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 73430a2cd IMPALA-12399: Add filter to skip OPEN_TXN events from HMS
73430a2cd is described below
commit 73430a2cdcb4be876cd73303a664414c3f94f9fd
Author: Venu Reddy <[email protected]>
AuthorDate: Sun Aug 27 15:39:10 2023 +0530
IMPALA-12399: Add filter to skip OPEN_TXN events from HMS
Notification events like OPEN_TXN are ignored on catalogd
MetastoreEventsProcessor. So, we can pass eventTypeSkipList with OPEN_TXN
in NotificationEventRequest while invoking get_next_notification() to
avoid reading such notification messages from metastore and then ignoring
on catalogd. OPEN_TXN event being more frequent(received even upon
describe table operation from beeline), we can significantly reduce
unwanted processing on both hive metastore and catalogd. Catalogd reads
events in batches of EVENTS_BATCH_SIZE_PER_RPC, skipping such unnecessary
events can help catchup the events faster.
Testing:
- Manually tested in cluster and added a testcase
Change-Id: Id2f2e1040abce0b00a8a07bfe8b46afcd98290bf
Reviewed-on: http://gerrit.cloudera.org:8080/20427
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Sai Hemanth Gantasala <[email protected]>
Reviewed-by: Quanlong Huang <[email protected]>
---
.../java/org/apache/impala/compat/MetastoreShim.java | 1 +
.../impala/catalog/events/MetastoreEventsProcessor.java | 5 +++++
.../catalog/events/MetastoreEventsProcessorTest.java | 17 +++++++++++++++++
3 files changed, 23 insertions(+)
diff --git
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 71bac5179..d36b46444 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -586,6 +586,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
*/
public static NotificationEventResponse getNextNotification(IMetaStoreClient
msClient,
NotificationEventRequest eventRequest) throws TException {
+
eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList());
return msClient.getThriftClient().get_next_notification(eventRequest);
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index c392ef980..d3debde25 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -265,6 +265,9 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
+ // List of event types to skip while fetching notification events from
metastore
+ private static final List<String> EVENT_SKIP_LIST =
Arrays.asList("OPEN_TXN");
+
/**
* Wrapper around {@link
*
MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog,
@@ -1126,4 +1129,6 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
public static MessageDeserializer getMessageDeserializer() {
return MESSAGE_DESERIALIZER;
}
+
+ public static List<String> getEventSkipList() { return EVENT_SKIP_LIST; }
}
diff --git
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index bfd44f86c..b1dd8b097 100644
---
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -3259,6 +3259,23 @@ public class MetastoreEventsProcessorTest {
}
}
+ /**
+ * Test whether open transaction event is skipped while fetching
notification events
+ * @throws Exception
+ */
+ @Test
+ public void testSkipFetchOpenTransactionEvent() throws Exception {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ // Make an empty transaction
+ long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+ MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+ }
+ List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+ assertEquals(1, events.size());
+ assertEquals(MetastoreEventType.COMMIT_TXN,
+ MetastoreEventType.from(events.get(0).getEventType()));
+ }
+
private void createDatabase(String catName, String dbName,
Map<String, String> params) throws TException {
try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {