This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f9d8ef936c6 fix flaky compaction test (#19157)
f9d8ef936c6 is described below

commit f9d8ef936c6a963868850092c3f9a2d3435d72db
Author: Cece Mei <[email protected]>
AuthorDate: Tue Mar 17 11:45:32 2026 -0700

    fix flaky compaction test (#19157)
    
    * test-flaky
    
    * processed
    
    * batch-ingest
    
    * flaky
    
    * flaky
    
    * comment
---
 .../embedded/compact/CompactionSupervisorTest.java | 145 ++++++++++-----------
 1 file changed, 68 insertions(+), 77 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index 2c68c0eeab6..bc9f7236530 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -23,6 +23,8 @@ import org.apache.druid.catalog.guice.CatalogClientModule;
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.CompactionEngine;
@@ -32,12 +34,12 @@ import 
org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
+import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import org.apache.druid.indexing.compact.CascadingReindexingTemplate;
 import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
-import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -55,6 +57,7 @@ import org.apache.druid.query.filter.NotDimFilter;
 import org.apache.druid.rpc.UpdateResponse;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.metadata.DefaultIndexingStateFingerprintMapper;
 import org.apache.druid.segment.metadata.IndexingStateCache;
 import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
@@ -76,6 +79,7 @@ import 
org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.server.metrics.LatchableEmitter;
 import org.apache.druid.server.metrics.StorageMonitor;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedHistorical;
@@ -89,7 +93,6 @@ import org.apache.druid.testing.tools.JsonEventSerializer;
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -101,6 +104,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -112,7 +116,6 @@ import java.util.stream.Collectors;
  */
 public class CompactionSupervisorTest extends EmbeddedClusterTestBase
 {
-  private final KafkaResource kafkaServer = new KafkaResource();
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer()
       .setServerMemory(2_000_000_000L)
@@ -138,12 +141,7 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
                                    
"[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
                                )
                                
.addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
-                               .addExtensions(
-                                   CatalogClientModule.class,
-                                   CatalogCoordinatorModule.class,
-                                   KafkaIndexTaskModule.class
-                               )
-                               .addResource(kafkaServer)
+                               .addExtensions(CatalogClientModule.class, 
CatalogCoordinatorModule.class)
                                .addServer(coordinator)
                                .addServer(overlord)
                                .addServer(indexer)
@@ -156,7 +154,14 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
   private void configureCompaction(CompactionEngine compactionEngine, 
@Nullable CompactionCandidateSearchPolicy policy)
   {
     final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
-        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 
100, policy, true, compactionEngine, true))
+        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+            1.0,
+            100,
+            policy,
+            true,
+            compactionEngine,
+            true
+        ))
     );
     Assertions.assertTrue(updateResponse.isSuccess());
   }
@@ -236,24 +241,14 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
         CompactionEngine.MSQ,
         new MostFragmentedIntervalFirstPolicy(2, new 
HumanReadableBytes("1KiB"), null, 80, null)
     );
-    KafkaSupervisorSpecBuilder kafkaSupervisorSpecBuilder = 
MoreResources.Supervisor.KAFKA_JSON
-        .get()
-        .withDataSchema(schema -> schema.withTimestamp(new 
TimestampSpec("timestamp", "iso", null))
-                                        
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build()))
-        .withTuningConfig(tuningConfig -> 
tuningConfig.withMaxRowsPerSegment(1))
-        .withIoConfig(ioConfig -> 
ioConfig.withConsumerProperties(kafkaServer.consumerProperties()).withTaskCount(2));
 
-    // Set up first topic and supervisor
-    final String topic1 = IdUtils.getRandomId();
-    kafkaServer.createTopicWithPartitions(topic1, 1);
-    final KafkaSupervisorSpec supervisor1 = 
kafkaSupervisorSpecBuilder.withId(topic1).build(dataSource, topic1);
-    cluster.callApi().postSupervisor(supervisor1);
+    ingest1kRecords();
+    ingest1kRecords();
 
