This is an automated email from the ASF dual-hosted git repository.

sanechka pushed a commit to branch lag-based-autoscaler
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 746c7608009254ead4f756617834a5fc438f0dac
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Tue Apr 7 15:27:09 2026 +0300

    bug: use taskCount from ioConfig for scale action instead of activeTaskGroups
---
 .../supervisor/autoscaler/LagBasedAutoScaler.java  |   4 +-
 .../SeekableStreamSupervisorSpecTest.java          |  12 +--
 .../autoscaler/LagBasedAutoScalerTest.java         | 110 +++++++++++++++++++++
 3 files changed, 115 insertions(+), 11 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index bf35576fbd8..61eb055e0d5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -212,7 +212,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
    * @param lags the lag metrics of Stream (Kafka/Kinesis)
    * @return Integer, target number of tasksCount. -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  int computeDesiredTaskCount(List<Long> lags)
   {
     // if the supervisor is not suspended, ensure required tasks are running
     // if suspended, ensure tasks have been requested to gracefully stop
@@ -239,7 +239,7 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
         withinProportion, spec.getId()
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
     int desiredActiveTaskCount;
     int partitionCount = supervisor.getPartitionCount();
     if (partitionCount <= 0) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index f0bdac00a8c..7c20855b033 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -434,7 +434,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     Assert.assertEquals(1, taskCountBeforeScaleOut);
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
-    Assert.assertEquals(2, taskCountAfterScaleOut);
+    Assert.assertEquals(3, taskCountAfterScaleOut);
     Assert.assertTrue(
         dynamicActionEmitter
             
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
     EasyMock.replay(taskMaster);
 
     StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
-    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10)
-    {
-      @Override
-      public int getActiveTaskGroupsCount()
-      {
-        return 2;
-      }
-    };
+    TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor(10);
 
     LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
         supervisor,
@@ -488,6 +481,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
         spec,
         dynamicActionEmitter
     );
+    supervisor.getIoConfig().setTaskCount(2);
     supervisor.start();
     autoScaler.start();
     supervisor.runInternal();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
new file mode 100644
index 00000000000..1352843943c
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+
+public class LagBasedAutoScalerTest
+{
+  private static final int ACTIVE_TASK_GROUP_AMOUNT = 25; // deliberately below TASK_COUNT_MIN
+  private static final int PARTITION_COUNT = 100;
+  private static final int TASK_COUNT_MIN = 50; // also the stubbed ioConfig taskCount (scale baseline)
+  private static final int TASK_COUNT_MAX = 100;
+  private static final int SCALE_OUT_STEP = 4;
+  private static final long SCALE_OUT_THRESHOLD = 2_000_000L;
+  private static final long SCALE_IN_THRESHOLD = 300_000L;
+  private static final double TRIGGER_SCALE_OUT_FRACTION = 0.7;
+  private static final double TRIGGER_SCALE_IN_FRACTION = 0.9;
+
+  private SupervisorSpec mockSpec;
+  private SeekableStreamSupervisor mockSupervisor;
+  private SeekableStreamSupervisorIOConfig mockIoConfig;
+  private ServiceEmitter mockEmitter;
+  private LagBasedAutoScalerConfig config;
+
+  @Before
+  public void setUp()
+  {
+    mockSpec = Mockito.mock(SupervisorSpec.class);
+    mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
+    mockEmitter = Mockito.mock(ServiceEmitter.class);
+    mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(mockSpec.getId()).thenReturn("test-supervisor");
+    when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
+    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+    when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+    config = new LagBasedAutoScalerConfig(
+        30_000L,   // lagCollectionIntervalMillis
+        300_000L,  // lagCollectionRangeMillis
+        300_000L,  // scaleActionStartDelayMillis
+        60_000L,   // scaleActionPeriodMillis
+        SCALE_OUT_THRESHOLD,
+        SCALE_IN_THRESHOLD,
+        TRIGGER_SCALE_OUT_FRACTION,
+        TRIGGER_SCALE_IN_FRACTION,
+        TASK_COUNT_MAX,
+        null,      // taskCountStart
+        TASK_COUNT_MIN,
+        1,         // scaleInStep
+        SCALE_OUT_STEP,
+        true,      // enableTaskAutoScaler
+        6_000_000L, // minTriggerScaleActionFrequencyMillis
+        null,      // lagAggregate
+        null       // stopTaskCountRatio
+    );
+  }
+
+  /**
+   * Regression test: scale-out must start from the ioConfig taskCount (50), not from
+   * getActiveTaskGroupsCount() (25), which used to yield 25 + 4 = 29 < taskCountMin (50).
+   */
+  @Test
+  public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
+  {
+    when(mockIoConfig.getTaskCount()).thenReturn(TASK_COUNT_MIN);
+    when(mockSupervisor.getActiveTaskGroupsCount()).thenReturn(ACTIVE_TASK_GROUP_AMOUNT);
+    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter);
+    List<Long> highLagSamples = Collections.nCopies(11, SCALE_OUT_THRESHOLD + 1); // all above threshold
+
+    int result = autoScaler.computeDesiredTaskCount(new ArrayList<>(highLagSamples));
+
+    // Bug: old code used getActiveTaskGroupsCount()=25 as baseline → 25+4=29 < taskCountMin(50)
+    // Fix: uses ioConfig.getTaskCount()=50 as baseline → 50+4=54 >= taskCountMin(50)
+    Assert.assertEquals(TASK_COUNT_MIN + SCALE_OUT_STEP, result);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to