This is an automated email from the ASF dual-hosted git repository.
jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 58693edb82b feat: support scaling direction-aware cooldown for task auto-scalers (#19286)
58693edb82b is described below
commit 58693edb82bd16467ec87d893414a81dd2fa56c5
Author: jtuglu1 <[email protected]>
AuthorDate: Thu Apr 23 10:41:25 2026 -0700
feat: support scaling direction-aware cooldown for task auto-scalers (#19286)
Adds support for configuring a separate cooldown for each scaling direction.
While both scaling actions temporarily disrupt ingestion, scaling down can be
more disruptive than scaling up, because the supervisor is left with fewer
resources than it started with while it recovers from lag. To allow aggressive
scale-up alongside a more conservative scale-down, this change adds a cooldown
setting for each direction. The cooldown for a specific scaling direction is
evaluated [...]
1. minScaleUpDelay/minScaleDownDelay
2. minTriggerScaleActionFrequencyMillis (marked as deprecated)
3. The previously scaler-specific configured defaults for scale up/down
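The fallback order listed above can be sketched as a single resolution function. This is a minimal, hypothetical illustration rather than Druid code: the class name `CooldownResolution`, the method `resolve`, and its parameters are invented for this sketch. It uses `java.time.Duration` in place of the `org.joda.time.Duration` that the autoscaler configs use.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of the cooldown fallback order described in the commit message.
 * Not Druid code; java.time.Duration stands in for org.joda.time.Duration.
 */
public class CooldownResolution
{
  /**
   * Resolves the effective cooldown for one scaling direction.
   *
   * @param explicitDelay         minScaleUpDelay or minScaleDownDelay, or null if unset
   * @param legacyFrequencyMillis deprecated minTriggerScaleActionFrequencyMillis, or null if unset
   * @param scalerDefault         the scaler-specific default for this direction
   */
  static Duration resolve(Duration explicitDelay, Long legacyFrequencyMillis, Duration scalerDefault)
  {
    if (explicitDelay != null) {
      return explicitDelay;                            // 1) direction-specific setting wins
    }
    if (legacyFrequencyMillis != null) {
      return Duration.ofMillis(legacyFrequencyMillis); // 2) deprecated shared setting
    }
    return scalerDefault;                              // 3) scaler-specific default
  }

  public static void main(String[] args)
  {
    Duration scalerDefault = Duration.ofMinutes(10);
    System.out.println(resolve(Duration.ofMinutes(5), 1_200_000L, scalerDefault)); // PT5M
    System.out.println(resolve(null, 1_200_000L, scalerDefault));                  // PT20M
    System.out.println(resolve(null, null, scalerDefault));                        // PT10M
  }
}
```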
---
docs/ingestion/kafka-ingestion.md | 3 +-
docs/ingestion/supervisor.md | 17 +-
.../supervisor/KafkaSupervisorIOConfigTest.java | 2 +
.../supervisor/SeekableStreamSupervisor.java | 103 ++++++--
.../supervisor/autoscaler/AutoScalerConfig.java | 22 ++
.../supervisor/autoscaler/CostBasedAutoScaler.java | 29 +--
.../autoscaler/CostBasedAutoScalerConfig.java | 36 ++-
.../autoscaler/LagBasedAutoScalerConfig.java | 39 ++-
.../autoscaler/LagBasedAutoScalerConfigTest.java | 274 ++++++++++++++++++++-
.../SeekableStreamSupervisorSpecTest.java | 2 +-
.../SeekableStreamSupervisorStateTest.java | 267 ++++++++++++++++++++
.../autoscaler/CostBasedAutoScalerConfigTest.java | 157 +++++++++++-
.../autoscaler/CostBasedAutoScalerMockTest.java | 64 +----
.../autoscaler/LagBasedAutoScalerTest.java | 2 +
.../supervisor/autoscaler/ScaleDirection.java | 38 +++
website/.spelling | 5 +
16 files changed, 918 insertions(+), 142 deletions(-)
diff --git a/docs/ingestion/kafka-ingestion.md b/docs/ingestion/kafka-ingestion.md
index 2326b59d99d..5216a84b4e4 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle configuration enabled:
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
- "minTriggerScaleActionFrequencyMillis": 600000,
+ "minScaleUpDelay": "PT10M",
+ "minScaleDownDelay": "PT10M",
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 592f0328cbe..ba92ed66b53 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -79,7 +79,9 @@ The following table outlines the configuration properties for `autoScalerConfig`
|`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka partitions or Kinesis shards, Druid sets the maximum number of reading tasks to the number of Kafka partitions or Kinesis shards and ignores `taskCountMax`.|Yes||
|`taskCountMin`|The minimum number of ingestion tasks. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|Yes||
|`taskCountStart`|Optional config to specify the number of ingestion tasks to start with. When you enable the autoscaler, Druid computes the initial number of tasks to launch by checking the configs in the following order: `taskCountStart`, then `taskCount` (in `ioConfig`), then `taskCountMin`.|No|`taskCount` or `taskCountMin`|
-|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two scale actions.| No|600000|
+|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, specified as an ISO-8601 duration string. Falls back to `minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` and `minScaleDownDelay` instead. Minimum time interval in milliseconds between scale actions, used as the fallback when the Duration-based fields are not set.|No|600000|
|`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`|
|`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in steady state to be proportional to the number of tasks currently running.|No||
@@ -161,7 +163,8 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
- "minTriggerScaleActionFrequencyMillis": 600000,
+ "minScaleUpDelay": "PT10M",
+ "minScaleDownDelay": "PT10M",
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
@@ -210,10 +213,11 @@ The following table outlines the configuration properties related to the `costBa
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
-|`minScaleDownDelay`|Minimum duration between successful scale actions, specified as an ISO-8601 duration string.|No|`PT30M`|
+|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
+|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
-The following example shows a supervisor spec with `lagBased` autoscaler:
+The following example shows a supervisor spec with `costBased` autoscaler:
<details>
<summary>Click to view the example</summary>
@@ -227,9 +231,10 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
"autoScalerStrategy": "costBased",
"taskCountMin": 1,
"taskCountMax": 10,
- "minTriggerScaleActionFrequencyMillis": 600000,
+ "minScaleUpDelay": "PT10M",
+ "minScaleDownDelay": "PT30M",
"lagWeight": 0.1,
- "idleWeight": 0.9,
+ "idleWeight": 0.9
}
}
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 6295d41937e..289d3c989f3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -315,6 +315,8 @@ public class KafkaSupervisorIOConfigTest
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+ autoScalerConfig.put("minScaleUpDelay", "PT20M");
+ autoScalerConfig.put("minScaleDownDelay", "PT20M");
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("bootstrap.servers", "localhost:8082");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 58d366d2306..f857cfaa436 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -64,6 +64,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -499,11 +500,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
supervisorId,
dataSource
);
- final Integer desiredTaskCount = computeDesiredTaskCount.call();
+ final int desiredTaskCount = computeDesiredTaskCount.call();
+ final int currentTaskCount = getCurrentTaskCount();
+
+ if (desiredTaskCount <= 0) {
+ return;
+ }
+
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
+
+ // 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability
+ if (desiredTaskCount == currentTaskCount) {
+ log.warn(
+ "Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]",
+ supervisorId,
+ dataSource,
+ desiredTaskCount
+ );
+ emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached")
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
+ return;
+ }
+
+ // 2) Make sure we wait for any pending completion tasks to finish.
+ // At this point there could be 3 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation).
+ // We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid.
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
// There are expected to be pending tasks if this scaling is happening on task rollover
if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
@@ -513,45 +537,59 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
dataSource,
list
);
- if (desiredTaskCount > 0) {
- emitter.emit(event.setDimension(
- AUTOSCALER_SKIP_REASON_DIMENSION,
- "There are tasks pending completion"
- )
- .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
- }
+ emitter.emit(event.setDimension(
+ AUTOSCALER_SKIP_REASON_DIMENSION,
+ "There are tasks pending completion"
+ )
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}
}
- if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+
+ // 3) Make sure we are not breaching any scaling cooldown limits.
+ // Scaling operations are disruptive — scale-down in particular can
leave the supervisor
+ // under-resourced while it recovers from lag induced by the scale event, so callers may
+ // configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same
+ // last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown,
+ // regardless of which direction triggered last.
+ final ScaleDirection scaleDirection;
+ final long cooldownMillis;
+
+ if (desiredTaskCount > currentTaskCount) {
+ scaleDirection = ScaleDirection.SCALE_UP;
+ cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
+ } else { // desiredTaskCount < currentTaskCount
+ scaleDirection = ScaleDirection.SCALE_DOWN;
+ cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
+ }
+
+ if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
log.info(
- "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%s], active task count is [%s]",
- nowTime - dynamicTriggerLastRunTime,
- autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
+ "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
+ nowTime - dynamicTriggerLastScaleRunTime,
+ scaleDirection,
+ cooldownMillis,
supervisorId,
dataSource,
desiredTaskCount,
- getActiveTaskGroupsCount()
+ currentTaskCount
);
- if (desiredTaskCount > 0) {
- emitter.emit(event.setDimension(
- AUTOSCALER_SKIP_REASON_DIMENSION,
- "minTriggerScaleActionFrequencyMillis not elapsed yet"
- )
- .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
- }
+ emitter.emit(event.setDimension(
+ AUTOSCALER_SKIP_REASON_DIMENSION,
+ "Scale cooldown not elapsed yet"
+ )
+ .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}
- if (desiredTaskCount > 0) {
- emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
- }
+ // At this point, we can reasonably attempt a scaling action, so emit our required task count
+ emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
onSuccessfulScale.run();
- dynamicTriggerLastRunTime = nowTime;
+ dynamicTriggerLastScaleRunTime = nowTime;
}
}
catch (Exception ex) {
@@ -586,8 +624,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
*
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
- * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
- * If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
+ * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least the configured
+ * 'minScaleUpDelay' or 'minScaleDownDelay' (whichever matches the direction of the next scale) before the
+ * next 'changeTaskCount'. If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* @throws InterruptedException
* @throws ExecutionException
*/
@@ -958,7 +997,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
private long lastRunTime;
- private long dynamicTriggerLastRunTime;
+ private long dynamicTriggerLastScaleRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
@@ -1424,6 +1463,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter));
}
+ @VisibleForTesting
+ void handleDynamicAllocationTasksNotice(
+ Callable<Integer> scaleAction,
+ Runnable onSuccessfulScale,
+ ServiceEmitter emitter
+ )
+ {
+ new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter).handle();
+ }
+
private Runnable buildRunTask()
{
return () -> addNotice(new RunNotice());
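The order of checks that `DynamicAllocationTasksNotice` performs after this change can be modeled as a pure function. This is a simplified, hypothetical sketch: the class `ScaleDecisionSketch`, the method `skipReason`, and its parameters are not part of Druid, and `java.time.Duration` replaces Joda-Time. It returns the skip reason the supervisor would emit, or `null` when `changeTaskCount` would be attempted. The label "no scale requested" is invented here; in the diff, the supervisor simply returns without emitting anything in that case.

```java
import java.time.Duration;

/** Hypothetical model of the supervisor-side scaling checks; not Druid code. */
public class ScaleDecisionSketch
{
  enum ScaleDirection { SCALE_UP, SCALE_DOWN }

  /**
   * Returns null when a scale action may proceed, otherwise a skip reason.
   * Apart from "no scale requested", the strings mirror the skip reasons emitted in the diff.
   */
  static String skipReason(
      int desiredTaskCount,
      int currentTaskCount,
      boolean blockedByPendingCompletion, // pending-completion groups exist and we are not scaling on rollover
      long nowMillis,
      long lastScaleMillis,               // updated only after a successful scale, in either direction
      Duration minScaleUpDelay,
      Duration minScaleDownDelay
  )
  {
    if (desiredTaskCount <= 0) {
      return "no scale requested"; // hypothetical label; the auto-scaler asked for no change
    }
    if (desiredTaskCount == currentTaskCount) {
      return "desired capacity reached";
    }
    if (blockedByPendingCompletion) {
      return "There are tasks pending completion";
    }
    // Pick the cooldown that matches the direction of the requested change.
    final ScaleDirection direction =
        desiredTaskCount > currentTaskCount ? ScaleDirection.SCALE_UP : ScaleDirection.SCALE_DOWN;
    final long cooldownMillis = direction == ScaleDirection.SCALE_UP
                                ? minScaleUpDelay.toMillis()
                                : minScaleDownDelay.toMillis();
    // Both directions are measured from the same last-scale timestamp.
    if (nowMillis - lastScaleMillis < cooldownMillis) {
      return "Scale cooldown not elapsed yet";
    }
    return null;
  }

  public static void main(String[] args)
  {
    Duration up = Duration.ofMinutes(10);
    Duration down = Duration.ofMinutes(30);
    long lastScale = 0L;
    long now = Duration.ofMinutes(15).toMillis(); // 15 minutes after the last scale
    System.out.println(skipReason(4, 2, false, now, lastScale, up, down)); // null: scale-up allowed
    System.out.println(skipReason(1, 2, false, now, lastScale, up, down)); // Scale cooldown not elapsed yet
  }
}
```

With a 10-minute scale-up delay and a 30-minute scale-down delay, a request 15 minutes after the last successful scale is allowed to go up but not down. This is the asymmetric behavior the commit message motivates.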
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 6d3b2a0daa0..f6384d2a64e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
@@ -37,7 +38,28 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
public interface AutoScalerConfig
{
boolean getEnableTaskAutoScaler();
+
+ /**
+ * @deprecated Use {@link #getMinScaleUpDelay()} and {@link #getMinScaleDownDelay()} instead.
+ * This field is retained for backward compatibility and will be removed in a future version.
+ */
+ @Deprecated
long getMinTriggerScaleActionFrequencyMillis();
+
+ /**
+ * Minimum time that must elapse after any scale action before a scale-up is permitted.
+ * If not explicitly configured, implementations fall back to
+ * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
+ */
+ Duration getMinScaleUpDelay();
+
+ /**
+ * Minimum time that must elapse after any scale action before a scale-down is permitted.
+ * If not explicitly configured, implementations fall back to
+ * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward compatibility.
+ */
+ Duration getMinScaleDownDelay();
+
int getTaskCountMax();
int getTaskCountMin();
Integer getTaskCountStart();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 21614c90c8c..fa7db4f6928 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
-import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -90,8 +89,6 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
private final WeightedCostFunction costFunction;
private volatile CostMetrics lastKnownMetrics;
- private volatile long lastScaleActionTimeMillis = -1;
-
public CostBasedAutoScaler(
SeekableStreamSupervisor supervisor,
CostBasedAutoScalerConfig config,
@@ -189,21 +186,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
- if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+ if (isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
- lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
- } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+ } else if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
- lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
} else if (!config.isScaleDownOnTaskRolloverOnly()
- && isScaleActionAllowed()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
- lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
} else {
taskCount = -1;
@@ -594,22 +587,4 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
}
}
- /**
- * Determines if a scale action is currently allowed based on the elapsed time
- * since the last scale action and the configured minimum scale-down delay.
- */
- private boolean isScaleActionAllowed()
- {
- if (lastScaleActionTimeMillis < 0) {
- return true;
- }
-
- final long barrierMillis = config.getMinScaleDownDelay().getMillis();
- if (barrierMillis <= 0) {
- return true;
- }
-
- final long elapsedMillis = DateTimes.nowUtc().getMillis() - lastScaleActionTimeMillis;
- return elapsedMillis >= barrierMillis;
- }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index 025da787851..b19a3e2cbbe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -45,7 +45,6 @@ import java.util.Objects;
public class CostBasedAutoScalerConfig implements AutoScalerConfig
{
static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes
- static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60 * 1000; // 5 minutes
static final double DEFAULT_LAG_WEIGHT = 0.25;
static final double DEFAULT_IDLE_WEIGHT = 0.75;
static final Duration DEFAULT_MIN_SCALE_DELAY = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
@@ -62,6 +61,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private final double idleWeight;
private final boolean useTaskCountBoundaries;
private final int highLagThreshold;
+ private final Duration minScaleUpDelay;
private final Duration minScaleDownDelay;
private final boolean scaleDownDuringTaskRolloverOnly;
@@ -78,6 +78,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
@Nullable @JsonProperty("idleWeight") Double idleWeight,
@Nullable @JsonProperty("useTaskCountBoundaries") Boolean useTaskCountBoundaries,
@Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+ @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
@Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly
)
@@ -90,7 +91,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
: DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
this.minTriggerScaleActionFrequencyMillis = Configs.valueOrDefault(
minTriggerScaleActionFrequencyMillis,
- DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS
+ DEFAULT_SCALE_ACTION_PERIOD_MILLIS
);
// Cost function weights with defaults
@@ -98,6 +99,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
this.useTaskCountBoundaries = Configs.valueOrDefault(useTaskCountBoundaries, false);
this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
+ this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis));
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY);
this.scaleDownDuringTaskRolloverOnly = Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
@@ -125,7 +127,8 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0");
- Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be >= 0");
+ Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, "minScaleUpDelay must be a duration >= 0 millis");
+ Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be a duration >= 0 millis");
}
/**
@@ -165,6 +168,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
return taskCountStart;
}
+ @Deprecated
@Override
@JsonProperty
public long getMinTriggerScaleActionFrequencyMillis()
@@ -217,10 +221,19 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
}
/**
- * Represents the minimum duration between successful scale actions.
- * A higher value implies a more conservative scaling behavior, ensuring that tasks
- * are not scaled too frequently during workload fluctuations.
+ * Returns the minimum delay before a scale-up action is allowed after any previous scale action.
*/
+ @Override
+ @JsonProperty
+ public Duration getMinScaleUpDelay()
+ {
+ return minScaleUpDelay;
+ }
+
+ /**
+ * Returns the minimum delay before a scale-down action is allowed after any previous scale action.
+ */
+ @Override
@JsonProperty
public Duration getMinScaleDownDelay()
{
@@ -263,6 +276,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
&& useTaskCountBoundaries == that.useTaskCountBoundaries
+ && Objects.equals(minScaleUpDelay, that.minScaleUpDelay)
&& Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
&& scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly
&& Objects.equals(taskCountStart, that.taskCountStart)
@@ -285,6 +299,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
idleWeight,
useTaskCountBoundaries,
highLagThreshold,
+ minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
);
@@ -305,6 +320,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
", idleWeight=" + idleWeight +
", useTaskCountBoundaries=" + useTaskCountBoundaries +
", highLagThreshold=" + highLagThreshold +
+ ", minScaleUpDelay=" + minScaleUpDelay +
", minScaleDownDelay=" + minScaleDownDelay +
", scaleDownDuringTaskRolloverOnly=" +
scaleDownDuringTaskRolloverOnly +
'}';
@@ -327,6 +343,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private Double idleWeight;
private Boolean useTaskCountBoundaries;
private Integer highLagThreshold;
+ private Duration minScaleUpDelay;
private Duration minScaleDownDelay;
private Boolean scaleDownDuringTaskRolloverOnly;
@@ -388,6 +405,12 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
return this;
}
+ public Builder minScaleUpDelay(Duration minScaleUpDelay)
+ {
+ this.minScaleUpDelay = minScaleUpDelay;
+ return this;
+ }
+
public Builder minScaleDownDelay(Duration minScaleDownDelay)
{
this.minScaleDownDelay = minScaleDownDelay;
@@ -426,6 +449,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
idleWeight,
useTaskCountBoundaries,
highLagThreshold,
+ minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index c53ac0e379c..1ec08384a40 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.util.Objects;
@@ -51,6 +53,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
private final long minTriggerScaleActionFrequencyMillis;
+ private final Duration minScaleUpDelay;
+ private final Duration minScaleDownDelay;
private final AggregateFunction lagAggregate;
private final Double stopTaskCountRatio;
@@ -71,6 +75,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis,
+ @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
+ @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("lagAggregate") AggregateFunction lagAggregate,
@Nullable @JsonProperty("stopTaskCountRatio") Double stopTaskCountRatio
)
@@ -105,11 +111,21 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000;
+ this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis));
+ this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis));
Preconditions.checkArgument(
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
"0.0 < stopTaskCountRatio <= 1.0"
);
+ Preconditions.checkArgument(
+ this.minScaleUpDelay.getMillis() >= 0,
+ "minScaleUpDelay must be a duration >= 0 millis"
+ );
+ Preconditions.checkArgument(
+ this.minScaleDownDelay.getMillis() >= 0,
+ "minScaleDownDelay must be a duration >= 0 millis"
+ );
this.stopTaskCountRatio = stopTaskCountRatio;
}
@@ -213,6 +229,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
return enableTaskAutoScaler;
}
+ @Deprecated
@Override
@JsonProperty
public long getMinTriggerScaleActionFrequencyMillis()
@@ -220,6 +237,20 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
return minTriggerScaleActionFrequencyMillis;
}
+ @Override
+ @JsonProperty
+ public Duration getMinScaleUpDelay()
+ {
+ return minScaleUpDelay;
+ }
+
+ @Override
+ @JsonProperty
+ public Duration getMinScaleDownDelay()
+ {
+ return minScaleDownDelay;
+ }
+
@JsonProperty
@Nullable
public AggregateFunction getLagAggregate()
@@ -244,8 +275,10 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
", taskCountMin=" + taskCountMin +
", taskCountStart=" + taskCountStart +
", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
+ ", minScaleUpDelay=" + minScaleUpDelay +
+ ", minScaleDownDelay=" + minScaleDownDelay +
", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
- ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
+ ", lagCollectionRangeMillis=" + lagCollectionRangeMillis +
", scaleOutThreshold=" + scaleOutThreshold +
", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold +
", scaleInThreshold=" + scaleInThreshold +
@@ -286,6 +319,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
scaleOutStep == that.scaleOutStep &&
enableTaskAutoScaler == that.enableTaskAutoScaler &&
minTriggerScaleActionFrequencyMillis == that.minTriggerScaleActionFrequencyMillis &&
+ Objects.equals(minScaleUpDelay, that.minScaleUpDelay) &&
+ Objects.equals(minScaleDownDelay, that.minScaleDownDelay) &&
Objects.equals(taskCountStart, that.taskCountStart) &&
lagAggregate == that.lagAggregate &&
Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
@@ -310,6 +345,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
scaleOutStep,
enableTaskAutoScaler,
minTriggerScaleActionFrequencyMillis,
+ minScaleUpDelay,
+ minScaleDownDelay,
lagAggregate,
stopTaskCountRatio
);
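The constructor change above resolves each direction's cooldown in two steps: the deprecated `minTriggerScaleActionFrequencyMillis` first falls back to 600000 ms, then `minScaleUpDelay` and `minScaleDownDelay` each fall back to it independently when unset, and either is rejected if negative. A minimal standalone sketch of that chain follows, assuming `java.time.Duration` in place of the `org.joda.time.Duration` and `Configs.valueOrDefault` helper the real class uses; the class and method names are hypothetical.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of the lag-based cooldown fallback chain (illustration only).
 * Uses java.time.Duration; the real config uses org.joda.time.Duration.
 */
final class LagCooldownFallbackSketch
{
  // Default applied to the deprecated minTriggerScaleActionFrequencyMillis when it is absent.
  static final long DEFAULT_MIN_TRIGGER_MILLIS = 600_000L;

  private LagCooldownFallbackSketch()
  {
  }

  /** Step 1: the deprecated millisecond field falls back to its default. */
  static long resolveLegacyMillis(Long legacyMillis)
  {
    return legacyMillis != null ? legacyMillis : DEFAULT_MIN_TRIGGER_MILLIS;
  }

  /**
   * Step 2: an explicit direction-specific delay wins; otherwise fall back to the
   * (possibly defaulted) legacy value. Negative results are rejected, mirroring the
   * ">= 0 millis" preconditions in the constructor above.
   */
  static Duration resolveDirectionDelay(Duration explicitDelay, Long legacyMillis)
  {
    Duration resolved = explicitDelay != null
        ? explicitDelay
        : Duration.ofMillis(resolveLegacyMillis(legacyMillis));
    if (resolved.isNegative()) {
      throw new IllegalArgumentException("delay must be a duration >= 0 millis");
    }
    return resolved;
  }
}
```

Calling it once per direction with the same legacy value reproduces the mixed cases exercised by the serde tests: `resolveDirectionDelay(Duration.ofMinutes(2), 900_000L)` gives two minutes for scale-up, while `resolveDirectionDelay(null, 900_000L)` gives 900000 ms for scale-down. The cost-based config tests further down show that `CostBasedAutoScalerConfig` does not use this chain; each direction there has its own independent default.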
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
index 93653f28fe9..db9bdb3d0c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
@@ -21,12 +21,14 @@ package org.apache.druid.indexing.overlord.supervisor.autoscaler;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Test;
public class LagBasedAutoScalerConfigTest
{
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
@Test
public void testDefaults()
@@ -48,6 +50,8 @@ public class LagBasedAutoScalerConfigTest
null,
null,
null,
+ null,
+ null,
null
);
@@ -63,6 +67,9 @@ public class LagBasedAutoScalerConfigTest
Assert.assertEquals(1, config.getScaleInStep());
Assert.assertEquals(2, config.getScaleOutStep());
Assert.assertEquals(600000, config.getMinTriggerScaleActionFrequencyMillis());
+ // When minScaleUpDelay/minScaleDownDelay are not set, they fall back to minTriggerScaleActionFrequencyMillis
+ Assert.assertEquals(Duration.millis(600000), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(600000), config.getMinScaleDownDelay());
Assert.assertNull(config.getLagAggregate());
Assert.assertNull(config.getStopTaskCountRatio());
Assert.assertEquals(0, config.getTaskCountMax());
@@ -88,7 +95,9 @@ public class LagBasedAutoScalerConfigTest
1,
5,
true,
- 5000L,
+ null,
+ Duration.millis(3000),
+ Duration.millis(7000),
AggregateFunction.SUM,
0.1
);
@@ -125,6 +134,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
null
)
);
@@ -151,6 +162,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
null
)
);
@@ -177,6 +190,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
null
)
);
@@ -206,6 +221,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
0.0
)
);
@@ -230,6 +247,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
-0.1
)
);
@@ -255,6 +274,8 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
1.1
)
);
@@ -278,11 +299,256 @@ public class LagBasedAutoScalerConfigTest
true,
null,
null,
+ null,
+ null,
0.5
);
Assert.assertEquals(Double.valueOf(0.5), config.getStopTaskCountRatio());
}
+ @Test
+ public void testScaleDelayFallback() throws Exception
+ {
+ // Neither minScaleUpDelay nor minScaleDownDelay set: both fall back to minTriggerScaleActionFrequencyMillis
+ LagBasedAutoScalerConfig baseOnly = new LagBasedAutoScalerConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 10,
+ null,
+ 1,
+ null,
+ null,
+ false,
+ 60000L,
+ null,
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(60000L, baseOnly.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(60000), baseOnly.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(60000), baseOnly.getMinScaleDownDelay());
+
+ // Only minScaleUpDelay set
+ LagBasedAutoScalerConfig upOnly = new LagBasedAutoScalerConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 10,
+ null,
+ 1,
+ null,
+ null,
+ false,
+ 60000L,
+ Duration.millis(15000),
+ null,
+ null,
+ null
+ );
+ Assert.assertEquals(Duration.millis(15000), upOnly.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(60000), upOnly.getMinScaleDownDelay());
+
+ // Only minScaleDownDelay set
+ LagBasedAutoScalerConfig downOnly = new LagBasedAutoScalerConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 10,
+ null,
+ 1,
+ null,
+ null,
+ false,
+ 60000L,
+ null,
+ Duration.millis(30000),
+ null,
+ null
+ );
+ Assert.assertEquals(Duration.millis(60000), downOnly.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(30000), downOnly.getMinScaleDownDelay());
+
+ // Both set: serde roundtrip preserves values
+ LagBasedAutoScalerConfig bothSet = new LagBasedAutoScalerConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 10,
+ null,
+ 1,
+ null,
+ null,
+ false,
+ null,
+ Duration.millis(15000),
+ Duration.millis(30000),
+ null,
+ null
+ );
+ Assert.assertEquals(Duration.millis(15000), bothSet.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(30000), bothSet.getMinScaleDownDelay());
+ LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(bothSet),
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(bothSet, roundTripped);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testScaleDelayFallbackViaSerde() throws Exception
+ {
+ // JSON with only minTriggerScaleActionFrequencyMillis (no Duration fields):
+ // both getMinScaleUpDelay() and getMinScaleDownDelay() should fall back to it.
+ String json = "{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":45000}";
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(json, LagBasedAutoScalerConfig.class);
+ Assert.assertEquals(45000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(45000), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(45000), config.getMinScaleDownDelay());
+
+ // JSON with minTriggerScaleActionFrequencyMillis and only minScaleUpDelay:
+ // getMinScaleUpDelay() should return the explicit value; getMinScaleDownDelay() falls back.
+ String jsonUpOnly = "{\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":45000,"
+ + "\"minScaleUpDelay\":\"PT10S\"}";
+ LagBasedAutoScalerConfig configUpOnly = OBJECT_MAPPER.readValue(jsonUpOnly, LagBasedAutoScalerConfig.class);
+ Assert.assertEquals(Duration.standardSeconds(10), configUpOnly.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(45000), configUpOnly.getMinScaleDownDelay());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws Exception
+ {
+ final long defaultMinTriggerMillis = 600_000L;
+
+ // Backwards-compat: nothing set -> deprecated field gets its default and both direction
+ // delays fall back to it.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Backwards-compat: legacy spec sets only the deprecated field. Both direction delays fall
+ // back to it.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":900000}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(900_000), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(900_000), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Forwards-compat: direction delays set, deprecated field omitted. Deprecated field defaults.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Forwards-compat: deprecated field AND direction delays set (overlapping migration window).
+ // Direction delays win for their own direction; the deprecated field is preserved.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Mixed: only minScaleUpDelay + deprecated field set. Down falls back to the deprecated field.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleUpDelay\":\"PT2M\"}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(900_000), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Mixed: only minScaleDownDelay + deprecated field set. Up falls back to the deprecated field.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleDownDelay\":\"PT15M\"}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(900_000), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Mixed: deprecated field omitted, only minScaleUpDelay set. Down falls back to the
+ // deprecated field's default.
+ {
+ LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+ "{\"taskCountMax\":10,\"taskCountMin\":1,\"minScaleUpDelay\":\"PT2M\"}",
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+ }
+
+ private void assertRoundTrips(LagBasedAutoScalerConfig config) throws Exception
+ {
+ LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue(
+ OBJECT_MAPPER.writeValueAsString(config),
+ LagBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(config, roundTripped);
+ }
+
@Test
public void testEqualsAndHashCode()
{
@@ -302,6 +568,8 @@ public class LagBasedAutoScalerConfigTest
3,
true,
7000L,
+ null,
+ null,
AggregateFunction.SUM,
0.5
);
@@ -321,6 +589,8 @@ public class LagBasedAutoScalerConfigTest
1,
true,
7000L,
+ null,
+ null,
AggregateFunction.AVERAGE,
0.5
);
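The serde tests above mix two units: the deprecated field is raw milliseconds, while the direction-specific delays are ISO-8601 duration strings such as `PT10S`, `PT2M` and `PT15M`. A small helper for checking equivalences when migrating a legacy spec follows. It is a sketch that uses `java.time.Duration.parse` rather than Druid's own Jackson/Joda deserialization path, and the helper name is hypothetical.

```java
import java.time.Duration;

/** Hypothetical helper for comparing ISO-8601 delay strings with legacy millisecond values. */
final class IsoDelaySketch
{
  private IsoDelaySketch()
  {
  }

  static long toMillis(String iso8601Duration)
  {
    // java.time.Duration.parse accepts the PTnHnMnS form used in the specs above.
    return Duration.parse(iso8601Duration).toMillis();
  }

  public static void main(String[] args)
  {
    System.out.println(toMillis("PT10S")); // 10000
    System.out.println(toMillis("PT2M"));  // 120000
    System.out.println(toMillis("PT15M")); // 900000
  }
}
```

For example, a legacy spec with `minTriggerScaleActionFrequencyMillis` set to 900000 keeps the same scale-down cooldown if it is migrated to `"minScaleDownDelay": "PT15M"`.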
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 7c20855b033..bf3d5f9d71e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -441,7 +441,7 @@ public class SeekableStreamSupervisorSpecTest extends SeekableStreamSupervisorTe
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
- .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed yet"::equals));
+ .anyMatch("Scale cooldown not elapsed yet"::equals));
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1);
autoScaler.reset();
autoScaler.stop();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d59e2711ce8..5f6986551a7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -49,10 +49,13 @@ import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -81,6 +84,8 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
@@ -2683,6 +2688,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
true,
null,
null,
+ null,
+ null,
null
);
SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, autoScalerConfig, null);
@@ -2760,6 +2767,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
true,
null,
null,
+ null,
+ null,
0.4
);
SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig(
@@ -3303,6 +3312,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
}
+ private class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor
+ {
+ private final SupervisorStateManager.State state;
+
+ private StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state)
+ {
+ this.state = state;
+ }
+
+ @Override
+ public SupervisorStateManager.State getState()
+ {
+ return state;
+ }
+ }
+
private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
{
private final CountDownLatch latch;
@@ -3572,6 +3597,134 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
+ @Test
+ public void testDynamicAllocationScaleUpAllowedWhenCooldownElapsed()
+ {
+ final long zeroCooldown = 0L;
+ final long unusedCooldown = Duration.standardHours(1).getMillis();
+ final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(zeroCooldown, unusedCooldown, 2);
+ final TestSeekableStreamSupervisor supervisor =
+ new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ // minScaleUpDelay = 0 means any scale-up is immediately allowed.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, scalingEmitter);
+
+ Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+
+ final List<ServiceMetricEvent> events =
+ scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals("Exactly one required-tasks emission expected", 1, events.size());
+ assertScaledToTaskCount(events.get(0), 5);
+ }
+
+ @Test
+ public void testDynamicAllocationScaleUpBlockedWhenCooldownNotElapsed()
+ {
+ final long scaleUpCooldown = Duration.standardHours(1).getMillis();
+ final long unusedCooldown = 0L;
+ final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(scaleUpCooldown, unusedCooldown, 2);
+ final TestSeekableStreamSupervisor supervisor =
+ new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ // First scale-up succeeds and stamps the last-scale timestamp.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, scalingEmitter);
+ Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+
+ // Second scale-up is within the 1h minScaleUpDelay window and must be blocked.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 7, () -> {}, scalingEmitter);
+ Assert.assertEquals("Second scale-up must not take effect", 5, supervisor.getIoConfig().getTaskCount().intValue());
+
+ final List<ServiceMetricEvent> events =
+ scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals("Two required-tasks emissions expected (one applied, one skipped)", 2, events.size());
+ // First emission: the successful scale carries no skip-reason dim and reports the applied count.
+ assertScaledToTaskCount(events.get(0), 5);
+ // Second emission: the gated scale carries the cooldown skip-reason dim and the proposed (not applied) count.
+ assertScaleSkipped(events.get(1), 7, "Scale cooldown not elapsed yet");
+ }
+
+ @Test
+ public void testDynamicAllocationScaleDownAllowedWhenCooldownElapsed()
+ {
+ final long unusedCooldown = Duration.standardHours(1).getMillis();
+ final long zeroCooldown = 0L;
+ final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(unusedCooldown, zeroCooldown, 5);
+ final TestSeekableStreamSupervisor supervisor =
+ new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ // minScaleDownDelay = 0 means any scale-down is immediately allowed.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 2, () -> {}, scalingEmitter);
+
+ Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue());
+
+ final List<ServiceMetricEvent> events =
+ scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals("Exactly one required-tasks emission expected", 1, events.size());
+ assertScaledToTaskCount(events.get(0), 2);
+ }
+
+ @Test
+ public void testDynamicAllocationScaleDownBlockedWhenCooldownNotElapsed()
+ {
+ final long unusedCooldown = 0L;
+ final long scaleDownCooldown = Duration.standardHours(1).getMillis();
+ final StubServiceEmitter scalingEmitter = setupSupervisorForAutoScalingTest(unusedCooldown, scaleDownCooldown, 5);
+ final TestSeekableStreamSupervisor supervisor =
+ new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+ // First scale-down succeeds and stamps the last-scale timestamp.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 3, () -> {}, scalingEmitter);
+ Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount().intValue());
+
+ // Second scale-down is within the 1h minScaleDownDelay window and must be blocked.
+ supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, scalingEmitter);
+ Assert.assertEquals("Second scale-down must not take effect", 3, supervisor.getIoConfig().getTaskCount().intValue());
+
+ final List<ServiceMetricEvent> events =
+ scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+ Assert.assertEquals("Two required-tasks emissions expected (one applied, one skipped)", 2, events.size());
+ assertScaledToTaskCount(events.get(0), 3);
+ assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet");
+ }
+
+ /**
+ * Asserts that a required-tasks emission represents a successful scale: it carries the standard
+ * supervisor/datasource/stream dims, no scalingSkipReason dim, and the metric value matches the
+ * new task count.
+ */
+ private static void assertScaledToTaskCount(ServiceMetricEvent event, int expectedRequiredCount)
+ {
+ assertStandardDimensions(event);
+ Assert.assertNull(
+ "Attempted scale must not carry a scalingSkipReason dim",
+ event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+ );
+ Assert.assertEquals(expectedRequiredCount, event.getValue().intValue());
+ }
+
+ /**
+ * Asserts that a required-tasks emission represents a skipped scale: it carries the standard
+ * supervisor/datasource/stream dims, a scalingSkipReason dim equal to {@code expectedReason},
+ * and the metric value matches the proposed (not applied) task count.
+ */
+ private static void assertScaleSkipped(ServiceMetricEvent event, int expectedRequiredCount, String expectedReason)
+ {
+ assertStandardDimensions(event);
+ Assert.assertEquals(
+ expectedReason,
+ event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+ );
+ Assert.assertEquals(expectedRequiredCount, event.getValue().intValue());
+ }
+
+ private static void assertStandardDimensions(ServiceMetricEvent event)
+ {
+ final Map<String, Object> dims = event.getUserDims();
+ Assert.assertEquals(SUPERVISOR_ID, dims.get(DruidMetrics.SUPERVISOR_ID));
+ Assert.assertEquals(DATASOURCE, dims.get(DruidMetrics.DATASOURCE));
+ Assert.assertEquals(STREAM, dims.get(DruidMetrics.STREAM));
+ }
+
private static TestSeekableStreamIndexTask createTestTask(String taskId,
String groupId, @Nullable Integer serverPriority,
SeekableStreamIndexTaskIOConfig taskIoConfig, RecordSupplier recordSupplier)
{
return new TestSeekableStreamIndexTask(
@@ -3588,4 +3741,118 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
serverPriority
);
}
+
+ /**
+ * Resets the {@link #spec} and {@link #taskMaster} mocks so the supervisor sees an ioConfig with
+ * the given direction-specific cooldowns and so {@code changeTaskCountInIOConfig} can run
+ * without hitting unmocked calls. Returns a dedicated emitter for the caller to pass into the
+ * notice handler so dynamic-allocation events can be asserted in isolation.
+ */
+ private StubServiceEmitter setupSupervisorForAutoScalingTest(
+ long minScaleUpDelayMillis,
+ long minScaleDownDelayMillis,
+ int initialTaskCount
+ )
+ {
+ final AutoScalerConfig autoScalerConfig = testAutoScalerConfig(
+ minScaleUpDelayMillis,
+ minScaleDownDelayMillis
+ );
+ final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(
+ initialTaskCount,
+ autoScalerConfig,
+ null
+ );
+ return resetSpecAndTaskMasterForScaling(ioConfig);
+ }
+
+ /**
+ * Returns a minimal test-only {@link AutoScalerConfig}
+ */
+ private static AutoScalerConfig testAutoScalerConfig(long minScaleUpDelayMillis, long minScaleDownDelayMillis)
+ {
+ return new AutoScalerConfig()
+ {
+ @Override
+ public boolean getEnableTaskAutoScaler()
+ {
+ return true;
+ }
+
+ @Override
+ public long getMinTriggerScaleActionFrequencyMillis()
+ {
+ return 0L;
+ }
+
+ @Override
+ public Duration getMinScaleUpDelay()
+ {
+ return Duration.millis(minScaleUpDelayMillis);
+ }
+
+ @Override
+ public Duration getMinScaleDownDelay()
+ {
+ return Duration.millis(minScaleDownDelayMillis);
+ }
+
+ @Override
+ public int getTaskCountMax()
+ {
+ return 100;
+ }
+
+ @Override
+ public int getTaskCountMin()
+ {
+ return 1;
+ }
+
+ @Override
+ public Integer getTaskCountStart()
+ {
+ return null;
+ }
+
+ @Override
+ public Double getStopTaskCountRatio()
+ {
+ return null;
+ }
+
+ @Override
+ public SupervisorTaskAutoScaler createAutoScaler(
+ Supervisor supervisor,
+ SupervisorSpec spec,
+ ServiceEmitter emitter
+ )
+ {
+ throw new UnsupportedOperationException("test autoscaler config: createAutoScaler not used");
+ }
+ };
+ }
+
+ private StubServiceEmitter resetSpecAndTaskMasterForScaling(SeekableStreamSupervisorIOConfig ioConfig)
+ {
+ final StubServiceEmitter scalingEmitter = new StubServiceEmitter("scaling", "localhost");
+
+ EasyMock.reset(spec, taskMaster);
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ // changeTaskCountInIOConfig calls this; absent path just logs and moves on.
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+
+ replayAll();
+ return scalingEmitter;
+ }
}
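The state tests above pin down the observable behaviour: the first scale in either direction is applied, a second scale inside the configured window is skipped with the reason `Scale cooldown not elapsed yet`, and a zero delay never blocks. The in-memory sketch below reproduces that behaviour. It is not the `SeekableStreamSupervisor` implementation; it assumes, based on the test comments about stamping "the last-scale timestamp", a single timestamp shared by both directions, with the direction of the proposed change selecting which delay applies. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical direction-aware cooldown gate (illustration only, not Druid code).
 * Assumes one last-scale timestamp shared by both directions.
 */
final class DirectionAwareCooldownGate
{
  static final String SKIP_REASON = "Scale cooldown not elapsed yet";

  private final Duration minScaleUpDelay;
  private final Duration minScaleDownDelay;
  private int taskCount;
  // Null until the first applied scale, so the very first scale is never gated.
  private Long lastScaleMillis = null;

  DirectionAwareCooldownGate(int initialTaskCount, Duration minScaleUpDelay, Duration minScaleDownDelay)
  {
    this.taskCount = initialTaskCount;
    this.minScaleUpDelay = Objects.requireNonNull(minScaleUpDelay);
    this.minScaleDownDelay = Objects.requireNonNull(minScaleDownDelay);
  }

  /**
   * Attempts to move to proposedTaskCount at time nowMillis. Returns empty if the count was
   * applied (or was already equal), otherwise the skip reason.
   */
  Optional<String> tryScale(int proposedTaskCount, long nowMillis)
  {
    if (proposedTaskCount == taskCount) {
      return Optional.empty();
    }
    // The direction of the proposed change selects which cooldown applies.
    Duration required = proposedTaskCount > taskCount ? minScaleUpDelay : minScaleDownDelay;
    if (lastScaleMillis != null && nowMillis - lastScaleMillis < required.toMillis()) {
      return Optional.of(SKIP_REASON);
    }
    taskCount = proposedTaskCount;
    lastScaleMillis = nowMillis;
    return Optional.empty();
  }

  int taskCount()
  {
    return taskCount;
  }
}
```

With a one-hour up delay and a zero down delay, `tryScale(5, 0)` from two tasks applies, a follow-up `tryScale(7, 1000)` returns the skip reason, and scale-downs go through immediately. Sharing one timestamp means a scale-up shortly after a scale-down is still held back by `minScaleUpDelay`; whether the real supervisor tracks per-direction timestamps instead is not visible in this excerpt.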
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index fa50d87b56d..05724d1e837 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY;
-import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
@SuppressWarnings("TextBlockMigration")
@@ -51,6 +50,7 @@ public class CostBasedAutoScalerConfigTest
+ " \"lagWeight\": 0.6,\n"
+ " \"idleWeight\": 0.4,\n"
+ " \"highLagThreshold\": 30000,\n"
+ + " \"minScaleUpDelay\": \"PT5M\",\n"
+ " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true\n"
+ "}";
@@ -61,11 +61,11 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(100, config.getTaskCountMax());
Assert.assertEquals(5, config.getTaskCountMin());
Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
- Assert.assertEquals(600000L, config.getMinTriggerScaleActionFrequencyMillis());
Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertEquals(30000, config.getHighLagThreshold());
@@ -95,12 +95,10 @@ public class CostBasedAutoScalerConfigTest
// Check defaults
Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, config.getScaleActionPeriodMillis());
- Assert.assertEquals(
- DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
- config.getMinTriggerScaleActionFrequencyMillis()
- );
Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
+ // minScaleUpDelay and minScaleDownDelay each have their own independent default
+ Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay());
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay());
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
Assert.assertNull(config.getTaskCountStart());
@@ -183,11 +181,11 @@ public class CostBasedAutoScalerConfigTest
.taskCountMin(5)
.taskCountStart(10)
.enableTaskAutoScaler(true)
- .minTriggerScaleActionFrequencyMillis(600000L)
.stopTaskCountRatio(0.8)
.scaleActionPeriodMillis(60000L)
.lagWeight(0.6)
.idleWeight(0.4)
+ .minScaleUpDelay(Duration.standardMinutes(5))
.minScaleDownDelay(Duration.standardMinutes(10))
.scaleDownDuringTaskRolloverOnly(true)
.highLagThreshold(30000)
@@ -197,13 +195,156 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(100, config.getTaskCountMax());
Assert.assertEquals(5, config.getTaskCountMin());
Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
- Assert.assertEquals(600000L, config.getMinTriggerScaleActionFrequencyMillis());
Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+ Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
Assert.assertEquals(30000, config.getHighLagThreshold());
}
+
+ @Test
+ public void testScaleDelayDefaults() throws Exception
+ {
+ // Neither set: each direction gets its own independent default
+ CostBasedAutoScalerConfig defaults = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .build();
+ Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), defaults.getMinScaleUpDelay());
+ Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, defaults.getMinScaleDownDelay());
+
+ // Only minScaleUpDelay set: up uses explicit value, down uses its default
+ CostBasedAutoScalerConfig upOnly = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .minScaleUpDelay(Duration.standardMinutes(5))
+ .build();
+ Assert.assertEquals(Duration.standardMinutes(5), upOnly.getMinScaleUpDelay());
+ Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, upOnly.getMinScaleDownDelay());
+
+ // Only minScaleDownDelay set: down uses explicit value, up uses its own default (does not fall back to down)
+ CostBasedAutoScalerConfig downOnly = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .minScaleDownDelay(Duration.standardMinutes(20))
+ .build();
+ Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), downOnly.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(20), downOnly.getMinScaleDownDelay());
+
+ // Both set: serde roundtrip preserves values
+ CostBasedAutoScalerConfig bothSet = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(10)
+ .taskCountMin(1)
+ .minScaleUpDelay(Duration.standardMinutes(5))
+ .minScaleDownDelay(Duration.standardMinutes(20))
+ .build();
+ Assert.assertEquals(Duration.standardMinutes(5), bothSet.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(20), bothSet.getMinScaleDownDelay());
+ CostBasedAutoScalerConfig roundTripped = mapper.readValue(mapper.writeValueAsString(bothSet), CostBasedAutoScalerConfig.class);
+ Assert.assertEquals(bothSet, roundTripped);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws Exception
+ {
+ final long defaultMinTriggerMillis = DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+ final Duration defaultUp = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS);
+ final Duration defaultDown = DEFAULT_MIN_SCALE_DELAY;
+
+ // Backwards-compat: nothing set -> everything uses its own default.
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(defaultUp, config.getMinScaleUpDelay());
+ Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Backwards-compat: legacy spec sets only the deprecated field. minScaleUpDelay falls back to its
+ // value; minScaleDownDelay keeps its own default (no cross-field fallback for scale-down in CostBased).
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(900_000L), config.getMinScaleUpDelay());
+ Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Forwards-compat: direction delays set, deprecated field omitted. Deprecated field defaults.
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(defaultMinTriggerMillis, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Forwards-compat: deprecated field AND direction delays set (overlapping migration window).
+ // All three are honored independently.
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Only minScaleUpDelay set alongside the deprecated field: down uses its own default,
+ // not the deprecated field's value.
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleUpDelay\":\"PT2M\"}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.standardMinutes(2), config.getMinScaleUpDelay());
+ Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+
+ // Only minScaleDownDelay set alongside the deprecated field: up falls back to the deprecated field's value.
+ {
+ CostBasedAutoScalerConfig config = mapper.readValue(
+ "{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+ + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+ + "\"minScaleDownDelay\":\"PT15M\"}",
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(900_000L, config.getMinTriggerScaleActionFrequencyMillis());
+ Assert.assertEquals(Duration.millis(900_000L), config.getMinScaleUpDelay());
+ Assert.assertEquals(Duration.standardMinutes(15), config.getMinScaleDownDelay());
+ assertRoundTrips(config);
+ }
+ }
+
+ private void assertRoundTrips(CostBasedAutoScalerConfig config) throws Exception
+ {
+ CostBasedAutoScalerConfig roundTripped = mapper.readValue(
+ mapper.writeValueAsString(config),
+ CostBasedAutoScalerConfig.class
+ );
+ Assert.assertEquals(config, roundTripped);
+ }
}
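The resolution order asserted by the CostBasedAutoScalerConfig tests above can be summarized in a small sketch. It is hypothetical: the class and method names are illustrative, it uses `java.time.Duration` in place of the Joda `Duration` the tests use, and it models "not set" as `null`. It only mirrors the assertions (an explicit direction delay wins; scale-up otherwise falls back to the deprecated `minTriggerScaleActionFrequencyMillis` and then to its default; scale-down otherwise uses its own default) and is not Druid's actual implementation.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of per-direction delay resolution, derived only from the
 * CostBasedAutoScalerConfig test assertions. Not Druid's implementation.
 */
public final class ScaleDelayResolutionSketch
{
  private ScaleDelayResolutionSketch()
  {
  }

  /**
   * Scale-up: an explicit minScaleUpDelay wins; otherwise the deprecated
   * minTriggerScaleActionFrequencyMillis (if set); otherwise the default.
   */
  public static Duration resolveMinScaleUpDelay(
      Duration minScaleUpDelay,
      Long minTriggerScaleActionFrequencyMillis,
      Duration defaultScaleUpDelay
  )
  {
    if (minScaleUpDelay != null) {
      return minScaleUpDelay;
    }
    if (minTriggerScaleActionFrequencyMillis != null) {
      return Duration.ofMillis(minTriggerScaleActionFrequencyMillis);
    }
    return defaultScaleUpDelay;
  }

  /**
   * Scale-down: an explicit minScaleDownDelay wins; otherwise its own default.
   * The deprecated field is deliberately ignored here, matching the tests.
   */
  public static Duration resolveMinScaleDownDelay(Duration minScaleDownDelay, Duration defaultScaleDownDelay)
  {
    return minScaleDownDelay != null ? minScaleDownDelay : defaultScaleDownDelay;
  }
}
```

Keeping the two directions in separate methods makes the asymmetry explicit: only scale-up inherits the legacy setting, so an existing spec that relied on `minTriggerScaleActionFrequencyMillis` keeps its scale-up pacing while scale-down picks up the more conservative default.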
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 26c4c20f859..6466b0966e8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -82,24 +82,10 @@ public class CostBasedAutoScalerMockTest
@Test
public void testScaleUpWhenOptimalGreaterThanCurrent()
{
- // Use config with a long barrier to test cooldown behavior
- CostBasedAutoScalerConfig barrierConfig = CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .minScaleDownDelay(Duration.standardHours(1))
- .build();
-
- CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
- mockSupervisor,
- barrierConfig,
- mockSpec,
- mockEmitter
- ));
+ CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
int currentTaskCount = 10;
int scaleUpOptimal = 17;
- // Trigger scale-up, which should set the cooldown timer
doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
setupMocksForMetricsCollection(autoScaler, currentTaskCount, 5000.0, 0.1);
@@ -108,15 +94,6 @@ public class CostBasedAutoScalerMockTest
scaleUpOptimal,
autoScaler.computeTaskCountForScaleAction()
);
-
- // Verify cooldown blocks immediate subsequent scaling
- doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
- Assert.assertEquals(
- "Scale action should be blocked during the cooldown window",
- -1,
- autoScaler.computeTaskCountForScaleAction()
- );
}
@Test
@@ -135,45 +112,6 @@ public class CostBasedAutoScalerMockTest
Assert.assertEquals("Should return -1 when it equals current (no change needed)", -1, result);
}
- @Test
- public void testScaleDownBlockedReturnsMinusOne()
- {
- // Use config with a long barrier to test cooldown behavior
- CostBasedAutoScalerConfig barrierConfig = CostBasedAutoScalerConfig.builder()
- .taskCountMax(100)
- .taskCountMin(1)
- .enableTaskAutoScaler(true)
- .minScaleDownDelay(Duration.standardHours(1))
- .build();
-
- CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
- mockSupervisor,
- barrierConfig,
- mockSpec,
- mockEmitter
- ));
-
- int currentTaskCount = 50;
- int optimalCount = 30; // Lower than current (scale-down scenario)
-
- doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
- setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
-
- // First attempt: allowed (no prior scale action)
- Assert.assertEquals(
- "Scale-down should succeed when no prior scale action exists",
- optimalCount,
- autoScaler.computeTaskCountForScaleAction()
- );
-
- // Second attempt: blocked by cooldown
- Assert.assertEquals(
- "Scale-down should be blocked during the cooldown window",
- -1,
- autoScaler.computeTaskCountForScaleAction()
- );
- }
-
@Test
public void testReturnsMinusOneWhenMetricsCollectionFails()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
index 7f5a0f5a9df..ffbfee77b72 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -74,6 +74,8 @@ public class LagBasedAutoScalerTest
4,
true, // enableTaskAutoScaler
6_000_000L, // minTriggerScaleActionFrequencyMillis
+ null, // minScaleUpDelay
+ null, // minScaleDownDelay
null, // lagAggregate
null // stopTaskCountRatio
);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java
new file mode 100644
index 00000000000..63523f6cfb0
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public enum ScaleDirection
+{
+ SCALE_UP("scale-up"), SCALE_DOWN("scale-down");
+
+ private final String label;
+
+ ScaleDirection(String label)
+ {
+ this.label = label;
+ }
+
+ @Override
+ public String toString()
+ {
+ return label;
+ }
+}
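A direction-aware cooldown gate built around an enum like `ScaleDirection` might look like the following sketch. It is an illustration under stated assumptions, not the supervisor's logic: the class name is hypothetical, the enum is a local stand-in mirroring `ScaleDirection`, and it assumes the delay for the proposed direction is measured from the most recent scale action in either direction, a detail not spelled out here.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical direction-aware cooldown gate. Assumption: the cooldown for a
 * proposed action is measured from the last scale action of ANY direction.
 */
public final class DirectionAwareCooldownSketch
{
  /** Local stand-in for Druid's ScaleDirection enum. */
  public enum Direction
  {
    SCALE_UP, SCALE_DOWN
  }

  private final Duration minScaleUpDelay;
  private final Duration minScaleDownDelay;
  private Instant lastScaleActionTime; // null until the first scale action is recorded

  public DirectionAwareCooldownSketch(Duration minScaleUpDelay, Duration minScaleDownDelay)
  {
    this.minScaleUpDelay = minScaleUpDelay;
    this.minScaleDownDelay = minScaleDownDelay;
  }

  /** Returns true if a scale action in the given direction may run at {@code now}. */
  public boolean isAllowed(Direction direction, Instant now)
  {
    if (lastScaleActionTime == null) {
      return true; // no prior action, so there is nothing to cool down from
    }
    Duration delay = direction == Direction.SCALE_UP ? minScaleUpDelay : minScaleDownDelay;
    // Allowed once the direction-specific delay has fully elapsed.
    return !now.isBefore(lastScaleActionTime.plus(delay));
  }

  /** Records that a scale action happened at {@code now}. */
  public void recordScaleAction(Instant now)
  {
    lastScaleActionTime = now;
  }
}
```

With a 5-minute scale-up delay and a 10-minute scale-down delay, a scale-up may follow the previous action after 5 minutes while a scale-down must wait the full 10, which matches the goal of scaling up aggressively and scaling down conservatively.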
diff --git a/website/.spelling b/website/.spelling
index 99e767c4891..28701817f36 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -895,6 +895,8 @@ c3.2xlarge
defaultManualBrokerService
maxPriority
minPriority
+minScaleUpDelay
+minScaleDownDelay
NUMBER_FEATURES
NUMBER_OF_CONTRIBUTORS
PreparedStatement
@@ -2080,6 +2082,7 @@ autoscalers
batch_index_task
cgroup
classloader
+cooldown
com.metamx
common.runtime.properties
cpuacct
@@ -2168,6 +2171,8 @@ s3n
slf4j
sql
sqlQuery
+scale-up
+scale-down
successfulSending
[S]igar
taskBlackListCleanupPeriod