apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1936163586

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and al these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), cachedState.higherKey(lastOffsetAcknowledged));
+                } else {
+                    // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps.
+                    // We can simply move the start offset to the first offset of the next cachedState batch.
+                    startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                }

Review Comment:
   The code can be simpler, though:

   ```
   startOffset = cachedState.higherKey(lastOffsetAcknowledged);
   if (isInitialReadGapOffsetWindowActive()) {
       // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
       // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
       // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
       // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
       startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
   }
   ```


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {

Review Comment:
   ```suggestion
                   if (isInitialReadGapOffsetWindowActive()) {
   ```


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and al these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), cachedState.higherKey(lastOffsetAcknowledged));

Review Comment:
   Hmm, should this take the min, or should we just set startOffset to gapStartOffset?
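
   To make the min-versus-assign question concrete, here is a small standalone sketch. It is not code from the PR: `StartOffsetSketch`, `nextStartOffset`, and the `TreeMap<Long, Long>` (first offset to last offset) standing in for `cachedState` are hypothetical, and it assumes that `gapStartOffset` may lie beyond the next cached batch. Whether that can actually happen in SharePartition is not visible in the quoted hunk. The sketch also falls back to `lastOffsetAcknowledged + 1` when no higher key exists, which is purely an illustrative choice.

   ```java
   import java.util.NavigableMap;
   import java.util.TreeMap;

   class StartOffsetSketch {

       // Hypothetical helper: computes the next start offset once everything up to
       // lastOffsetAcknowledged is acknowledged. A null gapStartOffset means no
       // acquirable gap is being tracked.
       static long nextStartOffset(NavigableMap<Long, Long> cachedState,
                                   long lastOffsetAcknowledged,
                                   Long gapStartOffset) {
           Long higherKey = cachedState.higherKey(lastOffsetAcknowledged);
           // Assumption for this sketch only: with no later batch, resume right after the acked offset.
           long candidate = (higherKey != null) ? higherKey : lastOffsetAcknowledged + 1;
           if (gapStartOffset == null) {
               return candidate;
           }
           // min keeps whichever comes first: the gap or the next cached batch.
           return Math.min(gapStartOffset, candidate);
       }

       public static void main(String[] args) {
           // Example from the code comment: batches (0,10), (11,20), (31,40), gap starting at 21.
           NavigableMap<Long, Long> gapBeforeNextBatch = new TreeMap<>();
           gapBeforeNextBatch.put(0L, 10L);
           gapBeforeNextBatch.put(11L, 20L);
           gapBeforeNextBatch.put(31L, 40L);
           System.out.println(nextStartOffset(gapBeforeNextBatch, 20L, 21L)); // prints 21

           // Assumed scenario: the gap (31) lies after the next cached batch (21,30).
           NavigableMap<Long, Long> gapAfterNextBatch = new TreeMap<>();
           gapAfterNextBatch.put(0L, 10L);
           gapAfterNextBatch.put(11L, 20L);
           gapAfterNextBatch.put(21L, 30L);
           gapAfterNextBatch.put(41L, 50L);
           System.out.println(nextStartOffset(gapAfterNextBatch, 20L, 31L)); // prints 21
       }
   }
   ```

   In the first case both approaches give 21. In the second, assigning `gapStartOffset` directly would jump to 31 and skip the batch at 21, while `min` stays at 21. If the gap start is guaranteed never to exceed the next cached key, the plain assignment would be equivalent.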