This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 95de4c4b1ea Reduce timeout in LatchableEmitter to identify slow embedded tests (#18483)
95de4c4b1ea is described below

commit 95de4c4b1ea134e32f322efb0e63f1112a64b5ab
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Sep 22 17:28:01 2025 +0530

    Reduce timeout in LatchableEmitter to identify slow embedded tests (#18483)
    
    Changes:
    - Make wait timeout in LatchableEmitter configurable
    - Set the default wait timeout to 10s
    - Fix docker tests workflow to upload failsafe reports
---
 .github/workflows/docker-tests.yml                 | 10 ++---
 .../IngestionBackwardCompatibilityDockerTest.java  |  1 +
 .../embedded/docker/IngestionDockerTest.java       |  1 +
 .../embedded/indexing/KafkaClusterMetricsTest.java |  1 +
 .../embedded/k8s/KubernetesClusterDockerTest.java  |  1 +
 .../KubernetesClusterWithOperatorDockerTest.java   |  1 +
 .../msq/EmbeddedDurableShuffleStorageTest.java     |  1 +
 .../embedded/msq/MSQWorkerFaultToleranceTest.java  | 15 +++----
 .../embedded/query/QueryVirtualStorageTest.java    | 23 +---------
 .../druid/server/metrics/LatchableEmitter.java     | 15 +++++--
 .../server/metrics/LatchableEmitterConfig.java     | 49 ++++++++++++++++++++++
 .../testing/embedded/EmbeddedDruidCluster.java     | 15 +++++++
 .../embedded/emitter/LatchableEmitterModule.java   |  8 +++-
 13 files changed, 101 insertions(+), 40 deletions(-)

diff --git a/.github/workflows/docker-tests.yml b/.github/workflows/docker-tests.yml
index 51628881016..8a54032a246 100644
--- a/.github/workflows/docker-tests.yml
+++ b/.github/workflows/docker-tests.yml
@@ -71,14 +71,14 @@ jobs:
           name: failure-docker-logs
           path: docker-logs.tgz
 
-      - name: Collect surefire reports on failure
+      - name: Collect failsafe reports on failure
         if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
         run: |
-          tar cvzf ./surefire-logs.tgz ./embedded-tests/target/surefire-reports
+          tar cvzf ./failsafe-logs.tgz ./embedded-tests/target/failsafe-reports
 
-      - name: Upload surefire reports to GitHub
+      - name: Upload failsafe reports to GitHub
         if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
         uses: actions/upload-artifact@v4
         with:
-          name: failure-surefire-logs
-          path: surefire-logs.tgz
+          name: failure-failsafe-logs
+          path: failsafe-logs.tgz
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
index 5c814c2cb90..757aa886a81 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionBackwardCompatibilityDockerTest.java
@@ -61,6 +61,7 @@ public class IngestionBackwardCompatibilityDockerTest extends IngestionSmokeTest
 
     return cluster
         .useContainerFriendlyHostname()
+        .useDefaultTimeoutForLatchableEmitter(60)
         .addResource(containerOverlord)
         .addResource(containerCoordinator)
         .addServer(overlord)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
index 25371fae269..40e7a6bc073 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java
@@ -52,6 +52,7 @@ public class IngestionDockerTest extends IngestionSmokeTest implements LatestIma
     overlord.addProperty("druid.plaintextPort", "7090");
 
     return cluster
+        .useDefaultTimeoutForLatchableEmitter(60)
         .useContainerFriendlyHostname()
         .addResource(containerOverlord)
         .addResource(containerCoordinator)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 5a977b861e8..44c9be9a9c8 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -109,6 +109,7 @@ public class KafkaClusterMetricsTest extends EmbeddedClusterTestBase
     cluster.addExtension(KafkaIndexTaskModule.class)
            .addExtension(KafkaEmitterModule.class)
            .addExtension(LatchableEmitterModule.class)
+           .useDefaultTimeoutForLatchableEmitter(60)
            .addCommonProperty("druid.emitter", "composing")
            .addCommonProperty("druid.emitter.composing.emitters", 
"[\"latching\",\"kafka\"]")
            .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
index c44796ca0f2..c49434f66c4 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
@@ -54,6 +54,7 @@ public class KubernetesClusterDockerTest extends IngestionSmokeTest implements L
 
     return cluster
         .useContainerFriendlyHostname()
+        .useDefaultTimeoutForLatchableEmitter(60)
         .addResource(k3sCluster)
         .addServer(overlord)
         .addServer(broker)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
index 748dfd55041..f9a9c10c497 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
@@ -62,6 +62,7 @@ public class KubernetesClusterWithOperatorDockerTest extends IngestionSmokeTest
     
     return cluster
         .useContainerFriendlyHostname()
+        .useDefaultTimeoutForLatchableEmitter(60)
         .addResource(k3sCluster)
         .addServer(overlord)
         .addServer(broker)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
index fd6c819e756..62f168d9ba7 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
@@ -91,6 +91,7 @@ public class EmbeddedDurableShuffleStorageTest extends EmbeddedClusterTestBase
     return EmbeddedDruidCluster
         .withEmbeddedDerbyAndZookeeper()
         .useLatchableEmitter()
+        .useDefaultTimeoutForLatchableEmitter(20)
         .addExtensions(S3StorageConnectorModule.class)
         .addResource(storageResource)
         .addResource(msqStorageResource)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
index 254401d1d6f..063bf2eb169 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
@@ -50,7 +50,6 @@ public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
   private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer()
-      .addProperty("druid.plaintextPort", "7091")
       .addProperty("druid.worker.capacity", "1");
 
   private EmbeddedMSQApis msqApis;
@@ -93,6 +92,7 @@ public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
 
     // Add a faulty Indexer to the cluster so that worker is launched but 
doesn't finish
     final EmbeddedIndexer faultyIndexer = new EmbeddedIndexer()
+        .addProperty("druid.plaintextPort", "7091")
         .addProperty("druid.unsafe.cluster.testing", "true")
         
.addProperty("druid.unsafe.cluster.testing.overlordClient.taskStatusDelay", 
"PT1H")
         .addProperty("druid.worker.capacity", "1");
@@ -106,6 +106,13 @@ public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
     final String workerTaskId = (String) 
matchingEvent.getUserDims().get(DruidMetrics.TASK_ID);
     Thread.sleep(100);
 
+    // Add a functional Indexer where the worker can be relaunched
+    final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
+        .addProperty("druid.plaintextPort", "6091")
+        .addProperty("druid.worker.capacity", "1");
+    cluster.addServer(functionalIndexer);
+    functionalIndexer.start();
+
     // Cancel the worker task and verify that it has failed
     cluster.callApi().onLeaderOverlord(o -> o.cancelTask(workerTaskId));
     overlord.latchableEmitter().waitForEvent(
@@ -115,12 +122,6 @@ public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
     );
     faultyIndexer.stop();
 
-    // Add a functional Indexer so that the worker is relaunched successfully
-    final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
-        .addProperty("druid.worker.capacity", "1");
-    cluster.addServer(functionalIndexer);
-    functionalIndexer.start();
-
     // Verify that the controller task eventually succeeds
     cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), 
overlord.latchableEmitter());
     cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
