This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/37.0.0 by this push:
     new 7de798de39f fix: Fix how supervisor validation works for cascading 
reindexing templates in compaction supervisors (#19223) (#19334)
7de798de39f is described below

commit 7de798de39fb001615fd4048ee3d536292a97d1d
Author: Lucas Capistrant <[email protected]>
AuthorDate: Thu Apr 16 19:11:07 2026 -0500

    fix: Fix how supervisor validation works for cascading reindexing templates 
in compaction supervisors (#19223) (#19334)
    
    * Fix how supervisor validation works for cascading reindexing templates in 
compaction supervisors
    
    * some refactoring of compaction spec validation
    
    * Add a validation check again before creating jobs
    
    * Make compaction spec validation more dynamic and fresh compared to 
previous approach
    
    * Fixup a place validation wasn't being called after removing it from 
compaction config constructor
    
    (cherry picked from commit 0e91c6f8dd3f5f3913585c022a5d8b1ac7ce9d01)
---
 .../compact/CascadingReindexingTemplate.java       |  50 ++++++++++
 .../druid/indexing/compact/CompactionJobQueue.java |  11 +++
 .../indexing/compact/CompactionSupervisor.java     |  32 ++++---
 .../indexing/compact/CompactionSupervisorSpec.java |  18 +++-
 .../compact/OverlordCompactionScheduler.java       |   6 +-
 .../overlord/http/OverlordCompactionResource.java  |   7 ++
 .../overlord/supervisor/SupervisorResource.java    |  10 ++
 .../compact/CascadingReindexingTemplateTest.java   | 102 +++++++++++++++++++++
 .../compact/CompactionSupervisorSpecTest.java      |  27 +++++-
 .../compact/OverlordCompactionSchedulerTest.java   |  23 +++++
 .../supervisor/SupervisorResourceTest.java         |  33 +++++++
 .../overlord/supervisor/SupervisorSpec.java        |  12 +++
 .../CatalogDataSourceCompactionConfig.java         |   7 ++
 .../coordinator/DataSourceCompactionConfig.java    |   5 +
 .../InlineSchemaDataSourceCompactionConfig.java    |   7 ++
 15 files changed, 326 insertions(+), 24 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
index 256f8f3909b..7fcb0fee22a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.compact;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.CompactionEngine;
@@ -38,6 +39,8 @@ import 
org.apache.druid.server.compaction.IntervalPartitioningInfo;
 import org.apache.druid.server.compaction.ReindexingPartitioningRule;
 import org.apache.druid.server.compaction.ReindexingRule;
 import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import 
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
@@ -249,6 +252,53 @@ public class CascadingReindexingTemplate implements 
CompactionJobTemplate, DataS
     return tuningConfig;
   }
 
+  /**
+   * Validates this template using a subset of the standard MSQ compaction 
checks.
+   * The standard path in {@link 
ClientCompactionRunnerInfo#validateCompactionConfig}
+   * assumes partitioning is controlled by {@code 
tuningConfig.partitionsSpec}, but
+   * this template forbids that field and uses {@code defaultPartitionsSpec} 
instead.
+   *
+   * <p>Checks performed:
+   * <ul>
+   *   <li>partitionsSpec type and options — validated against {@code 
defaultPartitionsSpec}.
+   *       Range partition dimension type checking passes {@code null} for 
dimensionSchemas
+   *       since those are not known at template level.</li>
+   *   <li>maxNumTasks >= 2 in taskContext.</li>
+   * </ul>
+   *
+   * <p>Standard MSQ checks skipped (not applicable at template level):
+   * <ul>
+   *   <li>rollup vs metricsSpec consistency — {@code granularitySpec} is 
always null on the
+   *       template; rollup is configured per-rule at job generation time.</li>
+   *   <li>metricsSpec aggregator combining factory — there is no metricsSpec 
on the template;
+   *       metrics come from per-rule data schema rules resolved at job 
generation time.</li>
+   * </ul>
+   *
+   * <p>Per-rule overrides (partitionsSpec, metricsSpec, rollup) are validated 
at task
+   * runtime by {@code MSQCompactionRunner.validateCompactionTask()} once the 
full config
+   * is resolved against actual data schemas.
+   */
+  @Override
+  public CompactionConfigValidationResult validate(ClusterCompactionConfig 
clusterCompactionConfig)
+  {
+    List<CompactionConfigValidationResult> results = new ArrayList<>();
+
+    results.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
+        this.getDefaultPartitionsSpec(),
+        null,
+        this.getDefaultPartitioningVirtualColumns() != null
+        ? this.getDefaultPartitioningVirtualColumns()
+        : VirtualColumns.EMPTY
+    ));
+
+    
results.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(this.getTaskContext()));
+
+    return results.stream()
+                  .filter(result -> !result.isValid())
+                  .findFirst()
+                  .orElse(CompactionConfigValidationResult.success());
+  }
+
   /**
    * Checks if the given interval's end time is after the specified boundary.
    * Used to determine if intervals should be skipped based on skip offset 
configuration.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
index 31333801225..c6ce5f0a6c1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
@@ -161,6 +161,17 @@ public class CompactionJobQueue
     final String supervisorId = supervisor.getSpec().getId();
     try {
       if (supervisor.shouldCreateJobs() && 
!activeSupervisors.contains(supervisorId)) {
+        final CompactionConfigValidationResult validationResult =
+            supervisor.getSpec().getSpec().validate(clusterCompactionConfig);
+        if (!validationResult.isValid()) {
+          log.warn(
+              "Skipping job creation for invalid supervisor[%s]: %s",
+              supervisorId,
+              validationResult.getReason()
+          );
+          return;
+        }
+
         // Queue fresh jobs
         final List<CompactionJob> jobs = supervisor.createJobs(source, 
jobParams);
         jobs.forEach(job -> snapshotBuilder.addToPending(job.getCandidate()));
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
index f9ba0eee6db..0bd020bf8a2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -84,12 +85,17 @@ public class CompactionSupervisor implements Supervisor
     if (supervisorSpec.isSuspended()) {
       log.info("Suspending compaction for dataSource[%s].", dataSource);
       scheduler.stopCompaction(dataSource);
-    } else if (!supervisorSpec.getValidationResult().isValid()) {
+      return;
+    }
+
+    final CompactionConfigValidationResult validationResult =
+        scheduler.validateCompactionConfig(supervisorSpec.getSpec());
+    if (!validationResult.isValid()) {
       log.warn(
           "Cannot start compaction supervisor for datasource[%s] since the 
compaction supervisor spec is invalid. "
           + "Reason[%s].",
           dataSource,
-          supervisorSpec.getValidationResult().getReason()
+          validationResult.getReason()
       );
     } else {
       log.info("Starting compaction for dataSource[%s].", dataSource);
@@ -112,15 +118,19 @@ public class CompactionSupervisor implements Supervisor
       snapshot = AutoCompactionSnapshot.builder(dataSource)
                                        
.withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
                                        .build();
-    } else if (!supervisorSpec.getValidationResult().isValid()) {
-      snapshot = AutoCompactionSnapshot.builder(dataSource)
-                                       .withMessage(StringUtils.format(
-                                           "Compaction supervisor spec is 
invalid. Reason[%s].",
-                                           
supervisorSpec.getValidationResult().getReason()
-                                       ))
-                                       .build();
     } else {
-      snapshot = scheduler.getCompactionSnapshot(dataSource);
+      final CompactionConfigValidationResult validationResult =
+          scheduler.validateCompactionConfig(supervisorSpec.getSpec());
+      if (!validationResult.isValid()) {
+        snapshot = AutoCompactionSnapshot.builder(dataSource)
+                                         .withMessage(StringUtils.format(
+                                             "Compaction supervisor spec is 
invalid. Reason[%s].",
+                                             validationResult.getReason()
+                                         ))
+                                         .build();
+      } else {
+        snapshot = scheduler.getCompactionSnapshot(dataSource);
+      }
     }
 
     return new SupervisorReport<>(supervisorSpec.getId(), DateTimes.nowUtc(), 
snapshot);
@@ -133,7 +143,7 @@ public class CompactionSupervisor implements Supervisor
       return State.SCHEDULER_STOPPED;
     } else if (supervisorSpec.isSuspended()) {
       return State.SUSPENDED;
-    } else if (!supervisorSpec.getValidationResult().isValid()) {
+    } else if 
(!scheduler.validateCompactionConfig(supervisorSpec.getSpec()).isValid()) {
       return State.INVALID_SPEC;
     } else {
       return State.RUNNING;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
index d9002655f74..7313b676f34 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -43,7 +44,6 @@ public class CompactionSupervisorSpec implements 
SupervisorSpec
   private final boolean suspended;
   private final DataSourceCompactionConfig spec;
   private final CompactionScheduler scheduler;
-  private final CompactionConfigValidationResult validationResult;
 
   public static String getSupervisorIdForDatasource(String dataSource)
   {
@@ -60,7 +60,6 @@ public class CompactionSupervisorSpec implements 
SupervisorSpec
     this.spec = spec;
     this.suspended = Configs.valueOrDefault(suspended, false);
     this.scheduler = scheduler;
-    this.validationResult = scheduler == null ? null : 
scheduler.validateCompactionConfig(spec);
   }
 
   @JsonProperty
@@ -82,9 +81,20 @@ public class CompactionSupervisorSpec implements 
SupervisorSpec
     return getSupervisorIdForDatasource(spec.getDataSource());
   }
 
-  public CompactionConfigValidationResult getValidationResult()
+  /**
+   * Validates the underlying {@link DataSourceCompactionConfig} against the
+   * current cluster compaction config. Throws a {@link DruidException} of
+   * category {@link DruidException.Category#INVALID_INPUT} if invalid.
+   */
+  @Override
+  public void validateSpec()
   {
-    return validationResult;
+    final CompactionConfigValidationResult result = 
scheduler.validateCompactionConfig(spec);
+    if (!result.isValid()) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("Invalid compaction supervisor spec: %s", 
result.getReason());
+    }
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
index cf368473e1b..c2a0dabb65a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java
@@ -25,7 +25,6 @@ import com.google.common.base.Supplier;
 import com.google.inject.Inject;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.broker.BrokerClient;
