This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 95de4c4b1ea Reduce timeout in LatchableEmitter to identify slow embedded tests (#18483)
95de4c4b1ea is described below
commit 95de4c4b1ea134e32f322efb0e63f1112a64b5ab
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Sep 22 17:28:01 2025 +0530
Reduce timeout in LatchableEmitter to identify slow embedded tests (#18483)
Changes:
- Make wait timeout in LatchableEmitter configurable
- Set the default wait timeout to 10s
- Fix docker tests workflow to upload failsafe reports
---
.github/workflows/docker-tests.yml | 10 ++---
.../IngestionBackwardCompatibilityDockerTest.java | 1 +
.../embedded/docker/IngestionDockerTest.java | 1 +
.../embedded/indexing/KafkaClusterMetricsTest.java | 1 +
.../embedded/k8s/KubernetesClusterDockerTest.java | 1 +
.../KubernetesClusterWithOperatorDockerTest.java | 1 +
.../msq/EmbeddedDurableShuffleStorageTest.java | 1 +
.../embedded/msq/MSQWorkerFaultToleranceTest.java | 15 +++----
.../embedded/query/QueryVirtualStorageTest.java | 23 +---------
.../druid/server/metrics/LatchableEmitter.java | 15 +++++--
.../server/metrics/LatchableEmitterConfig.java | 49 ++++++++++++++++++++++
.../testing/embedded/EmbeddedDruidCluster.java | 15 +++++++
.../embedded/emitter/LatchableEmitterModule.java | 8 +++-
13 files changed, 101 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/docker-tests.yml
b/.github/workflows/docker-tests.yml
index 51628881016..8a54032a246 100644
--- a/.github/workflows/docker-tests.yml
+++ b/.github/workflows/docker-tests.yml
@@ -71,14 +71,14 @@ jobs:
name: failure-docker-logs
path: docker-logs.tgz
- - name: Collect surefire reports on failure
+ - name: Collect failsafe reports on failure
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
run: |
- tar cvzf ./surefire-logs.tgz ./embedded-tests/target/surefire-reports
+ tar cvzf ./failsafe-logs.tgz ./embedded-tests/target/failsafe-reports
- - name: Upload surefire reports to GitHub
+ - name: Upload failsafe reports to GitHub
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
uses: actions/upload-artifact@v4
with:
- name: failure-surefire-logs
- path: surefire-logs.tgz
+ name: failure-failsafe-logs
+ path: failsafe-logs.tgz
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
index 5c814c2cb90..757aa886a81 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
@@ -61,6 +61,7 @@ public class IngestionBackwardCompatibilityDockerTest extends
IngestionSmokeTest
return cluster
.useContainerFriendlyHostname()
+ .useDefaultTimeoutForLatchableEmitter(60)
.addResource(containerOverlord)
.addResource(containerCoordinator)
.addServer(overlord)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
index 25371fae269..40e7a6bc073 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
@@ -52,6 +52,7 @@ public class IngestionDockerTest extends IngestionSmokeTest
implements LatestIma
overlord.addProperty("druid.plaintextPort", "7090");
return cluster
+ .useDefaultTimeoutForLatchableEmitter(60)
.useContainerFriendlyHostname()
.addResource(containerOverlord)
.addResource(containerCoordinator)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 5a977b861e8..44c9be9a9c8 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -109,6 +109,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
cluster.addExtension(KafkaIndexTaskModule.class)
.addExtension(KafkaEmitterModule.class)
.addExtension(LatchableEmitterModule.class)
+ .useDefaultTimeoutForLatchableEmitter(60)
.addCommonProperty("druid.emitter", "composing")
.addCommonProperty("druid.emitter.composing.emitters",
"[\"latching\",\"kafka\"]")
.addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
index c44796ca0f2..c49434f66c4 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
@@ -54,6 +54,7 @@ public class KubernetesClusterDockerTest extends
IngestionSmokeTest implements L
return cluster
.useContainerFriendlyHostname()
+ .useDefaultTimeoutForLatchableEmitter(60)
.addResource(k3sCluster)
.addServer(overlord)
.addServer(broker)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
index 748dfd55041..f9a9c10c497 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
@@ -62,6 +62,7 @@ public class KubernetesClusterWithOperatorDockerTest extends
IngestionSmokeTest
return cluster
.useContainerFriendlyHostname()
+ .useDefaultTimeoutForLatchableEmitter(60)
.addResource(k3sCluster)
.addServer(overlord)
.addServer(broker)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
index fd6c819e756..62f168d9ba7 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
@@ -91,6 +91,7 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
return EmbeddedDruidCluster
.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
+ .useDefaultTimeoutForLatchableEmitter(20)
.addExtensions(S3StorageConnectorModule.class)
.addResource(storageResource)
.addResource(msqStorageResource)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
index 254401d1d6f..063bf2eb169 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
@@ -50,7 +50,6 @@ public class MSQWorkerFaultToleranceTest extends
EmbeddedClusterTestBase
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
- .addProperty("druid.plaintextPort", "7091")
.addProperty("druid.worker.capacity", "1");
private EmbeddedMSQApis msqApis;
@@ -93,6 +92,7 @@ public class MSQWorkerFaultToleranceTest extends
EmbeddedClusterTestBase
// Add a faulty Indexer to the cluster so that worker is launched but doesn't finish
final EmbeddedIndexer faultyIndexer = new EmbeddedIndexer()
+ .addProperty("druid.plaintextPort", "7091")
.addProperty("druid.unsafe.cluster.testing", "true")
.addProperty("druid.unsafe.cluster.testing.overlordClient.taskStatusDelay",
"PT1H")
.addProperty("druid.worker.capacity", "1");
@@ -106,6 +106,13 @@ public class MSQWorkerFaultToleranceTest extends
EmbeddedClusterTestBase
final String workerTaskId = (String)
matchingEvent.getUserDims().get(DruidMetrics.TASK_ID);
Thread.sleep(100);
+ // Add a functional Indexer where the worker can be relaunched
+ final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
+ .addProperty("druid.plaintextPort", "6091")
+ .addProperty("druid.worker.capacity", "1");
+ cluster.addServer(functionalIndexer);
+ functionalIndexer.start();
+
// Cancel the worker task and verify that it has failed
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(workerTaskId));
overlord.latchableEmitter().waitForEvent(
@@ -115,12 +122,6 @@ public class MSQWorkerFaultToleranceTest extends
EmbeddedClusterTestBase
);
faultyIndexer.stop();
- // Add a functional Indexer so that the worker is relaunched successfully
- final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
- .addProperty("druid.worker.capacity", "1");
- cluster.addServer(functionalIndexer);
- functionalIndexer.start();
-
// Verify that the controller task eventually succeeds
cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(),
overlord.latchableEmitter());
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index 1cfcad2896e..8ea58968f81 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -24,16 +24,6 @@ import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartControllerModule;
-import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartWorkerModule;
-import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
-import org.apache.druid.msq.guice.MSQDurableStorageModule;
-import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
-import org.apache.druid.msq.guice.MSQIndexingModule;
-import org.apache.druid.msq.guice.MSQSqlModule;
-import org.apache.druid.msq.guice.SqlTaskModule;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.sql.calcite.planner.Calcites;
@@ -104,18 +94,7 @@ class QueryVirtualStorageTest extends
EmbeddedClusterTestBase
return EmbeddedDruidCluster
.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
- .addExtensions(
- DartControllerModule.class,
- DartWorkerModule.class,
- DartControllerMemoryManagementModule.class,
- DartWorkerMemoryManagementModule.class,
- IndexerMemoryManagementModule.class,
- MSQDurableStorageModule.class,
- MSQIndexingModule.class,
- MSQSqlModule.class,
- SqlTaskModule.class,
- MSQExternalDataSourceModule.class
- )
+ .useDefaultTimeoutForLatchableEmitter(20)
.addResource(storageResource)
.addCommonProperty("druid.storage.zip", "false")
.addServer(coordinator)
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index bd0b486f6da..247b74d8e57 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -62,12 +62,18 @@ public class LatchableEmitter extends StubServiceEmitter
*/
private final List<Event> processedEvents = new ArrayList<>();
+ /**
+ * Default timeout to use while waiting for events.
+ */
+ private final long defaultWaitTimeoutMillis;
+
/**
* Creates a {@link StubServiceEmitter} that may be used in embedded tests.
*/
- public LatchableEmitter(String service, String host)
+ public LatchableEmitter(String service, String host, LatchableEmitterConfig
config)
{
super(service, host);
+ this.defaultWaitTimeoutMillis = config.getDefaultWaitTimeoutMillis();
}
@Override
@@ -129,7 +135,8 @@ public class LatchableEmitter extends StubServiceEmitter
}
/**
- * Wait indefinitely until a metric event that matches the given condition is emitted.
+ * Wait until a metric event that matches the given condition is emitted.
+ * Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
*/
public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition)
{
@@ -137,7 +144,7 @@ public class LatchableEmitter extends StubServiceEmitter
waitForEvent(
event -> event instanceof ServiceMetricEvent
&& matcher.test((ServiceMetricEvent) event),
- -1
+ defaultWaitTimeoutMillis
);
return matcher.matchingEvent.get();
}
@@ -158,7 +165,7 @@ public class LatchableEmitter extends StubServiceEmitter
event -> event instanceof ServiceMetricEvent
&& eventMatcher.test((ServiceMetricEvent) event)
&& aggregateMatcher.test((ServiceMetricEvent) event),
- 300_000
+ defaultWaitTimeoutMillis
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java
new file mode 100644
index 00000000000..9ab358119aa
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+
+public class LatchableEmitterConfig
+{
+ @JsonProperty
+ private final long defaultWaitTimeoutMillis;
+
+ @JsonCreator
+ public LatchableEmitterConfig(
+ @JsonProperty("defaultWaitTimeoutMillis") @Nullable Long
defaultWaitTimeoutMillis
+ )
+ {
+ this.defaultWaitTimeoutMillis =
Configs.valueOrDefault(defaultWaitTimeoutMillis, 10_000);
+ }
+
+ /**
+ * Default time to wait for an event to be emitted. Slow tests can set this
+ * config to a high value to avoid failures.
+ */
+ public long getDefaultWaitTimeoutMillis()
+ {
+ return defaultWaitTimeoutMillis;
+ }
+}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index ee6885f030b..66178c05895 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -145,6 +145,21 @@ public class EmbeddedDruidCluster implements
EmbeddedResource
return this;
}
+ /**
+ * Configures this cluster to use the given default timeout with the
+ * {@link LatchableEmitter}. Slow tests may set a higher value for the timeout
+ * to avoid failures. If the cluster does not {@link #useLatchableEmitter()},
+ * this method has no effect. Default timeout is 10 seconds.
+ */
+ public EmbeddedDruidCluster useDefaultTimeoutForLatchableEmitter(long
timeoutSeconds)
+ {
+ addCommonProperty(
+ "druid.emitter.latching.defaultWaitTimeoutMillis",
+ String.valueOf(timeoutSeconds * 1000)
+ );
+ return this;
+ }
+
/**
* Adds an extension to this cluster. The list of extensions is populated in
* the common property {@code druid.extensions.modulesForEmbeddedTest}.
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
index 71c228a96e5..78603090e5f 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
@@ -23,12 +23,14 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.name.Names;
+import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.LatchableEmitterConfig;
/**
* Guice module to use {@link LatchableEmitter}. This module is added to the
file
@@ -40,6 +42,7 @@ public class LatchableEmitterModule implements DruidModule
@Override
public void configure(Binder binder)
{
+ JsonConfigProvider.bind(binder, "druid.emitter.latching",
LatchableEmitterConfig.class);
binder.bind(Key.get(Emitter.class, Names.named(LatchableEmitter.TYPE)))
.to(LatchableEmitter.class);
}
@@ -47,9 +50,10 @@ public class LatchableEmitterModule implements DruidModule
@Provides
@ManageLifecycle
public LatchableEmitter makeEmitter(
- @Self DruidNode selfNode
+ @Self DruidNode selfNode,
+ LatchableEmitterConfig config
)
{
- return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost());
+ return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(),
config);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]