This is an automated email from the ASF dual-hosted git repository.

jtuglu1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 58693edb82b feat: support scaling direction-aware cooldown for task 
auto-scalers (#19286)
58693edb82b is described below

commit 58693edb82bd16467ec87d893414a81dd2fa56c5
Author: jtuglu1 <[email protected]>
AuthorDate: Thu Apr 23 10:41:25 2026 -0700

    feat: support scaling direction-aware cooldown for task auto-scalers 
(#19286)
    
    Adds support for configuring different cooldowns for scaling direction. 
While both scaling actions do cause temporary disruption to ingestion, scaling 
down can cause more disruption than scaling up due to having less resources 
than when you started to recover from lag. Therefore, to allow for aggressive 
scale up while having a more conservative scale-down approach, this adds 
configuration for cool down period for both directions. Cool down for a 
specific scaling direction is evaluated [...]
    
    1. minScaleUpDelay/minScaleDownDelay
    2. minTriggerScaleActionFrequencyMillis (marked as deprecated)
    3. The previously scaler-specific configured defaults for scale up/down
---
 docs/ingestion/kafka-ingestion.md                  |   3 +-
 docs/ingestion/supervisor.md                       |  17 +-
 .../supervisor/KafkaSupervisorIOConfigTest.java    |   2 +
 .../supervisor/SeekableStreamSupervisor.java       | 103 ++++++--
 .../supervisor/autoscaler/AutoScalerConfig.java    |  22 ++
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  29 +--
 .../autoscaler/CostBasedAutoScalerConfig.java      |  36 ++-
 .../autoscaler/LagBasedAutoScalerConfig.java       |  39 ++-
 .../autoscaler/LagBasedAutoScalerConfigTest.java   | 274 ++++++++++++++++++++-
 .../SeekableStreamSupervisorSpecTest.java          |   2 +-
 .../SeekableStreamSupervisorStateTest.java         | 267 ++++++++++++++++++++
 .../autoscaler/CostBasedAutoScalerConfigTest.java  | 157 +++++++++++-
 .../autoscaler/CostBasedAutoScalerMockTest.java    |  64 +----
 .../autoscaler/LagBasedAutoScalerTest.java         |   2 +
 .../supervisor/autoscaler/ScaleDirection.java      |  38 +++
 website/.spelling                                  |   5 +
 16 files changed, 918 insertions(+), 142 deletions(-)

diff --git a/docs/ingestion/kafka-ingestion.md 
b/docs/ingestion/kafka-ingestion.md
index 2326b59d99d..5216a84b4e4 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -235,7 +235,8 @@ The following example shows a supervisor spec with idle 
configuration enabled:
         "enableTaskAutoScaler": true,
         "taskCountMax": 6,
         "taskCountMin": 2,
-        "minTriggerScaleActionFrequencyMillis": 600000,
+        "minScaleUpDelay": "PT10M",
+        "minScaleDownDelay": "PT10M",
         "autoScalerStrategy": "lagBased",
         "lagCollectionIntervalMillis": 30000,
         "lagCollectionRangeMillis": 600000,
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 592f0328cbe..ba92ed66b53 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -79,7 +79,9 @@ The following table outlines the configuration properties for 
`autoScalerConfig`
 |`taskCountMax`|The maximum number of ingestion tasks. Must be greater than or 
equal to `taskCountMin`. If `taskCountMax` is greater than the number of Kafka 
partitions or Kinesis shards, Druid sets the maximum number of reading tasks to 
the number of Kafka partitions or Kinesis shards and ignores 
`taskCountMax`.|Yes||
 |`taskCountMin`|The minimum number of ingestion tasks. When you enable the 
autoscaler, Druid computes the initial number of tasks to launch by checking 
the configs in the following order: `taskCountStart`, then `taskCount` (in 
`ioConfig`), then `taskCountMin`.|Yes||
 |`taskCountStart`|Optional config to specify the number of ingestion tasks to 
start with. When you enable the autoscaler, Druid computes the initial number 
of tasks to launch by checking the configs in the following order: 
`taskCountStart`, then `taskCount` (in `ioConfig`), then 
`taskCountMin`.|No|`taskCount` or `taskCountMin`|
-|`minTriggerScaleActionFrequencyMillis`|The minimum time interval between two 
scale actions.| No|600000|
+|`minScaleUpDelay`|Minimum cooldown duration between scale-up actions, 
specified as an ISO-8601 duration string. Falls back to 
`minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minScaleDownDelay`|Minimum cooldown duration between scale-down actions, 
specified as an ISO-8601 duration string. Falls back to 
`minTriggerScaleActionFrequencyMillis` if not set.|No||
+|`minTriggerScaleActionFrequencyMillis`|**Deprecated.** Use `minScaleUpDelay` 
and `minScaleDownDelay` instead. Minimum time interval in milliseconds between 
scale actions, used as the fallback when the Duration-based fields are not 
set.|No|600000|
 |`autoScalerStrategy`|The algorithm of autoscaler. Druid only supports the 
`lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more 
information.|No|`lagBased`|
 |`stopTaskCountRatio`|A variable version of `ioConfig.stopTaskCount` with a 
valid range of (0.0, 1.0]. Allows the maximum number of stoppable tasks in 
steady state to be proportional to the number of tasks currently running.|No||
 
@@ -161,7 +163,8 @@ The following example shows a supervisor spec with 
`lagBased` autoscaler:
       "enableTaskAutoScaler": true,
       "taskCountMax": 6,
       "taskCountMin": 2,
-      "minTriggerScaleActionFrequencyMillis": 600000,
+      "minScaleUpDelay": "PT10M",
+      "minScaleDownDelay": "PT10M",
       "autoScalerStrategy": "lagBased",
       "lagCollectionIntervalMillis": 30000,
       "lagCollectionRangeMillis": 600000,
@@ -210,10 +213,11 @@ The following table outlines the configuration properties 
related to the `costBa
 |`idleWeight`|The weight of extracted poll idle value in cost function. | No | 
0.75 |
 |`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when 
selecting task counts.|No| `false` |
 |`highLagThreshold`|Average partition lag threshold that triggers burst 
scale-up when set to a value greater than `0`. Set to a negative value to 
disable burst scale-up.|No|-1|
-|`minScaleDownDelay`|Minimum duration between successful scale actions, 
specified as an ISO-8601 duration string.|No|`PT30M`|
+|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before 
the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
+|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action 
before the next scale-down is allowed, specified as an ISO-8601 duration 
string.|No|`PT30M`|
 |`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is 
limited to periods during task rollovers only.|No|`false`|
 
-The following example shows a supervisor spec with `lagBased` autoscaler:
+The following example shows a supervisor spec with `costBased` autoscaler:
 
 <details>
   <summary>Click to view the example</summary>
@@ -227,9 +231,10 @@ The following example shows a supervisor spec with 
`lagBased` autoscaler:
       "autoScalerStrategy": "costBased",
       "taskCountMin": 1,
       "taskCountMax": 10,
-      "minTriggerScaleActionFrequencyMillis": 600000,
+      "minScaleUpDelay": "PT10M",
+      "minScaleDownDelay": "PT30M",
       "lagWeight": 0.1,
-      "idleWeight": 0.9,
+      "idleWeight": 0.9
     }
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 6295d41937e..289d3c989f3 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -315,6 +315,8 @@ public class KafkaSupervisorIOConfigTest
     autoScalerConfig.put("scaleInStep", 1);
     autoScalerConfig.put("scaleOutStep", 2);
     autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+    autoScalerConfig.put("minScaleUpDelay", "PT20M");
+    autoScalerConfig.put("minScaleDownDelay", "PT20M");
 
     final Map<String, Object> consumerProperties = 
KafkaConsumerConfigs.getConsumerProperties();
     consumerProperties.put("bootstrap.servers", "localhost:8082");
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 58d366d2306..f857cfaa436 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -64,6 +64,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.ScaleDirection;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -499,11 +500,34 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                     supervisorId,
                     dataSource
           );
