This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f5ec8be9731 perf: Submit tasks to TaskRunner order by task priority
(#19203)
f5ec8be9731 is described below
commit f5ec8be97318db0cc33dbb2473c8a075c30fa617
Author: jtuglu1 <[email protected]>
AuthorDate: Fri Mar 27 09:35:15 2026 -0700
perf: Submit tasks to TaskRunner order by task priority (#19203)
Ensure active tasks are (re)submitted to the TaskRunner in priority order.
With the way things are currently structured, TaskQueue and TaskRunner both
introduce their own delays to the scheduling of a task, since each behaves as a
queue in its own way. This patch attempts to minimize the head-of-line (HOL)
blocking delay introduced by the TaskQueue, while #18851 will help reduce the
slowness in the TaskRunner. This helps enormously for high task-volume cases
(O(5k)+ tasks) where lots of low-priority tasks [...]
Notably, this sorts the active tasks every time startPendingTasksOnRunner is
called. However, given that activeTasks should be on the order of O(10k) tasks
and the comparator only compares integers, this should be relatively cheap
(empirically, it has never shown up in flame graphs, since TaskQueue is bound by
other things). Once #18851 is merged (and priority-based running is introduced),
this change may have less of an effect. Storing activeTasks in an ordered container
might be nice, but given this is a [...]
---
.../apache/druid/indexing/overlord/TaskQueue.java | 11 +++-
.../druid/indexing/overlord/TaskQueueTest.java | 68 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 2 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 221bc2a63f4..405e6f8f60d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -74,6 +74,7 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -417,8 +418,14 @@ public class TaskQueue
log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].",
unknownTaskIds.size(), unknownTaskIds);
// Attain futures for all active tasks (assuming they are ready to run).
- // Copy tasks list, as notifyStatus may modify it.
- for (final String queuedTaskId : List.copyOf(activeTasks.keySet())) {
+ // Sort by priority (highest first) so that higher-priority tasks are
submitted
+ // to the runner before lower-priority ones.
+ final List<String> queuedTaskIds = activeTasks.values()
+ .stream()
+
.sorted(Comparator.comparingInt((TaskEntry entry) ->
entry.getTask().getPriority()).reversed())
+ .map(entry ->
entry.getTask().getId())
+ .toList();
+ for (final String queuedTaskId : queuedTaskIds) {
updateTaskEntry(
queuedTaskId,
entry -> startPendingTaskOnRunner(entry,
runnerTaskFutures.get(queuedTaskId))
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 33f80da67e0..bc409064e97 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -52,6 +52,7 @@ import
org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IngestionTestBase;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.NoopTaskContextEnricher;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
@@ -98,6 +99,7 @@ import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -751,6 +753,47 @@ public class TaskQueueTest extends IngestionTestBase
serviceEmitter.verifyEmitted("task/run/time", 3);
}
+ @Test
+ public void testTaskSubmissionToTaskRunnerBasedOnPriority() throws Exception
+ {
+ final RecordingTaskRunner recordingRunner = new
RecordingTaskRunner(serviceEmitter);
+ final TaskQueue priorityQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(10, null, null, null, null, null),
+ new DefaultTaskConfig(),
+ getTaskStorage(),
+ recordingRunner,
+ actionClientFactory,
+ getLockbox(),
+ serviceEmitter,
+ getObjectMapper(),
+ new NoopTaskContextEnricher()
+ );
+ priorityQueue.setActive(true);
+
+ final NoopTask lowPriority1 = NoopTask.ofPriority(1);
+ final NoopTask lowPriority2 = NoopTask.ofPriority(10);
+ final NoopTask medPriority = NoopTask.ofPriority(50);
+ final NoopTask highPriority1 = NoopTask.ofPriority(90);
+ final NoopTask highPriority2 = NoopTask.ofPriority(100);
+
+ priorityQueue.add(lowPriority1);
+ priorityQueue.add(medPriority);
+ priorityQueue.add(lowPriority2);
+ priorityQueue.add(highPriority2);
+ priorityQueue.add(highPriority1);
+
+ priorityQueue.manageQueuedTasks();
+
+ final List<String> submitted = recordingRunner.getSubmittedTaskIds();
+ Assert.assertEquals(5, submitted.size());
+ Assert.assertEquals(highPriority2.getId(), submitted.get(0));
+ Assert.assertEquals(highPriority1.getId(), submitted.get(1));
+ Assert.assertEquals(medPriority.getId(), submitted.get(2));
+ Assert.assertEquals(lowPriority2.getId(), submitted.get(3));
+ Assert.assertEquals(lowPriority1.getId(), submitted.get(4));
+ }
+
private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
{
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
@@ -904,4 +947,29 @@ public class TaskQueueTest extends IngestionTestBase
}
}
}
+
+ /**
+ * A task runner that records the order in which tasks are submitted via
{@link #run(Task)}.
+ */
+ static class RecordingTaskRunner extends SimpleTaskRunner
+ {
+ private final List<String> submittedTaskIds = new ArrayList<>();
+
+ RecordingTaskRunner(ServiceEmitter emitter)
+ {
+ super(emitter);
+ }
+
+ @Override
+ public ListenableFuture<TaskStatus> run(Task task)
+ {
+ submittedTaskIds.add(task.getId());
+ return Futures.immediateFuture(TaskStatus.success(task.getId()));
+ }
+
+ List<String> getSubmittedTaskIds()
+ {
+ return submittedTaskIds;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]