adixitconfluent commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2071250949


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
                 // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-                // Store the remote fetch info and the topic partition for 
which we need to perform remote fetch.
-                Optional<TopicPartitionRemoteFetchInfo> 
topicPartitionRemoteFetchInfoOpt = 
maybePrepareRemoteStorageFetchInfo(topicPartitionData, 
replicaManagerReadResponse);

Review Comment:
   Hi @junrao , thanks for pointing out this problem. I think this situation 
can potentially occur. In order to handle it, I propose that we can update the 
cached share partition's fetch offset metadata to `null` for this offset during 
`onComplete` while completing local log share fetch request. Something like 
below. I have added this in my latest commit. Let me know if this looks good. 
Thanks!
   
   ```
       private void updateFetchOffsetMetadataForRemoteFetchPartitions(
           LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
           LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse
       ) {
           replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) 
-> {
               if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
                   SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
                   sharePartition.updateFetchOffsetMetadata(
                       topicPartitionData.get(topicIdPartition),
                       null
                   );
               }
           });
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to