-          final Integer desiredTaskCount = computeDesiredTaskCount.call();
+          final int desiredTaskCount = computeDesiredTaskCount.call();
+          final int currentTaskCount = getCurrentTaskCount();
+
+          if (desiredTaskCount <= 0) {
+            return;
+          }
+
           ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
                                                                
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
                                                                
.setDimension(DruidMetrics.DATASOURCE, dataSource)
                                                                
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
+
+          // 1) This should already be handled by the auto-scaler 
implementation, but make sure we catch/record these for auditability
+          if (desiredTaskCount == currentTaskCount) {
+            log.warn(
+                "Skipping scaling for supervisor[%s] for dataSource[%s]: 
already at desired task count [%d]",
+                supervisorId,
+                dataSource,
+                desiredTaskCount
+            );
+            emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, 
"desired capacity reached")
+                             .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
+            return;
+          }
+
+          // 2) Make sure we wait for any pending completion tasks to finish.
+          // At this point there could be 3 generations of tasks: pending 
completion tasks (old generation), running tasks (current generation), and 
(after our scale) pending tasks (new generation).
+          // We want to avoid killing any old generation tasks preemptively, 
as that might cause the current generation tasks' offsets to become invalid.
           for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
             // There are expected to be pending tasks if this scaling is 
happening on task rollover
             if (!list.isEmpty() && !isScalingTasksOnRollover.get()) {
@@ -513,45 +537,59 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                   dataSource,
                   list
               );
-              if (desiredTaskCount > 0) {
-                emitter.emit(event.setDimension(
-                                      AUTOSCALER_SKIP_REASON_DIMENSION,
-                                      "There are tasks pending completion"
-                                  )
-                                  .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
-              }
+              emitter.emit(event.setDimension(
+                                    AUTOSCALER_SKIP_REASON_DIMENSION,
+                                    "There are tasks pending completion"
+                                )
+                                .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
               return;
             }
           }
-          if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+
+          // 3) Make sure we are not breaching any scaling cooldown limits.
+          // Scaling operations are disruptive — scale-down in particular can 
leave the supervisor
+          // under-resourced while it recovers from lag induced by the scale 
event, so callers may
+          // configure a longer cooldown for scale-down than for scale-up. 
Both directions are measured against the same
+          // last-scale timestamp so that a rapid up/down oscillation is still 
subject to the appropriate cooldown,
+          // regardless of which direction triggered last.
+          final ScaleDirection scaleDirection;
+          final long cooldownMillis;
+
+          if (desiredTaskCount > currentTaskCount) {
+            scaleDirection = ScaleDirection.SCALE_UP;
+            cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
+          } else { // desiredTaskCount < currentTaskCount
+            scaleDirection = ScaleDirection.SCALE_DOWN;
+            cooldownMillis = 
autoScalerConfig.getMinScaleDownDelay().getMillis();
+          }
+
+          if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
             log.info(
-                "DynamicAllocationTasksNotice submitted again in [%d] millis, 
minTriggerDynamicFrequency is [%s] for supervisor[%s] for dataSource[%s], 
skipping it! desired task count is [%s], active task count is [%s]",
-                nowTime - dynamicTriggerLastRunTime,
-                autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
+                "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] 
cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired 
task count is [%d], current task count is [%d]",
+                nowTime - dynamicTriggerLastScaleRunTime,
+                scaleDirection,
+                cooldownMillis,
                 supervisorId,
                 dataSource,
                 desiredTaskCount,
-                getActiveTaskGroupsCount()
+                currentTaskCount
             );
 
-            if (desiredTaskCount > 0) {
-              emitter.emit(event.setDimension(
-                                    AUTOSCALER_SKIP_REASON_DIMENSION,
-                                    "minTriggerScaleActionFrequencyMillis not 
elapsed yet"
-                                )
-                                .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
-            }
+            emitter.emit(event.setDimension(
+                                  AUTOSCALER_SKIP_REASON_DIMENSION,
+                                  "Scale cooldown not elapsed yet"
+                              )
+                              .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
             return;
           }
 
-          if (desiredTaskCount > 0) {
-            emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
-          }
+          // At this point, we can reasonably attempt a scaling action, so 
emit our required task count
+          emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, 
desiredTaskCount));
 
           boolean allocationSuccess = changeTaskCount(desiredTaskCount);
           if (allocationSuccess) {
             onSuccessfulScale.run();
-            dynamicTriggerLastRunTime = nowTime;
+            dynamicTriggerLastScaleRunTime = nowTime;
           }
         }
         catch (Exception ex) {
@@ -586,8 +624,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
    *
    * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
-   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
-   * If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least the configured
+   *         'minScaleUpDelay' or 'minScaleDownDelay' (whichever matches the 
direction of the next scale) before the
+   *         next 'changeTaskCount'. If false, it will do 'changeTaskCount' 
again after 'scaleActionPeriodMillis' millis.
    * @throws InterruptedException
    * @throws ExecutionException
    */
@@ -958,7 +997,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final boolean useExclusiveStartingSequence;
   private boolean listenerRegistered = false;
   private long lastRunTime;
-  private long dynamicTriggerLastRunTime;
+  private long dynamicTriggerLastScaleRunTime;
   private int initRetryCounter = 0;
   private volatile DateTime firstRunTime;
   private volatile DateTime earlyStopTime = null;
@@ -1424,6 +1463,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, 
onSuccessfulScale, emitter));
   }
 
