This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b44216150b3 fix: MM and Indexer should remember disabled status.
(#19368)
b44216150b3 is described below
commit b44216150b3acc617ddaa358b3e797bb55f4d608
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Apr 23 17:06:39 2026 -0700
fix: MM and Indexer should remember disabled status. (#19368)
Currently, when a MiddleManager or Indexer is disabled via API, the
state is stored only in memory. If the server is restarted, the disabled
flag is cleared. This is not desirable; the flag should only be cleared
by an operator explicitly re-enabling the server.
This patch fixes it by saving the state to a file (workerState.json under
the worker's storage directory) that is read back on startup.
---
.../druid/indexing/worker/WorkerTaskManager.java | 71 ++++++++++++++++++--
.../indexing/worker/WorkerTaskManagerTest.java | 78 +++++++++++++++++++++-
2 files changed, 142 insertions(+), 7 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 8b115337679..ae9ecd7453b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.worker;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@@ -83,6 +84,7 @@ import java.util.stream.Collectors;
*/
public class WorkerTaskManager implements IndexerTaskCountStatsProvider
{
+ public static final String STATE_FILE = "workerState.json";
private static final EmittingLogger log = new
EmittingLogger(WorkerTaskManager.class);
private final ObjectMapper jsonMapper;
@@ -105,6 +107,11 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
private final ScheduledExecutorService completedTasksCleanupExecutor;
+ /**
+ * Whether this worker is disabled (i.e., not accepting new tasks).
Persisted to
+ * {@link #STATE_FILE} under {@link #storageDir} so the flag survives
+ * process restarts.
+ */
private final AtomicBoolean disabled = new AtomicBoolean(false);
private final OverlordClient overlordClient;
@@ -138,6 +145,7 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
try {
log.debug("Starting...");
cleanupAndMakeTmpTaskDir();
+ loadStateFile();
registerLocationListener();
restoreRestorableTasks();
initAssignedTasks();
@@ -340,6 +348,51 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
return new File(storageDir, "assignedTasks");
}
+ /**
+ * Full path to {@link #STATE_FILE}.
+ */
+ public File getStateFile()
+ {
+ return new File(storageDir, STATE_FILE);
+ }
+
+ /**
+ * Read {@link #STATE_FILE} and initialize {@link #disabled}.
+ */
+ private void loadStateFile()
+ {
+ final File stateFile = getStateFile();
+ if (stateFile.exists()) {
+ try {
+ final WorkerState state = jsonMapper.readValue(stateFile,
WorkerState.class);
+ disabled.set(state.disabled());
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to read disabled state from[%s]. Starting as
enabled.", stateFile);
+ }
+ }
+ }
+
+ /**
+ * Write {@link #disabled} to {@link #STATE_FILE}.
+ */
+ private void writeStateFile()
+ {
+ final File stateFile = getStateFile();
+ try {
+ FileUtils.writeAtomically(
+ stateFile,
+ out -> {
+ jsonMapper.writeValue(out, new WorkerState(disabled.get()));
+ return null;
+ }
+ );
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to persist state file[%s].", stateFile);
+ }
+ }
+
private void initAssignedTasks() throws IOException
{
File assignedTaskDir = getAssignedTaskDir();
@@ -517,18 +570,20 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
public void workerEnabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS),
"not started");
-
- if (disabled.compareAndSet(true, false)) {
- changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(false));
- }
+ setDisabled(false);
}
public void workerDisabled()
{
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS),
"not started");
+ setDisabled(true);
+ }
- if (disabled.compareAndSet(false, true)) {
- changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(true));
+ private void setDisabled(boolean newValue)
+ {
+ if (disabled.compareAndSet(!newValue, newValue)) {
+ changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(newValue));
+ writeStateFile();
}
}
@@ -656,6 +711,10 @@ public class WorkerTaskManager implements
IndexerTaskCountStatsProvider
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
}
+ record WorkerState(@JsonProperty("disabled") boolean disabled)
+ {
+ }
+
private static class TaskDetails
{
private final Task task;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index d10f33fd146..157dab56628 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -72,6 +72,7 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -113,9 +114,14 @@ public class WorkerTaskManagerTest
}
private WorkerTaskManager createWorkerTaskManager()
+ {
+ return createWorkerTaskManager(FileUtils.createTempDir());
+ }
+
+ private WorkerTaskManager createWorkerTaskManager(File baseDir)
{
TaskConfig taskConfig = new TaskConfigBuilder()
- .setBaseDir(FileUtils.createTempDir().toString())
+ .setBaseDir(baseDir.toString())
.setRestoreTasksOnRestart(restoreTasksOnRestart)
.build();
@@ -550,4 +556,74 @@ public class WorkerTaskManagerTest
));
Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
}
+
+ @Test
+ public void test_disabledState_persistsAcrossRestart() throws Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ final File baseTaskDir = FileUtils.createTempDir();
+
+ workerTaskManager = createWorkerTaskManager(baseTaskDir);
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ workerTaskManager.workerDisabled();
+ Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+ Assert.assertTrue(workerTaskManager.getStateFile().exists());
+ workerTaskManager.stop();
+
+ workerTaskManager = createWorkerTaskManager(baseTaskDir);
+ workerTaskManager.start();
+ Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+
+ final ChangeRequestsSnapshot<WorkerHistoryItem> history =
+ workerTaskManager.getChangesSince(new ChangeRequestHistory.Counter(-1,
0)).get();
+ Assert.assertTrue(((WorkerHistoryItem.Metadata)
history.getRequests().get(0)).isDisabled());
+ }
+
+ @Test
+ public void test_disabledState_reEnablePersistsAcrossRestart() throws
Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ final File baseTaskDir = FileUtils.createTempDir();
+
+ workerTaskManager = createWorkerTaskManager(baseTaskDir);
+ workerTaskManager.start();
+ workerTaskManager.workerDisabled();
+ workerTaskManager.workerEnabled();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ workerTaskManager.stop();
+
+ workerTaskManager = createWorkerTaskManager(baseTaskDir);
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ }
+
+ @Test
+ public void test_disabledState_defaultsToEnabledWhenNoFile() throws Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ }
+
+ @Test
+ public void test_disabledState_malformedFileToleratedAndStartsEnabled()
throws Exception
+ {
+
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+ EasyMock.replay(overlordClient);
+
+ workerTaskManager = createWorkerTaskManager();
+ final File stateFile = workerTaskManager.getStateFile();
+ FileUtils.mkdirp(stateFile.getParentFile());
+ Files.write(stateFile.toPath(), "not valid
json".getBytes(StandardCharsets.UTF_8));
+
+ workerTaskManager.start();
+ Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]