apoorvmittal10 commented on code in PR #20391:
URL: https://github.com/apache/kafka/pull/20391#discussion_r2291025889


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -4493,6 +4493,100 @@ public void testLsoMovementForArchivingBatches() {
         assertNotNull(sharePartition.cachedState().get(32L).batchAcquisitionLockTimeoutTask());
     }
+    @Test
+    public void testLsoMovementForArchivingAllAvailableBatches() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+
+        // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
+
+        // After the acknowledgements, the share partition state will be:
+        // 1. 11 -> 20: AVAILABLE
+        // 2. 21 -> 30: ACQUIRED
+        // 3. 31 -> 40: AVAILABLE
+        // 4. 41 -> 50: ACKNOWLEDGED
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
+            new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)),
+            new ShareAcknowledgementBatch(41, 50, List.of((byte) 1))
+        ));
+
+        // Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED.
+        // Thus, the state of the share partition will be:
+        // 1. 11 -> 20: ARCHIVED
+        // 2. 21 -> 30: ACQUIRED
+        // 3. 31 -> 40: ARCHIVED
+        // 4. 41 -> 50: ACKNOWLEDGED
+        // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
+        // state when the corresponding acquisition lock timer task expires.

Review Comment:
At the end of this method, can you call `sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21L, 30L, List.of((byte) 2))))` and validate the response and the cached state? This will make sure that the acknowledge did nothing and also didn't fail.
However, I'll discuss this behaviour in a separate PR, when I update the method docs.

Then call `sharePartition.cachedState().get(21L).batchAcquisitionLockTimeoutTask().run();` and validate that the respective records are directly archived.
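
The two requested checks can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not the real `SharePartition`: `ToySharePartition`, `Batch`, `moveLso` and `runAcquisitionLockTimeout` are hypothetical names, and treating an acknowledge for a batch behind the LSO as a silent no-op is an assumption taken from the expectation in this comment, not from the method docs.

```java
import java.util.TreeMap;

class LsoArchiveSketch {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    // Hypothetical stand-in for a cached in-flight batch.
    static final class Batch {
        final long first;
        final long last;
        State state = State.ACQUIRED;
        Batch(long first, long last) { this.first = first; this.last = last; }
    }

    // Hypothetical toy model of the state transitions discussed in this thread.
    static final class ToySharePartition {
        final TreeMap<Long, Batch> cachedState = new TreeMap<>();
        long startOffset = 0;

        void acquire(long first, long last) {
            cachedState.put(first, new Batch(first, last));
        }

        // type 1 = accept, type 2 = release (same byte values as in the test).
        void acknowledge(long first, byte type) {
            Batch b = cachedState.get(first);
            if (b == null || b.state != State.ACQUIRED) return;
            // Assumption: a batch entirely behind the LSO is left untouched.
            if (b.last < startOffset) return;
            b.state = (type == 1) ? State.ACKNOWLEDGED : State.AVAILABLE;
        }

        // AVAILABLE batches entirely before the new LSO are archived;
        // ACQUIRED batches keep their state until the lock timeout fires.
        void moveLso(long newLso) {
            startOffset = newLso;
            for (Batch b : cachedState.values()) {
                if (b.last < newLso && b.state == State.AVAILABLE) {
                    b.state = State.ARCHIVED;
                }
            }
        }

        // On lock expiry, a batch behind the LSO is archived directly
        // instead of becoming AVAILABLE again.
        void runAcquisitionLockTimeout(long first) {
            Batch b = cachedState.get(first);
            if (b == null || b.state != State.ACQUIRED) return;
            b.state = (b.last < startOffset) ? State.ARCHIVED : State.AVAILABLE;
        }
    }

    public static void main(String[] args) {
        ToySharePartition sp = new ToySharePartition();
        sp.acquire(11, 20);
        sp.acquire(21, 30);
        sp.acquire(31, 40);
        sp.acquire(41, 50);
        sp.acknowledge(11, (byte) 2);
        sp.acknowledge(31, (byte) 2);
        sp.acknowledge(41, (byte) 1);
        sp.moveLso(41);

        sp.acknowledge(21, (byte) 2);     // first check: expected to be a no-op
        System.out.println("21 -> 30 after acknowledge: " + sp.cachedState.get(21L).state);

        sp.runAcquisitionLockTimeout(21); // second check: archived directly
        System.out.println("21 -> 30 after timeout: " + sp.cachedState.get(21L).state);
    }
}
```

In the real test the same two steps would be followed by assertions on the acknowledge result and on `sharePartition.cachedState()`, using whatever assertion helpers the test class already relies on.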