chirag-wadhwa5 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1940939932


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1951,164 +2112,1102 @@ public void 
testAcquireReleasedRecordMultipleBatches() {
         // Fourth fetch request with 5 records starting from offset 28.
         MemoryRecords records4 = memoryRecords(5, 28);
 
-        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
-            MEMBER_ID,
-            BATCH_SIZE,
-            MAX_FETCH_RECORDS,
-            new FetchPartitionData(Errors.NONE, 40, 3, records1,
-                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
-            5);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 30, 3, records4,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(33, sharePartition.nextFetchOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(28L).batchState());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            Collections.singletonList(new ShareAcknowledgementBatch(12, 30, 
Collections.singletonList((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchState());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertNull(sharePartition.cachedState().get(15L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(15L).batchMemberId());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(23L).batchState());
+        assertNull(sharePartition.cachedState().get(23L).offsetState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(23L).batchMemberId());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(28L).batchState());
+        assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(10L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(11L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(12L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(13L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(14L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(10L).offsetState());
+
+        expectedOffsetStateMap.clear();
+        expectedOffsetStateMap.put(28L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(29L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(30L, new 
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+        expectedOffsetStateMap.put(31L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(32L, new 
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(28L).offsetState());
+
+        // Send next batch from offset 12, only 3 records should be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records1,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Though record2 batch exists to acquire but send batch record3, it 
should be acquired but
+        // next fetch offset should not move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 40, 3, records3,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from batch 2.
+        MemoryRecords subsetRecords = memoryRecords(2, 17);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Acquire partial records from record 4 to further test if the next 
fetch offset move
+        // accordingly once complete record 2 is also acquired.
+        subsetRecords = memoryRecords(1, 28);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            1);
+
+        assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(15, sharePartition.nextFetchOffset());
+
+        // Try to acquire complete record 2 though it's already partially 
acquired, the next fetch
+        // offset should move.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            MAX_FETCH_RECORDS,
+            new FetchPartitionData(Errors.NONE, 20, 3, records2,
+                Optional.empty(), OptionalLong.empty(), Optional.empty(), 
OptionalInt.empty(), false)),
+            3);
+
+        // Offset 15,16 and 19 should be acquired.
+        List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(15, 16, 2);
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        // Next fetch offset should not move.
+        assertEquals(29, sharePartition.nextFetchOffset());

Review Comment:
   Thanks for the review. I did not make any change to this existing test, 
neither did I add this test. It's showing in the diff probably because that's 
how github is perceiving the change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to