-import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -265,10 +264,7 @@ public class OverlordCompactionScheduler implements 
CompactionScheduler
     if (compactionConfig == null) {
       return CompactionConfigValidationResult.failure("Cannot be null");
     } else {
-      return ClientCompactionRunnerInfo.validateCompactionConfig(
-          compactionConfig,
-          getLatestClusterConfig().getEngine()
-      );
+      return compactionConfig.validate(getLatestClusterConfig());
     }
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
index e2d08fa97c4..7d90bf1047b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResource.java
@@ -28,6 +28,7 @@ import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InternalServerError;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.error.NotFound;
@@ -262,6 +263,12 @@ public class OverlordCompactionResource
 
     if (scheduler.isEnabled()) {
       final CompactionSupervisorSpec spec = new 
CompactionSupervisorSpec(newConfig, false, scheduler);
+      try {
+        spec.validateSpec();
+      }
+      catch (DruidException e) {
+        return invalidInputResponse(e.getMessage());
+      }
       return ServletResourceUtils.buildUpdateResponse(
           () -> supervisorManager.updateCompactionSupervisor(spec, request)
       );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 6b145be07e4..78aa16b37cb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -35,6 +35,7 @@ import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.druid.audit.AuditEntry;
 import org.apache.druid.audit.AuditManager;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import 
org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
@@ -155,6 +156,15 @@ public class SupervisorResource
           if (!authResult.allowAccessWithNoRestriction()) {
             throw new ForbiddenException(authResult.getErrorMessage());
           }
+          try {
+            spec.validateSpec();
+          }
+          catch (DruidException e) {
+            return Response.status(Response.Status.BAD_REQUEST)
+                           .entity(ImmutableMap.of("error", e.getMessage()))
+                           .build();
+          }
+
           if (Boolean.TRUE.equals(skipRestartIfUnmodified) && 
!manager.shouldUpdateSupervisor(spec)) {
             return Response.ok(ImmutableMap.of("id", spec.getId())).build();
           }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
index 479ad183193..98e99810805 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.compact;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.indexing.ClientMSQContext;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.SupervisorModule;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.input.DruidInputSource;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -43,6 +45,8 @@ import 
org.apache.druid.server.compaction.ReindexingDataSchemaRule;
 import org.apache.druid.server.compaction.ReindexingPartitioningRule;
 import org.apache.druid.server.compaction.ReindexingRule;
 import org.apache.druid.server.compaction.ReindexingRuleProvider;
+import org.apache.druid.server.coordinator.ClusterCompactionConfig;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import 
org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -66,6 +70,8 @@ import java.util.Map;
 public class CascadingReindexingTemplateTest extends 
InitializedNullHandlingTest
 {
   private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  private static final ClusterCompactionConfig CLUSTER_CONFIG =
+      new ClusterCompactionConfig(null, null, null, null, null, null);
 
   @BeforeEach
   public void setUp()
@@ -1763,6 +1769,102 @@ public class CascadingReindexingTemplateTest extends 
InitializedNullHandlingTest
     );
   }
 
