junrao commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2070639999
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -668,21 +677,24 @@ private void processRemoteFetchOrException(
     private boolean maybeCompletePendingRemoteFetch() {
         boolean canComplete = false;
-        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
-        try {
-            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
-        } catch (KafkaStorageException e) { // Case a
-            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
-        } catch (UnknownTopicOrPartitionException e) { // Case b
-            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
-        } catch (NotLeaderOrFollowerException e) { // Case c
-            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
+        for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {

Review Comment:
   Could we change the comment for case d to say "All remote storage read requests completed"?

##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
             // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
             // those topic partitions.
             LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-            // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
-            Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);

Review Comment:
   There seems to be an existing issue with remote fetch. Consider the following.
   1. tryComplete() is called on 1 partition. The fetch offset is still in the local log and readFromLog() returns a local fetchOffsetMetadata, which is then cached in sharePartition. The partition doesn't satisfy minBytes yet, so tryComplete() returns false.
   2. tryComplete() is called again on that partition. Since fetchOffsetMetadata is cached in sharePartition, readFromLog() is not called. Now, the partition satisfies minBytes.
   3. onComplete() is called. The cached fetchOffsetMetadata is now only available in remote storage, and readFromLog() returns empty Records and a non-empty `logReadResult.info().delayedRemoteStorageFetch`. Since there is no logic to handle remote fetch in completeLocalLogShareFetchRequest(), an empty response is sent to the client.
   4. The client will fetch with the same offset, and steps 2 and 3 will be repeated.

   If a client gets into this situation, it will never make progress. Is this correct?

##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1287,13 +1303,15 @@ public void testRemoteStorageFetchTryCompleteThrowsException() {
         assertTrue(delayedShareFetch.isCompleted());
         // The future of shareFetch completes.
         assertTrue(shareFetch.isCompleted());
+        // The remoteFetchTask created for tp1 is cancelled successfully.
+        assertTrue(remoteFetchTask.isCancelled());
         assertFalse(future.isCompletedExceptionally());
-        assertEquals(Set.of(tp1), future.join().keySet());
+        assertEquals(Set.of(tp1, tp2), future.join().keySet());
         // Exception occurred and was handled.
-        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        Mockito.verify(exceptionHandler, times(2)).accept(any(), any());
-        // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence.
+        // Verify the locks are released for both local and remote read topic partitions tp0, tp1 and tp2 because of exception occurrence.
Review Comment:
   both => all

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
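The livelock scenario in the second review comment (stale cached fetchOffsetMetadata causing an endless empty-response loop) can be illustrated with a minimal, hypothetical sketch. This is not the real Kafka code: `StuckRemoteFetchSketch`, its fields, and its methods are illustrative stand-ins for sharePartition's cached metadata, tryComplete(), and completeLocalLogShareFetchRequest().

```java
// Hypothetical simplification of the flow described above: once fetch offset
// metadata is cached, tryComplete() skips readFromLog(); when the offset has
// been tiered to remote storage, the local-log completion path has no
// remote-fetch branch, so the client keeps receiving an empty response.
import java.util.Optional;

public class StuckRemoteFetchSketch {
    static Optional<Long> cachedFetchOffsetMetadata = Optional.empty();
    static boolean offsetStillInLocalLog = true;

    // Steps 1 and 2: the first call caches local offset metadata
    // (via readFromLog()); later calls reuse the cache and skip the re-read.
    static void tryComplete(long fetchOffset) {
        if (cachedFetchOffsetMetadata.isEmpty()) {
            cachedFetchOffsetMetadata = Optional.of(fetchOffset);
        }
    }

    // Step 3: the cached offset is now only in remote storage, but the
    // completion path has no remote-fetch handling, so zero bytes are returned.
    static int onComplete() {
        if (!offsetStillInLocalLog) {
            return 0; // empty Records; delayedRemoteStorageFetch is ignored
        }
        return 42; // bytes read from the local log
    }

    public static void main(String[] args) {
        tryComplete(100L);             // step 1: metadata cached while local
        offsetStillInLocalLog = false; // segment is tiered to remote storage
        tryComplete(100L);             // step 2: cache reused, no re-read
        int bytesReturned = onComplete(); // step 3: empty response
        // Step 4: the client retries the same offset and the loop repeats.
        System.out.println("bytes returned to client: " + bytesReturned);
    }
}
```

Under these assumptions the sketch prints `bytes returned to client: 0` on every retry, matching the "never make progress" outcome the comment asks about.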