adixitconfluent commented on code in PR #17739:
URL: https://github.com/apache/kafka/pull/17739#discussion_r1841663194
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -90,39 +90,50 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
             + "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
             partitionsAcquired.keySet());

-        if (shareFetchData.future().isDone())
-            return;
+        try {
+            if (shareFetchData.future().isDone())
+                return;

Review Comment:
   Hi @junrao, you're right. There was a gap in my understanding of how the purgatory works: I thought a separate copy of the operation was added under each watch key used for that operation, but this passage in the documentation cleared it up.
   ```
   Note that a delayed operation can be watched on multiple keys. It is possible that an operation is completed after it has been added to the watch list for some, but not all the keys. In this case, the operation is considered completed and won't be added to the watch list of the remaining keys. The expiration reaper thread will remove this operation from any watcher list in which the operation exists.
   ```
   Hence, I've removed the mentioned condition from the code now. Thanks!
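   For illustration only, here is a minimal sketch of the behaviour the quoted documentation describes: one operation instance is shared across several watch keys, so it can complete at most once no matter which key triggers it. This is not Kafka's purgatory code. `WatchKeySketch`, `SketchOperation`, `watch`, `checkAndComplete`, `demo`, and the `topicA-*` keys are hypothetical names, and guarding completion with an `AtomicBoolean` is an assumption made for the sketch.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch; NOT Kafka's DelayedOperationPurgatory.
   class WatchKeySketch {

       // Stand-in for a delayed operation (illustrative name).
       static class SketchOperation {
           private final AtomicBoolean completed = new AtomicBoolean(false);
           final AtomicInteger onCompleteCalls = new AtomicInteger();

           // Returns true only for the single caller that flips the flag.
           boolean forceComplete() {
               if (completed.compareAndSet(false, true)) {
                   onCompleteCalls.incrementAndGet(); // stands in for onComplete()
                   return true;
               }
               return false;
           }

           boolean isCompleted() {
               return completed.get();
           }
       }

       // Watch lists, keyed by an arbitrary string key.
       private final Map<String, List<SketchOperation>> watchers = new HashMap<>();

       // Adds the SAME instance under each key; stops adding once it has completed.
       void watch(SketchOperation op, List<String> keys) {
           for (String key : keys) {
               if (op.isCompleted()) {
                   return;
               }
               watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
           }
       }

       // Tries to complete every operation watched under one key.
       // Returns how many operations this call actually completed.
       int checkAndComplete(String key) {
           int completedNow = 0;
           for (SketchOperation op : watchers.getOrDefault(key, List.of())) {
               if (op.forceComplete()) {
                   completedNow++;
               }
           }
           return completedNow;
       }

       // Watches one operation on two keys, then triggers both keys in turn.
       static int[] demo() {
           WatchKeySketch purgatory = new WatchKeySketch();
           SketchOperation op = new SketchOperation();
           purgatory.watch(op, List.of("topicA-0", "topicA-1"));
           int first = purgatory.checkAndComplete("topicA-0");
           int second = purgatory.checkAndComplete("topicA-1");
           return new int[] {first, second, op.onCompleteCalls.get()};
       }

       public static void main(String[] args) {
           int[] r = demo();
           System.out.println(r[0] + " " + r[1] + " " + r[2]);
       }
   }
   ```

   In this sketch the second key finds the operation already completed, so the completion body runs once. The sketch leaves completed operations in the watch lists; the quoted documentation assigns that cleanup to the expiration reaper thread, which is not modelled here.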