adixitconfluent commented on code in PR #19010:
URL: https://github.com/apache/kafka/pull/19010#discussion_r1969052794


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -615,13 +615,44 @@ public long nextFetchOffset() {
      * Acquire the fetched records for the share partition. The acquired records are added to the
      * in-flight records and the next fetch offset is updated to the next offset that should be
      * fetched from the leader.
+     * <p>
+     * The method always acquire the full batch records. The cache state can consist of multiple
+     * full batches as a single batch. This behavior is driven by client configurations (batch size
+     * and max fetch records) and allows for efficient client acknowledgements. However, partial batches
+     * can exist in the cache only after a leader change and partial acknowledgements have been persisted
+     * prior leader change. In such case, when a share partition loses track of a batch's start and
+     * end offsets (e.g., after a leader change and partial acknowledgements), the cache stores the
+     * batch based on the offset range provided by the persister. This method handles these special
+     * batches by maintaining this range up to the last offset returned by the persister.  No special
+     * handling is required after wards; the cache will eventually return to managing full batches.

Review Comment:
   nit: I guess afterwards not after wards
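
   Aside from the nit above, a rough illustration of the cache shape the new Javadoc describes may help readers of the thread. The `Batch` record, map layout, and offsets below are hypothetical stand-ins for illustration only, not the actual `SharePartition` internals; this is a sketch of the described behaviour under those assumptions, not the PR's implementation.

   ```java
   import java.util.TreeMap;

   // Hypothetical sketch: batches are normally cached at full log-batch boundaries,
   // but after a leader change with partially persisted acknowledgements a batch is
   // tracked only over the offset range the persister returned.
   public class CachedBatchSketch {
       record Batch(long firstOffset, long lastOffset) { }

       public static void main(String[] args) {
           TreeMap<Long, Batch> cachedState = new TreeMap<>();

           // Regular case: a full fetched batch [100, 149] is cached as a single unit.
           cachedState.put(100L, new Batch(100L, 149L));

           // Special case: the persister only reported offsets [150, 169] of a larger
           // log batch, so the cache maintains exactly that range until it drains,
           // after which full batches are managed again.
           cachedState.put(150L, new Batch(150L, 169L));

           // Prints both cached ranges keyed by their first offset.
           System.out.println(cachedState);
       }
   }
   ```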



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1061,33 +1100,105 @@ void updateCacheAndOffsets(long logStartOffset) {
         }
     }
 
+    /**
+     * The method archives the available records in the cached state that are between the fetch offset
+     * and the base offset of the first fetched batch. This method is required to handle the compacted
+     * topics where the already fetched batch which is marked re-available, might not result in subsequent
+     * fetch response from log. Hence, the batches need to be archived to allow the SPSO and next fetch
+     * offset to progress.
+     * offset to progress.
+     *
+     * @param fetchOffset The fetch offset.
+     * @param baseOffset  The base offset of the first fetched batch.
+     */
+    private void maybeArchiveStaleBatches(long fetchOffset, long baseOffset) {
+        lock.writeLock().lock();
+        try {
+            // If the fetch happens from within a batch then fetchOffset can be ahead of base offset else
+            // should be same as baseOffset of the first fetched batch. Otherwise, we might need to archive
+            // some stale batches.
+            if (cachedState.isEmpty() || fetchOffset >= baseOffset) {
+                // No stale batches to archive.
+                return;
+            }
+
+            // The fetch offset can exist in the middle of the batch. Hence, find the floor offset
+            // for the fetch offset and then find the sub-map from the floor offset to the base offset.
+            long floorOffset = fetchOffset;
+            Map.Entry<Long, InFlightBatch> floorEntry = cachedState.floorEntry(fetchOffset);
+            if (floorEntry != null && floorEntry.getValue().lastOffset() >= fetchOffset) {

Review Comment:
   Question: Why do we need the second condition here?
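
   Not an answer from the PR author, but a hedged reading of the snippet for context: `floorEntry(fetchOffset)` returns the batch whose first offset is at or below the fetch offset, and that batch may end before the fetch offset when there is a gap in the cache. The second condition appears to distinguish "fetch offset inside the floor batch" from "fetch offset past it". A minimal standalone sketch with made-up offsets and a hypothetical `Batch` record:

   ```java
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical offsets showing the two cases the floorEntry check distinguishes.
   public class FloorEntrySketch {
       record Batch(long firstOffset, long lastOffset) { }

       public static void main(String[] args) {
           TreeMap<Long, Batch> cachedState = new TreeMap<>();
           cachedState.put(10L, new Batch(10L, 19L));
           cachedState.put(30L, new Batch(30L, 39L));

           // Case 1: fetchOffset = 15 falls inside the floor batch [10, 19],
           // so lastOffset() >= fetchOffset holds and the scan can start from that batch's key.
           // Case 2: fetchOffset = 25 lands in a gap; the floor batch [10, 19] ends before it,
           // so the check fails and the scan starts from fetchOffset itself.
           for (long fetchOffset : new long[] {15L, 25L}) {
               Map.Entry<Long, Batch> floorEntry = cachedState.floorEntry(fetchOffset);
               boolean insideFloorBatch =
                   floorEntry != null && floorEntry.getValue().lastOffset() >= fetchOffset;
               System.out.println(fetchOffset + " inside floor batch? " + insideFloorBatch);
           }
       }
   }
   ```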


