This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2b9a0ed53c3 Shutdown tasks on TaskRunner asynchronously (#18471)
2b9a0ed53c3 is described below
commit 2b9a0ed53c36736fb6a9e23ce28867c5aaf81836
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 18:27:59 2025 +0530
Shutdown tasks on TaskRunner asynchronously (#18471)
Changes:
- Invoke `taskRunner.shutdown()` on `TaskQueue.taskCompleteCallbackExecutor` to
avoid blocking queue management
- Add concurrency tests
---
.../apache/druid/indexing/overlord/TaskQueue.java | 34 ++---
.../overlord/TaskQueueConcurrencyTest.java | 160 +++++++++++++++++----
2 files changed, 153 insertions(+), 41 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index d7024eabfe5..4686aec4d98 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -403,13 +403,13 @@ public class TaskQueue
updateTaskEntry(taskId, entry -> {
if (entry == null) {
unknownTaskIds.add(taskId);
- shutdownUnknownTaskOnRunner(taskId);
+ shutdownTaskOnRunner(taskId, "Task is not present in queue
anymore.");
} else {
runnerTaskFutures.put(taskId, workItem.getResult());
}
});
}
- log.info("Cleaned up [%,d] tasks on task runner with IDs[%s].",
unknownTaskIds.size(), unknownTaskIds);
+ log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].",
unknownTaskIds.size(), unknownTaskIds);
// Attain futures for all active tasks (assuming they are ready to run).
// Copy tasks list, as notifyStatus may modify it.
@@ -472,14 +472,21 @@ public class TaskQueue
}
}
- private void shutdownUnknownTaskOnRunner(String taskId)
+ /**
+ * Triggers a shutdown of the given Task on the {@link TaskRunner}.
+ * The shutdown is invoked on {@link #taskCompleteCallbackExecutor} to avoid
+ * blocking critical paths as task shutdown on the runner may sometimes be
slow.
+ */
+ private void shutdownTaskOnRunner(String taskId, String reasonFormat,
Object... args)
{
- try {
- taskRunner.shutdown(taskId, "Task is not present in queue anymore.");
- }
- catch (Exception e) {
- log.warn(e, "TaskRunner failed to clean up task[%s].", taskId);
- }
+ taskCompleteCallbackExecutor.submit(() -> {
+ try {
+ taskRunner.shutdown(taskId, reasonFormat, args);
+ }
+ catch (Throwable e) {
+ log.error(e, "TaskRunner failed to cleanup task[%s] after
completion.", taskId);
+ }
+ });
}
private boolean isTaskPending(Task task)
@@ -734,14 +741,7 @@ public class TaskQueue
.emit();
}
- // Inform taskRunner that this task can be shut down.
- try {
- taskRunner.shutdown(task.getId(), reasonFormat, args);
- }
- catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown
routine.
- log.warn(e, "TaskRunner failed to cleanup task after completion: %s",
task.getId());
- }
+ shutdownTaskOnRunner(task.getId(), reasonFormat, args);
removeTaskLock(task);
requestManagement();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index 6429d6b3edd..288ea3a968d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.NoopTask;
@@ -59,6 +60,7 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
private TaskQueue taskQueue;
private Map<String, UpdateAction> threadToUpdateAction;
+ private CountDownLatch taskRunnerShutdownLatch;
@Override
public void setUpIngestionTestBase() throws IOException
@@ -79,6 +81,13 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
{
return SettableFuture.create();
}
+
+ @Override
+ public void shutdown(String taskid, String reason)
+ {
+ awaitTaskRunnerShutdown();
+ super.shutdown(taskid, reason);
+ }
},
createActionClientFactory(),
getLockbox(),
@@ -98,7 +107,7 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
? super.addOrUpdateTaskEntry(taskId, updateOperation)
: super.addOrUpdateTaskEntry(
taskId,
- existing -> updateAction.critical.perform(() ->
updateOperation.apply(existing))
+ existing -> updateAction.runCritical(() ->
updateOperation.apply(existing))
);
}
@@ -112,7 +121,7 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
if (updateAction == null) {
super.setActive(active);
} else {
- updateAction.critical.perform(() -> {
+ updateAction.runCritical(() -> {
super.setActive(active);
return 0;
});
@@ -472,6 +481,67 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
Assert.assertEquals(Optional.absent(),
taskQueue.getActiveTask(task1.getId()));
}
+ @Test
+ public void test_shutdown_blocks_syncFromStorage_forSameTaskId()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+
+ // Remove the task from storage so that sync tries to remove it from the
queue
+ getTaskStorage().setStatus(TaskStatus.success(task1.getId()));
+
getTaskStorage().removeTasksOlderThan(DateTimes.nowUtc().plusDays(1).getMillis());
+
+ ActionVerifier.verifyThat(
+ update(
+ () -> taskQueue.shutdown(task1.getId(), "killed")
+ ).withEndState(
+ () -> Assert.assertEquals(List.of(task1), taskQueue.getTasks())
+ )
+ ).blocks(
+ update(
+ () -> taskQueue.syncFromStorage()
+ ).withEndState(
+ () -> Assert.assertTrue(taskQueue.getTasks().isEmpty())
+ )
+ );
+ }
+
+ @Test
+ public void test_shutdown_doesNotWait_forTaskToShutdownOnRunner()
+ {
+ taskQueue.setActive(true);
+
+ final Task task1 = createTask("t1");
+ taskQueue.add(task1);
+ Assert.assertEquals(List.of(task1), taskQueue.getTasks());
+
+ // Keep the task shutdown blocked on the TaskRunner
+ taskRunnerShutdownLatch = new CountDownLatch(1);
+
+ // Verify that shutdown on TaskQueue finishes immediately
+ taskQueue.shutdown(task1.getId(), "killed");
+
+ Optional<TaskInfo<Task, TaskStatus>> taskInfo =
taskQueue.getActiveTaskInfo(task1.getId());
+ Assert.assertTrue(taskInfo.isPresent());
+ Assert.assertEquals(TaskStatus.failure(task1.getId(), "killed"),
taskInfo.get().getStatus());
+
+ taskRunnerShutdownLatch.countDown();
+ }
+
+ private void awaitTaskRunnerShutdown()
+ {
+ if (taskRunnerShutdownLatch != null) {
+ try {
+ taskRunnerShutdownLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private UpdateAction update(Action action)
{
return new UpdateAction(action, threadToUpdateAction::put);
@@ -582,6 +652,9 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
return this;
}
+ /**
+ * Starts this update action and returns when it is finished.
+ */
void perform()
{
startNotifier.onUpdateStart(Thread.currentThread().getName(), this);
@@ -595,15 +668,58 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
}
}
+ /**
+ * Runs the given computation in the critical section of this update
action.
+ *
+ * @return When the critical computation is complete.
+ */
+ <V> V runCritical(Supplier<V> updateComputation)
+ {
+ return critical.perform(updateComputation);
+ }
+
+ void startCritical()
+ {
+ critical.start.countDown();
+ }
+
+ void startAndFinishCritical()
+ {
+ startCritical();
+ waitUntilCriticalIsReadyToFinish();
+ finishCritical();
+ }
+
+ void finishCritical()
+ {
+ critical.finish.countDown();
+ }
+
void waitToFinishAndVerify()
{
waitFor(finished);
verifyAction.perform();
}
+
+ boolean isReadyToStartCritical()
+ {
+ return critical.isReadyToStart.getCount() == 0;
+ }
+
+ void waitUntilCriticalIsReadyToStart()
+ {
+ waitFor(critical.isReadyToStart);
+ }
+
+ void waitUntilCriticalIsReadyToFinish()
+ {
+ waitFor(critical.isReadyToFinish);
+ }
}
/**
- * Verifies thread-safety between two actions.
+ * Ensures thread-safety between two update actions by verifying that an
action
+ * {@link #update1} blocks or does not block another action {@code update2}.
*/
private static class ActionVerifier
{
@@ -626,37 +742,37 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
// Start update 1 and wait for it to enter critical section
executor.submit(update1::perform);
- waitFor(update1.critical.isReadyToStart);
+ update1.waitUntilCriticalIsReadyToStart();
+ // Start update 2 and wait for some time
executor.submit(update2::perform);
-
- // Wait for some time and verify that update 2 critical has not started
yet
try {
Thread.sleep(1000);
}
catch (Exception e) {
throw new RuntimeException(e);
}
- Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
- update1.critical.start.countDown();
+ // Verify that update 2 is not ready to start critical section yet
+ Assert.assertFalse(update2.isReadyToStartCritical());
+
+ update1.startCritical();
// Wait for update 1 critical to reach finish
- // and verify that update 2 critical has not started yet
- waitFor(update1.critical.isReadyToFinish);
- Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
+ // and verify that update 2 critical is not ready to start yet
+ update1.waitUntilCriticalIsReadyToFinish();
+ Assert.assertFalse(update2.isReadyToStartCritical());
// Finish update 1 critical and verify that update 2 is now ready to
start
- update1.critical.finish.countDown();
- waitFor(update2.critical.isReadyToStart);
+ update1.finishCritical();
+ update2.waitUntilCriticalIsReadyToStart();
+ Assert.assertTrue(update2.isReadyToStartCritical());
// Finish update 1
update1.waitToFinishAndVerify();
// Start and finish update2
- update2.critical.start.countDown();
- waitFor(update2.critical.isReadyToFinish);
- update2.critical.finish.countDown();
+ update2.startAndFinishCritical();
update2.waitToFinishAndVerify();
executor.shutdownNow();
@@ -672,22 +788,18 @@ public class TaskQueueConcurrencyTest extends
IngestionTestBase
// Start update 1 and wait for it to enter critical section
executor.submit(update1::perform);
- waitFor(update1.critical.isReadyToStart);
+ update1.waitUntilCriticalIsReadyToStart();
// Start update2 and verify that it has also entered critical section
executor.submit(update2::perform);
- waitFor(update2.critical.isReadyToStart);
+ update2.waitUntilCriticalIsReadyToStart();
// Finish update2 to prove that it is not blocked by update1
- update2.critical.start.countDown();
- waitFor(update2.critical.isReadyToFinish);
- update2.critical.finish.countDown();
+ update2.startAndFinishCritical();
update2.waitToFinishAndVerify();
// Start and finish update1
- update1.critical.start.countDown();
- waitFor(update1.critical.isReadyToFinish);
- update1.critical.finish.countDown();
+ update1.startAndFinishCritical();
update1.waitToFinishAndVerify();
executor.shutdownNow();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]