junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1994095620


##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+    val cancelled = remoteFetchTask.cancel(false)

Review Comment:
   Hmm, the original logic doesn't look quite right. When a delayed task expires due to the timeout, we first call `onComplete()` and then `onExpiration()`. So cancelling the task in `onExpiration()` is too late, since we call `remoteFetchResult.get` in `onComplete()`.
   
   To address this issue, we could call `remoteFetchTask.cancel()` in `onComplete()` if `remoteFetchResult.isDone` is false.
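   A minimal, self-contained sketch of that suggestion, using only JDK classes (the surrounding class and the simulated slow read are hypothetical; only the `remoteFetchTask` / `remoteFetchResult` names mirror the fields in `DelayedRemoteFetch`): the completion path checks whether the result future is done and, if not, cancels the pending task instead of blocking on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelInOnCompleteSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Stand-in for remoteFetchResult: completed by the remote read when it finishes.
        CompletableFuture<String> remoteFetchResult = new CompletableFuture<>();
        // Stand-in for remoteFetchTask: the scheduled remote read itself.
        Future<?> remoteFetchTask = pool.submit(() -> {
            try {
                TimeUnit.SECONDS.sleep(10); // simulate a slow remote read
                remoteFetchResult.complete("segment data");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // What onComplete() could do for a timed-out fetch: the purgatory invokes
        // onComplete() before onExpiration(), so this is the last point where the
        // pending read can still be cancelled instead of blocking on get().
        if (!remoteFetchResult.isDone()) {
            boolean cancelled = remoteFetchTask.cancel(false);
            System.out.println("cancelled pending remote fetch task: " + cancelled);
            // respond with whatever local data is available instead of waiting
        }
        pool.shutdownNow();
    }
}
```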
   



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));

Review Comment:
   1. Hmm, doing this here would mean that we could call the callback twice? (One possible guard is sketched below.)
   2. Another problem is that the caller is `DelayedRemoteFetch.onComplete()`. The callback calls `delayedRemoteFetchPurgatory.checkAndComplete(key)`. Ideally, we don't want to call the purgatory while we are inside the purgatory, to avoid potential deadlocks and infinite stacks.
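   For point 1, one purely illustrative guard (`OneShotCallback` is a hypothetical helper, not part of the PR) is to make the callback at-most-once, so `cancel()` and `call()` cannot both deliver a result; it does not address the purgatory re-entrancy concern in point 2.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical at-most-once wrapper around a result callback.
public class OneShotCallback<T> implements Consumer<T> {
    private final AtomicBoolean delivered = new AtomicBoolean(false);
    private final Consumer<T> delegate;

    public OneShotCallback(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T result) {
        // Only the first caller (e.g. cancel() or call()) reaches the real callback.
        if (delivered.compareAndSet(false, true)) {
            delegate.accept(result);
        }
    }

    public static void main(String[] args) {
        OneShotCallback<String> cb =
            new OneShotCallback<>(r -> System.out.println("delivered: " + r));
        cb.accept("result from call()");   // delivered
        cb.accept("result from cancel()"); // silently dropped
    }
}
```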



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -36,6 +37,7 @@ import scala.collection._
  * in the remote fetch operation purgatory
  */
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteLogReader: RemoteLogReader,

Review Comment:
   This is an existing issue. It would be better if we could consolidate `remoteFetchTask` and `remoteFetchResult` into a single future.
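   A rough sketch of what that consolidation could look like, assuming the read is submitted via `CompletableFuture.supplyAsync` (the class, method names, and the `RemoteLogReadResult` stand-in below are illustrative, not the actual Kafka API): a single `CompletableFuture` then serves as both the handle that expiration can cancel and the result that `onComplete()` reads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleFutureSketch {
    // Stand-in for the real RemoteLogReadResult.
    record RemoteLogReadResult(String data) { }

    // One future carries both cancellation and the result, replacing the
    // remoteFetchTask (Future[Void]) / remoteFetchResult pair.
    static CompletableFuture<RemoteLogReadResult> asyncRead(ExecutorService executor) {
        return CompletableFuture.supplyAsync(
            () -> new RemoteLogReadResult("remote segment data"), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<RemoteLogReadResult> remoteFetch = asyncRead(executor);

        // onComplete() side: one handle to read the result (or observe cancellation) from.
        remoteFetch.whenComplete((result, error) ->
            System.out.println("completed with: " + result + ", error: " + error));

        // onExpiration() side: the same handle could be cancelled if still pending.
        // remoteFetch.cancel(false);

        executor.shutdown();
    }
}
```

   One caveat: `CompletableFuture.cancel` does not interrupt a supplier that is already running, which may be part of why the task `Future` and the result future are kept separate today.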



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   We probably want to pass `cancelled` to `rlm.read()` so that we could complete the task early if `cancelled` is true.
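   A hypothetical shape of that idea (the `read(BooleanSupplier)` signature and the class below are assumptions for illustration, not an existing `RemoteLogManager` method): the reader hands its `cancelled` flag to the read path, which checks it between batches and returns early.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class CancellableReadSketch {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    // Counterpart of RemoteLogReader.cancel(): only flips the flag.
    public void cancel() {
        cancelled.set(true);
    }

    // Stand-in for rlm.read(fetchInfo, isCancelled): checks the flag between
    // batches so a cancelled fetch completes early instead of reading everything.
    public int read(BooleanSupplier isCancelled) {
        int batchesRead = 0;
        for (int batch = 0; batch < 1_000; batch++) {
            if (isCancelled.getAsBoolean()) {
                return batchesRead; // bail out with whatever was read so far
            }
            batchesRead++; // simulate reading one batch from remote storage
        }
        return batchesRead;
    }

    public static void main(String[] args) {
        CancellableReadSketch reader = new CancellableReadSketch();
        reader.cancel(); // e.g. the delayed fetch timed out before the read started
        System.out.println("batches read: " + reader.read(reader.cancelled::get));
    }
}
```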


