Hello.

Since my environment uses Kafka 2.4.0, I worked around the problem by
replacing connect-mirror-2.4.0.jar in MM2 with a patched build.
Below are notes on what I did. I am not sure this is the right
approach, but I would like to share it.

1. The source Kafka cluster has the following latest offsets:

```
for t in 001_status_info box_direction_his; do
  kubectl exec kafka-cluster-v2-kafka-0 --context catseye-admin -n dev -- \
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 --topic "$t" --time -1 2>/dev/null
done
001_status_info:0:4
box_direction_his:0:2
```

2. Confirmed that the source connector in MM2 picks up
'auto.offset.reset' as 'latest':

```
2020-10-16 03:07:21,243 INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [kafka-cluster-v2-kafka-bootstrap:9092]
```

3. Nevertheless, MirrorMaker 2 seeks the source topics to offset 0,
including '001_status_info':

```
2020-10-16 03:07:21,542 INFO Starting with 4 previously uncommitted
partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:07:21,546 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 003_term_no-0
(org.apache.kafka.clients.consumer.KafkaConsumer)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 001_real_latest_box_info-0
(org.apache.kafka.clients.consumer.KafkaConsumer)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 003_term_location_info-0
(org.apache.kafka.clients.consumer.KafkaConsumer)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:07:21,547 INFO [Consumer clientId=consumer-4, groupId=null]
Seeking to offset 0 for partition 001_status_info-0
(org.apache.kafka.clients.consumer.KafkaConsumer)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
```

4. As a result, the existing messages are mirrored to the target cluster:

```
y-watanabe@LAPTOP-IG41EBJ5:~/Development/kafka_2.13-2.4.0$
bin/kafka-console-consumer.sh --bootstrap-server migration-kafka-topics-.
servicebus.windows.net:9093 --topic 001_status_info --consumer.config
consumer.properties --from-beginning
{"schema": ...
{"schema": ...
{"schema": ...
{"schema": ...
^CProcessed a total of 4 messages
```

5. To work around this, I disabled the manual offset seek performed on the
consumer by patching the source in my local environment as shown below.

Source code:
https://archive.apache.org/dist/kafka/2.4.0/kafka-2.4.0-src.tgz

```
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
 // Temporarily disable manual offset seek.
 // topicPartitionOffsets.forEach(consumer::seek);
```
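
Commenting out the seek also skips it for partitions that already have a
stored position, so after a restart those would presumably fall back to
'auto.offset.reset' as well. A narrower variation might seek only the
partitions that have a stored offset. The sketch below illustrates that
selection logic in plain Java with no Kafka dependency. The class name,
method name, and sample offsets are hypothetical, and treating a loaded
offset of 0 as "nothing stored" is an assumption based on the "previously
uncommitted partitions ... Seeking to offset 0" log lines above. It is not
the actual upstream fix.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: decide which partitions should be sought explicitly.
public class SelectiveSeekSketch {

    // Assumption: a loaded offset of 0 means no offset has been stored yet
    // for that partition (see the "Seeking to offset 0" log lines).
    static final long NO_STORED_OFFSET = 0L;

    // Returns only the partitions that have a stored offset. Partitions that
    // are left out would not be sought, so the consumer's auto.offset.reset
    // policy would decide where they start.
    public static Map<String, Long> partitionsToSeek(Map<String, Long> loadedOffsets) {
        Map<String, Long> toSeek = new LinkedHashMap<>();
        loadedOffsets.forEach((partition, offset) -> {
            if (offset != NO_STORED_OFFSET) {
                toSeek.put(partition, offset);
            }
        });
        return toSeek;
    }

    public static void main(String[] args) {
        Map<String, Long> loaded = new LinkedHashMap<>();
        loaded.put("001_status_info-0", 0L);    // nothing stored: leave to auto.offset.reset
        loaded.put("box_direction_his-0", 2L);  // hypothetical stored offset: seek to it
        System.out.println(partitionsToSeek(loaded));
    }
}
```

In MirrorSourceTask the resulting map would take the place of
topicPartitionOffsets in the forEach(consumer::seek) call. I have not
tested this variation against 2.4.0.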

6. The log shows that the consumer now resets to the latest offset
(offset 4 for '001_status_info').
```
SourceConnector-0]
2020-10-16 03:39:20,542 INFO [Consumer clientId=consumer-4, groupId=null]
Cluster ID: nwLbYZzcSvS2zWEpdib0-g (org.apache.kafka.clients.Metadata)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:39:20,548 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 003_term_no-0 to offset 0.
(org.apache.kafka.clients.consumer.internals.SubscriptionState)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:39:20,549 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 003_term_location_info-0 to offset 0.
(org.apache.kafka.clients.consumer.internals.SubscriptionState)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 001_real_latest_box_info-0 to offset 0.
(org.apache.kafka.clients.consumer.internals.SubscriptionState)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
2020-10-16 03:39:20,550 INFO [Consumer clientId=consumer-4, groupId=null]
Resetting offset for partition 001_status_info-0 to offset 4.
(org.apache.kafka.clients.consumer.internals.SubscriptionState)
[task-thread-csp-kafka->eventhub-.MirrorSourceConnector-0]
```

7. Confirmed that messages are NOT mirrored to the target cluster.
```
y-watanabe@LAPTOP-IG41EBJ5:~/Development/kafka_2.13-2.4.0$
bin/kafka-console-consumer.sh --bootstrap-server migration-kafka-topics-.
servicebus.windows.net:9093 --topic 001_status_info --consumer.config
consumer.properties --from-beginning
^CProcessed a total of 0 messages
```

Thanks,
Yu Watanabe

On Tue, Oct 13, 2020 at 9:45 PM Yu Watanabe <yu.w.ten...@gmail.com> wrote:

> Hello.
>
> I use Kafka 2.4.0 in Strimzi 0.17.0.
>
> I have set consumer.overrides.auto.offset.reset in MirrorMaker 2, but
> the source connector still reads from the earliest offset.
>
> I understand that this is a known issue and will be fixed in 2.7.0.
>
> https://issues.apache.org/jira/browse/KAFKA-10160
>
> However, I really need to use this option.
> Would there be any way to set 'consumer.overrides.auto.offset.reset' to
> 'latest' in Kafka 2.4.0, perhaps by replacing the jar file?
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis
