This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b44216150b3 fix: MM and Indexer should remember disabled status. 
(#19368)
b44216150b3 is described below

commit b44216150b3acc617ddaa358b3e797bb55f4d608
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Apr 23 17:06:39 2026 -0700

    fix: MM and Indexer should remember disabled status. (#19368)
    
    Currently, when a MiddleManager or Indexer is disabled via the API, the
    disabled state is stored only in memory. If the server is restarted, the
    disabled flag is cleared. This is not desirable; the flag should only be
    cleared when an operator explicitly re-enables the server.
    
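    This patch fixes this by persisting the disabled flag to a state file
    (workerState.json) in the worker's storage directory whenever the flag
    changes, and reading it back when the worker starts.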
---
 .../druid/indexing/worker/WorkerTaskManager.java   | 71 ++++++++++++++++++--
 .../indexing/worker/WorkerTaskManagerTest.java     | 78 +++++++++++++++++++++-
 2 files changed, 142 insertions(+), 7 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index 8b115337679..ae9ecd7453b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.worker;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -83,6 +84,7 @@ import java.util.stream.Collectors;
  */
 public class WorkerTaskManager implements IndexerTaskCountStatsProvider
 {
+  public static final String STATE_FILE = "workerState.json";
   private static final EmittingLogger log = new 
EmittingLogger(WorkerTaskManager.class);
 
   private final ObjectMapper jsonMapper;
@@ -105,6 +107,11 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
 
   private final ScheduledExecutorService completedTasksCleanupExecutor;
 
+  /**
+   * Whether this worker is disabled (i.e., not accepting new tasks). 
Persisted to
+   * {@link #STATE_FILE} under {@link #storageDir} so the flag survives
+   * process restarts.
+   */
   private final AtomicBoolean disabled = new AtomicBoolean(false);
 
   private final OverlordClient overlordClient;
@@ -138,6 +145,7 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
       try {
         log.debug("Starting...");
         cleanupAndMakeTmpTaskDir();
+        loadStateFile();
         registerLocationListener();
         restoreRestorableTasks();
         initAssignedTasks();
@@ -340,6 +348,51 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
     return new File(storageDir, "assignedTasks");
   }
 
+  /**
+   * Full path to {@link #STATE_FILE}.
+   */
+  public File getStateFile()
+  {
+    return new File(storageDir, STATE_FILE);
+  }
+
+  /**
+   * Read {@link #STATE_FILE} and initialize {@link #disabled}.
+   */
+  private void loadStateFile()
+  {
+    final File stateFile = getStateFile();
+    if (stateFile.exists()) {
+      try {
+        final WorkerState state = jsonMapper.readValue(stateFile, 
WorkerState.class);
+        disabled.set(state.disabled());
+      }
+      catch (Exception e) {
+        log.warn(e, "Failed to read disabled state from[%s]. Starting as 
enabled.", stateFile);
+      }
+    }
+  }
+
+  /**
+   * Write {@link #disabled} to {@link #STATE_FILE}.
+   */
+  private void writeStateFile()
+  {
+    final File stateFile = getStateFile();
+    try {
+      FileUtils.writeAtomically(
+          stateFile,
+          out -> {
+            jsonMapper.writeValue(out, new WorkerState(disabled.get()));
+            return null;
+          }
+      );
+    }
+    catch (Exception e) {
+      log.warn(e, "Failed to persist state file[%s].", stateFile);
+    }
+  }
+
   private void initAssignedTasks() throws IOException
   {
     File assignedTaskDir = getAssignedTaskDir();
@@ -517,18 +570,20 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
   public void workerEnabled()
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), 
"not started");
-
-    if (disabled.compareAndSet(true, false)) {
-      changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(false));
-    }
+    setDisabled(false);
   }
 
   public void workerDisabled()
   {
     Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.SECONDS), 
"not started");
+    setDisabled(true);
+  }
 
-    if (disabled.compareAndSet(false, true)) {
-      changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(true));
+  private void setDisabled(boolean newValue)
+  {
+    if (disabled.compareAndSet(!newValue, newValue)) {
+      changeHistory.addChangeRequest(new WorkerHistoryItem.Metadata(newValue));
+      writeStateFile();
     }
   }
 
@@ -656,6 +711,10 @@ public class WorkerTaskManager implements 
IndexerTaskCountStatsProvider
             .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)).values(), TaskAnnouncement::getTaskDataSource);
   }
 
+  record WorkerState(@JsonProperty("disabled") boolean disabled)
+  {
+  }
+
   private static class TaskDetails
   {
     private final Task task;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index d10f33fd146..157dab56628 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -72,6 +72,7 @@ import org.junit.runners.Parameterized;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -113,9 +114,14 @@ public class WorkerTaskManagerTest
   }
 
   private WorkerTaskManager createWorkerTaskManager()
+  {
+    return createWorkerTaskManager(FileUtils.createTempDir());
+  }
+
+  private WorkerTaskManager createWorkerTaskManager(File baseDir)
   {
     TaskConfig taskConfig = new TaskConfigBuilder()
-        .setBaseDir(FileUtils.createTempDir().toString())
+        .setBaseDir(baseDir.toString())
         .setRestoreTasksOnRestart(restoreTasksOnRestart)
         .build();
 
@@ -550,4 +556,74 @@ public class WorkerTaskManagerTest
     ));
     Assert.assertEquals(workerTaskManager.getWorkerAssignedTasks().size(), 0L);
   }
+
+  @Test
+  public void test_disabledState_persistsAcrossRestart() throws Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    final File baseTaskDir = FileUtils.createTempDir();
+
+    workerTaskManager = createWorkerTaskManager(baseTaskDir);
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+    workerTaskManager.workerDisabled();
+    Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+    Assert.assertTrue(workerTaskManager.getStateFile().exists());
+    workerTaskManager.stop();
+
+    workerTaskManager = createWorkerTaskManager(baseTaskDir);
+    workerTaskManager.start();
+    Assert.assertFalse(workerTaskManager.isWorkerEnabled());
+
+    final ChangeRequestsSnapshot<WorkerHistoryItem> history =
+        workerTaskManager.getChangesSince(new ChangeRequestHistory.Counter(-1, 
0)).get();
+    Assert.assertTrue(((WorkerHistoryItem.Metadata) 
history.getRequests().get(0)).isDisabled());
+  }
+
+  @Test
+  public void test_disabledState_reEnablePersistsAcrossRestart() throws 
Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    final File baseTaskDir = FileUtils.createTempDir();
+
+    workerTaskManager = createWorkerTaskManager(baseTaskDir);
+    workerTaskManager.start();
+    workerTaskManager.workerDisabled();
+    workerTaskManager.workerEnabled();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+    workerTaskManager.stop();
+
+    workerTaskManager = createWorkerTaskManager(baseTaskDir);
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+  }
+
+  @Test
+  public void test_disabledState_defaultsToEnabledWhenNoFile() throws Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+  }
+
+  @Test
+  public void test_disabledState_malformedFileToleratedAndStartsEnabled() 
throws Exception
+  {
+    
EasyMock.expect(overlordClient.withRetryPolicy(EasyMock.anyObject())).andReturn(overlordClient).anyTimes();
+    EasyMock.replay(overlordClient);
+
+    workerTaskManager = createWorkerTaskManager();
+    final File stateFile = workerTaskManager.getStateFile();
+    FileUtils.mkdirp(stateFile.getParentFile());
+    Files.write(stateFile.toPath(), "not valid 
json".getBytes(StandardCharsets.UTF_8));
+
+    workerTaskManager.start();
+    Assert.assertTrue(workerTaskManager.isWorkerEnabled());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to