apoorvmittal10 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1930741313

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
             if (subMap.isEmpty()) {
                 log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                // Since new records have been acquired, the window tracking the gap in cachedState might need to be reduced
+                maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);

Review Comment:
   This is incorrect. As you can see in the changes I made [here](https://github.com/apache/kafka/compare/trunk...apoorvmittal10:kafka:KAFKA-18494), that change goes inside `acquireNewBatchRecords`, because if `maxFetchRecords` is configured then not necessarily all fetched records will be acquired.

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2205,6 +2310,40 @@ Timer timer() {
         return timer;
     }
 
+    // Visible for testing
+    Optional<InitialReadGapOffset> initialReadGapOffset() {
+        return Optional.ofNullable(initialReadGapOffset);
+    }

Review Comment:
   I don't see the point of wrapping it in an `Optional` for tests. What benefit do you get?

##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -444,6 +450,8 @@ public CompletableFuture<Void> maybeInitialize() {
                 stateEpoch = partitionData.stateEpoch();
 
                 List<PersisterStateBatch> stateBatches = partitionData.stateBatches();
+                boolean isGapPresentInStateBatches = false;
+                long previousOffset = startOffset;

Review Comment:
   I think the variable name `previousOffset` is confusing here. You could name it `previousBatchLastOffset` and track that instead, then change the if condition to `stateBatch.firstOffset() > previousBatchLastOffset + 1`. wdyt?

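To make the `previousBatchLastOffset` suggestion concrete, here is a rough standalone sketch of the gap check. It is not the PR's code: `Batch` is a hypothetical stand-in for `PersisterStateBatch`, `hasGap` is an illustrative helper, and seeding the tracker with `startOffset - 1` is my assumption about how the initial value would have to change once the variable holds a last offset rather than a next offset.

```java
import java.util.List;

class GapCheckSketch {
    // Hypothetical stand-in for PersisterStateBatch; only the two offsets matter here.
    static final class Batch {
        final long firstOffset;
        final long lastOffset;

        Batch(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Returns true if some offset at or after startOffset is skipped before or between batches.
    // Assumes the batches are sorted by firstOffset and do not overlap.
    static boolean hasGap(long startOffset, List<Batch> stateBatches) {
        // Assumption: seed with startOffset - 1 so that a first batch beginning
        // exactly at startOffset counts as contiguous.
        long previousBatchLastOffset = startOffset - 1;
        for (Batch stateBatch : stateBatches) {
            if (stateBatch.firstOffset > previousBatchLastOffset + 1) {
                return true;
            }
            previousBatchLastOffset = stateBatch.lastOffset;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Batch> contiguous = List.of(new Batch(0, 9), new Batch(10, 19));
        List<Batch> withHole = List.of(new Batch(0, 9), new Batch(15, 19));
        System.out.println("contiguous has gap: " + hasGap(0, contiguous));
        System.out.println("withHole has gap: " + hasGap(0, withHole));
    }
}
```

With this naming, the variable always holds the last offset already covered, so the `+ 1` in the condition reads as "the next expected offset".
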
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -629,15 +657,42 @@ public ShareAcquiredRecords acquire(
             if (subMap.isEmpty()) {
                 log.trace("No cached data exists for the share partition for requested fetch batch: {}-{}", groupId, topicIdPartition);
-                return acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
+                ShareAcquiredRecords acquiredRecords = acquireNewBatchRecords(memberId, fetchPartitionData.records.batches(),
                     firstBatch.baseOffset(), lastBatch.lastOffset(), batchSize, maxFetchRecords);
+                // Since new records have been acquired, the window tracking the gap in cachedState might need to be reduced
+                maybeUpdateReadGapFetchOffset(lastBatch.lastOffset() + 1);

Review Comment:
   You should also write a test case for this. cc: @adixitconfluent
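The case worth testing is a fetch where `maxFetchRecords` is smaller than the number of fetched records. The sketch below is only a simplified model of that situation, not `SharePartition`'s actual logic: `Batch` and `lastAcquiredOffset` are hypothetical names, and acquiring whole batches until the cap is reached is an assumption made for illustration.

```java
import java.util.List;

class AcquireCapSketch {
    // Hypothetical stand-in for a fetched record batch: an inclusive offset range.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Simplified model (an assumption): acquire whole batches in order and stop
    // once at least maxFetchRecords records have been acquired.
    // Returns the last acquired offset, or -1 if nothing was acquired.
    static long lastAcquiredOffset(List<Batch> batches, int maxFetchRecords) {
        long acquired = 0;
        long last = -1;
        for (Batch batch : batches) {
            if (acquired >= maxFetchRecords) {
                break;
            }
            acquired += batch.lastOffset - batch.baseOffset + 1;
            last = batch.lastOffset;
        }
        return last;
    }

    public static void main(String[] args) {
        List<Batch> fetched = List.of(new Batch(0, 9), new Batch(10, 19), new Batch(20, 29));
        long lastFetched = fetched.get(fetched.size() - 1).lastOffset;
        long lastAcquired = lastAcquiredOffset(fetched, 15);
        // Advancing the gap offset from the last fetched batch skips offsets
        // that were fetched but never acquired.
        System.out.println("from last fetched batch: " + (lastFetched + 1));
        System.out.println("from last acquired offset: " + (lastAcquired + 1));
    }
}
```

In this model the two values differ whenever the cap cuts acquisition short, which is why the gap offset should be derived from what was actually acquired rather than from `lastBatch.lastOffset()`.
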