This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9af6a22743a Add utility method to EmbeddedClusterApis to run tasks (#18349)
9af6a22743a is described below

commit 9af6a22743ab052628331bb1018c989999279aa0
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Aug 2 09:44:49 2025 +0530

    Add utility method to EmbeddedClusterApis to run tasks (#18349)
    
    Description:
    Embedded tests currently run into serialization issues when submitting a `Task` or
    `CompactionSupervisorSpec` payload to the cluster using any `OverlordClient`.
    This can happen because the `ObjectMapper` instance used by the `OverlordClient` might not have the
    necessary Jackson modules loaded.
    The hack so far has been to ensure that the first server in an embedded cluster is always an Overlord.
    But this can quickly run into issues, e.g. when running a cluster with basic auth enabled where a
    Coordinator must be started before all other servers.
    
    Fix:
    - Fix up methods in `EmbeddedClusterApis`: `leaderOverlord`, `leaderCoordinator` and `anyBroker` to use
    the appropriate servers. The request is still sent over the wire even if the client bound on a server
    is used to call that same server.
    - Add utility methods `submitTask`, `runTask` and `postSupervisor` to `EmbeddedClusterApis`.
---
 .../embedded/compact/CompactionTestBase.java       | 12 ++--
 .../indexing/ConcurrentAppendReplaceTest.java      | 10 +--
 .../embedded/indexing/IndexParallelTaskTest.java   |  4 +-
 .../testing/embedded/indexing/IndexTaskTest.java   | 15 ++---
 .../embedded/indexing/KafkaClusterMetricsTest.java | 42 ++++---------
 .../embedded/msq/BaseRealtimeQueryTest.java        | 10 +--
 .../embedded/msq/EmbeddedMSQRealtimeQueryTest.java |  2 +-
 .../msq/EmbeddedMSQRealtimeUnnestQueryTest.java    |  2 +-
 .../embedded/server/CoordinatorClientTest.java     |  3 +-
 .../embedded/server/HighAvailabilityTest.java      |  3 +-
 .../embedded/server/HistoricalCloningTest.java     |  8 +--
 .../catalog/compact/CatalogCompactionTest.java     | 10 +--
 .../simulate/EmbeddedKafkaSupervisorTest.java      | 10 +--
 .../testing/embedded/EmbeddedClusterApis.java      | 71 ++++++++++++++++------
 .../testing/embedded/EmbeddedDruidCluster.java     | 29 +++++++--
 15 files changed, 126 insertions(+), 105 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
index 683abe00199..109c9c12aae 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.testing.embedded.compact;
 
+import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.indexing.common.task.TaskBuilder;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
@@ -68,12 +69,11 @@ public abstract class CompactionTestBase extends 
EmbeddedClusterTestBase
    */
   protected String runTask(TaskBuilder<?, ?, ?> taskBuilder)
   {
-    return cluster.callApi().runTask(
-        (ds, taskId) -> taskBuilder.dataSource(ds).withId(taskId),
-        dataSource,
-        overlord,
-        coordinator
-    );
+    final String taskId = IdUtils.getRandomId();
+    
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), 
overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    return taskId;
   }
 
   protected void verifySegmentIntervals(List<Interval> expectedIntervals)
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
index 560481c8caa..721f55dd151 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
@@ -68,10 +68,7 @@ public class ConcurrentAppendReplaceTest extends 
EmbeddedClusterTestBase
                    .appendToExisting(true)
                    .context("useConcurrentLocks", true)
                    .dimensions();
-    cluster.callApi().onLeaderOverlord(
-        o -> o.runTask(task1, taskBuilder.withId(task1))
-    );
-    cluster.callApi().waitForTaskToSucceed(task1, overlord);
+    cluster.callApi().runTask(taskBuilder.withId(task1), overlord);
 
     List<DataSegment> usedSegments = getAllUsedSegments();
     Assertions.assertEquals(1, usedSegments.size());
@@ -87,10 +84,7 @@ public class ConcurrentAppendReplaceTest extends 
EmbeddedClusterTestBase
 
     // Run the APPEND task again with a different taskId
     final String task2 = EmbeddedClusterApis.newTaskId(dataSource);
-    cluster.callApi().onLeaderOverlord(
-        o -> o.runTask(task2, taskBuilder.withId(task2))
-    );
-    cluster.callApi().waitForTaskToSucceed(task2, overlord);
+    cluster.callApi().runTask(taskBuilder.withId(task2), overlord);
 
     // Verify that the new segment gets appended with the same version but a 
different ID
     usedSegments = getAllUsedSegments();
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
index 6725c23b923..e97c9b97747 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
@@ -214,9 +214,7 @@ public class IndexParallelTaskTest extends 
EmbeddedClusterTestBase
   private String runTask(TaskBuilder.IndexParallel taskBuilder, String 
dataSource)
   {
     final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, 
taskBuilder.withId(taskId)));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
-
+    cluster.callApi().runTask(taskBuilder.withId(taskId), overlord);
     return taskId;
   }
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
index e5578e83e5a..a93ef2ddb79 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
@@ -63,8 +63,8 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
   {
     return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
                                .useLatchableEmitter()
-                               .addServer(overlord)
                                .addServer(coordinator)
+                               .addServer(overlord)
                                .addServer(indexer)
                                .addServer(historical)
                                .addServer(broker)
@@ -76,13 +76,12 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
   public void test_runIndexTask_forInlineDatasource()
   {
     final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
-    final Object task = createIndexTaskForInlineData(
+    final IndexTask task = createIndexTaskForInlineData(
         taskId,
         Resources.InlineData.CSV_10_DAYS
     );
 
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().runTask(task, overlord);
 
     // Verify that the task created 10 DAY-granularity segments
     final List<DataSegment> segments = new ArrayList<>(
@@ -142,7 +141,7 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
     runTasksConcurrently(100);
   }
 
-  private Object createIndexTaskForInlineData(String taskId, String 
inlineDataCsv)
+  private IndexTask createIndexTaskForInlineData(String taskId, String 
inlineDataCsv)
   {
     return TaskBuilder.ofTypeIndex()
                       .dataSource(dataSource)
@@ -169,16 +168,14 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
     int index = 0;
     for (String taskId : taskIds) {
       index++;
-      final Object task = createIndexTaskForInlineData(
+      final IndexTask task = createIndexTaskForInlineData(
           taskId,
           StringUtils.format(
               "%s,%s,%d",
               jan1.plusDays(index), "item " + index, index
           )
       );
-      cluster.callApi().onLeaderOverlord(
-          o -> o.runTask(taskId, task)
-      );
+      cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
     }
     for (String taskId : taskIds) {
       cluster.callApi().waitForTaskToSucceed(taskId, overlord);
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 8717fcf223f..aa3667405bd 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -156,10 +156,10 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         maxRowsPerSegment
     );
 
-    final Map<String, String> startSupervisorResult = 
cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec)
+    Assertions.assertEquals(
+        supervisorId,
+        cluster.callApi().postSupervisor(kafkaSupervisorSpec)
     );
-    Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
 
     // Wait for segments to be handed off
     indexer.latchableEmitter().waitForEventAggregate(
@@ -183,9 +183,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
     verifyIngestedMetricCountMatchesEmittedCount("coordinator/time", 
coordinator);
 
     // Suspend the supervisor
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
-    );
+    
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
   @Test
@@ -208,9 +206,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         taskCompletionTimeoutMillis,
         maxRowsPerSegment
     );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec)
-    );
+    cluster.callApi().postSupervisor(kafkaSupervisorSpec);
 
     // Wait for a task to succeed
     overlord.latchableEmitter().waitForEventAggregate(
@@ -248,9 +244,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         false,
         null
     );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(compactionSupervisorSpec)
-    );
+    cluster.callApi().postSupervisor(compactionSupervisorSpec);
 
     // Wait until some compaction tasks have finished
     overlord.latchableEmitter().waitForEventAggregate(
@@ -309,12 +303,8 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
     );
 
     // Suspend the supervisors
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
-    );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
-    );
+    
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
+    
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
   @Test
