chirag-wadhwa5 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1940953629
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -1951,164 +2112,1102 @@ public void testAcquireReleasedRecordMultipleBatches() { // Fourth fetch request with 5 records starting from offset 28. MemoryRecords records4 = memoryRecords(5, 28); - List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( - MEMBER_ID, - BATCH_SIZE, - MAX_FETCH_RECORDS, - new FetchPartitionData(Errors.NONE, 40, 3, records1, - Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), - 5); + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 40, 3, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 30, 3, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(20, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 30, 3, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(28, sharePartition.nextFetchOffset()); + + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new 
FetchPartitionData(Errors.NONE, 30, 3, records4, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(), acquiredRecordsList.toArray()); + assertEquals(33, sharePartition.nextFetchOffset()); + + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(23L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(28L).batchState()); + assertNull(sharePartition.cachedState().get(10L).offsetState()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + assertNull(sharePartition.cachedState().get(23L).offsetState()); + assertNull(sharePartition.cachedState().get(28L).offsetState()); + + CompletableFuture<Void> ackResult = sharePartition.acknowledge( + MEMBER_ID, + Collections.singletonList(new ShareAcknowledgementBatch(12, 30, Collections.singletonList((byte) 2)))); + assertNull(ackResult.join()); + assertFalse(ackResult.isCompletedExceptionally()); + + assertEquals(12, sharePartition.nextFetchOffset()); + assertEquals(4, sharePartition.cachedState().size()); + assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(10L).batchState()); + assertNotNull(sharePartition.cachedState().get(10L).offsetState()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(15L).batchState()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(15L).batchMemberId()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(23L).batchState()); + assertNull(sharePartition.cachedState().get(23L).offsetState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(23L).batchMemberId()); + 
assertThrows(IllegalStateException.class, () -> sharePartition.cachedState().get(28L).batchState()); + assertNotNull(sharePartition.cachedState().get(28L).offsetState()); + + Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>(); + expectedOffsetStateMap.put(10L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(11L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(12L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(13L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(14L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(10L).offsetState()); + + expectedOffsetStateMap.clear(); + expectedOffsetStateMap.put(28L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(29L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(30L, new InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID)); + expectedOffsetStateMap.put(31L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + expectedOffsetStateMap.put(32L, new InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID)); + assertEquals(expectedOffsetStateMap, sharePartition.cachedState().get(28L).offsetState()); + + // Send next batch from offset 12, only 3 records should be acquired. 
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 40, 3, records1, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 3); + + assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + + // Though record2 batch exists to acquire but send batch record3, it should be acquired but + // next fetch offset should not move. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 40, 3, records3, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(), acquiredRecordsList.toArray()); + assertEquals(15, sharePartition.nextFetchOffset()); + + // Acquire partial records from batch 2. + MemoryRecords subsetRecords = memoryRecords(2, 17); + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 2); + + assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), acquiredRecordsList.toArray()); + // Next fetch offset should not move. + assertEquals(15, sharePartition.nextFetchOffset()); + + // Acquire partial records from record 4 to further test if the next fetch offset move + // accordingly once complete record 2 is also acquired. 
+ subsetRecords = memoryRecords(1, 28); + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 1); + + assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), acquiredRecordsList.toArray()); + // Next fetch offset should not move. + assertEquals(15, sharePartition.nextFetchOffset()); + + // Try to acquire complete record 2 though it's already partially acquired, the next fetch + // offset should move. + acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 20, 3, records2, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 3); + + // Offset 15,16 and 19 should be acquired. + List<AcquiredRecords> expectedAcquiredRecords = expectedAcquiredRecords(15, 16, 2); + expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2)); + assertArrayEquals(expectedAcquiredRecords.toArray(), acquiredRecordsList.toArray()); + // Next fetch offset should not move. 
+ assertEquals(29, sharePartition.nextFetchOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // All records fetched are part of the gap. The gap is from 11 to 20, fetched offsets are 11 to 15.
+ MemoryRecords records = memoryRecords(5, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 5); + + assertArrayEquals(expectedAcquiredRecord(11, 15, 1).toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(16, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(16, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched offsets overlap the inFlight batches. The gap is from 11 to 20, but fetched records are from 11 to 25. + MemoryRecords records = memoryRecords(15, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 10); + + // Offsets 21-25 belong to the ACKNOWLEDGED batch 21-30, so only the gap offsets 11-20 are acquired. + assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(41, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(21, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBatches() { + Persister persister = Mockito.mock(Persister.class); + 
ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched offsets overlap the inFlight batches. The gap is from 11 to 20, but fetched records are from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 15); + + // The gap from 11 to 20 will be acquired. Since the next batch is AVAILABLE, and the records fetched from the replica manager + // overlap with the next batch, offsets 21-25 of the next batch will also be acquired, each as a separate + // single-offset batch with delivery count 3 (the persisted delivery count was 2). + List<AcquiredRecords> expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3)); + assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(26, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(26, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOffset() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult 
readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30 + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched records 11-20 are part of inFlightBatch 11-20 with state AVAILABLE, while fetched records 21-25 + // fall into the gap from 21 to 30. Fetched records are from 11 to 25. + MemoryRecords records = memoryRecords(15, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 15); + + // 2 different batches will be acquired this time (11-20 and 21-25). The first batch will have delivery count 3 + // as previous deliveryCount was 2. The second batch will have delivery count 1 as it is acquired for the first time.
+ List<AcquiredRecords> expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 3)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1)); + assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(26, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(26, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBatches() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(41L, 50L, RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40 + new PersisterStateBatch(61L, 70L, RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60 + new PersisterStateBatch(81L, 90L, RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80 + )))))); + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(90, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched records are from 11 to 85, spanning several gaps and inFlight batches. + MemoryRecords records = memoryRecords(75, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 55); + + // Acquired batches will contain the following -> + // 1. 11-20 (gap offsets) + // 2. 31-40 (gap offsets) + // 3. 41-50 (AVAILABLE batch in cachedState) + // 4. 51-60 (gap offsets) + // 5. 71-80 (gap offsets) + // 6. 81-85 (AVAILABLE batch in cachedState).
These will be acquired as separate batches because we are breaking a batch in the cachedState + // Offsets 21-30 (ACKNOWLEDGED) and 61-70 (ARCHIVED) are not acquired. + List<AcquiredRecords> expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2)); + assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(90, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(86, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(86, initialReadGapOffset.gapStartOffset()); + assertEquals(90, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new 
TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + 
PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(41L, 50L, RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40 + new PersisterStateBatch(61L, 70L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60 + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(70, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched records are from 11 to 30, ending just before the gap from 31 to 40. + MemoryRecords records = memoryRecords(20, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 20); + + // Acquired batches will contain the following -> + // 1. 11-20 (gap offsets) + // 2.
21-30 (AVAILABLE batch in cachedState, acquired as a whole with delivery count 3) + List<AcquiredRecords> expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3)); + assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(70, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(31, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(31, initialReadGapOffset.gapStartOffset()); + assertEquals(70, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20 + new PersisterStateBatch(41L, 50L, RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40 + new PersisterStateBatch(61L, 70L, RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60 + new PersisterStateBatch(81L, 90L, RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80 + )))))); + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(90, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Fetched records are from 11 to 75, ending inside the gap from 71 to 80. + MemoryRecords records = memoryRecords(65, 11); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + MAX_FETCH_RECORDS, + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 45); + + // Acquired batches will contain the following -> + // 1. 11-20 (gap offsets) + // 2. 31-40 (gap offsets) + // 3. 41-50 (AVAILABLE batch in cachedState) + // 4. 51-60 (gap offsets) + // 5. 71-75 (gap offsets). The gap is from 71 to 80, but the fetched records end at 75.
These gap offsets will be acquired as a single batch + List<AcquiredRecords> expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(11, 20, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1)); + expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 75, 1)); + assertArrayEquals(expectedAcquiredRecord.toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(90, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(76, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // After records are acquired, the initialReadGapOffset should be updated + assertEquals(76, initialReadGapOffset.gapStartOffset()); + assertEquals(90, initialReadGapOffset.endOffset()); + } + + + @Test + public void testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(11L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30 + )))))); + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + // The start offset will be moved to 21, since the offsets 11 to 20 are acknowledged, and will be removed + // from cached state in the maybeUpdateCachedStateAndOffsets method + assertEquals(21, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(21, sharePartition.nextFetchOffset()); + + // Creating 3 batches of records with a total of 8 records (offsets 21-28) + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 3, 21).close(); + memoryRecordsBuilder(buffer, 3, 24).close(); + memoryRecordsBuilder(buffer, 2, 27).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 6, // maxFetchRecords is less than the number of records fetched + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 6); + + // Since max fetch records (6) is less than the number of records fetched (8), only 6 records will be acquired + // (offsets 21-26, i.e. the first two fetched batches) + assertArrayEquals(expectedAcquiredRecord(21, 26, 1).toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(21, sharePartition.startOffset()); 
+ assertEquals(40, 
sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(27, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + // Offsets 27 and 28 were fetched but not acquired, so the remaining gap starts at 27. + assertEquals(27, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(21L, 30L, RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11-20 + new PersisterStateBatch(31L, 40L, RecordState.ARCHIVED.id, (short) 1) + )))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Creating 2 batches of records with a total of 20 records (offsets 11-30) + ByteBuffer buffer = ByteBuffer.allocate(4096); + 
memoryRecordsBuilder(buffer, 10, 11).close(); + memoryRecordsBuilder(buffer, 
10, 21).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 8, // maxFetchRecords is less than the number of records fetched + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 10); + + // The whole gap batch 11-20 (10 records) is expected to be acquired even though it exceeds + // maxFetchRecords (8); no records from the AVAILABLE batch 21-30 are acquired. + assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(21, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(21, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 11L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(11L, 20L, RecordState.AVAILABLE.id, (short) 2), + new PersisterStateBatch(31L, 40L, RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30 + )))))); + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + + SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(11, sharePartition.nextFetchOffset()); + + // Creating 3 batches of records with a total of 8 records + ByteBuffer buffer = ByteBuffer.allocate(4096); + memoryRecordsBuilder(buffer, 10, 11).close(); + memoryRecordsBuilder(buffer, 20, 21).close(); + buffer.flip(); + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( + MEMBER_ID, + BATCH_SIZE, + 8, // maxFetchRecords is less than the number of records fetched + new FetchPartitionData(Errors.NONE, 3, 0, records, + Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)), + 10); + + assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(), acquiredRecordsList.toArray()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(11, sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(21, sharePartition.nextFetchOffset()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(21, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps() { Review Comment: The gaps 
actually correspond to the gap in the cached state (21 to 29). The memory records created have the following batches: (10, 20) and (30, 50). The significance of this test is that the natural gap lies between the batches fetched from the partition and also coincides with the gap in the cached state. In this case, the broker acquires the gap because it does not parse all the batches in the fetched records, so it is not aware of any natural gaps. The broker only knows the range of offsets fetched (from 10 to 50) and assumes that all of these offsets contain data. It is then the client's responsibility to inform the broker about the natural gap at offsets 21-29. I will add some comments in the test as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org