aho135 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3164946199
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec
proposedSpec) throws DruidExcept
if (!this.getSource().equals(other.getSource())) {
throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE,
this.getSource(), other.getSource());
}
+
+ // Validate bounded stream configuration
+ validateBoundedStreamConfig(other);
+ }
+
+ /**
+ * Validates bounded stream configuration for the supervisor spec.
+ *
+ * @param spec the supervisor spec to validate
+ * @throws DruidException if the bounded stream configuration is invalid
+ */
+ protected void validateBoundedStreamConfig(SeekableStreamSupervisorSpec
spec) throws DruidException
+ {
+ SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
+
+ if (ioConfig.isBounded()) {
+ // Validate partition consistency
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ if
(!boundedConfig.getStartSequenceNumbers().keySet().equals(boundedConfig.getEndSequenceNumbers().keySet()))
{
+ throw InvalidInput.exception(
+ "Bounded stream config has mismatched partitions. Start: %s, End:
%s",
+ boundedConfig.getStartSequenceNumbers().keySet(),
+ boundedConfig.getEndSequenceNumbers().keySet()
+ );
+ }
+
+ // Warn if useConcurrentLocks is not enabled
+ Map<String, Object> context = spec.getContext();
+ if (context == null ||
!Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
+ log.warn(
+ "Bounded stream processing without 'useConcurrentLocks=true' may
fail " +
+ "if other supervisors are running or segments already exist for
these intervals. " +
+ "Consider setting useConcurrentLocks=true in the supervisor
context."
+ );
Review Comment:
Yeah, this is just logged as a warning and is not a strict requirement. In
the reset and backfill API, however, it would be strictly enforced.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]