This is an automated email from the ASF dual-hosted git repository.

sanechka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f37b7b5b74 fix: harden ingesting autoscalers around task-count boundaries (#19269)
5f37b7b5b74 is described below

commit 5f37b7b5b74c98b341588b95daae21b1f38ca01d
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Apr 15 13:30:05 2026 +0300

    fix: harden ingesting autoscalers around task-count boundaries (#19269)
---
 .../embedded/indexing/KafkaClusterMetricsTest.java |   2 +-
 .../SeekableStreamSupervisorIOConfig.java          |   7 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  38 ++++-
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |  28 +++-
 .../SeekableStreamSupervisorSpecTest.java          |  12 +-
 .../autoscaler/CostBasedAutoScalerMockTest.java    | 161 ++++++++++++++++++---
 .../autoscaler/CostBasedAutoScalerTest.java        |   1 +
 .../autoscaler/LagBasedAutoScalerTest.java         | 121 ++++++++++++++++
 8 files changed, 331 insertions(+), 39 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 39742a9d578..bc7385e6bf6 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -153,7 +153,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
   }
 
   @Test
-  @Timeout(20)
+  @Timeout(120)
   public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
   {
     final int maxRowsPerSegment = 1000;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 633bd9b70dc..421a885b294 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -88,15 +88,18 @@ public abstract class SeekableStreamSupervisorIOConfig
     this.lagAggregator = lagAggregator;
     // Could be null
     this.autoScalerConfig = autoScalerConfig;
-    this.autoScalerEnabled = autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler();
+    boolean isAutoScalerAvailable = autoScalerConfig != null;
+    this.autoScalerEnabled = isAutoScalerAvailable && 
autoScalerConfig.getEnableTaskAutoScaler();
     if (autoScalerEnabled) {
       // Priority: taskCountStart > taskCount > taskCountMin
       this.taskCount = Configs.valueOrDefault(
           autoScalerConfig.getTaskCountStart(),
           Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
       );
+    } else if (isAutoScalerAvailable) {
+      this.taskCount = Configs.valueOrDefault(taskCount, 
autoScalerConfig.getTaskCountMin());
     } else {
-      this.taskCount = taskCount != null ? taskCount : 1;
+      this.taskCount = Configs.valueOrDefault(taskCount, 1);
     }
     Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
                                 "stopTaskCount must be greater than 0");
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index f6527beb8ec..21614c90c8c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -174,11 +174,27 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     lastKnownMetrics = collectMetrics();
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+    final boolean isTaskCountOutOfBounds = currentTaskCount < 
config.getTaskCountMin()
+                                     || currentTaskCount > 
config.getTaskCountMax();
+    if (isTaskCountOutOfBounds) {
+      currentTaskCount = Math.min(config.getTaskCountMax(), 
Math.max(config.getTaskCountMin(), currentTaskCount));
+    }
 
     // Perform scale-up actions; scale-down actions only if configured.
     final int taskCount;
-    if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+
+    // If task count is out of bounds, scale to the configured boundary
+    // regardless of optimal task count, to get back to a safe state.
+    if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+      taskCount = currentTaskCount;
+      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
+      log.info("Task count for supervisor[%s] was out of bounds [%d,%d], 
urgently scaling from [%d] to [%d].",
+               supervisorId, config.getTaskCountMin(), 
config.getTaskCountMax(), currentTaskCount, currentTaskCount);
+    } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
       lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
up).", supervisorId, currentTaskCount, taskCount);
@@ -192,6 +208,24 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     } else {
       taskCount = -1;
       log.debug("No scaling required for supervisor[%s]", supervisorId);
+
+      // Emit metrics for scaling skip reasons; in case of min == max, signaling reaching
+      // max task count has bigger priority for the external observers / trackers
+      if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == 
config.getTaskCountMax()) {
+        emitter.emit(getMetricBuilder()
+                         .setDimension(
+                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+                             "Already at max task count"
+                         )
+                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
currentTaskCount));
+      } else if (optimalTaskCount == config.getTaskCountMin() || 
currentTaskCount == config.getTaskCountMin()) {
+        emitter.emit(getMetricBuilder()
+                         .setDimension(
+                             
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+                             "Already at min task count"
+                         )
+                         
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, 
currentTaskCount));
+      }
     }
     return taskCount;
   }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index bf35576fbd8..b96a1de7bd4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
@@ -212,7 +213,8 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
    * @param lags the lag metrics of Stream (Kafka/Kinesis)
    * @return Integer, target number of tasksCount. -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
     // if the supervisor is not suspended, ensure required tasks are running
     // if suspended, ensure tasks have been requested to gracefully stop
@@ -239,19 +241,30 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         withinProportion, spec.getId()
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
     int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
