
Since my environment uses Kafka 2.4.0, I worked around the problem by
replacing connect-mirror-2.4.0.jar in MM2 with a locally rebuilt file.
Below are notes on what I did. I am not sure if this is the right
approach, but I would like to share it.
1. The source Kafka cluster already has offsets, checked as below.

for t in 001_status_info box_direction_his; do \
        kubectl exec kafka-cluster-v2-kafka-0 --context catseye-admin -n dev -- \
          bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 \
          --topic "$t" --time -1 2>/dev/null ; \
done

2. Confirmed that the source connector in MM2 picks up
'auto.offset.reset' as 'latest'.

2020-10-16 03:07:21,243 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [kafka-cluster-v2-kafka-bootstrap:9092]

3. Despite that, MirrorMaker 2 seeks the source topic partitions to
offset 0, including '001_status_info' in this case.

2020-10-16 03:07:21,542 INFO Starting with 4 previously uncommitted
partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask)
2020-10-16 03:07:21,546 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 003_term_no-0
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 001_real_latest_box_info-0
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 003_term_location_info-0
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 001_status_info-0

4. Messages are mirrored to the target cluster.

bin/kafka-console-consumer.sh \
  --bootstrap-server migration-kafka-topics-.servicebus.windows.net:9093 \
  --topic 001_status_info --consumer.config consumer.properties --from-beginning
{"schema": ...
{"schema": ...
{"schema": ...
{"schema": ...
^CProcessed a total of 4 messages

5. To get around this, I disabled the manual offset seek done by the
consumer, changing the code in my local environment as below.

Source code.

 // Temporarily disable manual offset seek.
 // topicPartitionOffsets.forEach(consumer::seek);
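
One caveat with commenting the line out completely: partitions that do have a stored offset are no longer sought either, so after a restart they would presumably follow 'auto.offset.reset' as well instead of resuming. A narrower variant might seek only when a stored offset exists. The following is an untested sketch of that idea, not the change described above. It uses plain strings in place of TopicPartition, the helper name offsetsToSeek is made up, and it assumes (from the "previously uncommitted partitions" log line and the seeks to offset 0) that a partition without a stored offset is loaded as offset 0.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SeekSketch {
    // Hypothetical helper: keep only the partitions that have a stored offset.
    // Assumption: a loaded offset of 0 means "nothing stored yet".
    static Map<String, Long> offsetsToSeek(Map<String, Long> loadedOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        loadedOffsets.forEach((partition, offset) -> {
            if (offset > 0) {
                result.put(partition, offset);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> loaded = new LinkedHashMap<>();
        loaded.put("001_status_info-0", 0L); // no stored offset: left to auto.offset.reset
        loaded.put("003_term_no-0", 7L);     // made-up stored offset: would still be sought
        // In the real task, each remaining entry would be passed to consumer.seek(...).
        System.out.println(offsetsToSeek(loaded));
    }
}
```

With this filter, a fresh partition is left to the reset policy while a partition with a stored offset still resumes where it stopped.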

6. The log shows that the consumer now resets to the latest offset instead of seeking to 0.
2020-10-16 03:39:20,542 INFO [Consumer clientId=consumer-4, groupId=null]
Cluster ID: nwLbYZzcSvS2zWEpdib0-g (org.apache.kafka.clients.Metadata)
2020-10-16 03:39:20,548 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 003_term_no-0 to offset 0.
2020-10-16 03:39:20,549 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 003_term_location_info-0 to offset 0.
2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 001_real_latest_box_info-0 to offset 0.
2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 001_status_info-0 to offset 4.

7. Confirmed that messages are NOT mirrored to the target cluster.
bin/kafka-console-consumer.sh \
  --bootstrap-server migration-kafka-topics-.servicebus.windows.net:9093 \
  --topic 001_status_info --consumer.config consumer.properties --from-beginning
^CProcessed a total of 0 messages
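
The difference between steps 3 and 6 comes down to how the consumer picks its starting position: 'auto.offset.reset' is only consulted when a partition has no position yet, and an explicit seek() gives it one. Below is a toy model of that rule in plain Java, not Kafka code. The class and method names are made up for illustration, only 'latest' and 'earliest' are modeled, the end offset of 4 for 001_status_info-0 is taken from the log in step 6, and the beginning offset of 0 is an assumption.

```java
import java.util.OptionalLong;

class StartPositionSketch {
    // Toy model (not Kafka code): where a consumer without a committed offset starts reading.
    static long startPosition(OptionalLong seekedOffset, String autoOffsetReset,
                              long beginningOffset, long endOffset) {
        if (seekedOffset.isPresent()) {
            // An explicit seek fixes the position, so the reset policy is never consulted.
            return seekedOffset.getAsLong();
        }
        // No position yet: fall back to the reset policy.
        return "latest".equals(autoOffsetReset) ? endOffset : beginningOffset;
    }

    public static void main(String[] args) {
        // 001_status_info-0: beginning offset assumed 0, end offset 4 (step 6 log).
        System.out.println(startPosition(OptionalLong.of(0L), "latest", 0L, 4L));  // step 3: seek to 0
        System.out.println(startPosition(OptionalLong.empty(), "latest", 0L, 4L)); // step 6: no seek
    }
}
```

The first call models step 3 and yields 0, so all 4 existing messages are mirrored. The second models step 6 and yields 4, so only new messages would be mirrored.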

Yu Watanabe

On Tue, Oct 13, 2020 at 9:45 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Hello.
> I use Kafka 2.4.0 in Strimzi 0.17.0.
> I have set consumer.overrides.auto.offset.reset in MirrorMaker 2, but
> the source connector still reads from the earliest offset.
> I understand that this is a known issue that will be fixed in 2.7.0.
> https://issues.apache.org/jira/browse/KAFKA-10160
> However, I really need to use this option.
> Would there be any way to set 'consumer.overrides.auto.offset.reset' to
> 'latest' in Kafka 2.4.0, perhaps by replacing the jar file?
> Best Regards,
> Yu Watanabe
> --
> Yu Watanabe
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis

