This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 55b41d5cee4 feat: Config for MM / Indexer to always start enabled.
(#19373)
55b41d5cee4 is described below
commit 55b41d5cee464ead83781ac03ea8d15f9dc0d2de
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Apr 24 16:04:15 2026 -0700
feat: Config for MM / Indexer to always start enabled. (#19373)
PR #19368 made the enabled/disabled state of a Middle Manager (MM) or Indexer
persist across restarts. In some cases, such as disabling a server
before doing an in-place restart, it is more convenient for the server to
automatically re-enable itself on restart.
This patch adds a config parameter, druid.worker.startAlwaysEnabled, that
restores the old behavior of always starting in the enabled state.
---
docs/configuration/index.md | 2 +
.../druid/indexing/worker/WorkerTaskManager.java | 35 +++++++++---
.../druid/indexing/worker/WorkerTaskMonitor.java | 4 +-
.../indexing/worker/WorkerTaskManagerTest.java | 64 +++++++++++++++++++++-
.../indexing/worker/WorkerTaskMonitorTest.java | 1 +
.../druid/indexing/worker/config/WorkerConfig.java | 17 ++++++
6 files changed, 113 insertions(+), 10 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 935889dd2af..24c5da3717c 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1362,6 +1362,7 @@ Middle Managers pass their configurations down to their
child peons. The Middle
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one
of which is assigned per task in a round-robin fashion. This property can be
used to allow usage of multiple disks for indexing. This property is
recommended in place of and takes precedence over
`${druid.indexer.task.baseTaskDir}`. If this configuration is not set,
`${druid.indexer.task.baseTaskDir}` is used. For example,
`druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by
tasks on any single task dir. This value is treated symmetrically across all
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then
each of those task directories is assumed to allow for 500 GB to be used and a
total of 1.5 TB will potentially be available across all tasks. The actual
amount of memory assigned to each task is discussed in [Configuring task
storage sizes](../ingestion/tasks [...]
|`druid.worker.category`|A string to name the category that the Middle Manager
node belongs to.|`_default_worker_category`|
+|`druid.worker.startAlwaysEnabled`|If true, the Middle Manager always starts
in the enabled state. If false, a disabled state set via the worker disable API
is persisted and restored across restarts.|`false`|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This
config should be set when [Centralized Datasource
Schema](#centralized-datasource-schema-experimental) feature is enabled. |false|
#### Peon processing
@@ -1478,6 +1479,7 @@ For most types of tasks, `SegmentWriteOutMediumFactory`
can be configured per-ta
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by
tasks on any single task dir. This value is treated symmetrically across all
directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then
each of those task directories is assumed to allow for 500 GB to be used and a
total of 1.5 TB will potentially be available across all tasks. The actual
amount of memory assigned to each task is discussed in [Configuring task
storage sizes](../ingestion/tasks [...]
|`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available
for ingestion processing. This is applied by automatically setting the
`maxBytesInMemory` property on tasks.|Configured max JVM heap size / 6|
|`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge
operations that can run concurrently across all tasks.|`druid.worker.capacity`
/ 2, rounded down|
+|`druid.worker.startAlwaysEnabled`|If true, the Indexer always starts in the
enabled state. If false, a disabled state set via the worker disable API is
persisted and restored across restarts.|`false`|
|`druid.indexer.task.baseDir`|Base temporary working
directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for
tasks.|`${druid.indexer.task.baseDir}/persistent/tasks`|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer
restart for restorable tasks to gracefully exit.|`PT5M`|
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index ae9ecd7453b..7379e34293f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
@@ -110,10 +111,16 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
/**
* Whether this worker is disabled (i.e., not accepting new tasks).
Persisted to
* {@link #STATE_FILE} under {@link #storageDir} so the flag survives
- * process restarts.
+ * process restarts, unless {@link #startAlwaysEnabled} is true.
*/
private final AtomicBoolean disabled = new AtomicBoolean(false);
+ /**
+ * When true, {@link #STATE_FILE} is deleted (rather than read) on startup,
+ * and the worker starts enabled. Controlled by {@code
druid.worker.startAlwaysEnabled}.
+ */
+ private final boolean startAlwaysEnabled;
+
private final OverlordClient overlordClient;
private final File storageDir;
@@ -122,6 +129,7 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
ObjectMapper jsonMapper,
TaskRunner taskRunner,
TaskConfig taskConfig,
+ WorkerConfig workerConfig,
OverlordClient overlordClient
)
{
@@ -130,6 +138,7 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
this.exec = Execs.singleThreaded("WorkerTaskManager-NoticeHandler");
this.completedTasksCleanupExecutor =
Execs.scheduledSingleThreaded("WorkerTaskManager-CompletedTasksCleaner");
this.overlordClient = overlordClient;
+ this.startAlwaysEnabled = workerConfig.isStartAlwaysEnabled();
storageDir = taskConfig.getBaseTaskDir();
}
@@ -357,18 +366,28 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
}
/**
- * Read {@link #STATE_FILE} and initialize {@link #disabled}.
+ * Read {@link #STATE_FILE} and initialize {@link #disabled}. When {@link
#startAlwaysEnabled}
+ * is true, delete the file (if present) instead and leave {@link #disabled}
at its default.
*/
private void loadStateFile()
{
final File stateFile = getStateFile();
if (stateFile.exists()) {
- try {
- final WorkerState state = jsonMapper.readValue(stateFile,
WorkerState.class);
- disabled.set(state.disabled());
- }
- catch (Exception e) {
- log.warn(e, "Failed to read disabled state from[%s]. Starting as
enabled.", stateFile);
+ if (startAlwaysEnabled) {
+ try {
+ Files.delete(stateFile.toPath());
+ }
+ catch (IOException e) {
+ log.warn(e, "Failed to delete state file[%s].", stateFile);
+ }
+ } else {
+ try {
+ final WorkerState state = jsonMapper.readValue(stateFile,
WorkerState.class);
+ disabled.set(state.disabled());
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to read state file[%s]. Starting as enabled.",
stateFile);
+ }
}
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 895bd29faf2..c8537997b58 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -63,12 +64,13 @@ public class WorkerTaskMonitor extends WorkerTaskManager
ObjectMapper jsonMapper,
TaskRunner taskRunner,
TaskConfig taskConfig,
+ WorkerConfig workerConfig,
CuratorFramework cf,
WorkerCuratorCoordinator workerCuratorCoordinator,
OverlordClient overlordClient
)
{
- super(jsonMapper, taskRunner, taskConfig, overlordClient);
+ super(jsonMapper, taskRunner, taskConfig, workerConfig, overlordClient);
this.jsonMapper = jsonMapper;
this.pathChildrenCache = new PathChildrenCache(
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 157dab56628..1fd4347185f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.TestTaskRunner;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.FileUtils;
import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
@@ -115,10 +116,15 @@ public class WorkerTaskManagerTest
private WorkerTaskManager createWorkerTaskManager()
{
- return createWorkerTaskManager(FileUtils.createTempDir());
+ return createWorkerTaskManager(FileUtils.createTempDir(), new
WorkerConfig());
}
private WorkerTaskManager createWorkerTaskManager(File baseDir)
+ {
+ return createWorkerTaskManager(baseDir, new WorkerConfig());
+ }
+
+ private WorkerTaskManager createWorkerTaskManager(File baseDir, WorkerConfig
workerConfig)
{
TaskConfig taskConfig = new TaskConfigBuilder()
.setBaseDir(baseDir.toString())
@@ -185,6 +191,7 @@ public class WorkerTaskManagerTest
location
),
taskConfig,
+ workerConfig,
overlordClient
)
{
@@ -626,4 +633,59 @@ public class WorkerTaskManagerTest
workerTaskManager.start();
Assert.assertTrue(workerTaskManager.isWorkerEnabled());
}
+
+ @Test
+ public void
test_startAlwaysEnabled_ignoresAndDeletesPersistedDisabledState() throws
Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ final File baseTaskDir = FileUtils.createTempDir();
+
+ workerTaskManager = createWorkerTaskManager(baseTaskDir);
+ workerTaskManager.start();
+ workerTaskManager.workerDisabled();
+ Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+ Assert.assertTrue(workerTaskManager.getStateFile().exists());
+ workerTaskManager.stop();
+
+ final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+ .setStartAlwaysEnabled(true)
+ .build();
+ workerTaskManager = createWorkerTaskManager(baseTaskDir, workerConfig);
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ Assert.assertFalse(workerTaskManager.getStateFile().exists());
+ }
+
+ @Test
+ public void test_startAlwaysEnabled_doesNotCreateStateFileWhenAbsent()
throws Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+ .setStartAlwaysEnabled(true)
+ .build();
+ workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(),
workerConfig);
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ Assert.assertFalse(workerTaskManager.getStateFile().exists());
+ }
+
+ @Test
+ public void test_startAlwaysEnabled_runtimeDisableStillPersistsToStateFile()
throws Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ final WorkerConfig workerConfig = new WorkerConfig().cloneBuilder()
+ .setStartAlwaysEnabled(true)
+ .build();
+ workerTaskManager = createWorkerTaskManager(FileUtils.createTempDir(),
workerConfig);
+ workerTaskManager.start();
+ workerTaskManager.workerDisabled();
+ Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+ Assert.assertTrue(workerTaskManager.getStateFile().exists());
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 74f86664c75..21e531f7a3a 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -226,6 +226,7 @@ public class WorkerTaskMonitorTest
new ServerConfig()
),
taskConfig,
+ new WorkerConfig(),
cf,
workerCuratorCoordinator,
EasyMock.createNiceMock(OverlordClient.class)
diff --git
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 36b702bfb95..2d084913a83 100644
---
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -68,6 +68,9 @@ public class WorkerConfig
@JsonProperty
private int numConcurrentMerges = Math.max(1, capacity / 2);
+ @JsonProperty
+ private boolean startAlwaysEnabled = false;
+
public String getIp()
{
return ip;
@@ -130,6 +133,11 @@ public class WorkerConfig
return numConcurrentMerges;
}
+ public boolean isStartAlwaysEnabled()
+ {
+ return startAlwaysEnabled;
+ }
+
public Builder cloneBuilder()
{
return new Builder(this);
@@ -148,6 +156,7 @@ public class WorkerConfig
private Period intermediaryPartitionTimeout;
private long globalIngestionHeapLimitBytes;
private int numConcurrentMerges;
+ private boolean startAlwaysEnabled;
private Builder(WorkerConfig input)
{
@@ -162,6 +171,7 @@ public class WorkerConfig
this.intermediaryPartitionTimeout = input.intermediaryPartitionTimeout;
this.globalIngestionHeapLimitBytes = input.globalIngestionHeapLimitBytes;
this.numConcurrentMerges = input.numConcurrentMerges;
+ this.startAlwaysEnabled = input.startAlwaysEnabled;
}
public Builder setIp(String ip)
@@ -230,6 +240,12 @@ public class WorkerConfig
return this;
}
+ public Builder setStartAlwaysEnabled(boolean startAlwaysEnabled)
+ {
+ this.startAlwaysEnabled = startAlwaysEnabled;
+ return this;
+ }
+
public WorkerConfig build()
{
final WorkerConfig retVal = new WorkerConfig();
@@ -244,6 +260,7 @@ public class WorkerConfig
retVal.intermediaryPartitionTimeout = this.intermediaryPartitionTimeout;
retVal.globalIngestionHeapLimitBytes =
this.globalIngestionHeapLimitBytes;
retVal.numConcurrentMerges = this.numConcurrentMerges;
+ retVal.startAlwaysEnabled = this.startAlwaysEnabled;
return retVal;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]