+    final int partitionCount = supervisor.getPartitionCount();
     if (partitionCount <= 0) {
       log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", 
spec.getId());
       return -1;
     }
 
+    final int actualTaskCountMax = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+    final int actualTaskCountMin = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+
+    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+    // If that is happening, take the bound and return early.
+    final boolean isTaskCountOutOfBounds = currentActiveTaskCount < 
actualTaskCountMin
+                                           || currentActiveTaskCount > 
actualTaskCountMax;
+    if (isTaskCountOutOfBounds) {
+      currentActiveTaskCount = Math.min(actualTaskCountMax, 
Math.max(actualTaskCountMin, currentActiveTaskCount));
+      return currentActiveTaskCount;
+    }
+
     if (beyondProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
       // Do Scale out
-      int taskCount = currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep();
-
-      int actualTaskCountMax = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+      final int taskCount = currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep();
       if (currentActiveTaskCount == actualTaskCountMax) {
         log.debug(
             "CurrentActiveTaskCount reached task count Max limit, skipping 
scale out action for supervisor[%s].",
@@ -272,8 +285,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     if (withinProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
       // Do Scale in
-      int taskCount = currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
-      int actualTaskCountMin = 
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+      final int taskCount = currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
       if (currentActiveTaskCount == actualTaskCountMin) {
         log.debug(
             "CurrentActiveTaskCount reached task count Min limit[%d], skipping 
scale in action for supervisor[%s].",
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index f0bdac00a8c..7c20855b033 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -434,7 +434,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     Assert.assertEquals(1, taskCountBeforeScaleOut);
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
-    Assert.assertEquals(2, taskCountAfterScaleOut);
+    Assert.assertEquals(3, taskCountAfterScaleOut);
     Assert.assertTrue(
         dynamicActionEmitter
             
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.replay(taskMaster);
 
     StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
-    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10)
-    {
-      @Override
-      public int getActiveTaskGroupsCount()
-      {
-        return 2;
-      }
-    };
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
@@ -488,6 +481,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
         spec,
         dynamicActionEmitter
     );
+    supervisor.getIoConfig().setTaskCount(2);
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 7b46c4c6558..26c4c20f859 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -23,17 +23,22 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import java.util.List;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CostBasedAutoScalerMockTest
@@ -59,7 +64,7 @@ public class CostBasedAutoScalerMockTest
     mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
 
     when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
-    
when(mockSpec.getDataSources()).thenReturn(java.util.List.of("test-datasource"));
+    when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
     when(mockSpec.isSuspended()).thenReturn(false);
     when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
     when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
@@ -232,6 +237,60 @@ public class CostBasedAutoScalerMockTest
     );
   }
 
+  @Test
+  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+  {
+    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(50)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.build();
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int configuredTaskCount = 1;
+    final int taskCountMin = 50;
+
+    // Mock computeOptimalTaskCount to return a value different from the boundary,
+    // so the assertion proves the boundary clamping path was taken.
+    doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 
0.2);
+
+    final int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should scale to taskCountMin when the configured task count is below 
the minimum boundary",
+        taskCountMin,
+        result
+    );
+  }
+
+  @Test
+  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+  {
+    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                       
.taskCountMax(50)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.build();
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int configuredTaskCount = 100;
+    final int taskCountMax = 50;
+
+    // Mock computeOptimalTaskCount to return a value different from the boundary,
+    // so the assertion proves the boundary clamping path was taken.
+    doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
+
+    final int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should scale to taskCountMax when the configured task count is above 
the maximum boundary",
+        taskCountMax,
+        result
+    );
+  }
+
   @Test
   public void testScaleUpToMaximumTasks()
   {
@@ -357,6 +416,89 @@ public class CostBasedAutoScalerMockTest
     );
   }
 
