FrankChen021 commented on code in PR #19594:
URL: https://github.com/apache/druid/pull/19594#discussion_r3435734643
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void
test_autoScaler_scalesUpAndDown_withSlowPublish()
.build(dataSource, topic);
cluster.callApi().postSupervisor(supervisor);
- // Ingest a large number of records to trigger a scale-up
- // 10k records = 100 segments to publish * 100 rows per segment
- int totalRecords = 0;
- for (int i = 0; i < 10; ++i) {
- totalRecords += publish1kRecords(topic, false);
- }
+ overlord.latchableEmitter()
+ .waitForEvent(event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE,
dataSource));
- // Wait for tasks to scale up
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.greaterThan(1L))
- );
- Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
- waitUntilPublishedRecordsAreIngested(totalRecords);
+ final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ final AtomicInteger totalRecords = new AtomicInteger();
+ final ExecutorService publisher = Executors.newSingleThreadExecutor();
+ final Future<?> publisherFuture = publisher.submit(() -> {
+ for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get();
++i) {
+ totalRecords.addAndGet(publish1kRecords(topic, false));
+ try {
+ TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ });
- // Let the tasks work through the lag.
- // Do not publish any more records so that the idleness causes scale-down
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.equalTo(1L))
- );
- Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+ try {
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+ .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
+ .hasValueMatching(Matchers.greaterThan(1L))
+ );
+
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/updatedCount")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
+ .hasValueMatching(Matchers.greaterThan(1L))
+ );
+ keepPublishing.set(false);
+ publisherFuture.get(30, TimeUnit.SECONDS);
+ ITRetryUtil.retryUntilTrue(
Review Comment:
[P2] Avoid excessively long retries in this integration test
ITRetryUtil.retryUntilTrue uses the default of 240 retries with 5 seconds
between attempts. This newly added check can therefore add up to 20 minutes
(240 × 5 s) to a failing run, and the test method itself has no @Timeout.
This test already uses 600-second latch waits, so a regression can now occupy
CI for much longer before failing. Use a retry with a smaller, test-specific
budget (fewer attempts and/or a shorter delay), or add an explicit @Timeout
to the method.
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void
test_autoScaler_scalesUpAndDown_withSlowPublish()
.build(dataSource, topic);
cluster.callApi().postSupervisor(supervisor);
- // Ingest a large number of records to trigger a scale-up
- // 10k records = 100 segments to publish * 100 rows per segment
- int totalRecords = 0;
- for (int i = 0; i < 10; ++i) {
- totalRecords += publish1kRecords(topic, false);
- }
+ overlord.latchableEmitter()
+ .waitForEvent(event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE,
dataSource));
- // Wait for tasks to scale up
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.greaterThan(1L))
- );
- Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
- waitUntilPublishedRecordsAreIngested(totalRecords);
+ final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ final AtomicInteger totalRecords = new AtomicInteger();
+ final ExecutorService publisher = Executors.newSingleThreadExecutor();
+ final Future<?> publisherFuture = publisher.submit(() -> {
+ for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get();
++i) {
+ totalRecords.addAndGet(publish1kRecords(topic, false));
+ try {
+ TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ });
- // Let the tasks work through the lag.
- // Do not publish any more records so that the idleness causes scale-down
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.equalTo(1L))
- );
- Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+ try {
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+ .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
+ .hasValueMatching(Matchers.greaterThan(1L))
+ );
+
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/autoScaler/updatedCount")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
+ .hasValueMatching(Matchers.greaterThan(1L))
+ );
+ keepPublishing.set(false);
+ publisherFuture.get(30, TimeUnit.SECONDS);
Review Comment:
[P2] Surface publisher failures before waiting for scaler metrics
The background publisher future is only observed after both autoscaler
metric waits succeed. Suppose publish1kRecords throws before it has created
enough lag. The test then waits for scaler metrics that may never arrive,
and eventually exits through the finally block without ever calling
publisherFuture.get(). That hides the real producer failure. The previous
synchronous publish path failed immediately. Check the future while waiting,
or call publisherFuture.get() on the failure path, so that producer
exceptions fail the test directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]