FrankChen021 commented on code in PR #19541:
URL: https://github.com/apache/druid/pull/19541#discussion_r3341103411
##########
extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTuningConfig.java:
##########
@@ -185,6 +186,29 @@ public Duration getOffsetFetchPeriod()
return offsetFetchPeriod;
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ RabbitStreamSupervisorTuningConfig that = (RabbitStreamSupervisorTuningConfig) o;
+ return Objects.equals(workerThreads, that.workerThreads)
Review Comment:
[P1] Rabbit tuning changes can skip required restarts
This equality method now drives
SeekableStreamSupervisorSpec.requireRestart(), but it only compares the
supervisor-level fields after super.equals(). RabbitStreamIndexTaskTuningConfig
declares recordBufferSize, recordBufferOfferTimeout, and maxRecordsPerPoll, and
that intermediate class does not override equals/hashCode, so specs that differ
only in those tuning knobs still compare equal. With skipRestartIfUnmodified=true, such a changed
spec can be persisted without recreating the Rabbit supervisor, leaving the
running supervisor on the old tuning. Include the Rabbit task-tuning fields in
equality/hashCode, or implement equality on RabbitStreamIndexTaskTuningConfig,
and add a restart-decision test.
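
A minimal sketch of the second suggestion (implementing equality on the intermediate class), assuming simplified stand-in classes. BaseTuningSketch, TaskTuningSketch, and SupervisorTuningSketch are hypothetical names, the field types are assumptions, and maxRowsInMemory is only a placeholder for whatever the real base class compares. It is meant to show the shape of the fix, not the actual Druid classes.

```java
import java.util.Objects;

// Hypothetical stand-in for the shared base tuning config (not Druid's class).
class BaseTuningSketch
{
  final int maxRowsInMemory; // placeholder base-level field

  BaseTuningSketch(int maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    return maxRowsInMemory == ((BaseTuningSketch) o).maxRowsInMemory;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(maxRowsInMemory);
  }
}

// Stand-in for the intermediate task-level class that declares the three knobs.
class TaskTuningSketch extends BaseTuningSketch
{
  final Integer recordBufferSize;         // assumed type
  final Integer recordBufferOfferTimeout; // assumed type
  final Integer maxRecordsPerPoll;        // assumed type

  TaskTuningSketch(int maxRowsInMemory, Integer bufferSize, Integer offerTimeout, Integer maxPerPoll)
  {
    super(maxRowsInMemory);
    this.recordBufferSize = bufferSize;
    this.recordBufferOfferTimeout = offerTimeout;
    this.maxRecordsPerPoll = maxPerPoll;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!super.equals(o)) {
      return false; // also rejects null and other classes
    }
    TaskTuningSketch that = (TaskTuningSketch) o;
    return Objects.equals(recordBufferSize, that.recordBufferSize)
           && Objects.equals(recordBufferOfferTimeout, that.recordBufferOfferTimeout)
           && Objects.equals(maxRecordsPerPoll, that.maxRecordsPerPoll);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), recordBufferSize, recordBufferOfferTimeout, maxRecordsPerPoll);
  }
}

// Stand-in for the supervisor-level class; it only adds its own field on top.
class SupervisorTuningSketch extends TaskTuningSketch
{
  final Integer workerThreads;

  SupervisorTuningSketch(int maxRows, Integer bufferSize, Integer offerTimeout, Integer maxPerPoll, Integer workerThreads)
  {
    super(maxRows, bufferSize, offerTimeout, maxPerPoll);
    this.workerThreads = workerThreads;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!super.equals(o)) {
      return false;
    }
    return Objects.equals(workerThreads, ((SupervisorTuningSketch) o).workerThreads);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(super.hashCode(), workerThreads);
  }
}
```

With this layering, the supervisor-level equals() can keep delegating to super.equals() and a change to any of the three task-level knobs is no longer masked.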
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java:
##########
@@ -166,6 +166,9 @@ public Response specPost(
}
if (Boolean.TRUE.equals(skipRestartIfUnmodified) && !manager.shouldUpdateSupervisor(spec)) {
+ // No restart needed, but still persist the spec if it actually changed (e.g. a taskCount
+ // change under autoscaling) so the metadata store reflects the latest submission.
+ manager.updateSupervisorSpecWithoutRestart(spec);
Review Comment:
[P2] Persisted no-restart updates are not audited
This branch can now mutate metadata by calling
updateSupervisorSpecWithoutRestart, but it returns before the existing
auditManager.doAudit block. A taskCount-only autoscaling update with
skipRestartIfUnmodified=true is therefore persisted and exposed as the latest
supervisor spec without any supervisor audit entry, unlike normal spec updates.
Please audit when updateSupervisorSpecWithoutRestart returns true, or move the
audit logic to cover both mutating paths.
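
A rough sketch of the "cover both mutating paths" option, assuming a simplified model in which specs are plain strings and the audit trail is an in-memory list. ManagerSketch, FakeManager, and SpecPostSketch are hypothetical names; the real code would call auditManager.doAudit rather than append to a list, and whether the normal path audits unconditionally is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified manager interface. Method names follow the diff,
// but specs are modeled as plain strings.
interface ManagerSketch
{
  boolean shouldUpdateSupervisor(String spec);

  boolean updateSupervisorSpecWithoutRestart(String spec);

  boolean createOrUpdateAndStartSupervisor(String spec);
}

// Test double: remembers the last persisted spec and a fixed restart decision.
class FakeManager implements ManagerSketch
{
  String persisted;
  final boolean restartNeeded;

  FakeManager(String persisted, boolean restartNeeded)
  {
    this.persisted = persisted;
    this.restartNeeded = restartNeeded;
  }

  @Override
  public boolean shouldUpdateSupervisor(String spec)
  {
    return restartNeeded;
  }

  @Override
  public boolean updateSupervisorSpecWithoutRestart(String spec)
  {
    if (spec.equals(persisted)) {
      return false; // nothing changed, nothing persisted
    }
    persisted = spec;
    return true;
  }

  @Override
  public boolean createOrUpdateAndStartSupervisor(String spec)
  {
    boolean changed = !spec.equals(persisted);
    persisted = spec;
    return changed;
  }
}

class SpecPostSketch
{
  final ManagerSketch manager;
  final List<String> auditLog = new ArrayList<>(); // stand-in for the audit manager

  SpecPostSketch(ManagerSketch manager)
  {
    this.manager = manager;
  }

  void specPost(String spec, boolean skipRestartIfUnmodified)
  {
    final boolean mutated;
    if (skipRestartIfUnmodified && !manager.shouldUpdateSupervisor(spec)) {
      // No-restart path: only counts as a mutation if something was persisted.
      mutated = manager.updateSupervisorSpecWithoutRestart(spec);
    } else {
      manager.createOrUpdateAndStartSupervisor(spec);
      mutated = true; // assumption: the normal path always audits
    }
    // Single audit point shared by both paths, instead of an early return.
    if (mutated) {
      auditLog.add(spec);
    }
  }
}
```

Routing both branches through one audit point avoids the early return that currently bypasses the audit entry.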
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -186,14 +186,69 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
- final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
+ // Persist whenever the spec actually changed (or is new) — independent of whether a restart is
+ // required. This stops/recreates the supervisor regardless; persistence must not be gated on the
+ // restart decision, otherwise a no-restart change (e.g. taskCount under autoscaling) would be
+ // applied to the running supervisor but lost from the metadata store.
+ final boolean specChanged = isSpecChangedAndValidate(spec);
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
spec.merge(existingSpec);
- createAndStartSupervisorInternal(spec, shouldUpdateSpec);
- return shouldUpdateSpec;
+ createAndStartSupervisorInternal(spec, specChanged);
+ return specChanged;
}
}
+ /**
+ * Persists a changed spec and updates the in-memory spec reference <em>without</em> stopping or
+ * recreating the running supervisor. Used when a re-submitted spec does not require a restart (see
+ * {@link #shouldUpdateSupervisor}) but should still be recorded so the metadata store reflects the
+ * latest submission. No-op (returns false) if the spec is byte-identical to the running spec.
+ */
+ public boolean updateSupervisorSpecWithoutRestart(SupervisorSpec spec)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(spec, "spec");
+ Preconditions.checkNotNull(spec.getId(), "spec.getId()");
+ Preconditions.checkNotNull(spec.getDataSources(), "spec.getDatasources()");
+
+ synchronized (lock) {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ final Pair<Supervisor, SupervisorSpec> current = supervisors.get(spec.getId());
+ if (current == null || current.rhs == null || !isSpecChangedAndValidate(spec)) {
Review Comment:
[P2] No-restart persistence can use a stale restart decision
updateSupervisorSpecWithoutRestart re-checks only whether the spec bytes
changed, not whether the current supervisor still agrees that no restart is
required. SupervisorResource calls shouldUpdateSupervisor and this method as
two separate synchronized operations, so a concurrent POST can change the
current spec between them; this method may then persist a spec that requires a
restart relative to the new current supervisor. Re-check
spec.requireRestart(current.rhs) under this lock, or expose a single atomic
manager method for the no-restart path.
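
One possible shape for the single atomic method, sketched under simplifying assumptions. SpecSketch, SimpleSpec, AtomicNoRestartSketch, the Outcome enum, and the register helper are hypothetical; only requireRestart and the idea of re-checking under the manager lock come from the comment above. In SimpleSpec, a change to ioConfig is assumed to require a restart and a change to taskCount is assumed not to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical minimal spec abstraction.
interface SpecSketch
{
  String getId();

  boolean requireRestart(SpecSketch running);
}

class SimpleSpec implements SpecSketch
{
  final String id;
  final String ioConfig; // assumed restart-relevant
  final int taskCount;   // assumed not restart-relevant

  SimpleSpec(String id, String ioConfig, int taskCount)
  {
    this.id = id;
    this.ioConfig = ioConfig;
    this.taskCount = taskCount;
  }

  @Override
  public String getId()
  {
    return id;
  }

  @Override
  public boolean requireRestart(SpecSketch running)
  {
    return !(running instanceof SimpleSpec) || !ioConfig.equals(((SimpleSpec) running).ioConfig);
  }

  @Override
  public boolean equals(Object o)
  {
    if (!(o instanceof SimpleSpec)) {
      return false;
    }
    SimpleSpec that = (SimpleSpec) o;
    return id.equals(that.id) && ioConfig.equals(that.ioConfig) && taskCount == that.taskCount;
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(id, ioConfig, taskCount);
  }
}

class AtomicNoRestartSketch
{
  enum Outcome { UNCHANGED, PERSISTED_WITHOUT_RESTART, RESTART_REQUIRED }

  private final Object lock = new Object();
  private final Map<String, SpecSketch> running = new HashMap<>();

  // Stand-in for the normal create/restart path.
  void register(SpecSketch spec)
  {
    synchronized (lock) {
      running.put(spec.getId(), spec);
    }
  }

  // The restart decision and the persist step share one critical section,
  // so a concurrent POST cannot slip in between them.
  Outcome updateIfNoRestartRequired(SpecSketch spec)
  {
    synchronized (lock) {
      SpecSketch current = running.get(spec.getId());
      if (current == null || spec.requireRestart(current)) {
        return Outcome.RESTART_REQUIRED; // caller falls back to the restart path
      }
      if (spec.equals(current)) {
        return Outcome.UNCHANGED;
      }
      running.put(spec.getId(), spec); // real code would also write to the metadata store
      return Outcome.PERSISTED_WITHOUT_RESTART;
    }
  }
}
```

A caller would treat RESTART_REQUIRED as a signal to fall through to createOrUpdateAndStartSupervisor, so the decision is always made against the spec that is current at the time of the write.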
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]