[ https://issues.apache.org/jira/browse/KAFKA-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171612#comment-17171612 ]
Ning Zhang edited comment on KAFKA-10339 at 8/5/20, 4:47 PM:
-------------------------------------------------------------

Thanks for the input. I think that sounds like a working plan. Here are my follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I think we probably need to:

(1) add a new API, "Map<TopicPartition, Long> loadOffsets()", to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point for the consumer in WorkerSinkTask.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?

was (Author: yangguo1220):
Thanks for the input. I think that sounds like a working plan. Here are my follow-up thoughts:

_"when MirrorSourceTask starts, it loads initial offsets from __consumer_offsets on the target cluster."_

As the consumer is configured to pull data from the source cluster, I think we probably need to:

(1) add a new API, "Map<TopicPartition, Long> loadOffsets()", to [SinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java]
(2) in MirrorSinkTask.java, implement/override loadOffsets() to supply the consumer offsets loaded from the target cluster.
(3) in [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295], when initializing the consumer, if `task.loadOffsets()` returns a non-empty map, use the returned offsets as the starting point.

_"in addition, we call addOffsetsToTransaction in order to commit offsets to the "fake" consumer group on the target cluster."_

I fully agree with the "fake" consumer group on the target cluster. I am wondering whether "addOffsetsToTransaction" is already taken care of by *producer.sendOffsetsToTransaction(offsetsMap, consumerGroupId);*?

> MirrorMaker2 Exactly-once Semantics
> -----------------------------------
>
> Key: KAFKA-10339
> URL: https://issues.apache.org/jira/browse/KAFKA-10339
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Ning Zhang
> Assignee: Ning Zhang
> Priority: Major
> Labels: needs-kip
>
> MirrorMaker2 is currently implemented on Kafka Connect Framework, more
> specifically the Source Connector / Task, which do not provide exactly-once
> semantics (EOS) out-of-the-box, as discussed in
> https://github.com/confluentinc/kafka-connect-jdbc/issues/461,
> https://github.com/apache/kafka/pull/5553,
> https://issues.apache.org/jira/browse/KAFKA-6080 and
> https://issues.apache.org/jira/browse/KAFKA-3821. Therefore MirrorMaker2
> currently does not provide EOS.
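
For reference, steps (1) to (3) proposed in the comment above could look roughly like the sketch below. It is only an illustration with no Kafka dependency: the TopicPartition, SinkTask and MirrorSinkTask classes here are simplified stand-ins, not the real Kafka Connect classes, and loadOffsets() is the proposed (not existing) API. The helper startingOffsets() and the idea of overlaying the loaded offsets on the consumer's default positions are assumptions about how WorkerSinkTask might apply them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class LoadOffsetsSketch {

    /** Simplified stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Stand-in for SinkTask carrying the proposed hook from step (1). */
    abstract static class SinkTask {
        /** Default: no externally supplied offsets, so existing tasks behave as before. */
        Map<TopicPartition, Long> loadOffsets() {
            return Collections.emptyMap();
        }
    }

    /** Step (2): a task that supplies offsets it loaded from the target cluster. */
    static class MirrorSinkTask extends SinkTask {
        private final Map<TopicPartition, Long> targetClusterOffsets;

        MirrorSinkTask(Map<TopicPartition, Long> targetClusterOffsets) {
            this.targetClusterOffsets = new HashMap<>(targetClusterOffsets);
        }

        @Override
        Map<TopicPartition, Long> loadOffsets() {
            return Collections.unmodifiableMap(targetClusterOffsets);
        }
    }

    /**
     * Step (3), hypothetical helper: if the task returns offsets, they override the
     * consumer's default positions for those partitions; other partitions keep the default.
     */
    static Map<TopicPartition, Long> startingOffsets(SinkTask task,
                                                     Map<TopicPartition, Long> consumerDefaults) {
        Map<TopicPartition, Long> result = new HashMap<>(consumerDefaults);
        Map<TopicPartition, Long> loaded = task.loadOffsets();
        if (!loaded.isEmpty()) {
            result.putAll(loaded);
        }
        return result;
    }

    public static void main(String[] args) {
        TopicPartition tp0 = new TopicPartition("topic", 0);
        Map<TopicPartition, Long> defaults = new HashMap<>();
        defaults.put(tp0, 0L);

        Map<TopicPartition, Long> fromTarget = new HashMap<>();
        fromTarget.put(tp0, 42L);

        System.out.println(startingOffsets(new MirrorSinkTask(fromTarget), defaults));
    }
}
```

A default implementation that returns an empty map would keep the change backward compatible for existing sink tasks, which is why the sketch puts the default on the base class instead of making the method abstract.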
-- This message was sent by Atlassian Jira (v8.3.4#803005)