C0urante commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237
Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle, but it's turning out to be even harder than I thought.

I think there's still an issue with the current state of the PR. It looks like we aren't blocking on the future returned by `setPrimaryThenSecondary`, which means that we may spuriously return early from `get` on the future we're returning from `ConnectorOffsetBackingStore::set` if the write to the primary store hasn't completed yet. I believe this is missed by tests because the producer writes we mock out all take place synchronously; maybe we can use the `MockProducer` more idiomatically to simulate records being ack'd only after calls to `MockProducer::send` have returned?

I've sketched a new kind of `Future` implementation that seems to do the trick, though I haven't tested it rigorously:

```java
private class ChainedOffsetWriteFuture implements Future<Void> {

    private final OffsetBackingStore primaryStore;
    private final OffsetBackingStore secondaryStore;
    private final Map<ByteBuffer, ByteBuffer> completeOffsets;
    private final Map<ByteBuffer, ByteBuffer> regularOffsets;
    private final Callback<Void> callback;
    private final AtomicReference<Throwable> writeError;
    private final CountDownLatch completed;

    public ChainedOffsetWriteFuture(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> regularOffsets,
            Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
            Callback<Void> callback
    ) {
        this.primaryStore = primaryStore;
        this.secondaryStore = secondaryStore;
        this.completeOffsets = completeOffsets;
        this.regularOffsets = regularOffsets;
        this.callback = callback;
        this.writeError = new AtomicReference<>();
        this.completed = new CountDownLatch(1);
        secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(error, ignored);
                writeError.compareAndSet(null, error);
                completed.countDown();
            }
            return;
        }
        setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        callback.onCompletion(error, ignored);
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return completed.getCount() == 0;
    }

    @Override
    public Void get() throws InterruptedException, ExecutionException {
        completed.await();
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("Failed to complete offset write in time");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
        return null;
    }
}
```

(I've left `cancel` and `isCancelled` as trivial no-ops for now since I'm not sure a real implementation is necessary, but LMK if I've missed a case where this would make a difference.)

The new class can be used at the end of `ConnectorOffsetBackingStore::set` like this:

```java
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    return new ChainedOffsetWriteFuture(
            primaryStore,
            secondaryStore,
            values,
            regularOffsets,
            tombstoneOffsets,
            callback
    );
} else {
    return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.