This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1ce8a309071 test: Fix row-count race in EmbeddedKafkaSupervisorTest (#19581)
1ce8a309071 is described below
commit 1ce8a30907115327ea6df27b700d21d214d22460
Author: Andreas Maechler <[email protected]>
AuthorDate: Mon Jun 15 11:22:18 2026 -0600
test: Fix row-count race in EmbeddedKafkaSupervisorTest (#19581)
test_runKafkaSupervisor produced 10 records, waited only for the broker to
discover the datasource (the first matching metric event), then immediately
asserted SELECT COUNT(*) == 10. Under a loaded CI runner the query raced
ingestion and saw fewer than 10 rows (expected:<10> but was:<8/7/9>).
Wait for ingest/events/processed to reach the expected count before querying,
matching the sibling test_runSupervisor_withEmptyDimension, and derive the
expected count from expectedSegments instead of a hardcoded literal.
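
The wait-then-assert pattern described above can be sketched in isolation.
This is a minimal illustration only, not Druid's latchableEmitter: the class
ProcessedCountLatch and its methods record and awaitSumAtLeast are
hypothetical names invented for this sketch. It shows a waiter blocking until
a running sum of processed events reaches the expected count, rather than
proceeding on the first event it sees.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a metric aggregate; not part of Druid.
class ProcessedCountLatch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition updated = lock.newCondition();
    private long sum = 0;

    // Called by the "ingestion" side each time it reports processed events.
    void record(long processed) {
        lock.lock();
        try {
            sum += processed;
            updated.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the running sum is at least `expected`, or the timeout
    // elapses. Returns true only if the threshold was reached.
    boolean awaitSumAtLeast(long expected, long timeoutMillis) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (sum < expected) {
                if (remainingNanos <= 0) {
                    return false;
                }
                remainingNanos = updated.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report failure.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ProcessedCountLatch latch = new ProcessedCountLatch();
        long expectedRows = 10;

        // Simulated ingestion that reports one processed event at a time.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < expectedRows; i++) {
                latch.record(1);
            }
        });
        producer.start();

        // Only query (here: print) once every produced record is accounted for.
        boolean ready = latch.awaitSumAtLeast(expectedRows, 5_000);
        System.out.println("all rows ingested: " + ready);
    }
}
```

Waiting on the aggregate rather than on the first matching event is what
closes the race: a single event only shows that ingestion has started, while
the sum shows that it has caught up with what was produced.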
---
.../kafka/simulate/EmbeddedKafkaSupervisorTest.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index 6f0fa52317d..9bfa4cf1607 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -125,8 +125,19 @@ public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
Assertions.assertEquals(1, taskStatuses.size());
Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode());
+ // Wait until all produced records have been ingested before verifying the row count,
+ // otherwise the query below can race ingestion and observe fewer than the expected rows
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/events/processed")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(expectedSegments)
+ );
+
// Verify the count of rows ingested into the datasource so far
- Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
+ Assertions.assertEquals(
+ String.valueOf(expectedSegments),
+ cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
+ );
// Suspend the supervisor and verify the state
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());