Hello. Since my environment uses kafka 2.4.0 , I got over the situation by replacing the connec-mirror-2.4.0.jar with a new file in MM2 . Below are notes for what I have done. I am not sure if this is the right approach but would like to share it. 1.Having below offset in my source kafka cluster,
``` for t in 001_status_info box_direction_his; do \ kubectl exec kafka-cluster-v2-kafka-0 --context catseye-admin -n dev -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $t --time -1 2>/dev/null ; \ done 001_status_info:0:4 box_direction_his:0:2 ``` 2.Confirmed that source connector in MM2 is acknowledging 'auto.offset.reset' as 'latest', ``` 2020-10-16 03:07:21,243 INFO ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [kafka-cluster-v2-kafka-bootstrap:9092] ``` 3.MirrorMaker2 seeks source topic to offset 0. In this case '001_status_info' . ``` 2020-10-16 03:07:21,542 INFO Starting with 4 previously uncommitted partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:07:21,546 INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 0 for partition 003_term_no-0 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 0 for partition 001_real_latest_box_info-0 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 0 for partition 003_term_location_info-0 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null] Seeking to offset 0 for partition 001_status_info-0 (org.apache.kafka.clients.consumer.KafkaConsumer) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] ``` 4.Messages are mirrored to the target cluster. ``` y-watanabe@LAPTOP-IG41EBJ5:~/Development/kafka_2.13-2.4.0$ bin/kafka-console-consumer.sh --bootstrap-server migration-kafka-topics-. servicebus.windows.net:9093 --topic 001_status_info --consumer.config consumer.properties --from-beginning {"schema": ... {"schema": ... {"schema": ... {"schema": ... ^CProcessed a total of 4 messages ``` 5.To get over this , I have disabled manual offset seeking by the consumer. Fixing my code in my local environment as below. Source code. https://archive.apache.org/dist/kafka/2.4.0/kafka-2.4.0-src.tgz ``` connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java // Temporarily disable manual offset seek. // topicPartitionOffsets.forEach(consumer::seek); ``` 6. Log shows that the consumer is seeking from the latest. ``` SourceConnector-0] 2020-10-16 03:39:20,542 INFO [Consumer clientId=consumer-4, groupId=null] Cluster ID: nwLbYZzcSvS2zWEpdib0-g (org.apache.kafka.clients.Metadata) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:39:20,548 INFO [Consumer clientId=consumer-4, groupId=null] Resetting offset for partition 003_term_no-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:39:20,549 INFO [Consumer clientId=consumer-4, groupId=null] Resetting offset for partition 003_term_location_info-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null] Resetting offset for partition 001_real_latest_box_info-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] 2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null] Resetting offset for partition 001_status_info-0 to offset 4. (org.apache.kafka.clients.consumer.internals.SubscriptionState) [task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0] ``` 7. Confirmed that messages are NOT mirrored to the target cluster. ``` y-watanabe@LAPTOP-IG41EBJ5:~/Development/kafka_2.13-2.4.0$ bin/kafka-console-consumer.sh --bootstrap-server migration-kafka-topics-. servicebus.windows.net:9093 --topic 001_status_info --consumer.config consumer.properties --from-beginning ^CProcessed a total of 0 messages ``` Thanks, Yu Watanabe On Tue, Oct 13, 2020 at 9:45 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote: > Hello. > > I use kafka 2.4.0 in strimzi 0.17.0 . > > I have set consumer.overrides.auto.offset.reset in Mirror Maker 2 but > Source Connector still reads from the earliest offset . > > I understand that this seems to be an issue and will be fixed in 2.7.0. > > https://issues.apache.org/jira/browse/KAFKA-10160 > > However, I'd really need to use this option. > Would there be any way to set 'consumer.overrides.auto.offset.reset' to > 'latest ' in kafka 2.4.0 , perhaps by replacing the jar file ? > > Best Regards, > Yu Watanabe > > -- > Yu Watanabe > > linkedin: www.linkedin.com/in/yuwatanabe1/ > twitter: twitter.com/yuwtennis > > -- Yu Watanabe linkedin: www.linkedin.com/in/yuwatanabe1/ twitter: twitter.com/yuwtennis