aho135 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3164946199


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec 
proposedSpec) throws DruidExcept
     if (!this.getSource().equals(other.getSource())) {
       throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, 
this.getSource(), other.getSource());
     }
+
+    // Validate bounded stream configuration
+    validateBoundedStreamConfig(other);
+  }
+
+  /**
+   * Validates bounded stream configuration for the supervisor spec.
+   *
+   * @param spec the supervisor spec to validate
+   * @throws DruidException if the bounded stream configuration is invalid
+   */
+  protected void validateBoundedStreamConfig(SeekableStreamSupervisorSpec 
spec) throws DruidException
+  {
+    SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
+
+    if (ioConfig.isBounded()) {
+      // Validate partition consistency
+      BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+      if 
(!boundedConfig.getStartSequenceNumbers().keySet().equals(boundedConfig.getEndSequenceNumbers().keySet()))
 {
+        throw InvalidInput.exception(
+            "Bounded stream config has mismatched partitions. Start: %s, End: 
%s",
+            boundedConfig.getStartSequenceNumbers().keySet(),
+            boundedConfig.getEndSequenceNumbers().keySet()
+        );
+      }
+
+      // Warn if useConcurrentLocks is not enabled
+      Map<String, Object> context = spec.getContext();
+      if (context == null || 
!Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
+        log.warn(
+            "Bounded stream processing without 'useConcurrentLocks=true' may 
fail " +
+            "if other supervisors are running or segments already exist for 
these intervals. " +
+            "Consider setting useConcurrentLocks=true in the supervisor 
context."
+        );

Review Comment:
   Yeah, this is just logged as a warning and is not a strict requirement. In 
the reset and backfill API that would be a strict enforcement



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to