apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1936961623


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+                } else {
+                    // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps.
+                    // We can simply move the start offset to the first offset of the next cachedState batch.
+                    startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                }
                 lastKeyToRemove = entry.getKey();
             } else {
+                // The code will reach this point only if lastOffsetAcknowledged is in the middle of a stateBatch. In this case
+                // we can simply move the startOffset to the next offset of lastOffsetAcknowledged.

Review Comment:
   ```suggestion
                   // we can simply move the startOffset to the next offset of lastOffsetAcknowledged and should consider any read gap offsets.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+                } else {
+                    // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps.
+                    // We can simply move the start offset to the first offset of the next cachedState batch.
+                    startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                }
                 lastKeyToRemove = entry.getKey();
             } else {
+                // The code will reach this point only if lastOffsetAcknowledged is in the middle of a stateBatch. In this case

Review Comment:
   ```suggestion
                   // The code will reach this point only if lastOffsetAcknowledged is in the middle of some stateBatch. In this case
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);

Review Comment:
   Please rewrite the example so that it states the gapStartOffset value explicitly.
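
One way the example could be spelled out, as a minimal sketch rather than the actual `SharePartition` code: it assumes `gapStartOffset` is 21 for the batches named in the comment, models `cachedState` as a plain `TreeMap` from each batch's first offset to its last offset, and uses a hypothetical helper `nextStartOffset` that is not part of the PR.

```java
import java.util.TreeMap;

class GapStartOffsetExample {
    /**
     * Hypothetical helper mirroring the Math.min(...) step in the hunk above.
     * cachedState maps each batch's first offset to its last offset, a
     * simplified stand-in for the real InFlightBatch values.
     * gapStartOffset is null when no read-gap window is active.
     * Assumes a batch exists after lastOffsetAcknowledged, so higherKey is non-null.
     */
    static long nextStartOffset(TreeMap<Long, Long> cachedState,
                                long lastOffsetAcknowledged,
                                Long gapStartOffset) {
        long nextBatchFirstOffset = cachedState.higherKey(lastOffsetAcknowledged);
        if (gapStartOffset != null) {
            // An acquirable gap may start before the next cached batch.
            return Math.min(gapStartOffset, nextBatchFirstOffset);
        }
        return nextBatchFirstOffset;
    }

    public static void main(String[] args) {
        // Batches (0, 10), (11, 20), (31, 40); offsets 21-30 are the gap.
        TreeMap<Long, Long> cachedState = new TreeMap<>();
        cachedState.put(0L, 10L);
        cachedState.put(11L, 20L);
        cachedState.put(31L, 40L);

        // lastOffsetAcknowledged = 20, gapStartOffset = 21 (assumed value).
        System.out.println(nextStartOffset(cachedState, 20L, 21L));  // prints 21
        // No active gap window: move to the next cached batch.
        System.out.println(nextStartOffset(cachedState, 20L, null)); // prints 31
    }
}
```

With the gap window active, the start offset stays at 21 instead of jumping to 31, which is the case the code comment is trying to describe.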



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,24 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
                 startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (isInitialReadGapOffsetWindowActive()) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
+                } else {
+                    // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps.
+                    // We can simply move the start offset to the first offset of the next cachedState batch.
+                    startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                }

Review Comment:
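   Why do you need the else block? startOffset is already assigned cachedState.higherKey(lastOffsetAcknowledged) just before the if, so the else branch only repeats that assignment.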



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1847,19 +1948,30 @@ private boolean isRecordStateAcknowledged(RecordState recordState) {
         return recordState == RecordState.ACKNOWLEDGED || recordState == RecordState.ARCHIVED;
     }
 
-    private long findLastOffsetAcknowledged() {
-        lock.readLock().lock();
+    // Visible for testing
+    long findLastOffsetAcknowledged() {
         long lastOffsetAcknowledged = -1;
+        lock.readLock().lock();
         try {
             for (NavigableMap.Entry<Long, InFlightBatch> entry : cachedState.entrySet()) {
                 InFlightBatch inFlightBatch = entry.getValue();
                 if (inFlightBatch.offsetState() == null) {
                     if (!isRecordStateAcknowledged(inFlightBatch.batchState())) {
                         return lastOffsetAcknowledged;
                     }
+                    // If initialReadGapOffset.gapStartOffset is less than or equal to the last offset of the batch
+                    // then we cannot identify the current inFlightBatch as acknowledged. All the offsets between
+                    // initialReadGapOffset.gapStartOffset and initialReadGapOffset.endOffset should always be present
+                    // in the cachedState
+                    if (isInitialReadGapOffsetWindowActive() && inFlightBatch.lastOffset() >= initialReadGapOffset.gapStartOffset()) {
+                        return lastOffsetAcknowledged;
+                    }
                     lastOffsetAcknowledged = inFlightBatch.lastOffset();
                 } else {
                     for (Map.Entry<Long, InFlightState> offsetState : inFlightBatch.offsetState.entrySet()) {
+                        if (isInitialReadGapOffsetWindowActive() && offsetState.getKey() >= initialReadGapOffset.gapStartOffset()) {
+                            return lastOffsetAcknowledged;
+                        }

Review Comment:
   As explained, please correct these checks.
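
For reference, a simplified sketch of the batch-level check as it is written in the hunk above, not the corrected version this comment asks for. `Batch` is a hypothetical stand-in for `InFlightBatch` that carries only a last offset and an acknowledged flag, per-offset state is left out, and a null `gapStartOffset` stands for an inactive read-gap window.

```java
import java.util.Map;
import java.util.TreeMap;

class LastOffsetAcknowledgedSketch {
    // Hypothetical, simplified stand-in for InFlightBatch (batch-level state only).
    static final class Batch {
        final long lastOffset;
        final boolean acknowledged;

        Batch(long lastOffset, boolean acknowledged) {
            this.lastOffset = lastOffset;
            this.acknowledged = acknowledged;
        }
    }

    // Walks the batches in offset order and stops at the first batch that is
    // unacknowledged or that reaches into the read-gap window.
    static long findLastOffsetAcknowledged(TreeMap<Long, Batch> cachedState, Long gapStartOffset) {
        long lastOffsetAcknowledged = -1;
        for (Map.Entry<Long, Batch> entry : cachedState.entrySet()) {
            Batch batch = entry.getValue();
            if (!batch.acknowledged) {
                return lastOffsetAcknowledged;
            }
            // Same comparison as in the hunk: the batch ends at or after the gap start.
            if (gapStartOffset != null && batch.lastOffset >= gapStartOffset) {
                return lastOffsetAcknowledged;
            }
            lastOffsetAcknowledged = batch.lastOffset;
        }
        return lastOffsetAcknowledged;
    }

    public static void main(String[] args) {
        TreeMap<Long, Batch> cachedState = new TreeMap<>();
        cachedState.put(0L, new Batch(10L, true));
        cachedState.put(11L, new Batch(20L, true));
        cachedState.put(31L, new Batch(40L, true));

        System.out.println(findLastOffsetAcknowledged(cachedState, 21L));  // prints 20
        System.out.println(findLastOffsetAcknowledged(cachedState, null)); // prints 40
    }
}
```

With an assumed `gapStartOffset` of 21, the scan stops before the (31, 40) batch even though that batch is acknowledged, which matches the `lastOffsetAcknowledged` of 20 mentioned in the earlier code comment.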



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1825,8 +1921,8 @@ private boolean canMoveStartOffset() {
 
         NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(startOffset);
         if (entry == null) {
-            log.error("The start offset: {} is not found in the cached state for share partition: {}-{}."
-                + " Cannot move the start offset.", startOffset, groupId, topicIdPartition);
+            log.info("The start offset: {} is not found in the cached state for share partition: {}-{} " +
+                "as there is an acquirable gap at the beginning. Cannot move the start offset.", startOffset, groupId, topicIdPartition);
             return false;

Review Comment:
   This seems to have been missed.



