Hello Frank,

Happy to look into it. Do you mind opening a Jira ticket, putting your and Alan's observations into it, and assigning it to me (it would also be great to reference this link)? I will take some time to reproduce it when I'm available.
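
To make sure I follow the change you are suggesting, would it be something like the sketch below? This is only an illustration with made-up names, not the actual MirrorCheckpointTask code: the offset MM2 is about to sync would be bounded by the target partition's log end offset, so an empty target partition ends up at offset 0 rather than at the source's raw numeric offset.

```
// Illustrative sketch only (hypothetical names) -- not the actual
// MirrorCheckpointTask logic. It shows the guard being discussed:
// an empty target partition should get offset 0, and more generally the
// synced offset should never exceed the target partition's log end offset.
public class OffsetSyncSketch {

    static long safeTargetOffset(long proposedOffset, long targetLogEndOffset) {
        if (targetLogEndOffset == 0) {
            return 0L; // empty target partition: never copy the source's raw offset
        }
        return Math.min(proposedOffset, targetLogEndOffset);
    }

    public static void main(String[] args) {
        // Partition 57 from Alan's output: source offset 5305947, empty target partition.
        System.out.println(safeTargetOffset(5_305_947L, 0L));   // prints 0
        // Partition 11: source offset 5190708, target log end offset 209.
        System.out.println(safeTargetOffset(5_190_708L, 209L)); // prints 209
    }
}
```

Thanks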

On 2021/03/30 19:08:56, Frank Yi <f...@coursera.org> wrote:
> Hey Ning,
>
> I believe "if the CG offsets do not contain a pair of <topic, partition>,
> simply sync the offsets from source" could be the problematic behavior
> here? I'm not very familiar with MirrorMaker's internals, so I can't say
> for sure.
>
> As I described previously, this "negative lag" problem happens when a
> target partition is empty (log end offset = 0) and the source CG offset
> is > 0. This scenario can be reached if a consumer processed some records
> which were then deleted (e.g. by a retention policy), or, as Alan
> encountered, when MM replication is set to start at "latest". When this is
> the case, MirrorMaker sets the target CG offset equal to the literal, not
> converted, source CG offset. The correct behavior would be to set the
> target CG offset to 0 when the target partition is empty, not to the
> source CG offset.
>
> The negative lag can cause CGs to miss messages during a migration if new
> messages are sent between stopping a CG on the source cluster and starting
> the CG on the target cluster.
>
> On Fri, Mar 26, 2021 at 1:52 AM Ning Zhang <ning2008w...@gmail.com> wrote:
> > Hello Frank,
> >
> > Thanks for helping analyze the issue.
> >
> > Regarding when the CG offsets at the destination cluster are updated:
> > from the current codebase, there seem to be two criteria:
> >
> > (1) if the CG offsets do not contain a pair of <topic, partition>,
> > simply sync the offsets from the source
> > (2) for a pair of <topic, partition>, if the converted CG offset is
> > greater than the real CG offset at the destination, sync the offset
> > from the source
> >
> > https://github.com/apache/kafka/blob/c19a35d1b740c85559a7ff4e882fc95b4737808d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L270-L279
> >
> > If you have any suggested change, please share it with me. Thanks
> >
> > On 2021/03/23 18:42:19, Frank Yi <f...@coursera.org> wrote:
> > > Hey Alan,
> > >
> > > I'm running into the same issue as you, and I believe I've figured
> > > it out.
> > >
> > > I noticed that consumer partitions on the destination cluster that
> > > have LOG-END-OFFSET=0 all exhibit this issue. It looks like
> > > MirrorMaker's offset sync does not work correctly if the partition is
> > > empty. Instead of setting the destination's offset to 0 for these
> > > empty partitions, it sets the destination's offset to the same
> > > numerical value as the source's offset.
> > >
> > > If you now send a message to these partitions, making them non-empty,
> > > the offset will still not update until the "correct" offset is
> > > greater than the offset on the partition. The only workaround I've
> > > found is to make these partitions non-empty, then delete the consumer
> > > group on the destination cluster and let MirrorMaker resync it.
> > >
> > > Hope this helps,
> > > Frank
> > >
> > > On 2021/03/15 21:59:03, Alan Ning <a...@gmail.com> wrote:
> > > > I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all
> > > > topics from one cluster to another while preserving consumer group
> > > > offsets through `sync.group.offsets.enabled=true`. My source
> > > > cluster is running Kafka 0.10, while the target cluster is running
> > > > 2.6.1.
> > > >
> > > > While I can see data being replicated, the data on the replicated
> > > > Consumer Group in the target cluster looks wrong.
> > > > The lag values of the replicated Consumer Group are large negative
> > > > values, and the LOG-END-OFFSET values are mostly 0. I determined
> > > > this information from kafka-consumer-groups.sh.
> > > >
> > > > I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag
> > > > JMX metric in MM2, and the reported lag is zero for all partitions.
> > > >
> > > > By using `sync.group.offsets.enabled=true`, I envisioned that MM2
> > > > would automatically replicate and sync all Consumer Groups with a
> > > > meaningful offset in the target cluster. Am I misunderstanding how
> > > > MM2 is supposed to work?
> > > >
> > > > Here is my mm2.properties and the CG details.
> > > >
> > > > # mm2.properties
> > > > ```
> > > > clusters = src, dst
> > > > src.bootstrap.servers = 10.0.0.1:9092
> > > > dst.bootstrap.servers = 10.0.0.2:9092
> > > > src->dst.enabled = true
> > > > src->dst.topics = compute.*
> > > > src->dst.offset.flush.timeout.ms=60000
> > > > src->dst.buffer.memory=10000
> > > > dst->src.enabled = true
> > > > dst->src.topics = .*
> > > > replication.factor=3
> > > > src->dst.sync.group.offsets.enabled = true
> > > > src->dst.emit.checkpoints.enabled = true
> > > > src->dst.consumer.auto.offset.reset=latest
> > > > consumer.auto.offset.reset = latest
> > > > auto.offset.reset = latest
> > > > replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> > > > checkpoints.topic.replication.factor=3
> > > > heartbeats.topic.replication.factor=3
> > > > offset-syncs.topic.replication.factor=3
> > > > offset.storage.replication.factor=3
> > > > status.storage.replication.factor=3
> > > > config.storage.replication.factor=3
> > > > sync.topic.acls.enabled = false
> > > > sync.group.offsets.enabled = true
> > > > emit.checkpoints.enabled = true
> > > > tasks.max = 8
> > > > dst.producer.offset.flush.timeout.ms = 60000
> > > > dst.offset.flush.timeout.ms = 60000
> > > > ```
> > > >
> > > > Consumer Group details
> > > > ```
> > > > GROUP                         TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> > > > kafka-group-Compute-Requests  Compute-Requests  57         5305947         0               -5305947  -            -     -
> > > > kafka-group-Compute-Requests  Compute-Requests  20         5164205         0               -5164205  -            -     -
> > > > kafka-group-Compute-Requests  Compute-Requests  53         4208527         0               -4208527  -            -     -
> > > > kafka-group-Compute-Requests  Compute-Requests  82         5247928         0               -5247928  -            -     -
> > > > kafka-group-Compute-Requests  Compute-Requests  65         5574520         0               -5574520  -            -     -
> > > > kafka-group-Compute-Requests  Compute-Requests  11         5190708         209             -5190499  -            -     -
> > > > ```
> > > >
> > > > Thanks
> > > >
> > > > ... Alan
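
P.S. For what it's worth, the workaround Frank describes above (produce at least one record to the previously empty partitions, then delete the consumer group on the destination cluster so MM2 re-syncs it) can also be scripted with the Kafka AdminClient. This is only a rough sketch; the broker address and group name are taken from Alan's example:

```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Rough sketch of the workaround: once the previously empty partitions hold
// at least one record, delete the consumer group on the destination cluster
// so MirrorMaker 2 syncs it again from scratch. Broker address and group name
// are from Alan's example and should be adjusted for your environment.
public class DeleteDestinationGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.2:9092");
        try (Admin admin = Admin.create(props)) {
            admin.deleteConsumerGroups(Collections.singleton("kafka-group-Compute-Requests"))
                 .all()
                 .get(); // block until the deletion completes
        }
    }
}
```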