This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e78c8369044 Change BasePeriodicTask to use a lock object for start /
stop rather than synchronizing the methods (#16613)
e78c8369044 is described below
commit e78c83690447e7e79d7f5daf27d24bf6caca11a0
Author: Sonam Mandal <[email protected]>
AuthorDate: Fri Aug 15 19:01:18 2025 -0700
Change BasePeriodicTask to use a lock object for start / stop rather than
synchronizing the methods (#16613)
---
.../pinot/core/periodictask/BasePeriodicTask.java | 87 ++++++++++++----------
1 file changed, 48 insertions(+), 39 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
index 5179d9a6778..6ef79b616ea 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/periodictask/BasePeriodicTask.java
@@ -42,6 +42,9 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
protected final long _initialDelayInSeconds;
protected final ReentrantLock _runLock;
+ // Lock used to synchronize life-cycle functions
+ private final Object _lifeCycleLock;
+
private volatile boolean _started;
private volatile boolean _running;
@@ -61,6 +64,7 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
_intervalInSeconds = runFrequencyInSeconds;
_initialDelayInSeconds = initialDelayInSeconds;
_runLock = new ReentrantLock();
+ _lifeCycleLock = new Object();
}
@Override
@@ -99,21 +103,23 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
* This method sets {@code started} flag to true.
*/
@Override
- public final synchronized void start() {
- if (_started) {
- LOGGER.warn("Task: {} is already started", _taskName);
- return;
- }
+ public final void start() {
+ synchronized (_lifeCycleLock) {
+ if (_started) {
+ LOGGER.warn("Task: {} is already started", _taskName);
+ return;
+ }
- try {
- setUpTask();
- } catch (Exception e) {
- LOGGER.error("Caught exception while setting up task: {}", _taskName, e);
- }
+ try {
+ setUpTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while setting up task: {}", _taskName,
e);
+ }
- // mark _started as true only after state has completely initialized, so
that run method doesn't end up seeing
- // partially initialized state.
- _started = true;
+ // mark _started as true only after state has completely initialized, so
that run method doesn't end up seeing
+ // partially initialized state.
+ _started = true;
+ }
}
/**
@@ -177,35 +183,38 @@ public abstract class BasePeriodicTask implements
PeriodicTask {
* seconds until the task finishes.
*/
@Override
- public final synchronized void stop() {
- if (!_started) {
- LOGGER.warn("Task: {} is not started", _taskName);
- return;
- }
- long startTimeMs = System.currentTimeMillis();
- _started = false;
-
- try {
- // check if task is done running, or wait for the task to get done, by
trying to acquire runLock.
- if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS,
TimeUnit.MILLISECONDS)) {
- LOGGER
- .warn("Task {} could not be stopped within timeout of {}ms",
_taskName, MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
- } else {
- LOGGER.info("Task {} successfully stopped in {}ms", _taskName,
System.currentTimeMillis() - startTimeMs);
+ public final void stop() {
+ synchronized (_lifeCycleLock) {
+ if (!_started) {
+ LOGGER.warn("Task: {} is not started", _taskName);
+ return;
}
- } catch (InterruptedException ie) {
- LOGGER.error("Caught InterruptedException while waiting for task: {} to
finish", _taskName);
- Thread.currentThread().interrupt();
- } finally {
- if (_runLock.isHeldByCurrentThread()) {
- _runLock.unlock();
+ long startTimeMs = System.currentTimeMillis();
+ _started = false;
+
+ try {
+ // check if task is done running, or wait for the task to get done, by
trying to acquire runLock.
+ if (!_runLock.tryLock(MAX_PERIODIC_TASK_STOP_TIME_MILLIS,
TimeUnit.MILLISECONDS)) {
+ LOGGER
+ .warn("Task {} could not be stopped within timeout of {}ms",
_taskName,
+ MAX_PERIODIC_TASK_STOP_TIME_MILLIS);
+ } else {
+ LOGGER.info("Task {} successfully stopped in {}ms", _taskName,
System.currentTimeMillis() - startTimeMs);
+ }
+ } catch (InterruptedException ie) {
+ LOGGER.error("Caught InterruptedException while waiting for task: {}
to finish", _taskName);
+ Thread.currentThread().interrupt();
+ } finally {
+ if (_runLock.isHeldByCurrentThread()) {
+ _runLock.unlock();
+ }
}
- }
- try {
- cleanUpTask();
- } catch (Exception e) {
- LOGGER.error("Caught exception while cleaning up task: {}", _taskName,
e);
+ try {
+ cleanUpTask();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while cleaning up task: {}", _taskName,
e);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]