@@ -337,9 +327,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         taskCompletionTimeoutMillis,
         maxRowsPerSegment
     );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec)
-    );
+    cluster.callApi().postSupervisor(kafkaSupervisorSpec);
 
     // Wait for some segments to be published
     overlord.latchableEmitter().waitForEvent(
@@ -371,9 +359,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         false,
         null
     );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(compactionSupervisorSpec)
-    );
+    cluster.callApi().postSupervisor(compactionSupervisorSpec);
 
     // Wait until some skipped metrics have been emitted
     overlord.latchableEmitter().waitForEventAggregate(
@@ -388,12 +374,8 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
     );
 
     // Suspend the supervisors
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
-    );
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
-    );
+    
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
+    
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
   }
 
   /**
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
index 3a05637f660..42baf6a8662 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.joda.time.Period;
@@ -84,13 +85,14 @@ public class BaseRealtimeQueryTest extends 
EmbeddedClusterTestBase
   /**
    * Submits a supervisor spec to the Overlord.
    */
-  protected void submitSupervisor()
+  protected void submitSupervisor(EmbeddedOverlord overlord)
   {
     // Submit a supervisor.
     final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor();
-    final Map<String, String> startSupervisorResult =
-        cluster.callApi().onLeaderOverlord(o -> 
o.postSupervisor(kafkaSupervisorSpec));
-    Assertions.assertEquals(Map.of("id", dataSource), startSupervisorResult);
+    Assertions.assertEquals(
+        dataSource,
+        cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+    );
   }
 
   /**
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index a8b4829fa36..a7071714f3e 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -167,7 +167,7 @@ public class EmbeddedMSQRealtimeQueryTest extends 
BaseRealtimeQueryTest
   void setUpEach()
   {
     msqApis = new EmbeddedMSQApis(cluster, overlord);
-    submitSupervisor();
+    submitSupervisor(overlord);
     publishToKafka(TestIndex.getMMappedWikipediaIndex());
 
     // Wait for it to be loaded.
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
index cb38ce54acc..2c9e7c6258c 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
@@ -113,7 +113,7 @@ public class EmbeddedMSQRealtimeUnnestQueryTest extends 
BaseRealtimeQueryTest
 
     QueryableIndex index = TestIndex.getMMappedTestIndex();
 
-    submitSupervisor();
+    submitSupervisor(overlord);
     publishToKafka(index);
 
     final int totalRows = index.getNumRows();
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
index 90a82b3eaec..12590b86069 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
@@ -192,8 +192,7 @@ public class CoordinatorClientTest extends 
EmbeddedClusterTestBase
                                  .dimensions()
                                  .withId(taskId);
 
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().runTask(task, overlord);
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
   }
 }
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
index 7c025f1f06d..58a3a621ea2 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
@@ -121,8 +121,7 @@ public class HighAvailabilityTest extends 
EmbeddedClusterTestBase
         .inlineInputSourceWithData(Resources.InlineData.CSV_10_DAYS)
         .dimensions()
         .withId(taskId);
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord1);
+    cluster.callApi().runTask(task, overlord1);
     coordinator1.latchableEmitter().waitForEvent(
         event -> event.hasMetricName("segment/metadataCache/used/count")
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource)
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
index db22b015c8c..6c64443a4c3 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.testing.embedded.server;
 
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -128,13 +129,12 @@ public class HistoricalCloningTest extends 
EmbeddedClusterTestBase
   private void runIngestion()
   {
     final String taskId = IdUtils.getRandomId();
-    final Object task = createIndexTaskForInlineData(taskId);
+    final IndexTask task = createIndexTaskForInlineData(taskId);
 
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().runTask(task, overlord);
   }
 
-  private Object createIndexTaskForInlineData(String taskId)
+  private IndexTask createIndexTaskForInlineData(String taskId)
   {
     return TaskBuilder.ofTypeIndex()
                       .dataSource(dataSource)
diff --git 
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
 
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
index 6c04d2eebbf..9f3106b8844 100644
--- 
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
+++ 
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableMetadata;
 import org.apache.druid.catalog.model.table.TableBuilder;
 import org.apache.druid.catalog.sync.CatalogClient;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
 import org.apache.druid.indexing.overlord.Segments;
@@ -113,7 +114,7 @@ public class CatalogCompactionTest extends 
EmbeddedClusterTestBase
 
     final CompactionSupervisorSpec compactionSupervisor
         = new CompactionSupervisorSpec(compactionConfig, false, null);
-    cluster.callApi().onLeaderOverlord(o -> 
o.postSupervisor(compactionSupervisor));
+    cluster.callApi().postSupervisor(compactionSupervisor);
 
     // Wait for compaction to finish
     overlord.latchableEmitter().waitForEvent(
@@ -136,13 +137,12 @@ public class CatalogCompactionTest extends 
EmbeddedClusterTestBase
   private void runIngestionAtDayGranularity()
   {
     final String taskId = IdUtils.getRandomId();
-    final Object task = createIndexTaskForInlineData(taskId);
+    final IndexTask task = createIndexTaskForInlineData(taskId);
 
-    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().runTask(task, overlord);
   }
 
-  private Object createIndexTaskForInlineData(String taskId)
+  private IndexTask createIndexTaskForInlineData(String taskId)
   {
     final String inlineDataCsv =
         "2025-06-01T00:00:00.000Z,shirt,105"
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index b46b2b5bbd6..bab227dc864 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
@@ -91,10 +90,7 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     final String supervisorId = dataSource + "_supe";
     final KafkaSupervisorSpec kafkaSupervisorSpec = 
createKafkaSupervisor(supervisorId, topic);
 
-    final Map<String, String> startSupervisorResult = 
cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec)
-    );
-    Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
+    Assertions.assertEquals(supervisorId, 
cluster.callApi().postSupervisor(kafkaSupervisorSpec));
 
     // Wait for the broker to discover the realtime segments
     broker.latchableEmitter().waitForEvent(
@@ -120,9 +116,7 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", 
dataSource));
 
     // Suspend the supervisor and verify the state
-    cluster.callApi().onLeaderOverlord(
-        o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
-    );
+    
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
     supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
     Assertions.assertTrue(supervisorStatus.isSuspended());
   }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index f8bb3d66cb8..954e0f117c4 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -26,9 +26,13 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskMetrics;
 import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -50,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.function.Function;
@@ -130,23 +135,24 @@ public class EmbeddedClusterApis
   }
 
   /**
-   * Creates a Task using the given builder and runs it.
-   *
-   * @return ID of the task.
+   * Runs a {@link Task} on this cluster and waits until it has completed 
successfully.
+   * The given {@link EmbeddedOverlord} must be the leader with a {@code 
LatchableEmitter}
+   * bound so that the task completion metric can be waited upon.
    */
