apoorvmittal10 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1941077150
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -1926,189 +2087,992 @@ public void testAcquireReleasedRecord() { expectedOffsetStateMap.put(14L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); - // Send the same fetch request batch again but only 2 offsets should come as acquired. - acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( - MEMBER_ID, - BATCH_SIZE, - MAX_FETCH_RECORDS, - new FetchPartitionData(Errors.NONE, 20, 3, records, - Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), - 2); + // Send the same fetch request batch again but only 2 offsets should come as acquired. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 20, 3, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 2); + + assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + } + + @Test + public void testAcquireReleasedRecordMultipleBatches() { + SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); + // First fetch request with 5 records starting from offset 10. + MemoryRecords records1 = memoryRecords(5, 10); + // Second fetch request with 5 records starting from offset 15. + MemoryRecords records2 = memoryRecords(5, 15); + // Third fetch request with 5 records starting from offset 23, gap of 3 offsets. + MemoryRecords records3 = memoryRecords(5, 23); + // Fourth fetch request with 5 records starting from offset 28. + MemoryRecords records4 = memoryRecords(5, 28); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 40, 3, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 30, 3, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(20, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 30, 3, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(28, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 30, 3, records4, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(33, sharePartition.nextFetchOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(28L).batchState()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + assertNull(sharePartition.cachedState().get(23L).offsetState()); + assertNull(sharePartition.cachedState().get(28L).offsetState()); + + CompletableFuture<Void> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(12, 30, Collections.singletonList((byte) 2)))); Review Comment: List.of -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org