This is an automated email from the ASF dual-hosted git repository.

sanechka pushed a commit to branch lag-based-autoscaler
in repository https://gitbox.apache.org/repos/asf/druid.git
commit 746c7608009254ead4f756617834a5fc438f0dac Author: Sasha Syrotenko <[email protected]> AuthorDate: Tue Apr 7 15:27:09 2026 +0300 bug: use taskCount from ioConfig for scale action instead of activeTaskGroups --- .../supervisor/autoscaler/LagBasedAutoScaler.java | 4 +- .../SeekableStreamSupervisorSpecTest.java | 12 +-- .../autoscaler/LagBasedAutoScalerTest.java | 110 +++++++++++++++++++++ 3 files changed, 115 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index bf35576fbd8..61eb055e0d5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -212,7 +212,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler * @param lags the lag metrics of Stream (Kafka/Kinesis) * @return Integer, target number of tasksCount. -1 means skip scale action. 
*/ - private int computeDesiredTaskCount(List<Long> lags) + int computeDesiredTaskCount(List<Long> lags) { // if the supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop @@ -239,7 +239,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler withinProportion, spec.getId() ); - int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount(); + int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); int desiredActiveTaskCount; int partitionCount = supervisor.getPartitionCount(); if (partitionCount <= 0) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index f0bdac00a8c..7c20855b033 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -434,7 +434,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(2, taskCountAfterScaleOut); + Assert.assertEquals(3, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter .getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC) @@ -470,14 +470,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe EasyMock.replay(taskMaster); StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter(); - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10) - { - @Override - public int getActiveTaskGroupsCount() - { - return 2; - } - }; + 
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( supervisor, @@ -488,6 +481,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe spec, dynamicActionEmitter ); + supervisor.getIoConfig().setTaskCount(2); supervisor.start(); autoScaler.start(); supervisor.runInternal(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java new file mode 100644 index 00000000000..1352843943c --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.when; + + +public class LagBasedAutoScalerTest +{ + private static final int ACTIVE_TASK_GROUP_AMOUNT = 25; + private static final int PARTITION_COUNT = 100; + private static final int TASK_COUNT_MIN = 50; + private static final int TASK_COUNT_MAX = 100; + private static final int SCALE_OUT_STEP = 4; + private static final long SCALE_OUT_THRESHOLD = 2_000_000L; + private static final long SCALE_IN_THRESHOLD = 300_000L; + private static final double TRIGGER_SCALE_OUT_FRACTION = 0.7; + private static final double TRIGGER_SCALE_IN_FRACTION = 0.9; + + private SupervisorSpec mockSpec; + private SeekableStreamSupervisor mockSupervisor; + private SeekableStreamSupervisorIOConfig mockIoConfig; + private ServiceEmitter mockEmitter; + private LagBasedAutoScalerConfig config; + + @Before + public void setUp() + { + mockSpec = Mockito.mock(SupervisorSpec.class); + mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class); + mockEmitter = Mockito.mock(ServiceEmitter.class); + mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(mockSpec.getId()).thenReturn("test-supervisor"); + when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource")); + when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); + when(mockIoConfig.getStream()).thenReturn("test-stream"); + + config = new LagBasedAutoScalerConfig( + 30_000L, // 
lagCollectionIntervalMillis + 300_000L, // lagCollectionRangeMillis + 300_000L, // scaleActionStartDelayMillis + 60_000L, // scaleActionPeriodMillis + SCALE_OUT_THRESHOLD, + SCALE_IN_THRESHOLD, + TRIGGER_SCALE_OUT_FRACTION, + TRIGGER_SCALE_IN_FRACTION, + TASK_COUNT_MAX, + null, // taskCountStart + TASK_COUNT_MIN, + 1, // scaleInStep + SCALE_OUT_STEP, + true, // enableTaskAutoScaler + 6_000_000L, // minTriggerScaleActionFrequencyMillis + null, // lagAggregate + null // stopTaskCountRatio + ); + } + + /** + * Reproduces the bug where scale-out from a low activelyReadingTaskGroups count + * yields a desiredTaskCount below taskCountMin. + */ + @Test + public void testScaleOutDoesNotReturnCountBelowTaskCountMin() + { + when(mockIoConfig.getTaskCount()).thenReturn(TASK_COUNT_MIN); + when(mockSupervisor.getActiveTaskGroupsCount()).thenReturn(ACTIVE_TASK_GROUP_AMOUNT); + when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter); + List<Long> highLagSamples = Collections.nCopies(11, SCALE_OUT_THRESHOLD + 1); + + int result = autoScaler.computeDesiredTaskCount(new ArrayList<>(highLagSamples)); + + // Bug: old code used getActiveTaskGroupsCount()=25 as baseline → 25+4=29 < taskCountMin(50) + // Fix: uses ioConfig.getTaskCount()=50 as baseline → 50+4=54 >= taskCountMin(50) + Assert.assertEquals(TASK_COUNT_MIN + SCALE_OUT_STEP, result); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
