C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }

Review Comment:
   Probably time to update this now?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   We shouldn't be using an `AtomicReference` here. The reason it's used in the [linked snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484) in the `ConsumerCoordinator` class is to ensure that, in the event that multiple errors occur, we don't overwrite the first exception that we saw. That's not a possibility here since `secondaryStoreTombstoneWriteError` is only ever updated in separate `catch` clauses for the same `try` block, which means that it's guaranteed to never be updated more than once.
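For illustration, here is a minimal sketch of the pattern described above, using a plain local variable in place of the `AtomicReference`. The class name, method name, `Future` parameter, and timeout handling are assumptions for illustration only, not the code in this PR; the point is simply that each `catch` clause can run at most once per execution of the `try` block, so nothing atomic is required:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: hypothetical names, simplified error handling.
class TombstoneWriteErrorSketch {
    // Waits for the secondary-store write and returns the failure, if any.
    Throwable awaitSecondaryWrite(Future<Void> secondaryWriteFuture, long timeoutMs) {
        Throwable secondaryStoreTombstoneWriteError = null;
        try {
            secondaryWriteFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The write itself failed; report the underlying cause.
            secondaryStoreTombstoneWriteError = e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            // We were interrupted or ran out of time while waiting for the write.
            secondaryStoreTombstoneWriteError = e;
        }
        return secondaryStoreTombstoneWriteError;
    }
}
```

(An `AtomicReference` would only become necessary again if the error had to be assigned from inside a lambda or callback, where a local variable could not be reassigned.)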
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
   I like the general idea here right now: block indefinitely for exactly-once, block within the offset timeout otherwise. We also note in the [docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms) for the `offset.flush.timeout.ms` property that it "has no effect for source connectors running with exactly-once support". I don't think we need to worry about placing an upper bound on the time we take with exactly-once support enabled. If we did, it would make tasks more brittle (remember, we fail tasks when offset commits fail in this mode), and preemptively writing tombstone records to the secondary offsets topic shouldn't corrupt the offsets that a connector sees even if the current transaction (including a write to the connector-specific offsets topic) fails. We may end up writing garbage to the secondary offsets topic, but guarantees for exactly-once support are lost as soon as a connector switches over to reading exclusively from that topic, and tombstones in the secondary topic don't overwrite non-tombstone offsets for the same partition in the primary topic.

   That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a `Future` to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store). One possible shape for this is sketched at the end of this message.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }

+    @Test

Review Comment:
   +1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the changes to the main code are entirely contained within the `ConnectorOffsetBackingStore` class right now, and the `OffsetStorageWriter` is agnostic about which type of backing store it uses, so introducing tests specifically geared towards the `ConnectorOffsetBackingStore` in these tests is a little strange.
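To make the `Future`-composition suggestion in the `ConnectorOffsetBackingStore#set` comment above more concrete, here is one rough shape it could take. This is a sketch under assumptions: the class name, the single-thread executor, and the `chainFutures` helper are illustrative only and are not part of this PR; it simply shows how a preemptive tombstone flush to the secondary store and the existing primary-store flush could be combined into a single `Future` that the caller awaits during offset commit.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: hypothetical helper, not the implementation in this PR.
class OffsetFlushChainSketch {
    // A small executor used purely to sequence the two flushes into one Future.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    Future<Void> chainFutures(Future<Void> secondaryTombstoneFlush, Callable<Future<Void>> primaryFlush) {
        return executor.submit(() -> {
            // Fail the whole commit if the preemptive tombstone write to the secondary store fails.
            secondaryTombstoneFlush.get();
            // Only begin (and await) the primary-store flush once the tombstones are written.
            primaryFlush.call().get();
            return null;
        });
    }
}
```

The caller (for example, the framework's offset commit logic) can then decide how long to block on the returned future: indefinitely when exactly-once support is enabled, or up to `offset.flush.timeout.ms` otherwise, which matches the behavior discussed in the comment above.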