+  @VisibleForTesting
+  void handleDynamicAllocationTasksNotice(
+      Callable<Integer> scaleAction,
+      Runnable onSuccessfulScale,
+      ServiceEmitter emitter
+  )
+  {
+    new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, 
emitter).handle();
+  }
+
   private Runnable buildRunTask()
   {
     return () -> addNotice(new RunNotice());
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
index 6d3b2a0daa0..f6384d2a64e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -27,6 +27,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
 
 @UnstableApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", 
defaultImpl = LagBasedAutoScalerConfig.class)
@@ -37,7 +38,28 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 public interface AutoScalerConfig
 {
   boolean getEnableTaskAutoScaler();
+
+  /**
+   * @deprecated Use {@link #getMinScaleUpDelay()} and {@link 
#getMinScaleDownDelay()} instead.
+   * This field is retained for backward compatibility and will be removed in 
a future version.
+   */
+  @Deprecated
   long getMinTriggerScaleActionFrequencyMillis();
+
+  /**
+   * Minimum time that must elapse after any scale action before a scale-up is 
permitted.
+   * If not explicitly configured, implementations fall back to
+   * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward 
compatibility.
+   */
+  Duration getMinScaleUpDelay();
+
+  /**
+   * Minimum time that must elapse after any scale action before a scale-down 
is permitted.
+   * If not explicitly configured, implementations fall back to
+   * {@link #getMinTriggerScaleActionFrequencyMillis()} for backward 
compatibility.
+   */
+  Duration getMinScaleDownDelay();
+
   int getTaskCountMax();
   int getTaskCountMin();
   Integer getTaskCountStart();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 21614c90c8c..fa7db4f6928 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -28,7 +28,6 @@ import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
-import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -90,8 +89,6 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   private final WeightedCostFunction costFunction;
   private volatile CostMetrics lastKnownMetrics;
 
-  private volatile long lastScaleActionTimeMillis = -1;
-
   public CostBasedAutoScaler(
       SeekableStreamSupervisor supervisor,
       CostBasedAutoScalerConfig config,
@@ -189,21 +186,17 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
 
     // If task count is out of bounds, scale to the configured boundary
     // regardless of optimal task count, to get back to a safe state.
-    if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+    if (isTaskCountOutOfBounds) {
       taskCount = currentTaskCount;
-      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("Task count for supervisor[%s] was out of bounds [%d,%d], 
urgently scaling from [%d] to [%d].",
                supervisorId, config.getTaskCountMin(), 
config.getTaskCountMax(), currentTaskCount, currentTaskCount);
-    } else if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+    } else if (optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
-      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
up).", supervisorId, currentTaskCount, taskCount);
     } else if (!config.isScaleDownOnTaskRolloverOnly()
-               && isScaleActionAllowed()
                && optimalTaskCount < currentTaskCount
                && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
-      lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
       log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
down).", supervisorId, currentTaskCount, taskCount);
     } else {
       taskCount = -1;
@@ -594,22 +587,4 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     }
   }
 
-  /**
-   * Determines if a scale action is currently allowed based on the elapsed 
time
-   * since the last scale action and the configured minimum scale-down delay.
-   */
-  private boolean isScaleActionAllowed()
-  {
-    if (lastScaleActionTimeMillis < 0) {
-      return true;
-    }
-
-    final long barrierMillis = config.getMinScaleDownDelay().getMillis();
-    if (barrierMillis <= 0) {
-      return true;
-    }
-
-    final long elapsedMillis = DateTimes.nowUtc().getMillis() - 
lastScaleActionTimeMillis;
-    return elapsedMillis >= barrierMillis;
-  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index 025da787851..b19a3e2cbbe 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -45,7 +45,6 @@ import java.util.Objects;
 public class CostBasedAutoScalerConfig implements AutoScalerConfig
 {
   static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 
minutes
-  static final long DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS = 5 * 60 
* 1000; // 5 minutes
   static final double DEFAULT_LAG_WEIGHT = 0.25;
   static final double DEFAULT_IDLE_WEIGHT = 0.75;
   static final Duration DEFAULT_MIN_SCALE_DELAY = 
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
@@ -62,6 +61,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   private final double idleWeight;
   private final boolean useTaskCountBoundaries;
   private final int highLagThreshold;
+  private final Duration minScaleUpDelay;
   private final Duration minScaleDownDelay;
   private final boolean scaleDownDuringTaskRolloverOnly;
 
@@ -78,6 +78,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       @Nullable @JsonProperty("idleWeight") Double idleWeight,
       @Nullable @JsonProperty("useTaskCountBoundaries") Boolean 
useTaskCountBoundaries,
       @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+      @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
       @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
       @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean 
scaleDownDuringTaskRolloverOnly
   )
@@ -90,7 +91,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
                                    : DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
     this.minTriggerScaleActionFrequencyMillis = Configs.valueOrDefault(
         minTriggerScaleActionFrequencyMillis,
-        DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS
+        DEFAULT_SCALE_ACTION_PERIOD_MILLIS
     );
 
     // Cost function weights with defaults
@@ -98,6 +99,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
     this.useTaskCountBoundaries = 
Configs.valueOrDefault(useTaskCountBoundaries, false);
     this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
+    this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, 
Duration.millis(this.minTriggerScaleActionFrequencyMillis));
     this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, 
DEFAULT_MIN_SCALE_DELAY);
     this.scaleDownDuringTaskRolloverOnly = 
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
 
@@ -125,7 +127,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
 
     Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
     Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 
0");
-    Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, 
"minScaleDownDelay must be >= 0");
+    Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, 
"minScaleUpDelay must be a duration >= 0 millis");
+    Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, 
"minScaleDownDelay must be a duration >= 0 millis");
   }
 
   /**
@@ -165,6 +168,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     return taskCountStart;
   }
 
+  @Deprecated
   @Override
   @JsonProperty
   public long getMinTriggerScaleActionFrequencyMillis()
@@ -217,10 +221,19 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   }
 
   /**
-   * Represents the minimum duration between successful scale actions.
-   * A higher value implies a more conservative scaling behavior, ensuring 
that tasks
-   * are not scaled too frequently during workload fluctuations.
+   * Returns the minimum delay before a scale-up action is allowed after any 
previous scale action.
    */
+  @Override
+  @JsonProperty
+  public Duration getMinScaleUpDelay()
+  {
+    return minScaleUpDelay;
+  }
+
+  /**
+   * Returns the minimum delay before a scale-down action is allowed after any 
previous scale action.
+   */
+  @Override
   @JsonProperty
   public Duration getMinScaleDownDelay()
   {
@@ -263,6 +276,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            && Double.compare(that.lagWeight, lagWeight) == 0
            && Double.compare(that.idleWeight, idleWeight) == 0
            && useTaskCountBoundaries == that.useTaskCountBoundaries
+           && Objects.equals(minScaleUpDelay, that.minScaleUpDelay)
            && Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
            && scaleDownDuringTaskRolloverOnly == 
that.scaleDownDuringTaskRolloverOnly
            && Objects.equals(taskCountStart, that.taskCountStart)
@@ -285,6 +299,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
         idleWeight,
         useTaskCountBoundaries,
         highLagThreshold,
+        minScaleUpDelay,
         minScaleDownDelay,
         scaleDownDuringTaskRolloverOnly
     );
@@ -305,6 +320,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            ", idleWeight=" + idleWeight +
            ", useTaskCountBoundaries=" + useTaskCountBoundaries +
            ", highLagThreshold=" + highLagThreshold +
+           ", minScaleUpDelay=" + minScaleUpDelay +
            ", minScaleDownDelay=" + minScaleDownDelay +
            ", scaleDownDuringTaskRolloverOnly=" + 
scaleDownDuringTaskRolloverOnly +
            '}';
@@ -327,6 +343,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     private Double idleWeight;
     private Boolean useTaskCountBoundaries;
     private Integer highLagThreshold;
+    private Duration minScaleUpDelay;
     private Duration minScaleDownDelay;
     private Boolean scaleDownDuringTaskRolloverOnly;
 
@@ -388,6 +405,12 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       return this;
     }
 
+    public Builder minScaleUpDelay(Duration minScaleUpDelay)
+    {
+      this.minScaleUpDelay = minScaleUpDelay;
+      return this;
+    }
+
     public Builder minScaleDownDelay(Duration minScaleDownDelay)
     {
       this.minScaleDownDelay = minScaleDownDelay;
@@ -426,6 +449,7 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
           idleWeight,
           useTaskCountBoundaries,
           highLagThreshold,
+          minScaleUpDelay,
           minScaleDownDelay,
           scaleDownDuringTaskRolloverOnly
       );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index c53ac0e379c..1ec08384a40 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -51,6 +53,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
   private final int scaleOutStep;
   private final boolean enableTaskAutoScaler;
   private final long minTriggerScaleActionFrequencyMillis;
+  private final Duration minScaleUpDelay;
+  private final Duration minScaleDownDelay;
   private final AggregateFunction lagAggregate;
   private final Double stopTaskCountRatio;
 
@@ -71,6 +75,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
           @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
           @Nullable @JsonProperty("enableTaskAutoScaler") Boolean 
enableTaskAutoScaler,
           @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long 
minTriggerScaleActionFrequencyMillis,
+          @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
+          @Nullable @JsonProperty("minScaleDownDelay") Duration 
minScaleDownDelay,
           @Nullable @JsonProperty("lagAggregate") AggregateFunction 
lagAggregate,
           @Nullable @JsonProperty("stopTaskCountRatio") Double 
stopTaskCountRatio
   )
@@ -105,11 +111,21 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
     this.minTriggerScaleActionFrequencyMillis =
         minTriggerScaleActionFrequencyMillis != null ? 
minTriggerScaleActionFrequencyMillis : 600000;
+    this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, 
Duration.millis(this.minTriggerScaleActionFrequencyMillis));
+    this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, 
Duration.millis(this.minTriggerScaleActionFrequencyMillis));
 
     Preconditions.checkArgument(
         stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && 
stopTaskCountRatio <= 1.0),
         "0.0 < stopTaskCountRatio <= 1.0"
     );
+    Preconditions.checkArgument(
+        this.minScaleUpDelay.getMillis() >= 0,
+        "minScaleUpDelay must be a duration >= 0 millis"
+    );
+    Preconditions.checkArgument(
+        this.minScaleDownDelay.getMillis() >= 0,
+        "minScaleDownDelay must be a duration >= 0 millis"
+    );
     this.stopTaskCountRatio = stopTaskCountRatio;
   }
 
@@ -213,6 +229,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     return enableTaskAutoScaler;
   }
 
+  @Deprecated
   @Override
   @JsonProperty
   public long getMinTriggerScaleActionFrequencyMillis()
@@ -220,6 +237,20 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     return minTriggerScaleActionFrequencyMillis;
   }
 
+  @Override
+  @JsonProperty
+  public Duration getMinScaleUpDelay()
+  {
+    return minScaleUpDelay;
+  }
+
+  @Override
+  @JsonProperty
+  public Duration getMinScaleDownDelay()
+  {
+    return minScaleDownDelay;
+  }
+
   @JsonProperty
   @Nullable
   public AggregateFunction getLagAggregate()
@@ -244,8 +275,10 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
             ", taskCountMin=" + taskCountMin +
             ", taskCountStart=" + taskCountStart +
             ", minTriggerScaleActionFrequencyMillis=" + 
minTriggerScaleActionFrequencyMillis +
+            ", minScaleUpDelay=" + minScaleUpDelay +
+            ", minScaleDownDelay=" + minScaleDownDelay +
             ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
-            ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
+            ", lagCollectionRangeMillis=" + lagCollectionRangeMillis +
             ", scaleOutThreshold=" + scaleOutThreshold +
             ", triggerScaleOutFractionThreshold=" + 
triggerScaleOutFractionThreshold +
             ", scaleInThreshold=" + scaleInThreshold +
@@ -286,6 +319,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
            scaleOutStep == that.scaleOutStep &&
            enableTaskAutoScaler == that.enableTaskAutoScaler &&
            minTriggerScaleActionFrequencyMillis == 
that.minTriggerScaleActionFrequencyMillis &&
+           Objects.equals(minScaleUpDelay, that.minScaleUpDelay) &&
+           Objects.equals(minScaleDownDelay, that.minScaleDownDelay) &&
            Objects.equals(taskCountStart, that.taskCountStart) &&
            lagAggregate == that.lagAggregate &&
            Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
@@ -310,6 +345,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
         scaleOutStep,
         enableTaskAutoScaler,
         minTriggerScaleActionFrequencyMillis,
+        minScaleUpDelay,
+        minScaleDownDelay,
         lagAggregate,
         stopTaskCountRatio
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
index 93653f28fe9..db9bdb3d0c8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagBasedAutoScalerConfigTest.java
@@ -21,12 +21,14 @@ package 
org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class LagBasedAutoScalerConfigTest
 {
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
 
   @Test
   public void testDefaults()
@@ -48,6 +50,8 @@ public class LagBasedAutoScalerConfigTest
         null,
         null,
         null,
+        null,
+        null,
         null
     );
 
@@ -63,6 +67,9 @@ public class LagBasedAutoScalerConfigTest
     Assert.assertEquals(1, config.getScaleInStep());
     Assert.assertEquals(2, config.getScaleOutStep());
     Assert.assertEquals(600000, 
config.getMinTriggerScaleActionFrequencyMillis());
+    // When minScaleUpDelay/minScaleDownDelay are not set, they fall back to 
minTriggerScaleActionFrequencyMillis
+    Assert.assertEquals(Duration.millis(600000), config.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(600000), 
config.getMinScaleDownDelay());
     Assert.assertNull(config.getLagAggregate());
     Assert.assertNull(config.getStopTaskCountRatio());
     Assert.assertEquals(0, config.getTaskCountMax());
@@ -88,7 +95,9 @@ public class LagBasedAutoScalerConfigTest
         1,
         5,
         true,
-        5000L,
+        null,
+        Duration.millis(3000),
+        Duration.millis(7000),
         AggregateFunction.SUM,
         0.1
     );
@@ -125,6 +134,8 @@ public class LagBasedAutoScalerConfigTest
                 true,
                 null,
                 null,
+                null,
+                null,
                 null
             )
     );
@@ -151,6 +162,8 @@ public class LagBasedAutoScalerConfigTest
                 true,
                 null,
                 null,
+                null,
+                null,
                 null
             )
     );
@@ -177,6 +190,8 @@ public class LagBasedAutoScalerConfigTest
                 true,
                 null,
                 null,
+                null,
+                null,
                 null
             )
     );
@@ -206,6 +221,8 @@ public class LagBasedAutoScalerConfigTest
             true,
             null,
             null,
+            null,
+            null,
             0.0
         )
     );
@@ -230,6 +247,8 @@ public class LagBasedAutoScalerConfigTest
             true,
             null,
             null,
+            null,
+            null,
             -0.1
         )
     );
@@ -255,6 +274,8 @@ public class LagBasedAutoScalerConfigTest
             true,
             null,
             null,
+            null,
+            null,
             1.1
         )
     );
@@ -278,11 +299,256 @@ public class LagBasedAutoScalerConfigTest
         true,
         null,
         null,
+        null,
+        null,
         0.5
     );
     Assert.assertEquals(Double.valueOf(0.5), config.getStopTaskCountRatio());
   }
 
