C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1100459265


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -98,6 +104,24 @@ private boolean flushing() {
         return toFlush != null;
     }
 
+    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit 
timeUnit) throws InterruptedException, TimeoutException {

Review Comment:
   It seems like we're bending over backwards here to accommodate an assumption 
made in `beginFlush` that we'll never try to trigger two offset flushes at 
once, which is clearly false given the conditions that necessitate this fix 
(i.e., a task's end-of-life offset flush is triggered at the same time as its 
periodic offset flush).
   
   Given that, do we really need a separate method here, or can we relax the 
constraints in `beginFlush` to wait for in-progress flushes to conclude instead 
of throwing an exception if there are any?
   
   Additionally, it seems like the use of a `CompleteableFuture` here is a bit 
strange. Would a `Semaphore` or `CountDownLatch` be more suited?
   
   Finally--since this change may lead to us performing double offset commits 
when a task is being shut down, do you think it might also make sense to add a 
`close` method to the offset writer that throws an exception for any further 
attempts to flush, and possibly forcibly terminates any in-progress flushes? We 
can invoke that in `AbstractWorkerTask::cancel` (or possibly 
`WorkerSourceTask::cancel` if a different approach is necessary to preserve 
exactly-once semantics) to help tasks complete shutdown within the timeout 
allotted to them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to