adixitconfluent commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2071250949

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
             // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
             // those topic partitions.
             LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-            // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
-            Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);

Review Comment:
Hi @junrao, thanks for pointing out this problem. I think this situation can occur. To handle it, I propose that we set the cached share partition's fetch offset metadata to `null` for this offset in `onComplete`, while completing the local log share fetch request. Something like the following, which I have added in my latest commit. Let me know if this looks good. Thanks!

```
private void updateFetchOffsetMetadataForRemoteFetchPartitions(
    LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
    LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse
) {
    replicaManagerReadResponse.forEach((topicIdPartition, logReadResult) -> {
        // Only partitions whose read result carries remote fetch info are affected.
        if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
            // Reset the cached fetch offset metadata for this fetch offset to null.
            sharePartition.updateFetchOffsetMetadata(
                topicPartitionData.get(topicIdPartition),
                null
            );
        }
    });
}
```
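
The effect of this proposal can be illustrated with a standalone sketch. It does not use Kafka's classes: `RemoteFetchInvalidationSketch`, `ReadResultSketch`, and `CachedPartitionSketch` are hypothetical stand-ins, partitions are keyed by plain strings, and the offset metadata is modeled as a string. The null checks are an assumption added for the sketch and are not part of the snippet above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical driver: clears cached metadata for partitions that need a remote fetch.
final class RemoteFetchInvalidationSketch {
    static void invalidate(
        Map<String, Long> fetchOffsets,
        Map<String, ReadResultSketch> readResults,
        Map<String, CachedPartitionSketch> partitions
    ) {
        readResults.forEach((partition, result) -> {
            CachedPartitionSketch cached = partitions.get(partition);
            Long offset = fetchOffsets.get(partition);
            // Assumption: skip partitions that are missing from either map.
            if (result.needsRemoteFetch && cached != null && offset != null) {
                cached.update(offset, null);
            }
        });
    }

    public static void main(String[] args) {
        CachedPartitionSketch local = new CachedPartitionSketch();
        CachedPartitionSketch remote = new CachedPartitionSketch();
        local.update(10L, "segment-a");
        remote.update(20L, "segment-b");

        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("t-0", 10L);
        offsets.put("t-1", 20L);
        Map<String, ReadResultSketch> results = new LinkedHashMap<>();
        results.put("t-0", new ReadResultSketch(false));
        results.put("t-1", new ReadResultSketch(true));
        Map<String, CachedPartitionSketch> partitions = new LinkedHashMap<>();
        partitions.put("t-0", local);
        partitions.put("t-1", remote);

        invalidate(offsets, results, partitions);
        System.out.println("t-0 cached: " + local.metadataFor(10L).isPresent());
        System.out.println("t-1 cached: " + remote.metadataFor(20L).isPresent());
    }
}

// Hypothetical stand-in for a log read result; it only records whether the
// read has to be served from remote storage.
final class ReadResultSketch {
    final boolean needsRemoteFetch;

    ReadResultSketch(boolean needsRemoteFetch) {
        this.needsRemoteFetch = needsRemoteFetch;
    }
}

// Hypothetical stand-in for a share partition's cached fetch offset metadata.
final class CachedPartitionSketch {
    private long fetchOffset = -1L;
    private Optional<String> offsetMetadata = Optional.empty();

    // Records metadata for an offset; passing null clears the cached value.
    void update(long offset, String metadata) {
        fetchOffset = offset;
        offsetMetadata = Optional.ofNullable(metadata);
    }

    // Returns the cached metadata only if it was recorded for this exact offset.
    Optional<String> metadataFor(long offset) {
        return offset == fetchOffset ? offsetMetadata : Optional.empty();
    }
}
```

In this model, a partition that went through the remote fetch path has no cached metadata for its fetch offset afterwards, so a later fetch at that offset would have to look the metadata up again rather than reuse a stale value. Partitions served from the local log keep their cached entry.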