+  @Test
+  public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax()
+  {
+    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                       
.taskCountMax(10)
+                                                                       
.taskCountMin(1)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.build();
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int currentTaskCount = 10; // already at max
+    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    verify(mockEmitter).emit(captor.capture());
+    Assert.assertEquals(
+        "Should emit 'Already at max task count' skip reason when current task 
count is at maximum",
+        "Already at max task count",
+        ((ServiceMetricEvent.Builder) captor.getValue())
+            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+    );
+  }
+
+  @Test
+  public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
+  {
+    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                       
.taskCountMax(100)
+                                                                       
.taskCountMin(10)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.build();
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int currentTaskCount = 10; // already at min
+    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    verify(mockEmitter).emit(captor.capture());
+    Assert.assertEquals(
+        "Should emit 'Already at min task count' skip reason when current task 
count is at minimum",
+        "Already at min task count",
+        ((ServiceMetricEvent.Builder) captor.getValue())
+            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+    );
+  }
+
+  @Test
+  public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
+  {
+    // When min == max, current is simultaneously at both bounds.
+    // The comment in the production code states that signaling max has higher priority.
+    CostBasedAutoScalerConfig boundedConfig = 
CostBasedAutoScalerConfig.builder()
+                                                                       
.taskCountMax(5)
+                                                                       
.taskCountMin(5)
+                                                                       
.enableTaskAutoScaler(true)
+                                                                       
.build();
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int currentTaskCount = 5; // at both min and max
+    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = 
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+    verify(mockEmitter).emit(captor.capture());
+    Assert.assertEquals(
+        "Max skip reason should take priority over min skip reason when min 
equals max",
+        "Already at max task count",
+        ((ServiceMetricEvent.Builder) captor.getValue())
+            
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+    );
+  }
+
   private void setupMocksForMetricsCollection(
       CostBasedAutoScaler autoScaler,
       int taskCount,
@@ -377,22 +519,7 @@ public class CostBasedAutoScalerMockTest
     SeekableStreamSupervisorIOConfig ioConfig = 
mock(SeekableStreamSupervisorIOConfig.class);
     doReturn(ioConfig).when(mockSupervisor).getIoConfig();
     doReturn(taskCount).when(ioConfig).getTaskCount();
+    doReturn(STREAM_NAME).when(ioConfig).getStream();
   }
 
-  private CostMetrics createMetrics(
-      double avgPartitionLag,
-      int currentTaskCount,
-      int partitionCount,
-      double pollIdleRatio
-  )
-  {
-    return new CostMetrics(
-        avgPartitionLag,
-        currentTaskCount,
-        partitionCount,
-        pollIdleRatio,
-        TASK_DURATION_SECONDS,
-        AVG_PROCESSING_RATE
-    );
-  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 12f52279226..4b5666a85ee 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -495,6 +495,7 @@ public class CostBasedAutoScalerTest
     when(supervisor.getIoConfig()).thenReturn(ioConfig);
     when(ioConfig.getStream()).thenReturn("test-stream");
     when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
+    when(ioConfig.getTaskCount()).thenReturn(1);
     when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
     // No task stats means the moving average rate is unavailable
     when(supervisor.getStats()).thenReturn(Collections.emptyMap());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
new file mode 100644
index 00000000000..7f5a0f5a9df
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+
+public class LagBasedAutoScalerTest
+{
+  private static final int PARTITION_COUNT = 100;
+
+  private SupervisorSpec mockSpec;
+  private SeekableStreamSupervisor mockSupervisor;
+  private SeekableStreamSupervisorIOConfig mockIoConfig;
+  private ServiceEmitter mockEmitter;
+  private LagBasedAutoScalerConfig config;
+
+  @Before
+  public void setUp()
+  {
+    mockSpec = Mockito.mock(SupervisorSpec.class);
+    mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
+    mockEmitter = Mockito.mock(ServiceEmitter.class);
+    mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(mockSpec.getId()).thenReturn("test-supervisor");
+    when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
+    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+    when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+    config = new LagBasedAutoScalerConfig(
+        30_000L,    // lagCollectionIntervalMillis
+        300_000L,   // lagCollectionRangeMillis
+        300_000L,   // scaleActionStartDelayMillis
+        60_000L,    // scaleActionPeriodMillis
+        2_000_000L,
+        300_000L,
+        0.7,
+        0.9,
+        100,
+        null,       // taskCountStart
+        50,
+        1,          // scaleInStep
+        4,
+        true,       // enableTaskAutoScaler
+        6_000_000L, // minTriggerScaleActionFrequencyMillis
+        null,       // lagAggregate
+        null        // stopTaskCountRatio
+    );
+  }
+
+  /**
+   * Verifies that scale-out uses the configured task count as the baseline.
+   */
+  @Test
+  public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
+  {
+    when(mockIoConfig.getTaskCount()).thenReturn(50);
+    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+    Assert.assertEquals(54, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+  }
+
+  @Test
+  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+  {
+    when(mockIoConfig.getTaskCount()).thenReturn(1);
+    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+    Assert.assertEquals(50, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+  }
+
+  @Test
+  public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+  {
+    when(mockIoConfig.getTaskCount()).thenReturn(101);
+    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+    Assert.assertEquals(100, 
createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
+  }
+
+  private LagBasedAutoScaler createAutoScaler()
+  {
+    return new LagBasedAutoScaler(mockSupervisor, config, mockSpec, 
mockEmitter);
+  }
+
+  private List<Long> createLagSamples(long lag)
+  {
+    return new ArrayList<>(Collections.nCopies(11, lag));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to