FrankChen021 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3141594283


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4255,6 +4418,23 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartiti
       }

Review Comment:
   P1 Bounded starts can be ignored when metadata exists
   
   getOffsetFromStorageForPartition only falls back to 
boundedStreamConfig.startSequenceNumbers when no metadata/checkpoint offset 
exists. If a supervisor is reset or reconfigured with a requested bounded start 
while metadata storage still has an older offset, the stored metadata wins and 
the task starts from the stale position instead of the user-supplied bounded 
start. That can skip the requested backfill range or process a different 
interval than configured. Bounded mode should either clear or namespace the 
old metadata for the run, or explicitly prefer the configured start when 
initializing the bounded task group.
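   A minimal Java sketch of the second option follows. It assumes hypothetical 
names: BoundedStartResolver, metadataFirst and resolveStart are illustrative 
only, and the two maps stand in for the stored metadata offsets and for 
boundedStreamConfig.startSequenceNumbers. It contrasts the behavior described 
above (the stored offset wins) with preferring the configured start only while 
the bounded task group is being initialized. Checkpoints written by the bounded 
run itself are still honored on later restarts.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical helper (not Druid code) illustrating start-offset resolution
 * for a bounded run. storedOffsets stands in for metadata/checkpoint offsets,
 * boundedStarts for boundedStreamConfig.startSequenceNumbers.
 */
final class BoundedStartResolver
{
  private BoundedStartResolver() {}

  /** Behavior described in the review: stored metadata wins, bounded start is only a fallback. */
  static <P, O> Optional<O> metadataFirst(P partition, Map<P, O> storedOffsets, Map<P, O> boundedStarts)
  {
    O stored = storedOffsets.get(partition);
    if (stored != null) {
      return Optional.of(stored);
    }
    return Optional.ofNullable(boundedStarts.get(partition));
  }

  /**
   * Suggested alternative: while initializing the bounded task group, prefer the
   * configured start; afterwards, resume from stored checkpoints as usual.
   */
  static <P, O> Optional<O> resolveStart(
      P partition,
      Map<P, O> storedOffsets,
      Map<P, O> boundedStarts,
      boolean initializingBoundedGroup
  )
  {
    O configured = boundedStarts.get(partition);
    if (initializingBoundedGroup && configured != null) {
      return Optional.of(configured);
    }
    O stored = storedOffsets.get(partition);
    if (stored != null) {
      return Optional.of(stored);
    }
    // No checkpoint yet: fall back to the configured start, if any.
    return Optional.ofNullable(configured);
  }
}
```

   Scoping the override to initialization is one design choice. Always 
preferring the configured start would make a restarted task in the same 
bounded run reprocess data from the beginning.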



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java:
##########
@@ -381,6 +381,28 @@ protected boolean isShardExpirationMarker(String seqNum)
     return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum);
   }
 
+  @Override
+  protected boolean isOffsetAtOrBeyond(String current, String target)
+  {
+    // Kinesis sequence numbers are comparable strings
+    // They can be compared lexicographically to determine order
+    return current.compareTo(target) >= 0;
+  }
+
+  @Override
+  protected String createPartitionIdFromString(String partitionIdString)
+  {
+    // Kinesis uses String as partition ID, so just return the string as-is
+    return partitionIdString;
+  }
+

Review Comment:
   P1 Kinesis bounded completion compares offsets lexicographically
   
   isOffsetAtOrBeyond uses current.compareTo(target), which compares Kinesis 
sequence numbers lexicographically. Kinesis sequence numbers are decimal 
strings. For decimal strings without leading zeros, lexicographic order matches 
numeric order only when both strings have the same number of digits. Across 
different digit lengths (for example "9" vs "10") the two orders diverge. A 
bounded supervisor can therefore mark a shard complete before the numeric end 
offset is reached, or keep recreating tasks after it has already passed the 
end. Compare Kinesis offsets using their numeric value or the existing Kinesis 
sequence-number ordering semantics.
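   To make the failure concrete, here is a small Java sketch. It compares the 
lexicographic check from the diff with a numeric check based on 
java.math.BigInteger. BigInteger is assumed here because Kinesis sequence 
numbers are typically far longer than the 19 digits a long can hold. The class 
and method names are hypothetical, and the offsets are illustrative rather than 
real Kinesis values. The sketch does not handle non-numeric marker strings such 
as KinesisSequenceNumber.EXPIRED_MARKER. Reusing the existing Kinesis 
sequence-number ordering, as suggested above, would cover those.

```java
import java.math.BigInteger;

/**
 * Hypothetical illustration (not Druid code) of lexicographic vs numeric
 * comparison of Kinesis-style decimal sequence numbers.
 */
final class KinesisOffsetComparison
{
  private KinesisOffsetComparison() {}

  /** Mirrors the check in the diff: plain String ordering. */
  static boolean isOffsetAtOrBeyondLexicographic(String current, String target)
  {
    return current.compareTo(target) >= 0;
  }

  /**
   * Compares by numeric value. Assumes both arguments are plain decimal
   * sequence numbers; marker strings must be handled before this call,
   * because new BigInteger(...) throws NumberFormatException on them.
   */
  static boolean isOffsetAtOrBeyondNumeric(String current, String target)
  {
    return new BigInteger(current).compareTo(new BigInteger(target)) >= 0;
  }
}
```

   With "9" against an end of "10", the lexicographic check reports the end as 
reached too early. With "100" against "99", it reports the end as not reached 
even though it has been passed. These match the two failure modes described 
above.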



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