-  public String runTask(
-      TaskBuilder taskBuilder,
-      String dataSource,
-      EmbeddedOverlord overlord,
-      EmbeddedCoordinator coordinator
-  )
+  public void runTask(Task task, EmbeddedOverlord leaderOverlord)
   {
-    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
-    onLeaderOverlord(o -> o.runTask(taskId, taskBuilder.build(dataSource, 
taskId)));
-    waitForTaskToSucceed(taskId, overlord);
-    waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+    final String taskId = task.getId();
+    submitTask(task);
+    waitForTaskToSucceed(taskId, leaderOverlord);
+  }
 
-    return taskId;
+  /**
+   * Submits a {@link Task} to the leader Overlord but does not wait for it to 
finish.
+   * Shorthand for {@code onLeaderOverlord(o -> o.runTask(task.getId(), 
task))}.
+   */
+  public void submitTask(Task task)
+  {
+    onLeaderOverlord(o -> o.runTask(task.getId(), task));
   }
 
   /**
@@ -156,8 +162,10 @@ public class EmbeddedClusterApis
    */
   public void waitForTaskToSucceed(String taskId, EmbeddedOverlord overlord)
   {
-    waitForTaskToFinish(taskId, overlord);
-    verifyTaskHasStatus(taskId, TaskStatus.success(taskId));
+    Assertions.assertEquals(
+        TaskState.SUCCESS,
+        waitForTaskToFinish(taskId, overlord).getStatusCode()
+    );
   }
 
   /**
@@ -165,12 +173,13 @@ public class EmbeddedClusterApis
    * {@link EmbeddedOverlord} is not the leader, this method can only return by
    * throwing an exception upon timeout.
    */
