jtuglu1 commented on code in PR #19269:
URL: https://github.com/apache/druid/pull/19269#discussion_r3081066265
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
lastKnownMetrics = collectMetrics();
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+ int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+ // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+ final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
Review Comment:
Do we want to make sure we are not clamping to taskCountMin when, say, metrics are temporarily unavailable (computeOptimalTaskCount returns -1)?
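One possible guard for this concern is sketched below. It assumes, as the comment states, that `computeOptimalTaskCount` returns -1 when metrics are temporarily missing. `ClampSketch`, `effectiveTaskCount`, and `NO_OPTIMAL_TASK_COUNT` are hypothetical names for illustration, not code from the PR. With this guard, the clamp is skipped while there is no usable optimal count, so a handwritten `taskCount` is left alone until metrics come back.

```java
/**
 * Hypothetical sketch: clamp a configured taskCount into [min, max] only
 * when the autoscaler has a usable optimal task count.
 */
final class ClampSketch {
  // Sentinel assumed (per the review comment) to mean "no metrics yet".
  static final int NO_OPTIMAL_TASK_COUNT = -1;

  /** Returns the task count to use as the baseline for scaling decisions. */
  static int effectiveTaskCount(int currentTaskCount, int optimalTaskCount, int min, int max) {
    if (optimalTaskCount == NO_OPTIMAL_TASK_COUNT) {
      // Without metrics, leave the handwritten taskCount untouched.
      return currentTaskCount;
    }
    // Same clamp as the PR: Math.min(max, Math.max(min, current)).
    return Math.min(max, Math.max(min, currentTaskCount));
  }

  public static void main(String[] args) {
    System.out.println(effectiveTaskCount(50, -1, 1, 10)); // 50: no metrics, unchanged
    System.out.println(effectiveTaskCount(50, 4, 1, 10));  // 10: clamped down to max
    System.out.println(effectiveTaskCount(0, 4, 1, 10));   // 1: clamped up to min
  }
}
```

Whether skipping the clamp is the right behavior is exactly the open question in the comment; the alternative is to clamp unconditionally and only defer the optimal-count comparison.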
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
lastKnownMetrics = collectMetrics();
final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
- final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+ int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+ // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+ // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+ final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
+ || currentTaskCount > config.getTaskCountMax();
+ if (isTaskCountOutOfBounds) {
+ currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
+ }
// Perform scale-up actions; scale-down actions only if configured.
final int taskCount;
- if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+
+ // If task count is out of bounds, scale to the configured boundary
+ // regardless of optimal task count, to get back to a safe state.
+ if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+ taskCount = currentTaskCount;
+ lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();
+ log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());
Review Comment:
nit: let's include what we're scaling to. The other messages do.
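A sketch of a log message that includes the scaling target follows. The `log.info` call in the diff already takes printf-style arguments (`%s`, `%d`), so this standalone version uses `String.format` with the same placeholders. Note that by the time the log line runs, the diff has already overwritten `currentTaskCount` with the clamped value, so logging the original count would require capturing it first; `OutOfBoundsLogSketch`, `outOfBoundsMessage`, and `originalTaskCount` are hypothetical names, not code from the PR.

```java
/** Hypothetical sketch of an out-of-bounds message that names the target count. */
final class OutOfBoundsLogSketch {
  static String outOfBoundsMessage(
      String supervisorId, int originalTaskCount, int min, int max, int targetTaskCount) {
    // Same printf-style placeholders as the existing log.info call, plus the target.
    return String.format(
        "Task count[%d] for supervisor[%s] was out of bounds [%d,%d], scaling to [%d].",
        originalTaskCount, supervisorId, min, max, targetTaskCount);
  }

  public static void main(String[] args) {
    // Prints: Task count[50] for supervisor[wiki] was out of bounds [1,10], scaling to [10].
    System.out.println(outOfBoundsMessage("wiki", 50, 1, 10, 10));
  }
}
```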
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java:
##########
@@ -88,13 +88,16 @@ public SeekableStreamSupervisorIOConfig(
this.lagAggregator = lagAggregator;
// Could be null
this.autoScalerConfig = autoScalerConfig;
- this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler();
+ boolean isAutoScalerAvailable = autoScalerConfig != null;
+ this.autoScalerEnabled = isAutoScalerAvailable && autoScalerConfig.getEnableTaskAutoScaler();
if (autoScalerEnabled) {
// Priority: taskCountStart > taskCount > taskCountMin
this.taskCount = Configs.valueOrDefault(
autoScalerConfig.getTaskCountStart(),
Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin())
);
+ } else if (isAutoScalerAvailable) {
+ this.taskCount = taskCount != null ? taskCount : autoScalerConfig.getTaskCountMin();
Review Comment:
nit: Configs.valueOrDefault
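Applied to the new branch, the nit would read `this.taskCount = Configs.valueOrDefault(taskCount, autoScalerConfig.getTaskCountMin());`, matching how the enabled branch above already uses it. The sketch below uses a local stand-in for `Configs.valueOrDefault` (return the value if non-null, else the default) so it compiles without Druid on the classpath; `ValueOrDefaultSketch` and `taskCountWhenAutoScalerDisabled` are hypothetical names.

```java
/** Hypothetical sketch of the else-if branch rewritten with valueOrDefault. */
final class ValueOrDefaultSketch {
  // Local stand-in for Druid's Configs.valueOrDefault(value, defaultValue).
  static <T> T valueOrDefault(T value, T defaultValue) {
    return value != null ? value : defaultValue;
  }

  // Mirrors the new branch: autoscaler config present but disabled.
  static int taskCountWhenAutoScalerDisabled(Integer taskCount, int taskCountMin) {
    return valueOrDefault(taskCount, taskCountMin);
  }

  public static void main(String[] args) {
    System.out.println(taskCountWhenAutoScalerDisabled(null, 2)); // 2: falls back to taskCountMin
    System.out.println(taskCountWhenAutoScalerDisabled(7, 2));    // 7: explicit taskCount wins
  }
}
```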
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.