FrankChen021 commented on code in PR #19541:
URL: https://github.com/apache/druid/pull/19541#discussion_r3341103411


##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java:
##########
@@ -185,6 +186,29 @@ public Duration getOffsetFetchPeriod()
     return offsetFetchPeriod;
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    RabbitStreamSupervisorTuningConfig that = (RabbitStreamSupervisorTuningConfig) o;
+    return Objects.equals(workerThreads, that.workerThreads)

Review Comment:
   [P1] Rabbit tuning changes can skip required restarts
   
   This equality method now drives SeekableStreamSupervisorSpec.requireRestart(),
   but after super.equals() it compares only the supervisor-level fields.
   RabbitStreamIndexTaskTuningConfig declares recordBufferSize,
   recordBufferOfferTimeout, and maxRecordsPerPoll. That intermediate class does
   not override equals/hashCode, so two specs that differ only in those tuning
   knobs compare equal. With skipRestartIfUnmodified=true, such a changed spec can
   be persisted without recreating the Rabbit supervisor, which leaves the running
   supervisor on the old tuning. Include the Rabbit task-tuning fields in
   equals/hashCode, or implement equality on RabbitStreamIndexTaskTuningConfig,
   and add a restart-decision test.
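A minimal sketch of the equality fix requested above, using simplified stand-in classes rather than Druid's real tuning configs. The *Sketch class names, the single maxRowsInMemory base field, and the requireRestart helper are all hypothetical. The point is only that the intermediate task-level class must fold recordBufferSize, recordBufferOfferTimeout, and maxRecordsPerPoll into equals/hashCode; otherwise a change to any of them is invisible to a restart decision built on equality.

```java
import java.util.Objects;

// Hypothetical stand-in for the shared base tuning config (one field only).
class BaseTuningSketch
{
  final int maxRowsInMemory;
  BaseTuningSketch(int maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
  }

  @Override
  public boolean equals(Object o)
  {
    // Exact-class check, so subclasses can safely cast after super.equals(o).
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return maxRowsInMemory == ((BaseTuningSketch) o).maxRowsInMemory;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(maxRowsInMemory);
  }
}

// Stand-in for RabbitStreamIndexTaskTuningConfig: the intermediate class folds its
// own fields into equals/hashCode instead of silently inheriting the base versions.
class RabbitTaskTuningSketch extends BaseTuningSketch
{
  final int recordBufferSize;
  final int recordBufferOfferTimeout;
  final int maxRecordsPerPoll;
  RabbitTaskTuningSketch(int maxRows, int bufferSize, int offerTimeout, int maxPerPoll)
  {
    super(maxRows);
    this.recordBufferSize = bufferSize;
    this.recordBufferOfferTimeout = offerTimeout;
    this.maxRecordsPerPoll = maxPerPoll;
  }

  @Override
  public boolean equals(Object o)
  {
    if (!super.equals(o)) {
      return false;
    }
    RabbitTaskTuningSketch that = (RabbitTaskTuningSketch) o;
    return recordBufferSize == that.recordBufferSize
           && recordBufferOfferTimeout == that.recordBufferOfferTimeout
           && maxRecordsPerPoll == that.maxRecordsPerPoll;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), recordBufferSize, recordBufferOfferTimeout, maxRecordsPerPoll);
  }
}

// Stand-in for RabbitStreamSupervisorTuningConfig: supervisor-level fields on top.
class RabbitSupervisorTuningSketch extends RabbitTaskTuningSketch
{
  final Integer workerThreads;
  RabbitSupervisorTuningSketch(int maxRows, int bufferSize, int offerTimeout, int maxPerPoll, Integer workerThreads)
  {
    super(maxRows, bufferSize, offerTimeout, maxPerPoll);
    this.workerThreads = workerThreads;
  }

  @Override
  public boolean equals(Object o)
  {
    // Short-circuits before the cast when super.equals(o) fails.
    return super.equals(o) && Objects.equals(workerThreads, ((RabbitSupervisorTuningSketch) o).workerThreads);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), workerThreads);
  }
}

class RestartDecisionSketch
{
  // Hypothetical simplification: a restart is required iff the tuning configs differ.
  static boolean requireRestart(RabbitSupervisorTuningSketch proposed, RabbitSupervisorTuningSketch current)
  {
    return !proposed.equals(current);
  }
  public static void main(String[] args)
  {
    RabbitSupervisorTuningSketch current = new RabbitSupervisorTuningSketch(1000, 10_000, 5_000, 100, 4);
    RabbitSupervisorTuningSketch proposed = new RabbitSupervisorTuningSketch(1000, 20_000, 5_000, 100, 4);
    System.out.println(requireRestart(proposed, current));  // only recordBufferSize differs
  }
}
```

Deleting the equals/hashCode overrides from RabbitTaskTuningSketch reproduces the reported bug: the two configs in main would then compare equal and requireRestart would return false. A real restart-decision test would exercise SeekableStreamSupervisorSpec.requireRestart() against the actual Rabbit spec classes.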



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -166,6 +166,9 @@ public Response specPost(
           }
 
           if (Boolean.TRUE.equals(skipRestartIfUnmodified) && !manager.shouldUpdateSupervisor(spec)) {
+            // No restart needed, but still persist the spec if it actually changed (e.g. a taskCount
+            // change under autoscaling) so the metadata store reflects the latest submission.
+            manager.updateSupervisorSpecWithoutRestart(spec);

Review Comment:
   [P2] Persisted no-restart updates are not audited
   
   This branch can now mutate metadata by calling
   updateSupervisorSpecWithoutRestart, but it returns before the existing
   auditManager.doAudit block. A taskCount-only autoscaling update submitted with
   skipRestartIfUnmodified=true is therefore persisted and exposed as the latest
   supervisor spec without a supervisor audit entry, unlike a normal spec update.
   Please audit when updateSupervisorSpecWithoutRestart returns true, or move the
   audit logic so that it covers both mutating paths.
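One way to act on this, sketched with hypothetical in-memory stand-ins (SpecSketch, AuditingResourceSketch, and the string audit log are not Druid APIs): compute a single "did this call mutate metadata" flag across the restart and no-restart branches, then audit once from that flag. A taskCount-only update then gets an entry, while a byte-identical resubmission does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, heavily simplified supervisor spec: in this sketch a change to
// "tuning" requires a restart, while a taskCount-only change does not.
final class SpecSketch
{
  final String id;
  final int taskCount;
  final String tuning;

  SpecSketch(String id, int taskCount, String tuning)
  {
    this.id = id;
    this.taskCount = taskCount;
    this.tuning = tuning;
  }

  boolean requireRestart(SpecSketch current)
  {
    return !Objects.equals(tuning, current.tuning);
  }

  @Override
  public boolean equals(Object o)
  {
    if (!(o instanceof SpecSketch)) {
      return false;
    }
    SpecSketch that = (SpecSketch) o;
    return id.equals(that.id) && taskCount == that.taskCount && Objects.equals(tuning, that.tuning);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(id, taskCount, tuning);
  }
}

// Hypothetical stand-in for the POST handler with skipRestartIfUnmodified=true.
final class AuditingResourceSketch
{
  private final Map<String, SpecSketch> persisted = new HashMap<>();
  final List<String> auditLog = new ArrayList<>();

  synchronized boolean specPost(SpecSketch spec, String author)
  {
    final SpecSketch current = persisted.get(spec.id);
    final boolean mutated;
    if (current != null && !spec.requireRestart(current)) {
      // No-restart path: persist only when the spec actually changed.
      mutated = !spec.equals(current);
    } else {
      // New spec or restart required: the supervisor would be recreated (not modeled here).
      mutated = true;
    }
    if (mutated) {
      persisted.put(spec.id, spec);
      // Single audit point covering both mutating paths.
      auditLog.add(author + " updated supervisor " + spec.id + " (taskCount=" + spec.taskCount + ")");
    }
    return mutated;
  }

  public static void main(String[] args)
  {
    AuditingResourceSketch resource = new AuditingResourceSketch();
    resource.specPost(new SpecSketch("wiki", 2, "t1"), "alice");
    resource.specPost(new SpecSketch("wiki", 4, "t1"), "autoscaler");  // taskCount-only, audited
    resource.specPost(new SpecSketch("wiki", 4, "t1"), "autoscaler");  // identical, not audited
    resource.auditLog.forEach(System.out::println);
  }
}
```

Deriving the audit from one flag, rather than auditing inside each branch, keeps a future third mutating path from silently skipping the audit in the same way.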



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -186,14 +186,69 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
 
     synchronized (lock) {
       Preconditions.checkState(started, "SupervisorManager not started");
-      final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+      // Persist whenever the spec actually changed (or is new), independent of whether a restart is
+      // required. This stops/recreates the supervisor regardless; persistence must not be gated on the
+      // restart decision, otherwise a no-restart change (e.g. taskCount under autoscaling) would be
+      // applied to the running supervisor but lost from the metadata store.
+      final boolean specChanged = isSpecChangedAndValidate(spec);
       SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
       spec.merge(existingSpec);
-      createAndStartSupervisorInternal(spec, shouldUpdateSpec);
-      return shouldUpdateSpec;
+      createAndStartSupervisorInternal(spec, specChanged);
+      return specChanged;
     }
   }
 
+  /**
+   * Persists a changed spec and updates the in-memory spec reference <em>without</em> stopping or
+   * recreating the running supervisor. Used when a re-submitted spec does not require a restart (see
+   * {@link #shouldUpdateSupervisor}) but should still be recorded so the metadata store reflects the
+   * latest submission. No-op (returns false) if the spec is byte-identical to the running spec.
+   */
+  public boolean updateSupervisorSpecWithoutRestart(SupervisorSpec spec)
+  {
+    Preconditions.checkState(started, "SupervisorManager not started");
+    Preconditions.checkNotNull(spec, "spec");
+    Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+    Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
+
+    synchronized (lock) {
+      Preconditions.checkState(started, "SupervisorManager not started");
+      final Pair<Supervisor, SupervisorSpec> current = supervisors.get(spec.getId());
+      if (current == null || current.rhs == null || !isSpecChangedAndValidate(spec)) {

Review Comment:
   [P2] No-restart persistence can use a stale restart decision
   
   updateSupervisorSpecWithoutRestart re-checks only whether the spec bytes
   changed, not whether the current supervisor still agrees that no restart is
   required. SupervisorResource calls shouldUpdateSupervisor and this method as
   two separate synchronized operations, so a concurrent POST can replace the
   current spec between them. This method may then persist a spec that requires a
   restart relative to the new current supervisor. Re-check
   spec.requireRestart(current.rhs) under this lock, or expose a single atomic
   manager method for the no-restart path.
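A sketch of the "single atomic manager method" option, again with hypothetical stand-ins rather than the real SupervisorManager (ManagerSketch, SpecSketch, NoRestartOutcome, and updateWithoutRestartIfAllowed are illustrative names only). The restart decision, the change check, and the persist all happen under one lock, and the method returns a three-way outcome so the caller can fall back to the full restart path instead of acting on a decision made under an earlier lock acquisition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplified spec: a "tuning" change requires a restart, a taskCount change does not.
final class SpecSketch
{
  final String id;
  final int taskCount;
  final String tuning;

  SpecSketch(String id, int taskCount, String tuning)
  {
    this.id = id;
    this.taskCount = taskCount;
    this.tuning = tuning;
  }

  boolean requireRestart(SpecSketch current)
  {
    return !Objects.equals(tuning, current.tuning);
  }

  @Override
  public boolean equals(Object o)
  {
    if (!(o instanceof SpecSketch)) {
      return false;
    }
    SpecSketch that = (SpecSketch) o;
    return id.equals(that.id) && taskCount == that.taskCount && Objects.equals(tuning, that.tuning);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(id, taskCount, tuning);
  }
}

// Three-way result, so "nothing changed" stays distinct from "a restart is now required".
enum NoRestartOutcome
{
  UNCHANGED, PERSISTED_WITHOUT_RESTART, RESTART_REQUIRED
}

// Hypothetical stand-in for SupervisorManager, reduced to a lock and a spec map.
final class ManagerSketch
{
  private final Object lock = new Object();
  private final Map<String, SpecSketch> specs = new HashMap<>();

  // Stands in for the full stop/recreate/persist path (e.g. a concurrent POST).
  void createOrUpdate(SpecSketch spec)
  {
    synchronized (lock) {
      specs.put(spec.id, spec);
    }
  }

  SpecSketch current(String id)
  {
    synchronized (lock) {
      return specs.get(id);
    }
  }

  // Restart decision, change check, and persist share one lock acquisition, so the
  // current spec cannot be replaced between the decision and the write.
  NoRestartOutcome updateWithoutRestartIfAllowed(SpecSketch spec)
  {
    synchronized (lock) {
      final SpecSketch current = specs.get(spec.id);
      if (current == null || spec.requireRestart(current)) {
        return NoRestartOutcome.RESTART_REQUIRED;
      }
      if (spec.equals(current)) {
        return NoRestartOutcome.UNCHANGED;
      }
      specs.put(spec.id, spec);
      return NoRestartOutcome.PERSISTED_WITHOUT_RESTART;
    }
  }

  public static void main(String[] args)
  {
    ManagerSketch manager = new ManagerSketch();
    manager.createOrUpdate(new SpecSketch("wiki", 2, "t1"));
    // prints PERSISTED_WITHOUT_RESTART: taskCount-only change
    System.out.println(manager.updateWithoutRestartIfAllowed(new SpecSketch("wiki", 4, "t1")));
    // Another POST changes the tuning before the next no-restart attempt.
    manager.createOrUpdate(new SpecSketch("wiki", 4, "t2"));
    // prints RESTART_REQUIRED: the decision is re-made against the new current spec
    System.out.println(manager.updateWithoutRestartIfAllowed(new SpecSketch("wiki", 8, "t1")));
  }
}
```

On RESTART_REQUIRED, a caller shaped like SupervisorResource would take the createOrUpdateAndStartSupervisor path rather than trusting a shouldUpdateSupervisor result computed under a separate lock acquisition.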


