apoorvmittal10 commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1969396237


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1100,105 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are between the fetch offset
+     * and the base offset of the first fetched batch. This method is required to handle the compacted
+     * topics where the already fetched batch which is marked re-available, might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to allow the SPSO and next fetch
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, find the floor offset
+            // for the fetch offset and then find the sub-map from the floor offset to the base offset.
+            long floorOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(fetchOffset);
+            if (floorEntry != null && floorEntry.getValue().lastOffset() >= fetchOffset) {

Review Comment:
   This validates that the `fetchOffset` really is within the floorEntry batch.
Say the cache holds [0-5], [10-11] and fetchOffset is 8; then the floorEntry
will be [0-5]. In that case, we should not update floorOffset, as the
floorEntry batch ends before fetchOffset.
   I understand there might be a question about whether such a scenario can
happen. Not that I can think of, i.e. the fetchOffset should be either at the
end of the cache or anywhere within it where an offset is marked available. We
should ideally get a specific batch that contains the fetchOffset. However,
this seems to be a right and safe check for now: the processing stays
deterministic and should not fetch unnecessary batches.
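
   To make the [0-5], [10-11] example concrete, here is a minimal standalone
sketch of the check. It is not the code from the PR: `FloorOffsetSketch`,
`Batch` and `resolveFloorOffset` are hypothetical names, `Batch` is a stand-in
for `InFlightBatch`, and the assumption is that the body of the `if` moves
floorOffset to the key of the floor entry.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class FloorOffsetSketch {
    // Hypothetical stand-in for InFlightBatch: only the offset range matters here.
    static final class Batch {
        final long firstOffset;
        final long lastOffset;

        Batch(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns the offset from which a scan for stale batches would start.
    // Assumption: when the floor batch contains fetchOffset, the scan starts
    // at that batch's first offset (its key in the map).
    static long resolveFloorOffset(NavigableMap<Long, Batch> cachedState, long fetchOffset) {
        Map.Entry<Long, Batch> floorEntry = cachedState.floorEntry(fetchOffset);
        // floorEntry is the batch with the greatest key <= fetchOffset, which
        // does not guarantee that the batch actually covers fetchOffset.
        if (floorEntry != null && floorEntry.getValue().lastOffset >= fetchOffset) {
            return floorEntry.getKey();
        }
        // No floor batch, or the floor batch ends before fetchOffset: keep fetchOffset.
        return fetchOffset;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Batch> cache = new TreeMap<>();
        cache.put(0L, new Batch(0, 5));
        cache.put(10L, new Batch(10, 11));

        System.out.println(resolveFloorOffset(cache, 8));  // 8: floor batch [0-5] ends before 8
        System.out.println(resolveFloorOffset(cache, 3));  // 0: 3 lies inside [0-5]
        System.out.println(resolveFloorOffset(cache, 10)); // 10: 10 is the start of [10-11]
    }
}
```

   Without the `lastOffset >= fetchOffset` condition, the first call would
return 0 and the scan would start at [0-5], a batch that lies entirely before
the fetch offset.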



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