-    final int totalRowCount = publish1kRecords(topic1, true) + 
publish1kRecords(topic1, false);
-    waitUntilPublishedRecordsAreIngested(totalRowCount);
-
-    // Before compaction
-    Assertions.assertEquals(4, getNumSegmentsWith(Granularities.HOUR));
+    overlord.latchableEmitter().waitForNextEvent(event -> 
event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+    Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(2000, getTotalRowCount());
 
     // Create a compaction config with DAY granularity
     InlineSchemaDataSourceCompactionConfig dayGranularityConfig =
@@ -276,31 +271,28 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
     waitForAllCompactionTasksToFinish();
 
     pauseCompaction(dayGranularityConfig);
-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
+
+    overlord.latchableEmitter().waitForNextEvent(event -> 
event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
     Assertions.assertEquals(1, getNumSegmentsWith(Granularities.DAY));
     Assertions.assertEquals(2000, getTotalRowCount());
 
     verifyCompactedSegmentsHaveFingerprints(dayGranularityConfig);
 
-    // published another 1k
-    final int appendedRowCount = publish1kRecords(topic1, true);
-    indexer.latchableEmitter().flush();
-    waitUntilPublishedRecordsAreIngested(appendedRowCount);
+    // ingest another 2k
+    ingest1kRecords();
+    ingest1kRecords();
 
-    // Tear down both topics and supervisors
-    kafkaServer.deleteTopic(topic1);
-    cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+    overlord.latchableEmitter().waitForNextEvent(event -> 
event.hasMetricName("segment/metadataCache/sync/time"));
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
+    Assertions.assertEquals(4000, getTotalRowCount());
 
     long totalUsed = overlord.latchableEmitter().getMetricValues(
         "segment/metadataCache/used/count",
         Map.of(DruidMetrics.DATASOURCE, dataSource)
     ).stream().reduce((first, second) -> second).orElse(0).longValue();
 
-    Assertions.assertEquals(0, getNumSegmentsWith(Granularities.HOUR));
-    // 1 compacted segment + 2 appended segment
-    Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
-
     runCompactionWithSpec(dayGranularityConfig);
     waitForAllCompactionTasksToFinish();
 
@@ -310,45 +302,44 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
                       .hasDimension(DruidMetrics.DATASOURCE, dataSource)
                       .hasValueMatching(Matchers.greaterThan(totalUsed)));
 
-    // performed minor compaction: 1 previously compacted segment + 1 
incrementally compacted segment
+    // performed minor compaction: 1 previously compacted segment + 1 recently 
compacted segment from minor compaction
     Assertions.assertEquals(2, getNumSegmentsWith(Granularities.DAY));
-    Assertions.assertEquals(3000, getTotalRowCount());
-  }
-
-  protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
-  {
-    indexer.latchableEmitter().waitForEventAggregate(
-        event -> event.hasMetricName("ingest/events/processed")
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource),
-        agg -> agg.hasSumAtLeast(expectedRowCount)
-    );
-
-    final int totalEventsProcessed = indexer
-        .latchableEmitter()
-        .getMetricValues("ingest/events/processed", 
Map.of(DruidMetrics.DATASOURCE, dataSource))
-        .stream()
-        .mapToInt(Number::intValue)
-        .sum();
-    Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+    Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected int publish1kRecords(String topic, boolean useTransactions)
+  protected void ingest1kRecords()
   {
     final EventSerializer serializer = new 
JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 100, 100);
-    List<byte[]> records = streamGenerator.generateEvents(10);
-
-    ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new 
ArrayList<>();
-    for (byte[] record : records) {
-      producerRecords.add(new ProducerRecord<>(topic, record));
-    }
-
-    if (useTransactions) {
-      kafkaServer.produceRecordsToTopic(producerRecords);
-    } else {
-      kafkaServer.produceRecordsWithoutTransaction(producerRecords);
-    }
-    return producerRecords.size();
+    final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+    List<byte[]> records = streamGenerator.generateEvents(2);
+
+    final InlineInputSource input = new InlineInputSource(
+        records.stream().map(b -> new String(b, 
StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        input,
+        new JsonInputFormat(null, null, null, null, null),
+        true,
+        null
+    );
+    final ParallelIndexIngestionSpec indexIngestionSpec = new 
ParallelIndexIngestionSpec(
+        DataSchema.builder()
+                  .withDataSource(dataSource)
+                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                  
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                  .build(),
+        ioConfig,
+        TuningConfigBuilder.forParallelIndexTask().build()
+    );
+    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        taskId,
+        null,
+        null,
+        indexIngestionSpec,
+        null
+    );
+    cluster.callApi().submitTask(task);
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
   }
 
   @MethodSource("getEngine")
@@ -859,7 +850,7 @@ public class CompactionSupervisorTest extends 
EmbeddedClusterTestBase
   public static List<PartitionsSpec> getPartitionsSpec()
   {
     return List.of(
-        new DimensionRangePartitionsSpec(null, 5000, List.of("page"), false),
+        new DimensionRangePartitionsSpec(null, 10_000, List.of("page"), false),
         new DynamicPartitionsSpec(null, null)
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to