-  public void waitForTaskToFinish(String taskId, EmbeddedOverlord overlord)
+  public TaskStatus waitForTaskToFinish(String taskId, EmbeddedOverlord 
overlord)
   {
     overlord.latchableEmitter().waitForEvent(
         event -> event.hasMetricName(TaskMetrics.RUN_DURATION)
                       .hasDimension(DruidMetrics.TASK_ID, taskId)
     );
+    return getTaskStatus(taskId);
   }
 
   /**
@@ -272,6 +281,34 @@ public class EmbeddedClusterApis
     );
   }
 
+  /**
+   * Gets the current status of the given task.
+   */
+  public TaskStatus getTaskStatus(String taskId)
+  {
+    final TaskStatusPlus statusPlus = onLeaderOverlord(o -> 
o.taskStatus(taskId)).getStatus();
+    Assertions.assertNotNull(statusPlus);
+
+    return new TaskStatus(
+        statusPlus.getId(),
+        Objects.requireNonNull(statusPlus.getStatusCode()),
+        Objects.requireNonNull(statusPlus.getDuration()),
+        statusPlus.getErrorMsg(),
+        statusPlus.getLocation()
+    );
+  }
+
+  /**
+   * Posts the given supervisor to the leader Overlord of this cluster.
+   * Shorthand for {@code onLeaderOverlord(o -> o.postSupervisor(supervisor)).get("id")}.
+   *
+   * @return ID of the submitted supervisor
+   */
+  public String postSupervisor(SupervisorSpec supervisor)
+  {
+    return onLeaderOverlord(o -> o.postSupervisor(supervisor)).get("id");
+  }
+
   /**
    * Fetches the current status of the given supervisor ID.
    */
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 1dddc61c1e1..3fb2cba18bc 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.broker.BrokerClient;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.metrics.LatchableEmitter;
@@ -77,7 +78,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
   private final EmbeddedClusterApis clusterApis;
   private final TestFolder testFolder = new TestFolder();
 
