junrao commented on code in PR #19759:
URL: https://github.com/apache/kafka/pull/19759#discussion_r2105155776


##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -29,10 +28,9 @@
  * a delayed fetch operation could be waiting for a given number of bytes to 
accumulate.
  * <br/>
  * The logic upon completing a delayed operation is defined in onComplete() 
and will be called exactly once.
- * Once an operation is completed, isCompleted() will return true. 
onComplete() can be triggered by either
- * forceComplete(), which forces calling onComplete() after delayMs if the 
operation is not yet completed,
- * or tryComplete(), which first checks if the operation can be completed or 
not now, and if yes calls
- * forceComplete().
+ * Once an operation is completed, isCompleted() will return true. 
onComplete() is called from forceComplete(),
+ * which is triggered once by either expiration if the operation is not 
completed after delayMs, or tryComplete()

Review Comment:
   Actually, it's possible for forceComplete() to be called once from 
tryComplete() and another from expiration. So, we need to remove `once`. 



##########
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java:
##########
@@ -68,24 +67,36 @@ public DelayedOperation(long delayMs, Lock lock) {
      * Return true iff the operation is completed by the caller: note that
      * concurrent threads can try to complete the same operation, but only
      * the first thread will succeed in completing the operation and return
-     * true, others will still return false
+     * true, others will still return false.
      */
     public boolean forceComplete() {
-        if (completed.compareAndSet(false, true)) {
-            // cancel the timeout timer
-            cancel();
-            onComplete();
-            return true;
-        } else {
+        // Do not proceed if the operation is already completed.

Review Comment:
   Well, the issue is that there are quite a few places where tryComplete() 
calls forceComplete() and checks the return value of forceComplete(). You fixed 
the issue in DelayedShareFetch, but not in other places. As we evolve the code, 
it would be useful to leave the code base in a better place for future 
developers, even though it takes a bit of more work. This can be done in a 
followup jira. Filed https://issues.apache.org/jira/browse/KAFKA-19325 to track 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to