This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new adf6873ee IMPALA-12474: Update latest metastore event id in a separate
thread
adf6873ee is described below
commit adf6873ee71faed3cd34788a5042bd8684cebdca
Author: Venu Reddy <[email protected]>
AuthorDate: Tue Oct 10 02:54:32 2023 +0530
IMPALA-12474: Update latest metastore event id in a separate thread
At present, update latest metastore event id task can be deferred
due to event processing task because of the single thread scheduled
executor. This patch uses two separate single thread scheduled
executors for event processing and updating latest metastore event
id.
Event Processing task is currently scheduled with fixed delay.
Processing time of an execution can affects the next schedule.
This patch uses schedule at fixed rate in order to expedite the
event processsing. Same task do not execute concurrently.
Tests:
- Verified manually and executed all the tests
Change-Id: I38fbacf8c2278133ee68ac18dba549cc86f91319
Reviewed-on: http://gerrit.cloudera.org:8080/20551
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../catalog/events/MetastoreEventsProcessor.java | 40 ++++++++++++++--------
1 file changed, 26 insertions(+), 14 deletions(-)
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 866c264e6..0e0e54c4f 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -533,9 +533,20 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
protected final CatalogServiceCatalog catalog_;
// scheduler daemon thread executor for processing events at given frequency
- private final ScheduledExecutorService scheduler_ = Executors
- .newSingleThreadScheduledExecutor(new
ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat("MetastoreEventsProcessor").build());
+ private final ScheduledExecutorService processEventsScheduler_ =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("MetastoreEventsProcessor-ProcessEvents")
+ .build());
+
+ // scheduler daemon thread executor to update the latest event id at given
frequency
+ private final ScheduledExecutorService updateEventIdScheduler_ =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("MetastoreEventsProcessor-UpdateEventId")
+ .build());
// metrics registry to keep track of metrics related to event processing
//TODO create a separate singleton class which wraps around this so that we
don't
@@ -683,12 +694,12 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
Preconditions.checkState(pollingFrequencyInSec_ > 0);
LOG.info(String.format("Starting metastore event polling with interval %d
seconds.",
pollingFrequencyInSec_));
- scheduler_.scheduleWithFixedDelay(this::processEvents,
pollingFrequencyInSec_,
- pollingFrequencyInSec_, TimeUnit.SECONDS);
+ processEventsScheduler_.scheduleAtFixedRate(this ::processEvents,
+ pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
// Update latestEventId in another thread in case that the processEvents()
thread is
// blocked by slow metadata reloading or waiting for table locks.
- scheduler_.scheduleWithFixedDelay(this::updateLatestEventId,
pollingFrequencyInSec_,
- pollingFrequencyInSec_, TimeUnit.SECONDS);
+ updateEventIdScheduler_.scheduleAtFixedRate(this ::updateLatestEventId,
+ pollingFrequencyInSec_, pollingFrequencyInSec_, TimeUnit.SECONDS);
}
/**
@@ -766,10 +777,10 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
*/
@Override
public synchronized void shutdown() {
- Preconditions.checkNotNull(scheduler_);
Preconditions.checkState(eventProcessorStatus_ !=
EventProcessorStatus.STOPPED,
"Event processing is already stopped");
- shutdownAndAwaitTermination();
+ shutdownAndAwaitTermination(processEventsScheduler_);
+ shutdownAndAwaitTermination(updateEventIdScheduler_);
updateStatus(EventProcessorStatus.STOPPED);
LOG.info("Metastore event processing stopped.");
}
@@ -778,21 +789,22 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
* Attempts to cleanly shutdown the scheduler pool. If the pool does not
shutdown
* within timeout, does a force shutdown which might interrupt currently
running tasks.
*/
- private synchronized void shutdownAndAwaitTermination() {
- scheduler_.shutdown(); // disable new tasks from being submitted
+ private synchronized void
shutdownAndAwaitTermination(ScheduledExecutorService ses) {
+ Preconditions.checkNotNull(ses);
+ ses.shutdown(); // disable new tasks from being submitted
try {
// wait for 10 secs for scheduler to complete currently running tasks
- if (!scheduler_.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT,
TimeUnit.SECONDS)) {
+ if (!ses.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
{
// executor couldn't terminate and timed-out, force the termination
LOG.info(String.format("Scheduler pool did not terminate within %d
seconds. "
+ "Attempting to stop currently running tasks",
SCHEDULER_SHUTDOWN_TIMEOUT));
- scheduler_.shutdownNow();
+ ses.shutdownNow();
}
} catch (InterruptedException e) {
// current thread interrupted while pool was waiting for termination
// issue a shutdownNow before returning to cancel currently running tasks
LOG.info("Received interruptedException. Terminating currently running
tasks.", e);
- scheduler_.shutdownNow();
+ ses.shutdownNow();
}
}