apoorvmittal10 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1939357943
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1825,8 +1922,8 @@ private boolean canMoveStartOffset() { NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset); if (entry == null) { - log.error("The start offset: {} is not found in the cached state for share partition: {}-{}." - + " Cannot move the start offset.", startOffset, groupId, topicIdPartition); + log.debug("The start offset: {} is not found in the cached state for share partition: {}-{} " + + "as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition); return false; Review Comment: Please add a comment above this check explaining why the entry can now be null in some cases. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1847,12 +1948,20 @@ private boolean isRecordStateAcknowledged(RecordState recordState) { return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED; } - private long findLastOffsetAcknowledged() { - lock.readLock().lock(); + // Visible for testing + long findLastOffsetAcknowledged() { long lastOffsetAcknowledged = -1; + lock.readLock().lock(); try { for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) { InFlightBatch inFlightBatch = entry.getValue(); + // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch + // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between + // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present + // in the cachedState + if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) { + return lastOffsetAcknowledged; + } Review Comment: nit: please add a line break here. 
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1847,12 +1948,20 @@ private boolean isRecordStateAcknowledged(RecordState recordState) { return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED; } - private long findLastOffsetAcknowledged() { - lock.readLock().lock(); + // Visible for testing + long findLastOffsetAcknowledged() { long lastOffsetAcknowledged = -1; + lock.readLock().lock(); try { for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) { InFlightBatch inFlightBatch = entry.getValue(); + // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch + // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between + // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present + // in the cachedState Review Comment: I am not sure this comment line is helpful here. Maybe remove it. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -444,6 +450,11 @@ public CompletableFuture<Void> maybeInitialize() { stateEpoch = partitionData.stateEpoch(); List<PersisterStateBatch> stateBatches = partitionData.stateBatches(); + boolean isGapPresentInStateBatches = false; Review Comment: Suggestion (track the gap start offset instead of a boolean): long gapStartOffset = -1; ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -452,6 +463,10 @@ public CompletableFuture<Void> maybeInitialize() { throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)); return; } + if (stateBatch.firstOffset() > previousBatchLastOffset + 1) { + isGapPresentInStateBatches = true; Review Comment: gapStartOffset = previousBatchLastOffset + 1; ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -648,6 +682,27 @@ public ShareAcquiredRecords acquire( } InFlightBatch inFlightBatch = entry.getValue(); + 
Review Comment: nit: remove line break. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // There is a gap from 21 to 30. Let the initialReadGapOffset.gapStartOffset be 21. In this case, + // lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of next cachedState batch (next cachedState batch is 31 to 40). There is an acquirable gap in between (21 to 30) + // and The startOffset should be at 21. Hence, we set startOffset to the minimum of initialReadGapOffset.gapStartOffset + // and higher key of lastOffsetAcknowledged + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); Review Comment: Do we have a test case covering this code? 
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -896,6 +896,170 @@ public void testMaybeInitializeWithReadException() { assertThrows(RuntimeException.class, sharePartition2::maybeInitialize); } + @Test + public void testMaybeInitializeStateBatchesWithGapAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(21L, 30L, RecordState.ARCHIVED.id, (short) 3))))))); + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, sharePartition.startOffset()); + assertEquals(30, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(21L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + 
assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(30, sharePartition.cachedState().get(21L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(21L).batchState()); + assertEquals(3, sharePartition.cachedState().get(21L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(21L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(30, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithMultipleGaps() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 10L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 10 to 14 + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(10, 
sharePartition.startOffset()); + assertEquals(40, sharePartition.endOffset()); + assertEquals(3, sharePartition.stateEpoch()); + assertEquals(10, sharePartition.nextFetchOffset()); + + assertEquals(2, sharePartition.cachedState().size()); + assertNotNull(sharePartition.cachedState().get(15L)); + assertNotNull(sharePartition.cachedState().get(30L)); + + assertEquals(20, sharePartition.cachedState().get(15L).lastOffset()); + assertEquals(RecordState.ACKNOWLEDGED, sharePartition.cachedState().get(15L).batchState()); + assertEquals(2, sharePartition.cachedState().get(15L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(15L).offsetState()); + + assertEquals(40, sharePartition.cachedState().get(30L).lastOffset()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(30L).batchState()); + assertEquals(3, sharePartition.cachedState().get(30L).batchDeliveryCount()); + assertNull(sharePartition.cachedState().get(30L).offsetState()); + + SharePartition.InitialReadGapOffset initialReadGapOffset = sharePartition.initialReadGapOffset(); + assertNotNull(initialReadGapOffset); + + assertEquals(10, initialReadGapOffset.gapStartOffset()); + assertEquals(40, initialReadGapOffset.endOffset()); + } + + @Test + public void testMaybeInitializeStateBatchesWithGapNotAtBeginning() { + Persister persister = Mockito.mock(Persister.class); + ReadShareGroupStateResult readShareGroupStateResult = Mockito.mock(ReadShareGroupStateResult.class); + Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList( + PartitionFactory.newPartitionAllData(0, 3, 15L, Errors.NONE.code(), Errors.NONE.message(), + Arrays.asList( + new PersisterStateBatch(15L, 20L, RecordState.ACKNOWLEDGED.id, (short) 2), + new PersisterStateBatch(30L, 40L, RecordState.ARCHIVED.id, (short) 3))))))); // There is a gap from 21 to 29 + 
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult)); + SharePartition sharePartition = SharePartitionBuilder.builder().withPersister(persister).build(); + + CompletableFuture<Void> result = sharePartition.maybeInitialize(); + assertTrue(result.isDone()); + assertFalse(result.isCompletedExceptionally()); + + assertEquals(SharePartitionState.ACTIVE, sharePartition.partitionState()); + assertFalse(sharePartition.cachedState().isEmpty()); + assertEquals(15, sharePartition.startOffset()); Review Comment: Hmm, this seems problematic. Offsets 15-20 are acknowledged and the start offset is 15, so ideally the start offset should move forward. But I think it can't, because of the gapStartOffset tracking, correct? So should we set the gap start offset from the first gap itself in the maybeInitialize method? ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -462,6 +477,10 @@ public CompletableFuture<Void> maybeInitialize() { // in the cached state are not missed findNextFetchOffset.set(true); endOffset = cachedState.lastEntry().getValue().lastOffset(); + // initialReadGapOffset is not required, if there are no gaps in the read state response + if (isGapPresentInStateBatches) { + initialReadGapOffset = new InitialReadGapOffset(endOffset, startOffset); Review Comment: ```suggestion initialReadGapOffset = new InitialReadGapOffset(endOffset, gapStartOffset); ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -462,6 +477,10 @@ public CompletableFuture<Void> maybeInitialize() { // in the cached state are not missed findNextFetchOffset.set(true); endOffset = cachedState.lastEntry().getValue().lastOffset(); + // initialReadGapOffset is not required, if there are no gaps in the read state response + if (isGapPresentInStateBatches) { Review Comment: if (gapStartOffset != -1) { ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -452,6 +463,10 @@ 
public CompletableFuture<Void> maybeInitialize() { throwable = new IllegalStateException(String.format("Failed to initialize the share partition %s-%s", groupId, topicIdPartition)); return; } + if (stateBatch.firstOffset() > previousBatchLastOffset + 1) { Review Comment: ```suggestion if (gapStartOffset == -1 && stateBatch.firstOffset() > previousBatchLastOffset + 1) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org