This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new fc63999a6af fix: Handle poll idle ratio not being available. (#19246)
fc63999a6af is described below
commit fc63999a6af60d676d0f6bc5e0847659e9c0da7f
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Apr 1 03:46:47 2026 -0700
fix: Handle poll idle ratio not being available. (#19246)
The change to RecordSupplier allows Kinesis task live reports to work
again. They had been throwing UnsupportedOperationException from the
default getPollIdleRatioMetric() implementation, which now returns -1
(no data) instead.
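The change to CostBasedAutoScaler distinguishes a poll idle ratio of -1
(no data) from 0 (never idle). There was already some logic in the class
for dealing with the negative case: it would treat it like 0.5. But this
logic had not been reachable, for two reasons: extractPollIdleRatio
returned 0 rather than -1 for the no-data case, and metrics validation
rejected any negative poll idle ratio as "Task metrics not available".
That validation check has been removed, so a missing poll idle ratio no
longer blocks scaling.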
---
.../seekablestream/common/RecordSupplier.java | 3 +-
.../supervisor/autoscaler/CostBasedAutoScaler.java | 8 ++---
.../autoscaler/CostBasedAutoScalerTest.java | 36 ++++++++++++++--------
3 files changed, 29 insertions(+), 18 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index aac1e8a784d..4b4d0f2434a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -146,11 +146,12 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
* Average poll-to-idle ratio as reported by the stream consumer.
* A value of 0 represents that the consumer is never idle, i.e. always
consuming.
* A value of 1 represents that the consumer is always idle, i.e. not
receiving data.
+ * A negative value indicates that no valid metric is available.
* Used by the supervisor auto-scaler to find an optimal task count that
minimizes idle time.
*/
default double getPollIdleRatioMetric()
{
- throw new UnsupportedOperationException();
+ return -1;
}
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 20da7ab7901..f6527beb8ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -400,12 +400,12 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
* This metric indicates how much time the consumer spends idle waiting for
data.
*
* @param taskStats the stats map from supervisor.getStats()
- * @return the average poll-idle-ratio across all tasks, or 0 if no valid
metrics are available
+ * @return the average poll-idle-ratio across all tasks, or -1 if no valid
metrics are available
*/
static double extractPollIdleRatio(Map<String, Map<String, Object>>
taskStats)
{
if (taskStats == null || taskStats.isEmpty()) {
- return 0.;
+ return -1.;
}
double sum = 0;
@@ -425,7 +425,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
}
}
- return count > 0 ? sum / count : 0.;
+ return count > 0 ? sum / count : -1.;
}
/**
@@ -543,7 +543,7 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
{
if (metrics == null) {
return Either.error("No metrics collected");
- } else if (metrics.getAvgProcessingRate() < 0 ||
metrics.getPollIdleRatio() < 0) {
+ } else if (metrics.getAvgProcessingRate() < 0) {
return Either.error("Task metrics not available");
} else if (metrics.getCurrentTaskCount() <= 0 ||
metrics.getPartitionCount() <= 0) {
return Either.error("Supervisor metrics not available");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
index 0eea53b16ed..12f52279226 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java
@@ -26,6 +26,8 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
@@ -179,6 +181,14 @@ public class CostBasedAutoScalerTest
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
Assert.assertEquals(-1,
autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));
+ // Negative pollIdleRatio (metric unavailable) should still allow scaling
+ int unavailableIdleResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
+ MatcherAssert.assertThat(
+ "Negative pollIdleRatio should not reject scaling",
+ unavailableIdleResult,
+ Matchers.greaterThanOrEqualTo(1)
+ );
+
// High idle (underutilized) - should scale down
int scaleDownResult =
autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)",
scaleDownResult < 25);
@@ -195,26 +205,26 @@ public class CostBasedAutoScalerTest
@Test
public void testExtractPollIdleRatio()
{
- // Null and empty return 0
+ // Null and empty return -1 (no data)
Assert.assertEquals(
- "Null stats should yield 0 idle ratios",
- 0.,
+ "Null stats should yield -1 idle ratio",
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(null),
0.0001
);
Assert.assertEquals(
- "Empty stats should yield 0 idle ratios",
- 0.,
+ "Empty stats should yield -1 idle ratio",
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
0.0001
);
- // Missing metrics return 0
+ // Missing metrics return -1 (no data)
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new
HashMap<>()));
Assert.assertEquals(
- "Missing autoscaler metrics should yield 0 idle ratios",
- 0.,
+ "Missing autoscaler metrics should yield -1 idle ratio",
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
0.0001
);
@@ -237,7 +247,7 @@ public class CostBasedAutoScalerTest
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
Assert.assertEquals(
"Non-map task stats should be ignored",
- 0.,
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask),
0.0001
);
@@ -248,8 +258,8 @@ public class CostBasedAutoScalerTest
taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new
HashMap<>());
emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
Assert.assertEquals(
- "Empty autoscaler metrics should yield 0 idle ratios",
- 0.,
+ "Empty autoscaler metrics should yield -1 idle ratio",
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
0.0001
);
@@ -261,7 +271,7 @@ public class CostBasedAutoScalerTest
nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
Assert.assertEquals(
"Non-map autoscaler metrics should be ignored",
- 0.,
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler),
0.0001
);
@@ -275,7 +285,7 @@ public class CostBasedAutoScalerTest
nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
Assert.assertEquals(
"Non-numeric poll idle ratio should be ignored",
- 0.,
+ -1.,
CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio),
0.0001
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]