This is an automated email from the ASF dual-hosted git repository.

Fly-Style pushed a commit to branch feat/poll-idle-ratio-replacement
in repository https://gitbox.apache.org/repos/asf/druid.git

commit c16aee0c797b3042585ea9870c4178f623dca8c7
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Jun 24 16:12:05 2026 +0300

    Cleanup
---
 .../autoscaler/CostBasedAutoScalerIntegrationTest.java  | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index 0c72d07d8e6..af79388f30a 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -38,6 +38,7 @@ import org.hamcrest.Matchers;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Duration;
 import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.Timeout;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;
@@ -62,6 +64,7 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
 
   private String topic;
   private final KafkaResource kafkaServer = new KafkaResource();
+  private ExecutorService backgroundPublishExecutor;
 
   @Override
   protected StreamIngestResource<?> getStreamIngestResource()
@@ -84,6 +87,14 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
     kafkaServer.createTopicWithPartitions(topic, PARTITION_COUNT);
   }
 
+  @AfterEach
+  public void shutdownBackgroundPublishExecutor() // per-test teardown: stops the executor used for background publishing
+  {
+    if (backgroundPublishExecutor != null) { // field has no initializer, so it stays null unless a test created the executor
+      backgroundPublishExecutor.shutdownNow(); // requests cancellation of the running publish task; does not wait for it to terminate
+    }
+  }
+
   @Test
   @Timeout(45)
   public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
@@ -137,7 +148,8 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
 
     // Produce additional records to create a backlog / lag
     // This ensures tasks are busy processing (low idle ratio)
-    Executors.newSingleThreadExecutor().submit(() -> {
+    backgroundPublishExecutor = Executors.newSingleThreadExecutor();
+    backgroundPublishExecutor.submit(() -> {
       for (int i = 0; i < 500; ++i) {
         publish1kRecords(topic, true);
       }
@@ -186,7 +198,8 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
 
     final int lowInitialTaskCount = 1;
     // This ensures tasks are busy processing (low idle ratio)
-    Executors.newSingleThreadExecutor().submit(() -> {
+    backgroundPublishExecutor = Executors.newSingleThreadExecutor();
+    backgroundPublishExecutor.submit(() -> {
       for (int i = 0; i < 500; ++i) {
         publish1kRecords(topic, true);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to