This is an automated email from the ASF dual-hosted git repository.

Fly-Style pushed a commit to branch feat/stop-gracefully
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 2f26286523e3c37382c84fa237039ab7d8c09a17
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Mon Jun 8 16:23:22 2026 +0300

    Introduce stopGracefullyOnNewSpec
---
 .../overlord/supervisor/SupervisorManager.java     | 26 ++++++----
 .../supervisor/SeekableStreamSupervisor.java       |  6 +++
 .../overlord/supervisor/SupervisorManagerTest.java | 58 ++++++++++++++++++++++
 .../SeekableStreamSupervisorStateTest.java         | 14 ++++++
 .../indexing/overlord/supervisor/Supervisor.java   |  9 ++++
 5 files changed, 104 insertions(+), 9 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index fa7d96634ae..d64d3f6ca4c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -377,7 +377,7 @@ public class SupervisorManager implements SupervisorStatsProvider
    * Resets a supervisor to the latest stream offsets and starts a bounded 
backfill supervisor to
    * process the skipped range from the previously checkpointed offsets up to 
the latest offsets.
    *
-   * @param id               supervisor ID
+   * @param id                supervisor ID
    * @param backfillTaskCount number of tasks for the backfill supervisor, or 
null to inherit from the source spec
    * @return map with {@code "id"} (the original supervisor ID) and {@code 
"backfillSupervisorId"}
    * @throws IllegalArgumentException if the supervisor is not a {@link 
SeekableStreamSupervisor},
@@ -424,10 +424,20 @@ public class SupervisorManager implements 
SupervisorStatsProvider
     String backfillSupervisorId = IdUtils.getRandomIdWithPrefix(id + 
"_backfill");
 
     try {
-      Map<String, Object> normalizedStartOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(startOffsets), Map.class);
-      Map<String, Object> normalizedEndOffsets = 
jsonMapper.readValue(jsonMapper.writeValueAsString(endOffsets), Map.class);
+      Map<String, Object> normalizedStartOffsets = jsonMapper.readValue(
+          jsonMapper.writeValueAsString(startOffsets),
+          Map.class
+      );
+      Map<String, Object> normalizedEndOffsets = jsonMapper.readValue(
+          jsonMapper.writeValueAsString(endOffsets),
+          Map.class
+      );
       BoundedStreamConfig boundedStreamConfig = new 
BoundedStreamConfig(normalizedStartOffsets, normalizedEndOffsets);
-      SupervisorSpec backfillSpec = 
streamSpec.createBackfillSpec(backfillSupervisorId, boundedStreamConfig, 
backfillTaskCount);
+      SupervisorSpec backfillSpec = streamSpec.createBackfillSpec(
+          backfillSupervisorId,
+          boundedStreamConfig,
+          backfillTaskCount
+      );
       createOrUpdateAndStartSupervisor(backfillSpec);
     }
     catch (JsonProcessingException e) {
@@ -615,12 +625,10 @@ public class SupervisorManager implements 
SupervisorStatsProvider
     }
 
     if (writeTombstone) {
-      metadataSupervisorManager.insert(
-          id,
-          new NoopSupervisorSpec(null, pair.rhs.getDataSources())
-      ); // where NoopSupervisorSpec is a tombstone
+      // NoopSupervisorSpec is a tombstone
+      metadataSupervisorManager.insert(id, new NoopSupervisorSpec(null, 
pair.rhs.getDataSources()));
     }
-    pair.lhs.stop(true);
+    pair.lhs.stop(pair.lhs.stopGracefullyOnNewSpec());
     supervisors.remove(id);
 
     SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 74329c68e1d..ae38f788e10 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1285,6 +1285,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  @Override
+  public boolean stopGracefullyOnNewSpec()
+  {
+    return true;
+  }
+
   @Override
   public void stop(boolean stopGracefully)
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
index 199e004b424..01d88868bee 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java
@@ -135,6 +135,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     resetAll();
     supervisor2.start();
     
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor1.stop(true);
     replayAll();
 
@@ -145,6 +146,7 @@ public class SupervisorManagerTest extends EasyMockSupport
 
     resetAll();
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.anyObject(NoopSupervisorSpec.class));
+    EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor2.stop(true);
     replayAll();
 
