Hey Ning,

I believe "if the CG offsets do not contain a pair of <topic, partition>,
simply sync the offsets from source" could be the problematic behavior
here? I'm not very familiar with Mirrormaker's internals, so can't say for
sure.

As I described previously, this "negative lag" problem happens when a
target partition is empty (log end offset = 0) while the source CG offset
is greater than 0. This scenario can be reached if a consumer processed
some records which were then deleted (e.g. by the retention policy), or,
as Alan encountered, when MM2 replication is configured to start at
"latest". In that case, MirrorMaker sets the target CG offset to the
literal, unconverted source CG offset. The correct behavior would be to
set the target CG offset to 0 when the target partition is empty, not to
the source CG offset.
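
Concretely, what I'd expect the sync step to do is clamp the offset it
commits to the target partition's log end offset. The following is just a
sketch of that idea, not the actual MirrorCheckpointTask code, and the
class and method names are made up:

```java
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch only: clamp each translated source offset to the target
// partition's log end offset, so an empty target partition (log end
// offset = 0) gets committed at 0 instead of at the raw source offset.
public class ClampedOffsetSyncSketch {

    static Map<TopicPartition, OffsetAndMetadata> clamp(
            Map<TopicPartition, OffsetAndMetadata> translatedOffsets,
            Map<TopicPartition, Long> targetLogEndOffsets) {

        return translatedOffsets.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> {
                    long logEnd = targetLogEndOffsets.getOrDefault(e.getKey(), 0L);
                    // Never commit past what actually exists on the target.
                    return new OffsetAndMetadata(Math.min(e.getValue().offset(), logEnd));
                }));
    }
}
```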

The negative lag can cause CGs to miss messages during a migration if new
messages are produced between stopping a CG on the source cluster and
starting it on the target cluster: the consumer resumes from the inflated
committed offset and skips everything below it.
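
If you want to check whether a replicated group on the target cluster is
affected before cutting consumers over, something like the snippet below
(using the Java Admin API) should list the bad partitions. Again, only a
sketch; the bootstrap address and group name are just the ones from Alan's
setup:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class NegativeLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.2:9092");
        String group = "kafka-group-Compute-Requests";

        try (Admin admin = Admin.create(props)) {
            // Offsets committed for the replicated consumer group.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(group)
                         .partitionsToOffsetAndMetadata().get();

            // Log end offsets of the same partitions on the target cluster.
            Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
            committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(latest).all().get();

            // Any committed offset beyond the log end offset is "negative lag".
            committed.forEach((tp, om) -> {
                long end = ends.get(tp).offset();
                if (om.offset() > end) {
                    System.out.printf("negative lag on %s: committed=%d, log-end=%d%n",
                            tp, om.offset(), end);
                }
            });
        }
    }
}
```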


On Fri, Mar 26, 2021 at 1:52 AM Ning Zhang <ning2008w...@gmail.com> wrote:

> Hello Frank,
>
> Thanks for helping on analyzing the issue.
>
> Regarding when the CG offsets at the destination cluster are updated:
> from the current codebase, there seem to be 2 criteria:
>
> (1) if the CG offsets do not contain a pair of <topic, partition>, simply
> sync the offsets from source
> (2) for a pair of <topic, partition>, if the converted CG offset is
> greater than the real CG offset at destination, sync the offset from source
>
>
> https://github.com/apache/kafka/blob/c19a35d1b740c85559a7ff4e882fc95b4737808d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L270-L279
>
> If you have any suggested change, please share with me. Thanks
>
> On 2021/03/23 18:42:19, Frank Yi <f...@coursera.org> wrote:
> > Hey Alan,
> >
> > I'm running into the same issue as you and I believe I've figured it out.
> >
> > I noticed that consumer partitions on the destination cluster that have
> > a LOG-END-OFFSET=0 all exhibit this issue. It looks like MirrorMaker's
> > offset sync does not work correctly if the partition is empty. Instead
> > of setting the destination's offset to 0 for these empty partitions, it
> > sets the destination's offset to the same numerical value as the
> > source's offset.
> >
> > If you now send a message to these partitions, making them non-empty,
> > the offset will still not update until the "correct" (converted) offset
> > is greater than the offset already committed on the destination. The
> > only workaround I've found is to make these partitions non-empty, then
> > delete the consumer group on the destination cluster and let
> > MirrorMaker resync it.
> >
> > Hope this helps,
> > Frank
> >
> > On 2021/03/15 21:59:03, Alan Ning <a...@gmail.com> wrote:
> > > I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics
> > > from one cluster to another while preserving consumer group offsets
> > > through `sync.group.offsets.enabled=true`. My source cluster is
> > > running Kafka 0.10, while the target cluster is running 2.6.1.
> > >
> > > While I can see data being replicated, the data on the replicated
> > > Consumer Group in the target cluster looks wrong. The lag values of
> > > the replicated Consumer Group are large negative values, and the
> > > LOG-END-OFFSET values are mostly 0. I determined this information from
> > > kafka-consumer-groups.sh.
> > >
> > > I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag
> > > JMX metrics in MM2 and the reported lag is zero for all partitions.
> > >
> > > By using `sync.group.offsets.enabled=true`, I envisioned that MM2 would
> > > automatically replicate and sync all Consumer Groups with a meaningful
> > > offset in the target cluster. Am I misunderstanding how MM2 is supposed
> > > to work?
> > >
> > > Here is my mm2.properties and the CG details.
> > >
> > > # mm2.properties
> > > ```
> > > clusters = src, dst
> > > src.bootstrap.servers = 10.0.0.1:9092
> > > dst.bootstrap.servers = 10.0.0.2:9092
> > > src->dst.enabled = true
> > > src->dst.topics = compute.*
> > > src->dst.offset.flush.timeout.ms=60000
> > > src->dst.buffer.memory=10000
> > > dst->src.enabled = true
> > > dst->src.topics = .*
> > > replication.factor=3
> > > src->dst.sync.group.offsets.enabled = true
> > > src->dst.emit.checkpoints.enabled = true
> > > src->dst.consumer.auto.offset.reset=latest
> > > consumer.auto.offset.reset = latest
> > > auto.offset.reset = latest
> > > replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> > > checkpoints.topic.replication.factor=3
> > > heartbeats.topic.replication.factor=3
> > > offset-syncs.topic.replication.factor=3
> > > offset.storage.replication.factor=3
> > > status.storage.replication.factor=3
> > > config.storage.replication.factor=3
> > > sync.topic.acls.enabled = false
> > > sync.group.offsets.enabled = true
> > > emit.checkpoints.enabled = true
> > > tasks.max = 8
> > > dst.producer.offset.flush.timeout.ms = 60000
> > > dst.offset.flush.timeout.ms = 60000
> > > ```
> > >
> > > Consumer Group details
> > > ```
> > > GROUP                         TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
> > > kafka-group-Compute-Requests  Compute-Requests  57         5305947         0               -5305947  -            -     -
> > > kafka-group-Compute-Requests  Compute-Requests  20         5164205         0               -5164205  -            -     -
> > > kafka-group-Compute-Requests  Compute-Requests  53         4208527         0               -4208527  -            -     -
> > > kafka-group-Compute-Requests  Compute-Requests  82         5247928         0               -5247928  -            -     -
> > > kafka-group-Compute-Requests  Compute-Requests  65         5574520         0               -5574520  -            -     -
> > > kafka-group-Compute-Requests  Compute-Requests  11         5190708         209             -5190499  -            -     -
> > > ```
> > >
> > > Thanks
> > >
> > > ... Alan
> > >
> >
>
