Shekharrajak commented on code in PR #19594:
URL: https://github.com/apache/druid/pull/19594#discussion_r3430386474
##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java:
##########
@@ -215,34 +224,67 @@ public void
test_autoScaler_scalesUpAndDown_withSlowPublish()
.build(dataSource, topic);
cluster.callApi().postSupervisor(supervisor);
- // Ingest a large number of records to trigger a scale-up
- // 10k records = 100 segments to publish * 100 rows per segment
- int totalRecords = 0;
- for (int i = 0; i < 10; ++i) {
- totalRecords += publish1kRecords(topic, false);
- }
+ overlord.latchableEmitter()
+ .waitForEvent(event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE,
dataSource));
- // Wait for tasks to scale up
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.greaterThan(1L))
- );
- Assertions.assertEquals(4, getCurrentTaskCount(supervisor.getId()));
- waitUntilPublishedRecordsAreIngested(totalRecords);
+ final AtomicBoolean keepPublishing = new AtomicBoolean(true);
+ final AtomicInteger totalRecords = new AtomicInteger();
+ final ExecutorService publisher = Executors.newSingleThreadExecutor();
+ final Future<?> publisherFuture = publisher.submit(() -> {
+ for (int i = 0; i < MAX_SCALE_UP_RECORD_BATCHES && keepPublishing.get();
++i) {
+ totalRecords.addAndGet(publish1kRecords(topic, false));
+ try {
+ TimeUnit.MILLISECONDS.sleep(SCALE_UP_PUBLISH_INTERVAL_MILLIS);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ });
- // Let the tasks work through the lag.
- // Do not publish any more records so that the idleness causes scale-down
- overlord.latchableEmitter().waitForEvent(
- event -> event.hasMetricName("task/autoScaler/updatedCount")
- .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
- .hasValueMatching(Matchers.equalTo(1L))
- );
- Assertions.assertEquals(1, getCurrentTaskCount(supervisor.getId()));
+ try {
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
+ .hasDimension(DruidMetrics.SUPERVISOR_ID,
supervisor.getId())
+ .hasValueMatching(Matchers.greaterThan(1L))
Review Comment:
This waits until the cost-based autoscaler computes an optimal task count
greater than 1. Note that this metric is only the autoscaler's recommendation;
on its own it does not show that the supervisor's task count has actually changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]