+  @Test
+  public void testScaleDelayFallback() throws Exception
+  {
+    // Neither minScaleUpDelay nor minScaleDownDelay set: both fall back to 
minTriggerScaleActionFrequencyMillis
+    LagBasedAutoScalerConfig baseOnly = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        null,
+        1,
+        null,
+        null,
+        false,
+        60000L,
+        null,
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(60000L, 
baseOnly.getMinTriggerScaleActionFrequencyMillis());
+    Assert.assertEquals(Duration.millis(60000), baseOnly.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(60000), 
baseOnly.getMinScaleDownDelay());
+
+    // Only minScaleUpDelay set
+    LagBasedAutoScalerConfig upOnly = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        null,
+        1,
+        null,
+        null,
+        false,
+        60000L,
+        Duration.millis(15000),
+        null,
+        null,
+        null
+    );
+    Assert.assertEquals(Duration.millis(15000), upOnly.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(60000), upOnly.getMinScaleDownDelay());
+
+    // Only minScaleDownDelay set
+    LagBasedAutoScalerConfig downOnly = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        null,
+        1,
+        null,
+        null,
+        false,
+        60000L,
+        null,
+        Duration.millis(30000),
+        null,
+        null
+    );
+    Assert.assertEquals(Duration.millis(60000), downOnly.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(30000), 
downOnly.getMinScaleDownDelay());
+
+    // Both set: serde roundtrip preserves values
+    LagBasedAutoScalerConfig bothSet = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        10,
+        null,
+        1,
+        null,
+        null,
+        false,
+        null,
+        Duration.millis(15000),
+        Duration.millis(30000),
+        null,
+        null
+    );
+    Assert.assertEquals(Duration.millis(15000), bothSet.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(30000), 
bothSet.getMinScaleDownDelay());
+    LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue(
+        OBJECT_MAPPER.writeValueAsString(bothSet),
+        LagBasedAutoScalerConfig.class
+    );
+    Assert.assertEquals(bothSet, roundTripped);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testScaleDelayFallbackViaSerde() throws Exception
+  {
+    // JSON with only minTriggerScaleActionFrequencyMillis (no Duration 
fields):
+    // both getMinScaleUpDelay() and getMinScaleDownDelay() should fall back 
to it.
+    String json = 
"{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":45000}";
+    LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(json, 
LagBasedAutoScalerConfig.class);
+    Assert.assertEquals(45000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+    Assert.assertEquals(Duration.millis(45000), config.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(45000), config.getMinScaleDownDelay());
+
+    // JSON with minTriggerScaleActionFrequencyMillis and only minScaleUpDelay:
+    // getMinScaleUpDelay() should return the explicit value; 
getMinScaleDownDelay() falls back.
+    String jsonUpOnly = "{\"taskCountMax\":10,\"taskCountMin\":1,"
+                        + "\"minTriggerScaleActionFrequencyMillis\":45000,"
+                        + "\"minScaleUpDelay\":\"PT10S\"}";
+    LagBasedAutoScalerConfig configUpOnly = 
OBJECT_MAPPER.readValue(jsonUpOnly, LagBasedAutoScalerConfig.class);
+    Assert.assertEquals(Duration.standardSeconds(10), 
configUpOnly.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.millis(45000), 
configUpOnly.getMinScaleDownDelay());
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws 
Exception
+  {
+    final long defaultMinTriggerMillis = 600_000L;
+
+    // Backwards-compat: nothing set -> deprecated field gets its default and 
both direction
+    // delays fall back to it.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          "{\"taskCountMax\":10,\"taskCountMin\":1}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(defaultMinTriggerMillis, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Backwards-compat: legacy spec sets only the deprecated field. Both 
direction delays fall
+    // back to it.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          
"{\"taskCountMax\":10,\"taskCountMin\":1,\"minTriggerScaleActionFrequencyMillis\":900000}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.millis(900_000), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.millis(900_000), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Forwards-compat: direction delays set, deprecated field omitted. 
Deprecated field defaults.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          "{\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(defaultMinTriggerMillis, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Forwards-compat: deprecated field AND direction delays set (overlapping 
migration window).
+    // Direction delays win for their own direction; the deprecated field is 
preserved.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          "{\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Mixed: only minScaleUpDelay + deprecated field set. Down falls back to 
the deprecated field.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          "{\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleUpDelay\":\"PT2M\"}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.millis(900_000), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Mixed: only minScaleDownDelay + deprecated field set. Up falls back to 
the deprecated field.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          "{\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleDownDelay\":\"PT15M\"}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.millis(900_000), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Mixed: deprecated field omitted, only minScaleUpDelay set. Down falls 
back to the
+    // deprecated field's default.
+    {
+      LagBasedAutoScalerConfig config = OBJECT_MAPPER.readValue(
+          
"{\"taskCountMax\":10,\"taskCountMin\":1,\"minScaleUpDelay\":\"PT2M\"}",
+          LagBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(defaultMinTriggerMillis, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.millis(defaultMinTriggerMillis), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+  }
+
+  private void assertRoundTrips(LagBasedAutoScalerConfig config) throws 
Exception
+  {
+    LagBasedAutoScalerConfig roundTripped = OBJECT_MAPPER.readValue(
+        OBJECT_MAPPER.writeValueAsString(config),
+        LagBasedAutoScalerConfig.class
+    );
+    Assert.assertEquals(config, roundTripped);
+  }
+
   @Test
   public void testEqualsAndHashCode()
   {
@@ -302,6 +568,8 @@ public class LagBasedAutoScalerConfigTest
         3,
         true,
         7000L,
+        null,
+        null,
         AggregateFunction.SUM,
         0.5
     );
@@ -321,6 +589,8 @@ public class LagBasedAutoScalerConfigTest
         1,
         true,
         7000L,
+        null,
+        null,
         AggregateFunction.AVERAGE,
         0.5
     );
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
index 7c20855b033..bf3d5f9d71e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java
@@ -441,7 +441,7 @@ public class SeekableStreamSupervisorSpecTest extends 
SeekableStreamSupervisorTe
             .stream()
             .map(metric -> 
metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
             .filter(Objects::nonNull)
-            .anyMatch("minTriggerScaleActionFrequencyMillis not elapsed 
yet"::equals));
+            .anyMatch("Scale cooldown not elapsed yet"::equals));
     
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 
1);
     autoScaler.reset();
     autoScaler.stop();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index d59e2711ce8..5f6986551a7 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -49,10 +49,13 @@ import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -81,6 +84,8 @@ import 
org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.PendingSegmentRecord;
@@ -2683,6 +2688,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         true,
         null,
         null,
+        null,
+        null,
         null
     );
     SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, 
autoScalerConfig, null);
@@ -2760,6 +2767,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         true,
         null,
         null,
+        null,
+        null,
         0.4
     );
     SeekableStreamSupervisorIOConfig config = new 
SeekableStreamSupervisorIOConfig(
@@ -3303,6 +3312,22 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     }
   }
 
+  private class StateOverrideTestSeekableStreamSupervisor extends 
TestSeekableStreamSupervisor
+  {
+    private final SupervisorStateManager.State state;
+
+    private 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state)
+    {
+      this.state = state;
+    }
+
+    @Override
+    public SupervisorStateManager.State getState()
+    {
+      return state;
+    }
+  }
+
   private class TestEmittingTestSeekableStreamSupervisor extends 
