This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 55b41d5cee4 feat: Config for MM / Indexer to always start enabled. (#19373)
55b41d5cee4 is described below

commit 55b41d5cee464ead83781ac03ea8d15f9dc0d2de
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 24 16:04:15 2026 -0700

    feat: Config for MM / Indexer to always start enabled. (#19373)
    
    PR #19368 updated the enabled/disabled state of an MM or Indexer to
    persist across restarts. In some cases, such as disabling a server
    before doing an in-place restart, it is more convenient to
    automatically re-enable on restart.
    
    This patch adds a config parameter to restore the old behavior:
    druid.worker.startAlwaysEnabled.
---
 docs/configuration/index.md                        |  2 +
 .../druid/indexing/worker/WorkerTaskManager.java   | 35 +++++++++---
 .../druid/indexing/worker/WorkerTaskMonitor.java   |  4 +-
 .../indexing/worker/WorkerTaskManagerTest.java     | 64 +++++++++++++++++++++-
 .../indexing/worker/WorkerTaskMonitorTest.java     |  1 +
 .../druid/indexing/worker/config/WorkerConfig.java | 17 ++++++
 6 files changed, 113 insertions(+), 10 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 935889dd2af..24c5da3717c 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1362,6 +1362,7 @@ Middle Managers pass their configurations down to their 
child peons. The Middle
 |`druid.worker.baseTaskDirs`|List of base temporary working directories, one 
of which is assigned per task in a round-robin fashion. This property can be 
used to allow usage of multiple disks for indexing. This property is 
recommended in place of and takes precedence over 
`${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, 
`${druid.indexer.task.baseTaskDir}` is used. For example, 
`druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
 |`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by 
tasks on any single task dir. This value is treated symmetrically across all 
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then 
each of those task directories is assumed to allow for 500 GB to be used and a 
total of 1.5 TB will potentially be available across all tasks. The actual 
amount of memory assigned to each task is discussed in [Configuring task 
storage sizes](../ingestion/tasks [...]
 |`druid.worker.category`|A string to name the category that the Middle Manager 
node belongs to.|`_default_worker_category`|
+|`druid.worker.startAlwaysEnabled`|If true, the Middle Manager always starts 
in the enabled state. If false, a disabled state set via the worker disable API 
is persisted and restored across restarts.|`false`|
 |`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This 
config should be set when [Centralized Datasource 
Schema](#centralized-datasource-schema-experimental) feature is enabled. |false|
 
 #### Peon processing
@@ -1478,6 +1479,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory` 
can be configured per-ta
 |`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by 
tasks on any single task dir. This value is treated symmetrically across all 
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then 
each of those task directories is assumed to allow for 500 GB to be used and a 
total of 1.5 TB will potentially be available across all tasks. The actual 
amount of memory assigned to each task is discussed in [Configuring task 
storage sizes](../ingestion/tasks [...]
 |`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available 
for ingestion processing. This is applied by automatically setting the 
`maxBytesInMemory` property on tasks.|Configured max JVM heap size / 6|
 |`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge 
operations that can run concurrently across all tasks.|`druid.worker.capacity` 
/ 2, rounded down|
+|`druid.worker.startAlwaysEnabled`|If true, the Indexer always starts in the 
enabled state. If false, a disabled state set via the worker disable API is 
persisted and restored across restarts.|`false`|
 |`druid.indexer.task.baseDir`|Base temporary working 
directory.|`System.getProperty("java.io.tmpdir")`|
 |`druid.indexer.task.baseTaskDir`|Base temporary working directory for 
tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
 |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer 
restart for restorable tasks to gracefully exit.|`PT5M`|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index ae9ecd7453b..7379e34293f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.Either;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
@@ -110,10 +111,16 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
   /**
    * Whether this worker is disabled (i.e., not accepting new tasks). 
Persisted to
    * {@link #STATE_FILE} under {@link #storageDir} so the flag survives
-   * process restarts.
+   * process restarts, unless {@link #startAlwaysEnabled} is true.
    */
   private final AtomicBoolean disabled = new AtomicBoolean(false);
 
+  /**
+   * When true, {@link #STATE_FILE} is deleted (rather than read) on startup,
+   * and the worker starts enabled. Controlled by {@code 
druid.worker.startAlwaysEnabled}.
+   */
+  private final boolean startAlwaysEnabled;
+
   private final OverlordClient overlordClient;
   private final File storageDir;
 
@@ -122,6 +129,7 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
       ObjectMapper jsonMapper,
       TaskRunner taskRunner,
       TaskConfig taskConfig,
+      WorkerConfig workerConfig,
       OverlordClient overlordClient
   )
   {
@@ -130,6 +138,7 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
     this.exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
     this.completedTasksCleanupExecutor = 
Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");
     this.overlordClient = overlordClient;
+    this.startAlwaysEnabled = workerConfig.isStartAlwaysEnabled();
 
     storageDir = taskConfig.getBaseTaskDir();
   }
@@ -357,18 +366,28 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
   }
 
   /**
-   * Read {@link #STATE_FILE} and initialize {@link #disabled}.
+   * Read {@link #STATE_FILE} and initialize {@link #disabled}. When {@link 
#startAlwaysEnabled}
+   * is true, delete the file (if present) instead and leave {@link #disabled} 
at its default.
    */
   private void loadStateFile()
   {
     final File stateFile = getStateFile();
     if (stateFile.exists()) {
-      try {
-        final WorkerState state = jsonMapper.readValue(stateFile, 
WorkerState.class);
-        disabled.set(state.disabled());
-      }
-      catch (Exception e) {
-        log.warn(e, "Failed to read disabled state from[%s]. Starting as 
enabled.", stateFile);
+      if (startAlwaysEnabled) {
+        try {
+          Files.delete(stateFile.toPath());
+        }
+        catch (IOException e) {
+          log.warn(e, "Failed to delete state file[%s].", stateFile);
+        }
+      } else {
+        try {
+          final WorkerState state = jsonMapper.readValue(stateFile, 
WorkerState.class);
+          disabled.set(state.disabled());
+        }
+        catch (Exception e) {
+          log.warn(e, "Failed to read state file[%s]. Starting as enabled.", 
stateFile);
+        }
       }
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 895bd29faf2..c8537997b58 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -63,12 +64,13 @@ public class WorkerTaskMonitor extends WorkerTaskManager
       ObjectMapper jsonMapper,
       TaskRunner taskRunner,
       TaskConfig taskConfig,
+      WorkerConfig workerConfig,
       CuratorFramework cf,
       WorkerCuratorCoordinator workerCuratorCoordinator,
       OverlordClient overlordClient
   )
   {
-    super(jsonMapper, taskRunner, taskConfig, overlordClient);
+    super(jsonMapper, taskRunner, taskConfig, workerConfig, overlordClient);
 
     this.jsonMapper = jsonMapper;
     this.pathChildrenCache = new PathChildrenCache(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 157dab56628..1fd4347185f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.overlord.TestTaskRunner;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
 import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.query.policy.NoopPolicyEnforcer;
@@ -115,10 +116,15 @@ public class WorkerTaskManagerTest
 
   private WorkerTaskManager createWorkerTaskManager()
   {
-    return createWorkerTaskManager(FileUtils.createTempDir());
+    return createWorkerTaskManager(FileUtils.createTempDir(), new 
WorkerConfig());
   }
 
   private WorkerTaskManager createWorkerTaskManager(File baseDir)
+  {
+    return createWorkerTaskManager(baseDir, new WorkerConfig());
+  }
+
+  private WorkerTaskManager createWorkerTaskManager(File baseDir, WorkerConfig 
workerConfig)
   {
     TaskConfig taskConfig = new TaskConfigBuilder()
         .setBaseDir(baseDir.toString())
@@ -185,6 +191,7 @@ public class WorkerTaskManagerTest
             location
         ),
         taskConfig,
+        workerConfig,
         overlordClient
     )
     {
@@ -626,4 +633,59 @@ public class WorkerTaskManagerTest
     workerTaskManager.start();
     Assert.assertTrue(workerTaskManager.isWorkerEnabled());
   }
+
+  @Test
+  public void 
test_startAlwaysEnabled_ignoresAndDeletesPersistedDisabledState() throws 
Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    final File baseTaskDir = FileUtils.createTempDir();
+
+    workerTaskManager = createWorkerTaskManager(baseTaskDir);
+    workerTaskManager.start();
+    workerTaskManager.workerDisabled();
+    Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+    Assert.assertTrue(workerTaskManager.getStateFile().exists());
+    workerTaskManager.stop();
+
+    final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+        .setStartAlwaysEnabled(true)
+        .build();
+    workerTaskManager = createWorkerTaskManager(baseTaskDir, workerConfig);
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+    Assert.assertFalse(workerTaskManager.getStateFile().exists());
+  }
+
+  @Test
+  public void test_startAlwaysEnabled_doesNotCreateStateFileWhenAbsent() 
throws Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+        .setStartAlwaysEnabled(true)
+        .build();
+    workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(), 
workerConfig);
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+    Assert.assertFalse(workerTaskManager.getStateFile().exists());
+  }
+
+  @Test
+  public void test_startAlwaysEnabled_runtimeDisableStillPersistsToStateFile() 
throws Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+        .setStartAlwaysEnabled(true)
+        .build();
+    workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(), 
workerConfig);
+    workerTaskManager.start();
+    workerTaskManager.workerDisabled();
+    Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+    Assert.assertTrue(workerTaskManager.getStateFile().exists());
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 74f86664c75..21e531f7a3a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -226,6 +226,7 @@ public class WorkerTaskMonitorTest
             new ServerConfig()
         ),
         taskConfig,
+        new WorkerConfig(),
         cf,
         workerCuratorCoordinator,
         EasyMock.createNiceMock(OverlordClient.class)
diff --git a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 36b702bfb95..2d084913a83 100644
--- a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -68,6 +68,9 @@ public class WorkerConfig
   @JsonProperty
   private int numConcurrentMerges = Math.max(1, capacity / 2);
 
+  @JsonProperty
+  private boolean startAlwaysEnabled = false;
+
   public String getIp()
   {
     return ip;
@@ -130,6 +133,11 @@ public class WorkerConfig
     return numConcurrentMerges;
   }
 
+  public boolean isStartAlwaysEnabled()
+  {
+    return startAlwaysEnabled;
+  }
+
   public Builder cloneBuilder()
   {
     return new Builder(this);
@@ -148,6 +156,7 @@ public class WorkerConfig
     private Period intermediaryPartitionTimeout;
     private long globalIngestionHeapLimitBytes;
     private int numConcurrentMerges;
+    private boolean startAlwaysEnabled;
 
     private Builder(WorkerConfig input)
     {
@@ -162,6 +171,7 @@ public class WorkerConfig
       this.intermediaryPartitionTimeout = input.intermediaryPartitionTimeout;
       this.globalIngestionHeapLimitBytes = input.globalIngestionHeapLimitBytes;
       this.numConcurrentMerges = input.numConcurrentMerges;
+      this.startAlwaysEnabled = input.startAlwaysEnabled;
     }
 
     public Builder setIp(String ip)
@@ -230,6 +240,12 @@ public class WorkerConfig
       return this;
     }
 
+    public Builder setStartAlwaysEnabled(boolean startAlwaysEnabled)
+    {
+      this.startAlwaysEnabled = startAlwaysEnabled;
+      return this;
+    }
+
     public WorkerConfig build()
     {
       final WorkerConfig retVal = new WorkerConfig();
@@ -244,6 +260,7 @@ public class WorkerConfig
       retVal.intermediaryPartitionTimeout = this.intermediaryPartitionTimeout;
       retVal.globalIngestionHeapLimitBytes = 
this.globalIngestionHeapLimitBytes;
       retVal.numConcurrentMerges = this.numConcurrentMerges;
+      retVal.startAlwaysEnabled = this.startAlwaysEnabled;
       return retVal;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to