junrao commented on code in PR #19592:
URL: https://github.com/apache/kafka/pull/19592#discussion_r2070639999


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -668,21 +677,24 @@ private void processRemoteFetchOrException(
     private boolean maybeCompletePendingRemoteFetch() {
         boolean canComplete = false;
 
-        TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition();
-        try {
-            replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
-        } catch (KafkaStorageException e) { // Case a
-            log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
-        } catch (UnknownTopicOrPartitionException e) { // Case b
-            log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
-        } catch (NotLeaderOrFollowerException e) { // Case c
-            log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
-            canComplete = true;
+        for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {

Review Comment:
   Could we change the comment for case d to say "All remote storage read 
requests completed"?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -291,11 +293,11 @@ public boolean tryComplete() {
                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                // those topic partitions.
                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-                // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
-                Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);

Review Comment:
   There seems to be an existing issue with remote fetch. Consider the 
following.
   1. tryComplete() is called on 1 partition. The fetch offset is still in the local log and readFromLog() returns a local fetchOffsetMetadata, which is then cached in sharePartition. The partition doesn't satisfy minBytes yet, so tryComplete() returns false.
   2. tryComplete() is called again on that partition. Since 
fetchOffsetMetadata is cached in sharePartition, readFromLog() is not called. 
Now, the partition satisfies minBytes.
   3. onComplete() is called. The data at the cached fetchOffsetMetadata is now only available in remote storage, so readFromLog() returns empty Records and a non-empty `logReadResult.info().delayedRemoteStorageFetch`. Since there is no logic to handle remote fetch in completeLocalLogShareFetchRequest(), an empty response is sent to the client.
   4. The client will fetch at the same offset, and steps 2 and 3 will be repeated.
   
   If a client gets into this situation, it will never make progress. Is this 
correct?
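   The stale-cache loop above can be illustrated with a toy model (all names here, e.g. `StaleFetchOffsetDemo`, `fetchRound()`, `localLogStartOffset`, are hypothetical and do not match Kafka's actual classes; this is only a sketch of the failure mode, not the real code path):
   
   ```java
   import java.util.Optional;
   
   public class StaleFetchOffsetDemo {
       // Offset metadata cached after the first local read (step 1 above).
       static Optional<Long> cachedFetchOffsetMetadata = Optional.empty();
       // Offsets below this boundary have been tiered to remote storage only.
       static long localLogStartOffset = 0;
   
       // Stand-in for readFromLog(): returns bytes if the offset is still in
       // the local log, otherwise 0 bytes (empty Records, the remote-only case).
       static int readFromLog(long fetchOffset) {
           return fetchOffset >= localLogStartOffset ? 100 : 0;
       }
   
       // One tryComplete()/onComplete() round for a single partition.
       static int fetchRound(long fetchOffset) {
           if (cachedFetchOffsetMetadata.isEmpty()) {
               // Step 1: metadata is cached and never invalidated afterwards.
               cachedFetchOffsetMetadata = Optional.of(fetchOffset);
           }
           // Steps 2-3: the cache hit skips a fresh readFromLog() in
           // tryComplete(); onComplete() then reads at the cached offset,
           // which may by now be remote-only.
           return readFromLog(cachedFetchOffsetMetadata.get());
       }
   
       public static void main(String[] args) {
           long clientOffset = 50;
           System.out.println("round 1 bytes: " + fetchRound(clientOffset)); // 100, local read
           // Retention/tiering moves offset 50 out of the local log.
           localLogStartOffset = 60;
           // Step 4: every retry reuses the cached metadata and gets 0 bytes,
           // so the client never makes progress.
           System.out.println("round 2 bytes: " + fetchRound(clientOffset)); // 0
           System.out.println("round 3 bytes: " + fetchRound(clientOffset)); // 0
       }
   }
   ```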



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1287,13 +1303,15 @@ public void testRemoteStorageFetchTryCompleteThrowsException() {
         assertTrue(delayedShareFetch.isCompleted());
         // The future of shareFetch completes.
         assertTrue(shareFetch.isCompleted());
+        // The remoteFetchTask created for tp1 is cancelled successfully.
+        assertTrue(remoteFetchTask.isCancelled());
         assertFalse(future.isCompletedExceptionally());
-        assertEquals(Set.of(tp1), future.join().keySet());
+        assertEquals(Set.of(tp1, tp2), future.join().keySet());
         // Exception occurred and was handled.
-        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
-        // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence.
+        Mockito.verify(exceptionHandler, times(2)).accept(any(), any());
+        // Verify the locks are released for both local and remote read topic partitions tp0, tp1 and tp2 because of exception occurrence.

Review Comment:
   both => all 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
