Hey Alan,

I'm running into the same issue as you and I believe I've figured it out.

I noticed that consumer partitions on the destination cluster that have
a LOG-END-OFFSET of 0 all exhibit this issue. It looks like
MirrorMaker's offset sync does not work correctly if the partition is
empty. Instead of setting the destination's offset to 0 for these empty
partitions, it sets the destination's offset to the same numerical value
as the source's offset.
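
In case it's useful, here is roughly how I spotted the affected partitions.
This is only a sketch: the broker address and group name are taken from your
output below, and the awk fields assume the usual column order of the
kafka-consumer-groups.sh describe output.

```
# Rows where LOG-END-OFFSET ($5) is 0 but CURRENT-OFFSET ($4) is positive
# are the partitions with a mistranslated offset on the destination.
kafka-consumer-groups.sh --bootstrap-server 10.0.0.2:9092 \
  --describe --group kafka-group-Compute-Requests \
  | awk '$5 == 0 && $4 > 0'
```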

If you now send a message to these partitions, making them non-empty, the
synced offset still won't update until the correctly translated offset
exceeds the bogus offset already stored on the destination. The only
workaround I've found is to make these partitions non-empty, then delete
the consumer group on the destination cluster and let MirrorMaker resync
it, as sketched below.
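
A sketch of that workaround, again reusing the destination address and group
name from your mail; adjust both for your environment. "Make the partition
non-empty" just means getting at least one record into each affected
partition, e.g. through your normal producers on the source side. After that:

```
# Drop the destination copy of the group so MirrorMaker 2 recreates it with
# freshly translated offsets on the next checkpoint. Note the delete is
# rejected if the group still has active consumers on the destination.
kafka-consumer-groups.sh --bootstrap-server 10.0.0.2:9092 \
  --delete --group kafka-group-Compute-Requests
```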

Hope this helps,
Frank

On 2021/03/15 21:59:03, Alan Ning <a...@gmail.com> wrote:
> I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics from
> one cluster to another while preserving consumer group offsets through
> `sync.group.offsets.enabled=true`. My source cluster is running Kafka 0.10,
> while the target cluster is running 2.6.1.
>
> While I can see data being replicated, the data on the replicated Consumer
> Group in the target cluster looks wrong. The lag values of the replicated
> Consumer Group are large negative values, and the LOG-END-OFFSET values are
> mostly 0. I determined this information from kafka-consumer-groups.sh.
>
> I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX
> metrics in MM2 and the reported lag is zero for all partitions.
>
> By using `sync.group.offsets.enabled=true`, I envisioned that MM2 would
> automatically replicate and sync all Consumer Groups with a meaningful
> offset in the target cluster. Am I misunderstanding how MM2 is supposed to
> work?
>
> Here are my mm2.properties and the CG details.
>
> # mm2.properties
> ```
> clusters = src, dst
> src.bootstrap.servers = 10.0.0.1:9092
> dst.bootstrap.servers = 10.0.0.2:9092
> src->dst.enabled = true
> src->dst.topics = compute.*
> src->dst.offset.flush.timeout.ms = 60000
> src->dst.buffer.memory = 10000
> dst->src.enabled = true
> dst->src.topics = .*
> replication.factor = 3
> src->dst.sync.group.offsets.enabled = true
> src->dst.emit.checkpoints.enabled = true
> src->dst.consumer.auto.offset.reset = latest
> consumer.auto.offset.reset = latest
> auto.offset.reset = latest
> replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> checkpoints.topic.replication.factor = 3
> heartbeats.topic.replication.factor = 3
> offset-syncs.topic.replication.factor = 3
> offset.storage.replication.factor = 3
> status.storage.replication.factor = 3
> config.storage.replication.factor = 3
> sync.topic.acls.enabled = false
> sync.group.offsets.enabled = true
> emit.checkpoints.enabled = true
> tasks.max = 8
> dst.producer.offset.flush.timeout.ms = 60000
> dst.offset.flush.timeout.ms = 60000
> ```
>
> Consumer Group details
> ```
> GROUP                        TOPIC            PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG      CONSUMER-ID HOST CLIENT-ID
> kafka-group-Compute-Requests Compute-Requests 57        5305947        0              -5305947 -           -    -
> kafka-group-Compute-Requests Compute-Requests 20        5164205        0              -5164205 -           -    -
> kafka-group-Compute-Requests Compute-Requests 53        4208527        0              -4208527 -           -    -
> kafka-group-Compute-Requests Compute-Requests 82        5247928        0              -5247928 -           -    -
> kafka-group-Compute-Requests Compute-Requests 65        5574520        0              -5574520 -           -    -
> kafka-group-Compute-Requests Compute-Requests 11        5190708        209            -5190499 -           -    -
> ```
>
> Thanks
>
> ... Alan
>
