C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##########
@@ -365,6 +365,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been cancelled");
+                throw e;
+            }

Review Comment:
   Honestly not sure why we didn't put this here to begin with. Nice 👍



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Still think this may apply here



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -100,23 +104,45 @@ private boolean flushing() {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty())
+                    return false;

Review Comment:
   Should we release the semaphore here?
   
   And either way, we should add some detail to the Javadocs for this method 
and `cancelFlush` / `doFlush` so that people understand the conditions that 
cause a flush to transition from in-progress to completed.
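
   To illustrate the semaphore question, here is a minimal sketch. It assumes `flushInProgress` is a single-permit `Semaphore` whose permit is otherwise released only when a flush completes or is cancelled. `FlushGateSketch` and `finishFlush` are hypothetical names, not part of the PR, and only the permit handling is modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for OffsetStorageWriter; only the permit handling is modeled.
class FlushGateSketch {
    // Assumption: one permit, held for the duration of an in-progress flush.
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    public synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    public boolean beginFlush(long timeout, TimeUnit timeUnit)
            throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                // Nothing was snapshotted, so no later completion or cancellation
                // will release the permit; release it before returning.
                flushInProgress.release();
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // Hypothetical stand-in for the flush completing or being cancelled.
    public synchronized void finishFlush() {
        if (toFlush != null) {
            toFlush = null;
            flushInProgress.release();
        }
    }
}
```

   Without the `release()` in the empty-data branch, a second `beginFlush` call after an empty one would wait on a permit that nothing ever returns, and would time out.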



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

