This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new adf6873ee IMPALA-12474: Update latest metastore event id in a separate 
thread
adf6873ee is described below

commit adf6873ee71faed3cd34788a5042bd8684cebdca
Author: Venu Reddy <[email protected]>
AuthorDate: Tue Oct 10 02:54:32 2023 +0530

    IMPALA-12474: Update latest metastore event id in a separate thread
    
    At present, the task that updates the latest metastore event id can
    be delayed by the event processing task, because both run on the
    same single-thread scheduled executor. This patch uses two separate
    single-thread scheduled executors: one for event processing and one
    for updating the latest metastore event id.
    
    The event processing task is currently scheduled with a fixed delay,
    so the processing time of one execution affects when the next one
    is scheduled. This patch schedules it at a fixed rate instead, in
    order to expedite event processing. Executions of the same task
    still do not run concurrently.
    
    Tests:
    - Verified manually and executed all the tests
    
    Change-Id: I38fbacf8c2278133ee68ac18dba549cc86f91319
    Reviewed-on: http://gerrit.cloudera.org:8080/20551
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../catalog/events/MetastoreEventsProcessor.java   | 40 ++++++++++++++--------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 866c264e6..0e0e54c4f 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -533,9 +533,20 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   protected final CatalogServiceCatalog catalog_;
 
   // scheduler daemon thread executor for processing events at given frequency
-  private final ScheduledExecutorService scheduler_ = Executors
-      .newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("MetastoreEventsProcessor").build());
+  private final ScheduledExecutorService processEventsScheduler_ =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("MetastoreEventsProcessor-ProcessEvents")
+              .build());
+
+  // scheduler daemon thread executor to update the latest event id at given 
frequency
+  private final ScheduledExecutorService updateEventIdScheduler_ =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("MetastoreEventsProcessor-UpdateEventId")
+              .build());
 
   // metrics registry to keep track of metrics related to event processing
   //TODO create a separate singleton class which wraps around this so that we 
don't
@@ -683,12 +694,12 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     Preconditions.checkState(pollingFrequencyInSec_ > 0);
     LOG.info(String.format("Starting metastore event polling with interval %d 
seconds.",
         pollingFrequencyInSec_));
-    scheduler_.scheduleWithFixedDelay(this::processEvents, 
pollingFrequencyInSec_,
-        pollingFrequencyInSec_, TimeUnit.SECONDS);
+    processEventsScheduler_.scheduleAtFixedRate(this ::processEvents,
+        pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
     // Update latestEventId in another thread in case that the processEvents() 
thread is
     // blocked by slow metadata reloading or waiting for table locks.
-    scheduler_.scheduleWithFixedDelay(this::updateLatestEventId, 
pollingFrequencyInSec_,
-        pollingFrequencyInSec_, TimeUnit.SECONDS);
+    updateEventIdScheduler_.scheduleAtFixedRate(this ::updateLatestEventId,
+        pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
   }
 
   /**
@@ -766,10 +777,10 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    */
   @Override
   public synchronized void shutdown() {
-    Preconditions.checkNotNull(scheduler_);
     Preconditions.checkState(eventProcessorStatus_ != 
EventProcessorStatus.STOPPED,
         "Event processing is already stopped");
-    shutdownAndAwaitTermination();
+    shutdownAndAwaitTermination(processEventsScheduler_);
+    shutdownAndAwaitTermination(updateEventIdScheduler_);
     updateStatus(EventProcessorStatus.STOPPED);
     LOG.info("Metastore event processing stopped.");
   }
@@ -778,21 +789,22 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    * Attempts to cleanly shutdown the scheduler pool. If the pool does not 
shutdown
    * within timeout, does a force shutdown which might interrupt currently 
running tasks.
    */
-  private synchronized void shutdownAndAwaitTermination() {
-    scheduler_.shutdown(); // disable new tasks from being submitted
+  private synchronized void 
shutdownAndAwaitTermination(ScheduledExecutorService ses) {
+    Preconditions.checkNotNull(ses);
+    ses.shutdown(); // disable new tasks from being submitted
     try {
       // wait for 10 secs for scheduler to complete currently running tasks
-      if (!scheduler_.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, 
TimeUnit.SECONDS)) {
+      if (!ses.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) 
{
         // executor couldn't terminate and timed-out, force the termination
         LOG.info(String.format("Scheduler pool did not terminate within %d 
seconds. "
             + "Attempting to stop currently running tasks", 
SCHEDULER_SHUTDOWN_TIMEOUT));
-        scheduler_.shutdownNow();
+        ses.shutdownNow();
       }
     } catch (InterruptedException e) {
       // current thread interrupted while pool was waiting for termination
       // issue a shutdownNow before returning to cancel currently running tasks
       LOG.info("Received interruptedException. Terminating currently running 
tasks.", e);
-      scheduler_.shutdownNow();
+      ses.shutdownNow();
     }
   }
 

Reply via email to