This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fc63999a6af fix: Handle poll idle ratio not being available. (#19246)
fc63999a6af is described below

commit fc63999a6af60d676d0f6bc5e0847659e9c0da7f
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 1 03:46:47 2026 -0700

    fix: Handle poll idle ratio not being available. (#19246)
    
    The change to RecordSupplier allows Kinesis task live reports to work
    again. They had been throwing UnsupportedOperationException.
    
    The change to CostBasedAutoScaler differentiates poll idle ratio of -1
    (no data) from 0 (never idle). There was already some logic in the class
    for dealing with the negative case: it would treat it like 0.5. But this
    logic had not been reachable due to extractPollIdleRatio returning 0
    rather than -1 for the no-data case.
---
 .../seekablestream/common/RecordSupplier.java      |  3 +-
 .../supervisor/autoscaler/CostBasedAutoScaler.java |  8 ++---
 .../autoscaler/CostBasedAutoScalerTest.java        | 36 ++++++++++++++--------
 3 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index aac1e8a784d..4b4d0f2434a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -146,11 +146,12 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
    * Average poll-to-idle ratio as reported by the stream consumer.
    * A value of 0 represents that the consumer is never idle, i.e. always consuming.
    * A value of 1 represents that the consumer is always idle, i.e. not receiving data.
+   * A negative value indicates that no valid metric is available.
    * Used by the supervisor auto-scaler to find an optimal task count that minimizes idle time.
    */
   default double getPollIdleRatioMetric()
   {
-    throw new UnsupportedOperationException();
+    return -1;
   }
 
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 20da7ab7901..f6527beb8ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -400,12 +400,12 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
    * This metric indicates how much time the consumer spends idle waiting for data.
    *
    * @param taskStats the stats map from supervisor.getStats()
-   * @return the average poll-idle-ratio across all tasks, or 0 if no valid metrics are available
+   * @return the average poll-idle-ratio across all tasks, or -1 if no valid metrics are available
    */
   static double extractPollIdleRatio(Map<String, Map<String, Object>> taskStats)
   {
     if (taskStats == null || taskStats.isEmpty()) {
-      return 0.;
+      return -1.;
     }
 
     double sum = 0;
@@ -425,7 +425,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
       }
     }
 
-    return count > 0 ? sum / count : 0.;
+    return count > 0 ? sum / count : -1.;
   }
 
   /**
@@ -543,7 +543,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
   {
     if (metrics == null) {
       return Either.error("No metrics collected");
-    } else if (metrics.getAvgProcessingRate() < 0 || metrics.getPollIdleRatio() < 0) {
+    } else if (metrics.getAvgProcessingRate() < 0) {
       return Either.error("Task metrics not available");
     } else if (metrics.getCurrentTaskCount() <= 0 || metrics.getPartitionCount() <= 0) {
       return Either.error("Supervisor metrics not available");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 0eea53b16ed..12f52279226 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -26,6 +26,8 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Before;
@@ -179,6 +181,14 @@ public class CostBasedAutoScalerTest
     Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
     Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
 
+    // Negative pollIdleRatio (metric unavailable) should still allow scaling
+    int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
+    MatcherAssert.assertThat(
+        "Negative pollIdleRatio should not reject scaling",
+        unavailableIdleResult,
+        Matchers.greaterThanOrEqualTo(1)
+    );
+
     // High idle (underutilized) - should scale down
     int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
     Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25);
@@ -195,26 +205,26 @@ public class CostBasedAutoScalerTest
   @Test
   public void testExtractPollIdleRatio()
   {
-    // Null and empty return 0
+    // Null and empty return -1 (no data)
     Assert.assertEquals(
-        "Null stats should yield 0 idle ratios",
-        0.,
+        "Null stats should yield -1 idle ratio",
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(null),
         0.0001
     );
     Assert.assertEquals(
-        "Empty stats should yield 0 idle ratios",
-        0.,
+        "Empty stats should yield -1 idle ratio",
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
         0.0001
     );
 
-    // Missing metrics return 0
+    // Missing metrics return -1 (no data)
     Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
     missingMetrics.put("0", Collections.singletonMap("task-0", new HashMap<>()));
     Assert.assertEquals(
-        "Missing autoscaler metrics should yield 0 idle ratios",
-        0.,
+        "Missing autoscaler metrics should yield -1 idle ratio",
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
         0.0001
     );
@@ -237,7 +247,7 @@ public class CostBasedAutoScalerTest
     nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
     Assert.assertEquals(
         "Non-map task stats should be ignored",
-        0.,
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(nonMapTask),
         0.0001
     );
@@ -248,8 +258,8 @@ public class CostBasedAutoScalerTest
     taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>());
     emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
     Assert.assertEquals(
-        "Empty autoscaler metrics should yield 0 idle ratios",
-        0.,
+        "Empty autoscaler metrics should yield -1 idle ratio",
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
         0.0001
     );
@@ -261,7 +271,7 @@ public class CostBasedAutoScalerTest
     nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
     Assert.assertEquals(
         "Non-map autoscaler metrics should be ignored",
-        0.,
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler),
         0.0001
     );
@@ -275,7 +285,7 @@ public class CostBasedAutoScalerTest
     nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
     Assert.assertEquals(
         "Non-numeric poll idle ratio should be ignored",
-        0.,
+        -1.,
         CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio),
         0.0001
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to