apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1930741313


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
             if (subMap.isEmpty()) {
                 log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                     groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                // Since new records have been acquired, the window tracking the gap in cachedState might need to be reduced
+                maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);

Review Comment:
   This is incorrect. As in the changes I made, which you can see [here](https://github.com/apache/kafka/compare/trunk...apoorvmittal10:kafka:KAFKA-18494), that update belongs inside `acquireNewBatchRecords`, because when `maxFetchRecords` is configured, not all fetched records will necessarily be acquired.
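   A minimal sketch of the reasoning (hypothetical names, not the actual `SharePartition` code): when `maxFetchRecords` caps the acquisition, the last acquired offset can fall short of `lastBatch.lastOffset()`, so the gap-tracking offset should be derived from what was actually acquired inside `acquireNewBatchRecords`, not from the fetched batch boundary:

```java
// Hypothetical sketch: illustrates why the gap-offset update should use the
// last *acquired* offset rather than the last *fetched* offset. These names
// are illustrative, not the real SharePartition implementation.
public class AcquireSketch {
    public static long lastAcquiredOffset(long baseOffset, long lastFetchedOffset, int maxFetchRecords) {
        long fetched = lastFetchedOffset - baseOffset + 1;
        // With maxFetchRecords configured, only a prefix of the fetched
        // records may actually be acquired.
        long acquired = Math.min(fetched, maxFetchRecords);
        return baseOffset + acquired - 1;
    }

    public static void main(String[] args) {
        // 100 records fetched (offsets 0..99) but only 40 may be acquired:
        // the gap window should advance to offset 40, not 100.
        long last = lastAcquiredOffset(0, 99, 40);
        System.out.println(last + 1); // prints 40
    }
}
```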



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2205,6 +2310,40 @@ Timer timer() {
         return timer;
     }
 
+    // Visible for testing
+    Optional<InitialReadGapOffset> initialReadGapOffset() {
+        return Optional.ofNullable(initialReadGapOffset);
+    }

Review Comment:
   I don't see the point of wrapping this in an `Optional` for tests. What benefit does it give you?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -444,6 +450,8 @@ public CompletableFuture<Void> maybeInitialize() {
                 stateEpoch = partitionData.stateEpoch();
 
                 List<PersisterStateBatch> stateBatches = 
partitionData.stateBatches();
+                boolean isGapPresentInStateBatches = false;
+                long previousOffset = startOffset;

Review Comment:
   I think the variable name `previousOffset` is confusing here. You could rename it to `previousBatchLastOffset`, track the previous batch's last offset instead, and change the if condition to `stateBatch.firstOffset() > previousBatchLastOffset + 1`. wdyt?
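   A minimal sketch of the suggested gap check (assumed names and initialization; `StateBatch` here is a stand-in for `PersisterStateBatch`, and initializing the tracker to `startOffset - 1` is this sketch's choice so the first batch is checked uniformly):

```java
import java.util.List;

// Hypothetical sketch of the gap detection the comment suggests; not the
// real SharePartition/PersisterStateBatch code.
public class GapCheckSketch {
    record StateBatch(long firstOffset, long lastOffset) {}

    public static boolean hasGap(long startOffset, List<StateBatch> stateBatches) {
        // Last offset covered so far; startOffset - 1 means nothing covered yet.
        long previousBatchLastOffset = startOffset - 1;
        for (StateBatch stateBatch : stateBatches) {
            // A gap exists when a batch does not start right after the
            // previously covered offset.
            if (stateBatch.firstOffset() > previousBatchLastOffset + 1) {
                return true;
            }
            previousBatchLastOffset = stateBatch.lastOffset();
        }
        return false;
    }

    public static void main(String[] args) {
        // Contiguous batches 5-9 and 10-14: no gap.
        System.out.println(hasGap(5, List.of(new StateBatch(5, 9), new StateBatch(10, 14)))); // prints false
        // Batch 12-14 leaves offsets 10-11 uncovered: gap.
        System.out.println(hasGap(5, List.of(new StateBatch(5, 9), new StateBatch(12, 14)))); // prints true
    }
}
```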



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
             if (subMap.isEmpty()) {
                 log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}",
                     groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                // Since new records have been acquired, the window tracking the gap in cachedState might need to be reduced
+                maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);

Review Comment:
   You should also write a test case for this.
   cc: @adixitconfluent 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
