This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b9a0ed53c3 Shutdown tasks on TaskRunner asynchronously (#18471)
2b9a0ed53c3 is described below

commit 2b9a0ed53c36736fb6a9e23ce28867c5aaf81836
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 18:27:59 2025 +0530

    Shutdown tasks on TaskRunner asynchronously (#18471)
    
    Changes:
    - Invoke `taskRunner.shutdown()` on `TaskQueue.taskCompleteCallbackExecutor` to avoid blocking queue management
    - Add concurrency tests
---
 .../apache/druid/indexing/overlord/TaskQueue.java  |  34 ++---
 .../overlord/TaskQueueConcurrencyTest.java         | 160 +++++++++++++++++----
 2 files changed, 153 insertions(+), 41 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index d7024eabfe5..4686aec4d98 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -403,13 +403,13 @@ public class TaskQueue
       updateTaskEntry(taskId, entry -> {
         if (entry == null) {
           unknownTaskIds.add(taskId);
-          shutdownUnknownTaskOnRunner(taskId);
+          shutdownTaskOnRunner(taskId, "Task is not present in queue 
anymore.");
         } else {
           runnerTaskFutures.put(taskId, workItem.getResult());
         }
       });
     }
-    log.info("Cleaned up [%,d] tasks on task runner with IDs[%s].", 
unknownTaskIds.size(), unknownTaskIds);
+    log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].", 
unknownTaskIds.size(), unknownTaskIds);
 
     // Attain futures for all active tasks (assuming they are ready to run).
     // Copy tasks list, as notifyStatus may modify it.
@@ -472,14 +472,21 @@ public class TaskQueue
     }
   }
 
-  private void shutdownUnknownTaskOnRunner(String taskId)
+  /**
+   * Triggers a shutdown of the given Task on the {@link TaskRunner}.
+   * The shutdown is invoked on {@link #taskCompleteCallbackExecutor} to avoid
+   * blocking critical paths as task shutdown on the runner may sometimes be slow.
+   * <p>
+   * This method never throws. A failure of {@code TaskRunner.shutdown()}, or a
+   * failure to schedule the shutdown at all (e.g. if the executor rejects it),
+   * is only logged so that callers can always continue with their own cleanup.
+   *
+   * @param taskId       ID of the task to shut down on the runner
+   * @param reasonFormat format string for the shutdown reason passed to the runner
+   * @param args         arguments for {@code reasonFormat}
+   */
+  private void shutdownTaskOnRunner(String taskId, String reasonFormat, Object... args)
   {
-    try {
-      taskRunner.shutdown(taskId, "Task is not present in queue anymore.");
-    }
-    catch (Exception e) {
-      log.warn(e, "TaskRunner failed to clean up task[%s].", taskId);
-    }
+    try {
+      taskCompleteCallbackExecutor.submit(() -> {
+        try {
+          taskRunner.shutdown(taskId, reasonFormat, args);
+        }
+        catch (Throwable e) {
+          // Used both for completed tasks and for unknown tasks found during sync,
+          // so the message must not assume the task has completed.
+          log.error(e, "TaskRunner failed to cleanup task[%s].", taskId);
+        }
+      });
+    }
+    catch (java.util.concurrent.RejectedExecutionException e) {
+      // Do not propagate: callers (e.g. the path that goes on to call
+      // removeTaskLock() and requestManagement()) must finish their cleanup even
+      // if the runner cannot be notified.
+      log.warn(e, "Could not schedule shutdown of task[%s] on the TaskRunner.", taskId);
+    }
   }
 
   private boolean isTaskPending(Task task)
@@ -734,14 +741,7 @@ public class TaskQueue
          .emit();
     }
 
-    // Inform taskRunner that this task can be shut down.
-    try {
-      taskRunner.shutdown(task.getId(), reasonFormat, args);
-    }
-    catch (Throwable e) {
-      // If task runner shutdown fails, continue with the task shutdown 
routine.
-      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", 
task.getId());
-    }
+    shutdownTaskOnRunner(task.getId(), reasonFormat, args);
 
     removeTaskLock(task);
     requestManagement();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
index 6429d6b3edd..288ea3a968d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -59,6 +60,7 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
   private TaskQueue taskQueue;
 
   private Map<String, UpdateAction> threadToUpdateAction;
+  private CountDownLatch taskRunnerShutdownLatch;
 
   @Override
   public void setUpIngestionTestBase() throws IOException
@@ -79,6 +81,13 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
           {
             return SettableFuture.create();
           }
