This is an automated email from the ASF dual-hosted git repository.

Fly-Style pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 49de72ed768 feat: CBA idle re-modeling and separate scale up/down task-count boundaries (#19378)
49de72ed768 is described below

commit 49de72ed7680974a5e1b677d76b217258f7120dd
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Apr 29 10:14:36 2026 +0300

    feat: CBA idle re-modeling and separate scale up/down task-count boundaries (#19378)
---
 docs/ingestion/supervisor.md                       |  39 ++---
 .../CostBasedAutoScalerIntegrationTest.java        |  12 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java | 160 ++++++++-------------
 .../autoscaler/CostBasedAutoScalerConfig.java      | 108 +++++++++-----
 .../supervisor/autoscaler/CostResult.java          |  10 +-
 .../autoscaler/WeightedCostFunction.java           | 138 ++++++++++++------
 .../autoscaler/CostBasedAutoScalerConfigTest.java  |  19 ++-
 .../CostBasedAutoScalerHighLagScalingTest.java     | 154 --------------------
 .../autoscaler/CostBasedAutoScalerTest.java        | 155 +++++++++++++-------
 .../autoscaler/WeightedCostFunctionTest.java       |  90 +++++++++---
 10 files changed, 441 insertions(+), 444 deletions(-)
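
Illustrative sketch, not part of the commit: the Java below approximates how the candidate task counts and the separate scale-up/scale-down evaluation window in this change might fit together, based on `computeValidTaskCounts` and the `startIndex`/`endIndex` logic in the diff that follows. The class `BoundaryWindowSketch`, its method names, and the use of `TreeSet` are assumptions made for illustration; the actual code collects candidates in an `IntSet` and reads the flags from `CostBasedAutoScalerConfig`. The min/max filter is inferred from the Javadoc, since the filtering lines fall outside the hunks shown.

```java
import java.util.Arrays;
import java.util.TreeSet;

// Hypothetical helper class; names are illustrative, not taken from Druid.
class BoundaryWindowSketch
{
  // Mirrors BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK = 2 from the diff.
  static final int BOUNDARY_LIMIT = 2;

  // Converts every partitions-per-task ratio into a task count (ceiling division)
  // and keeps the counts within [taskCountMin, taskCountMax], sorted ascending.
  static int[] candidateTaskCounts(int partitionCount, int taskCountMin, int taskCountMax)
  {
    TreeSet<Integer> result = new TreeSet<>();
    for (int partitionsPerTask = partitionCount; partitionsPerTask >= 1; partitionsPerTask--) {
      int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
      if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
        result.add(taskCount);
      }
    }
    return result.stream().mapToInt(Integer::intValue).toArray();
  }

  // Returns {startIndex, endIndex} (both inclusive) of the candidates to evaluate.
  // If the current task count is not among the candidates, the window is left unbounded.
  static int[] evaluationWindow(int[] sortedCandidates, int currentTaskCount, boolean boundScaleUp, boolean boundScaleDown)
  {
    int startIndex = 0;
    int endIndex = sortedCandidates.length - 1;
    int currentIndex = Arrays.binarySearch(sortedCandidates, currentTaskCount);
    if (currentIndex >= 0) {
      if (boundScaleUp) {
        endIndex = Math.min(currentIndex + BOUNDARY_LIMIT, endIndex);
      }
      if (boundScaleDown) {
        startIndex = Math.max(currentIndex - BOUNDARY_LIMIT, startIndex);
      }
    }
    return new int[]{startIndex, endIndex};
  }

  public static void main(String[] args)
  {
    int[] candidates = candidateTaskCounts(24, 1, 24);
    System.out.println(Arrays.toString(candidates));
    // Defaults from this commit: scale-up unbounded, scale-down bounded.
    System.out.println(Arrays.toString(evaluationWindow(candidates, 4, false, true)));
  }
}
```

Under these assumptions, with 24 partitions and the new defaults, a supervisor currently at 4 tasks would evaluate candidates from 2 up to 24: scale-down is limited to two steps below the current count, while scale-up can jump to any candidate.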

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index ba92ed66b53..4e59f0fa542 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -193,29 +193,30 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
 ```
 </details>
 
-**2. Cost-based autoscaler strategy (experimental)**
+**2. Cost-based autoscaler strategy**
 
-An autoscaler which computes the required supervisor task count via cost 
function based on ingestion lag and poll-to-idle ratio.
-Task counts are selected from a bounded range derived from the current 
partitions-per-task ratio,
-not strictly from factors/divisors of the partition count. This bounded 
partitions-per-task window enables gradual scaling while
-voiding large jumps and still allowing non-divisor task counts when needed.
+The cost-based autoscaler picks the number of ingestion tasks that minimizes a 
combined cost score. The score has two components:
 
-**It is experimental and the implementation details as well as cost function 
parameters are subject to change.**
+- **Lag cost** — how long it would take to drain the current backlog at the 
observed processing rate. More tasks reduce this cost.
+- **Idle cost** — how far the predicted idle ratio is from the target of ~25%. 
Tasks that are too busy (under-provisioned) or too idle (over-provisioned) both 
drive the score up. 
+The sweet spot is roughly 25% idle, giving headroom to absorb traffic spikes 
without wasting resources.
+
+At every evaluation interval, Druid computes the score for each candidate task 
count and picks the one with the lowest total cost.
 
 Note: Kinesis is not supported yet, support is in progress.
 
 The following table outlines the configuration properties related to the 
`costBased` autoscaler strategy:
 
-| Property|Description|Required|Default|
-|---------|-----------|--------|-------|
-|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered. | No | 600000 |
-|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
-|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 
0.75 |
-|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when 
selecting task counts.|No| `false` |
-|`highLagThreshold`|Average partition lag threshold that triggers burst 
scale-up when set to a value greater than `0`. Set to a negative value to 
disable burst scale-up.|No|-1|
-|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before 
the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
-|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action 
before the next scale-down is allowed, specified as an ISO-8601 duration 
string.|No|`PT30M`|
-|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is 
limited to periods during task rollovers only.|No|`false`|
+| Property | Description | Required | Default                   |
+|----------|-------------|----------|---------------------------|
+|`scaleActionPeriodMillis`|How often, in milliseconds, Druid evaluates whether to scale.|No| `600000` (10 min)         |
+|`lagWeight`|How much weight to give the lag cost relative to the idle cost. Higher values make the autoscaler more aggressive about adding tasks to drain backlog.|No| `0.4`                     |
+|`idleWeight`|How much weight to give the idle cost relative to the lag cost. Higher values make the autoscaler more aggressive about removing over-provisioned tasks.|No| `0.6`                     |
+|`useTaskCountBoundariesOnScaleUp`|Limits scale-up to a small step relative to the current task count, preventing large jumps. Disable to allow the autoscaler to jump directly to any task count.|No| `false`                   |
+|`useTaskCountBoundariesOnScaleDown`|Limits scale-down to a small step relative to the current task count, preventing large drops. Disable to allow the autoscaler to drop directly to any task count.|No| `true`                    |
+|`minScaleUpDelay`|Minimum cooldown after a scale-up before the next scale-up is allowed. Specified as an ISO-8601 duration.|No| `scaleActionPeriodMillis` |
+|`minScaleDownDelay`|Minimum cooldown after a scale-down before the next scale-down is allowed. Specified as an ISO-8601 duration.|No| `PT30M`                   |
+|`scaleDownDuringTaskRolloverOnly`|If `true`, scale-down actions are deferred until the next task rollover. This avoids disrupting in-progress ingestion.|No| `false`                   |
 
 The following example shows a supervisor spec with `costBased` autoscaler:
 
@@ -231,10 +232,10 @@ The following example shows a supervisor spec with `costBased` autoscaler:
       "autoScalerStrategy": "costBased",
       "taskCountMin": 1,
       "taskCountMax": 10,
+      "lagWeight": 0.4,
+      "idleWeight": 0.6,
       "minScaleUpDelay": "PT10M",
-      "minScaleDownDelay": "PT30M",
-      "lagWeight": 0.1,
-      "idleWeight": 0.9
+      "minScaleDownDelay": "PT30M"
     }
   }
 }
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index f57bd113873..7e63c3f4f25 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -143,7 +143,7 @@ public class CostBasedAutoScalerIntegrationTest extends StreamIndexTestBase
       }
     });
 
-    // These values were carefully handpicked to allow that test to pass in a 
stable manner.
+    // These values were carefully handpicked to allow that test to pass 
stably.
     final CostBasedAutoScalerConfig autoScalerConfig = 
CostBasedAutoScalerConfig
         .builder()
         .enableTaskAutoScaler(true)
@@ -152,8 +152,8 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
         .taskCountStart(lowInitialTaskCount)
         .scaleActionPeriodMillis(500)
         .minTriggerScaleActionFrequencyMillis(1000)
-        .lagWeight(0.2)
-        .idleWeight(0.8)
+        .lagWeight(0.8)
+        .idleWeight(0.2)
         .build();
 
     final KafkaSupervisorSpec kafkaSupervisorSpec = 
createKafkaSupervisorWithAutoScaler(
@@ -192,11 +192,11 @@ public class CostBasedAutoScalerIntegrationTest extends 
StreamIndexTestBase
 
     final CostBasedAutoScalerConfig autoScalerConfig = 
CostBasedAutoScalerConfig
         .builder()
+        .enableTaskAutoScaler(true)
         .taskCountMin(1)
         .taskCountMax(4)
-        .lagWeight(1.0)
-        .idleWeight(1.0)
-        .enableTaskAutoScaler(true)
+        .lagWeight(0.5)
+        .idleWeight(0.5)
         .minTriggerScaleActionFrequencyMillis(10L)
         .scaleActionPeriodMillis(10L)
         .minScaleDownDelay(Duration.standardSeconds(1))
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index fa7db4f6928..9d50266cd73 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -39,21 +39,23 @@ import 
org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.utils.CollectionUtils;
 
 import javax.annotation.Nullable;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Cost-based auto-scaler for seekable stream supervisors.
- * Uses a weighted cost function combining lag recovery time (seconds) and 
idleness cost (seconds)
+ * Uses a weighted cost function combining lag recovery time and idle-ratio 
cost
  * to determine optimal task counts.
  * <p>
- * Candidate task counts are derived by scanning a bounded window of 
partitions-per-task (PPT) values
- * around the current PPT, then converting those to task counts. This allows 
non-divisor task counts
- * while keeping changes gradual (no large jumps).
+ * Candidate task counts are derived from possible partitions-per-task ratios, 
then converted
+ * to task counts. When configured, scale-up and scale-down can independently 
limit the evaluated
+ * candidates to a small window around the current task count to avoid large 
jumps.
  * <p>
- * Scale-up and scale-down are both evaluated proactively.
- * Future versions may perform scale-down on task rollover only.
+ * Scale-up is applied during regular scale-action checks. Scale-down is 
applied during regular
+ * checks unless {@link 
CostBasedAutoScalerConfig#isScaleDownOnTaskRolloverOnly()} defers it
+ * to task rollover.
  */
 public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
 {
@@ -64,21 +66,17 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
   public static final String OPTIMAL_TASK_COUNT_METRIC = 
"task/autoScaler/costBased/optimalTaskCount";
   public static final String INVALID_METRICS_COUNT = 
"task/autoScaler/costBased/invalidMetrics";
 
-  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
-  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
-
   /**
-   * If average partition lag crosses this value and the processing rate is
-   * still zero, scaling actions are skipped and an alert is raised.
+   * Maximum number of candidate task counts to evaluate above or below the 
current task count
+   * when scale-up or scale-down boundaries are enabled.
    */
-  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
+  static final int BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK = 2;
 
   /**
-   * Divisor for partition count in the K formula: K = (partitionCount / 
K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
-   * This controls how aggressive the scaling is relative to partition count.
-   * That value was chosen by carefully analyzing the math model behind the 
implementation.
+   * If the average partition lag crosses this value and the processing rate is
+   * still zero, scaling actions are skipped and an alert is raised.
    */
-  static final double K_PARTITION_DIVISOR = 6.4;
+  static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
 
   private final String supervisorId;
   private final SeekableStreamSupervisor supervisor;
@@ -176,7 +174,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     // Take the current task count but clamp it to the configured boundaries 
if it is outside the boundaries.
     // There might be a configuration instance with a handwritten taskCount 
that is outside the boundaries.
     final boolean isTaskCountOutOfBounds = currentTaskCount < 
config.getTaskCountMin()
-                                     || currentTaskCount > 
config.getTaskCountMax();
+                                           || currentTaskCount > 
config.getTaskCountMax();
     if (isTaskCountOutOfBounds) {
       currentTaskCount = Math.min(config.getTaskCountMax(), 
Math.max(config.getTaskCountMin(), currentTaskCount));
     }
@@ -188,16 +186,28 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     // regardless of optimal task count, to get back to a safe state.
     if (isTaskCountOutOfBounds) {
       taskCount = currentTaskCount;
-      log.info("Task count for supervisor[%s] was out of bounds [%d,%d], 
urgently scaling from [%d] to [%d].",
-               supervisorId, config.getTaskCountMin(), 
config.getTaskCountMax(), currentTaskCount, currentTaskCount);
+      log.info(
+          "Task count for supervisor[%s] was out of bounds [%d,%d], urgently 
scaling from [%d] to [%d].",
+          supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), 
currentTaskCount, currentTaskCount
+      );
     } else if (optimalTaskCount > currentTaskCount) {
       taskCount = optimalTaskCount;
-      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
up).", supervisorId, currentTaskCount, taskCount);
+      log.info(
+          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
up).",
+          supervisorId,
+          currentTaskCount,
+          taskCount
+      );
     } else if (!config.isScaleDownOnTaskRolloverOnly()
                && optimalTaskCount < currentTaskCount
                && optimalTaskCount > 0) {
       taskCount = optimalTaskCount;
-      log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
down).", supervisorId, currentTaskCount, taskCount);
+      log.info(
+          "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale 
down).",
+          supervisorId,
+          currentTaskCount,
+          taskCount
+      );
     } else {
       taskCount = -1;
       log.debug("No scaling required for supervisor[%s]", supervisorId);
@@ -237,7 +247,7 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
    *   <li>Current task count already optimal</li>
    * </ul>
    *
-   * @return optimal task count for scale-up, or -1 if no scaling action needed
+   * @return optimal task count, or -1 if no scaling action is needed
    */
   int computeOptimalTaskCount(CostMetrics metrics)
   {
@@ -261,11 +271,8 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
         partitionCount,
         currentTaskCount,
-        (long) metrics.getAggregateLag(),
         config.getTaskCountMin(),
-        config.getTaskCountMax(),
-        config.shouldUseTaskCountBoundaries(),
-        config.getHighLagThreshold()
+        config.getTaskCountMax()
     );
 
     if (validTaskCounts.length == 0) {
@@ -290,9 +297,28 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
         config.getIdleWeight()
     );
 
-    // Find the task count which reduces cost
+    // Find the evaluated task count with the lowest cost.
     final CostResult[] costResults = new CostResult[validTaskCounts.length];
-    for (int i = 0; i < validTaskCounts.length; ++i) {
+    Arrays.fill(costResults, CostResult.INFINITE_COST);
+
+    int startIndex = 0;
+    int endIndex = validTaskCounts.length - 1;
+
+    if (config.shouldUseTaskCountBoundariesOnScaleUp()) {
+      int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, 
currentTaskCount);
+      endIndex = currentTaskCountIndex >= 0
+                 ? Math.min(currentTaskCountIndex + 
BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
+                 : endIndex;
+    }
+
+    if (config.shouldUseTaskCountBoundariesOnScaleDown()) {
+      int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, 
currentTaskCount);
+      startIndex = currentTaskCountIndex >= 0
+                   ? Math.max(currentTaskCountIndex - 
BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, startIndex)
+                   : startIndex;
+    }
+
+    for (int i = startIndex; i <= endIndex; ++i) {
       final int taskCount = validTaskCounts[i];
       CostResult costResult = costFunction.computeCost(metrics, taskCount, 
config);
       double cost = costResult.totalCost();
@@ -317,24 +343,22 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
       );
     }
 
-    // Scale up is performed eagerly.
+    // Scale-up is applied eagerly; scale-down may be deferred by 
computeTaskCountForScaleAction().
     return optimalTaskCount;
   }
 
   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts by converting every possible 
partitions-per-task ratio
+   * into a task count and filtering by configured min/max task count bounds.
    *
-   * @return sorted list of valid task counts within bounds
+   * @return list of valid task counts within bounds
    */
   @SuppressWarnings({"ReassignedVariable"})
   static int[] computeValidTaskCounts(
       int partitionCount,
       int currentTaskCount,
-      double aggregateLag,
       int taskCountMin,
-      int taskCountMax,
-      boolean isTaskCountBoundariesEnabled,
-      int highLagThreshold
+      int taskCountMax
   )
   {
     if (partitionCount <= 0 || currentTaskCount <= 0) {
@@ -342,32 +366,10 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     }
 
     IntSet result = new IntArraySet();
-    final int currentPartitionsPerTask = partitionCount / currentTaskCount;
 
     // Minimum partitions per task correspond to the maximum number of tasks 
(scale up) and vice versa.
     int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
     int maxPartitionsPerTask = Math.max(partitionCount, partitionCount / 
taskCountMin);
-
-    if (isTaskCountBoundariesEnabled) {
-      maxPartitionsPerTask = Math.min(
-          partitionCount,
-          currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
-      );
-
-      int extraIncrease = 0;
-      if (highLagThreshold > 0) {
-        extraIncrease = computeExtraPPTIncrease(
-            highLagThreshold,
-            aggregateLag,
-            partitionCount,
-            currentTaskCount,
-            taskCountMax
-        );
-      }
-      int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + 
extraIncrease;
-      minPartitionsPerTask = Math.max(minPartitionsPerTask, 
currentPartitionsPerTask - effectiveMaxIncrease);
-    }
-
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= 
minPartitionsPerTask
                                                        && partitionsPerTask != 
0; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / 
partitionsPerTask;
@@ -378,50 +380,6 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     return result.toIntArray();
   }
 
-  /**
-   * Computes extra allowed increase in partitions-per-task in scenarios when 
the average per-partition lag
-   * is above the configured threshold.
-   * <p>
-   * This uses a logarithmic formula for consistent absolute growth:
-   * {@code deltaTasks = K * ln(lagSeverity)}
-   * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
-   * <p>
-   * This ensures that small taskCount's get a massive relative boost,
-   * while large taskCount's receive more measured, stable increases.
-   */
-  static int computeExtraPPTIncrease(
-      double lagThreshold,
-      double aggregateLag,
-      int partitionCount,
-      int currentTaskCount,
-      int taskCountMax
-  )
-  {
-    if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
-      return 0;
-    }
-
-    final double lagPerPartition = aggregateLag / partitionCount;
-    if (lagPerPartition < lagThreshold) {
-      return 0;
-    }
-
-    final double lagSeverity = lagPerPartition / lagThreshold;
-
-    // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
-    // First multoplier decreases with sqrt(currentTaskCount): aggressive when 
small, conservative when large
-    final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) / 
Math.sqrt(currentTaskCount) * Math.log(
-        lagSeverity);
-
-    final double targetTaskCount = Math.min(taskCountMax, (double) 
currentTaskCount + deltaTasks);
-
-    // Compute precise PPT reduction to avoid early integer truncation 
artifacts
-    final double currentPPT = (double) partitionCount / currentTaskCount;
-    final double targetPPT = (double) partitionCount / targetTaskCount;
-
-    return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
-  }
-
   /**
    * Extracts the average poll-idle-ratio metric from task stats.
    * This metric indicates how much time the consumer spends idle waiting for 
data.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index b19a3e2cbbe..68f9f5a5ad7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -28,6 +28,7 @@ import 
org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.joda.time.Duration;
 
@@ -37,16 +38,18 @@ import java.util.Objects;
 /**
  * Configuration for cost-based auto-scaling of seekable stream supervisor 
tasks.
  * Uses a cost function combining lag and idle time metrics to determine 
optimal task counts.
- * Task counts are selected from a bounded range derived from the current 
partitions-per-task (PPT)
- * ratio, not strictly from factors/divisors of the partition count. This 
bounded PPT window enables
- * gradual scaling while avoiding large jumps and still allowing non-divisor 
task counts when needed.
+ * Candidate task counts are derived from possible partitions-per-task ratios 
and are not limited
+ * to factors/divisors of the partition count. Optional scale-up and 
scale-down boundaries control
+ * how much of that candidate set is evaluated around the current task count.
  */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class CostBasedAutoScalerConfig implements AutoScalerConfig
 {
+  private static final EmittingLogger LOG = new 
EmittingLogger(CostBasedAutoScalerConfig.class);
+
   static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 
minutes
-  static final double DEFAULT_LAG_WEIGHT = 0.25;
-  static final double DEFAULT_IDLE_WEIGHT = 0.75;
+  static final double DEFAULT_LAG_WEIGHT = 0.4;
+  static final double DEFAULT_IDLE_WEIGHT = 0.6;
   static final Duration DEFAULT_MIN_SCALE_DELAY = 
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
 
   private final boolean enableTaskAutoScaler;
@@ -59,12 +62,18 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
 
   private final double lagWeight;
   private final double idleWeight;
-  private final boolean useTaskCountBoundaries;
-  private final int highLagThreshold;
+  private final boolean useTaskCountBoundariesOnScaleUp;
+  private final boolean useTaskCountBoundariesOnScaleDown;
   private final Duration minScaleUpDelay;
   private final Duration minScaleDownDelay;
   private final boolean scaleDownDuringTaskRolloverOnly;
 
+  /**
+   * Creates a new CostBasedAutoScalerConfig instance.
+   * <p>
+   * Note: useTaskCountBoundaries and highLagThreshold are kept for backward 
compatibility,
+   * but effectively they are removed.
+   */
   @JsonCreator
   public CostBasedAutoScalerConfig(
       @JsonProperty("taskCountMax") Integer taskCountMax,
@@ -78,6 +87,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
       @Nullable @JsonProperty("idleWeight") Double idleWeight,
       @Nullable @JsonProperty("useTaskCountBoundaries") Boolean 
useTaskCountBoundaries,
       @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+      @Nullable @JsonProperty("useTaskCountBoundariesOnScaleUp") Boolean 
useTaskCountBoundariesOnScaleUp,
+      @Nullable @JsonProperty("useTaskCountBoundariesOnScaleDown") Boolean 
useTaskCountBoundariesOnScaleDown,
       @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
       @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
       @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean 
scaleDownDuringTaskRolloverOnly
@@ -97,13 +108,24 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
     // Cost function weights with defaults
     this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
     this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
-    this.useTaskCountBoundaries = 
Configs.valueOrDefault(useTaskCountBoundaries, false);
-    this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
-    this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, 
Duration.millis(this.minTriggerScaleActionFrequencyMillis));
+    this.useTaskCountBoundariesOnScaleUp = 
Configs.valueOrDefault(useTaskCountBoundariesOnScaleUp, false);
+    this.useTaskCountBoundariesOnScaleDown = 
Configs.valueOrDefault(useTaskCountBoundariesOnScaleDown, true);
+    this.minScaleUpDelay = Configs.valueOrDefault(
+        minScaleUpDelay,
+        Duration.millis(this.minTriggerScaleActionFrequencyMillis)
+    );
     this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, 
DEFAULT_MIN_SCALE_DELAY);
     this.scaleDownDuringTaskRolloverOnly = 
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
 
     if (this.enableTaskAutoScaler) {
+      if (useTaskCountBoundaries != null) {
+        LOG.warn("useTaskCountBoundaries is removed, "
+                 + "use useTaskCountBoundariesOnScaleUp and 
useTaskCountBoundariesOnScaleDown instead");
+      }
+      if (highLagThreshold != null) {
+        LOG.warn("highLagThreshold is removed, the autoscaler behavior is good 
enough just with cost function");
+      }
+
       Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when 
enableTaskAutoScaler is true");
       Preconditions.checkNotNull(taskCountMin, "taskCountMin is required when 
enableTaskAutoScaler is true");
       Preconditions.checkArgument(taskCountMax >= taskCountMin, "taskCountMax 
must be >= taskCountMin");
@@ -127,8 +149,14 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
 
     Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
     Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 
0");
-    Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, 
"minScaleUpDelay must be a duration >= 0 millis");
-    Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, 
"minScaleDownDelay must be a duration >= 0 millis");
+    Preconditions.checkArgument(
+        this.minScaleUpDelay.getMillis() >= 0,
+        "minScaleUpDelay must be a duration >= 0 millis"
+    );
+    Preconditions.checkArgument(
+        this.minScaleDownDelay.getMillis() >= 0,
+        "minScaleDownDelay must be a duration >= 0 millis"
+    );
   }
 
   /**
@@ -203,21 +231,25 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   }
 
   /**
-   * Enables or disables the use of task count boundaries derived from the 
current partitions-per-task (PPT) ratio.
+   * Enables or disables the task-count evaluation window when considering 
scale-up candidates.
+   * If enabled, the optimizer only evaluates a small number of candidate task 
counts above the current count,
+   * which prevents large scale-up jumps.
    */
-  @JsonProperty("useTaskCountBoundaries")
-  public boolean shouldUseTaskCountBoundaries()
+  @JsonProperty("useTaskCountBoundariesOnScaleUp")
+  public boolean shouldUseTaskCountBoundariesOnScaleUp()
   {
-    return useTaskCountBoundaries;
+    return useTaskCountBoundariesOnScaleUp;
   }
 
   /**
-   * Per-partition lag threshold allowing to activate a burst scaleup to 
eliminate high lag.
+   * Enables or disables the task-count evaluation window when considering 
scale-down candidates.
+   * If enabled, the optimizer only evaluates a small number of candidate task 
counts below the current count,
+   * which prevents large scale-down drops.
    */
-  @JsonProperty("highLagThreshold")
-  public int getHighLagThreshold()
+  @JsonProperty("useTaskCountBoundariesOnScaleDown")
+  public boolean shouldUseTaskCountBoundariesOnScaleDown()
   {
-    return highLagThreshold;
+    return useTaskCountBoundariesOnScaleDown;
   }
 
   /**
@@ -241,8 +273,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
   }
 
   /**
-   * Indicates whether task scaling down is limited to periods during task 
rollovers only.
-   * If set to {@code false}, allows scaling down during normal task run time.
+   * Indicates whether scale-down actions are deferred to task rollover.
+   * If set to {@code false}, scale-down can happen during regular 
scale-action checks.
    */
   @JsonProperty("scaleDownDuringTaskRolloverOnly")
   public boolean isScaleDownOnTaskRolloverOnly()
@@ -275,13 +307,13 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            && scaleActionPeriodMillis == that.scaleActionPeriodMillis
            && Double.compare(that.lagWeight, lagWeight) == 0
            && Double.compare(that.idleWeight, idleWeight) == 0
-           && useTaskCountBoundaries == that.useTaskCountBoundaries
+           && useTaskCountBoundariesOnScaleUp == 
that.useTaskCountBoundariesOnScaleUp
+           && useTaskCountBoundariesOnScaleDown == 
that.useTaskCountBoundariesOnScaleDown
            && Objects.equals(minScaleUpDelay, that.minScaleUpDelay)
            && Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
            && scaleDownDuringTaskRolloverOnly == 
that.scaleDownDuringTaskRolloverOnly
            && Objects.equals(taskCountStart, that.taskCountStart)
-           && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
-           && highLagThreshold == that.highLagThreshold;
+           && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
   }
 
   @Override
@@ -297,8 +329,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
         scaleActionPeriodMillis,
         lagWeight,
         idleWeight,
-        useTaskCountBoundaries,
-        highLagThreshold,
+        useTaskCountBoundariesOnScaleUp,
+        useTaskCountBoundariesOnScaleDown,
         minScaleUpDelay,
         minScaleDownDelay,
         scaleDownDuringTaskRolloverOnly
@@ -318,8 +350,8 @@ public class CostBasedAutoScalerConfig implements 
AutoScalerConfig
            ", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
            ", lagWeight=" + lagWeight +
            ", idleWeight=" + idleWeight +
-           ", useTaskCountBoundaries=" + useTaskCountBoundaries +
-           ", highLagThreshold=" + highLagThreshold +
+           ", useTaskCountBoundariesOnScaleUp=" + useTaskCountBoundariesOnScaleUp +
+           ", useTaskCountBoundariesOnScaleDown=" + useTaskCountBoundariesOnScaleDown +
            ", minScaleUpDelay=" + minScaleUpDelay +
            ", minScaleDownDelay=" + minScaleDownDelay +
            ", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly +
@@ -341,8 +373,8 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
     private Long scaleActionPeriodMillis;
     private Double lagWeight;
     private Double idleWeight;
-    private Boolean useTaskCountBoundaries;
-    private Integer highLagThreshold;
+    private Boolean useTaskCountBoundariesOnScaleUp;
+    private Boolean useTaskCountBoundariesOnScaleDown;
     private Duration minScaleUpDelay;
     private Duration minScaleDownDelay;
     private Boolean scaleDownDuringTaskRolloverOnly;
@@ -423,15 +455,15 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
       return this;
     }
 
-    public Builder useTaskCountBoundaries(boolean useTaskCountBoundaries)
+    public Builder useTaskCountBoundariesOnScaleUp(boolean useTaskCountBoundariesOnScaleUp)
     {
-      this.useTaskCountBoundaries = useTaskCountBoundaries;
+      this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp;
       return this;
     }
 
-    public Builder highLagThreshold(int highLagThreshold)
+    public Builder useTaskCountBoundariesOnScaleDown(boolean useTaskCountBoundariesOnScaleDown)
     {
-      this.highLagThreshold = highLagThreshold;
+      this.useTaskCountBoundariesOnScaleDown = useTaskCountBoundariesOnScaleDown;
       return this;
     }
 
@@ -447,8 +479,10 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
           scaleActionPeriodMillis,
           lagWeight,
           idleWeight,
-          useTaskCountBoundaries,
-          highLagThreshold,
+          null,
+          null,
+          useTaskCountBoundariesOnScaleUp,
+          useTaskCountBoundariesOnScaleDown,
           minScaleUpDelay,
           minScaleDownDelay,
           scaleDownDuringTaskRolloverOnly
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
index 1ad946ff517..a3925e282ef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -21,10 +21,16 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 /**
  * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
- * All costs are measured in seconds.
+ * Lag cost is based on recovery time; idle cost is based on the weighted idle-ratio penalty.
  */
 public class CostResult
 {
+  static final CostResult INFINITE_COST = new CostResult(
+      Double.POSITIVE_INFINITY,
+      Double.POSITIVE_INFINITY,
+      Double.POSITIVE_INFINITY
+  );
+
   private final double totalCost;
   private final double lagCost;
   private final double idleCost;
@@ -32,7 +38,7 @@ public class CostResult
   /**
    * @param totalCost the weighted sum of lagCost and idleCost
    * @param lagCost   the weighted cost representing expected time (seconds) to recover current lag
-   * @param idleCost  the weighted cost representing total compute time (seconds) wasted being idle per task duration
+   * @param idleCost  the weighted cost representing the predicted idle-ratio penalty
    */
   public CostResult(double totalCost, double lagCost, double idleCost)
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 8fe4f9d8d06..fa60c7a8e8a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -24,17 +24,23 @@ import org.apache.druid.java.util.common.logger.Logger;
 
 /**
  * Weighted cost function using compute time as the core metric.
- * Costs represent actual time in seconds, making them intuitive and debuggable.
- * Uses linear scaling without mode inversions for predictable behavior.
+ * Lag cost is based on recovery time in seconds; idle cost is a penalty derived from
+ * the predicted idle ratio.
+ *
+ * <p>Idle cost uses a U-shaped penalty with minimum at {@link #IDEAL_IDLE_RATIO}.
+ * This penalizes both under-provisioning (low idle, no safety margin, lag risk) and
+ * over-provisioning (high idle, wasted capacity), with asymmetric severity controlled by
+ * {@link #UNDER_PROVISIONING_PENALTY} and {@link #OVER_PROVISIONING_PENALTY}.
  */
 public class WeightedCostFunction
 {
   private static final Logger log = new Logger(WeightedCostFunction.class);
+
   /**
    * Multiplier for a lag amplification factor; it was carefully chosen
    * during extensive testing as the most balanced multiplier for high-lag recovery.
    */
-  static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05;
+  static final double LAG_AMPLIFICATION_MULTIPLIER = 0.4;
 
   /**
    * Minimum rate of processing for any task in records per second. This is used
@@ -43,17 +49,34 @@ public class WeightedCostFunction
    */
   static final double MIN_PROCESSING_RATE = 1_000;
 
+  /**
+   * Target idle ratio representing the optimal operating point for the U-shaped idle cost.
+   * At this ratio the idle cost is at its minimum; both lower (risk) and higher (waste) are penalized.
+   */
+  static final double IDEAL_IDLE_RATIO = 0.25;
+
+  /**
+   * Penalty magnitude applied when idle ratio is 0 (no safety margin).
+   * Controls the steepness of the U-shape on the under-provisioning side.
+   */
+  static final double UNDER_PROVISIONING_PENALTY = 2.0;
+
+  /**
+   * Penalty magnitude applied when idle ratio is 1 (fully wasted capacity).
+   * Controls the steepness of the U-shape on the over-provisioning side.
+   */
+  static final double OVER_PROVISIONING_PENALTY = 1.0;
+
   /**
    * Computes cost for a given task count using compute time metrics.
    * <p>
-   * Costs are measured in 'seconds':
+   * Cost components are derived from:
    * <ul>
-   *   <li><b>lagCost</b>: Expected time (seconds) to recover current lag</li>
-   *   <li><b>idleCost</b>: Total compute time (seconds) wasted being idle per task duration</li>
+   *   <li><b>lagCost</b>: weighted expected time to recover current lag</li>
+   *   <li><b>idleCost</b>: weighted U-shaped penalty for the predicted idle ratio</li>
    * </ul>
    * <p>
-   * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}.
-   * This approach directly connects costs to operational metrics.
+   * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idleRatioPenalty}.
    *
    * @return CostResult containing totalCost, lagCost, and idleCost,
    * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
@@ -65,40 +88,68 @@ public class WeightedCostFunction
   )
   {
     if (metrics == null || config == null || proposedTaskCount <= 0 || metrics.getPartitionCount() <= 0) {
-      return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
+      return CostResult.INFINITE_COST;
     }
 
     final double avgProcessingRate = metrics.getAvgProcessingRate();
-    final double lagRecoveryTime;
     if (avgProcessingRate < 0) {
       throw DruidException.defensive("Avg processing rate[%.2f] must not be negative.", avgProcessingRate);
+    }
+
+    // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
+    // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
+    // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
+    final double lagRecoveryTime;
+    if (metrics.getAggregateLag() <= 0) {
+      lagRecoveryTime = 0;
     } else {
-      // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
-      // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
-      // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
-      if (metrics.getAggregateLag() <= 0) {
-        lagRecoveryTime = 0;
+      final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
+      final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
+      final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
+      lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
+    }
+
+    // Capacity-based idle prediction. When the proposed count would oversaturate the cluster
+    // (busy work exceeds available capacity), the unmet demand becomes a virtual lag-recovery
+    // time on the same axis as real lag — so the optimizer treats predicted 
saturation as
+    // predicted lag, not as "perfect utilization".
+    final double currentPollIdleRatio = metrics.getPollIdleRatio();
+    final int currentTaskCount = metrics.getCurrentTaskCount();
+    final double predictedIdleRatio;
+    final double overrun;
+    if (currentPollIdleRatio < 0) {
+      predictedIdleRatio = 0.5;
+      overrun = 0.0;
+    } else if (currentTaskCount <= 0 || proposedTaskCount == currentTaskCount) {
+      predictedIdleRatio = currentPollIdleRatio;
+      overrun = 0.0;
+    } else {
+      final double busyFraction = 1.0 - currentPollIdleRatio;
+      final double taskRatio = (double) proposedTaskCount / currentTaskCount;
+      final double rawIdle = 1.0 - busyFraction / taskRatio;
+      if (rawIdle >= 0) {
+        predictedIdleRatio = Math.min(1.0, rawIdle);
+        overrun = 0.0;
       } else {
-        final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
-        final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
-        final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
-        lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
+        predictedIdleRatio = 0.0;
+        overrun = -rawIdle;
       }
     }
+    final double virtualLagRecoveryTime = overrun * metrics.getTaskDurationSeconds();
 
-    final double predictedIdleRatio = estimateIdleRatio(metrics, proposedTaskCount);
-    final double idleCost = proposedTaskCount * predictedIdleRatio;
-    final double lagCost = config.getLagWeight() * lagRecoveryTime;
+    final double idleCost = uShapedIdleCost(predictedIdleRatio, proposedTaskCount);
+    final double lagCost = config.getLagWeight() * (lagRecoveryTime + virtualLagRecoveryTime);
     final double weightedIdleCost = config.getIdleWeight() * idleCost;
     final double cost = lagCost + weightedIdleCost;
 
     log.debug(
         "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], "
-        + "predictedIdle[%.3f], finalCost[%.2fs]",
+        + "predictedIdle[%.3f], overrun[%.3f], finalCost[%.2fs]",
         proposedTaskCount,
         lagCost,
         weightedIdleCost,
         predictedIdleRatio,
+        overrun,
         cost
     );
 
@@ -106,33 +157,28 @@ public class WeightedCostFunction
   }
 
   /**
-   * Estimates the idle ratio for a proposed task count with linear prediction.
+   * U-shaped idle cost with minimum at {@link #IDEAL_IDLE_RATIO}.
    *
-   * @param metrics   current system metrics containing idle ratio and task count
-   * @param taskCount target task count to estimate an idle ratio for
-   * @return estimated idle ratio in range [0.0, 1.0]
+   * <ul>
+   *   <li>idle &lt; ideal: under-provisioning penalty, no safety margin, lag risk</li>
+   *   <li>idle = ideal: baseline cost only ({@code taskCount * IDEAL_IDLE_RATIO})</li>
+   *   <li>idle &gt; ideal: over-provisioning penalty, wasted capacity</li>
+   * </ul>
+   * <p>
+   * The ideal-idle baseline keeps cost non-zero at the optimum so the optimizer
+   * always has a finite trade-off against lag cost.
    */
-  private double estimateIdleRatio(CostMetrics metrics, int taskCount)
+  double uShapedIdleCost(double predictedIdleRatio, int taskCount)
   {
-    final double currentPollIdleRatio = metrics.getPollIdleRatio();
-
-    if (currentPollIdleRatio < 0) {
-      // No idle data available, assume moderate idle
-      return 0.5;
-    }
-
-    final int currentTaskCount = metrics.getCurrentTaskCount();
-    if (currentTaskCount <= 0 || taskCount == currentTaskCount) {
-      return currentPollIdleRatio;
+    final double penalty;
+    if (predictedIdleRatio < IDEAL_IDLE_RATIO) {
+      final double norm = (IDEAL_IDLE_RATIO - predictedIdleRatio) / IDEAL_IDLE_RATIO;
+      penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
+    } else {
+      final double norm = (predictedIdleRatio - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
+      penalty = OVER_PROVISIONING_PENALTY * norm * norm;
     }
-
-    // Linear prediction (capacity-based) - existing logic
-    final double busyFraction = 1.0 - currentPollIdleRatio;
-    final double taskRatio = (double) taskCount / currentTaskCount;
-    final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio));
-
-    // Clamp to valid range [0, 1]
-    return Math.max(0.0, linearPrediction);
+    return taskCount * (IDEAL_IDLE_RATIO + penalty);
   }
 
 }
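
Reviewer note (not part of the commit): the interaction between the capacity-based idle prediction and the U-shaped penalty in the hunks above may be easier to follow with numbers. The following is a minimal standalone sketch, assuming the three constants and the two formulas are exactly as shown in the diff. The class name `IdleCostSketch` and the helper `predictRawIdle` are hypothetical and exist only for illustration; the sketch also skips the special cases for an unavailable idle metric and a non-positive current task count.

```java
// Illustrative sketch only. IdleCostSketch and predictRawIdle are hypothetical names;
// the constants and formulas are copied from the WeightedCostFunction diff above.
final class IdleCostSketch
{
  static final double IDEAL_IDLE_RATIO = 0.25;
  static final double UNDER_PROVISIONING_PENALTY = 2.0;
  static final double OVER_PROVISIONING_PENALTY = 1.0;

  // Raw capacity-based idle prediction. A negative result means the proposed
  // task count cannot absorb the current busy work (predicted overrun).
  static double predictRawIdle(double currentIdleRatio, int currentTaskCount, int proposedTaskCount)
  {
    final double busyFraction = 1.0 - currentIdleRatio;
    final double taskRatio = (double) proposedTaskCount / currentTaskCount;
    return 1.0 - busyFraction / taskRatio;
  }

  // Same shape as uShapedIdleCost in the diff: quadratic penalty on either side
  // of the ideal ratio, plus a baseline of taskCount * IDEAL_IDLE_RATIO.
  static double uShapedIdleCost(double predictedIdleRatio, int taskCount)
  {
    final double penalty;
    if (predictedIdleRatio < IDEAL_IDLE_RATIO) {
      final double norm = (IDEAL_IDLE_RATIO - predictedIdleRatio) / IDEAL_IDLE_RATIO;
      penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
    } else {
      final double norm = (predictedIdleRatio - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
      penalty = OVER_PROVISIONING_PENALTY * norm * norm;
    }
    return taskCount * (IDEAL_IDLE_RATIO + penalty);
  }

  public static void main(String[] args)
  {
    // Assumed scenario: 25 running tasks that are 10% idle.
    for (int proposed : new int[]{20, 25, 30, 50}) {
      final double rawIdle = predictRawIdle(0.1, 25, proposed);
      final double predictedIdle = Math.max(0.0, Math.min(1.0, rawIdle));
      final double overrun = Math.max(0.0, -rawIdle);
      System.out.printf(
          "tasks=%d predictedIdle=%.3f overrun=%.3f idleCost=%.3f%n",
          proposed,
          predictedIdle,
          overrun,
          uShapedIdleCost(predictedIdle, proposed)
      );
    }
  }
}
```

In this assumed scenario, proposing 30 tasks lands the predicted idle ratio at roughly 0.25, the bottom of the U, while proposing 20 tasks yields a negative raw idle value, which the commit converts into `overrun` and charges as virtual lag-recovery time rather than treating it as perfect utilization.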
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 05724d1e837..4755b3cf496 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -49,7 +49,6 @@ public class CostBasedAutoScalerConfigTest
                   + "  \"scaleActionPeriodMillis\": 60000,\n"
                   + "  \"lagWeight\": 0.6,\n"
                   + "  \"idleWeight\": 0.4,\n"
-                  + "  \"highLagThreshold\": 30000,\n"
                   + "  \"minScaleUpDelay\": \"PT5M\",\n"
                   + "  \"minScaleDownDelay\": \"PT10M\",\n"
                   + "  \"scaleDownDuringTaskRolloverOnly\": true\n"
@@ -68,7 +67,8 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
     Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
-    Assert.assertEquals(30000, config.getHighLagThreshold());
+    Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp());
+    Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
 
     // Test serialization back to JSON
     String serialized = mapper.writeValueAsString(config);
@@ -101,10 +101,10 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay());
     Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay());
     Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
+    Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp());
+    Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
     Assert.assertNull(config.getTaskCountStart());
     Assert.assertNull(config.getStopTaskCountRatio());
-    // When highLagThreshold is not set, it defaults to -1 (burst scale-up disabled)
-    Assert.assertEquals(-1, config.getHighLagThreshold());
   }
 
   @Test
@@ -185,10 +185,11 @@ public class CostBasedAutoScalerConfigTest
                                                                 .scaleActionPeriodMillis(60000L)
                                                                 .lagWeight(0.6)
                                                                 .idleWeight(0.4)
+                                                                .useTaskCountBoundariesOnScaleUp(true)
+                                                                .useTaskCountBoundariesOnScaleDown(true)
                                                                 .minScaleUpDelay(Duration.standardMinutes(5))
                                                                 .minScaleDownDelay(Duration.standardMinutes(10))
                                                                 .scaleDownDuringTaskRolloverOnly(true)
-                                                                .highLagThreshold(30000)
                                                                 .build();
 
     Assert.assertTrue(config.getEnableTaskAutoScaler());
@@ -199,10 +200,11 @@ public class CostBasedAutoScalerConfigTest
     Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
     Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
     Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+    Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleUp());
+    Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
     Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
     Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
     Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
-    Assert.assertEquals(30000, config.getHighLagThreshold());
   }
 
   @Test
@@ -243,7 +245,10 @@ public class CostBasedAutoScalerConfigTest
                                                                  .build();
     Assert.assertEquals(Duration.standardMinutes(5), bothSet.getMinScaleUpDelay());
     Assert.assertEquals(Duration.standardMinutes(20), bothSet.getMinScaleDownDelay());
-    CostBasedAutoScalerConfig roundTripped = mapper.readValue(mapper.writeValueAsString(bothSet), CostBasedAutoScalerConfig.class);
+    CostBasedAutoScalerConfig roundTripped = mapper.readValue(
+        mapper.writeValueAsString(bothSet),
+        CostBasedAutoScalerConfig.class
+    );
     Assert.assertEquals(bothSet, roundTripped);
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
deleted file mode 100644
index 122a449c5f7..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link CostBasedAutoScaler#computeExtraPPTIncrease}.
- * <p>
- * The burst scaling uses a logarithmic formula:
- * {@code deltaTasks = K * ln(lagSeverity)}
- * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}.
- */
-public class CostBasedAutoScalerHighLagScalingTest
-{
-  private static final int LAG_THRESHOLD = 50_000;
-  private static final int PARTITION_COUNT = 48;
-  private static final int TASK_COUNT_MAX = 48;
-
-  /**
-   * Tests scaling behavior across different lag levels and task counts.
-   * <p>
-   * Expected behavior for 48 partitions with threshold=50K:
-   * <pre>
-   * | Current | Lag/Part | PPT reduction | Notes                                   |
-   * |---------|----------|---------------|-----------------------------------------|
-   * | any     | <50K     | 0             | Below threshold                         |
-   * | any     | =50K     | 0             | ln(1) = 0                               |
-   * | 1       | 100K     | 40            | Significant boost for recovery          |
-   * | 1       | 200K     | 43            | Large boost                             |
-   * | 4       | 200K     | 6             | Moderate boost                          |
-   * | 12      | 200K     | 0             | Delta too small for PPT change          |
-   * | 24      | 200K     | 0             | Delta too small for PPT change          |
-   * </pre>
-   */
-  @Test
-  public void testComputeExtraPPTIncrease()
-  {
-    // Below threshold: no boost
-    Assert.assertEquals(
-        "Below threshold should not increase PPT",
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(
-            LAG_THRESHOLD,
-            PARTITION_COUNT * 40_000L,
-            PARTITION_COUNT,
-            4,
-            TASK_COUNT_MAX
-        )
-    );
-
-    // At threshold (lagSeverity=1, ln(1)=0): no boost
-    Assert.assertEquals(
-        "At threshold (ln(1)=0) should not increase PPT",
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(
-            LAG_THRESHOLD,
-            PARTITION_COUNT * 50_000L,
-            PARTITION_COUNT,
-            4,
-            TASK_COUNT_MAX
-        )
-    );
-
-    // C=1, 100K lag (2x threshold): significant boost for emergency recovery
-    int boost1_100k = CostBasedAutoScaler.computeExtraPPTIncrease(
-        LAG_THRESHOLD,
-        PARTITION_COUNT * 100_000L,
-        PARTITION_COUNT,
-        1,
-        TASK_COUNT_MAX
-    );
-    Assert.assertEquals("C=1, 100K lag boost", 40, boost1_100k);
-
-    // C=1, 200K lag (4x threshold): large boost
-    int boost1_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
-        LAG_THRESHOLD,
-        PARTITION_COUNT * 200_000L,
-        PARTITION_COUNT,
-        1,
-        TASK_COUNT_MAX
-    );
-    Assert.assertEquals("C=1, 200K lag boost", 43, boost1_200k);
-
-    // C=4, 200K lag: moderate boost (K decreases with sqrt(C))
-    int boost4_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
-        LAG_THRESHOLD,
-        PARTITION_COUNT * 200_000L,
-        PARTITION_COUNT,
-        4,
-        TASK_COUNT_MAX
-    );
-    Assert.assertEquals("C=4, 200K lag should yield a modest PPT increase", 6, boost4_200k);
-
-    // C=12, 200K lag: delta too small to change PPT
-    int boost12_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
-        LAG_THRESHOLD,
-        PARTITION_COUNT * 200_000L,
-        PARTITION_COUNT,
-        12,
-        TASK_COUNT_MAX
-    );
-    Assert.assertEquals("C=12, 200K lag should not change PPT", 0, boost12_200k);
-
-    // C=24, 200K lag: delta too small to change PPT
-    int boost24_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
-        LAG_THRESHOLD,
-        PARTITION_COUNT * 200_000L,
-        PARTITION_COUNT,
-        24,
-        TASK_COUNT_MAX
-    );
-    Assert.assertEquals("C=24, 200K lag should not change PPT", 0, boost24_200k);
-  }
-
-  @Test
-  public void testComputeExtraPPTIncreaseInvalidInputs()
-  {
-    Assert.assertEquals(
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 0, 4, 48)
-    );
-    Assert.assertEquals(
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 0, 48)
-    );
-    Assert.assertEquals(
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 4, 0)
-    );
-    Assert.assertEquals(
-        0,
-        CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, -1, 4, 48)
-    );
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 4b5666a85ee..56eacb43eab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -77,94 +77,62 @@ public class CostBasedAutoScalerTest
   @Test
   public void testComputeValidTaskCounts()
   {
-    boolean useTaskCountBoundaries = true;
-    int highLagThreshold = 50_000;
-
     // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34
-    int[] validTaskCounts = computeValidTaskCounts(
+    final int[] validTaskCounts = computeValidTaskCounts(
         100,
         25,
-        0L,
         1,
-        100,
-        useTaskCountBoundaries,
-        highLagThreshold
+        100
     );
     Assert.assertTrue("Expected current task count to be included", contains(validTaskCounts, 25));
     Assert.assertTrue("Expected next scale-up option (34) to be included", contains(validTaskCounts, 34));
 
     // Single partition
-    int[] singlePartition = computeValidTaskCounts(
+    final int[] singlePartition = computeValidTaskCounts(
         1,
         1,
-        0L,
         1,
-        100,
-        useTaskCountBoundaries,
-        highLagThreshold
+        100
     );
     Assert.assertTrue("Single partition should yield at least one valid count", singlePartition.length > 0);
     Assert.assertTrue("Single partition should include task count 1", contains(singlePartition, 1));
 
     // Current exceeds partitions - should still yield valid, deduplicated options
-    int[] exceedsPartitions = computeValidTaskCounts(
+    final int[] exceedsPartitions = computeValidTaskCounts(
         2,
         5,
-        0L,
         1,
-        100,
-        useTaskCountBoundaries,
-        highLagThreshold
+        100
     );
     Assert.assertEquals(2, exceedsPartitions.length);
     Assert.assertTrue(contains(exceedsPartitions, 1));
     Assert.assertTrue(contains(exceedsPartitions, 2));
 
-    // Lag expansion: low lag should not include max, high lag should allow aggressive scaling
-    int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30, useTaskCountBoundaries, highLagThreshold);
-    Assert.assertFalse("Low lag should not include max task count", contains(lowLagCounts, 30));
-    Assert.assertTrue("Low lag should cap scale-up around 4 tasks", contains(lowLagCounts, 4));
-
-    // High lag uses logarithmic formula: K * ln(lagSeverity) where K = P/(6.4*sqrt(C))
-    // For P=30, C=3, lagPerPartition=500K, threshold=50K: lagSeverity=10, K=2.7, delta=6.2
-    // This allows controlled scaling to ~10-15 tasks (not all the way to max)
-    long highAggregateLag = 30L * 500_000L;
-    int[] highLagCounts = computeValidTaskCounts(
-        30,
-        3,
-        highAggregateLag,
-        1,
-        30,
-        useTaskCountBoundaries,
-        highLagThreshold
-    );
-    Assert.assertTrue("High lag should allow scaling to 10 tasks", contains(highLagCounts, 10));
-    Assert.assertTrue("High lag should allow scaling to 15 tasks", contains(highLagCounts, 15));
-    Assert.assertFalse("High lag should not jump straight to max (30) from 3", contains(highLagCounts, 30));
+    // Unbounded candidate generation includes both nearby and maximum task counts.
+    final int[] taskCounts = computeValidTaskCounts(30, 3, 1, 30);
+    Assert.assertTrue("Valid task counts should include max task count", contains(taskCounts, 30));
+    Assert.assertTrue("Valid task counts should include nearby scale-up task count", contains(taskCounts, 4));
 
     // Respects taskCountMax
-    int[] cappedCounts = computeValidTaskCounts(
+    final int[] cappedCounts = computeValidTaskCounts(
         30,
         4,
-        highAggregateLag,
         1,
-        3,
-        useTaskCountBoundaries,
-        highLagThreshold
+        3
     );
     Assert.assertTrue("Should include taskCountMax when within bounds", contains(cappedCounts, 3));
     Assert.assertFalse("Should not exceed taskCountMax", contains(cappedCounts, 4));
 
     // Respects taskCountMin - filters out values below the minimum
     // With partitionCount=100, currentTaskCount=10, the computed range includes values like 8, 9, 10, 12, 13
-    int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100, useTaskCountBoundaries, highLagThreshold);
+    final int[] minCappedCounts = computeValidTaskCounts(100, 10, 10, 100);
     Assert.assertFalse("Should not include values below taskCountMin (8)", contains(minCappedCounts, 8));
     Assert.assertFalse("Should not include values below taskCountMin (9)", contains(minCappedCounts, 9));
     Assert.assertTrue("Should include values at taskCountMin (10)", contains(minCappedCounts, 10));
     Assert.assertTrue("Should include values above taskCountMin (12)", contains(minCappedCounts, 12));
 
     // Both bounds applied together
-    int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12, useTaskCountBoundaries, highLagThreshold);
+    final int[] bothBounds = computeValidTaskCounts(100, 10, 10, 12);
     Assert.assertFalse("Should not include values below taskCountMin (8)", contains(bothBounds, 8));
     Assert.assertFalse("Should not include values below taskCountMin (9)", contains(bothBounds, 9));
     Assert.assertFalse("Should not include values above taskCountMax (13)", contains(bothBounds, 13));
@@ -182,7 +150,7 @@ public class CostBasedAutoScalerTest
     Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
 
     // Negative pollIdleRatio (metric unavailable) should still allow scaling
-    int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
+    final int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
     MatcherAssert.assertThat(
         "Negative pollIdleRatio should not reject scaling",
         unavailableIdleResult,
@@ -190,16 +158,84 @@ public class CostBasedAutoScalerTest
     );
 
     // High idle (underutilized) - should scale down
-    int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
+    final int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
     Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25);
 
     // Very high idle with high task count - should scale down
-    int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
+    final int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
     Assert.assertTrue("High idle should not suggest scale-up", highIdleResult <= 50);
 
-    // With low idle and balanced weights, the algorithm should not scale up 
aggressively
-    int lowIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
-    Assert.assertTrue("With low idle and balanced weights, avoid aggressive 
scale-up", lowIdleResult <= 25);
+    // With idle below ideal (0.1 < 0.25), U-shaped cost penalizes 
under-provisioning,
+    // driving a moderate scale-up toward the ideal operating point.
+    final int lowIdleResult = 
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
+    Assert.assertTrue(
+        "Low idle below ideal should drive scale-up toward ideal operating 
point",
+        lowIdleResult > 25
+    );
+  }
+
+  @Test
+  public void testComputeOptimalTaskCountLimitsTaskCountJumps()
+  {
+    final CostBasedAutoScalerConfig boundedScaleUpConfig = CostBasedAutoScalerConfig.builder()
+                                                                                    .taskCountMax(100)
+                                                                                    .taskCountMin(1)
+                                                                                    .enableTaskAutoScaler(true)
+                                                                                    .lagWeight(1.0)
+                                                                                    .idleWeight(0.0)
+                                                                                    .useTaskCountBoundariesOnScaleUp(true)
+                                                                                    .build();
+    final CostBasedAutoScaler boundedScaleUpScaler = createAutoScaler(boundedScaleUpConfig);
+
+    Assert.assertEquals(
+        "Scale-up should only evaluate two task-count candidates above the 
current count",
+        13,
+        boundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 
10, 100, 0.25))
+    );
+
+    final CostBasedAutoScalerConfig unboundedScaleUpConfig = CostBasedAutoScalerConfig.builder()
+                                                                                      .taskCountMax(100)
+                                                                                      .taskCountMin(1)
+                                                                                      .enableTaskAutoScaler(true)
+                                                                                      .lagWeight(1.0)
+                                                                                      .idleWeight(0.0)
+                                                                                      .build();
+    final CostBasedAutoScaler unboundedScaleUpScaler = createAutoScaler(unboundedScaleUpConfig);
+    Assert.assertEquals(
+        "Without scale-up boundaries, lag-only optimization should jump to max 
task count",
+        100,
+        
unboundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 
100, 0.25))
+    );
+
+    final CostBasedAutoScalerConfig boundedScaleDownConfig = CostBasedAutoScalerConfig.builder()
+                                                                                      .taskCountMax(100)
+                                                                                      .taskCountMin(1)
+                                                                                      .enableTaskAutoScaler(true)
+                                                                                      .lagWeight(0.0)
+                                                                                      .idleWeight(1.0)
+                                                                                      .useTaskCountBoundariesOnScaleDown(true)
+                                                                                      .build();
+    final CostBasedAutoScaler boundedScaleDownScaler = createAutoScaler(boundedScaleDownConfig);
+
+    Assert.assertEquals(
+        "Scale-down should only evaluate two task-count candidates below the 
current count",
+        34,
+        boundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 100, 
100, 0.9))
+    );
+
+    final CostBasedAutoScalerConfig unboundedScaleDownConfig = CostBasedAutoScalerConfig.builder()
+                                                                                        .taskCountMax(25)
+                                                                                        .taskCountMin(1)
+                                                                                        .enableTaskAutoScaler(true)
+                                                                                        .lagWeight(0.0)
+                                                                                        .idleWeight(1.0)
+                                                                                        .build();
+    final CostBasedAutoScaler unboundedScaleDownScaler = createAutoScaler(unboundedScaleDownConfig);
+    Assert.assertEquals(
+        "Without scale-down boundaries, idle-only optimization may select a 
much lower task count",
+        1,
+        unboundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 
100, 100, 0.9))
+    );
   }
 
   @Test
@@ -531,6 +567,21 @@ public class CostBasedAutoScalerTest
     );
   }
 
+  private CostBasedAutoScaler createAutoScaler(CostBasedAutoScalerConfig 
config)
+  {
+    final SupervisorSpec mockSupervisorSpec = 
Mockito.mock(SupervisorSpec.class);
+    final SeekableStreamSupervisor mockSupervisor = 
Mockito.mock(SeekableStreamSupervisor.class);
+    final ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
+    final SeekableStreamSupervisorIOConfig mockIoConfig = 
Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(mockSupervisorSpec.getId()).thenReturn("test-supervisor");
+    
when(mockSupervisorSpec.getDataSources()).thenReturn(List.of("test-datasource"));
+    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+    when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+    return new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, 
mockEmitter);
+  }
+
   private boolean contains(int[] array, int value)
   {
     for (int i : array) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index f96df30049e..8d5448065fa 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -195,10 +195,10 @@ public class WeightedCostFunctionTest
   }
 
   @Test
-  public void testIdleCostMonotonicWithTaskCount()
+  public void testIdleCostIsUShapedAroundIdealRatio()
   {
-    // Test that idle cost increases monotonically with task count.
-    // With fixed load, adding more tasks means each task has less work, so 
idle increases.
+    // U-shaped cost: minimum near IDEAL_IDLE_RATIO=0.25, higher on both sides.
+    // Current: 10 tasks with 25% idle (already at ideal).
     CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder()
                                                                         .taskCountMax(100)
                                                                         .taskCountMin(1)
@@ -207,18 +207,19 @@ public class WeightedCostFunctionTest
                                                                         
.idleWeight(1.0)
                                                                         
.build();
 
-    // Current: 10 tasks with 40% idle (60% busy)
-    CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
+    CostMetrics metrics = createMetrics(0.0, 10, 100, 0.25);
+
+    // At current (idle=0.25=ideal): baseline cost only, penalty=0
+    double costAtIdeal = costFunction.computeCost(metrics, 10, 
idleOnlyConfig).totalCost();
+
+    // Scale down → predicted idle falls below ideal → under-provisioning 
penalty
+    double costScaleDown = costFunction.computeCost(metrics, 5, 
idleOnlyConfig).totalCost();
 
-    double costAt5 = costFunction.computeCost(metrics, 5, 
idleOnlyConfig).totalCost();
-    double costAt10 = costFunction.computeCost(metrics, 10, 
idleOnlyConfig).totalCost();
-    double costAt15 = costFunction.computeCost(metrics, 15, 
idleOnlyConfig).totalCost();
-    double costAt20 = costFunction.computeCost(metrics, 20, 
idleOnlyConfig).totalCost();
+    // Scale up → predicted idle rises above ideal → over-provisioning penalty
+    double costScaleUp = costFunction.computeCost(metrics, 20, 
idleOnlyConfig).totalCost();
 
-    // Monotonically increasing idle cost as tasks increase
-    Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10);
-    Assert.assertTrue("cost(10) < cost(15)", costAt10 < costAt15);
-    Assert.assertTrue("cost(15) < cost(20)", costAt15 < costAt20);
+    Assert.assertTrue("scale-down costs more than ideal", costScaleDown > 
costAtIdeal);
+    Assert.assertTrue("scale-up costs more than ideal", costScaleUp > 
costAtIdeal);
   }
 
   @Test
@@ -238,8 +239,10 @@ public class WeightedCostFunctionTest
     CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
     double costAt2 = costFunction.computeCost(metrics, 2, 
idleOnlyConfig).totalCost();
 
-    // idlenessCost = taskCount * 0.0 (clamped) = 0
-    Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped 
to 0", 0.0, costAt2, 0.0001);
+    // idle = 0: max under-provisioning penalty; cost = taskCount * (IDEAL + 
UNDER_PENALTY)
+    double expectedAt2 = 2 * (WeightedCostFunction.IDEAL_IDLE_RATIO + 
WeightedCostFunction.UNDER_PROVISIONING_PENALTY);
+    Assert.assertEquals("Idle cost at clamped-to-zero idle ratio should 
reflect full under-provisioning penalty",
+                        expectedAt2, costAt2, 0.0001);
 
     // Extreme scale-up shouldn't exceed 1.0 for idle ratio
     // 10 tasks → 100 tasks with 10% idle
@@ -267,11 +270,13 @@ public class WeightedCostFunctionTest
     double cost10 = costFunction.computeCost(missingIdleData, 10, 
idleOnlyConfig).totalCost();
     double cost20 = costFunction.computeCost(missingIdleData, 20, 
idleOnlyConfig).totalCost();
 
-    // With missing data, predicted idle = 0.5 for all task counts
-    // idlenessCost at 10 = 10 * 0.5 = 5
-    // idlenessCost at 20 = 20 * 0.5 = 10
-    Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 0.5, 
cost10, 0.0001);
-    Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 0.5, 
cost20, 0.0001);
+    // With missing data, predicted idle = 0.5 for all task counts regardless 
of proposed count.
+    // U-shaped cost at idle=0.5: idle > IDEAL(0.25), 
norm=(0.5-0.25)/0.75=1/3, penalty=1*(1/3)^2=1/9
+    double expectedCostPerTask = WeightedCostFunction.IDEAL_IDLE_RATIO
+                                 + 
WeightedCostFunction.OVER_PROVISIONING_PENALTY * (1.0 / 3.0) * (1.0 / 3.0);
+    Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 
expectedCostPerTask, cost10, 0.0001);
+    Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 
expectedCostPerTask, cost20, 0.0001);
+    Assert.assertEquals("Cost scales linearly with task count at fixed idle 
ratio", 2 * cost10, cost20, 0.0001);
   }
 
   @Test
@@ -338,6 +343,51 @@ public class WeightedCostFunctionTest
   }
 
 
+  @Test
+  public void testUShapedIdleCostFormula()
+  {
+    int n = 10;
+
+    // At ideal ratio: penalty = 0, cost = n * IDEAL_IDLE_RATIO
+    Assert.assertEquals(
+        n * WeightedCostFunction.IDEAL_IDLE_RATIO,
+        costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n),
+        1e-9
+    );
+
+    // At idle = 0 (fully under-provisioned): norm = 1, penalty = 
UNDER_PROVISIONING_PENALTY
+    Assert.assertEquals(
+        n * (WeightedCostFunction.IDEAL_IDLE_RATIO + 
WeightedCostFunction.UNDER_PROVISIONING_PENALTY),
+        costFunction.uShapedIdleCost(0.0, n),
+        1e-9
+    );
+
+    // At idle = 1 (fully over-provisioned): norm = 1, penalty = 
OVER_PROVISIONING_PENALTY
+    Assert.assertEquals(
+        n * (WeightedCostFunction.IDEAL_IDLE_RATIO + 
WeightedCostFunction.OVER_PROVISIONING_PENALTY),
+        costFunction.uShapedIdleCost(1.0, n),
+        1e-9
+    );
+
+    // Both extremes exceed the ideal cost
+    double idealCost = 
costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n);
+    Assert.assertTrue("idle=0 costs more than ideal", 
costFunction.uShapedIdleCost(0.0, n) > idealCost);
+    Assert.assertTrue("idle=1 costs more than ideal", 
costFunction.uShapedIdleCost(1.0, n) > idealCost);
+
+    // Under-provisioning is penalized more than over-provisioning (UNDER > 
OVER)
+    Assert.assertTrue(
+        "under-provisioning penalty exceeds over-provisioning penalty",
+        costFunction.uShapedIdleCost(0.0, n) > 
costFunction.uShapedIdleCost(1.0, n)
+    );
+
+    // Cost scales linearly with task count at any fixed idle ratio
+    Assert.assertEquals(
+        2 * costFunction.uShapedIdleCost(0.5, n),
+        costFunction.uShapedIdleCost(0.5, 2 * n),
+        1e-9
+    );
+  }
+
   private CostMetrics createMetrics(
       double avgPartitionLag,
       int currentTaskCount,

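Note on the U-shaped idle cost exercised by `testUShapedIdleCostFormula`: the assertions above pin down the cost at three points (idle = 0, idle = `IDEAL_IDLE_RATIO`, idle = 1) and at idle = 0.5. The following is a minimal sketch that is consistent with those assertions; it is not the code from `WeightedCostFunction.java`. `IDEAL_IDLE_RATIO = 0.25` and `OVER_PROVISIONING_PENALTY = 1` are read off the test comments. The value of `UNDER_PROVISIONING_PENALTY` (the tests only require it to exceed the over-provisioning penalty), the quadratic shape below the ideal ratio, and the class name are assumptions made for illustration.

```java
public final class UShapedIdleCostSketch {
    // Taken from the test comments: "IDEAL_IDLE_RATIO=0.25".
    static final double IDEAL_IDLE_RATIO = 0.25;
    // Taken from the test comment "penalty=1*(1/3)^2".
    static final double OVER_PROVISIONING_PENALTY = 1.0;
    // Hypothetical value: the tests only show that UNDER > OVER.
    static final double UNDER_PROVISIONING_PENALTY = 2.0;

    /**
     * Per-task cost is the ideal ratio plus a quadratic penalty on the
     * normalized distance from the ideal ratio; total cost scales linearly
     * with the task count.
     */
    static double uShapedIdleCost(double idleRatio, int taskCount) {
        // Clamp to [0, 1] so the normalized distance stays in [0, 1].
        final double idle = Math.max(0.0, Math.min(1.0, idleRatio));
        final double penalty;
        if (idle >= IDEAL_IDLE_RATIO) {
            // Over-provisioned side: distance normalized by (1 - ideal).
            final double norm = (idle - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
            penalty = OVER_PROVISIONING_PENALTY * norm * norm;
        } else {
            // Under-provisioned side (assumed symmetric form): normalized by ideal.
            final double norm = (IDEAL_IDLE_RATIO - idle) / IDEAL_IDLE_RATIO;
            penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
        }
        return taskCount * (IDEAL_IDLE_RATIO + penalty);
    }

    public static void main(String[] args) {
        System.out.println(uShapedIdleCost(IDEAL_IDLE_RATIO, 10)); // ideal: no penalty
        System.out.println(uShapedIdleCost(0.0, 10));              // fully under-provisioned
        System.out.println(uShapedIdleCost(1.0, 10));              // fully over-provisioned
        System.out.println(uShapedIdleCost(0.5, 10));              // missing-data default
    }
}
```

Under these assumptions the cost at idle = 0.5 is `taskCount * (0.25 + 1/9)`, which matches `expectedCostPerTask` in the missing-idle-data test.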

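Note on the expected values 13 and 34 in `testComputeOptimalTaskCountLimitsTaskCountJumps`: one way to reproduce them is to assume that candidate task counts are the distinct values of ceil(partitionCount / k) for k = 1..partitionCount, and that a bounded scale step moves at most two candidates away from the current count. This is an inference from the test data and from the "partitions-per-task ratio" wording in the docs, not the actual `computeValidTaskCounts` implementation; the class and method names below are hypothetical.

```java
import java.util.TreeSet;

public final class TaskCountCandidatesSketch {
    /** Distinct task counts ceil(partitionCount / k) for k = 1..partitionCount (assumed candidate set). */
    static TreeSet<Integer> candidates(int partitionCount) {
        final TreeSet<Integer> result = new TreeSet<>();
        for (int k = 1; k <= partitionCount; k++) {
            result.add((partitionCount + k - 1) / k); // integer ceiling division
        }
        return result;
    }

    /** Farthest candidate reachable by moving at most maxSteps candidates above the current count. */
    static int farthestScaleUp(int partitionCount, int current, int maxSteps) {
        final TreeSet<Integer> all = candidates(partitionCount);
        int result = current;
        for (int i = 0; i < maxSteps; i++) {
            final Integer next = all.higher(result);
            if (next == null) {
                break; // already at the largest candidate
            }
            result = next;
        }
        return result;
    }

    /** Farthest candidate reachable by moving at most maxSteps candidates below the current count. */
    static int farthestScaleDown(int partitionCount, int current, int maxSteps) {
        final TreeSet<Integer> all = candidates(partitionCount);
        int result = current;
        for (int i = 0; i < maxSteps; i++) {
            final Integer next = all.lower(result);
            if (next == null) {
                break; // already at the smallest candidate
            }
            result = next;
        }
        return result;
    }

    public static void main(String[] args) {
        // 100 partitions, 10 tasks: the next candidates above 10 are 12 and 13.
        System.out.println(farthestScaleUp(100, 10, 2));
        // 100 partitions, 100 tasks: the next candidates below 100 are 50 and 34.
        System.out.println(farthestScaleDown(100, 100, 2));
    }
}
```

With a lag-only cost the bounded scaler would pick the highest allowed candidate, and with an idle-only cost at 90% idle the lowest allowed one, which is why the test expects the farthest candidate in each direction.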
