This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f9d8ef936c6 fix flaky compaction test (#19157)
f9d8ef936c6 is described below
commit f9d8ef936c6a963868850092c3f9a2d3435d72db
Author: Cece Mei <[email protected]>
AuthorDate: Tue Mar 17 11:45:32 2026 -0700
fix flaky compaction test (#19157)
* test-flaky
* processed
* batch-ingest
* flaky
* flaky
* comment
---
.../embedded/compact/CompactionSupervisorTest.java | 145 ++++++++++-----------
1 file changed, 68 insertions(+), 77 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 2c68c0eeab6..bc9f7236530 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -23,6 +23,8 @@ import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.CompactionEngine;
@@ -32,12 +34,12 @@ import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
-import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@ import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
import org.apache.druid.segment.metadata.IndexingStateCache;
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
@@ -76,6 +79,7 @@ import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
@@ -89,7 +93,6 @@ import org.apache.druid.testing.tools.JsonEventSerializer;
import org.apache.druid.testing.tools.StreamGenerator;
import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
import org.apache.druid.timeline.DataSegment;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
@@ -101,6 +104,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -112,7 +116,6 @@ import java.util.stream.Collectors;
*/
public class CompactionSupervisorTest extends EmbeddedClusterTestBase
{
- private final KafkaResource kafkaServer = new KafkaResource();
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
.setServerMemory(2_000_000_000L)
@@ -138,12 +141,7 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
"[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
)
.addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
- .addExtensions(
- CatalogClientModule.class,
- CatalogCoordinatorModule.class,
- KafkaIndexTaskModule.class
- )
- .addResource(kafkaServer)
+ .addExtensions(CatalogClientModule.class,
CatalogCoordinatorModule.class)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
@@ -156,7 +154,14 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
private void configureCompaction(CompactionEngine compactionEngine,
@Nullable CompactionCandidateSearchPolicy policy)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
- o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0,
100, policy, true, compactionEngine, true))
+ o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+ 1.0,
+ 100,
+ policy,
+ true,
+ compactionEngine,
+ true
+ ))
);
Assertions.assertTrue(updateResponse.isSuccess());
}
@@ -236,24 +241,14 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
CompactionEngine.MSQ,
new MostFragmentedIntervalFirstPolicy(2, new
HumanReadableBytes("1KiB"), null, 80, null)
);
- KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder =
MoreResources.Supervisor.KAFKA_JSON
- .get()
- .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null))
-
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
- .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
- .withIoConfig(ioConfig ->
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
- // Set up first topic and supervisor
- final String topic1 = IdUtils.getRandomId();
- kafkaServer.createTopicWithPartitions(topic1, 1);
- final KafkaSupervisorSpec supervisor1 =
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
- cluster.callApi().postSupervisor(supervisor1);
+ ingest1kRecords();
+ ingest1kRecords();
- final int totalRowCount = publish1kRecords(topic1, true) +
publish1kRecords(topic1, false);
- waitUntilPublishedRecordsAreIngested(totalRowCount);
-
- // Before compaction
- Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+ overlord.latchableEmitter().waitForNextEvent(event ->
event.hasMetricName("segment/metadataCache/sync/time"));
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+ Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
+ Assertions.assertEquals(2000, getTotalRowCount());
// Create a compaction config with DAY granularity
InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
@@ -276,31 +271,28 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
waitForAllCompactionTasksToFinish();
pauseCompaction(dayGranularityConfig);
- Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+
+ overlord.latchableEmitter().waitForNextEvent(event ->
event.hasMetricName("segment/metadataCache/sync/time"));
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(2000, getTotalRowCount());
verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
- // published another 1k
- final int appendedRowCount = publish1kRecords(topic1, true);
- indexer.latchableEmitter().flush();
- waitUntilPublishedRecordsAreIngested(appendedRowCount);
+ // ingest another 2k
+ ingest1kRecords();
+ ingest1kRecords();
- // Tear down both topics and supervisors
- kafkaServer.deleteTopic(topic1);
- cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+ overlord.latchableEmitter().waitForNextEvent(event ->
event.hasMetricName("segment/metadataCache/sync/time"));
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+ Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+ Assertions.assertEquals(4000, getTotalRowCount());
long totalUsed = overlord.latchableEmitter().getMetricValues(
"segment/metadataCache/used/count",
Map.of(DruidMetrics.DATASOURCE, dataSource)
).stream().reduce((first, second) -> second).orElse(0).longValue();
- Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
- // 1 compacted segment + 2 appended segment
- Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
- Assertions.assertEquals(3000, getTotalRowCount());
-
runCompactionWithSpec(dayGranularityConfig);
waitForAllCompactionTasksToFinish();
@@ -310,45 +302,44 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.greaterThan(totalUsed)));
- // performed minor compaction: 1 previously compacted segment + 1
incrementally compacted segment
+ // performed minor compaction: 1 previously compacted segment + 1 recently
compacted segment from minor compaction
Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
- Assertions.assertEquals(3000, getTotalRowCount());
- }
-
- protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
- {
- indexer.latchableEmitter().waitForEventAggregate(
- event -> event.hasMetricName("ingest/events/processed")
- .hasDimension(DruidMetrics.DATASOURCE, dataSource),
- agg -> agg.hasSumAtLeast(expectedRowCount)
- );
-
- final int totalEventsProcessed = indexer
- .latchableEmitter()
- .getMetricValues("ingest/events/processed",
Map.of(DruidMetrics.DATASOURCE, dataSource))
- .stream()
- .mapToInt(Number::intValue)
- .sum();
- Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+ Assertions.assertEquals(4000, getTotalRowCount());
}
- protected int publish1kRecords(String topic, boolean useTransactions)
+ protected void ingest1kRecords()
{
final EventSerializer serializer = new
JsonEventSerializer(overlord.bindings().jsonMapper());
- final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
- List<byte[]> records = streamGenerator.generateEvents(10);
-
- ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new
ArrayList<>();
- for (byte[] record : records) {
- producerRecords.add(new ProducerRecord<>(topic, record));
- }
-
- if (useTransactions) {
- kafkaServer.produceRecordsToTopic(producerRecords);
- } else {
- kafkaServer.produceRecordsWithoutTransaction(producerRecords);
- }
- return producerRecords.size();
+ final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+ List<byte[]> records = streamGenerator.generateEvents(2);
+
+ final InlineInputSource input = new InlineInputSource(
+ records.stream().map(b -> new String(b,
StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ input,
+ new JsonInputFormat(null, null, null, null, null),
+ true,
+ null
+ );
+ final ParallelIndexIngestionSpec indexIngestionSpec = new
ParallelIndexIngestionSpec(
+ DataSchema.builder()
+ .withDataSource(dataSource)
+ .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+ .build(),
+ ioConfig,
+ TuningConfigBuilder.forParallelIndexTask().build()
+ );
+ final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+ final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ taskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null
+ );
+ cluster.callApi().submitTask(task);
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
}
@MethodSource("getEngine")
@@ -859,7 +850,7 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
public static List<PartitionsSpec> getPartitionsSpec()
{
return List.of(
- new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false),
+ new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
new DynamicPartitionsSpec(null, null)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]