urbandan commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1175076558
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +171,103 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.computeIfAbsent(sourceTopicPartition, ignored -> createInitialSyncs(offsetSync));
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) -> updateExistingSyncs(syncs, offsetSync));
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: batch updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, Long.SIZE);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            StringBuilder stateString = new StringBuilder();
+            stateString.append("[");
+            for (int i = 0; i < Long.SIZE; i++) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                if (i == 0 || i == Long.SIZE - 1 || mutableSyncs[i] != mutableSyncs[i - 1]) {
+                    // Print only if the sync is interesting, a series of repeated syncs will appear as ,,,,,
+                    stateString.append(mutableSyncs[i].upstreamOffset());
+                    stateString.append(":");
+                    stateString.append(mutableSyncs[i].downstreamOffset());
+                }
+            }
+            stateString.append("]");
+            log.trace("New sync {} applied, new state is {}", offsetSync, stateString);
+        }
+        return mutableSyncs;
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[Long.SIZE];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        for (int i = 0; i < Long.SIZE; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {

Review Comment:

@gharris1727 I agree with almost all of what you said, but I would like to focus on the last point:

> > Based on our discussion so far, it seems that monotonicity will not allow us to implement "old offset checkpointing after restart", which is not ideal. Worst case of breaking monotonicity is sub-optimal failover and extra re-processing. Worst case for not translating old offsets after restart is never checkpointing a group - then, with the default consumer config (auto.offset.reset=latest), failover results in data loss.
>
> I don't think the auto.offset.reset=latest situation is a fair criticism of this PR, as that can happen with or without this change. And actually, it's more likely to happen without this change, as the translation window is smaller.

Sorry, I'm not trying to critique this PR with these points, but to critique the monotonicity requirement. My understanding is that this PR is the one making the monotonicity guarantee "official", and I'm worried that if there really is a tradeoff between monotonicity and old-offset translation, we have already made the choice that makes the latter impossible.
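For context on the worst case quoted above, here is a minimal sketch of the failover scenario (the bootstrap address, group id, and topic name are illustrative, and this code is not part of the PR): if MirrorMaker 2 never emits a checkpoint for a group, the consumer finds no committed offset on the target cluster, and with the default auto.offset.reset=latest it starts at the log end, skipping every record that was mirrored before the failover.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class FailoverConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "target-cluster:9092"); // illustrative address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                     // illustrative group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // Default setting: when the group has no committed offset on the target cluster
        // (i.e. it was never checkpointed), the consumer resets to the log end offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "source.topic" stands for the mirrored topic on the target cluster; the prefix is illustrative.
            consumer.subscribe(Collections.singletonList("source.topic"));
            // The first poll() starts from the latest offset, so records mirrored before the failover are skipped.
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```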
I am interested in the future of offset translation and checkpointing, and I can think of possible solutions, but (again, based on my current understanding) those will be limited if we have to keep monotonicity as a guarantee.

As an extra note, I'm familiar with the situation behind KAFKA-13659, and in that case the symptom that triggered the investigation was not non-monotonic checkpoints, but the odd "feature" of copying source offsets as-is when there was no offset sync available, which caused negative lag in the target cluster. The checkpoints were monotonic, but they didn't make sense.
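As a toy illustration of that KAFKA-13659 symptom (the numbers are invented and this is not MirrorMaker 2 code): when the source group's committed offset is copied to the target cluster without translation, the checkpoint can point past the replicated partition's log end offset, so lag computed as log-end minus committed offset becomes negative.

```java
public class NegativeLagSketch {
    public static void main(String[] args) {
        long sourceCommittedOffset = 1_000_000L; // group's committed offset on the source cluster
        long targetLogEndOffset = 250_000L;      // replicated partition is shorter, e.g. mirroring started later

        // Checkpoint written by copying the source offset as-is, without offset-sync translation:
        long checkpointedOffset = sourceCommittedOffset;

        // Consumer lag on the target cluster: log end offset minus the (checkpointed) committed offset.
        long lag = targetLogEndOffset - checkpointedOffset;
        System.out.println("lag = " + lag); // prints lag = -750000
    }
}
```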