+
+          /**
+           * First blocks in awaitTaskRunnerShutdown() (which waits only while a
+           * test has set taskRunnerShutdownLatch) to simulate a slow shutdown on
+           * the runner, then delegates to the superclass implementation.
+           */
+          @Override
+          public void shutdown(String taskid, String reason)
+          {
+            awaitTaskRunnerShutdown();
+            super.shutdown(taskid, reason);
+          }
         },
         createActionClientFactory(),
         getLockbox(),
@@ -98,7 +107,7 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
                ? super.addOrUpdateTaskEntry(taskId, updateOperation)
                : super.addOrUpdateTaskEntry(
                    taskId,
-                   existing -> updateAction.critical.perform(() -> 
updateOperation.apply(existing))
+                   existing -> updateAction.runCritical(() -> 
updateOperation.apply(existing))
                );
       }
 
@@ -112,7 +121,7 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
         if (updateAction == null) {
           super.setActive(active);
         } else {
-          updateAction.critical.perform(() -> {
+          updateAction.runCritical(() -> {
             super.setActive(active);
             return 0;
           });
@@ -472,6 +481,67 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
     Assert.assertEquals(Optional.absent(), 
taskQueue.getActiveTask(task1.getId()));
   }
 
+  @Test
+  public void test_shutdown_blocks_syncFromStorage_forSameTaskId()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    final String taskId = task1.getId();
+    taskQueue.add(task1);
+
+    // Complete the task and purge it from storage so that a subsequent
+    // syncFromStorage() tries to evict it from the queue
+    getTaskStorage().setStatus(TaskStatus.success(taskId));
+    getTaskStorage().removeTasksOlderThan(DateTimes.nowUtc().plusDays(1).getMillis());
+
+    // After shutdown(), the task is still present in the queue
+    final UpdateAction shutdownTask = update(
+        () -> taskQueue.shutdown(taskId, "killed")
+    ).withEndState(
+        () -> Assert.assertEquals(List.of(task1), taskQueue.getTasks())
+    );
+
+    // After syncFromStorage(), the queue no longer contains the task
+    final UpdateAction syncWithStorage = update(
+        () -> taskQueue.syncFromStorage()
+    ).withEndState(
+        () -> Assert.assertTrue(taskQueue.getTasks().isEmpty())
+    );
+
+    ActionVerifier.verifyThat(shutdownTask).blocks(syncWithStorage);
+  }
+
+  @Test
+  public void test_shutdown_doesNotWait_forTaskToShutdownOnRunner()
+  {
+    taskQueue.setActive(true);
+
+    final Task task1 = createTask("t1");
+    taskQueue.add(task1);
+    Assert.assertEquals(List.of(task1), taskQueue.getTasks());
+
+    // Keep the task shutdown blocked on the TaskRunner
+    taskRunnerShutdownLatch = new CountDownLatch(1);
+    try {
+      // Verify that shutdown on TaskQueue finishes immediately
+      taskQueue.shutdown(task1.getId(), "killed");
+
+      Optional<TaskInfo<Task, TaskStatus>> taskInfo = taskQueue.getActiveTaskInfo(task1.getId());
+      Assert.assertTrue(taskInfo.isPresent());
+      Assert.assertEquals(TaskStatus.failure(task1.getId(), "killed"), taskInfo.get().getStatus());
+    }
+    finally {
+      // Always release the blocked TaskRunner.shutdown() so that the thread running
+      // it is not left hanging if an assertion above fails
+      taskRunnerShutdownLatch.countDown();
+    }
+  }
+
+  /**
+   * Blocks the calling thread until {@code taskRunnerShutdownLatch} is released.
+   * Returns immediately if no latch has been set by the current test.
+   *
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the wait
+   *                          is interrupted; the interrupt status is restored first
+   */
+  private void awaitTaskRunnerShutdown()
+  {
+    if (taskRunnerShutdownLatch != null) {
+      try {
+        taskRunnerShutdownLatch.await();
+      }
+      catch (InterruptedException e) {
+        // Preserve the interrupt so the owning executor can observe it
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   private UpdateAction update(Action action)
   {
     return new UpdateAction(action, threadToUpdateAction::put);
@@ -582,6 +652,9 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
       return this;
     }
 
+    /**
+     * Starts this update action and returns when it is finished.
+     */
     void perform()
     {
       startNotifier.onUpdateStart(Thread.currentThread().getName(), this);
@@ -595,15 +668,58 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
       }
     }
 
+    /**
+     * Runs the given computation in the critical section of this update action.
+     *
+     * @param updateComputation computation to run inside the critical section
+     * @return the value returned by {@code critical.perform(updateComputation)},
+     *         i.e. the result handed back to the caller once the critical
+     *         computation is complete
+     */
+    <V> V runCritical(Supplier<V> updateComputation)
+    {
+      return critical.perform(updateComputation);
+    }
+
+    /**
+     * Counts down {@code critical.start}, letting the critical section of this
+     * action begin.
+     */
+    void startCritical()
+    {
+      critical.start.countDown();
+    }
+
+    /**
+     * Counts down {@code critical.finish}, letting the critical section of this
+     * action complete.
+     */
+    void finishCritical()
+    {
+      critical.finish.countDown();
+    }
+
+    /**
+     * Lets the critical section begin, waits until it is ready to finish and
+     * then lets it complete.
+     */
+    void startAndFinishCritical()
+    {
+      critical.start.countDown();
+      waitFor(critical.isReadyToFinish);
+      critical.finish.countDown();
+    }
+
     void waitToFinishAndVerify()
     {
       waitFor(finished);
       verifyAction.perform();
     }
+
+    /**
+     * Returns true if this action has reached its critical section, i.e. the
+     * {@code critical.isReadyToStart} latch has been fully counted down.
+     * Does not block.
+     */
+    boolean isReadyToStartCritical()
+    {
+      return critical.isReadyToStart.getCount() == 0;
+    }
+
+    /**
+     * Blocks (via {@code waitFor}) until this action has reached its critical section.
+     */
+    void waitUntilCriticalIsReadyToStart()
+    {
+      waitFor(critical.isReadyToStart);
+    }
+
+    /**
+     * Blocks (via {@code waitFor}) until the critical section of this action is
+     * ready to finish, i.e. it has started and is waiting for {@code finishCritical()}.
+     */
+    void waitUntilCriticalIsReadyToFinish()
+    {
+      waitFor(critical.isReadyToFinish);
+    }
   }
 
   /**
-   * Verifies thread-safety between two actions.
+   * Ensures thread-safety between two update actions by verifying that an 
action
+   * {@link #update1} blocks or does not block another action {@code update2}.
    */
   private static class ActionVerifier
   {
@@ -626,37 +742,37 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
 
       // Start update 1 and wait for it to enter critical section
       executor.submit(update1::perform);
-      waitFor(update1.critical.isReadyToStart);
+      update1.waitUntilCriticalIsReadyToStart();
 
+      // Start update 2 and wait for some time
       executor.submit(update2::perform);
-
-      // Wait for some time and verify that update 2 critical has not started 
yet
       try {
         Thread.sleep(1000);
       }
       catch (Exception e) {
         throw new RuntimeException(e);
       }
-      Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
 
-      update1.critical.start.countDown();
+      // Verify that update 2 is not ready to start critical section yet
+      Assert.assertFalse(update2.isReadyToStartCritical());
+
+      update1.startCritical();
 
       // Wait for update 1 critical to reach finish
-      // and verify that update 2 critical has not started yet
-      waitFor(update1.critical.isReadyToFinish);
-      Assert.assertEquals(1, update2.critical.isReadyToStart.getCount());
+      // and verify that update 2 critical is not ready to start yet
+      update1.waitUntilCriticalIsReadyToFinish();
+      Assert.assertFalse(update2.isReadyToStartCritical());
 
       // Finish update 1 critical and verify that update 2 is now ready to 
start
-      update1.critical.finish.countDown();
-      waitFor(update2.critical.isReadyToStart);
+      update1.finishCritical();
+      update2.waitUntilCriticalIsReadyToStart();
+      Assert.assertTrue(update2.isReadyToStartCritical());
 
       // Finish update 1
       update1.waitToFinishAndVerify();
 
       // Start and finish update2
-      update2.critical.start.countDown();
-      waitFor(update2.critical.isReadyToFinish);
-      update2.critical.finish.countDown();
+      update2.startAndFinishCritical();
       update2.waitToFinishAndVerify();
 
       executor.shutdownNow();
@@ -672,22 +788,18 @@ public class TaskQueueConcurrencyTest extends 
IngestionTestBase
 
       // Start update 1 and wait for it to enter critical section
       executor.submit(update1::perform);
-      waitFor(update1.critical.isReadyToStart);
+      update1.waitUntilCriticalIsReadyToStart();
 
       // Start update2 and verify that it has also entered critical section
       executor.submit(update2::perform);
-      waitFor(update2.critical.isReadyToStart);
+      update2.waitUntilCriticalIsReadyToStart();
 
       // Finish update2 to prove that it is not blocked by update1
-      update2.critical.start.countDown();
-      waitFor(update2.critical.isReadyToFinish);
-      update2.critical.finish.countDown();
+      update2.startAndFinishCritical();
       update2.waitToFinishAndVerify();
 
       // Start and finish update1
-      update1.critical.start.countDown();
-      waitFor(update1.critical.isReadyToFinish);
-      update1.critical.finish.countDown();
+      update1.startAndFinishCritical();
       update1.waitToFinishAndVerify();
 
       executor.shutdownNow();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to