apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1936163586


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and al these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), cachedState.higherKey(lastOffsetAcknowledged));
+                } else {
+                    // If initialReadGapOffset is null, that means the cachedState does not have any acquirable gaps.
+                    // We can simply move the start offset to the first offset of the next cachedState batch.
+                    startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                }

Review Comment:
   The code can be simpler though:
   
   ```
   startOffset = cachedState.higherKey(lastOffsetAcknowledged);
   if (isInitialReadGapOffsetWindowActive()) {
       // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
       // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and all these batches are acked.
       // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
       // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
       startOffset = Math.min(initialReadGapOffset.gapStartOffset(), startOffset);
   }
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {

Review Comment:
   ```suggestion
                   if (isInitialReadGapOffsetWindowActive()) {
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1791,10 +1875,23 @@ be removed once all the messages (0-99) are acknowledged (ACCEPT or REJECT).
             long firstKeyToRemove = cachedState.firstKey();
             long lastKeyToRemove;
             NavigableMap.Entry<Long, InFlightBatch> entry = cachedState.floorEntry(lastOffsetAcknowledged);
+            // If the lastOffsetAcknowledged is equal to the last offset of entry, then the entire batch can potentially be removed.
             if (lastOffsetAcknowledged == entry.getValue().lastOffset()) {
-                startOffset = cachedState.higherKey(lastOffsetAcknowledged);
+                if (initialReadGapOffset != null) {
+                    // This case will arise if we have a situation where there is an acquirable gap after the lastOffsetAcknowledged.
+                    // Ex, the cachedState has following state batches -> {(0, 10), (11, 20), (31,40)} and al these batches are acked.
+                    // In this case, lastOffsetAcknowledged will be 20, but we cannot simply move the start offset to the first offset
+                    // of next cachedState batch. The startOffset should be at 21, because we have an acquirable gap there.
+                    startOffset = Math.min(initialReadGapOffset.gapStartOffset(), cachedState.higherKey(lastOffsetAcknowledged));

Review Comment:
   Hmm, do we need `Math.min` here, or can we just set `startOffset` to `gapStartOffset`?
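
One way to reason about the `min` question is a small standalone sketch. It is not the PR's code: `StartOffsetSketch` and `nextStartOffset` are hypothetical names, the map stores only first offset -> last offset per batch, and the fallback used when there is no higher key is an assumption added to avoid unboxing a null. Under those assumptions, `min` seems to matter only when the gap start can lie beyond the next cached batch; if the gap is always expected to sit directly after `lastOffsetAcknowledged`, assigning `gapStartOffset` would give the same result.

```java
import java.util.TreeMap;

public class StartOffsetSketch {

    // Hypothetical helper (not from the PR): computes the new start offset.
    // cachedState maps a batch's first offset to its last offset.
    // gapStartOffset is null when no acquirable gap is being tracked.
    static long nextStartOffset(TreeMap<Long, Long> cachedState,
                                long lastOffsetAcknowledged,
                                Long gapStartOffset) {
        Long nextBatchStart = cachedState.higherKey(lastOffsetAcknowledged);
        // Assumption: if no later batch exists, fall back to the next offset.
        long candidate = nextBatchStart != null ? nextBatchStart : lastOffsetAcknowledged + 1;
        if (gapStartOffset == null) {
            return candidate;
        }
        // Never move past the gap, and never move past the next cached batch.
        return Math.min(gapStartOffset, candidate);
    }

    public static void main(String[] args) {
        // Example from the code comment: batches (0,10), (11,20), (31,40), gap at 21.
        TreeMap<Long, Long> withGap = new TreeMap<>();
        withGap.put(0L, 10L);
        withGap.put(11L, 20L);
        withGap.put(31L, 40L);
        System.out.println(nextStartOffset(withGap, 20L, 21L)); // prints 21

        // Assumed scenario: gap starts after contiguous batches, only (0,10) acked.
        TreeMap<Long, Long> contiguous = new TreeMap<>();
        contiguous.put(0L, 10L);
        contiguous.put(11L, 20L);
        contiguous.put(21L, 30L);
        System.out.println(nextStartOffset(contiguous, 10L, 31L)); // prints 11
    }
}
```

In the second scenario, setting the start offset straight to the gap start would skip the batches at 11 and 21, which is the only case in this sketch where the two approaches differ.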



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