@@ -580,6 +582,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
     supervisor1.start();
     
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor1.stop(true);
     
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     supervisor2.start();
@@ -729,6 +732,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
     supervisor2.start();
     
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor1.stop(true);
     replayAll();
 
@@ -742,6 +746,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     // in TestSupervisorSpec implementation of createRunningSpec
     resetAll();
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.capture(capturedInsert));
+    EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor2.stop(true);
     supervisor1.start();
     
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
@@ -756,6 +761,7 @@ public class SupervisorManagerTest extends EasyMockSupport
     // mock stop of suspended then resumed supervisor
     resetAll();
     metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.anyObject(NoopSupervisorSpec.class));
+    EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(true);
     supervisor1.stop(true);
     replayAll();
 
@@ -778,6 +784,58 @@ public class SupervisorManagerTest extends EasyMockSupport
     Assert.assertTrue(manager.getSupervisorIds().isEmpty());
   }
 
+  @Test
+  public void testStopGracefullyOnNewSpecFalseUsesNonGracefulStop()
+  {
+    SupervisorSpec spec = new TestSupervisorSpec("id1", supervisor1);
+    SupervisorSpec spec2 = new TestSupervisorSpec("id1", supervisor2);
+    Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
+        "id3", new TestSupervisorSpec("id3", supervisor3)
+    );
+
+    
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
+    metadataSupervisorManager.insert("id1", spec);
+    supervisor3.start();
+    
EasyMock.expect(supervisor3.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    supervisor1.start();
+    
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    replayAll();
+
+    manager.start();
+    manager.createOrUpdateAndStartSupervisor(spec);
+    verifyAll();
+
+    // spec update: supervisor1 opts out of graceful stop-on-new-spec, so it is stopped with stop(false), leaving its
+    // managed tasks running for the replacement supervisor to reconcile.
+    resetAll();
+    supervisor2.start();
+    
EasyMock.expect(supervisor2.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
+    EasyMock.expect(supervisor1.stopGracefullyOnNewSpec()).andReturn(false);
+    supervisor1.stop(false);
+    replayAll();
+
+    manager.createOrUpdateAndStartSupervisor(spec2);
+    verifyAll();
+
+    // terminate also honors the supervisor's preference; a supervisor that opts out of graceful stop is stopped with
+    // stop(false).
+    resetAll();
+    metadataSupervisorManager.insert(EasyMock.eq("id1"), 
EasyMock.anyObject(NoopSupervisorSpec.class));
+    EasyMock.expect(supervisor2.stopGracefullyOnNewSpec()).andReturn(false);
+    supervisor2.stop(false);
+    replayAll();
+
+    Assert.assertTrue(manager.stopAndRemoveSupervisor("id1"));
+    verifyAll();
+  }
+
+  @Test
+  public void testStopGracefullyOnNewSpecDefaultsToFalse()
+  {
+    final Supervisor supervisor = new NoopSupervisorSpec(null, 
ImmutableList.of("ds")).createSupervisor();
+    Assert.assertFalse(supervisor.stopGracefullyOnNewSpec());
+  }
+
   @Test
   public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 9e45920ad71..a1835f17706 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -247,6 +247,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testStopGracefullyOnNewSpecReturnsTrue()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    // SeekableStreamSupervisor retains the historical graceful stop-and-roll behavior on a spec update.
+    Assert.assertTrue(supervisor.stopGracefullyOnNewSpec());
+
+    verifyAll();
+  }
+
   @Test
   public void testRunningStreamGetSequenceNumberReturnsNull()
   {
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index ce6c87e5e08..084eca20804 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -47,6 +47,14 @@ public interface Supervisor
    */
   void stop(boolean stopGracefully);
 
+  /**
+   * Indicates whether this supervisor should be stopped gracefully when its spec is updated/suspended/resumed
+   */
+  default boolean stopGracefullyOnNewSpec()
+  {
+    return false;
+  }
+
   /**
    * Starts non-graceful shutdown of the supervisor and returns a future that 
completes when shutdown is complete.
    */
@@ -94,6 +102,7 @@ public interface Supervisor
 
   /**
    * Resets any stored metadata by the supervisor.
+   *
    * @param dataSourceMetadata optional dataSource metadata.
    */
   void reset(@Nullable DataSourceMetadata dataSourceMetadata);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to