BaseTestSeekableStreamSupervisor
   {
     private final CountDownLatch latch;
@@ -3572,6 +3597,134 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   }
 
 
+  @Test
+  public void testDynamicAllocationScaleUpAllowedWhenCooldownElapsed()
+  {
+    final long zeroCooldown = 0L;
+    final long unusedCooldown = Duration.standardHours(1).getMillis();
+    final StubServiceEmitter scalingEmitter = 
setupSupervisorForAutoScalingTest(zeroCooldown, unusedCooldown, 2);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    // minScaleUpDelay = 0 means any scale-up is immediately allowed.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals("Exactly one required-tasks emission expected", 1, 
events.size());
+    assertScaledToTaskCount(events.get(0), 5);
+  }
+
+  @Test
+  public void testDynamicAllocationScaleUpBlockedWhenCooldownNotElapsed()
+  {
+    final long scaleUpCooldown = Duration.standardHours(1).getMillis();
+    final long unusedCooldown = 0L;
+    final StubServiceEmitter scalingEmitter = 
setupSupervisorForAutoScalingTest(scaleUpCooldown, unusedCooldown, 2);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    // First scale-up succeeds and stamps the last-scale timestamp.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, 
scalingEmitter);
+    Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue());
+
+    // Second scale-up is within the 1h minScaleUpDelay window and must be 
blocked.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 7, () -> {}, 
scalingEmitter);
+    Assert.assertEquals("Second scale-up must not take effect", 5, 
supervisor.getIoConfig().getTaskCount().intValue());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals("Two required-tasks emissions expected (one applied, 
one skipped)", 2, events.size());
+    // First emission: the successful scale carries no skip-reason dim and 
reports the applied count.
+    assertScaledToTaskCount(events.get(0), 5);
+    // Second emission: the gated scale carries the cooldown skip-reason dim 
and the proposed (not applied) count.
+    assertScaleSkipped(events.get(1), 7, "Scale cooldown not elapsed yet");
+  }
+
+  @Test
+  public void testDynamicAllocationScaleDownAllowedWhenCooldownElapsed()
+  {
+    final long unusedCooldown = Duration.standardHours(1).getMillis();
+    final long zeroCooldown = 0L;
+    final StubServiceEmitter scalingEmitter = 
setupSupervisorForAutoScalingTest(unusedCooldown, zeroCooldown, 5);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    // minScaleDownDelay = 0 means any scale-down is immediately allowed.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 2, () -> {}, 
scalingEmitter);
+
+    Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals("Exactly one required-tasks emission expected", 1, 
events.size());
+    assertScaledToTaskCount(events.get(0), 2);
+  }
+
+  @Test
+  public void testDynamicAllocationScaleDownBlockedWhenCooldownNotElapsed()
+  {
+    final long unusedCooldown = 0L;
+    final long scaleDownCooldown = Duration.standardHours(1).getMillis();
+    final StubServiceEmitter scalingEmitter = 
setupSupervisorForAutoScalingTest(unusedCooldown, scaleDownCooldown, 5);
+    final TestSeekableStreamSupervisor supervisor =
+        new 
StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING);
+
+    // First scale-down succeeds and stamps the last-scale timestamp.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 3, () -> {}, 
scalingEmitter);
+    Assert.assertEquals(3, supervisor.getIoConfig().getTaskCount().intValue());
+
+    // Second scale-down is within the 1h minScaleDownDelay window and must be 
blocked.
+    supervisor.handleDynamicAllocationTasksNotice(() -> 1, () -> {}, 
scalingEmitter);
+    Assert.assertEquals("Second scale-down must not take effect", 3, 
supervisor.getIoConfig().getTaskCount().intValue());
+
+    final List<ServiceMetricEvent> events =
+        
scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC);
+    Assert.assertEquals("Two required-tasks emissions expected (one applied, 
one skipped)", 2, events.size());
+    assertScaledToTaskCount(events.get(0), 3);
+    assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet");
+  }
+
+  /**
+   * Asserts that a required-tasks emission represents an scale event: it 
carries the standard
+   * supervisor/datasource/stream dims, no scalingSkipReason dim, and the 
metric value matches the
+   * new task count.
+   */
+  private static void assertScaledToTaskCount(ServiceMetricEvent event, int 
expectedRequiredCount)
+  {
+    assertStandardDimensions(event);
+    Assert.assertNull(
+        "Attempted scale must not carry a scalingSkipReason dim",
+        
event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+    );
+    Assert.assertEquals(expectedRequiredCount, event.getValue().intValue());
+  }
+
+  /**
+   * Asserts that a required-tasks emission represents a skipped scale: it 
carries the standard
+   * supervisor/datasource/stream dims, a scalingSkipReason dim equal to 
{@code expectedReason},
+   * and the metric value matches the proposed (not applied) task count.
+   */
+  private static void assertScaleSkipped(ServiceMetricEvent event, int 
expectedRequiredCount, String expectedReason)
+  {
+    assertStandardDimensions(event);
+    Assert.assertEquals(
+        expectedReason,
+        
event.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
+    );
+    Assert.assertEquals(expectedRequiredCount, event.getValue().intValue());
+  }
+
+  private static void assertStandardDimensions(ServiceMetricEvent event)
+  {
+    final Map<String, Object> dims = event.getUserDims();
+    Assert.assertEquals(SUPERVISOR_ID, dims.get(DruidMetrics.SUPERVISOR_ID));
+    Assert.assertEquals(DATASOURCE, dims.get(DruidMetrics.DATASOURCE));
+    Assert.assertEquals(STREAM, dims.get(DruidMetrics.STREAM));
+  }
+
   private static TestSeekableStreamIndexTask createTestTask(String taskId, 
String groupId, @Nullable Integer serverPriority, 
SeekableStreamIndexTaskIOConfig taskIoConfig, RecordSupplier recordSupplier)
   {
     return new TestSeekableStreamIndexTask(
@@ -3588,4 +3741,118 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         serverPriority
     );
   }
