This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9af6a22743a Add utility method to EmbeddedClusterApis to run tasks
(#18349)
9af6a22743a is described below
commit 9af6a22743ab052628331bb1018c989999279aa0
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Aug 2 09:44:49 2025 +0530
Add utility method to EmbeddedClusterApis to run tasks (#18349)
Description:
Embedded tests currently run into serialization issues when submitting a
`Task` or
`CompactionSupervisorSpec` payload to the cluster using any
`OverlordClient`.
This can happen because the `ObjectMapper` instance used by the
`OverlordClient` might not have the
necessary Jackson modules loaded.
The hack so far has been to ensure that the first server in an embedded
cluster is always an Overlord.
But this can quickly run into issues, e.g. when running a cluster with
basic auth enabled where a
Coordinator must be started before all other servers.
Fix:
- Fix up methods `EmbeddedClusterApis`: `leaderOverlord`,
`leaderCoordinator` and `anyBroker` to use
the appropriate servers. The request is still sent over the wire even if
the client bound on a server is used
to call itself.
- Add utility methods in `EmbeddedClusterApis` to `submitTask`, `runTask`,
`postSupervisor`
---
.../embedded/compact/CompactionTestBase.java | 12 ++--
.../indexing/ConcurrentAppendReplaceTest.java | 10 +--
.../embedded/indexing/IndexParallelTaskTest.java | 4 +-
.../testing/embedded/indexing/IndexTaskTest.java | 15 ++---
.../embedded/indexing/KafkaClusterMetricsTest.java | 42 ++++---------
.../embedded/msq/BaseRealtimeQueryTest.java | 10 +--
.../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 2 +-
.../msq/EmbeddedMSQRealtimeUnnestQueryTest.java | 2 +-
.../embedded/server/CoordinatorClientTest.java | 3 +-
.../embedded/server/HighAvailabilityTest.java | 3 +-
.../embedded/server/HistoricalCloningTest.java | 8 +--
.../catalog/compact/CatalogCompactionTest.java | 10 +--
.../simulate/EmbeddedKafkaSupervisorTest.java | 10 +--
.../testing/embedded/EmbeddedClusterApis.java | 71 ++++++++++++++++------
.../testing/embedded/EmbeddedDruidCluster.java | 29 +++++++--
15 files changed, 126 insertions(+), 105 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
index 683abe00199..109c9c12aae 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
@@ -19,6 +19,7 @@
package org.apache.druid.testing.embedded.compact;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.testing.embedded.EmbeddedBroker;
@@ -68,12 +69,11 @@ public abstract class CompactionTestBase extends
EmbeddedClusterTestBase
*/
protected String runTask(TaskBuilder<?, ?, ?> taskBuilder)
{
- return cluster.callApi().runTask(
- (ds, taskId) -> taskBuilder.dataSource(ds).withId(taskId),
- dataSource,
- overlord,
- coordinator
- );
+ final String taskId = IdUtils.getRandomId();
+
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId),
overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+ return taskId;
}
protected void verifySegmentIntervals(List<Interval> expectedIntervals)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
index 560481c8caa..721f55dd151 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/ConcurrentAppendReplaceTest.java
@@ -68,10 +68,7 @@ public class ConcurrentAppendReplaceTest extends
EmbeddedClusterTestBase
.appendToExisting(true)
.context("useConcurrentLocks", true)
.dimensions();
- cluster.callApi().onLeaderOverlord(
- o -> o.runTask(task1, taskBuilder.withId(task1))
- );
- cluster.callApi().waitForTaskToSucceed(task1, overlord);
+ cluster.callApi().runTask(taskBuilder.withId(task1), overlord);
List<DataSegment> usedSegments = getAllUsedSegments();
Assertions.assertEquals(1, usedSegments.size());
@@ -87,10 +84,7 @@ public class ConcurrentAppendReplaceTest extends
EmbeddedClusterTestBase
// Run the APPEND task again with a different taskId
final String task2 = EmbeddedClusterApis.newTaskId(dataSource);
- cluster.callApi().onLeaderOverlord(
- o -> o.runTask(task2, taskBuilder.withId(task2))
- );
- cluster.callApi().waitForTaskToSucceed(task2, overlord);
+ cluster.callApi().runTask(taskBuilder.withId(task2), overlord);
// Verify that the new segment gets appended with the same version but a
different ID
usedSegments = getAllUsedSegments();
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
index 6725c23b923..e97c9b97747 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexParallelTaskTest.java
@@ -214,9 +214,7 @@ public class IndexParallelTaskTest extends
EmbeddedClusterTestBase
private String runTask(TaskBuilder.IndexParallel taskBuilder, String
dataSource)
{
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId,
taskBuilder.withId(taskId)));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
-
+ cluster.callApi().runTask(taskBuilder.withId(taskId), overlord);
return taskId;
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
index e5578e83e5a..a93ef2ddb79 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IndexTaskTest.java
@@ -63,8 +63,8 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
{
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
- .addServer(overlord)
.addServer(coordinator)
+ .addServer(overlord)
.addServer(indexer)
.addServer(historical)
.addServer(broker)
@@ -76,13 +76,12 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
public void test_runIndexTask_forInlineDatasource()
{
final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
- final Object task = createIndexTaskForInlineData(
+ final IndexTask task = createIndexTaskForInlineData(
taskId,
Resources.InlineData.CSV_10_DAYS
);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().runTask(task, overlord);
// Verify that the task created 10 DAY-granularity segments
final List<DataSegment> segments = new ArrayList<>(
@@ -142,7 +141,7 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
runTasksConcurrently(100);
}
- private Object createIndexTaskForInlineData(String taskId, String
inlineDataCsv)
+ private IndexTask createIndexTaskForInlineData(String taskId, String
inlineDataCsv)
{
return TaskBuilder.ofTypeIndex()
.dataSource(dataSource)
@@ -169,16 +168,14 @@ public class IndexTaskTest extends EmbeddedClusterTestBase
int index = 0;
for (String taskId : taskIds) {
index++;
- final Object task = createIndexTaskForInlineData(
+ final IndexTask task = createIndexTaskForInlineData(
taskId,
StringUtils.format(
"%s,%s,%d",
jan1.plusDays(index), "item " + index, index
)
);
- cluster.callApi().onLeaderOverlord(
- o -> o.runTask(taskId, task)
- );
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
}
for (String taskId : taskIds) {
cluster.callApi().waitForTaskToSucceed(taskId, overlord);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 8717fcf223f..aa3667405bd 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -156,10 +156,10 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
maxRowsPerSegment
);
- final Map<String, String> startSupervisorResult =
cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec)
+ Assertions.assertEquals(
+ supervisorId,
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec)
);
- Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
// Wait for segments to be handed off
indexer.latchableEmitter().waitForEventAggregate(
@@ -183,9 +183,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
verifyIngestedMetricCountMatchesEmittedCount("coordinator/time",
coordinator);
// Suspend the supervisor
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
- );
+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
@Test
@@ -208,9 +206,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
taskCompletionTimeoutMillis,
maxRowsPerSegment
);
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec)
- );
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec);
// Wait for a task to succeed
overlord.latchableEmitter().waitForEventAggregate(
@@ -248,9 +244,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
false,
null
);
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(compactionSupervisorSpec)
- );
+ cluster.callApi().postSupervisor(compactionSupervisorSpec);
// Wait until some compaction tasks have finished
overlord.latchableEmitter().waitForEventAggregate(
@@ -309,12 +303,8 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
);
// Suspend the supervisors
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
- );
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
- );
+
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
@Test
@@ -337,9 +327,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
taskCompletionTimeoutMillis,
maxRowsPerSegment
);
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec)
- );
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec);
// Wait for some segments to be published
overlord.latchableEmitter().waitForEvent(
@@ -371,9 +359,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
false,
null
);
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(compactionSupervisorSpec)
- );
+ cluster.callApi().postSupervisor(compactionSupervisorSpec);
// Wait until some skipped metrics have been emitted
overlord.latchableEmitter().waitForEventAggregate(
@@ -388,12 +374,8 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
);
// Suspend the supervisors
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(compactionSupervisorSpec.createSuspendedSpec())
- );
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
- );
+
cluster.callApi().postSupervisor(compactionSupervisorSpec.createSuspendedSpec());
+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
/**
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
index 3a05637f660..42baf6a8662 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.Period;
@@ -84,13 +85,14 @@ public class BaseRealtimeQueryTest extends
EmbeddedClusterTestBase
/**
* Submits a supervisor spec to the Overlord.
*/
- protected void submitSupervisor()
+ protected void submitSupervisor(EmbeddedOverlord overlord)
{
// Submit a supervisor.
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor();
- final Map<String, String> startSupervisorResult =
- cluster.callApi().onLeaderOverlord(o ->
o.postSupervisor(kafkaSupervisorSpec));
- Assertions.assertEquals(Map.of("id", dataSource), startSupervisorResult);
+ Assertions.assertEquals(
+ dataSource,
+ cluster.callApi().postSupervisor(kafkaSupervisorSpec)
+ );
}
/**
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index a8b4829fa36..a7071714f3e 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -167,7 +167,7 @@ public class EmbeddedMSQRealtimeQueryTest extends
BaseRealtimeQueryTest
void setUpEach()
{
msqApis = new EmbeddedMSQApis(cluster, overlord);
- submitSupervisor();
+ submitSupervisor(overlord);
publishToKafka(TestIndex.getMMappedWikipediaIndex());
// Wait for it to be loaded.
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
index cb38ce54acc..2c9e7c6258c 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeUnnestQueryTest.java
@@ -113,7 +113,7 @@ public class EmbeddedMSQRealtimeUnnestQueryTest extends
BaseRealtimeQueryTest
QueryableIndex index = TestIndex.getMMappedTestIndex();
- submitSupervisor();
+ submitSupervisor(overlord);
publishToKafka(index);
final int totalRows = index.getNumRows();
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
index 90a82b3eaec..12590b86069 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorClientTest.java
@@ -192,8 +192,7 @@ public class CoordinatorClientTest extends
EmbeddedClusterTestBase
.dimensions()
.withId(taskId);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().runTask(task, overlord);
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
}
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
index 7c025f1f06d..58a3a621ea2 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HighAvailabilityTest.java
@@ -121,8 +121,7 @@ public class HighAvailabilityTest extends
EmbeddedClusterTestBase
.inlineInputSourceWithData(Resources.InlineData.CSV_10_DAYS)
.dimensions()
.withId(taskId);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord1);
+ cluster.callApi().runTask(task, overlord1);
coordinator1.latchableEmitter().waitForEvent(
event -> event.hasMetricName("segment/metadataCache/used/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
index db22b015c8c..6c64443a4c3 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/HistoricalCloningTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.testing.embedded.server;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -128,13 +129,12 @@ public class HistoricalCloningTest extends
EmbeddedClusterTestBase
private void runIngestion()
{
final String taskId = IdUtils.getRandomId();
- final Object task = createIndexTaskForInlineData(taskId);
+ final IndexTask task = createIndexTaskForInlineData(taskId);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().runTask(task, overlord);
}
- private Object createIndexTaskForInlineData(String taskId)
+ private IndexTask createIndexTaskForInlineData(String taskId)
{
return TaskBuilder.ofTypeIndex()
.dataSource(dataSource)
diff --git
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
index 6c04d2eebbf..9f3106b8844 100644
---
a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
+++
b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.TableMetadata;
import org.apache.druid.catalog.model.table.TableBuilder;
import org.apache.druid.catalog.sync.CatalogClient;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
@@ -113,7 +114,7 @@ public class CatalogCompactionTest extends
EmbeddedClusterTestBase
final CompactionSupervisorSpec compactionSupervisor
= new CompactionSupervisorSpec(compactionConfig, false, null);
- cluster.callApi().onLeaderOverlord(o ->
o.postSupervisor(compactionSupervisor));
+ cluster.callApi().postSupervisor(compactionSupervisor);
// Wait for compaction to finish
overlord.latchableEmitter().waitForEvent(
@@ -136,13 +137,12 @@ public class CatalogCompactionTest extends
EmbeddedClusterTestBase
private void runIngestionAtDayGranularity()
{
final String taskId = IdUtils.getRandomId();
- final Object task = createIndexTaskForInlineData(taskId);
+ final IndexTask task = createIndexTaskForInlineData(taskId);
- cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().runTask(task, overlord);
}
- private Object createIndexTaskForInlineData(String taskId)
+ private IndexTask createIndexTaskForInlineData(String taskId)
{
final String inlineDataCsv =
"2025-06-01T00:00:00.000Z,shirt,105"
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index b46b2b5bbd6..bab227dc864 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -49,7 +49,6 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
@@ -91,10 +90,7 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
final String supervisorId = dataSource + "_supe";
final KafkaSupervisorSpec kafkaSupervisorSpec =
createKafkaSupervisor(supervisorId, topic);
- final Map<String, String> startSupervisorResult =
cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec)
- );
- Assertions.assertEquals(Map.of("id", supervisorId), startSupervisorResult);
+ Assertions.assertEquals(supervisorId,
cluster.callApi().postSupervisor(kafkaSupervisorSpec));
// Wait for the broker to discover the realtime segments
broker.latchableEmitter().waitForEvent(
@@ -120,9 +116,7 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
// Suspend the supervisor and verify the state
- cluster.callApi().onLeaderOverlord(
- o -> o.postSupervisor(kafkaSupervisorSpec.createSuspendedSpec())
- );
+
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
Assertions.assertTrue(supervisorStatus.isSuspended());
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index f8bb3d66cb8..954e0f117c4 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -26,9 +26,13 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskMetrics;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -50,6 +54,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
@@ -130,23 +135,24 @@ public class EmbeddedClusterApis
}
/**
- * Creates a Task using the given builder and runs it.
- *
- * @return ID of the task.
+ * Runs a {@link Task} on this cluster and waits until it has completed
successfully.
+ * The given {@link EmbeddedOverlord} must be the leader with a {@code
LatchableEmitter}
+ * bound so that the task completion metric can be waited upon.
*/
- public String runTask(
- TaskBuilder taskBuilder,
- String dataSource,
- EmbeddedOverlord overlord,
- EmbeddedCoordinator coordinator
- )
+ public void runTask(Task task, EmbeddedOverlord leaderOverlord)
{
- final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
- onLeaderOverlord(o -> o.runTask(taskId, taskBuilder.build(dataSource,
taskId)));
- waitForTaskToSucceed(taskId, overlord);
- waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+ final String taskId = task.getId();
+ submitTask(task);
+ waitForTaskToSucceed(taskId, leaderOverlord);
+ }
- return taskId;
+ /**
+ * Submits a {@link Task} to the leader Overlord but does not wait for it to
finish.
+ * Shorthand for {@code onLeaderOverlord(o -> o.runTask(task.getId(),
task))}.
+ */
+ public void submitTask(Task task)
+ {
+ onLeaderOverlord(o -> o.runTask(task.getId(), task));
}
/**
@@ -156,8 +162,10 @@ public class EmbeddedClusterApis
*/
public void waitForTaskToSucceed(String taskId, EmbeddedOverlord overlord)
{
- waitForTaskToFinish(taskId, overlord);
- verifyTaskHasStatus(taskId, TaskStatus.success(taskId));
+ Assertions.assertEquals(
+ TaskState.SUCCESS,
+ waitForTaskToFinish(taskId, overlord).getStatusCode()
+ );
}
/**
@@ -165,12 +173,13 @@ public class EmbeddedClusterApis
* {@link EmbeddedOverlord} is not the leader, this method can only return by
* throwing an exception upon timeout.
*/
- public void waitForTaskToFinish(String taskId, EmbeddedOverlord overlord)
+ public TaskStatus waitForTaskToFinish(String taskId, EmbeddedOverlord
overlord)
{
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName(TaskMetrics.RUN_DURATION)
.hasDimension(DruidMetrics.TASK_ID, taskId)
);
+ return getTaskStatus(taskId);
}
/**
@@ -272,6 +281,34 @@ public class EmbeddedClusterApis
);
}
+ /**
+ * Gets the current status of the given task.
+ */
+ public TaskStatus getTaskStatus(String taskId)
+ {
+ final TaskStatusPlus statusPlus = onLeaderOverlord(o ->
o.taskStatus(taskId)).getStatus();
+ Assertions.assertNotNull(statusPlus);
+
+ return new TaskStatus(
+ statusPlus.getId(),
+ Objects.requireNonNull(statusPlus.getStatusCode()),
+ Objects.requireNonNull(statusPlus.getDuration()),
+ statusPlus.getErrorMsg(),
+ statusPlus.getLocation()
+ );
+ }
+
+ /**
+ * Posts the given supervisor to the leader Overlord of this cluster.
+ * Shorhand for {@code onLeaderOverlord(o ->
o.postSupervisor(supervisor)).get("id")}.
+ *
+ * @return ID of the submitted supervisor
+ */
+ public String postSupervisor(SupervisorSpec supervisor)
+ {
+ return onLeaderOverlord(o -> o.postSupervisor(supervisor)).get("id");
+ }
+
/**
* Fetches the current status of the given supervisor ID.
*/
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 1dddc61c1e1..3fb2cba18bc 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -25,6 +25,7 @@ import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.metrics.LatchableEmitter;
@@ -77,7 +78,7 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
private final EmbeddedClusterApis clusterApis;
private final TestFolder testFolder = new TestFolder();
- private final List<EmbeddedDruidServer> servers = new ArrayList<>();
+ private final List<EmbeddedDruidServer<?>> servers = new ArrayList<>();
private final List<EmbeddedResource> resources = new ArrayList<>();
private final List<Class<? extends DruidModule>> extensionModules = new
ArrayList<>();
private final Properties commonProperties = new Properties();
@@ -184,7 +185,7 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
* cluster has started must be started explicitly by calling
* {@link EmbeddedDruidServer#start()}.
*/
- public EmbeddedDruidCluster addServer(EmbeddedDruidServer server)
+ public EmbeddedDruidCluster addServer(EmbeddedDruidServer<?> server)
{
server.onAddedToCluster(commonProperties);
servers.add(server);
@@ -316,19 +317,37 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
@Override
public CoordinatorClient leaderCoordinator()
{
- return servers.get(0).bindings().leaderCoordinator();
+ return
findServerOfType(EmbeddedCoordinator.class).bindings().leaderCoordinator();
}
@Override
public OverlordClient leaderOverlord()
{
- return servers.get(0).bindings().leaderOverlord();
+ return
findServerOfType(EmbeddedOverlord.class).bindings().leaderOverlord();
}
@Override
public BrokerClient anyBroker()
{
- return servers.get(0).bindings().anyBroker();
+ return findServerOfType(EmbeddedBroker.class).bindings().anyBroker();
+ }
+
+ private <S extends EmbeddedDruidServer<S>> EmbeddedDruidServer<S>
findServerOfType(
+ Class<S> serverType
+ )
+ {
+ S foundServer = null;
+ for (EmbeddedDruidServer<?> server : servers) {
+ if (serverType.isInstance(server)) {
+ foundServer = serverType.cast(server);
+ break;
+ }
+ }
+
+ return Objects.requireNonNull(
+ foundServer,
+ StringUtils.format("Cluster has no %s", serverType.getSimpleName())
+ );
}
private void validateNotStarted()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]