jtuglu1 commented on code in PR #19269:
URL: https://github.com/apache/druid/pull/19269#discussion_r3059648161


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
     lastKnownMetrics = collectMetrics();
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+    final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
+                                     || currentTaskCount > config.getTaskCountMax();
+    if (isTaskCountOutOfBounds) {
+      currentTaskCount = Math.min(config.getTaskCountMax(),
+                                   Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
+    }
 
     // Perform scale-up actions; scale-down actions only if configured.
     final int taskCount;
-    if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+
+    // If task count is out of bounds, scale to the configured boundary
+    // regardless of optimal task count, to get back to a safe state.
+    if (isScaleActionAllowed() && isTaskCountOutOfBounds) {

Review Comment:
   Do we want to respect this `isScaleActionAllowed` if we're violating min/max task count bounds?
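One way to read this question is to let the bounds correction skip the cooldown gate, since an out-of-bounds count comes from a handwritten configuration rather than from load, while moves toward the optimal count keep respecting it. A minimal sketch of that ordering, under stated assumptions: `BoundsFirstDecision` and its parameters are hypothetical (not Druid code), -1 stands for "skip scale action" as in the `LagBasedAutoScaler` javadoc, and the separate scale-down toggle is ignored for brevity.

```java
/**
 * Hypothetical sketch (not Druid code): the out-of-bounds correction ignores the
 * cooldown gate, while ordinary moves toward the optimal count still respect it.
 * Returns -1 to mean "skip scale action". Scale-down toggles are ignored here.
 */
final class BoundsFirstDecision
{
  static int decide(
      int configuredTaskCount,
      int optimalTaskCount,
      int taskCountMin,
      int taskCountMax,
      boolean scaleActionAllowed
  )
  {
    final boolean outOfBounds = configuredTaskCount < taskCountMin || configuredTaskCount > taskCountMax;
    if (outOfBounds) {
      // Misconfigured count: move to the nearest boundary even during a cooldown.
      return Math.min(taskCountMax, Math.max(taskCountMin, configuredTaskCount));
    }
    if (scaleActionAllowed && optimalTaskCount != configuredTaskCount) {
      // In bounds: follow the optimizer, still clamped, only when an action is allowed.
      return Math.min(taskCountMax, Math.max(taskCountMin, optimalTaskCount));
    }
    return -1;
  }

  public static void main(String[] args)
  {
    // Cooldown active (scaleActionAllowed == false), yet the bad count is corrected.
    System.out.println(decide(1, 80, 50, 100, false)); // prints 50
  }
}
```

The trade-off of this ordering: a bad `taskCount` is corrected on the next evaluation even inside a cooldown window, at the cost of possibly issuing two scale actions in quick succession.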



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java:
##########
@@ -232,6 +232,56 @@ public void testScaleUpFromMinimumTasks()
     );
   }
 
+  @Test
+  public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin()
+  {
+    CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
+                                                                       .taskCountMax(100)
+                                                                       .taskCountMin(50)
+                                                                       .enableTaskAutoScaler(true)
+                                                                       .build();
+    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
+
+    final int configuredTaskCount = 1;
+    final int expectedTaskCount = 50;
+
+    doReturn(expectedTaskCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2);
+
+    final int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(

Review Comment:
   nit: Either mock `computeOptimalTaskCount` to return a value different from the clamped value (e.g. mock it to return -1 or `taskCountMin - 1`) and assert the boundary is returned, or use `verify(autoScaler, never()).computeOptimalTaskCount(any())` to confirm the early-return path was taken.
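The point of the first option is that the stubbed optimizer value and the expected boundary must differ; with both set to 50, the assertion passes whichever path produced the result. A minimal sketch of that idea using a plain subclass in place of a Mockito spy; `FakeScaler` is a hypothetical stand-in reduced to the clamping path, not Druid's `CostBasedAutoScaler`. It keeps the order shown in the diff, where `computeOptimalTaskCount` runs before the bounds check, so under that ordering the `verify(..., never())` variant would only hold if the call were moved after the early return.

```java
/**
 * Hypothetical stand-in for the autoscaler, reduced to the clamping path so the
 * test-design point can be shown without Mockito. Not Druid's class.
 */
class FakeScaler
{
  private final int taskCountMin;
  private final int taskCountMax;
  private final int configuredTaskCount;

  FakeScaler(int taskCountMin, int taskCountMax, int configuredTaskCount)
  {
    this.taskCountMin = taskCountMin;
    this.taskCountMax = taskCountMax;
    this.configuredTaskCount = configuredTaskCount;
  }

  // The seam a test stubs, playing the role of doReturn(...).when(autoScaler).computeOptimalTaskCount(any()).
  int computeOptimalTaskCount()
  {
    return configuredTaskCount;
  }

  int computeTaskCountForScaleAction()
  {
    // Same order as the diff: the optimizer runs before the bounds check.
    final int optimalTaskCount = computeOptimalTaskCount();
    if (configuredTaskCount < taskCountMin || configuredTaskCount > taskCountMax) {
      // Out of bounds: return the nearest boundary and ignore the optimizer.
      return Math.min(taskCountMax, Math.max(taskCountMin, configuredTaskCount));
    }
    return optimalTaskCount;
  }

  public static void main(String[] args)
  {
    final int taskCountMin = 50;
    // Stub the optimizer with a sentinel that can never be the correct answer.
    final FakeScaler scaler = new FakeScaler(taskCountMin, 100, 1)
    {
      @Override
      int computeOptimalTaskCount()
      {
        return taskCountMin - 1;
      }
    };
    // Only the clamp path can yield 50 here, so an assertion on it tells the paths apart.
    System.out.println(scaler.computeTaskCountForScaleAction()); // prints 50
  }
}
```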



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
     lastKnownMetrics = collectMetrics();
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+    final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
+                                     || currentTaskCount > config.getTaskCountMax();
+    if (isTaskCountOutOfBounds) {
+      currentTaskCount = Math.min(config.getTaskCountMax(),
+                                   Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));

Review Comment:
   nit: use `currentTaskCount` here instead of calling `supervisor.getIoConfig().getTaskCount()` again.
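With this nit applied, the clamp reads the local variable instead of re-querying the IO config. Because clamping an in-bounds value returns it unchanged, the clamp could also run unconditionally, and the out-of-bounds flag can be derived from it. A minimal sketch, assuming a hypothetical `TaskCountBounds` helper that is not part of Druid:

```java
/** Hypothetical helper (not Druid code): clamp a task count into [taskCountMin, taskCountMax]. */
final class TaskCountBounds
{
  private TaskCountBounds()
  {
  }

  static int clamp(int currentTaskCount, int taskCountMin, int taskCountMax)
  {
    if (taskCountMin > taskCountMax) {
      throw new IllegalArgumentException("taskCountMin must not exceed taskCountMax");
    }
    // An in-bounds value comes back unchanged, so this is safe to call unconditionally.
    return Math.min(taskCountMax, Math.max(taskCountMin, currentTaskCount));
  }

  static boolean isOutOfBounds(int currentTaskCount, int taskCountMin, int taskCountMax)
  {
    // Out of bounds exactly when clamping changes the value.
    return clamp(currentTaskCount, taskCountMin, taskCountMax) != currentTaskCount;
  }
}
```

If the build targets JDK 21 or later, `Math.clamp(long, int, int)` performs the same clamp and also rejects `min > max`.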



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -174,11 +174,26 @@ public int computeTaskCountForScaleAction()
     lastKnownMetrics = collectMetrics();
 
     final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
-    final int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+    int currentTaskCount = supervisor.getIoConfig().getTaskCount();
+
+    // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
+    // There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
+    final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
+                                     || currentTaskCount > config.getTaskCountMax();
+    if (isTaskCountOutOfBounds) {
+      currentTaskCount = Math.min(config.getTaskCountMax(),
+                                   Math.max(config.getTaskCountMin(), supervisor.getIoConfig().getTaskCount()));
+    }
 
     // Perform scale-up actions; scale-down actions only if configured.
     final int taskCount;
-    if (isScaleActionAllowed() && optimalTaskCount > currentTaskCount) {
+
+    // If task count is out of bounds, scale to the configured boundary
+    // regardless of optimal task count, to get back to a safe state.
+    if (isScaleActionAllowed() && isTaskCountOutOfBounds) {
+      taskCount = currentTaskCount;
+      log.info("Task count for supervisor[%s] was out of bounds [%d,%d], scaling.", supervisorId, config.getTaskCountMin(), config.getTaskCountMax());

Review Comment:
   Don't we want to set: `lastScaleActionTimeMillis = DateTimes.nowUtc().getMillis();`?
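If the out-of-bounds branch counts as a scale action, stamping the time there means the cooldown that `isScaleActionAllowed` presumably enforces also starts from the correction. A minimal sketch under stated assumptions: `CooldownTracker`, `minScaleActionDelayMillis`, and the injected `LongSupplier` clock are hypothetical, and `System.currentTimeMillis` stands in for Druid's `DateTimes.nowUtc().getMillis()` so the example needs only the JDK.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not Druid code): the out-of-bounds correction records
 * lastScaleActionTimeMillis, so the next cooldown window starts from it.
 */
final class CooldownTracker
{
  private final LongSupplier clock;
  private final long minScaleActionDelayMillis;
  // -1 means no scale action has been recorded yet.
  private long lastScaleActionTimeMillis = -1;

  CooldownTracker(LongSupplier clock, long minScaleActionDelayMillis)
  {
    this.clock = clock;
    this.minScaleActionDelayMillis = minScaleActionDelayMillis;
  }

  boolean isScaleActionAllowed()
  {
    // Allowed if nothing was recorded yet, or if the cooldown has fully elapsed.
    return lastScaleActionTimeMillis < 0
           || clock.getAsLong() - lastScaleActionTimeMillis >= minScaleActionDelayMillis;
  }

  /** Clamps the count into bounds and records the action time, like any other scale action. */
  int correctOutOfBounds(int currentTaskCount, int taskCountMin, int taskCountMax)
  {
    lastScaleActionTimeMillis = clock.getAsLong();
    return Math.min(taskCountMax, Math.max(taskCountMin, currentTaskCount));
  }

  public static void main(String[] args)
  {
    final CooldownTracker tracker = new CooldownTracker(System::currentTimeMillis, 60_000L);
    System.out.println(tracker.correctOutOfBounds(1, 50, 100)); // prints 50
    System.out.println(tracker.isScaleActionAllowed());         // prints false: cooldown just started
  }
}
```

Injecting the clock keeps the cooldown testable without sleeping.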



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -212,7 +212,7 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream (Kafka/Kinesis)
    * @return Integer, target number of tasksCount. -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  int computeDesiredTaskCount(List<Long> lags)

Review Comment:
   Let's mark it with the proper `@VisibleForTesting` annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
