FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3141594283
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4255,6 +4418,23 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartiti
}
Review Comment:
P1 Bounded starts can be ignored when metadata exists
getOffsetFromStorageForPartition only falls back to
boundedStreamConfig.startSequenceNumbers when no metadata/checkpoint offset
exists. If a supervisor is reset or reconfigured with a requested bounded start
while metadata storage still has an older offset, the stored metadata wins and
the task starts from the stale position instead of the user-supplied bounded
start. That can skip the requested backfill range or process a different
interval than configured. Bounded mode should either clear (or namespace) the old
metadata for the run, or explicitly prefer the configured start offsets when
initializing the bounded task group.
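A minimal sketch of the "prefer the configured start when initializing" option, assuming offsets can be modeled as per-partition string maps. BoundedStartResolver, resolveStartOffset, and the initializingBoundedGroup flag are hypothetical names for illustration, not existing Druid APIs. Stored metadata still applies after initialization, so a bounded run that has already checkpointed resumes from its own progress instead of restarting.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Druid: illustrates the start-offset precedence only.
public class BoundedStartResolver
{
  /**
   * When the bounded task group is being initialized, a configured bounded start
   * overrides any (possibly stale) stored offset. Otherwise the stored checkpoint
   * wins, and the configured start is only a fallback.
   */
  public static Optional<String> resolveStartOffset(
      String partitionId,
      boolean initializingBoundedGroup,
      Map<String, String> boundedStartOffsets,
      Map<String, String> storedOffsets
  )
  {
    String configured = boundedStartOffsets.get(partitionId);
    if (initializingBoundedGroup && configured != null) {
      // Fresh bounded run: ignore metadata left over from an earlier run.
      return Optional.of(configured);
    }
    String stored = storedOffsets.get(partitionId);
    if (stored != null) {
      // Resuming: continue from the checkpoint written during this run.
      return Optional.of(stored);
    }
    return Optional.ofNullable(configured);
  }

  public static void main(String[] args)
  {
    Map<String, String> bounded = Map.of("shard-0", "100");
    Map<String, String> stored = Map.of("shard-0", "500", "shard-1", "42");
    System.out.println(resolveStartOffset("shard-0", true, bounded, stored).orElse("none"));  // 100
    System.out.println(resolveStartOffset("shard-0", false, bounded, stored).orElse("none")); // 500
    System.out.println(resolveStartOffset("shard-1", true, bounded, stored).orElse("none"));  // 42
    System.out.println(resolveStartOffset("shard-2", false, bounded, stored).orElse("none")); // none
  }
}
```

The flag is what separates "stale metadata from an earlier run" from "progress made by this run"; the namespacing alternative would make that distinction through the metadata key instead.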
##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -381,6 +381,28 @@ protected boolean isShardExpirationMarker(String seqNum)
return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
}
+ @Override
+ protected boolean isOffsetAtOrBeyond(String current, String target)
+ {
+ // Kinesis sequence numbers are comparable strings
+ // They can be compared lexicographically to determine order
+ return current.compareTo(target) >= 0;
+ }
+
+ @Override
+ protected String createPartitionIdFromString(String partitionIdString)
+ {
+ // Kinesis uses String as partition ID, so just return the string as-is
+ return partitionIdString;
+ }
+
Review Comment:
P1 Kinesis bounded completion compares offsets lexicographically
isOffsetAtOrBeyond uses current.compareTo(target), which compares Kinesis
sequence numbers lexicographically. Kinesis sequence numbers are decimal
strings, and lexicographic order matches numeric order only when both strings
have the same number of digits; for example, "9" sorts after "10". A bounded
supervisor can therefore mark a shard complete before the numeric end offset is
reached, or keep recreating tasks after the shard has already passed the end.
Compare Kinesis offsets using their numeric value or the existing Kinesis
sequence-number ordering semantics.
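A minimal sketch of the numeric comparison, assuming both arguments are plain decimal sequence numbers. java.math.BigInteger is used because Kinesis sequence numbers can be far longer than a long can hold. The class and method names are hypothetical. Special markers such as KinesisSequenceNumber.EXPIRED_MARKER are not handled here and would have to be checked before parsing, since BigInteger throws NumberFormatException on non-numeric input.

```java
import java.math.BigInteger;

// Hypothetical illustration, not Druid code: contrasts the two comparison strategies.
public class KinesisOffsetComparison
{
  // Lexicographic check as written in the PR; shown only for contrast.
  static boolean isOffsetAtOrBeyondLexicographic(String current, String target)
  {
    return current.compareTo(target) >= 0;
  }

  // Numeric check: parse both decimal strings as arbitrary-precision integers.
  // Assumes callers have already filtered out non-numeric marker values.
  static boolean isOffsetAtOrBeyondNumeric(String current, String target)
  {
    return new BigInteger(current).compareTo(new BigInteger(target)) >= 0;
  }

  public static void main(String[] args)
  {
    // Different digit lengths: '9' > '1' character-wise, but 9 < 10 numerically.
    System.out.println(isOffsetAtOrBeyondLexicographic("9", "10")); // true (wrong)
    System.out.println(isOffsetAtOrBeyondNumeric("9", "10"));       // false
  }
}
```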
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]