index 1cfcad2896e..8ea58968f81 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java
@@ -24,16 +24,6 @@ import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.dart.guice.DartControllerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartControllerModule;
-import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartWorkerModule;
-import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
-import org.apache.druid.msq.guice.MSQDurableStorageModule;
-import org.apache.druid.msq.guice.MSQExternalDataSourceModule;
-import org.apache.druid.msq.guice.MSQIndexingModule;
-import org.apache.druid.msq.guice.MSQSqlModule;
-import org.apache.druid.msq.guice.SqlTaskModule;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.query.DruidProcessingConfigTest;
 import org.apache.druid.sql.calcite.planner.Calcites;
@@ -104,18 +94,7 @@ class QueryVirtualStorageTest extends EmbeddedClusterTestBase
     return EmbeddedDruidCluster
         .withEmbeddedDerbyAndZookeeper()
         .useLatchableEmitter()
-        .addExtensions(
-            DartControllerModule.class,
-            DartWorkerModule.class,
-            DartControllerMemoryManagementModule.class,
-            DartWorkerMemoryManagementModule.class,
-            IndexerMemoryManagementModule.class,
-            MSQDurableStorageModule.class,
-            MSQIndexingModule.class,
-            MSQSqlModule.class,
-            SqlTaskModule.class,
-            MSQExternalDataSourceModule.class
-        )
+        .useDefaultTimeoutForLatchableEmitter(20)
         .addResource(storageResource)
         .addCommonProperty("druid.storage.zip", "false")
         .addServer(coordinator)
diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
index bd0b486f6da..247b74d8e57 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java
@@ -62,12 +62,18 @@ public class LatchableEmitter extends StubServiceEmitter
    */
   private final List<Event> processedEvents = new ArrayList<>();
 
