adixitconfluent commented on code in PR #20395:
URL: https://github.com/apache/kafka/pull/20395#discussion_r2300669808


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1088,6 +1088,672 @@ public void 
testMaybeInitializeStateBatchesWithoutGaps() {
         assertNull(initialReadGapOffset);
     }
 
+    @Test
+    public void testMaybeInitializeAndAcquire() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 1, records will be acquired till the first 
gap is encountered.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                1,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(10, 14, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(15L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Send the same batch again to acquire the next set of records.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                10,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(31, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(19L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(19L).offsetState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(23L).batchDeliveryCount());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
+        assertNull(sharePartition.initialReadGapOffset());
+
+        // Now initial read gap is filled, so the complete batch can be 
acquired despite max fetch records being 1.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                1,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            19);
+
+        assertArrayEquals(expectedAcquiredRecord(31, 49, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
+        assertEquals(1, 
sharePartition.cachedState().get(31L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(31L).offsetState());
+        assertEquals(49L, sharePartition.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 
to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 500, all records should be acquired.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            37);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(31L).batchState());
+        assertEquals(49L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be 
null.
+        assertNull(sharePartition.initialReadGapOffset());
+    }
+
+    @Test
+    public void 
testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, 
RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, 
RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, 
RecordState.AVAILABLE.id, (short) 3)))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, 
sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that ends in between the cached batch 
and the fetch offset is
+        // post startOffset.
+        MemoryRecords records = memoryRecords(16, 12);
+        // Set max fetch records to 500, records should be acquired till the 
last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+                MEMBER_ID,
+                BATCH_SIZE,
+                500,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData(records),
+                FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(12, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(12, sharePartition.cachedState().get(12L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(12L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(12L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(23L).batchState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(26L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(26L).offsetState().get(27L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(28L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(29L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(26L).offsetState().get(30L).state());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());

Review Comment:
   Perhaps `initialGapOffsetReadWindow` would be a better name than 
`initialReadGapOffset`, if that makes sense, @apoorvmittal10?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to