C0urante commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r755298524



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
+            this.messageDrainLatch = messageDrainLatch;
+        }

Review comment:
       Thanks Randall. I agree that the synchronization here, even if necessary, is inelegant, and I hope that we can improve things. But I'm worried that the proposal here may be prone to a race condition.
   
   Imagine we restructure the code with your suggestions and the result is this:
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;
   
   class SubmittedRecords {
   
       // Assumed from the surrounding class (not shown in this sketch): the
       // counter of in-flight, unacknowledged messages
       private final AtomicInteger numUnackedMessages = new AtomicInteger();
   
       private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();
   
       private boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
           // (2)
           CountDownLatch messageDrainLatch = this.messageDrainLatch.updateAndGet(
                   existing -> new CountDownLatch(numUnackedMessages.get()));
           try {
               return messageDrainLatch.await(timeout, timeUnit);
           } catch (InterruptedException e) {
               return false;
           }
       }
   
       private void messageAcked() {
           // (1)
           numUnackedMessages.decrementAndGet();
           // (3)
           CountDownLatch messageDrainLatch = this.messageDrainLatch.get();
           if (messageDrainLatch != null) {
               messageDrainLatch.countDown();
           }
       }
   }
   ```
   
   Isn't it still possible that the lines marked `(1)`, `(2)`, and `(3)` could 
execute in that order? And in that case, wouldn't it cause `awaitAllMessages` 
to return early since the `CountDownLatch` created in part `(2)` would use the 
already-decremented value of `numUnackedMessages` after part `(1)` was 
executed, but then also be counted down for the same message in part `(3)`?
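   
   To make that interleaving concrete, here's a small, hypothetical, single-threaded sketch (class and variable names are mine, not from the PR) that simply replays steps `(1)`, `(2)`, and `(3)` in that order; it ends with the latch already open while one message is still unacknowledged:
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;
   
   // Single-threaded replay of the (1) -> (2) -> (3) ordering described above;
   // not the real SubmittedRecords class, just an illustration of the hazard.
   public class LatchRaceSketch {
       public static void main(String[] args) {
           AtomicInteger numUnackedMessages = new AtomicInteger(2); // two messages in flight
           AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
   
           // (1) The acking thread acknowledges message A and decrements the counter...
           numUnackedMessages.decrementAndGet(); // counter is now 1
   
           // (2) ...the committing thread then creates the latch from the
           // already-decremented count, expecting exactly one more ack...
           CountDownLatch latch = latchRef.updateAndGet(
                   existing -> new CountDownLatch(numUnackedMessages.get()));
   
           // (3) ...and the acking thread, still finishing the ack of message A,
           // counts down a latch that never accounted for that message.
           CountDownLatch current = latchRef.get();
           if (current != null) {
               current.countDown();
           }
   
           // The latch is already open, but message B has not been acknowledged.
           System.out.println("Latch count: " + latch.getCount());              // 0
           System.out.println("Unacked messages: " + numUnackedMessages.get()); // 1
       }
   }
   ```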
   
   FWIW, I originally used a `volatile int` for the `numUnackedMessages` field, 
but got a SpotBugs warning about incrementing a volatile field being a 
non-atomic operation for lines like `numUnackedMessages++;` in 
`SubmittedRecords::submit`. If we synchronize every access to that field, it 
shouldn't matter that increments/decrements are non-atomic, and we can consider 
adding an exemption to `spotbugs-exclude.xml`.
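   
   For comparison, a rough sketch (not the actual PR code) of that synchronize-every-access approach: the counter becomes a plain `int` whose reads and writes all happen under the object's monitor, so the non-atomic `++`/`--` is harmless and the SpotBugs volatile-increment warning no longer applies:
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   
   // Rough sketch only: every access to numUnackedMessages is synchronized on
   // "this", so the field can be a plain int.
   class SynchronizedCounterSketch {
       private int numUnackedMessages;            // guarded by "this"
       private CountDownLatch messageDrainLatch;  // guarded by "this"
   
       public synchronized void submit() {
           numUnackedMessages++;
       }
   
       public synchronized void messageAcked() {
           numUnackedMessages--;
           if (messageDrainLatch != null) {
               messageDrainLatch.countDown();
           }
       }
   
       public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
           CountDownLatch latch;
           synchronized (this) {
               // The count is read and the latch published under the same lock,
               // so an ack cannot slip in between the two.
               latch = new CountDownLatch(numUnackedMessages);
               messageDrainLatch = latch;
           }
           // await() stays outside the synchronized block so acking threads can
           // still acquire the monitor and count the latch down.
           try {
               return latch.await(timeout, timeUnit);
           } catch (InterruptedException e) {
               return false;
           }
       }
   }
   ```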



