apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1941077150


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1926,189 +2087,992 @@ public void testAcquireReleasedRecord() {
         expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
         assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
 
-        // Send the same fetch request batch again but only 2 offsets should 
come as acquired.
-        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 20, 3, records,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            2);
+        // Send the same fetch request batch again but only 2 offsets should 
come as acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, records,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testAcquireReleasedRecordMultipleBatches() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        // First fetch request with 5 records starting from offset 10.
+        MemoryRecords records1 = memoryRecords(5, 10);
+        // Second fetch request with 5 records starting from offset 15.
+        MemoryRecords records2 = memoryRecords(5, 15);
+        // Third fetch request with 5 records starting from offset 23, gap of 
3 offsets.
+        MemoryRecords records3 = memoryRecords(5, 23);
+        // Fourth fetch request with 5 records starting from offset 28.
+        MemoryRecords records4 = memoryRecords(5, 28);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records4,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(33, sharePartition.nextFetchOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(28L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            Collections.singletonList(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));

Review Comment:
   List.of



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to