+  /**
+   * Default timeout to use while waiting for events.
+   */
+  private final long defaultWaitTimeoutMillis;
+
   /**
    * Creates a {@link StubServiceEmitter} that may be used in embedded tests.
    */
-  public LatchableEmitter(String service, String host)
+  public LatchableEmitter(String service, String host, LatchableEmitterConfig 
config)
   {
     super(service, host);
+    this.defaultWaitTimeoutMillis = config.getDefaultWaitTimeoutMillis();
   }
 
   @Override
@@ -129,7 +135,8 @@ public class LatchableEmitter extends StubServiceEmitter
   }
 
   /**
-   * Wait indefinitely until a metric event that matches the given condition 
is emitted.
+   * Wait until a metric event that matches the given condition is emitted.
+   * Uses the {@link LatchableEmitterConfig#defaultWaitTimeoutMillis}.
    */
   public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition)
   {
@@ -137,7 +144,7 @@ public class LatchableEmitter extends StubServiceEmitter
     waitForEvent(
         event -> event instanceof ServiceMetricEvent
                  && matcher.test((ServiceMetricEvent) event),
-        -1
+        defaultWaitTimeoutMillis
     );
     return matcher.matchingEvent.get();
   }
@@ -158,7 +165,7 @@ public class LatchableEmitter extends StubServiceEmitter
         event -> event instanceof ServiceMetricEvent
                  && eventMatcher.test((ServiceMetricEvent) event)
                  && aggregateMatcher.test((ServiceMetricEvent) event),
-        300_000
+        defaultWaitTimeoutMillis
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java
new file mode 100644
index 00000000000..9ab358119aa
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/metrics/LatchableEmitterConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.metrics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+
+public class LatchableEmitterConfig
+{
+  @JsonProperty
+  private final long defaultWaitTimeoutMillis;
+
+  @JsonCreator
+  public LatchableEmitterConfig(
+      @JsonProperty("defaultWaitTimeoutMillis") @Nullable Long 
defaultWaitTimeoutMillis
+  )
+  {
+    this.defaultWaitTimeoutMillis = 
Configs.valueOrDefault(defaultWaitTimeoutMillis, 10_000);
+  }
+
+  /**
+   * Default time to wait for an event to be emitted. Slow tests can set this
+   * config to a high value to avoid failures.
+   */
+  public long getDefaultWaitTimeoutMillis()
+  {
+    return defaultWaitTimeoutMillis;
+  }
+}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index ee6885f030b..66178c05895 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -145,6 +145,21 @@ public class EmbeddedDruidCluster implements EmbeddedResource
     return this;
   }
 
+  /**
+   * Configures this cluster to use the given default timeout with the
+   * {@link LatchableEmitter}. Slow tests may set a higher value for the 
timeout
+   * to avoid failures. If the cluster does not {@link #useLatchableEmitter()},
+   * this method has no effect. Default timeout is 10 seconds.
+   */
+  public EmbeddedDruidCluster useDefaultTimeoutForLatchableEmitter(long 
timeoutSeconds)
+  {
+    addCommonProperty(
+        "druid.emitter.latching.defaultWaitTimeoutMillis",
+        String.valueOf(timeoutSeconds * 1000)
+    );
+    return this;
+  }
+
   /**
    * Adds an extension to this cluster. The list of extensions is populated in
    * the common property {@code druid.extensions.modulesForEmbeddedTest}.
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
index 71c228a96e5..78603090e5f 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/emitter/LatchableEmitterModule.java
@@ -23,12 +23,14 @@ import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
+import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.metrics.LatchableEmitterConfig;
 
 /**
  * Guice module to use {@link LatchableEmitter}. This module is added to the 
file
@@ -40,6 +42,7 @@ public class LatchableEmitterModule implements DruidModule
   @Override
   public void configure(Binder binder)
   {
+    JsonConfigProvider.bind(binder, "druid.emitter.latching", 
LatchableEmitterConfig.class);
     binder.bind(Key.get(Emitter.class, Names.named(LatchableEmitter.TYPE)))
           .to(LatchableEmitter.class);
   }
@@ -47,9 +50,10 @@ public class LatchableEmitterModule implements DruidModule
   @Provides
   @ManageLifecycle
   public LatchableEmitter makeEmitter(
-      @Self DruidNode selfNode
+      @Self DruidNode selfNode,
+      LatchableEmitterConfig config
   )
   {
-    return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost());
+    return new LatchableEmitter(selfNode.getServiceName(), selfNode.getHost(), 
config);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to