C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237

   Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle 
but it's turning out to be even harder than I thought.
   
   I think there's still an issue with the current state of the PR. It looks 
like we aren't blocking on the future returned by `setPrimaryThenSecondary`, 
which means that calls to `get` on the future returned from 
`ConnectorOffsetBackingStore::set` may spuriously return before the write to 
the primary store has completed. I believe our tests miss this because the 
producer writes we mock out all complete synchronously; maybe we can use 
`MockProducer` more idiomatically to simulate records being ack'd only after 
calls to `MockProducer::send` have returned?
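   
   (IIRC, constructing `MockProducer` with `autoComplete` disabled lets a test 
drive acks explicitly via `completeNext()`/`errorNext()`. To show the shape I 
have in mind without pulling in Kafka types, here's a minimal, pure-JDK sketch; 
`AsyncAckingProducer` is a hypothetical stand-in for illustration, not Kafka's 
actual `MockProducer`:)
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;
   
   // Hypothetical stand-in for an asynchronously-acking producer.
   // send() returns immediately; the ack only fires when the test
   // explicitly calls completeNext(), mimicking a delayed broker ack.
   class AsyncAckingProducer {
       private final Queue<CompletableFuture<Void>> pendingAcks = new ArrayDeque<>();
   
       // Returns a future that is not yet done when send() returns.
       public Future<Void> send(byte[] value) {
           CompletableFuture<Void> ack = new CompletableFuture<>();
           pendingAcks.add(ack);
           return ack;
       }
   
       // Simulates the broker acking the oldest in-flight record;
       // returns false if there is nothing in flight.
       public boolean completeNext() {
           CompletableFuture<Void> ack = pendingAcks.poll();
           if (ack == null) {
               return false;
           }
           ack.complete(null);
           return true;
       }
   }
   ```
   
   A test built on this shape would observe that the future is not done after 
`send` returns, and only completes once the ack is driven; that should surface 
the early-return bug that synchronous mocks hide.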
   
   I've sketched a new kind of `Future` implementation that seems to do the 
trick, though I haven't tested it rigorously:
   
   ```java
   private class ChainedOffsetWriteFuture implements Future<Void> {
   
       private final OffsetBackingStore primaryStore;
       private final OffsetBackingStore secondaryStore;
       private final Map<ByteBuffer, ByteBuffer> completeOffsets;
       private final Map<ByteBuffer, ByteBuffer> regularOffsets;
       private final Callback<Void> callback;
       private final AtomicReference<Throwable> writeError;
       private final CountDownLatch completed;
   
       public ChainedOffsetWriteFuture(
               OffsetBackingStore primaryStore,
               OffsetBackingStore secondaryStore,
               Map<ByteBuffer, ByteBuffer> completeOffsets,
               Map<ByteBuffer, ByteBuffer> regularOffsets,
               Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
               Callback<Void> callback
       ) {
           this.primaryStore = primaryStore;
           this.secondaryStore = secondaryStore;
           this.completeOffsets = completeOffsets;
           this.regularOffsets = regularOffsets;
           this.callback = callback;
           this.writeError = new AtomicReference<>();
           this.completed = new CountDownLatch(1);
   
           secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
       }
   
       private void onFirstWrite(Throwable error, Void ignored) {
           if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
               try (LoggingContext context = loggingContext()) {
                   callback.onCompletion(error, ignored);
                   writeError.compareAndSet(null, error);
                   completed.countDown();
               }
               return;
           }
            setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
       }
   
       private void onSecondWrite(Throwable error, Void ignored) {
           callback.onCompletion(error, ignored);
           writeError.compareAndSet(null, error);
           completed.countDown();
       }
   
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
           return false;
       }
   
       @Override
       public boolean isCancelled() {
           return false;
       }
   
       @Override
       public boolean isDone() {
           return completed.getCount() == 0;
       }
   
       @Override
       public Void get() throws InterruptedException, ExecutionException {
           completed.await();
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   
       @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (!completed.await(timeout, unit)) {
                throw new TimeoutException("Failed to complete offset write in time");
           }
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   }
   ```
   
   (I've left `cancel` and `isCancelled` as no-op stubs for now since I'm not 
sure real cancellation support is necessary, but LMK if I've missed a case 
where it would make a difference.)
   
   The new class can be used at the end of `ConnectorOffsetBackingStore::set` 
like this:
   
   ```java
   if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
       return new ChainedOffsetWriteFuture(
               primaryStore,
               secondaryStore,
               values,
               regularOffsets,
               tombstoneOffsets,
               callback
       );
   } else {
        return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
   }
   ```
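   
   For what it's worth, the latch-plus-`AtomicReference` completion pattern can 
be sanity-checked in isolation; here's a stripped-down, Kafka-free version of 
just that part (the class name `LatchBackedFuture` and the `complete` method 
are made up for illustration, standing in for the write callbacks):
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   // Minimal standalone version of the latch-backed future pattern above.
   class LatchBackedFuture implements Future<Void> {
       private final java.util.concurrent.atomic.AtomicReference<Throwable> writeError =
               new java.util.concurrent.atomic.AtomicReference<>();
       private final CountDownLatch completed = new CountDownLatch(1);
   
       // Stand-in for the final write callback: records the first error (if any)
       // and releases anyone blocked in get().
       public void complete(Throwable error) {
           writeError.compareAndSet(null, error);
           completed.countDown();
       }
   
       @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
       @Override public boolean isCancelled() { return false; }
       @Override public boolean isDone() { return completed.getCount() == 0; }
   
       @Override
       public Void get() throws InterruptedException, ExecutionException {
           completed.await();
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   
       @Override
       public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
           if (!completed.await(timeout, unit)) {
               throw new TimeoutException("Failed to complete offset write in time");
           }
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   }
   ```
   
   The key properties to check are that `isDone` only flips after the last 
callback fires, and that a callback error surfaces to callers as an 
`ExecutionException` rather than being swallowed.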

