urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175076558


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
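
For readers following the thread, the `Arrays.copyOf` call above is a copy-then-publish (copy-on-write) step: writers never mutate an array that readers may already hold. The sketch below is a stand-alone illustration of that pattern built around `ConcurrentHashMap#compute`, using a hypothetical `CopyOnWriteExample` class with made-up `append`/`snapshot` methods; it is illustrative only and not code from the PR.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-alone illustration of the copy-then-publish pattern referenced by the
// "Make a copy of the array before mutating it" comment: writers copy the
// current array, mutate the copy, and publish the new reference, so concurrent
// readers only ever see fully-built snapshots.
public class CopyOnWriteExample {
    private final Map<String, long[]> state = new ConcurrentHashMap<>();

    // Writer path: ConcurrentHashMap#compute runs atomically per key, so only one
    // writer at a time builds the replacement array for a given key.
    public void append(String key, long value) {
        state.compute(key, (ignored, existing) -> {
            long[] current = existing == null ? new long[0] : existing;
            long[] updated = Arrays.copyOf(current, current.length + 1);
            updated[current.length] = value;
            return updated;
        });
    }

    // Reader path: a plain get() returns an immutable snapshot that can be scanned
    // without any further synchronization.
    public long[] snapshot(String key) {
        long[] current = state.get(key);
        return current == null ? new long[0] : current;
    }
}
```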

Review Comment:
   @gharris1727 I agree with almost all of what you said, but I would like to focus on the last point:
   
   > > Based on our discussion so far, it seems that monotonicity will not allow us to implement "old offset checkpointing after restart", which is not ideal. Worst case of breaking monotonicity is sub-optimal failover and extra re-processing. Worst case for not translating old offsets after restart is never checkpointing a group - then, with the default consumer config (auto.offset.reset=latest), failover results in data loss.
   > 
   > I don't think the auto.offset.reset=latest situation is a fair criticism of this PR, as that can happen with or without this change. And actually, it's more likely to happen without this change, as the translation window is smaller.
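   
   To spell out the data-loss scenario mentioned in the quote above: if a group is never checkpointed on the target cluster, a failed-over consumer finds no committed offset there, and the default reset policy sends it to the end of the log. The snippet below is a minimal, hypothetical sketch of such a failover consumer; `target-cluster:9092`, `my-group`, and `topic1` are made-up names, and this is not code from MirrorMaker 2 or this PR.
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
   
   // Hypothetical failover consumer pointed at the target cluster. If "my-group"
   // was never checkpointed there, the first poll finds no committed offset to
   // resume from, so auto.offset.reset decides the starting position.
   public class FailoverConsumerExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "target-cluster:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           // "latest" is the default; with no committed offset, the group starts at the
           // log end and silently skips everything produced before the failover.
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
           // Using "earliest" instead trades the skip for reprocessing from the beginning.
   
           try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singletonList("topic1"));
               consumer.poll(Duration.ofSeconds(1));
           }
       }
   }
   ```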
   
   Sorry, I'm not trying to critique this PR with these points, but rather the monotonicity requirement itself. My understanding is that this PR is the one that makes the monotonicity guarantee "official", and I'm worried that if there really is a tradeoff between monotonicity and old offset translation, then we have already made that choice and ruled the latter out. I am interested in the future of offset translation and checkpointing, and I can think of possible solutions, but (again, based on my current understanding) those will be limited if we have to keep monotonicity as a guarantee.
   
   As an extra note, I'm familiar with the situation behind KAFKA-13659, and in that case the symptom that triggered the investigation was not non-monotonic checkpoints, but the odd "feature" of copying source offsets as-is when no offset sync was available, which caused negative lag in the target cluster. The checkpoints were monotonic, but they didn't make sense.
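   
   To make that symptom concrete, here is a rough, purely illustrative sketch (not MirrorMaker 2 code; the numbers and the `logEndOffset - committedOffset` lag formula are assumptions for the example) of how copying an upstream offset into a checkpoint verbatim can show up as negative lag on the target cluster.
   
   ```java
   // Purely illustrative: shows how a checkpoint that simply copies the upstream
   // offset can exceed the target topic's log end offset and report negative lag.
   public class NegativeLagExample {
       public static void main(String[] args) {
           long upstreamConsumerOffset = 1_000L; // offset committed on the source cluster
           long downstreamLogEndOffset = 400L;   // records actually present in the target topic
   
           // The old behaviour described above: with no usable offset sync, the upstream
           // offset was copied into the checkpoint as-is.
           long checkpointedOffset = upstreamConsumerOffset;
   
           // Lag as typically reported for the consumer group on the target cluster.
           long lag = downstreamLogEndOffset - checkpointedOffset;
   
           System.out.println("Checkpointed offset on target: " + checkpointedOffset);   // 1000
           System.out.println("Target log end offset:         " + downstreamLogEndOffset); // 400
           System.out.println("Reported lag:                  " + lag);                  // -600, i.e. negative lag
       }
   }
   ```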