+  @Test
+  public void test_validate_returnsValid_withDynamicPartitionsSpec()
+  {
+    final CascadingReindexingTemplate template = new 
CascadingReindexingTemplate(
+        "testDataSource",
+        null,
+        null,
+        InlineReindexingRuleProvider.builder().build(),
+        null,
+        null,
+        null,
+        Granularities.DAY,
+        new DynamicPartitionsSpec(null, null),
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult result = 
template.validate(CLUSTER_CONFIG);
+    Assertions.assertTrue(result.isValid());
+  }
+
+  @Test
+  public void test_validate_returnsInvalid_withHashedPartitionsSpec()
+  {
+    final CascadingReindexingTemplate template = new 
CascadingReindexingTemplate(
+        "testDataSource",
+        null,
+        null,
+        InlineReindexingRuleProvider.builder().build(),
+        null,
+        null,
+        null,
+        Granularities.DAY,
+        new HashedPartitionsSpec(null, 3, null),
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult result = 
template.validate(CLUSTER_CONFIG);
+    Assertions.assertFalse(result.isValid());
+    Assertions.assertEquals(
+        "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 
'dynamic' or 'range'",
+        result.getReason()
+    );
+  }
+
+  @Test
+  public void test_validate_returnsInvalid_withMaxTotalRows()
+  {
+    final CascadingReindexingTemplate template = new 
CascadingReindexingTemplate(
+        "testDataSource",
+        null,
+        null,
+        InlineReindexingRuleProvider.builder().build(),
+        null,
+        null,
+        null,
+        Granularities.DAY,
+        new DynamicPartitionsSpec(null, 1000L),
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult result = 
template.validate(CLUSTER_CONFIG);
+    Assertions.assertFalse(result.isValid());
+    Assertions.assertEquals(
+        "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning",
+        result.getReason()
+    );
+  }
+
+  @Test
+  public void test_validate_returnsInvalid_withOneMaxNumTasks()
+  {
+    final CascadingReindexingTemplate template = new 
CascadingReindexingTemplate(
+        "testDataSource",
+        null,
+        null,
+        InlineReindexingRuleProvider.builder().build(),
+        Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1),
+        null,
+        null,
+        Granularities.DAY,
+        new DynamicPartitionsSpec(null, null),
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult result = 
template.validate(CLUSTER_CONFIG);
+    Assertions.assertFalse(result.isValid());
+    Assertions.assertEquals(
+        "MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 
worker)",
+        result.getReason()
+    );
+  }
+
   private static class TestCascadingReindexingTemplate extends 
CascadingReindexingTemplate
   {
     // Capture intervals that were processed for assertions
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
index b008d74829a..87c973a773f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java
@@ -97,13 +97,32 @@ public class CompactionSupervisorSpecTest
   }
 
   @Test
-  public void testGetValidationResultForInvalidSpec()
+  public void testValidateSpecThrowsForInvalidConfig()
   {
     Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any()))
            .thenReturn(CompactionConfigValidationResult.failure("bad spec"));
-    CompactionConfigValidationResult validationResult = new 
CompactionSupervisorSpec(null, false, scheduler).getValidationResult();
-    Assert.assertFalse(validationResult.isValid());
-    Assert.assertEquals("bad spec", validationResult.getReason());
+    final CompactionSupervisorSpec invalidSpec = new CompactionSupervisorSpec(
+        
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+        false,
+        scheduler
+    );
+    final DruidException thrown = Assert.assertThrows(
+        DruidException.class,
+        invalidSpec::validateSpec
+    );
+    Assert.assertEquals(DruidException.Category.INVALID_INPUT, 
thrown.getCategory());
+    Assert.assertEquals("Invalid compaction supervisor spec: bad spec", 
thrown.getMessage());
+  }
+
+  @Test
+  public void testValidateSpecSucceedsForValidConfig()
+  {
+    final CompactionSupervisorSpec validSpec = new CompactionSupervisorSpec(
+        
InlineSchemaDataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(),
+        false,
+        scheduler
+    );
+    validSpec.validateSpec();
   }
 
   @Test
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index 5c1200110f8..3b9597cd79f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.guice.SupervisorModule;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
@@ -70,6 +71,7 @@ import 
org.apache.druid.server.compaction.CompactionSimulateResult;
 import org.apache.druid.server.compaction.CompactionStatistics;
 import org.apache.druid.server.compaction.CompactionStatus;
 import org.apache.druid.server.compaction.CompactionStatusTracker;
