This is an automated email from the ASF dual-hosted git repository.
Fly-Style pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 49de72ed768 feat: CBA idle re-modeling and separate scale up/down
task-count boundaries (#19378)
49de72ed768 is described below
commit 49de72ed7680974a5e1b677d76b217258f7120dd
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Apr 29 10:14:36 2026 +0300
feat: CBA idle re-modeling and separate scale up/down task-count boundaries
(#19378)
---
docs/ingestion/supervisor.md | 39 ++---
.../CostBasedAutoScalerIntegrationTest.java | 12 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 160 ++++++++-------------
.../autoscaler/CostBasedAutoScalerConfig.java | 108 +++++++++-----
.../supervisor/autoscaler/CostResult.java | 10 +-
.../autoscaler/WeightedCostFunction.java | 138 ++++++++++++------
.../autoscaler/CostBasedAutoScalerConfigTest.java | 19 ++-
.../CostBasedAutoScalerHighLagScalingTest.java | 154 --------------------
.../autoscaler/CostBasedAutoScalerTest.java | 155 +++++++++++++-------
.../autoscaler/WeightedCostFunctionTest.java | 90 +++++++++---
10 files changed, 441 insertions(+), 444 deletions(-)
diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index ba92ed66b53..4e59f0fa542 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -193,29 +193,30 @@ The following example shows a supervisor spec with
`lagBased` autoscaler:
```
</details>
-**2. Cost-based autoscaler strategy (experimental)**
+**2. Cost-based autoscaler strategy**
-An autoscaler which computes the required supervisor task count via cost
function based on ingestion lag and poll-to-idle ratio.
-Task counts are selected from a bounded range derived from the current
partitions-per-task ratio,
-not strictly from factors/divisors of the partition count. This bounded
partitions-per-task window enables gradual scaling while
-avoiding large jumps and still allowing non-divisor task counts when needed.
+The cost-based autoscaler picks the number of ingestion tasks that minimizes a
combined cost score. The score has two components:
-**It is experimental and the implementation details as well as cost function
parameters are subject to change.**
+- **Lag cost** — how long it would take to drain the current backlog at the
observed processing rate. More tasks reduce this cost.
+- **Idle cost** — how far the predicted idle ratio is from the target of ~25%.
Tasks that are too busy (under-provisioned) or too idle (over-provisioned) both
drive the score up.
+The sweet spot is roughly 25% idle, giving headroom to absorb traffic spikes
without wasting resources.
+
+At every evaluation interval, Druid computes the score for each candidate task
count and picks the one with the lowest total cost.
Note: Kinesis is not supported yet, support is in progress.
The following table outlines the configuration properties related to the
`costBased` autoscaler strategy:
-| Property|Description|Required|Default|
-|---------|-----------|--------|-------|
-|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered. | No | 600000 |
-|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
-|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
-|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
-|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
-|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
-|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
-|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
+| Property | Description | Required | Default |
+|----------|-------------|----------|---------------------------|
+|`scaleActionPeriodMillis`|How often, in milliseconds, Druid evaluates whether to scale.|No| `600000` (10 min) |
+|`lagWeight`|How much weight to give the lag cost relative to the idle cost. Higher values make the autoscaler more aggressive about adding tasks to drain backlog.|No| `0.4` |
+|`idleWeight`|How much weight to give the idle cost relative to the lag cost. Higher values make the autoscaler more aggressive about removing over-provisioned tasks.|No| `0.6` |
+|`useTaskCountBoundariesOnScaleUp`|Limits scale-up to a small step relative to the current task count, preventing large jumps. Disable to allow the autoscaler to jump directly to any task count.|No| `false` |
+|`useTaskCountBoundariesOnScaleDown`|Limits scale-down to a small step relative to the current task count, preventing large drops. Disable to allow the autoscaler to drop directly to any task count.|No| `true` |
+|`minScaleUpDelay`|Minimum cooldown after a scale-up before the next scale-up is allowed. Specified as an ISO-8601 duration.|No| `scaleActionPeriodMillis` |
+|`minScaleDownDelay`|Minimum cooldown after a scale-down before the next scale-down is allowed. Specified as an ISO-8601 duration.|No| `PT30M` |
+|`scaleDownDuringTaskRolloverOnly`|If `true`, scale-down actions are deferred until the next task rollover. This avoids disrupting in-progress ingestion.|No| `false` |
The following example shows a supervisor spec with `costBased` autoscaler:
@@ -231,10 +232,10 @@ The following example shows a supervisor spec with
`costBased` autoscaler:
"autoScalerStrategy": "costBased",
"taskCountMin": 1,
"taskCountMax": 10,
+ "lagWeight": 0.4,
+ "idleWeight": 0.6,
"minScaleUpDelay": "PT10M",
- "minScaleDownDelay": "PT30M",
- "lagWeight": 0.1,
- "idleWeight": 0.9
+ "minScaleDownDelay": "PT30M"
}
}
}
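
Note on the documentation change above: the "lowest total cost" selection rule it describes can be illustrated with a minimal sketch. This is not the code from this commit. The class `CostSketch`, its method names, the lag-cost formula (backlog divided by total processing rate), the idle prediction (constant total busy time spread over the candidate task count), and the absolute-distance idle penalty are all assumptions made for illustration. Only the 0.4/0.6 default weights and the ~25% idle target come from the documentation. The sketch also ignores any normalization between the two cost terms.

```java
/** Illustrative sketch only: hypothetical names and formulas, not Druid's WeightedCostFunction. */
public class CostSketch
{
  static final double TARGET_IDLE_RATIO = 0.25; // target of ~25% idle, from the docs
  static final double LAG_WEIGHT = 0.4;         // documented default lagWeight
  static final double IDLE_WEIGHT = 0.6;        // documented default idleWeight

  /** Assumption: lag cost is the time (seconds) to drain the backlog; ratePerTask must be > 0. */
  static double lagCost(double lag, double ratePerTask, int taskCount)
  {
    return lag / (ratePerTask * taskCount);
  }

  /** Assumption: total busy time is constant and spread evenly over taskCount tasks. */
  static double predictedIdle(double currentIdle, int currentTasks, int taskCount)
  {
    final double totalBusy = (1.0 - currentIdle) * currentTasks;
    return Math.max(0.0, 1.0 - totalBusy / taskCount);
  }

  /** Weighted sum; the absolute distance from the target stands in for the real idle penalty. */
  static double totalCost(double lag, double ratePerTask, double currentIdle, int currentTasks, int taskCount)
  {
    final double idleCost = Math.abs(predictedIdle(currentIdle, currentTasks, taskCount) - TARGET_IDLE_RATIO);
    return LAG_WEIGHT * lagCost(lag, ratePerTask, taskCount) + IDLE_WEIGHT * idleCost;
  }

  /** Returns the candidate with the lowest total cost, or -1 if there are no candidates. */
  static int pickTaskCount(int[] candidates, double lag, double ratePerTask, double currentIdle, int currentTasks)
  {
    int best = -1;
    double bestCost = Double.POSITIVE_INFINITY;
    for (int candidate : candidates) {
      final double cost = totalCost(lag, ratePerTask, currentIdle, currentTasks, candidate);
      if (cost < bestCost) {
        bestCost = cost;
        best = candidate;
      }
    }
    return best;
  }
}
```

Under these assumptions, a supervisor with no lag and four tasks that are 70% idle would move toward fewer tasks, while a large backlog with fully busy tasks would push the choice toward the largest candidate.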
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
index f57bd113873..7e63c3f4f25 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
@@ -143,7 +143,7 @@ public class CostBasedAutoScalerIntegrationTest extends
StreamIndexTestBase
}
});
- // These values were carefully handpicked to allow that test to pass in a
stable manner.
+ // These values were carefully handpicked to allow that test to pass
stably.
final CostBasedAutoScalerConfig autoScalerConfig =
CostBasedAutoScalerConfig
.builder()
.enableTaskAutoScaler(true)
@@ -152,8 +152,8 @@ public class CostBasedAutoScalerIntegrationTest extends
StreamIndexTestBase
.taskCountStart(lowInitialTaskCount)
.scaleActionPeriodMillis(500)
.minTriggerScaleActionFrequencyMillis(1000)
- .lagWeight(0.2)
- .idleWeight(0.8)
+ .lagWeight(0.8)
+ .idleWeight(0.2)
.build();
final KafkaSupervisorSpec kafkaSupervisorSpec =
createKafkaSupervisorWithAutoScaler(
@@ -192,11 +192,11 @@ public class CostBasedAutoScalerIntegrationTest extends
StreamIndexTestBase
final CostBasedAutoScalerConfig autoScalerConfig =
CostBasedAutoScalerConfig
.builder()
+ .enableTaskAutoScaler(true)
.taskCountMin(1)
.taskCountMax(4)
- .lagWeight(1.0)
- .idleWeight(1.0)
- .enableTaskAutoScaler(true)
+ .lagWeight(0.5)
+ .idleWeight(0.5)
.minTriggerScaleActionFrequencyMillis(10L)
.scaleActionPeriodMillis(10L)
.minScaleDownDelay(Duration.standardSeconds(1))
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index fa7db4f6928..9d50266cd73 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -39,21 +39,23 @@ import
org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Cost-based auto-scaler for seekable stream supervisors.
- * Uses a weighted cost function combining lag recovery time (seconds) and
idleness cost (seconds)
+ * Uses a weighted cost function combining lag recovery time and idle-ratio
cost
* to determine optimal task counts.
* <p>
- * Candidate task counts are derived by scanning a bounded window of
partitions-per-task (PPT) values
- * around the current PPT, then converting those to task counts. This allows
non-divisor task counts
- * while keeping changes gradual (no large jumps).
+ * Candidate task counts are derived from possible partitions-per-task ratios,
then converted
+ * to task counts. When configured, scale-up and scale-down can independently
limit the evaluated
+ * candidates to a small window around the current task count to avoid large
jumps.
* <p>
- * Scale-up and scale-down are both evaluated proactively.
- * Future versions may perform scale-down on task rollover only.
+ * Scale-up is applied during regular scale-action checks. Scale-down is
applied during regular
+ * checks unless {@link
CostBasedAutoScalerConfig#isScaleDownOnTaskRolloverOnly()} defers it
+ * to task rollover.
*/
public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
{
@@ -64,21 +66,17 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
public static final String OPTIMAL_TASK_COUNT_METRIC =
"task/autoScaler/costBased/optimalTaskCount";
public static final String INVALID_METRICS_COUNT =
"task/autoScaler/costBased/invalidMetrics";
- static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
- static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK =
MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
-
/**
- * If average partition lag crosses this value and the processing rate is
- * still zero, scaling actions are skipped and an alert is raised.
+ * Maximum number of candidate task counts to evaluate above or below the
current task count
+ * when scale-up or scale-down boundaries are enabled.
*/
- static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
+ static final int BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK = 2;
/**
- * Divisor for partition count in the K formula: K = (partitionCount /
K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
- * This controls how aggressive the scaling is relative to partition count.
- * That value was chosen by carefully analyzing the math model behind the
implementation.
+ * If the average partition lag crosses this value and the processing rate is
+ * still zero, scaling actions are skipped and an alert is raised.
*/
- static final double K_PARTITION_DIVISOR = 6.4;
+ static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
private final String supervisorId;
private final SeekableStreamSupervisor supervisor;
@@ -176,7 +174,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
// Take the current task count but clamp it to the configured boundaries
if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount
that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount <
config.getTaskCountMin()
- || currentTaskCount >
config.getTaskCountMax();
+ || currentTaskCount >
config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(),
Math.max(config.getTaskCountMin(), currentTaskCount));
}
@@ -188,16 +186,28 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
// regardless of optimal task count, to get back to a safe state.
if (isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
- log.info("Task count for supervisor[%s] was out of bounds [%d,%d],
urgently scaling from [%d] to [%d].",
- supervisorId, config.getTaskCountMin(),
config.getTaskCountMax(), currentTaskCount, currentTaskCount);
+ log.info(
+ "Task count for supervisor[%s] was out of bounds [%d,%d], urgently
scaling from [%d] to [%d].",
+ supervisorId, config.getTaskCountMin(), config.getTaskCountMax(),
currentTaskCount, currentTaskCount
+ );
} else if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
- log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
up).", supervisorId, currentTaskCount, taskCount);
+ log.info(
+ "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
up).",
+ supervisorId,
+ currentTaskCount,
+ taskCount
+ );
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
- log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
down).", supervisorId, currentTaskCount, taskCount);
+ log.info(
+ "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale
down).",
+ supervisorId,
+ currentTaskCount,
+ taskCount
+ );
} else {
taskCount = -1;
log.debug("No scaling required for supervisor[%s]", supervisorId);
@@ -237,7 +247,7 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
* <li>Current task count already optimal</li>
* </ul>
*
- * @return optimal task count for scale-up, or -1 if no scaling action needed
+ * @return optimal task count, or -1 if no scaling action is needed
*/
int computeOptimalTaskCount(CostMetrics metrics)
{
@@ -261,11 +271,8 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
partitionCount,
currentTaskCount,
- (long) metrics.getAggregateLag(),
config.getTaskCountMin(),
- config.getTaskCountMax(),
- config.shouldUseTaskCountBoundaries(),
- config.getHighLagThreshold()
+ config.getTaskCountMax()
);
if (validTaskCounts.length == 0) {
@@ -290,9 +297,28 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
config.getIdleWeight()
);
- // Find the task count which reduces cost
+ // Find the evaluated task count with the lowest cost.
final CostResult[] costResults = new CostResult[validTaskCounts.length];
- for (int i = 0; i < validTaskCounts.length; ++i) {
+ Arrays.fill(costResults, CostResult.INFINITE_COST);
+
+ int startIndex = 0;
+ int endIndex = validTaskCounts.length - 1;
+
+ if (config.shouldUseTaskCountBoundariesOnScaleUp()) {
+ int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts,
currentTaskCount);
+ endIndex = currentTaskCountIndex >= 0
+ ? Math.min(currentTaskCountIndex +
BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
+ : endIndex;
+ }
+
+ if (config.shouldUseTaskCountBoundariesOnScaleDown()) {
+ int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts,
currentTaskCount);
+ startIndex = currentTaskCountIndex >= 0
+ ? Math.max(currentTaskCountIndex -
BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, startIndex)
+ : startIndex;
+ }
+
+ for (int i = startIndex; i <= endIndex; ++i) {
final int taskCount = validTaskCounts[i];
CostResult costResult = costFunction.computeCost(metrics, taskCount,
config);
double cost = costResult.totalCost();
@@ -317,24 +343,22 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
);
}
- // Scale up is performed eagerly.
+ // Scale-up is applied eagerly; scale-down may be deferred by
computeTaskCountForScaleAction().
return optimalTaskCount;
}
/**
- * Generates valid task counts based on partitions-per-task ratios.
+ * Generates valid task counts by converting every possible
partitions-per-task ratio
+ * into a task count and filtering by configured min/max task count bounds.
*
- * @return sorted list of valid task counts within bounds
+ * @return list of valid task counts within bounds
*/
@SuppressWarnings({"ReassignedVariable"})
static int[] computeValidTaskCounts(
int partitionCount,
int currentTaskCount,
- double aggregateLag,
int taskCountMin,
- int taskCountMax,
- boolean isTaskCountBoundariesEnabled,
- int highLagThreshold
+ int taskCountMax
)
{
if (partitionCount <= 0 || currentTaskCount <= 0) {
@@ -342,32 +366,10 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
}
IntSet result = new IntArraySet();
- final int currentPartitionsPerTask = partitionCount / currentTaskCount;
// Minimum partitions per task correspond to the maximum number of tasks
(scale up) and vice versa.
int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
int maxPartitionsPerTask = Math.max(partitionCount, partitionCount /
taskCountMin);
-
- if (isTaskCountBoundariesEnabled) {
- maxPartitionsPerTask = Math.min(
- partitionCount,
- currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
- );
-
- int extraIncrease = 0;
- if (highLagThreshold > 0) {
- extraIncrease = computeExtraPPTIncrease(
- highLagThreshold,
- aggregateLag,
- partitionCount,
- currentTaskCount,
- taskCountMax
- );
- }
- int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK +
extraIncrease;
- minPartitionsPerTask = Math.max(minPartitionsPerTask,
currentPartitionsPerTask - effectiveMaxIncrease);
- }
-
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >=
minPartitionsPerTask
&& partitionsPerTask !=
0; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) /
partitionsPerTask;
@@ -378,50 +380,6 @@ public class CostBasedAutoScaler implements
SupervisorTaskAutoScaler
return result.toIntArray();
}
- /**
- * Computes extra allowed increase in partitions-per-task in scenarios when
the average per-partition lag
- * is above the configured threshold.
- * <p>
- * This uses a logarithmic formula for consistent absolute growth:
- * {@code deltaTasks = K * ln(lagSeverity)}
- * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
- * <p>
- * This ensures that small taskCount's get a massive relative boost,
- * while large taskCount's receive more measured, stable increases.
- */
- static int computeExtraPPTIncrease(
- double lagThreshold,
- double aggregateLag,
- int partitionCount,
- int currentTaskCount,
- int taskCountMax
- )
- {
- if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
- return 0;
- }
-
- final double lagPerPartition = aggregateLag / partitionCount;
- if (lagPerPartition < lagThreshold) {
- return 0;
- }
-
- final double lagSeverity = lagPerPartition / lagThreshold;
-
- // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
- // First multiplier decreases with sqrt(currentTaskCount): aggressive when
small, conservative when large
- final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) /
Math.sqrt(currentTaskCount) * Math.log(
- lagSeverity);
-
- final double targetTaskCount = Math.min(taskCountMax, (double)
currentTaskCount + deltaTasks);
-
- // Compute precise PPT reduction to avoid early integer truncation
artifacts
- final double currentPPT = (double) partitionCount / currentTaskCount;
- final double targetPPT = (double) partitionCount / targetTaskCount;
-
- return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
- }
-
/**
* Extracts the average poll-idle-ratio metric from task stats.
* This metric indicates how much time the consumer spends idle waiting for
data.
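
Note on the CostBasedAutoScaler.java hunks above: the candidate generation and the new evaluation window can be sketched in isolation as follows. This is a simplified stand-in, not the committed code. The class `CandidateWindowSketch` and its method names are hypothetical, and a `TreeSet` is swapped in for the `IntArraySet` used in the commit so that the ascending order required by `Arrays.binarySearch` is explicit. The ceiling division, the limit of 2, and the "fall back to the full range when the current count is not a candidate" behavior follow the diff.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Illustrative sketch only: hypothetical class, simplified from the hunks above. */
public class CandidateWindowSketch
{
  // Mirrors BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK from the diff.
  static final int BOUNDARY_LIMIT = 2;

  /** Maps every partitions-per-task value to ceil(partitionCount / ppt) and keeps counts in [min, max]. */
  static int[] validTaskCounts(int partitionCount, int taskCountMin, int taskCountMax)
  {
    final TreeSet<Integer> result = new TreeSet<>(); // assumption: sorted set instead of IntArraySet
    for (int ppt = partitionCount; ppt >= 1; ppt--) {
      final int taskCount = (partitionCount + ppt - 1) / ppt; // ceiling division
      if (taskCount >= taskCountMin && taskCount <= taskCountMax) {
        result.add(taskCount);
      }
    }
    return result.stream().mapToInt(Integer::intValue).toArray();
  }

  /** Returns {startIndex, endIndex}, both inclusive, of the candidates to evaluate. */
  static int[] evaluationWindow(int[] sortedCandidates, int currentTaskCount, boolean boundUp, boolean boundDown)
  {
    int start = 0;
    int end = sortedCandidates.length - 1;
    final int idx = Arrays.binarySearch(sortedCandidates, currentTaskCount);
    if (idx >= 0) { // a current count that is not a candidate leaves the full range in place
      if (boundUp) {
        end = Math.min(idx + BOUNDARY_LIMIT, end);
      }
      if (boundDown) {
        start = Math.max(idx - BOUNDARY_LIMIT, start);
      }
    }
    return new int[]{start, end};
  }
}
```

For 10 partitions with task-count bounds of 1 and 10, this yields the candidates 1, 2, 3, 4, 5, 10. With the default flags (scale-up unbounded, scale-down bounded) and a current count of 5, only 3, 4, 5 and 10 would be evaluated.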
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
index b19a3e2cbbe..68f9f5a5ad7 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java
@@ -28,6 +28,7 @@ import
org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
@@ -37,16 +38,18 @@ import java.util.Objects;
/**
* Configuration for cost-based auto-scaling of seekable stream supervisor
tasks.
* Uses a cost function combining lag and idle time metrics to determine
optimal task counts.
- * Task counts are selected from a bounded range derived from the current
partitions-per-task (PPT)
- * ratio, not strictly from factors/divisors of the partition count. This
bounded PPT window enables
- * gradual scaling while avoiding large jumps and still allowing non-divisor
task counts when needed.
+ * Candidate task counts are derived from possible partitions-per-task ratios
and are not limited
+ * to factors/divisors of the partition count. Optional scale-up and
scale-down boundaries control
+ * how much of that candidate set is evaluated around the current task count.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class CostBasedAutoScalerConfig implements AutoScalerConfig
{
+ private static final EmittingLogger LOG = new
EmittingLogger(CostBasedAutoScalerConfig.class);
+
static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10
minutes
- static final double DEFAULT_LAG_WEIGHT = 0.25;
- static final double DEFAULT_IDLE_WEIGHT = 0.75;
+ static final double DEFAULT_LAG_WEIGHT = 0.4;
+ static final double DEFAULT_IDLE_WEIGHT = 0.6;
static final Duration DEFAULT_MIN_SCALE_DELAY =
Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);
private final boolean enableTaskAutoScaler;
@@ -59,12 +62,18 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private final double lagWeight;
private final double idleWeight;
- private final boolean useTaskCountBoundaries;
- private final int highLagThreshold;
+ private final boolean useTaskCountBoundariesOnScaleUp;
+ private final boolean useTaskCountBoundariesOnScaleDown;
private final Duration minScaleUpDelay;
private final Duration minScaleDownDelay;
private final boolean scaleDownDuringTaskRolloverOnly;
+ /**
+ * Creates a new CostBasedAutoScalerConfig instance.
+ * <p>
+ * Note: useTaskCountBoundaries and highLagThreshold are kept for backward
compatibility,
+ * but effectively they are removed.
+ */
@JsonCreator
public CostBasedAutoScalerConfig(
@JsonProperty("taskCountMax") Integer taskCountMax,
@@ -78,6 +87,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
@Nullable @JsonProperty("idleWeight") Double idleWeight,
@Nullable @JsonProperty("useTaskCountBoundaries") Boolean
useTaskCountBoundaries,
@Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold,
+ @Nullable @JsonProperty("useTaskCountBoundariesOnScaleUp") Boolean
useTaskCountBoundariesOnScaleUp,
+ @Nullable @JsonProperty("useTaskCountBoundariesOnScaleDown") Boolean
useTaskCountBoundariesOnScaleDown,
@Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay,
@Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay,
@Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean
scaleDownDuringTaskRolloverOnly
@@ -97,13 +108,24 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
// Cost function weights with defaults
this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT);
this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT);
- this.useTaskCountBoundaries =
Configs.valueOrDefault(useTaskCountBoundaries, false);
- this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1);
- this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay,
Duration.millis(this.minTriggerScaleActionFrequencyMillis));
+ this.useTaskCountBoundariesOnScaleUp =
Configs.valueOrDefault(useTaskCountBoundariesOnScaleUp, false);
+ this.useTaskCountBoundariesOnScaleDown =
Configs.valueOrDefault(useTaskCountBoundariesOnScaleDown, true);
+ this.minScaleUpDelay = Configs.valueOrDefault(
+ minScaleUpDelay,
+ Duration.millis(this.minTriggerScaleActionFrequencyMillis)
+ );
this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay,
DEFAULT_MIN_SCALE_DELAY);
this.scaleDownDuringTaskRolloverOnly =
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false);
if (this.enableTaskAutoScaler) {
+ if (useTaskCountBoundaries != null) {
+ LOG.warn("useTaskCountBoundaries is removed, "
+ + "use useTaskCountBoundariesOnScaleUp and
useTaskCountBoundariesOnScaleDown instead");
+ }
+ if (highLagThreshold != null) {
+ LOG.warn("highLagThreshold is removed, the autoscaler behavior is good
enough just with cost function");
+ }
+
Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when
enableTaskAutoScaler is true");
Preconditions.checkNotNull(taskCountMin, "taskCountMin is required when
enableTaskAutoScaler is true");
Preconditions.checkArgument(taskCountMax >= taskCountMin, "taskCountMax
must be >= taskCountMin");
@@ -127,8 +149,14 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0");
Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >=
0");
- Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0,
"minScaleUpDelay must be a duration >= 0 millis");
- Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0,
"minScaleDownDelay must be a duration >= 0 millis");
+ Preconditions.checkArgument(
+ this.minScaleUpDelay.getMillis() >= 0,
+ "minScaleUpDelay must be a duration >= 0 millis"
+ );
+ Preconditions.checkArgument(
+ this.minScaleDownDelay.getMillis() >= 0,
+ "minScaleDownDelay must be a duration >= 0 millis"
+ );
}
/**
@@ -203,21 +231,25 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
}
/**
- * Enables or disables the use of task count boundaries derived from the
current partitions-per-task (PPT) ratio.
+ * Enables or disables the task-count evaluation window when considering
scale-up candidates.
+ * If enabled, the optimizer only evaluates a small number of candidate task
counts above the current count,
+ * which prevents large scale-up jumps.
*/
- @JsonProperty("useTaskCountBoundaries")
- public boolean shouldUseTaskCountBoundaries()
+ @JsonProperty("useTaskCountBoundariesOnScaleUp")
+ public boolean shouldUseTaskCountBoundariesOnScaleUp()
{
- return useTaskCountBoundaries;
+ return useTaskCountBoundariesOnScaleUp;
}
/**
- * Per-partition lag threshold allowing to activate a burst scaleup to
eliminate high lag.
+ * Enables or disables the task-count evaluation window when considering
scale-down candidates.
+ * If enabled, the optimizer only evaluates a small number of candidate task
counts below the current count,
+ * which prevents large scale-down drops.
*/
- @JsonProperty("highLagThreshold")
- public int getHighLagThreshold()
+ @JsonProperty("useTaskCountBoundariesOnScaleDown")
+ public boolean shouldUseTaskCountBoundariesOnScaleDown()
{
- return highLagThreshold;
+ return useTaskCountBoundariesOnScaleDown;
}
/**
@@ -241,8 +273,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
}
/**
- * Indicates whether task scaling down is limited to periods during task
rollovers only.
- * If set to {@code false}, allows scaling down during normal task run time.
+ * Indicates whether scale-down actions are deferred to task rollover.
+ * If set to {@code false}, scale-down can happen during regular
scale-action checks.
*/
@JsonProperty("scaleDownDuringTaskRolloverOnly")
public boolean isScaleDownOnTaskRolloverOnly()
@@ -275,13 +307,13 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
&& scaleActionPeriodMillis == that.scaleActionPeriodMillis
&& Double.compare(that.lagWeight, lagWeight) == 0
&& Double.compare(that.idleWeight, idleWeight) == 0
- && useTaskCountBoundaries == that.useTaskCountBoundaries
+ && useTaskCountBoundariesOnScaleUp ==
that.useTaskCountBoundariesOnScaleUp
+ && useTaskCountBoundariesOnScaleDown ==
that.useTaskCountBoundariesOnScaleDown
&& Objects.equals(minScaleUpDelay, that.minScaleUpDelay)
&& Objects.equals(minScaleDownDelay, that.minScaleDownDelay)
&& scaleDownDuringTaskRolloverOnly ==
that.scaleDownDuringTaskRolloverOnly
&& Objects.equals(taskCountStart, that.taskCountStart)
- && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio)
- && highLagThreshold == that.highLagThreshold;
+ && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio);
}
@Override
@@ -297,8 +329,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- useTaskCountBoundaries,
- highLagThreshold,
+ useTaskCountBoundariesOnScaleUp,
+ useTaskCountBoundariesOnScaleDown,
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
@@ -318,8 +350,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", lagWeight=" + lagWeight +
", idleWeight=" + idleWeight +
- ", useTaskCountBoundaries=" + useTaskCountBoundaries +
- ", highLagThreshold=" + highLagThreshold +
+ ", useTaskCountBoundariesOnScaleUp=" +
useTaskCountBoundariesOnScaleUp +
+ ", useTaskCountBoundariesOnScaleDown=" +
useTaskCountBoundariesOnScaleDown +
", minScaleUpDelay=" + minScaleUpDelay +
", minScaleDownDelay=" + minScaleDownDelay +
", scaleDownDuringTaskRolloverOnly=" +
scaleDownDuringTaskRolloverOnly +
@@ -341,8 +373,8 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
private Long scaleActionPeriodMillis;
private Double lagWeight;
private Double idleWeight;
- private Boolean useTaskCountBoundaries;
- private Integer highLagThreshold;
+ private Boolean useTaskCountBoundariesOnScaleUp;
+ private Boolean useTaskCountBoundariesOnScaleDown;
private Duration minScaleUpDelay;
private Duration minScaleDownDelay;
private Boolean scaleDownDuringTaskRolloverOnly;
@@ -423,15 +455,15 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
return this;
}
- public Builder useTaskCountBoundaries(boolean useTaskCountBoundaries)
+ public Builder useTaskCountBoundariesOnScaleUp(boolean
useTaskCountBoundariesOnScaleUp)
{
- this.useTaskCountBoundaries = useTaskCountBoundaries;
+ this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp;
return this;
}
- public Builder highLagThreshold(int highLagThreshold)
+ public Builder useTaskCountBoundariesOnScaleDown(boolean
useTaskCountBoundariesOnScaleDown)
{
- this.highLagThreshold = highLagThreshold;
+ this.useTaskCountBoundariesOnScaleDown =
useTaskCountBoundariesOnScaleDown;
return this;
}
@@ -447,8 +479,10 @@ public class CostBasedAutoScalerConfig implements
AutoScalerConfig
scaleActionPeriodMillis,
lagWeight,
idleWeight,
- useTaskCountBoundaries,
- highLagThreshold,
+ null,
+ null,
+ useTaskCountBoundariesOnScaleUp,
+ useTaskCountBoundariesOnScaleDown,
minScaleUpDelay,
minScaleDownDelay,
scaleDownDuringTaskRolloverOnly
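
Note on the CostBasedAutoScalerConfig.java hunks above: the default resolution and the handling of the two removed properties can be sketched roughly as below. This is a hedged illustration rather than the committed class. `ConfigDefaultsSketch`, `valueOrDefault` and `legacyWarnings` are hypothetical names, `java.time.Duration` is used in place of the Joda `Duration` from the diff, and warnings are collected in a list instead of being logged. The default values and the warning texts' intent follow the diff.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only: hypothetical class modeled on the constructor changes above. */
public class ConfigDefaultsSketch
{
  static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes
  static final double DEFAULT_LAG_WEIGHT = 0.4;
  static final double DEFAULT_IDLE_WEIGHT = 0.6;
  // Three evaluation periods, i.e. 30 minutes.
  static final Duration DEFAULT_MIN_SCALE_DELAY = Duration.ofMillis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3);

  /** Stand-in for a null-coalescing helper such as the one used in the diff. */
  static <T> T valueOrDefault(T value, T defaultValue)
  {
    return value != null ? value : defaultValue;
  }

  /** The removed properties are still accepted but only produce warnings. */
  static List<String> legacyWarnings(Boolean useTaskCountBoundaries, Integer highLagThreshold)
  {
    final List<String> warnings = new ArrayList<>();
    if (useTaskCountBoundaries != null) {
      warnings.add("useTaskCountBoundaries is removed; use the OnScaleUp/OnScaleDown variants instead");
    }
    if (highLagThreshold != null) {
      warnings.add("highLagThreshold is removed; the cost function alone drives scaling");
    }
    return warnings;
  }
}
```

An existing spec that still sets `useTaskCountBoundaries` or `highLagThreshold` would, on this reading of the diff, keep deserializing, with the values ignored and a warning emitted.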
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
index 1ad946ff517..a3925e282ef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java
@@ -21,10 +21,16 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
/**
* Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}.
- * All costs are measured in seconds.
+ * Lag cost is based on recovery time; idle cost is based on the weighted idle-ratio penalty.
*/
public class CostResult
{
+ static final CostResult INFINITE_COST = new CostResult(
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY,
+ Double.POSITIVE_INFINITY
+ );
+
private final double totalCost;
private final double lagCost;
private final double idleCost;
@@ -32,7 +38,7 @@ public class CostResult
/**
* @param totalCost the weighted sum of lagCost and idleCost
* @param lagCost the weighted cost representing expected time (seconds) to recover current lag
- * @param idleCost the weighted cost representing total compute time (seconds) wasted being idle per task duration
+ * @param idleCost the weighted cost representing the predicted idle-ratio penalty
*/
public CostResult(double totalCost, double lagCost, double idleCost)
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
index 8fe4f9d8d06..fa60c7a8e8a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java
@@ -24,17 +24,23 @@ import org.apache.druid.java.util.common.logger.Logger;
/**
* Weighted cost function using compute time as the core metric.
- * Costs represent actual time in seconds, making them intuitive and debuggable.
- * Uses linear scaling without mode inversions for predictable behavior.
+ * Lag cost is based on recovery time in seconds; idle cost is a penalty derived from
+ * the predicted idle ratio.
+ *
+ * <p>Idle cost uses a U-shaped penalty with minimum at {@link #IDEAL_IDLE_RATIO}.
+ * This penalizes both under-provisioning (low idle, no safety margin, lag risk) and
+ * over-provisioning (high idle, wasted capacity), with asymmetric severity controlled by
+ * {@link #UNDER_PROVISIONING_PENALTY} and {@link #OVER_PROVISIONING_PENALTY}.
*/
public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);
+
/**
* Multiplier for a lag amplification factor; it was carefully chosen
* during extensive testing as the most balanced multiplier for high-lag recovery.
*/
- static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05;
+ static final double LAG_AMPLIFICATION_MULTIPLIER = 0.4;
/**
* Minimum rate of processing for any task in records per second. This is used
@@ -43,17 +49,34 @@ public class WeightedCostFunction
*/
static final double MIN_PROCESSING_RATE = 1_000;
+ /**
+ * Target idle ratio representing the optimal operating point for the U-shaped idle cost.
+ * At this ratio the idle cost is at its minimum; both lower (risk) and higher (waste) are penalized.
+ */
+ static final double IDEAL_IDLE_RATIO = 0.25;
+
+ /**
+ * Penalty magnitude applied when idle ratio is 0 (no safety margin).
+ * Controls the steepness of the U-shape on the under-provisioning side.
+ */
+ static final double UNDER_PROVISIONING_PENALTY = 2.0;
+
+ /**
+ * Penalty magnitude applied when idle ratio is 1 (fully wasted capacity).
+ * Controls the steepness of the U-shape on the over-provisioning side.
+ */
+ static final double OVER_PROVISIONING_PENALTY = 1.0;
+
/**
* Computes cost for a given task count using compute time metrics.
* <p>
- * Costs are measured in 'seconds':
+ * Cost components are derived from:
* <ul>
- * <li><b>lagCost</b>: Expected time (seconds) to recover current lag</li>
- * <li><b>idleCost</b>: Total compute time (seconds) wasted being idle per task duration</li>
+ * <li><b>lagCost</b>: weighted expected time to recover current lag</li>
+ * <li><b>idleCost</b>: weighted U-shaped penalty for the predicted idle ratio</li>
* </ul>
* <p>
- * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}.
- * This approach directly connects costs to operational metrics.
+ * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idleRatioPenalty}.
*
* @return CostResult containing totalCost, lagCost, and idleCost,
* or result with {@link Double#POSITIVE_INFINITY} for invalid inputs
@@ -65,40 +88,68 @@ public class WeightedCostFunction
)
{
if (metrics == null || config == null || proposedTaskCount <= 0 || metrics.getPartitionCount() <= 0) {
- return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);
+ return CostResult.INFINITE_COST;
}
final double avgProcessingRate = metrics.getAvgProcessingRate();
- final double lagRecoveryTime;
if (avgProcessingRate < 0) {
throw DruidException.defensive("Avg processing rate[%.2f] must not be negative.", avgProcessingRate);
+ }
+
+ // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
+ // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
+ // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
+ final double lagRecoveryTime;
+ if (metrics.getAggregateLag() <= 0) {
+ lagRecoveryTime = 0;
} else {
- // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks.
- // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag.
- // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure.
- if (metrics.getAggregateLag() <= 0) {
- lagRecoveryTime = 0;
+ final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
+ final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
+ final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
+ lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
+ }
+
+ // Capacity-based idle prediction. When the proposed count would oversaturate the cluster
+ // (busy work exceeds available capacity), the unmet demand becomes a virtual lag-recovery
+ // time on the same axis as real lag, so the optimizer treats predicted saturation as
+ // predicted lag, not as "perfect utilization".
+ final double currentPollIdleRatio = metrics.getPollIdleRatio();
+ final int currentTaskCount = metrics.getCurrentTaskCount();
+ final double predictedIdleRatio;
+ final double overrun;
+ if (currentPollIdleRatio < 0) {
+ predictedIdleRatio = 0.5;
+ overrun = 0.0;
+ } else if (currentTaskCount <= 0 || proposedTaskCount == currentTaskCount) {
+ predictedIdleRatio = currentPollIdleRatio;
+ overrun = 0.0;
+ } else {
+ final double busyFraction = 1.0 - currentPollIdleRatio;
+ final double taskRatio = (double) proposedTaskCount / currentTaskCount;
+ final double rawIdle = 1.0 - busyFraction / taskRatio;
+ if (rawIdle >= 0) {
+ predictedIdleRatio = Math.min(1.0, rawIdle);
+ overrun = 0.0;
} else {
- final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount();
- final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition));
- final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE);
- lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate);
+ predictedIdleRatio = 0.0;
+ overrun = -rawIdle;
}
}
+ final double virtualLagRecoveryTime = overrun * metrics.getTaskDurationSeconds();
- final double predictedIdleRatio = estimateIdleRatio(metrics, proposedTaskCount);
- final double idleCost = proposedTaskCount * predictedIdleRatio;
- final double lagCost = config.getLagWeight() * lagRecoveryTime;
+ final double idleCost = uShapedIdleCost(predictedIdleRatio, proposedTaskCount);
+ final double lagCost = config.getLagWeight() * (lagRecoveryTime + virtualLagRecoveryTime);
final double weightedIdleCost = config.getIdleWeight() * idleCost;
final double cost = lagCost + weightedIdleCost;
log.debug(
"Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], "
- + "predictedIdle[%.3f], finalCost[%.2fs]",
+ + "predictedIdle[%.3f], overrun[%.3f], finalCost[%.2fs]",
proposedTaskCount,
lagCost,
weightedIdleCost,
predictedIdleRatio,
+ overrun,
cost
);
@@ -106,33 +157,28 @@ public class WeightedCostFunction
}
/**
- * Estimates the idle ratio for a proposed task count with linear prediction.
+ * U-shaped idle cost with minimum at {@link #IDEAL_IDLE_RATIO}.
*
- * @param metrics current system metrics containing idle ratio and task count
- * @param taskCount target task count to estimate an idle ratio for
- * @return estimated idle ratio in range [0.0, 1.0]
+ * <ul>
+ * <li>idle < ideal: under-provisioning penalty, no safety margin, lag risk</li>
+ * <li>idle = ideal: baseline cost only ({@code taskCount * IDEAL_IDLE_RATIO})</li>
+ * <li>idle > ideal: over-provisioning penalty, wasted capacity</li>
+ * </ul>
+ * <p>
+ * The ideal-idle baseline keeps cost non-zero at the optimum so the optimizer
+ * always has a finite trade-off against lag cost.
*/
- private double estimateIdleRatio(CostMetrics metrics, int taskCount)
+ double uShapedIdleCost(double predictedIdleRatio, int taskCount)
{
- final double currentPollIdleRatio = metrics.getPollIdleRatio();
-
- if (currentPollIdleRatio < 0) {
- // No idle data available, assume moderate idle
- return 0.5;
- }
-
- final int currentTaskCount = metrics.getCurrentTaskCount();
- if (currentTaskCount <= 0 || taskCount == currentTaskCount) {
- return currentPollIdleRatio;
+ final double penalty;
+ if (predictedIdleRatio < IDEAL_IDLE_RATIO) {
+ final double norm = (IDEAL_IDLE_RATIO - predictedIdleRatio) / IDEAL_IDLE_RATIO;
+ penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
+ } else {
+ final double norm = (predictedIdleRatio - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
+ penalty = OVER_PROVISIONING_PENALTY * norm * norm;
}
-
- // Linear prediction (capacity-based) - existing logic
- final double busyFraction = 1.0 - currentPollIdleRatio;
- final double taskRatio = (double) taskCount / currentTaskCount;
- final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio));
-
- // Clamp to valid range [0, 1]
- return Math.max(0.0, linearPrediction);
+ return taskCount * (IDEAL_IDLE_RATIO + penalty);
}
}
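
Reviewer note (not part of the commit): the idle re-modeling in the hunks above can be exercised in isolation. The following is a minimal standalone sketch, assuming only the constants and arithmetic visible in the diff. The class name `IdleCostSketch` and the helper `rawIdle` are hypothetical names introduced for illustration; the real logic lives inline in `WeightedCostFunction#computeCost` and `uShapedIdleCost`, and the sketch leaves out the missing-metric fallback (idle 0.5) and the `currentTaskCount <= 0` guard.

```java
public class IdleCostSketch
{
  // Constants copied from the diff above.
  static final double IDEAL_IDLE_RATIO = 0.25;
  static final double UNDER_PROVISIONING_PENALTY = 2.0;
  static final double OVER_PROVISIONING_PENALTY = 1.0;

  /**
   * Unclamped capacity-based idle prediction (hypothetical helper).
   * A negative result means the proposed task count would be saturated.
   */
  static double rawIdle(double currentIdleRatio, int currentTasks, int proposedTasks)
  {
    final double busyFraction = 1.0 - currentIdleRatio;
    final double taskRatio = (double) proposedTasks / currentTasks;
    return 1.0 - busyFraction / taskRatio;
  }

  /** Same arithmetic as uShapedIdleCost in the diff: baseline plus a quadratic penalty. */
  static double uShapedIdleCost(double predictedIdleRatio, int taskCount)
  {
    final double penalty;
    if (predictedIdleRatio < IDEAL_IDLE_RATIO) {
      final double norm = (IDEAL_IDLE_RATIO - predictedIdleRatio) / IDEAL_IDLE_RATIO;
      penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
    } else {
      final double norm = (predictedIdleRatio - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
      penalty = OVER_PROVISIONING_PENALTY * norm * norm;
    }
    return taskCount * (IDEAL_IDLE_RATIO + penalty);
  }

  public static void main(String[] args)
  {
    // Current state: 10 tasks at 25% idle, i.e. already at the ideal ratio.
    for (int proposed : new int[]{5, 10, 20}) {
      final double raw = rawIdle(0.25, 10, proposed);
      final double predictedIdle = Math.max(0.0, Math.min(1.0, raw));
      final double overrun = Math.max(0.0, -raw); // unmet demand, later scaled by task duration
      System.out.printf(
          "tasks=%d predictedIdle=%.3f overrun=%.3f idleCost=%.2f%n",
          proposed, predictedIdle, overrun, uShapedIdleCost(predictedIdle, proposed)
      );
    }
  }
}
```

With these inputs the idle cost works out to 11.25 at 5 tasks, 2.50 at 10 tasks, and 10.00 at 20 tasks, which is the U-shape asserted in `testIdleCostIsUShapedAroundIdealRatio`. At 5 tasks the overrun is 0.5, which the commit converts into virtual lag-recovery time by multiplying with the task duration.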
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
index 05724d1e837..4755b3cf496 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java
@@ -49,7 +49,6 @@ public class CostBasedAutoScalerConfigTest
+ " \"scaleActionPeriodMillis\": 60000,\n"
+ " \"lagWeight\": 0.6,\n"
+ " \"idleWeight\": 0.4,\n"
- + " \"highLagThreshold\": 30000,\n"
+ " \"minScaleUpDelay\": \"PT5M\",\n"
+ " \"minScaleDownDelay\": \"PT10M\",\n"
+ " \"scaleDownDuringTaskRolloverOnly\": true\n"
@@ -68,7 +67,8 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
- Assert.assertEquals(30000, config.getHighLagThreshold());
+ Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp());
+ Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
// Test serialization back to JSON
String serialized = mapper.writeValueAsString(config);
@@ -101,10 +101,10 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay());
Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay());
Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly());
+ Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp());
+ Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
Assert.assertNull(config.getTaskCountStart());
Assert.assertNull(config.getStopTaskCountRatio());
- // When highLagThreshold is not set, it defaults to -1 (burst scale-up disabled)
- Assert.assertEquals(-1, config.getHighLagThreshold());
}
@Test
@@ -185,10 +185,11 @@ public class CostBasedAutoScalerConfigTest
.scaleActionPeriodMillis(60000L)
.lagWeight(0.6)
.idleWeight(0.4)
+ .useTaskCountBoundariesOnScaleUp(true)
+ .useTaskCountBoundariesOnScaleDown(true)
.minScaleUpDelay(Duration.standardMinutes(5))
.minScaleDownDelay(Duration.standardMinutes(10))
.scaleDownDuringTaskRolloverOnly(true)
- .highLagThreshold(30000)
.build();
Assert.assertTrue(config.getEnableTaskAutoScaler());
@@ -199,10 +200,11 @@ public class CostBasedAutoScalerConfigTest
Assert.assertEquals(60000L, config.getScaleActionPeriodMillis());
Assert.assertEquals(0.6, config.getLagWeight(), 0.001);
Assert.assertEquals(0.4, config.getIdleWeight(), 0.001);
+ Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleUp());
+ Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown());
Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay());
Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay());
Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly());
- Assert.assertEquals(30000, config.getHighLagThreshold());
}
@Test
@@ -243,7 +245,10 @@ public class CostBasedAutoScalerConfigTest
.build();
Assert.assertEquals(Duration.standardMinutes(5), bothSet.getMinScaleUpDelay());
Assert.assertEquals(Duration.standardMinutes(20), bothSet.getMinScaleDownDelay());
- CostBasedAutoScalerConfig roundTripped = mapper.readValue(mapper.writeValueAsString(bothSet), CostBasedAutoScalerConfig.class);
+ CostBasedAutoScalerConfig roundTripped = mapper.readValue(
+ mapper.writeValueAsString(bothSet),
+ CostBasedAutoScalerConfig.class
+ );
Assert.assertEquals(bothSet, roundTripped);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
deleted file mode 100644
index 122a449c5f7..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests for {@link CostBasedAutoScaler#computeExtraPPTIncrease}.
- * <p>
- * The burst scaling uses a logarithmic formula:
- * {@code deltaTasks = K * ln(lagSeverity)}
- * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}.
- */
-public class CostBasedAutoScalerHighLagScalingTest
-{
- private static final int LAG_THRESHOLD = 50_000;
- private static final int PARTITION_COUNT = 48;
- private static final int TASK_COUNT_MAX = 48;
-
- /**
- * Tests scaling behavior across different lag levels and task counts.
- * <p>
- * Expected behavior for 48 partitions with threshold=50K:
- * <pre>
- * | Current | Lag/Part | PPT reduction | Notes                          |
- * |---------|----------|---------------|--------------------------------|
- * | any     | <50K     | 0             | Below threshold                |
- * | any     | =50K     | 0             | ln(1) = 0                      |
- * | 1       | 100K     | 40            | Significant boost for recovery |
- * | 1       | 200K     | 43            | Large boost                    |
- * | 4       | 200K     | 6             | Moderate boost                 |
- * | 12      | 200K     | 0             | Delta too small for PPT change |
- * | 24      | 200K     | 0             | Delta too small for PPT change |
- * </pre>
- */
- @Test
- public void testComputeExtraPPTIncrease()
- {
- // Below threshold: no boost
- Assert.assertEquals(
- "Below threshold should not increase PPT",
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 40_000L,
- PARTITION_COUNT,
- 4,
- TASK_COUNT_MAX
- )
- );
-
- // At threshold (lagSeverity=1, ln(1)=0): no boost
- Assert.assertEquals(
- "At threshold (ln(1)=0) should not increase PPT",
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 50_000L,
- PARTITION_COUNT,
- 4,
- TASK_COUNT_MAX
- )
- );
-
- // C=1, 100K lag (2x threshold): significant boost for emergency recovery
- int boost1_100k = CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 100_000L,
- PARTITION_COUNT,
- 1,
- TASK_COUNT_MAX
- );
- Assert.assertEquals("C=1, 100K lag boost", 40, boost1_100k);
-
- // C=1, 200K lag (4x threshold): large boost
- int boost1_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 200_000L,
- PARTITION_COUNT,
- 1,
- TASK_COUNT_MAX
- );
- Assert.assertEquals("C=1, 200K lag boost", 43, boost1_200k);
-
- // C=4, 200K lag: moderate boost (K decreases with sqrt(C))
- int boost4_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 200_000L,
- PARTITION_COUNT,
- 4,
- TASK_COUNT_MAX
- );
- Assert.assertEquals("C=4, 200K lag should yield a modest PPT increase", 6, boost4_200k);
-
- // C=12, 200K lag: delta too small to change PPT
- int boost12_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 200_000L,
- PARTITION_COUNT,
- 12,
- TASK_COUNT_MAX
- );
- Assert.assertEquals("C=12, 200K lag should not change PPT", 0, boost12_200k);
-
- // C=24, 200K lag: delta too small to change PPT
- int boost24_200k = CostBasedAutoScaler.computeExtraPPTIncrease(
- LAG_THRESHOLD,
- PARTITION_COUNT * 200_000L,
- PARTITION_COUNT,
- 24,
- TASK_COUNT_MAX
- );
- Assert.assertEquals("C=24, 200K lag should not change PPT", 0, boost24_200k);
- }
-
- @Test
- public void testComputeExtraPPTIncreaseInvalidInputs()
- {
- Assert.assertEquals(
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 0, 4, 48)
- );
- Assert.assertEquals(
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 0, 48)
- );
- Assert.assertEquals(
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 4, 0)
- );
- Assert.assertEquals(
- 0,
- CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, -1, 4, 48)
- );
- }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 4b5666a85ee..56eacb43eab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -77,94 +77,62 @@ public class CostBasedAutoScalerTest
@Test
public void testComputeValidTaskCounts()
{
- boolean useTaskCountBoundaries = true;
- int highLagThreshold = 50_000;
-
// For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34
- int[] validTaskCounts = computeValidTaskCounts(
+ final int[] validTaskCounts = computeValidTaskCounts(
100,
25,
- 0L,
1,
- 100,
- useTaskCountBoundaries,
- highLagThreshold
+ 100
);
Assert.assertTrue("Expected current task count to be included", contains(validTaskCounts, 25));
Assert.assertTrue("Expected next scale-up option (34) to be included", contains(validTaskCounts, 34));
// Single partition
- int[] singlePartition = computeValidTaskCounts(
+ final int[] singlePartition = computeValidTaskCounts(
1,
1,
- 0L,
1,
- 100,
- useTaskCountBoundaries,
- highLagThreshold
+ 100
);
Assert.assertTrue("Single partition should yield at least one valid
count", singlePartition.length > 0);
Assert.assertTrue("Single partition should include task count 1",
contains(singlePartition, 1));
// Current exceeds partitions - should still yield valid, deduplicated
options
- int[] exceedsPartitions = computeValidTaskCounts(
+ final int[] exceedsPartitions = computeValidTaskCounts(
2,
5,
- 0L,
1,
- 100,
- useTaskCountBoundaries,
- highLagThreshold
+ 100
);
Assert.assertEquals(2, exceedsPartitions.length);
Assert.assertTrue(contains(exceedsPartitions, 1));
Assert.assertTrue(contains(exceedsPartitions, 2));
- // Lag expansion: low lag should not include max, high lag should allow
aggressive scaling
- int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30,
useTaskCountBoundaries, highLagThreshold);
- Assert.assertFalse("Low lag should not include max task count",
contains(lowLagCounts, 30));
- Assert.assertTrue("Low lag should cap scale-up around 4 tasks",
contains(lowLagCounts, 4));
-
- // High lag uses logarithmic formula: K * ln(lagSeverity) where K =
P/(6.4*sqrt(C))
- // For P=30, C=3, lagPerPartition=500K, threshold=50K: lagSeverity=10,
K=2.7, delta=6.2
- // This allows controlled scaling to ~10-15 tasks (not all the way to max)
- long highAggregateLag = 30L * 500_000L;
- int[] highLagCounts = computeValidTaskCounts(
- 30,
- 3,
- highAggregateLag,
- 1,
- 30,
- useTaskCountBoundaries,
- highLagThreshold
- );
- Assert.assertTrue("High lag should allow scaling to 10 tasks",
contains(highLagCounts, 10));
- Assert.assertTrue("High lag should allow scaling to 15 tasks",
contains(highLagCounts, 15));
- Assert.assertFalse("High lag should not jump straight to max (30) from 3",
contains(highLagCounts, 30));
+ // Unbounded candidate generation includes both nearby and maximum task counts.
+ final int[] taskCounts = computeValidTaskCounts(30, 3, 1, 30);
+ Assert.assertTrue("Valid task counts should include max task count", contains(taskCounts, 30));
+ Assert.assertTrue("Valid task counts should include nearby scale-up task count", contains(taskCounts, 4));
// Respects taskCountMax
- int[] cappedCounts = computeValidTaskCounts(
+ final int[] cappedCounts = computeValidTaskCounts(
30,
4,
- highAggregateLag,
1,
- 3,
- useTaskCountBoundaries,
- highLagThreshold
+ 3
);
Assert.assertTrue("Should include taskCountMax when within bounds",
contains(cappedCounts, 3));
Assert.assertFalse("Should not exceed taskCountMax",
contains(cappedCounts, 4));
// Respects taskCountMin - filters out values below the minimum
// With partitionCount=100, currentTaskCount=10, the computed range
includes values like 8, 9, 10, 12, 13
- int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100,
useTaskCountBoundaries, highLagThreshold);
+ final int[] minCappedCounts = computeValidTaskCounts(100, 10, 10, 100);
Assert.assertFalse("Should not include values below taskCountMin (8)",
contains(minCappedCounts, 8));
Assert.assertFalse("Should not include values below taskCountMin (9)",
contains(minCappedCounts, 9));
Assert.assertTrue("Should include values at taskCountMin (10)",
contains(minCappedCounts, 10));
Assert.assertTrue("Should include values above taskCountMin (12)",
contains(minCappedCounts, 12));
// Both bounds applied together
- int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12,
useTaskCountBoundaries, highLagThreshold);
+ final int[] bothBounds = computeValidTaskCounts(100, 10, 10, 12);
Assert.assertFalse("Should not include values below taskCountMin (8)",
contains(bothBounds, 8));
Assert.assertFalse("Should not include values below taskCountMin (9)",
contains(bothBounds, 9));
Assert.assertFalse("Should not include values above taskCountMax (13)",
contains(bothBounds, 13));
@@ -182,7 +150,7 @@ public class CostBasedAutoScalerTest
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
// Negative pollIdleRatio (metric unavailable) should still allow scaling
- int unavailableIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
+ final int unavailableIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
MatcherAssert.assertThat(
"Negative pollIdleRatio should not reject scaling",
unavailableIdleResult,
@@ -190,16 +158,84 @@ public class CostBasedAutoScalerTest
);
// High idle (underutilized) - should scale down
- int scaleDownResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
+ final int scaleDownResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)",
scaleDownResult < 25);
// Very high idle with high task count - should scale down
- int highIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
+ final int highIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9));
Assert.assertTrue("High idle should not suggest scale-up", highIdleResult
<= 50);
- // With low idle and balanced weights, the algorithm should not scale up
aggressively
- int lowIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
- Assert.assertTrue("With low idle and balanced weights, avoid aggressive
scale-up", lowIdleResult <= 25);
+ // With idle below ideal (0.1 < 0.25), U-shaped cost penalizes
under-provisioning,
+ // driving a moderate scale-up toward the ideal operating point.
+ final int lowIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1));
+ Assert.assertTrue(
+ "Low idle below ideal should drive scale-up toward ideal operating
point",
+ lowIdleResult > 25
+ );
+ }
+
+ @Test
+ public void testComputeOptimalTaskCountLimitsTaskCountJumps()
+ {
+ final CostBasedAutoScalerConfig boundedScaleUpConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .useTaskCountBoundariesOnScaleUp(true)
+ .build();
+ final CostBasedAutoScaler boundedScaleUpScaler = createAutoScaler(boundedScaleUpConfig);
+
+ Assert.assertEquals(
+ "Scale-up should only evaluate two task-count candidates above the current count",
+ 13,
+ boundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25))
+ );
+
+ final CostBasedAutoScalerConfig unboundedScaleUpConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(1.0)
+ .idleWeight(0.0)
+ .build();
+ final CostBasedAutoScaler unboundedScaleUpScaler = createAutoScaler(unboundedScaleUpConfig);
+ Assert.assertEquals(
+ "Without scale-up boundaries, lag-only optimization should jump to max task count",
+ 100,
+ unboundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25))
+ );
+
+ final CostBasedAutoScalerConfig boundedScaleDownConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(100)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.0)
+ .idleWeight(1.0)
+ .useTaskCountBoundariesOnScaleDown(true)
+ .build();
+ final CostBasedAutoScaler boundedScaleDownScaler = createAutoScaler(boundedScaleDownConfig);
+
+ Assert.assertEquals(
+ "Scale-down should only evaluate two task-count candidates below the current count",
+ 34,
+ boundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 100, 100, 0.9))
+ );
+
+ final CostBasedAutoScalerConfig unboundedScaleDownConfig = CostBasedAutoScalerConfig.builder()
+ .taskCountMax(25)
+ .taskCountMin(1)
+ .enableTaskAutoScaler(true)
+ .lagWeight(0.0)
+ .idleWeight(1.0)
+ .build();
+ final CostBasedAutoScaler unboundedScaleDownScaler = createAutoScaler(unboundedScaleDownConfig);
+ Assert.assertEquals(
+ "Without scale-down boundaries, idle-only optimization may select a much lower task count",
+ 1,
+ unboundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 100, 100, 0.9))
+ );
}
@Test
@@ -531,6 +567,21 @@ public class CostBasedAutoScalerTest
);
}
+ private CostBasedAutoScaler createAutoScaler(CostBasedAutoScalerConfig config)
+ {
+ final SupervisorSpec mockSupervisorSpec = Mockito.mock(SupervisorSpec.class);
+ final SeekableStreamSupervisor mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
+ final ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class);
+ final SeekableStreamSupervisorIOConfig mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+ when(mockSupervisorSpec.getId()).thenReturn("test-supervisor");
+ when(mockSupervisorSpec.getDataSources()).thenReturn(List.of("test-datasource"));
+ when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+ when(mockIoConfig.getStream()).thenReturn("test-stream");
+
+ return new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, mockEmitter);
+ }
+
private boolean contains(int[] array, int value)
{
for (int i : array) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
index f96df30049e..8d5448065fa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java
@@ -195,10 +195,10 @@ public class WeightedCostFunctionTest
}
@Test
- public void testIdleCostMonotonicWithTaskCount()
+ public void testIdleCostIsUShapedAroundIdealRatio()
{
- // Test that idle cost increases monotonically with task count.
- // With fixed load, adding more tasks means each task has less work, so idle increases.
+ // U-shaped cost: minimum near IDEAL_IDLE_RATIO=0.25, higher on both sides.
+ // Current: 10 tasks with 25% idle (already at ideal).
CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder()
.taskCountMax(100)
.taskCountMin(1)
@@ -207,18 +207,19 @@ public class WeightedCostFunctionTest
.idleWeight(1.0)
.build();
- // Current: 10 tasks with 40% idle (60% busy)
- CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
+ CostMetrics metrics = createMetrics(0.0, 10, 100, 0.25);
+
+ // At current (idle=0.25=ideal): baseline cost only, penalty=0
+ double costAtIdeal = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost();
+
+ // Scale down → predicted idle falls below ideal → under-provisioning penalty
+ double costScaleDown = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost();
- double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost();
- double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost();
- double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig).totalCost();
- double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost();
+ // Scale up → predicted idle rises above ideal → over-provisioning penalty
+ double costScaleUp = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost();
- // Monotonically increasing idle cost as tasks increase
- Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10);
- Assert.assertTrue("cost(10) < cost(15)", costAt10 < costAt15);
- Assert.assertTrue("cost(15) < cost(20)", costAt15 < costAt20);
+ Assert.assertTrue("scale-down costs more than ideal", costScaleDown > costAtIdeal);
+ Assert.assertTrue("scale-up costs more than ideal", costScaleUp > costAtIdeal);
}
@Test
@@ -238,8 +239,10 @@ public class WeightedCostFunctionTest
CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4);
double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig).totalCost();
- // idlenessCost = taskCount * 0.0 (clamped) = 0
- Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped to 0", 0.0, costAt2, 0.0001);
+ // idle = 0: max under-provisioning penalty; cost = taskCount * (IDEAL + UNDER_PENALTY)
+ double expectedAt2 = 2 * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.UNDER_PROVISIONING_PENALTY);
+ Assert.assertEquals("Idle cost at clamped-to-zero idle ratio should reflect full under-provisioning penalty",
+ expectedAt2, costAt2, 0.0001);
// Extreme scale-up shouldn't exceed 1.0 for idle ratio
// 10 tasks → 100 tasks with 10% idle
@@ -267,11 +270,13 @@ public class WeightedCostFunctionTest
double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig).totalCost();
double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig).totalCost();
- // With missing data, predicted idle = 0.5 for all task counts
- // idlenessCost at 10 = 10 * 0.5 = 5
- // idlenessCost at 20 = 20 * 0.5 = 10
- Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 0.5, cost10, 0.0001);
- Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 0.5, cost20, 0.0001);
+ // With missing data, predicted idle = 0.5 for all task counts regardless of proposed count.
+ // U-shaped cost at idle=0.5: idle > IDEAL(0.25), norm=(0.5-0.25)/0.75=1/3, penalty=1*(1/3)^2=1/9
+ double expectedCostPerTask = WeightedCostFunction.IDEAL_IDLE_RATIO
+ + WeightedCostFunction.OVER_PROVISIONING_PENALTY * (1.0 / 3.0) * (1.0 / 3.0);
+ Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * expectedCostPerTask, cost10, 0.0001);
+ Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * expectedCostPerTask, cost20, 0.0001);
+ Assert.assertEquals("Cost scales linearly with task count at fixed idle ratio", 2 * cost10, cost20, 0.0001);
}
@Test
@@ -338,6 +343,51 @@ public class WeightedCostFunctionTest
}
+ @Test
+ public void testUShapedIdleCostFormula()
+ {
+ int n = 10;
+
+ // At ideal ratio: penalty = 0, cost = n * IDEAL_IDLE_RATIO
+ Assert.assertEquals(
+ n * WeightedCostFunction.IDEAL_IDLE_RATIO,
+ costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n),
+ 1e-9
+ );
+
+ // At idle = 0 (fully under-provisioned): norm = 1, penalty = UNDER_PROVISIONING_PENALTY
+ Assert.assertEquals(
+ n * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.UNDER_PROVISIONING_PENALTY),
+ costFunction.uShapedIdleCost(0.0, n),
+ 1e-9
+ );
+
+ // At idle = 1 (fully over-provisioned): norm = 1, penalty = OVER_PROVISIONING_PENALTY
+ Assert.assertEquals(
+ n * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.OVER_PROVISIONING_PENALTY),
+ costFunction.uShapedIdleCost(1.0, n),
+ 1e-9
+ );
+
+ // Both extremes exceed the ideal cost
+ double idealCost = costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n);
+ Assert.assertTrue("idle=0 costs more than ideal", costFunction.uShapedIdleCost(0.0, n) > idealCost);
+ Assert.assertTrue("idle=1 costs more than ideal", costFunction.uShapedIdleCost(1.0, n) > idealCost);
+
+ // Under-provisioning is penalized more than over-provisioning (UNDER > OVER)
+ Assert.assertTrue(
+ "under-provisioning penalty exceeds over-provisioning penalty",
+ costFunction.uShapedIdleCost(0.0, n) > costFunction.uShapedIdleCost(1.0, n)
+ );
+
+ // Cost scales linearly with task count at any fixed idle ratio
+ Assert.assertEquals(
+ 2 * costFunction.uShapedIdleCost(0.5, n),
+ costFunction.uShapedIdleCost(0.5, 2 * n),
+ 1e-9
+ );
+ }
+
private CostMetrics createMetrics(
double avgPartitionLag,
int currentTaskCount,
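
Note on the WeightedCostFunctionTest hunks above: the new assertions pin down a U-shaped idle cost of the form taskCount * (IDEAL_IDLE_RATIO + penalty * norm^2). The standalone sketch below is one way to reproduce the arithmetic in those test comments; it is not the code from WeightedCostFunction.java. The class name UShapedIdleCostSketch is hypothetical, UNDER_PROVISIONING_PENALTY = 2.0 is an assumed value (the tests only require UNDER > OVER), and the quadratic shape on the under-provisioned side is also an assumption, since the tests only fix its endpoints.

```java
// Hypothetical sketch of the U-shaped idle cost described in the test comments.
// It is NOT the Druid implementation; the constants marked "assumed" are illustrative.
public final class UShapedIdleCostSketch
{
  // Value quoted in the test comments (IDEAL_IDLE_RATIO=0.25).
  public static final double IDEAL_IDLE_RATIO = 0.25;
  // Implied by the test comment "penalty=1*(1/3)^2".
  public static final double OVER_PROVISIONING_PENALTY = 1.0;
  // Assumed: the tests only state that UNDER > OVER.
  public static final double UNDER_PROVISIONING_PENALTY = 2.0;

  private UShapedIdleCostSketch()
  {
  }

  public static double uShapedIdleCost(double idleRatio, int taskCount)
  {
    // Clamp the predicted idle ratio to [0, 1].
    final double idle = Math.max(0.0, Math.min(1.0, idleRatio));
    final double penalty;
    if (idle < IDEAL_IDLE_RATIO) {
      // Under-provisioned side: norm runs from 0 (at ideal) to 1 (at idle = 0).
      // The quadratic form here is an assumption mirroring the over-provisioned side.
      final double norm = (IDEAL_IDLE_RATIO - idle) / IDEAL_IDLE_RATIO;
      penalty = UNDER_PROVISIONING_PENALTY * norm * norm;
    } else {
      // Over-provisioned side: norm runs from 0 (at ideal) to 1 (at idle = 1).
      final double norm = (idle - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO);
      penalty = OVER_PROVISIONING_PENALTY * norm * norm;
    }
    // Baseline cost per task plus the penalty, scaled linearly by task count.
    return taskCount * (IDEAL_IDLE_RATIO + penalty);
  }

  public static void main(String[] args)
  {
    System.out.println("idle=0.00 -> " + uShapedIdleCost(0.0, 10));
    System.out.println("idle=0.25 -> " + uShapedIdleCost(0.25, 10));
    System.out.println("idle=0.50 -> " + uShapedIdleCost(0.5, 10));
    System.out.println("idle=1.00 -> " + uShapedIdleCost(1.0, 10));
  }
}
```

Under these assumptions the cost is lowest at the ideal ratio, rises on both sides, and doubles when the task count doubles at a fixed idle ratio, which are the properties testUShapedIdleCostFormula asserts.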