apoorvmittal10 commented on code in PR #18696: URL: https://github.com/apache/kafka/pull/18696#discussion_r1936961623
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there. + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); + } else { + // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps. + // We can simply move the start offset to the first offset of the next cachedState batch. + startOffset = cachedState.higherKey(lastOffsetAcknowledged); + } lastKeyToRemove = entry.getKey(); } else { + // The code will reach this point only if lastOffsetAcknowledged is in the middle of a stateBatch. In this case + // we can simply move the startOffset to the next offset of lastOffsetAcknowledged. Review Comment: ```suggestion // we can simply move the startOffset to the next offset of lastOffsetAcknowledged and should consider any read gap offsets. ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there. + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); + } else { + // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps. + // We can simply move the start offset to the first offset of the next cachedState batch. + startOffset = cachedState.higherKey(lastOffsetAcknowledged); + } lastKeyToRemove = entry.getKey(); } else { + // The code will reach this point only if lastOffsetAcknowledged is in the middle of a stateBatch. In this case Review Comment: ```suggestion // The code will reach this point only if lastOffsetAcknowledged is in the middle of some stateBatch. In this case ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there. + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); Review Comment: Please write the example more clearly with gapStartOffset. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT). long firstKeyToRemove = cachedState.firstKey(); long lastKeyToRemove; NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged); + // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed. if (lastOffsetAcknowledged == entry.getValue().lastOffset()) { startOffset = cachedState.higherKey(lastOffsetAcknowledged); + if (isInitialReadGapOffsetWindowActive()) { + // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged. + // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked. + // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset + // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there. + startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset); + } else { + // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps. + // We can simply move the start offset to the first offset of the next cachedState batch. + startOffset = cachedState.higherKey(lastOffsetAcknowledged); + } Review Comment: Why do you need else block? ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1847,19 +1948,30 @@ private boolean isRecordStateAcknowledged(RecordState recordState) { return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED; } - private long findLastOffsetAcknowledged() { - lock.readLock().lock(); + // Visible for testing + long findLastOffsetAcknowledged() { long lastOffsetAcknowledged = -1; + lock.readLock().lock(); try { for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) { InFlightBatch inFlightBatch = entry.getValue(); if (inFlightBatch.offsetState() == null) { if (!isRecordStateAcknowledged(inFlightBatch.batchState())) { return lastOffsetAcknowledged; } + // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch + // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between + // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present + // in the cachedState + if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) { + return lastOffsetAcknowledged; + } lastOffsetAcknowledged = inFlightBatch.lastOffset(); } else { for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) { + if (isInitialReadGapOffsetWindowActive() && offsetState.getKey() >= initialReadGapOffset.gapStartOffset()) { + return lastOffsetAcknowledged; + } Review Comment: As explained please correct the checks. ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -1825,8 +1921,8 @@ private boolean canMoveStartOffset() { NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset); if (entry == null) { - log.error("The start offset: {} is not found in the cached state for share partition: {}-{}." - + " Cannot move the start offset.", startOffset, groupId, topicIdPartition); + log.info("The start offset: {} is not found in the cached state for share partition: {}-{} " + + "as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition); return false; Review Comment: Seems missed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org