+import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
 import org.apache.druid.server.compaction.Table;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.ClusterCompactionConfig;
@@ -383,6 +385,27 @@ public class OverlordCompactionSchedulerTest
     );
   }
 
+  @Test
+  public void 
test_validateCompactionConfig_delegatesToCascadingReindexingTemplate()
+  {
+    final CascadingReindexingTemplate template = new 
CascadingReindexingTemplate(
+        dataSource,
+        null,
+        null,
+        InlineReindexingRuleProvider.builder().build(),
+        null,
+        null,
+        null,
+        Granularities.DAY,
+        new DynamicPartitionsSpec(null, null),
+        null,
+        null
+    );
+
+    final CompactionConfigValidationResult result = 
scheduler.validateCompactionConfig(template);
+    Assert.assertTrue(result.isValid());
+  }
+
   @Test
   public void test_startCompaction_enablesTaskSubmission_forDatasource()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 6c099a53b3a..9282e49d19b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -190,6 +190,39 @@ public class SupervisorResourceTest extends EasyMockSupport
     Assert.assertEquals(503, response.getStatus());
   }
 
+  @Test
+  public void testSpecPost_returnsBadRequest_whenValidateSpecThrows()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("my-id", null, null)
+    {
+      @Override
+      public List<String> getDataSources()
+      {
+        return Collections.singletonList("datasource1");
+      }
+
+      @Override
+      public void validateSpec()
+      {
+        throw org.apache.druid.error.DruidException
+            .forPersona(org.apache.druid.error.DruidException.Persona.USER)
+            
.ofCategory(org.apache.druid.error.DruidException.Category.INVALID_INPUT)
+            .build("nope");
+      }
+    };
+
+    
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+    setupMockRequest();
+    EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+    replayAll();
+
+    Response response = supervisorResource.specPost(spec, false, request);
+    verifyAll();
+
+    Assert.assertEquals(400, response.getStatus());
+    Assert.assertEquals(ImmutableMap.of("error", "nope"), 
response.getEntity());
+  }
+
   @Test
   public void testSpecPost_whenSkipRestartIfUnmodifiedIsTrue()
   {
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index fe9dd8f3942..bf50c7cf48c 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -102,6 +102,18 @@ public interface SupervisorSpec
    */
   String getSource();
 
+  /**
+   * Validates this supervisor spec against current system state at submission 
time.
+   * Implementations should throw a {@link DruidException} with category
+   * {@link DruidException.Category#INVALID_INPUT} when the spec should be 
rejected.
+   * <p>
+   * The default implementation does no validation.
+   */
+  default void validateSpec() throws DruidException
+  {
+    // The default implementation does not do any validation checks.
+  }
+
   /**
    * Checks if a spec can be replaced with a proposed spec (proposesSpec).
    * <p>
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
index 2a3c8ec8bce..c857a9f4877 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CatalogDataSourceCompactionConfig.java
@@ -30,6 +30,7 @@ import 
org.apache.druid.catalog.model.DatasourceProjectionMetadata;
 import org.apache.druid.catalog.model.ResolvedTable;
 import org.apache.druid.catalog.model.TableId;
 import org.apache.druid.catalog.model.table.DatasourceDefn;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -213,6 +214,12 @@ public class CatalogDataSourceCompactionConfig implements 
DataSourceCompactionCo
     return null;
   }
 
+  @Override
+  public CompactionConfigValidationResult validate(ClusterCompactionConfig 
clusterCompactionConfig)
+  {
+    return ClientCompactionRunnerInfo.validateCompactionConfig(this, 
clusterCompactionConfig.getEngine());
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 39a24d3eb5e..8d7f5a22b9d 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -60,6 +60,11 @@ public interface DataSourceCompactionConfig
 
   String getDataSource();
 
+  /**
+   * Validates that the compaction config is well-formed and can be executed.
+   */
+  CompactionConfigValidationResult validate(ClusterCompactionConfig 
clusterCompactionConfig);
+
   @Nullable
   CompactionEngine getEngine();
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
index 58a7206b8ab..55f73758b69 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/InlineSchemaDataSourceCompactionConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -228,6 +229,12 @@ public class InlineSchemaDataSourceCompactionConfig 
implements DataSourceCompact
     return granularitySpec == null ? null : 
granularitySpec.getSegmentGranularity();
   }
 
+  @Override
+  public CompactionConfigValidationResult validate(ClusterCompactionConfig 
clusterCompactionConfig)
+  {
+    return ClientCompactionRunnerInfo.validateCompactionConfig(this, 
clusterCompactionConfig.getEngine());
+  }
+
   @Override
   public boolean equals(Object o)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to