junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r2003759304


##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
 
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: 
$remoteFetchInfo could not be cancelled and its isDone value is 
${remoteFetchTask.isDone}")
+    val cancelled = remoteFetchTask.cancel(false)

Review Comment:
   Could we add a comment on why it's important to cancel without interruption?



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", 
fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), 
Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   Yes. When the task is cancelled, if it hasn't been scheduled, executor 
service will skip it. If it has, we can just let it finish and the result will 
be ignored. The latter is a bit wasteful, but is rare. We could optimize that 
later if it becomes a real problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to