C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 

Review Comment:
   Probably time to update this now?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   We shouldn't be using an `AtomicReference` here. The reason it's used in the [linked snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484) in the `ConsumerCoordinator` class is to ensure that, in the event that multiple errors occur, we don't overwrite the first exception that we saw.
   
   That's not a possibility here since `secondaryStoreTombstoneWriteError` is only ever updated in separate `catch` clauses for the same `try` block, which means that it's guaranteed to never be updated more than once.
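   
   As a rough, hypothetical sketch of the alternative (the helper name, parameters, and exception handling below are illustrative, not the PR's actual code), a plain local variable is all that's needed:
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   class TombstoneWriteErrorSketch {
       // Illustrative only: the error is assigned in catch clauses of a single
       // try block, so it can never be set more than once and a plain local
       // (rather than an AtomicReference) is sufficient.
       static Throwable awaitSecondaryWrite(Future<Void> secondaryWriteFuture, long timeoutMs) {
           Throwable error = null;
           try {
               secondaryWriteFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (ExecutionException e) {
               error = e.getCause();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // preserve interrupt status
               error = e;
           } catch (TimeoutException e) {
               error = e;
           }
           return error;
       }
   }
   ```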



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
   I like the general idea here right now: block indefinitely for exactly-once, block within the offset timeout otherwise. We also note in the [docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms) for the `offset.flush.timeout.ms` property that it "has no effect for source connectors running with exactly-once support".
   
   I don't think we need to worry about placing an upper bound on the time we take with exactly-once support enabled. If we did, it would make tasks more brittle (remember, we fail tasks when offset commits fail in this mode), and preemptively writing tombstone records to the secondary offsets topic shouldn't corrupt the offsets that a connector sees even if the current transaction (including a write to the connector-specific offsets topic) fails. We may end up writing garbage to the secondary offsets topic, but guarantees for exactly-once support are lost as soon as a connector switches over to reading exclusively from that topic, and tombstones in the secondary topic don't overwrite non-tombstone offsets for the same partition in the primary topic.
   
   That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a `Future` to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store).
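   
   As a rough sketch of the shape that could take (the class name and the cancellation/timeout semantics are assumptions for illustration, not a concrete design for the PR), the returned `Future` could simply chain the two flushes:
   
   ```java
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   // Illustrative composite future: get() waits for the preemptive tombstone
   // flush to the secondary store (when one was issued) and then for the
   // primary-store flush, so the caller blocks on a single future per commit.
   class ChainedOffsetFlushFuture implements Future<Void> {
       private final Future<Void> secondaryWrite; // null if no tombstone flush was needed
       private final Future<Void> primaryWrite;
   
       ChainedOffsetFlushFuture(Future<Void> secondaryWrite, Future<Void> primaryWrite) {
           this.secondaryWrite = secondaryWrite;
           this.primaryWrite = primaryWrite;
       }
   
       @Override
       public Void get() throws InterruptedException, ExecutionException {
           if (secondaryWrite != null)
               secondaryWrite.get();
           return primaryWrite.get();
       }
   
       @Override
       public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
           long deadlineNs = System.nanoTime() + unit.toNanos(timeout);
           if (secondaryWrite != null)
               secondaryWrite.get(timeout, unit);
           long remainingNs = deadlineNs - System.nanoTime();
           return primaryWrite.get(Math.max(0L, remainingNs), TimeUnit.NANOSECONDS);
       }
   
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
           boolean cancelledSecondary = secondaryWrite == null || secondaryWrite.cancel(mayInterruptIfRunning);
           return primaryWrite.cancel(mayInterruptIfRunning) && cancelledSecondary;
       }
   
       @Override
       public boolean isCancelled() {
           return primaryWrite.isCancelled();
       }
   
       @Override
       public boolean isDone() {
           return (secondaryWrite == null || secondaryWrite.isDone()) && primaryWrite.isDone();
       }
   }
   ```
   
   The exactly-once behavior would then fall out of which `get` variant the caller invokes: no timeout when exactly-once support is enabled, the offset flush timeout otherwise.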



##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
 
+    @Test

Review Comment:
   +1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the changes to the main code are entirely contained within the `ConnectorOffsetBackingStore` class right now, and the `OffsetStorageWriter` is agnostic about which type of backing store it uses, so introducing tests specifically geared towards the `ConnectorOffsetBackingStore` here is a little strange.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
