adixitconfluent commented on code in PR #19592: URL: https://github.com/apache/kafka/pull/19592#discussion_r2071960339
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -248,6 +250,8 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
         // updated in a different tryComplete thread.
         responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched);
+        updateFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);

Review Comment:
   Added the check for `logReadResult.info().delayedRemoteStorageFetch.isEmpty()` in `processAcquiredTopicPartitionsForLocalLogFetch` as well

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -272,13 +276,39 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
         }
     }
+    /**
+     * This function updates the cached fetch offset metadata to null corresponding to the share partition's fetch offset.
+     * This is required in the case when a topic partition that has local log fetch during tryComplete changes to remote
+     * storage fetch in onComplete. In this situation, if the cached fetchOffsetMetadata got updated in tryComplete, then
+     * we will enter a state where each share fetch request for this topic partition from client will use the cached
+     * fetchOffsetMetadata in tryComplete and return an empty response to the client from onComplete.
+     * Hence, we require to set offsetMetadata to null for this fetch offset, which would cause tryComplete to update
+     * fetchOffsetMetadata and thereby we will identify this partition for remote storage fetch.
+     * @param topicPartitionData - Map containing the fetch offset for the topic partitions.
+     * @param replicaManagerReadResponse - Map containing the readFromLog response from replicaManager for the topic partitions.
+     */
+    private void updateFetchOffsetMetadataForRemoteFetchPartitions(

Review Comment:
   Yes to both the suggestions.

--
This is an automated message from the Apache Git Service.