junrao commented on code in PR #19197: URL: https://github.com/apache/kafka/pull/19197#discussion_r2003759304
########## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ########## @@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void], override def onExpiration(): Unit = { // cancel the remote storage read task, if it has not been executed yet - val cancelled = remoteFetchTask.cancel(true) - if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}") + val cancelled = remoteFetchTask.cancel(false) Review Comment: Could we add a comment on why it's important to cancel without interruption? ########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ########## @@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo, this.remoteReadTimer = remoteReadTimer; } + public void cancel() { + LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition); + callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader")))); + this.cancelled = true; + } + @Override public Void call() { + if (cancelled) { Review Comment: Yes. When the task is cancelled, if it hasn't been scheduled, executor service will skip it. If it has, we can just let it finish and the result will be ignored. The latter is a bit wasteful, but is rare. We could optimize that later if it becomes a real problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org