adixitconfluent commented on code in PR #20395:
URL: https://github.com/apache/kafka/pull/20395#discussion_r2300769794
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1088,6 +1088,672 @@ public void testMaybeInitializeStateBatchesWithoutGaps() {
         assertNull(initialReadGapOffset);
     }
 
+    @Test
+    public void testMaybeInitializeAndAcquire() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 1, records will be acquired till the first gap is encountered.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            1,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(10, 14, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(15, sharePartition.nextFetchOffset());
+        assertEquals(4, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
+        assertEquals(1, sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(10L).offsetState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(15L, sharePartition.initialReadGapOffset().gapStartOffset());
+
+        // Send the same batch again to acquire the next set of records.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            10,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+        assertEquals(31, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState());
+        assertEquals(1, sharePartition.cachedState().get(19L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(19L).offsetState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+        assertEquals(1, sharePartition.cachedState().get(23L).batchDeliveryCount());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState());
+        assertEquals(30L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be null.
+        assertNull(sharePartition.initialReadGapOffset());
+
+        // Now initial read gap is filled, so the complete batch can be acquired despite max fetch records being 1.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            1,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            19);
+
+        assertArrayEquals(expectedAcquiredRecord(31, 49, 1).toArray(), acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState());
+        assertEquals(1, sharePartition.cachedState().get(31L).batchDeliveryCount());
+        assertNull(sharePartition.cachedState().get(31L).offsetState());
+        assertEquals(49L, sharePartition.endOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that covers the entire range from 10 to 30 of initial read gap.
+        // The records in the batch are from 10 to 49.
+        MemoryRecords records = memoryRecords(40, 10);
+        // Set max fetch records to 500, all records should be acquired.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            37);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(10, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 30, 4));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(31, 49, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+        assertEquals(50, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(10L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+        assertEquals(31, sharePartition.cachedState().get(31L).firstOffset());
+        assertEquals(49, sharePartition.cachedState().get(31L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(31L).batchState());
+        assertEquals(49L, sharePartition.endOffset());
+        // As all the gaps are now filled, the initialReadGapOffset should be null.
+        assertNull(sharePartition.initialReadGapOffset());
+    }
+
+    @Test
+    public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBatch() {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class);
+        Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(15L, 18L, RecordState.AVAILABLE.id, (short) 2),
+                        new PersisterStateBatch(20L, 22L, RecordState.ARCHIVED.id, (short) 2),
+                        new PersisterStateBatch(26L, 30L, RecordState.AVAILABLE.id, (short) 3)))))));
+        Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build();
+
+        CompletableFuture<Void> result = sharePartition.maybeInitialize();
+        assertTrue(result.isDone());
+        assertFalse(result.isCompletedExceptionally());
+
+        assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState());
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.startOffset());
+        assertEquals(30, sharePartition.endOffset());
+        assertEquals(10, sharePartition.nextFetchOffset());
+
+        assertEquals(18, sharePartition.cachedState().get(15L).lastOffset());
+        assertEquals(22, sharePartition.cachedState().get(20L).lastOffset());
+        assertEquals(30, sharePartition.cachedState().get(26L).lastOffset());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).batchState());
+        assertNotNull(sharePartition.initialReadGapOffset());
+        assertEquals(10L, sharePartition.initialReadGapOffset().gapStartOffset());
+        assertEquals(30L, sharePartition.initialReadGapOffset().endOffset());
+
+        // Create a single batch record that ends in between the cached batch and the fetch offset is
+        // post startOffset.
+        MemoryRecords records = memoryRecords(16, 12);
+        // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch.
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            BATCH_SIZE,
+            500,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            13);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(12, 14, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 4));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray());
+        assertEquals(28, sharePartition.nextFetchOffset());
+        assertEquals(6, sharePartition.cachedState().size());
+        assertEquals(12, sharePartition.cachedState().get(12L).firstOffset());
+        assertEquals(14, sharePartition.cachedState().get(12L).lastOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).firstOffset());
+        assertEquals(19, sharePartition.cachedState().get(19L).lastOffset());
+        assertEquals(23, sharePartition.cachedState().get(23L).firstOffset());
+        assertEquals(25, sharePartition.cachedState().get(23L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(12L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(19L).batchState());
+        assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(20L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState());
+        assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(26L).batchState());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(26L).state());
+        assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(26L).offsetState().get(27L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(28L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(29L).state());
+        assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(26L).offsetState().get(30L).state());
+        assertEquals(30L, sharePartition.endOffset());
+        assertNotNull(sharePartition.initialReadGapOffset());

Review Comment:
   as discussed offline, we are gonna make the variable name change in a different PR.
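For readers following the expected-acquisition arithmetic in the tests above, here is a minimal standalone sketch (hypothetical `CachedBatch`/`OffsetRange` types and a simplified `gapRanges` helper, not the actual SharePartition implementation) of how the initial-read-gap ranges [10-14], [19-19] and [23-25] fall out of the persisted batches [15-18], [20-22] and [26-30] used in these tests:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only: hypothetical types mirroring the test data, not the
 * real SharePartition code. It reproduces just the offset arithmetic behind the
 * expected acquisitions with delivery count 1.
 */
public class InitialReadGapSketch {

    // Simplified stand-in for a cached state batch: only the offset range matters here.
    record CachedBatch(long firstOffset, long lastOffset) { }

    record OffsetRange(long firstOffset, long lastOffset) { }

    // Returns the sub-ranges of [fetchStart, fetchEnd] not covered by any cached batch.
    // Assumes the cached batches are sorted by firstOffset and non-overlapping.
    static List<OffsetRange> gapRanges(long fetchStart, long fetchEnd, List<CachedBatch> cached) {
        List<OffsetRange> gaps = new ArrayList<>();
        long next = fetchStart;
        for (CachedBatch batch : cached) {
            if (batch.firstOffset() > next) {
                gaps.add(new OffsetRange(next, Math.min(batch.firstOffset() - 1, fetchEnd)));
            }
            next = Math.max(next, batch.lastOffset() + 1);
            if (next > fetchEnd) {
                return gaps;
            }
        }
        if (next <= fetchEnd) {
            gaps.add(new OffsetRange(next, fetchEnd));
        }
        return gaps;
    }

    public static void main(String[] args) {
        List<CachedBatch> cached = List.of(
            new CachedBatch(15L, 18L),
            new CachedBatch(20L, 22L),
            new CachedBatch(26L, 30L));
        // Prints [10-14], [19-19], [23-25]: the gap offsets the tests expect to be
        // acquired fresh, alongside the AVAILABLE cached batches.
        gapRanges(10L, 30L, cached).forEach(r ->
            System.out.println("[" + r.firstOffset() + "-" + r.lastOffset() + "]"));
    }
}
```

In the real code the gap tracking is interleaved with acquisition, delivery counts and the cached-state updates asserted above; the sketch only shows how the gap ranges are derived from the persisted batches.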