This is an automated email from the ASF dual-hosted git repository.
sanechka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5f37b7b5b74 fix: harden ingesting autoscalers around task-count boundaries (#19269)
5f37b7b5b74 is described below
commit 5f37b7b5b74c98b341588b95daae21b1f38ca01d
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Apr 15 13:30:05 2026 +0300
fix: harden ingesting autoscalers around task-count boundaries (#19269)
---
.../embedded/indexing/KafkaClusterMetricsTest.java | 2 +-
.../SeekableStreamSupervisorIOConfig.java | 7 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 38 ++++-
.../supervisor/autoscaler/LagBasedAutoScaler.java | 28 +++-
.../SeekableStreamSupervisorSpecTest.java | 12 +-
.../autoscaler/CostBasedAutoScalerMockTest.java | 161 ++++++++++++++++++---
.../autoscaler/CostBasedAutoScalerTest.java | 1 +
.../autoscaler/LagBasedAutoScalerTest.java | 121 ++++++++++++++++
8 files changed, 331 insertions(+), 39 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 39742a9d578..bc7385e6bf6 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -153,7 +153,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
}
@Test
- @Timeout(20)
+ @Timeout(120)
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
{
final int maxRowsPerSegment = 1000;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 633bd9b70dc..421a885b294 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -88,15 +88,18 @@ public abstract class SeekableStreamSupervisorIOConfig
this.lagAggregator = lagAggregator;
// Could be null
this.autoScalerConfig = autoScalerConfig;
- this.autoScalerEnabled = autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler();
+ boolean isAutoScalerAvailable = autoScalerConfig != null;
+ this.autoScalerEnabled = isAutoScalerAvailable &&
autoScalerConfig.getEnableTaskAutoScaler();
if (autoScalerEnabled) {
// Priority: taskCountStart > taskCount > taskCountMin
this.taskCount = Configs.valueOrDefault(
autoScalerConfig.getTaskCountStart(),
Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
);
+ } else if (isAutoScalerAvailable) {
+ this.taskCount = Configs.valueOrDefault(taskCount,
autoScalerConfig.getTaskCountMin());
} else {
- this.taskCount = taskCount != null ? taskCount : 1;
+ this.taskCount = Configs.valueOrDefault(taskCount, 1);
}
Preconditions.checkArgument(stopTaskCount == null || stopTaskCount > 0,
"stopTaskCount must be greater than 0");
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index f6527beb8ec..21614c90c8c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -174,11 +174,27 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
lastKnownMetrics = collectMetrics();
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+ int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // Take the current task count but clamp it to the configured boundaries
if it is outside the boundaries.
+ // There might be a configuration instance with a handwritten taskCount
that is outside the boundaries.
+ final boolean isTaskCountOutOfBounds = currentTaskCount <
config.getTaskCountMin()
+ || currentTaskCount >
config.getTaskCountMax();
+ if (isTaskCountOutOfBounds) {
+ currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), currentTaskCount));
+ }
// Perform scale-up actions; scale-down actions only if configured.
final int taskCount;
- if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+
+ // If task count is out of bounds, scale to the configured boundary
+ // regardless of optimal task count, to get back to a safe state.
+ if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+ taskCount = currentTaskCount;
+ lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
+ log.info("Task count for supervisor[%s] was out of bounds [%d,%d],
urgently scaling from [%d] to [%d].",
+ supervisorId, config.getTaskCountMin(),
config.getTaskCountMax(), currentTaskCount, currentTaskCount);
+ } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
up).", supervisorId, currentTaskCount, taskCount);
@@ -192,6 +208,24 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
} else {
taskCount = -1;
log.debug("No scaling required for supervisor[%s]", supervisorId);
+
+ // Emit metrics for scaling skip reasons; in case of min == max,
signaling reaching
+ // max task count has bigger priority for the external observers /
trackers
+ if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount ==
config.getTaskCountMax()) {
+ emitter.emit(getMetricBuilder()
+ .setDimension(
+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+ "Already at max task count"
+ )
+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
currentTaskCount));
+ } else if (optimalTaskCount == config.getTaskCountMin() ||
currentTaskCount == config.getTaskCountMin()) {
+ emitter.emit(getMetricBuilder()
+ .setDimension(
+
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
+ "Already at min task count"
+ )
+
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
currentTaskCount));
+ }
}
return taskCount;
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index bf35576fbd8..b96a1de7bd4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
@@ -212,7 +213,8 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
* @param lags the lag metrics of Stream (Kafka/Kinesis)
* @return Integer, target number of tasksCount. -1 means skip scale action.
*/
- private int computeDesiredTaskCount(List<Long> lags)
+ @VisibleForTesting
+ int computeDesiredTaskCount(List<Long> lags)
{
// if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
@@ -239,19 +241,30 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
withinProportion, spec.getId()
);
- int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+ int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount();
int desiredActiveTaskCount;
- int partitionCount = supervisor.getPartitionCount();
+ final int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?",
spec.getId());
return -1;
}
+ final int actualTaskCountMax =
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+ final int actualTaskCountMin =
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+
+ // Take the current task count but clamp it to the configured boundaries
if it is outside the boundaries.
+ // There might be a configuration instance with a handwritten taskCount
that is outside the boundaries.
+ // If that is happening, take the bound and return early.
+ final boolean isTaskCountOutOfBounds = currentActiveTaskCount <
actualTaskCountMin
+ || currentActiveTaskCount >
actualTaskCountMax;
+ if (isTaskCountOutOfBounds) {
+ currentActiveTaskCount = Math.min(actualTaskCountMax,
Math.max(actualTaskCountMin, currentActiveTaskCount));
+ return currentActiveTaskCount;
+ }
+
if (beyondProportion >=
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
- int taskCount = currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep();
-
- int actualTaskCountMax =
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+ final int taskCount = currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep();
if (currentActiveTaskCount == actualTaskCountMax) {
log.debug(
"CurrentActiveTaskCount reached task count Max limit, skipping
scale out action for supervisor[%s].",
@@ -272,8 +285,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
if (withinProportion >=
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
- int taskCount = currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
- int actualTaskCountMin =
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+ final int taskCount = currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == actualTaskCountMin) {
log.debug(
"CurrentActiveTaskCount reached task count Min limit[%d], skipping
scale in action for supervisor[%s].",
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index f0bdac00a8c..7c20855b033 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -434,7 +434,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
- Assert.assertEquals(2, taskCountAfterScaleOut);
+ Assert.assertEquals(3, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
@@ -470,14 +470,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
EasyMock.replay(taskMaster);
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
- TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10)
- {
- @Override
- public int getActiveTaskGroupsCount()
- {
- return 2;
- }
- };
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
@@ -488,6 +481,7 @@ public class SeekableStreamSupervisorSpecTest extends
SeekableStreamSupervisorTe
spec,
dynamicActionEmitter
);
+ supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 7b46c4c6558..26c4c20f859 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -23,17 +23,22 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class CostBasedAutoScalerMockTest
@@ -59,7 +64,7 @@ public class CostBasedAutoScalerMockTest
mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
-
when(mockSpec.getDataSources()).thenReturn(java.util.List.of("test-datasource"));
+ when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
when(mockSpec.isSuspended()).thenReturn(false);
when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
@@ -232,6 +237,60 @@ public class CostBasedAutoScalerMockTest
);
}
+ @Test
+ public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+ {
+ CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(50)
+
.enableTaskAutoScaler(true)
+
.build();
+ CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+ final int configuredTaskCount = 1;
+ final int taskCountMin = 50;
+
+ // Mock computeOptimalTaskCount to return a value different from the
boundary,
+ // so the assertion proves the boundary clamping path was taken.
+ doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0,
0.2);
+
+ final int result = autoScaler.computeTaskCountForScaleAction();
+
+ Assert.assertEquals(
+ "Should scale to taskCountMin when the configured task count is below
the minimum boundary",
+ taskCountMin,
+ result
+ );
+ }
+
+ @Test
+ public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+ {
+ CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(50)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.build();
+ CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+ final int configuredTaskCount = 100;
+ final int taskCountMax = 50;
+
+ // Mock computeOptimalTaskCount to return a value different from the
boundary,
+ // so the assertion proves the boundary clamping path was taken.
+ doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8);
+
+ final int result = autoScaler.computeTaskCountForScaleAction();
+
+ Assert.assertEquals(
+ "Should scale to taskCountMax when the configured task count is above
the maximum boundary",
+ taskCountMax,
+ result
+ );
+ }
+
@Test
public void testScaleUpToMaximumTasks()
{
@@ -357,6 +416,89 @@ public class CostBasedAutoScalerMockTest
);
}
+ @Test
+ public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax()
+ {
+ CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(10)
+
.taskCountMin(1)
+
.enableTaskAutoScaler(true)
+
.build();
+ CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+ final int currentTaskCount = 10; // already at max
+ doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+ Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+ verify(mockEmitter).emit(captor.capture());
+ Assert.assertEquals(
+ "Should emit 'Already at max task count' skip reason when current task
count is at maximum",
+ "Already at max task count",
+ ((ServiceMetricEvent.Builder) captor.getValue())
+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+ );
+ }
+
+ @Test
+ public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
+ {
+ CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(100)
+
.taskCountMin(10)
+
.enableTaskAutoScaler(true)
+
.build();
+ CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+ final int currentTaskCount = 10; // already at min
+ doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+ Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+ verify(mockEmitter).emit(captor.capture());
+ Assert.assertEquals(
+ "Should emit 'Already at min task count' skip reason when current task
count is at minimum",
+ "Already at min task count",
+ ((ServiceMetricEvent.Builder) captor.getValue())
+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+ );
+ }
+
+ @Test
+ public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
+ {
+ // When min == max, current is simultaneously at both bounds.
+ // The comment in the production code states that signaling max has higher
priority.
+ CostBasedAutoScalerConfig boundedConfig =
CostBasedAutoScalerConfig.builder()
+
.taskCountMax(5)
+
.taskCountMin(5)
+
.enableTaskAutoScaler(true)
+
.build();
+ CostBasedAutoScaler autoScaler = spy(new
CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+ final int currentTaskCount = 5; // at both min and max
+ doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+ setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
+
+ Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor =
ArgumentCaptor.forClass(ServiceEventBuilder.class);
+ verify(mockEmitter).emit(captor.capture());
+ Assert.assertEquals(
+ "Max skip reason should take priority over min skip reason when min
equals max",
+ "Already at max task count",
+ ((ServiceMetricEvent.Builder) captor.getValue())
+
.getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+ );
+ }
+
private void setupMocksForMetricsCollection(
CostBasedAutoScaler autoScaler,
int taskCount,
@@ -377,22 +519,7 @@ public class CostBasedAutoScalerMockTest
SeekableStreamSupervisorIOConfig ioConfig =
mock(SeekableStreamSupervisorIOConfig.class);
doReturn(ioConfig).when(mockSupervisor).getIoConfig();
doReturn(taskCount).when(ioConfig).getTaskCount();
+ doReturn(STREAM_NAME).when(ioConfig).getStream();
}
- private CostMetrics createMetrics(
- double avgPartitionLag,
- int currentTaskCount,
- int partitionCount,
- double pollIdleRatio
- )
- {
- return new CostMetrics(
- avgPartitionLag,
- currentTaskCount,
- partitionCount,
- pollIdleRatio,
- TASK_DURATION_SECONDS,
- AVG_PROCESSING_RATE
- );
- }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 12f52279226..4b5666a85ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -495,6 +495,7 @@ public class CostBasedAutoScalerTest
when(supervisor.getIoConfig()).thenReturn(ioConfig);
when(ioConfig.getStream()).thenReturn("test-stream");
when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(1));
+ when(ioConfig.getTaskCount()).thenReturn(1);
when(supervisor.computeLagStats()).thenReturn(new LagStats(100, 100, 100));
// No task stats means the moving average rate is unavailable
when(supervisor.getStats()).thenReturn(Collections.emptyMap());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
new file mode 100644
index 00000000000..7f5a0f5a9df
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link LagBasedAutoScaler#computeDesiredTaskCount(List)} covering the baseline used for
+ * scale-out and the clamping of a configured taskCount that lies outside [taskCountMin, taskCountMax].
+ */
+public class LagBasedAutoScalerTest
+{
+ private static final int PARTITION_COUNT = 100; // >= taskCountMax and taskCountMin, so neither bound is lowered by the partition count
+
+ private SupervisorSpec mockSpec;
+ private SeekableStreamSupervisor mockSupervisor;
+ private SeekableStreamSupervisorIOConfig mockIoConfig;
+ private ServiceEmitter mockEmitter;
+ private LagBasedAutoScalerConfig config;
+
+ @Before
+ public void setUp()
+ {
+ mockSpec = Mockito.mock(SupervisorSpec.class);
+ mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
+ mockEmitter = Mockito.mock(ServiceEmitter.class);
+ mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(mockSpec.getId()).thenReturn("test-supervisor");
+ when(mockSpec.getDataSources()).thenReturn(List.of("test-datasource"));
+ when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+ when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+ config = new LagBasedAutoScalerConfig(
+ 30_000L, // lagCollectionIntervalMillis
+ 300_000L, // lagCollectionRangeMillis
+ 300_000L, // scaleActionStartDelayMillis
+ 60_000L, // scaleActionPeriodMillis
+ 2_000_000L, // scaleOutThreshold
+ 300_000L, // scaleInThreshold
+ 0.7, // triggerScaleOutFractionThreshold
+ 0.9, // triggerScaleInFractionThreshold
+ 100, // taskCountMax
+ null, // taskCountStart
+ 50, // taskCountMin
+ 1, // scaleInStep
+ 4, // scaleOutStep
+ true, // enableTaskAutoScaler
+ 6_000_000L, // minTriggerScaleActionFrequencyMillis
+ null, // lagAggregate
+ null // stopTaskCountRatio
+ );
+ }
+
+ /**
+ * Verifies that scale-out uses the configured task count as the baseline.
+ * With taskCount == taskCountMin (50) and every lag sample above the scale-out threshold,
+ * the expected result is 50 + scaleOutStep (4) = 54. The baseline is read from
+ * {@code ioConfig.getTaskCount()}; {@code getActiveTaskGroupsCount()} is left unstubbed
+ * on the mock, so a baseline taken from it would not yield 54.
+ */
+ @Test
+ public void testScaleOutDoesNotReturnCountBelowTaskCountMin()
+ {
+ when(mockIoConfig.getTaskCount()).thenReturn(50);
+ when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+ Assert.assertEquals(54,
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+ }
+ /**
+ * Verifies that a configured taskCount below taskCountMin (1 < 50) is clamped up to taskCountMin.
+ * Without the clamp, the scale-out path would presumably produce 1 + scaleOutStep = 5,
+ * so the expected value of 50 distinguishes the clamp from a regular scale-out.
+ */
+ @Test
+ public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+ {
+ when(mockIoConfig.getTaskCount()).thenReturn(1);
+ when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+ Assert.assertEquals(50,
createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L)));
+ }
+ /**
+ * Verifies that a configured taskCount above taskCountMax (101 > 100) is clamped down to taskCountMax.
+ * NOTE(review): with scaleInStep = 1, a regular scale-in from 101 would presumably also produce
+ * 101 - 1 = 100, so this assertion may not prove that the clamp path was taken. A lag between
+ * scaleInThreshold and scaleOutThreshold (e.g. 1_000_000L) would isolate the clamp; confirm.
+ */
+ @Test
+ public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax()
+ {
+ when(mockIoConfig.getTaskCount()).thenReturn(101);
+ when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+
+ Assert.assertEquals(100,
createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L)));
+ }
+
+ private LagBasedAutoScaler createAutoScaler()
+ {
+ return new LagBasedAutoScaler(mockSupervisor, config, mockSpec,
mockEmitter);
+ }
+
+ private List<Long> createLagSamples(long lag)
+ {
+ return new ArrayList<>(Collections.nCopies(11, lag)); // 11 identical samples; NOTE(review): presumably lagCollectionRangeMillis / lagCollectionIntervalMillis + 1 — confirm
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]