-  private final List<EmbeddedDruidServer> servers = new ArrayList<>();
+  private final List<EmbeddedDruidServer<?>> servers = new ArrayList<>();
   private final List<EmbeddedResource> resources = new ArrayList<>();
   private final List<Class<? extends DruidModule>> extensionModules = new 
ArrayList<>();
   private final Properties commonProperties = new Properties();
@@ -184,7 +185,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
    * cluster has started must be started explicitly by calling
    * {@link EmbeddedDruidServer#start()}.
    */
-  public EmbeddedDruidCluster addServer(EmbeddedDruidServer server)
+  public EmbeddedDruidCluster addServer(EmbeddedDruidServer<?> server)
   {
     server.onAddedToCluster(commonProperties);
     servers.add(server);
@@ -316,19 +317,37 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
   @Override
   public CoordinatorClient leaderCoordinator()
   {
-    return servers.get(0).bindings().leaderCoordinator();
+    return 
findServerOfType(EmbeddedCoordinator.class).bindings().leaderCoordinator();
   }
 
   @Override
   public OverlordClient leaderOverlord()
   {
-    return servers.get(0).bindings().leaderOverlord();
+    return 
findServerOfType(EmbeddedOverlord.class).bindings().leaderOverlord();
   }
 
   @Override
   public BrokerClient anyBroker()
   {
-    return servers.get(0).bindings().anyBroker();
+    return findServerOfType(EmbeddedBroker.class).bindings().anyBroker();
+  }
+
+  private <S extends EmbeddedDruidServer<S>> EmbeddedDruidServer<S> 
findServerOfType(
+      Class<S> serverType
+  )
+  {
+    S foundServer = null;
+    for (EmbeddedDruidServer<?> server : servers) {
+      if (serverType.isInstance(server)) {
+        foundServer = serverType.cast(server);
+        break;
+      }
+    }
+
+    return Objects.requireNonNull(
+        foundServer,
+        StringUtils.format("Cluster has no %s", serverType.getSimpleName())
+    );
   }
 
   private void validateNotStarted()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to