+
+  /**
+   * Resets the {@link #spec} and {@link #taskMaster} mocks so the supervisor 
sees an ioConfig with
+   * the given direction-specific cooldowns and so {@code 
changeTaskCountInIOConfig} can run
+   * without hitting unmocked calls. Returns a dedicated emitter for the 
caller to pass into the
+   * notice handler so dynamic-allocation events can be asserted in isolation.
+   */
+  private StubServiceEmitter setupSupervisorForAutoScalingTest(
+      long minScaleUpDelayMillis,
+      long minScaleDownDelayMillis,
+      int initialTaskCount
+  )
+  {
+    final AutoScalerConfig autoScalerConfig = testAutoScalerConfig(
+        minScaleUpDelayMillis,
+        minScaleDownDelayMillis
+    );
+    final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(
+        initialTaskCount,
+        autoScalerConfig,
+        null
+    );
+    return resetSpecAndTaskMasterForScaling(ioConfig);
+  }
+
+  /**
+   * Returns a minimal test-only {@link AutoScalerConfig}
+   */
+  private static AutoScalerConfig testAutoScalerConfig(long 
minScaleUpDelayMillis, long minScaleDownDelayMillis)
+  {
+    return new AutoScalerConfig()
+    {
+      @Override
+      public boolean getEnableTaskAutoScaler()
+      {
+        return true;
+      }
+
+      @Override
+      public long getMinTriggerScaleActionFrequencyMillis()
+      {
+        return 0L;
+      }
+
+      @Override
+      public Duration getMinScaleUpDelay()
+      {
+        return Duration.millis(minScaleUpDelayMillis);
+      }
+
+      @Override
+      public Duration getMinScaleDownDelay()
+      {
+        return Duration.millis(minScaleDownDelayMillis);
+      }
+
+      @Override
+      public int getTaskCountMax()
+      {
+        return 100;
+      }
+
+      @Override
+      public int getTaskCountMin()
+      {
+        return 1;
+      }
+
+      @Override
+      public Integer getTaskCountStart()
+      {
+        return null;
+      }
+
+      @Override
+      public Double getStopTaskCountRatio()
+      {
+        return null;
+      }
+
+      @Override
+      public SupervisorTaskAutoScaler createAutoScaler(
+          Supervisor supervisor,
+          SupervisorSpec spec,
+          ServiceEmitter emitter
+      )
+      {
+        throw new UnsupportedOperationException("test autoscaler config: 
createAutoScaler not used");
+      }
+    };
+  }
+
+  private StubServiceEmitter 
resetSpecAndTaskMasterForScaling(SeekableStreamSupervisorIOConfig ioConfig)
+  {
+    final StubServiceEmitter scalingEmitter = new 
StubServiceEmitter("scaling", "localhost");
+
+    EasyMock.reset(spec, taskMaster);
+    EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    // changeTaskCountInIOConfig calls this; absent path just logs and moves 
on.
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+
+    replayAll();
+    return scalingEmitter;
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index fa50d87b56d..05724d1e837 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_IDLE_WEIGHT;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_LAG_WEIGHT;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_SCALE_DELAY;
-import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS;
 import static 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig.DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
 
 @SuppressWarnings("TextBlockMigration")
@@ -51,6 +50,7 @@ public class CostBasedAutoScalerConfigTest
                   + "  \"lagWeight\": 0.6,\n"
                   + "  \"idleWeight\": 0.4,\n"
                   + "  \"highLagThreshold\": 30000,\n"
+                  + "  \"minScaleUpDelay\": \"PT5M\",\n"
                   + "  \"minScaleDownDelay\": \"PT10M\",\n"
                   + "  \"scaleDownDuringTaskRolloverOnly\": true\n"
                   + "}";
@@ -61,11 +61,11 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(100, config.getTaskCountMax());
     Assert.assertEquals(5, config.getTaskCountMin());
     Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
-    Assert.assertEquals(600000L, 
config.getMinTriggerScaleActionFrequencyMillis());
     Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
     Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(Duration.standardMinutes(5), 
config.getMinScaleUpDelay());
     Assert.assertEquals(Duration.standardMinutes(10), 
config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
     Assert.assertEquals(30000, config.getHighLagThreshold());
@@ -95,12 +95,10 @@ public class CostBasedAutoScalerConfigTest
 
     // Check defaults
     Assert.assertEquals(DEFAULT_SCALE_ACTION_PERIOD_MILLIS, 
config.getScaleActionPeriodMillis());
-    Assert.assertEquals(
-        DEFAULT_MIN_TRIGGER_SCALE_ACTION_FREQUENCY_MILLIS,
-        config.getMinTriggerScaleActionFrequencyMillis()
-    );
     Assert.assertEquals(DEFAULT_LAG_WEIGHT, config.getLagWeight(), 0.001);
     Assert.assertEquals(DEFAULT_IDLE_WEIGHT, config.getIdleWeight(), 0.001);
+    // minScaleUpDelay and minScaleDownDelay each have their own independent 
default
+    Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), 
config.getMinScaleUpDelay());
     Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, 
config.getMinScaleDownDelay());
     Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
     Assert.assertNull(config.getTaskCountStart());
@@ -183,11 +181,11 @@ public class CostBasedAutoScalerConfigTest
                                                                 
.taskCountMin(5)
                                                                 
.taskCountStart(10)
                                                                 
.enableTaskAutoScaler(true)
-                                                                
.minTriggerScaleActionFrequencyMillis(600000L)
                                                                 
.stopTaskCountRatio(0.8)
                                                                 
.scaleActionPeriodMillis(60000L)
                                                                 .lagWeight(0.6)
                                                                 
.idleWeight(0.4)
+                                                                
.minScaleUpDelay(Duration.standardMinutes(5))
                                                                 
.minScaleDownDelay(Duration.standardMinutes(10))
                                                                 
.scaleDownDuringTaskRolloverOnly(true)
                                                                 
.highLagThreshold(30000)
@@ -197,13 +195,156 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(100, config.getTaskCountMax());
     Assert.assertEquals(5, config.getTaskCountMin());
     Assert.assertEquals(Integer.valueOf(10), config.getTaskCountStart());
-    Assert.assertEquals(600000L, 
config.getMinTriggerScaleActionFrequencyMillis());
     Assert.assertEquals(Double.valueOf(0.8), config.getStopTaskCountRatio());
     Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+    Assert.assertEquals(Duration.standardMinutes(5), 
config.getMinScaleUpDelay());
     Assert.assertEquals(Duration.standardMinutes(10), 
config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
     Assert.assertEquals(30000, config.getHighLagThreshold());
   }
