This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/37.0.0 by this push:
new 7de798de39f fix: Fix how supervisor validation works for cascading
reindexing templates in compaction supervisors (#19223) (#19334)
7de798de39f is described below
commit 7de798de39fb001615fd4048ee3d536292a97d1d
Author: Lucas Capistrant <[email protected]>
AuthorDate: Thu Apr 16 19:11:07 2026 -0500
fix: Fix how supervisor validation works for cascading reindexing templates
in compaction supervisors (#19223) (#19334)
* Fix how supervisor validation works for cascading reindexing templates in
compaction supervisors
* some refactoring of compaction spec validation
* Add a validation check again before creating jobs
* Make compaction spec validation more dynamic and fresh compared to
previous approach
* Fixup a place validation wasn't beging called after removing it from
compaction config constructor
(cherry picked from commit 0e91c6f8dd3f5f3913585c022a5d8b1ac7ce9d01)
---
.../compact/CascadingReindexingTemplate.java | 50 ++++++++++
.../druid/indexing/compact/CompactionJobQueue.java | 11 +++
.../indexing/compact/CompactionSupervisor.java | 32 ++++---
.../indexing/compact/CompactionSupervisorSpec.java | 18 +++-
.../compact/OverlordCompactionScheduler.java | 6 +-
.../overlord/http/OverlordCompactionResource.java | 7 ++
.../overlord/supervisor/SupervisorResource.java | 10 ++
.../compact/CascadingReindexingTemplateTest.java | 102 +++++++++++++++++++++
.../compact/CompactionSupervisorSpecTest.java | 27 +++++-
.../compact/OverlordCompactionSchedulerTest.java | 23 +++++
.../supervisor/SupervisorResourceTest.java | 33 +++++++
.../overlord/supervisor/SupervisorSpec.java | 12 +++
.../CatalogDataSourceCompactionConfig.java | 7 ++
.../coordinator/DataSourceCompactionConfig.java | 5 +
.../InlineSchemaDataSourceCompactionConfig.java | 7 ++
15 files changed, 326 insertions(+), 24 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
index 256f8f3909b..7fcb0fee22a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.compact;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.CompactionEngine;
@@ -38,6 +39,8 @@ import
org.apache.druid.server.compaction.IntervalPartitioningInfo;
import org.apache.druid.server.compaction.ReindexingPartitioningRule;
import org.apache.druid.server.compaction.ReindexingRule;
import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
@@ -249,6 +252,53 @@ public class CascadingReindexingTemplate implements
CompactionJobTemplate, DataS
return tuningConfig;
}
+ /**
+ * Validates this template using a subset of the standard MSQ compaction
checks.
+ * The standard path in {@link
ClientCompactionRunnerInfo#validateCompactionConfig}
+ * assumes partitioning is controlled by {@code
tuningConfig.partitionsSpec}, but
+ * this template forbids that field and uses {@code defaultPartitionsSpec}
instead.
+ *
+ * <p>Checks performed:
+ * <ul>
+ * <li>partitionsSpec type and options — validated against {@code
defaultPartitionsSpec}.
+ * Range partition dimension type checking passes {@code null} for
dimensionSchemas
+ * since those are not known at template level.</li>
+ * <li>maxNumTasks >= 2 in taskContext.</li>
+ * </ul>
+ *
+ * <p>Standard MSQ checks skipped (not applicable at template level):
+ * <ul>
+ * <li>rollup vs metricsSpec consistency — {@code granularitySpec} is
always null on the
+ * template; rollup is configured per-rule at job generation time.</li>
+ * <li>metricsSpec aggregator combining factory — there is no metricsSpec
on the template;
+ * metrics come from per-rule data schema rules resolved at job
generation time.</li>
+ * </ul>
+ *
+ * <p>Per-rule overrides (partitionsSpec, metricsSpec, rollup) are validated
at task
+ * runtime by {@code MSQCompactionRunner.validateCompactionTask()} once the
full config
+ * is resolved against actual data schemas.
+ */
+ @Override
+ public CompactionConfigValidationResult validate(ClusterCompactionConfig
clusterCompactionConfig)
+ {
+ List<CompactionConfigValidationResult> results = new ArrayList<>();
+
+ results.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
+ this.getDefaultPartitionsSpec(),
+ null,
+ this.getDefaultPartitioningVirtualColumns() != null
+ ? this.getDefaultPartitioningVirtualColumns()
+ : VirtualColumns.EMPTY
+ ));
+
+
results.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(this.getTaskContext()));
+
+ return results.stream()
+ .filter(result -> !result.isValid())
+ .findFirst()
+ .orElse(CompactionConfigValidationResult.success());
+ }
+
/**
* Checks if the given interval's end time is after the specified boundary.
* Used to determine if intervals should be skipped based on skip offset
configuration.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
index 31333801225..c6ce5f0a6c1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
@@ -161,6 +161,17 @@ public class CompactionJobQueue
final String supervisorId = supervisor.getSpec().getId();
try {
if (supervisor.shouldCreateJobs() &&
!activeSupervisors.contains(supervisorId)) {
+ final CompactionConfigValidationResult validationResult =
+ supervisor.getSpec().getSpec().validate(clusterCompactionConfig);
+ if (!validationResult.isValid()) {
+ log.warn(
+ "Skipping job creation for invalid supervisor[%s]: %s",
+ supervisorId,
+ validationResult.getReason()
+ );
+ return;
+ }
+
// Queue fresh jobs
final List<CompactionJob> jobs = supervisor.createJobs(source,
jobParams);
jobs.forEach(job -> snapshotBuilder.addToPending(job.getCandidate()));
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
index f9ba0eee6db..0bd020bf8a2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import javax.annotation.Nullable;
import java.util.List;
@@ -84,12 +85,17 @@ public class CompactionSupervisor implements Supervisor
if (supervisorSpec.isSuspended()) {
log.info("Suspending compaction for dataSource[%s].", dataSource);
scheduler.stopCompaction(dataSource);
- } else if (!supervisorSpec.getValidationResult().isValid()) {
+ return;
+ }
+
+ final CompactionConfigValidationResult validationResult =
+ scheduler.validateCompactionConfig(supervisorSpec.getSpec());
+ if (!validationResult.isValid()) {
log.warn(
"Cannot start compaction supervisor for datasource[%s] since the
compaction supervisor spec is invalid. "
+ "Reason[%s].",
dataSource,
- supervisorSpec.getValidationResult().getReason()
+ validationResult.getReason()
);
} else {
log.info("Starting compaction for dataSource[%s].", dataSource);
@@ -112,15 +118,19 @@ public class CompactionSupervisor implements Supervisor
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
.build();
- } else if (!supervisorSpec.getValidationResult().isValid()) {
- snapshot = AutoCompactionSnapshot.builder(dataSource)
- .withMessage(StringUtils.format(
- "Compaction supervisor spec is
invalid. Reason[%s].",
-
supervisorSpec.getValidationResult().getReason()
- ))
- .build();
} else {
- snapshot = scheduler.getCompactionSnapshot(dataSource);
+ final CompactionConfigValidationResult validationResult =
+ scheduler.validateCompactionConfig(supervisorSpec.getSpec());
+ if (!validationResult.isValid()) {
+ snapshot = AutoCompactionSnapshot.builder(dataSource)
+ .withMessage(StringUtils.format(
+ "Compaction supervisor spec is
invalid. Reason[%s].",
+ validationResult.getReason()
+ ))
+ .build();
+ } else {
+ snapshot = scheduler.getCompactionSnapshot(dataSource);
+ }
}
return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(),
snapshot);
@@ -133,7 +143,7 @@ public class CompactionSupervisor implements Supervisor
return State.SCHEDULER_STOPPED;
} else if (supervisorSpec.isSuspended()) {
return State.SUSPENDED;
- } else if (!supervisorSpec.getValidationResult().isValid()) {
+ } else if
(!scheduler.validateCompactionConfig(supervisorSpec.getSpec()).isValid()) {
return State.INVALID_SPEC;
} else {
return State.RUNNING;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
index d9002655f74..7313b676f34 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -43,7 +44,6 @@ public class CompactionSupervisorSpec implements
SupervisorSpec
private final boolean suspended;
private final DataSourceCompactionConfig spec;
private final CompactionScheduler scheduler;
- private final CompactionConfigValidationResult validationResult;
public static String getSupervisorIdForDatasource(String dataSource)
{
@@ -60,7 +60,6 @@ public class CompactionSupervisorSpec implements
SupervisorSpec
this.spec = spec;
this.suspended = Configs.valueOrDefault(suspended, false);
this.scheduler = scheduler;
- this.validationResult = scheduler == null ? null :
scheduler.validateCompactionConfig(spec);
}
@JsonProperty
@@ -82,9 +81,20 @@ public class CompactionSupervisorSpec implements
SupervisorSpec
return getSupervisorIdForDatasource(spec.getDataSource());
}
- public CompactionConfigValidationResult getValidationResult()
+ /**
+ * Validates the underlying {@link DataSourceCompactionConfig} against the
+ * current cluster compaction config. Throws a {@link DruidException} of
+ * category {@link DruidException.Category#INVALID_INPUT} if invalid.
+ */
+ @Override
+ public void validateSpec()
{
- return validationResult;
+ final CompactionConfigValidationResult result =
scheduler.validateCompactionConfig(spec);
+ if (!result.isValid()) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build("Invalid compaction supervisor spec: %s",
result.getReason());
+ }
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
index cf368473e1b..c2a0dabb65a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
import com.google.inject.Inject;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.broker.BrokerClient;
-import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -265,10 +264,7 @@ public class OverlordCompactionScheduler implements
CompactionScheduler
if (compactionConfig == null) {
return CompactionConfigValidationResult.failure("Cannot be null");
} else {
- return ClientCompactionRunnerInfo.validateCompactionConfig(
- compactionConfig,
- getLatestClusterConfig().getEngine()
- );
+ return compactionConfig.validate(getLatestClusterConfig());
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
index e2d08fa97c4..7d90bf1047b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
@@ -28,6 +28,7 @@ import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.NotFound;
@@ -262,6 +263,12 @@ public class OverlordCompactionResource
if (scheduler.isEnabled()) {
final CompactionSupervisorSpec spec = new
CompactionSupervisorSpec(newConfig, false, scheduler);
+ try {
+ spec.validateSpec();
+ }
+ catch (DruidException e) {
+ return invalidInputResponse(e.getMessage());
+ }
return ServletResourceUtils.buildUpdateResponse(
() -> supervisorManager.updateCompactionSupervisor(spec, request)
);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 6b145be07e4..78aa16b37cb 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -35,6 +35,7 @@ import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditManager;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import
org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
@@ -155,6 +156,15 @@ public class SupervisorResource
if (!authResult.allowAccessWithNoRestriction()) {
throw new ForbiddenException(authResult.getErrorMessage());
}
+ try {
+ spec.validateSpec();
+ }
+ catch (DruidException e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(ImmutableMap.of("error", e.getMessage()))
+ .build();
+ }
+
if (Boolean.TRUE.equals(skipRestartIfUnmodified) &&
!manager.shouldUpdateSupervisor(spec)) {
return Response.ok(ImmutableMap.of("id", spec.getId())).build();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
index 479ad183193..98e99810805 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.compact;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.indexing.ClientMSQContext;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.SupervisorModule;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -43,6 +45,8 @@ import
org.apache.druid.server.compaction.ReindexingDataSchemaRule;
import org.apache.druid.server.compaction.ReindexingPartitioningRule;
import org.apache.druid.server.compaction.ReindexingRule;
import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -66,6 +70,8 @@ import java.util.Map;
public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
{
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+ private static final ClusterCompactionConfig CLUSTER_CONFIG =
+ new ClusterCompactionConfig(null, null, null, null, null, null);
@BeforeEach
public void setUp()
@@ -1763,6 +1769,102 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
);
}
+ @Test
+ public void test_validate_returnsValid_withDynamicPartitionsSpec()
+ {
+ final CascadingReindexingTemplate template = new
CascadingReindexingTemplate(
+ "testDataSource",
+ null,
+ null,
+ InlineReindexingRuleProvider.builder().build(),
+ null,
+ null,
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, null),
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult result =
template.validate(CLUSTER_CONFIG);
+ Assertions.assertTrue(result.isValid());
+ }
+
+ @Test
+ public void test_validate_returnsInvalid_withHashedPartitionsSpec()
+ {
+ final CascadingReindexingTemplate template = new
CascadingReindexingTemplate(
+ "testDataSource",
+ null,
+ null,
+ InlineReindexingRuleProvider.builder().build(),
+ null,
+ null,
+ null,
+ Granularities.DAY,
+ new HashedPartitionsSpec(null, 3, null),
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult result =
template.validate(CLUSTER_CONFIG);
+ Assertions.assertFalse(result.isValid());
+ Assertions.assertEquals(
+ "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either
'dynamic' or 'range'",
+ result.getReason()
+ );
+ }
+
+ @Test
+ public void test_validate_returnsInvalid_withMaxTotalRows()
+ {
+ final CascadingReindexingTemplate template = new
CascadingReindexingTemplate(
+ "testDataSource",
+ null,
+ null,
+ InlineReindexingRuleProvider.builder().build(),
+ null,
+ null,
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, 1000L),
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult result =
template.validate(CLUSTER_CONFIG);
+ Assertions.assertFalse(result.isValid());
+ Assertions.assertEquals(
+ "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning",
+ result.getReason()
+ );
+ }
+
+ @Test
+ public void test_validate_returnsInvalid_withOneMaxNumTasks()
+ {
+ final CascadingReindexingTemplate template = new
CascadingReindexingTemplate(
+ "testDataSource",
+ null,
+ null,
+ InlineReindexingRuleProvider.builder().build(),
+ Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1),
+ null,
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, null),
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult result =
template.validate(CLUSTER_CONFIG);
+ Assertions.assertFalse(result.isValid());
+ Assertions.assertEquals(
+ "MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1
worker)",
+ result.getReason()
+ );
+ }
+
private static class TestCascadingReindexingTemplate extends
CascadingReindexingTemplate
{
// Capture intervals that were processed for assertions
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
index b008d74829a..87c973a773f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
@@ -97,13 +97,32 @@ public class CompactionSupervisorSpecTest
}
@Test
- public void testGetValidationResultForInvalidSpec()
+ public void testValidateSpecThrowsForInvalidConfig()
{
Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
.thenReturn(CompactionConfigValidationResult.failure("bad spec"));
- CompactionConfigValidationResult validationResult = new
CompactionSupervisorSpec(null, false, scheduler).getValidationResult();
- Assert.assertFalse(validationResult.isValid());
- Assert.assertEquals("bad spec", validationResult.getReason());
+ final CompactionSupervisorSpec invalidSpec = new CompactionSupervisorSpec(
+
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+ false,
+ scheduler
+ );
+ final DruidException thrown = Assert.assertThrows(
+ DruidException.class,
+ invalidSpec::validateSpec
+ );
+ Assert.assertEquals(DruidException.Category.INVALID_INPUT,
thrown.getCategory());
+ Assert.assertEquals("Invalid compaction supervisor spec: bad spec",
thrown.getMessage());
+ }
+
+ @Test
+ public void testValidateSpecSucceedsForValidConfig()
+ {
+ final CompactionSupervisorSpec validSpec = new CompactionSupervisorSpec(
+
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+ false,
+ scheduler
+ );
+ validSpec.validateSpec();
}
@Test
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index 5c1200110f8..3b9597cd79f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.guice.SupervisorModule;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -70,6 +71,7 @@ import
org.apache.druid.server.compaction.CompactionSimulateResult;
import org.apache.druid.server.compaction.CompactionStatistics;
import org.apache.druid.server.compaction.CompactionStatus;
import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
import org.apache.druid.server.compaction.Table;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
@@ -383,6 +385,27 @@ public class OverlordCompactionSchedulerTest
);
}
+ @Test
+ public void
test_validateCompactionConfig_delegatesToCascadingReindexingTemplate()
+ {
+ final CascadingReindexingTemplate template = new
CascadingReindexingTemplate(
+ dataSource,
+ null,
+ null,
+ InlineReindexingRuleProvider.builder().build(),
+ null,
+ null,
+ null,
+ Granularities.DAY,
+ new DynamicPartitionsSpec(null, null),
+ null,
+ null
+ );
+
+ final CompactionConfigValidationResult result =
scheduler.validateCompactionConfig(template);
+ Assert.assertTrue(result.isValid());
+ }
+
@Test
public void test_startCompaction_enablesTaskSubmission_forDatasource()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 6c099a53b3a..9282e49d19b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -190,6 +190,39 @@ public class SupervisorResourceTest extends EasyMockSupport
Assert.assertEquals(503, response.getStatus());
}
+ @Test
+ public void testSpecPost_returnsBadRequest_whenValidateSpecThrows()
+ {
+ SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
+ {
+ @Override
+ public List<String> getDataSources()
+ {
+ return Collections.singletonList("datasource1");
+ }
+
+ @Override
+ public void validateSpec()
+ {
+ throw org.apache.druid.error.DruidException
+ .forPersona(org.apache.druid.error.DruidException.Persona.USER)
+
.ofCategory(org.apache.druid.error.DruidException.Category.INVALID_INPUT)
+ .build("nope");
+ }
+ };
+
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ setupMockRequest();
+ EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+ replayAll();
+
+ Response response = supervisorResource.specPost(spec, false, request);
+ verifyAll();
+
+ Assert.assertEquals(400, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("error", "nope"),
response.getEntity());
+ }
+
@Test
public void testSpecPost_whenSkipRestartIfUnmodifiedIsTrue()
{
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index fe9dd8f3942..bf50c7cf48c 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -102,6 +102,18 @@ public interface SupervisorSpec
*/
String getSource();
+ /**
+ * Validates this supervisor spec against current system state at submission
time.
+ * Implementations should throw a {@link DruidException} with category
+ * {@link DruidException.Category#INVALID_INPUT} when the spec should be
rejected.
+ * <p>
+ * The default implementation does no validation.
+ */
+ default void validateSpec() throws DruidException
+ {
+ // The default implementation does not do any validation checks.
+ }
+
/**
* Checks if a spec can be replaced with a proposed spec (proposesSpec).
* <p>
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
index 2a3c8ec8bce..c857a9f4877 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
@@ -30,6 +30,7 @@ import
org.apache.druid.catalog.model.DatasourceProjectionMetadata;
import org.apache.druid.catalog.model.ResolvedTable;
import org.apache.druid.catalog.model.TableId;
import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -213,6 +214,12 @@ public class CatalogDataSourceCompactionConfig implements
DataSourceCompactionCo
return null;
}
+ @Override
+ public CompactionConfigValidationResult validate(ClusterCompactionConfig
clusterCompactionConfig)
+ {
+ return ClientCompactionRunnerInfo.validateCompactionConfig(this,
clusterCompactionConfig.getEngine());
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 39a24d3eb5e..8d7f5a22b9d 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -60,6 +60,11 @@ public interface DataSourceCompactionConfig
String getDataSource();
+ /**
+ * Validates that the compaction config is well-formed and can be executed.
+ */
+ CompactionConfigValidationResult validate(ClusterCompactionConfig
clusterCompactionConfig);
+
@Nullable
CompactionEngine getEngine();
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
index 58a7206b8ab..55f73758b69 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -228,6 +229,12 @@ public class InlineSchemaDataSourceCompactionConfig
implements DataSourceCompact
return granularitySpec == null ? null :
granularitySpec.getSegmentGranularity();
}
+ @Override
+ public CompactionConfigValidationResult validate(ClusterCompactionConfig
clusterCompactionConfig)
+ {
+ return ClientCompactionRunnerInfo.validateCompactionConfig(this,
clusterCompactionConfig.getEngine());
+ }
+
@Override
public boolean equals(Object o)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]