chirag-wadhwa5 commented on code in PR #20391:
URL: https://github.com/apache/kafka/pull/20391#discussion_r2291545734


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -4493,6 +4493,100 @@ public void testLsoMovementForArchivingBatches() {
         assertNotNull(sharePartition.cachedState().get(32L).batchAcquisitionLockTimeoutTask());
     }
 
+    @Test
+    public void testLsoMovementForArchivingAllAvailableBatches() {
+        SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+
+        // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50.
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10);
+        fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10);
+
+        // After the acknowledgements, the share partition state will be:
+        // 1. 11 -> 20: AVAILABLE
+        // 2. 21 -> 30: ACQUIRED
+        // 3. 31 -> 40: AVAILABLE
+        // 4. 41 -> 50: ACKNOWLEDGED
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(11, 20, List.of((byte) 2)),
+            new ShareAcknowledgementBatch(31, 40, List.of((byte) 2)),
+            new ShareAcknowledgementBatch(41, 50, List.of((byte) 1))
+        ));
+
+        // Move the LSO to 41. When the LSO moves ahead, all batches that are AVAILABLE before the new LSO will be ARCHIVED.
+        // Thus, the state of the share partition will be:
+        // 1. 11 -> 20: ARCHIVED
+        // 2. 21 -> 30: ACQUIRED
+        // 3. 31 -> 40: ARCHIVED
+        // 4. 41 -> 50: ACKNOWLEDGED
+        // Note, the records that are in ACQUIRED state will remain in ACQUIRED state and will be transitioned to a Terminal
+        // state when the corresponding acquisition lock timer task expires.

Review Comment:
   Thanks for the review. I have made the required changes in the latest commit.



