junrao commented on code in PR #19197:
URL: https://github.com/apache/kafka/pull/19197#discussion_r1994095620
########## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ##########
@@ -88,8 +90,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+    val cancelled = remoteFetchTask.cancel(false)

Review Comment:
   Hmm, the original logic doesn't look quite right. When a delayed task expires due to the timeout, we first call `onComplete()` and then `onExpiration()`. So cancelling the task in `onExpiration()` is too late, since we call `remoteFetchResult.get` in `onComplete()`. To address this issue, we could call `remoteFetchTask.cancel()` in `onComplete()` if `remoteFetchResult.isDone` is false.

########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));

Review Comment:
   1. Hmm, doing this here would mean that we could call the callback twice?
   2. Another problem is that this method is called from `DelayedRemoteFetch.onComplete()`, and the callback calls `delayedRemoteFetchPurgatory.checkAndComplete(key)`. Ideally, we don't want to call into the purgatory while we are already inside the purgatory, to avoid potential deadlocks and unbounded call stacks.
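A minimal, hypothetical Java sketch of the first two points, assuming a simplified model rather than Kafka's real classes. It mirrors the ordering described above (on timeout, `onComplete()` runs before `onExpiration()`), so the cancellation happens in `onComplete()` when the result future is not done yet. It also wraps a callback so that it fires at most once, even if both a `cancel()` path and the normal read path invoke it. `RemoteFetchCancelSketch`, `SketchDelayedRemoteFetch`, `OnceCallback`, and `expire()` are illustrative names, not Kafka APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class RemoteFetchCancelSketch {

    // Hypothetical helper: forwards only the first accept() call to the delegate.
    static final class OnceCallback<T> implements Consumer<T> {
        private final AtomicBoolean fired = new AtomicBoolean(false);
        private final Consumer<T> delegate;

        OnceCallback(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T value) {
            // compareAndSet picks a single winner even if two threads race to invoke the callback
            if (fired.compareAndSet(false, true)) {
                delegate.accept(value);
            }
        }
    }

    // Hypothetical, simplified stand-in for DelayedRemoteFetch.
    static final class SketchDelayedRemoteFetch {
        private final Future<Void> remoteFetchTask;
        private final CompletableFuture<String> remoteFetchResult;
        private String response;

        SketchDelayedRemoteFetch(Future<Void> remoteFetchTask, CompletableFuture<String> remoteFetchResult) {
            this.remoteFetchTask = remoteFetchTask;
            this.remoteFetchResult = remoteFetchResult;
        }

        void onComplete() {
            if (!remoteFetchResult.isDone()) {
                // Timed out: cancel here, because onExpiration() only runs after this method returns
                remoteFetchTask.cancel(false);
                response = "timed out";
            } else {
                response = remoteFetchResult.join();
            }
        }

        void onExpiration() {
            // Cancelling here would be too late: onComplete() has already produced the response
        }

        // Models the expiration path described in the review: onComplete() first, then onExpiration()
        void expire() {
            onComplete();
            onExpiration();
        }

        String response() {
            return response;
        }
    }
}
```

In this sketch, an unfinished task is cancelled as soon as the timeout path produces a response. `OnceCallback` is one way to make a second callback invocation harmless. It does not address the re-entrancy concern in point 2, which is about *where* the callback runs rather than how often.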
########## core/src/main/scala/kafka/server/DelayedRemoteFetch.scala: ##########
@@ -36,6 +37,7 @@ import scala.collection._
  * in the remote fetch operation purgatory
  */
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteLogReader: RemoteLogReader,

Review Comment:
   This is an existing issue. It would be better if we could consolidate `remoteFetchTask` and `remoteFetchResult` into a single future.

########## core/src/main/java/kafka/log/remote/RemoteLogReader.java: ##########
@@ -57,8 +59,18 @@ public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
         this.remoteReadTimer = remoteReadTimer;
     }
 
+    public void cancel() {
+        LOGGER.debug("Cancelling remote log reader for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(new RemoteLogReadResult(Optional.empty(), Optional.of(new InterruptedException("Cancelled remote log reader"))));
+        this.cancelled = true;
+    }
+
     @Override
     public Void call() {
+        if (cancelled) {

Review Comment:
   We probably want to pass `cancelled` to `rlm.read()` so that the task can complete early if `cancelled` is true.
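A hypothetical Java sketch of the last two suggestions, under simplifying assumptions. A single `CompletableFuture` serves as both the cancellation handle and the result holder, standing in for a consolidated `remoteFetchTask`/`remoteFetchResult`. The cancellation flag is passed into the read routine so that it can stop between chunks. `CancellableReadSketch`, `SketchRemoteLogReader`, and `readChunks` are illustrative stand-ins. They are not the real `RemoteLogReader` or the actual `rlm.read()` signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancellableReadSketch {

    // Hypothetical stand-in for rlm.read(): receives the cancellation flag and checks it between chunks
    static List<String> readChunks(List<String> chunks, AtomicBoolean cancelled) {
        List<String> out = new ArrayList<>();
        for (String chunk : chunks) {
            if (cancelled.get()) {
                break; // stop early instead of fetching the remaining chunks
            }
            out.add(chunk);
        }
        return out;
    }

    // Hypothetical reader: one CompletableFuture is both the cancellation handle and the result holder
    static final class SketchRemoteLogReader implements Callable<Void> {
        private final List<String> chunks;
        private final AtomicBoolean cancelled = new AtomicBoolean(false);
        private final CompletableFuture<List<String>> result = new CompletableFuture<>();

        SketchRemoteLogReader(List<String> chunks) {
            this.chunks = chunks;
        }

        CompletableFuture<List<String>> result() {
            return result;
        }

        void cancel() {
            cancelled.set(true);  // lets an in-flight readChunks() loop stop early
            result.cancel(false); // completes the future; a later complete() is a no-op
        }

        @Override
        public Void call() {
            if (!cancelled.get()) {
                // complete() returns false and changes nothing if cancel() already completed the future
                result.complete(readChunks(chunks, cancelled));
            }
            return null;
        }
    }
}
```

`CompletableFuture.cancel` does not interrupt the thread that is running the read. The explicit flag is therefore what lets the read loop exit early. Because a `CompletableFuture` completes at most once, a late `complete()` after `cancel()` is ignored. That gives the consolidated future a single, well-defined outcome.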