This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 226f2772afd Emit metric dimension for minor compaction and other
refactors (#19151)
226f2772afd is described below
commit 226f2772afdc51c2dc03e4562199f97d948b33e4
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Mar 16 22:27:14 2026 +0530
Emit metric dimension for minor compaction and other refactors (#19151)
---
.../embedded/indexing/KafkaClusterMetricsTest.java | 37 ++++++-
.../compact/CompactionConfigBasedJobTemplate.java | 9 +-
.../druid/indexing/compact/CompactionJob.java | 27 ++---
.../druid/indexing/compact/CompactionJobQueue.java | 8 +-
.../task/ClientCompactionTaskQuerySerdeTest.java | 5 +-
.../client/indexing/ClientCompactionIOConfig.java | 6 +-
.../client/indexing/ClientCompactionInputSpec.java | 41 +++++++
.../indexing/ClientCompactionIntervalSpec.java | 47 +-------
.../indexing/ClientMinorCompactionInputSpec.java | 101 +++++++++++++++++
.../compaction/BaseCandidateSearchPolicy.java | 2 +-
.../CompactionCandidateSearchPolicy.java | 84 +-------------
.../server/compaction/CompactionStatusTracker.java | 2 +-
.../druid/server/compaction/Eligibility.java | 121 +++++++++++++++++++++
.../compaction/FixedIntervalOrderPolicy.java | 2 +-
.../MostFragmentedIntervalFirstPolicy.java | 6 +-
.../server/coordinator/duty/CompactSegments.java | 30 +++--
.../indexing/ClientCompactionIntervalSpecTest.java | 37 ++++---
.../MostFragmentedIntervalFirstPolicyTest.java | 20 ++--
.../coordinator/duty/CompactSegmentsTest.java | 1 -
19 files changed, 386 insertions(+), 200 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 6ed790e6a7e..c2f07d12495 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -23,15 +23,20 @@ import
org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.emitter.kafka.KafkaEmitter;
import org.apache.druid.emitter.kafka.KafkaEmitterModule;
+import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
+import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy;
+import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
@@ -50,10 +55,14 @@ import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Embedded test to emit cluster metrics using a {@link KafkaEmitter} and then
@@ -96,7 +105,8 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
}
};
- indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+ indexer.setServerMemory(1_000_000_000L)
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
.addProperty("druid.worker.capacity", "10");
overlord.addProperty("druid.indexer.task.default.context",
"{\"useConcurrentLocks\": true}")
.addProperty("druid.manager.segments.useIncrementalCache",
"ifSynced")
@@ -128,6 +138,20 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
return cluster;
}
+ public static Stream<Arguments> getCompactionSupervisorTestParams()
+ {
+ return Stream.of(
+ Arguments.of(
+ CompactionEngine.NATIVE,
+ new NewestSegmentFirstPolicy(null)
+ ),
+ Arguments.of(
+ CompactionEngine.MSQ,
+ new MostFragmentedIntervalFirstPolicy(1,
HumanReadableBytes.valueOf(1), null, 80, null)
+ )
+ );
+ }
+
@Test
@Timeout(20)
public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues()
@@ -176,9 +200,13 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
}
- @Test
+ @MethodSource("getCompactionSupervisorTestParams")
+ @ParameterizedTest(name = "engine={0}, policy={1}")
@Timeout(120)
- public void
test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments()
+ public void
test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments(
+ CompactionEngine engine,
+ CompactionCandidateSearchPolicy policy
+ )
{
final int maxRowsPerSegment = 500;
final int compactedMaxRowsPerSegment = 5000;
@@ -213,7 +241,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
);
final ClusterCompactionConfig updatedCompactionConfig
- = new ClusterCompactionConfig(1.0, 10, null, true, null, null);
+ = new ClusterCompactionConfig(1.0, 10, policy, true, engine, null);
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(updatedCompactionConfig)
);
@@ -237,6 +265,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
overlord.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.TASK_TYPE, "compact")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS"),
agg -> agg.hasCountAtLeast(10)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
index 4eae2670626..1a8dd95179b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java
@@ -28,9 +28,9 @@ import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.server.compaction.CompactionCandidate;
-import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator;
+import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.duty.CompactSegments;
@@ -94,7 +94,7 @@ public class CompactionConfigBasedJobTemplate implements
CompactionJobTemplate
// Create a job for each CompactionCandidate
while (segmentIterator.hasNext()) {
final CompactionCandidate candidate = segmentIterator.next();
- final CompactionCandidateSearchPolicy.Eligibility eligibility =
+ final Eligibility eligibility =
params.getClusterCompactionConfig()
.getCompactionPolicy()
.checkEligibilityForCompaction(candidate,
params.getLatestTaskStatus(candidate));
@@ -126,7 +126,7 @@ public class CompactionConfigBasedJobTemplate implements
CompactionJobTemplate
);
ClientCompactionTaskQuery taskPayload =
CompactSegments.createCompactionTask(
finalCandidate,
- eligibility.getMode(),
+ eligibility,
finalConfig,
engine,
indexingStateFingerprint,
@@ -138,7 +138,8 @@ public class CompactionConfigBasedJobTemplate implements
CompactionJobTemplate
finalCandidate,
CompactionSlotManager.computeSlotsRequiredForTask(taskPayload),
indexingStateFingerprint,
- compactionState
+ compactionState,
+ eligibility
)
);
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java
index 7611485bd6d..a64abb3b2a0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java
@@ -21,8 +21,8 @@ package org.apache.druid.indexing.compact;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.indexing.template.BatchIndexingJob;
-import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.compaction.CompactionCandidate;
+import org.apache.druid.server.compaction.Eligibility;
import org.apache.druid.timeline.CompactionState;
/**
@@ -34,32 +34,20 @@ public class CompactionJob extends BatchIndexingJob
private final int maxRequiredTaskSlots;
private final String targetIndexingStateFingerprint;
private final CompactionState targetIndexingState;
+ private final Eligibility eligibility;
public CompactionJob(
ClientCompactionTaskQuery task,
CompactionCandidate candidate,
int maxRequiredTaskSlots,
String targetIndexingStateFingerprint,
- CompactionState targetIndexingState
+ CompactionState targetIndexingState,
+ Eligibility eligibility
)
{
super(task, null);
this.candidate = candidate;
- this.maxRequiredTaskSlots = maxRequiredTaskSlots;
- this.targetIndexingStateFingerprint = targetIndexingStateFingerprint;
- this.targetIndexingState = targetIndexingState;
- }
-
- public CompactionJob(
- ClientSqlQuery msqQuery,
- CompactionCandidate candidate,
- int maxRequiredTaskSlots,
- String targetIndexingStateFingerprint,
- CompactionState targetIndexingState
- )
- {
- super(null, msqQuery);
- this.candidate = candidate;
+ this.eligibility = eligibility;
this.maxRequiredTaskSlots = maxRequiredTaskSlots;
this.targetIndexingStateFingerprint = targetIndexingStateFingerprint;
this.targetIndexingState = targetIndexingState;
@@ -90,6 +78,11 @@ public class CompactionJob extends BatchIndexingJob
return targetIndexingState;
}
+ public Eligibility getEligibility()
+ {
+ return eligibility;
+ }
+
@Override
public String toString()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
index 1dc29cfde6a..31333801225 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
@@ -64,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* Iterates over all eligible compaction jobs in order of their priority.
@@ -198,7 +197,7 @@ public class CompactionJobQueue
final List<CompactionJob> jobsToRemove = queue
.stream()
.filter(job -> job.getDataSource().equals(dataSource))
- .collect(Collectors.toList());
+ .toList();
queue.removeAll(jobsToRemove);
log.info("Removed [%d] jobs for datasource[%s] from queue.",
jobsToRemove.size(), dataSource);
@@ -221,7 +220,10 @@ public class CompactionJobQueue
while (!queue.isEmpty()) {
final CompactionJob job = queue.poll();
if (startJobIfPendingAndReady(job, pendingJobs, slotManager)) {
- runStats.add(Stats.Compaction.SUBMITTED_TASKS,
RowKey.of(Dimension.DATASOURCE, job.getDataSource()), 1);
+ final RowKey rowKey = RowKey
+ .with(Dimension.DATASOURCE, job.getDataSource())
+ .and(Dimension.DESCRIPTION, job.getEligibility().getMode().name());
+ runStats.add(Stats.Compaction.SUBMITTED_TASKS, rowKey, 1);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index aa9b006dd8a..807f67bdb09 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -193,13 +193,14 @@ public class ClientCompactionTaskQuerySerdeTest
{
Assert.assertEquals(query.getId(), task.getId());
Assert.assertEquals(query.getDataSource(), task.getDataSource());
+ Assert.assertTrue(query.getIoConfig().getInputSpec() instanceof
ClientCompactionIntervalSpec);
Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof
CompactionIntervalSpec);
Assert.assertEquals(
query.getIoConfig().getInputSpec().getInterval(),
((CompactionIntervalSpec)
task.getIoConfig().getInputSpec()).getInterval()
);
Assert.assertEquals(
- query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(),
+ ((ClientCompactionIntervalSpec)
query.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds(),
((CompactionIntervalSpec)
task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds()
);
Assert.assertEquals(
@@ -301,7 +302,7 @@ public class ClientCompactionTaskQuerySerdeTest
id,
"datasource",
new ClientCompactionIOConfig(
- new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), null,
"testSha256OfSortedSegmentIds"), true
+ new ClientCompactionIntervalSpec(Intervals.of("2019/2020"),
"testSha256OfSortedSegmentIds"), true
),
new ClientCompactionTaskQueryTuningConfig(
100,
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index 348d9d22da0..4585995e572 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -35,12 +35,12 @@ public class ClientCompactionIOConfig
{
private static final String TYPE = "compact";
- private final ClientCompactionIntervalSpec inputSpec;
+ private final ClientCompactionInputSpec inputSpec;
private final boolean dropExisting;
@JsonCreator
public ClientCompactionIOConfig(
- @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
+ @JsonProperty("inputSpec") ClientCompactionInputSpec inputSpec,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
@@ -55,7 +55,7 @@ public class ClientCompactionIOConfig
}
@JsonProperty
- public ClientCompactionIntervalSpec getInputSpec()
+ public ClientCompactionInputSpec getInputSpec()
{
return inputSpec;
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
new file mode 100644
index 00000000000..9f13763612e
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.joda.time.Interval;
+
+/**
+ * Client-side equivalent of {@code CompactionInputSpec}. Required since the
+ * {@code CompactionInputSpec} resides in {@code indexing-service} module.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = ClientCompactionIntervalSpec.TYPE, value =
ClientCompactionIntervalSpec.class),
+ @JsonSubTypes.Type(name = ClientMinorCompactionInputSpec.TYPE, value =
ClientMinorCompactionInputSpec.class)
+})
+public interface ClientCompactionInputSpec
+{
+ /**
+ * @return non-null Interval that this input spec operates on.
+ */
+ Interval getInterval();
+}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
index 366ad089069..f14dd55b38f 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java
@@ -20,37 +20,29 @@
package org.apache.druid.client.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import javax.annotation.Nullable;
-import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
/**
* InputSpec for {@link ClientCompactionIOConfig}.
* <p>
* Should be synchronized with
org.apache.druid.indexing.common.task.CompactionIntervalSpec and
org.apache.druid.indexing.common.task.UncompactedInputSpec.
*/
-public class ClientCompactionIntervalSpec
+public class ClientCompactionIntervalSpec implements ClientCompactionInputSpec
{
- private static final String TYPE_ALL_SEGMENTS = "interval";
- private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted";
+ public static final String TYPE = "interval";
private final Interval interval;
@Nullable
- private final List<SegmentDescriptor> uncompactedSegments;
- @Nullable
private final String sha256OfSortedSegmentIds;
@JsonCreator
public ClientCompactionIntervalSpec(
@JsonProperty("interval") Interval interval,
- @JsonProperty("uncompactedSegments") @Nullable List<SegmentDescriptor>
uncompactedSegments,
@JsonProperty("sha256OfSortedSegmentIds") @Nullable String
sha256OfSortedSegmentIds
)
{
@@ -58,45 +50,16 @@ public class ClientCompactionIntervalSpec
throw new IAE("Interval[%s] is empty, must specify a nonempty interval",
interval);
}
this.interval = interval;
- if (uncompactedSegments == null) {
- // perform a full compaction
- } else if (uncompactedSegments.isEmpty()) {
- throw new IAE("Can not supply empty segments as input, please use either
null or non-empty segments.");
- } else if (interval != null) {
- List<SegmentDescriptor> segmentsNotInInterval =
- uncompactedSegments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
- if (!segmentsNotInInterval.isEmpty()) {
- throw new IAE(
- "Can not supply segments outside interval[%s], got segments[%s].",
- interval,
- segmentsNotInInterval
- );
- }
- }
- this.uncompactedSegments = uncompactedSegments;
this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds;
}
- @JsonProperty
- public String getType()
- {
- return (uncompactedSegments == null) ? TYPE_ALL_SEGMENTS :
TYPE_UNCOMPACTED_SEGMENTS_ONLY;
- }
-
+ @Override
@JsonProperty
public Interval getInterval()
{
return interval;
}
- @Nullable
- @JsonProperty
- @JsonInclude(JsonInclude.Include.NON_NULL)
- public List<SegmentDescriptor> getUncompactedSegments()
- {
- return uncompactedSegments;
- }
-
@Nullable
@JsonProperty
public String getSha256OfSortedSegmentIds()
@@ -115,14 +78,13 @@ public class ClientCompactionIntervalSpec
}
ClientCompactionIntervalSpec that = (ClientCompactionIntervalSpec) o;
return Objects.equals(interval, that.interval) &&
- Objects.equals(uncompactedSegments, that.uncompactedSegments) &&
Objects.equals(sha256OfSortedSegmentIds,
that.sha256OfSortedSegmentIds);
}
@Override
public int hashCode()
{
- return Objects.hash(interval, uncompactedSegments,
sha256OfSortedSegmentIds);
+ return Objects.hash(interval, sha256OfSortedSegmentIds);
}
@Override
@@ -130,7 +92,6 @@ public class ClientCompactionIntervalSpec
{
return "ClientCompactionIntervalSpec{" +
"interval=" + interval +
- ", uncompactedSegments=" + uncompactedSegments +
", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' +
'}';
}
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
new file mode 100644
index 00000000000..751ec8d462a
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.SegmentDescriptor;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Client-side equivalent of {@code MinorCompactionInputSpec}.
+ */
+public class ClientMinorCompactionInputSpec extends
ClientCompactionIntervalSpec
+{
+ public static final String TYPE = "uncompacted";
+
+ private final List<SegmentDescriptor> uncompactedSegments;
+
+ @JsonCreator
+ public ClientMinorCompactionInputSpec(
+ @JsonProperty("interval") Interval interval,
+ @JsonProperty("uncompactedSegments") List<SegmentDescriptor>
uncompactedSegments
+ )
+ {
+ super(interval, null);
+ if (uncompactedSegments == null || uncompactedSegments.isEmpty()) {
+ throw InvalidInput.exception("'uncompactedSegments' must be non-empty.");
+ } else if (interval != null) {
+ List<SegmentDescriptor> segmentsNotInInterval =
+ uncompactedSegments.stream().filter(s ->
!interval.contains(s.getInterval())).collect(Collectors.toList());
+ if (!segmentsNotInInterval.isEmpty()) {
+ throw new IAE(
+ "Can not supply segments outside interval[%s], got segments[%s].",
+ interval,
+ segmentsNotInInterval
+ );
+ }
+ }
+ this.uncompactedSegments = uncompactedSegments;
+ }
+
+ @JsonProperty
+ public List<SegmentDescriptor> getUncompactedSegments()
+ {
+ return uncompactedSegments;
+ }
+
+ @Override
+ public boolean equals(Object object)
+ {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ if (!super.equals(object)) {
+ return false;
+ }
+ ClientMinorCompactionInputSpec that = (ClientMinorCompactionInputSpec)
object;
+ return Objects.equals(uncompactedSegments, that.uncompactedSegments);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), uncompactedSegments);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ClientMinorCompactionInputSpec{" +
+ "interval=" + getInterval() +
+ ",uncompactedSegments=" + uncompactedSegments +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
index 2a910713262..e47d8197f8b 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java
@@ -73,7 +73,7 @@ public abstract class BaseCandidateSearchPolicy implements
CompactionCandidateSe
CompactionTaskStatus latestTaskStatus
)
{
- return Eligibility.OK;
+ return Eligibility.FULL;
}
/**
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
index 44889fb7e10..f08997a2c6a 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java
@@ -21,12 +21,8 @@ package org.apache.druid.server.compaction;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordinator.duty.CompactSegments;
-import javax.annotation.Nullable;
-import java.util.Objects;
-
/**
* Policy used by {@link CompactSegments} duty to pick segments for compaction.
*/
@@ -53,89 +49,11 @@ public interface CompactionCandidateSearchPolicy
* in the current iteration. A policy may implement this method to skip
* compacting intervals or segments that do not fulfil some required
criteria.
*
- * @return {@link Eligibility#OK} only if eligible.
+ * @return an eligible {@link Eligibility} (full or minor) only if the candidate qualifies.
*/
Eligibility checkEligibilityForCompaction(
CompactionCandidate candidate,
CompactionTaskStatus latestTaskStatus
);
- /**
- * Describes the eligibility of an interval for compaction.
- */
- class Eligibility
- {
- public static final Eligibility OK = new Eligibility(true, null,
CompactionMode.ALL_SEGMENTS);
-
- private final boolean eligible;
- private final String reason;
- @Nullable
- private final CompactionMode mode;
-
- private Eligibility(boolean eligible, String reason, @Nullable
CompactionMode mode)
- {
- this.eligible = eligible;
- this.reason = reason;
- this.mode = mode;
- }
-
- public boolean isEligible()
- {
- return eligible;
- }
-
- public String getReason()
- {
- return reason;
- }
-
- /**
- * The mode of compaction (full or minor). This is non-null only when the
- * candidate is considered to be eligible for compaction by the policy.
- */
- @Nullable
- public CompactionMode getMode()
- {
- return mode;
- }
-
- public static Eligibility fail(String messageFormat, Object... args)
- {
- return new Eligibility(false, StringUtils.format(messageFormat, args),
null);
- }
-
- public static Eligibility incremental(String messageFormat, Object... args)
- {
- return new Eligibility(true, StringUtils.format(messageFormat, args),
CompactionMode.UNCOMPACTED_SEGMENTS_ONLY);
- }
-
- @Override
- public boolean equals(Object object)
- {
- if (this == object) {
- return true;
- }
- if (object == null || getClass() != object.getClass()) {
- return false;
- }
- Eligibility that = (Eligibility) object;
- return eligible == that.eligible && Objects.equals(reason, that.reason)
&& Objects.equals(mode, that.mode);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(eligible, reason, mode);
- }
-
- @Override
- public String toString()
- {
- return "Eligibility{" +
- "eligible=" + eligible +
- ", reason='" + reason +
- ", mode='" + mode + '\'' +
- '}';
- }
- }
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
index cdd52a4f917..a5933a9450b 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java
@@ -98,7 +98,7 @@ public class CompactionStatusTracker
}
// Skip intervals that have been filtered out by the policy
- final CompactionCandidateSearchPolicy.Eligibility eligibility
+ final Eligibility eligibility
= searchPolicy.checkEligibilityForCompaction(candidate,
lastTaskStatus);
if (eligibility.isEligible()) {
return CompactionStatus.pending("Not compacted yet");
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java
b/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java
new file mode 100644
index 00000000000..90743308694
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.compaction;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Describes the eligibility of an interval for compaction.
+ */
+public class Eligibility
+{
+ /**
+ * Denotes that the candidate interval is eligible for a
+ * {@link CompactionMode#ALL_SEGMENTS full} compaction.
+ */
+ public static final Eligibility FULL = new Eligibility(true, null,
CompactionMode.ALL_SEGMENTS);
+
+ private final boolean eligible;
+ private final String reason;
+ @Nullable
+ private final CompactionMode mode;
+
+ private Eligibility(boolean eligible, String reason, @Nullable
CompactionMode mode)
+ {
+ this.eligible = eligible;
+ this.reason = reason;
+ this.mode = mode;
+ }
+
+ public boolean isEligible()
+ {
+ return eligible;
+ }
+
+ public String getReason()
+ {
+ return reason;
+ }
+
+ /**
+ * The mode of compaction (full or minor). This is available only when the
+ * candidate is considered to be eligible for compaction by the policy.
+ *
+ * @throws DruidException if {@link #isEligible()} returns false.
+ */
+ public CompactionMode getMode()
+ {
+ if (!isEligible()) {
+ throw DruidException.defensive("Cannot get mode since interval is not
eligible for compaction");
+ }
+ return mode;
+ }
+
+ /**
+ * Denotes that the candidate interval is not eligible for compaction.
+ */
+ public static Eligibility fail(String messageFormat, Object... args)
+ {
+ return new Eligibility(false, StringUtils.format(messageFormat, args),
null);
+ }
+
+ /**
+ * @return {@code Eligibility} denoting that the candidate interval is
+ * eligible for a {@link CompactionMode#UNCOMPACTED_SEGMENTS_ONLY minor}
+ * compaction.
+ */
+ public static Eligibility minor(String messageFormat, Object... args)
+ {
+ return new Eligibility(true, StringUtils.format(messageFormat, args),
CompactionMode.UNCOMPACTED_SEGMENTS_ONLY);
+ }
+
+ @Override
+ public boolean equals(Object object)
+ {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ Eligibility that = (Eligibility) object;
+ return eligible == that.eligible && Objects.equals(reason, that.reason) &&
Objects.equals(mode, that.mode);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(eligible, reason, mode);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Eligibility{" +
+ "eligible=" + eligible +
+ ", reason='" + reason + '\'' +
+ ", mode='" + mode + '\'' +
+ '}';
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
index 24a2f001afe..c4b96aceaca 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java
@@ -62,7 +62,7 @@ public class FixedIntervalOrderPolicy implements
CompactionCandidateSearchPolicy
)
{
return findIndex(candidate) < Integer.MAX_VALUE
- ? Eligibility.OK
+ ? Eligibility.FULL
: Eligibility.fail("Datasource/Interval is not in the list of
'eligibleCandidates'");
}
diff --git
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
index 90d3ce40471..001525a2a89 100644
---
a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
+++
b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java
@@ -191,7 +191,7 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
{
final CompactionStatistics uncompacted = candidate.getUncompactedStats();
if (uncompacted == null) {
- return Eligibility.OK;
+ return Eligibility.FULL;
} else if (uncompacted.getNumSegments() < 1) {
return Eligibility.fail("No uncompacted segments in interval");
} else if (uncompacted.getNumSegments() < minUncompactedCount) {
@@ -218,13 +218,13 @@ public class MostFragmentedIntervalFirstPolicy extends
BaseCandidateSearchPolicy
(uncompacted.getTotalBytes() +
candidate.getCompactedStats().getTotalBytes())
* 100;
if (uncompactedBytesRatio < minUncompactedBytesPercentForFullCompaction) {
- return Eligibility.incremental(
+ return Eligibility.minor(
"Uncompacted bytes ratio[%.2f] is below threshold[%d]",
uncompactedBytesRatio,
minUncompactedBytesPercentForFullCompaction
);
} else {
- return Eligibility.OK;
+ return Eligibility.FULL;
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 108e74b1921..e60c512e6a5 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -25,12 +25,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
+import org.apache.druid.client.indexing.ClientCompactionInputSpec;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
@@ -51,6 +53,7 @@ import
org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.compaction.CompactionSnapshotBuilder;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.compaction.Eligibility;
import
org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -86,6 +89,8 @@ public class CompactSegments implements CoordinatorCustomDuty
public static final String INDEXING_STATE_FINGERPRINT_KEY =
"indexingStateFingerprint";
private static final String COMPACTION_REASON_KEY = "compactionReason";
+ private static final String COMPACTION_MODE_KEY = "compactionMode";
+ private static final String COMPACTION_POLICY_RESULT =
"compactionPolicyResult";
private static final Logger LOG = new Logger(CompactSegments.class);
@@ -265,7 +270,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
final ClientCompactionTaskQuery taskPayload = createCompactionTask(
entry,
- CompactionMode.ALL_SEGMENTS,
+ Eligibility.FULL,
config,
defaultEngine,
null,
@@ -295,7 +300,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
*/
public static ClientCompactionTaskQuery createCompactionTask(
CompactionCandidate candidate,
- CompactionMode compactionMode,
+ Eligibility eligibility,
DataSourceCompactionConfig config,
CompactionEngine defaultEngine,
String indexingStateFingerprint,
@@ -385,7 +390,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
return compactSegments(
candidate,
- compactionMode,
+ eligibility,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
config.getTuningConfig(),
@@ -445,7 +450,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
private static ClientCompactionTaskQuery compactSegments(
CompactionCandidate entry,
- CompactionMode compactionMode,
+ Eligibility eligibility,
int compactionTaskPriority,
ClientCompactionTaskQueryTuningConfig tuningConfig,
ClientCompactionTaskGranularitySpec granularitySpec,
@@ -469,18 +474,27 @@ public class CompactSegments implements
CoordinatorCustomDuty
context.put("priority", compactionTaskPriority);
- final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX,
ClientCompactionTaskQuery.TYPE, dataSource, null);
- final ClientCompactionIntervalSpec inputSpec;
+ final CompactionMode compactionMode = eligibility.getMode();
+ context.put(COMPACTION_MODE_KEY, compactionMode);
+ if (eligibility.getReason() != null) {
+ context.put(COMPACTION_POLICY_RESULT, eligibility.getReason());
+ }
+
+ String taskIdPrefix = compactionMode ==
CompactionMode.UNCOMPACTED_SEGMENTS_ONLY
+ ? TASK_ID_PREFIX + "-minor"
+ : TASK_ID_PREFIX;
+ final String taskId = IdUtils.newTaskId(taskIdPrefix,
ClientCompactionTaskQuery.TYPE, dataSource, null);
+ final ClientCompactionInputSpec inputSpec;
switch (compactionMode) {
case ALL_SEGMENTS:
- inputSpec = new
ClientCompactionIntervalSpec(entry.getCompactionInterval(), null, null);
+ inputSpec = new
ClientCompactionIntervalSpec(entry.getCompactionInterval(), null);
break;
case UNCOMPACTED_SEGMENTS_ONLY:
List<SegmentDescriptor> uncompacted = entry.getUncompactedSegments()
.stream()
.map(DataSegment::toDescriptor)
.toList();
- inputSpec = new
ClientCompactionIntervalSpec(entry.getCompactionInterval(), uncompacted, null);
+ inputSpec = new
ClientMinorCompactionInputSpec(entry.getCompactionInterval(), uncompacted);
break;
default:
throw DruidException.defensive("unexpected compaction mode[%s]",
compactionMode);
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
index 0ab3e059361..7e116a271eb 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java
@@ -21,9 +21,9 @@ package org.apache.druid.client.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.SegmentDescriptor;
@@ -121,19 +121,30 @@ public class ClientCompactionIntervalSpecTest
}
@Test
- public void
testClientCompactionIntervalSpec_throwsException_whenEmptySegmentsList()
+ public void
testClientMinorCompactionInputSpec_throwsException_whenEmptySegmentsList()
{
Interval interval = Intervals.of("2015-04-11/2015-04-12");
- List<SegmentDescriptor> emptySegments = List.of();
-
Assert.assertThrows(
- IAE.class,
- () -> new ClientCompactionIntervalSpec(interval, emptySegments, null)
+ DruidException.class,
+ () -> new ClientMinorCompactionInputSpec(interval, null)
);
}
@Test
public void testClientCompactionIntervalSpec_serde() throws Exception
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ Interval interval = Intervals.of("2015-04-11/2015-04-12");
+
+ // Test without uncompactedSegments (full compaction)
+ ClientCompactionIntervalSpec withoutSegments = new
ClientCompactionIntervalSpec(interval, null);
+ String json2 = mapper.writeValueAsString(withoutSegments);
+ ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2,
ClientCompactionIntervalSpec.class);
+ Assert.assertEquals(withoutSegments, deserialized2);
+ }
+
+ @Test
+ public void testClientMinorCompactionInputSpec_serde() throws Exception
{
ObjectMapper mapper = new DefaultObjectMapper();
Interval interval = Intervals.of("2015-04-11/2015-04-12");
@@ -142,17 +153,11 @@ public class ClientCompactionIntervalSpecTest
);
// Test with uncompactedSegments (minor compaction)
- ClientCompactionIntervalSpec withSegments = new
ClientCompactionIntervalSpec(interval, segments, "sha256hash");
+ ClientCompactionInputSpec withSegments = new
ClientMinorCompactionInputSpec(interval, segments);
String json1 = mapper.writeValueAsString(withSegments);
- ClientCompactionIntervalSpec deserialized1 = mapper.readValue(json1,
ClientCompactionIntervalSpec.class);
+ ClientCompactionInputSpec deserialized1 = mapper.readValue(json1,
ClientCompactionIntervalSpec.class);
+ Assert.assertTrue(deserialized1 instanceof ClientMinorCompactionInputSpec);
Assert.assertEquals(withSegments, deserialized1);
- Assert.assertEquals(segments, deserialized1.getUncompactedSegments());
-
- // Test without uncompactedSegments (full compaction)
- ClientCompactionIntervalSpec withoutSegments = new
ClientCompactionIntervalSpec(interval, null, null);
- String json2 = mapper.writeValueAsString(withoutSegments);
- ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2,
ClientCompactionIntervalSpec.class);
- Assert.assertEquals(withoutSegments, deserialized2);
- Assert.assertNull(deserialized2.getUncompactedSegments());
+ Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec)
deserialized1).getUncompactedSegments());
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
index f242cec1680..53e027a93d9 100644
---
a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
+++
b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java
@@ -60,13 +60,13 @@ public class MostFragmentedIntervalFirstPolicyTest
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.fail(
+ Eligibility.fail(
"Uncompacted segments[1] in interval must be at least [10,000]"
),
policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.OK,
+ Eligibility.FULL,
policy.checkEligibilityForCompaction(createCandidate(10_001, 100L),
null)
);
}
@@ -84,13 +84,13 @@ public class MostFragmentedIntervalFirstPolicyTest
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.fail(
+ Eligibility.fail(
"Uncompacted bytes[100] in interval must be at least [10,000]"
),
policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.OK,
+ Eligibility.FULL,
policy.checkEligibilityForCompaction(createCandidate(100, 10_000L),
null)
);
}
@@ -108,13 +108,13 @@ public class MostFragmentedIntervalFirstPolicyTest
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.fail(
+ Eligibility.fail(
"Average size[10,000] of uncompacted segments in interval must be
at most [100]"
),
policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null)
);
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.OK,
+ Eligibility.FULL,
policy.checkEligibilityForCompaction(createCandidate(1, 100L), null)
);
}
@@ -256,7 +256,7 @@ public class MostFragmentedIntervalFirstPolicyTest
final CompactionStatus status = CompactionStatus.pending(compacted,
uncompacted, List.of(SEGMENT), "");
final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
- final CompactionCandidateSearchPolicy.Eligibility eligibility =
+ final Eligibility eligibility =
policy.checkEligibilityForCompaction(candidate, null);
Assertions.assertEquals(CompactionMode.UNCOMPACTED_SEGMENTS_ONLY,
eligibility.getMode());
@@ -283,7 +283,7 @@ public class MostFragmentedIntervalFirstPolicyTest
""
);
final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
- final CompactionCandidateSearchPolicy.Eligibility eligibility =
+ final Eligibility eligibility =
policy.checkEligibilityForCompaction(candidate, null);
Assertions.assertEquals(CompactionMode.ALL_SEGMENTS,
eligibility.getMode());
@@ -311,7 +311,7 @@ public class MostFragmentedIntervalFirstPolicyTest
""
);
final CompactionCandidate candidate =
CompactionCandidate.from(List.of(SEGMENT), null, status);
- final CompactionCandidateSearchPolicy.Eligibility eligibility =
+ final Eligibility eligibility =
policy.checkEligibilityForCompaction(candidate, null);
Assertions.assertEquals(CompactionMode.ALL_SEGMENTS,
eligibility.getMode());
@@ -336,7 +336,7 @@ public class MostFragmentedIntervalFirstPolicyTest
private void verifyCandidateIsEligible(CompactionCandidate candidate,
MostFragmentedIntervalFirstPolicy policy)
{
Assertions.assertEquals(
- CompactionCandidateSearchPolicy.Eligibility.OK,
+ Eligibility.FULL,
policy.checkEligibilityForCompaction(candidate, null)
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 1e8a8f6ea0a..b7f01cf57d2 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -1106,7 +1106,6 @@ public class CompactSegmentsTest
new ClientCompactionIOConfig(
new ClientCompactionIntervalSpec(
Intervals.of("2000/2099"),
- null,
"testSha256OfSortedSegmentIds"
),
null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]