+
+  @Test
+  public void testScaleDelayDefaults() throws Exception
+  {
+    // Neither set: each direction gets its own independent default
+    CostBasedAutoScalerConfig defaults = CostBasedAutoScalerConfig.builder()
+                                                                  
.taskCountMax(10)
+                                                                  
.taskCountMin(1)
+                                                                  .build();
+    Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), 
defaults.getMinScaleUpDelay());
+    Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, 
defaults.getMinScaleDownDelay());
+
+    // Only minScaleUpDelay set: up uses explicit value, down uses its default
+    CostBasedAutoScalerConfig upOnly = CostBasedAutoScalerConfig.builder()
+                                                                
.taskCountMax(10)
+                                                                
.taskCountMin(1)
+                                                                
.minScaleUpDelay(Duration.standardMinutes(5))
+                                                                .build();
+    Assert.assertEquals(Duration.standardMinutes(5), 
upOnly.getMinScaleUpDelay());
+    Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, 
upOnly.getMinScaleDownDelay());
+
+    // Only minScaleDownDelay set: down uses explicit value, up uses its own 
default (does not fall back to down)
+    CostBasedAutoScalerConfig downOnly = CostBasedAutoScalerConfig.builder()
+                                                                  
.taskCountMax(10)
+                                                                  
.taskCountMin(1)
+                                                                  
.minScaleDownDelay(Duration.standardMinutes(20))
+                                                                  .build();
+    Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), 
downOnly.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.standardMinutes(20), 
downOnly.getMinScaleDownDelay());
+
+    // Both set: serde roundtrip preserves values
+    CostBasedAutoScalerConfig bothSet = CostBasedAutoScalerConfig.builder()
+                                                                 
.taskCountMax(10)
+                                                                 
.taskCountMin(1)
+                                                                 
.minScaleUpDelay(Duration.standardMinutes(5))
+                                                                 
.minScaleDownDelay(Duration.standardMinutes(20))
+                                                                 .build();
+    Assert.assertEquals(Duration.standardMinutes(5), 
bothSet.getMinScaleUpDelay());
+    Assert.assertEquals(Duration.standardMinutes(20), 
bothSet.getMinScaleDownDelay());
+    CostBasedAutoScalerConfig roundTripped = 
mapper.readValue(mapper.writeValueAsString(bothSet), 
CostBasedAutoScalerConfig.class);
+    Assert.assertEquals(bothSet, roundTripped);
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testMinTriggerScaleActionFrequencyMillisSerdeCompat() throws 
Exception
+  {
+    final long defaultMinTriggerMillis = DEFAULT_SCALE_ACTION_PERIOD_MILLIS;
+    final Duration defaultUp = 
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS);
+    final Duration defaultDown = DEFAULT_MIN_SCALE_DELAY;
+
+    // Backwards-compat: nothing set -> everything uses its own default.
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(defaultMinTriggerMillis, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(defaultUp, config.getMinScaleUpDelay());
+      Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Backwards-compat: legacy spec sets only the deprecated field. Direction 
delays still use
+    // their own defaults (no cross-field fallback in CostBased).
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.millis(900_000L), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Forwards-compat: direction delays set, deprecated field omitted. 
Deprecated field defaults.
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(defaultMinTriggerMillis, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Forwards-compat: deprecated field AND direction delays set (overlapping 
migration window).
+    // All three are honored independently.
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleUpDelay\":\"PT2M\",\"minScaleDownDelay\":\"PT15M\"}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Only minScaleUpDelay set alongside the deprecated field: down uses its 
own default,
+    // not the deprecated field's value.
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleUpDelay\":\"PT2M\"}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.standardMinutes(2), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(defaultDown, config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+
+    // Only minScaleDownDelay set alongside the deprecated field: up uses its 
own default.
+    {
+      CostBasedAutoScalerConfig config = mapper.readValue(
+          
"{\"autoScalerStrategy\":\"costBased\",\"enableTaskAutoScaler\":true,\"taskCountMax\":10,\"taskCountMin\":1,"
+          + "\"minTriggerScaleActionFrequencyMillis\":900000,"
+          + "\"minScaleDownDelay\":\"PT15M\"}",
+          CostBasedAutoScalerConfig.class
+      );
+      Assert.assertEquals(900_000L, 
config.getMinTriggerScaleActionFrequencyMillis());
+      Assert.assertEquals(Duration.millis(900_000L), 
config.getMinScaleUpDelay());
+      Assert.assertEquals(Duration.standardMinutes(15), 
config.getMinScaleDownDelay());
+      assertRoundTrips(config);
+    }
+  }
+
+  private void assertRoundTrips(CostBasedAutoScalerConfig config) throws 
Exception
+  {
+    CostBasedAutoScalerConfig roundTripped = mapper.readValue(
+        mapper.writeValueAsString(config),
+        CostBasedAutoScalerConfig.class
+    );
+    Assert.assertEquals(config, roundTripped);
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
index 26c4c20f859..6466b0966e8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -82,24 +82,10 @@ public class CostBasedAutoScalerMockTest
   @Test
   public void testScaleUpWhenOptimalGreaterThanCurrent()
   {
-    // Use config with a long barrier to test cooldown behavior
-    CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(100)
-                                                                       
.taskCountMin(1)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.minScaleDownDelay(Duration.standardHours(1))
-                                                                       
.build();
-
-    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
-        mockSupervisor,
-        barrierConfig,
-        mockSpec,
-        mockEmitter
-    ));
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
 
     int currentTaskCount = 10;
     int scaleUpOptimal = 17;
-    // Trigger scale-up, which should set the cooldown timer
     doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
     setupMocksForMetricsCollection(autoScaler, currentTaskCount, 5000.0, 0.1);
 
@@ -108,15 +94,6 @@ public class CostBasedAutoScalerMockTest
         scaleUpOptimal,
         autoScaler.computeTaskCountForScaleAction()
     );
-
-    // Verify cooldown blocks immediate subsequent scaling
-    doReturn(scaleUpOptimal).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
-    Assert.assertEquals(
-        "Scale action should be blocked during the cooldown window",
-        -1,
-        autoScaler.computeTaskCountForScaleAction()
-    );
   }
 
   @Test
@@ -135,45 +112,6 @@ public class CostBasedAutoScalerMockTest
     Assert.assertEquals("Should return -1 when it equals current (no change 
needed)", -1, result);
   }
 
-  @Test
-  public void testScaleDownBlockedReturnsMinusOne()
-  {
-    // Use config with a long barrier to test cooldown behavior
-    CostBasedAutoScalerConfig barrierConfig = 
CostBasedAutoScalerConfig.builder()
-                                                                       
.taskCountMax(100)
-                                                                       
.taskCountMin(1)
-                                                                       
.enableTaskAutoScaler(true)
-                                                                       
.minScaleDownDelay(Duration.standardHours(1))
-                                                                       
.build();
-
-    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(
-        mockSupervisor,
-        barrierConfig,
-        mockSpec,
-        mockEmitter
-    ));
-
-    int currentTaskCount = 50;
-    int optimalCount = 30; // Lower than current (scale-down scenario)
-
-    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9);
-
-    // First attempt: allowed (no prior scale action)
-    Assert.assertEquals(
-        "Scale-down should succeed when no prior scale action exists",
-        optimalCount,
-        autoScaler.computeTaskCountForScaleAction()
-    );
-
-    // Second attempt: blocked by cooldown
-    Assert.assertEquals(
-        "Scale-down should be blocked during the cooldown window",
-        -1,
-        autoScaler.computeTaskCountForScaleAction()
-    );
-  }
-
   @Test
   public void testReturnsMinusOneWhenMetricsCollectionFails()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
index 7f5a0f5a9df..ffbfee77b72 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java
@@ -74,6 +74,8 @@ public class LagBasedAutoScalerTest
         4,
         true,       // enableTaskAutoScaler
         6_000_000L, // minTriggerScaleActionFrequencyMillis
+        null,       // minScaleUpDelay
+        null,       // minScaleDownDelay
         null,       // lagAggregate
         null        // stopTaskCountRatio
     );
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java
new file mode 100644
index 00000000000..63523f6cfb0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/ScaleDirection.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public enum ScaleDirection
+{
+  SCALE_UP("scale-up"), SCALE_DOWN("scale-down");
+
+  private final String label;
+
+  ScaleDirection(String label)
+  {
+    this.label = label;
+  }
+
+  @Override
+  public String toString()
+  {
+    return label;
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 99e767c4891..28701817f36 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -895,6 +895,8 @@ c3.2xlarge
 defaultManualBrokerService
 maxPriority
 minPriority
+minScaleUpDelay
+minScaleDownDelay
 NUMBER_FEATURES
 NUMBER_OF_CONTRIBUTORS
 PreparedStatement
@@ -2080,6 +2082,7 @@ autoscalers
 batch_index_task
 cgroup
 classloader
+cooldown
 com.metamx
 common.runtime.properties
 cpuacct
@@ -2168,6 +2171,8 @@ s3n
 slf4j
 sql
 sqlQuery
+scale-up
+scale-down
 successfulSending
 [S]igar
 taskBlackListCleanupPeriod


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to