Thanks Alan for the investigation and bug report!

On Thu, Mar 25, 2021, 3:25 PM Alan Ning <askl...@gmail.com> wrote:
> Another update on this. I am pretty sure I have found a bug in
> MirrorSourceTask. The details are written up in
> https://issues.apache.org/jira/browse/KAFKA-12558. I hope this helps
> others who have encountered this issue.
>
> ... Alan
>
> On Thu, Mar 25, 2021 at 9:40 AM Alan Ning <askl...@gmail.com> wrote:
>
>> Just an update on this. It has been an adventure.
>>
>> Sam, I think you are right that `consumer.auto.offset.reset:latest`
>> does not work. However, I was also seeing issues with the default
>> behavior (which is `consumer.auto.offset.reset:earliest`). After a lot
>> of trial and error, I have found that my MM2 setup works a lot better
>> when I give it more resources and tasks.
>>
>> I am mirroring about 1600 partitions, running on an AWS m5.4xlarge
>> with tasks.max=8. With this setup, I always get a lot of negative
>> offsets. In effect, it just means the mirroring tasks have stalled
>> silently and can't make progress. There were no warnings or errors in
>> the logs.
>>
>> I bumped the system up to a c5.16xlarge with tasks.max=180 and
>> suddenly everything runs a lot smoother. The CG offsets start out
>> negative and eventually catch up to 0 (or go positive).
>>
>> It seems like there is a sweet spot for partitions per task [with 1600
>> partitions, tasks.max=8 works out to roughly 200 partitions per task,
>> which stalled, while tasks.max=180 is roughly 9, which ran smoothly],
>> but I have yet to find any documentation for it.
>>
>> ... Alan
>>
>> On Wed, Mar 17, 2021 at 5:49 PM Alan Ning <askl...@gmail.com> wrote:
>>
>>> OK. I follow now. Let me try to re-test to see if it makes a
>>> difference.
>>>
>>> Thanks.
>>>
>>> ... Alan
>>>
>>> On Wed, Mar 17, 2021 at 5:46 PM Samuel Cantero <scante...@gmail.com>
>>> wrote:
>>>
>>>> I've found that bug the hard way. FWIW, I've migrated several
>>>> clusters from Kafka 0.10 to Kafka 2.x using MM2, so offset sync does
>>>> work fine for Kafka 0.10.
>>>>
>>>> Best,
>>>>
>>>> On Wed, Mar 17, 2021 at 6:43 PM Samuel Cantero <scante...@gmail.com>
>>>> wrote:
>>>>
>>>>> No, what I meant is that offset sync won't work with
>>>>> `consumer.auto.offset.reset:latest` (I was not talking about that
>>>>> particular bug). Try setting `consumer.auto.offset.reset:earliest`
>>>>> and verify that offsets are synced correctly.
>>>>>
>>>>> Best,
>>>>>
>>>>> On Wed, Mar 17, 2021 at 6:42 PM Alan Ning <askl...@gmail.com> wrote:
>>>>>
>>>>>> Hey Samuel,
>>>>>>
>>>>>> I am aware of that `consumer.auto.offset.reset:latest` problem. It
>>>>>> was because this PR
>>>>>> <https://github.com/apache/kafka/pull/8921#issuecomment-797598156>
>>>>>> never made it to trunk. I patched MM2 locally for 2.7 so that
>>>>>> `latest` offsets will work.
>>>>>>
>>>>>> ... Alan
>>>>>>
>>>>>> On Wed, Mar 17, 2021 at 4:50 PM Samuel Cantero
>>>>>> <scante...@gmail.com> wrote:
>>>>>>
>>>>>>> I've seen this before. I've found that consumer offset sync does
>>>>>>> not work with `consumer.auto.offset.reset:latest`. If you set
>>>>>>> this to earliest, then it should work. One way to work around the
>>>>>>> need to start from earliest is to start with latest and, once
>>>>>>> mirroring is ongoing, swap to earliest. This won't affect
>>>>>>> mirroring, as the MM2 consumers will resume from the last
>>>>>>> committed offsets.
>>>>>>>
>>>>>>> Best,
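A minimal sketch of the two-phase switch Samuel describes above, assuming
the per-flow `src->dst.consumer.auto.offset.reset` key that appears in the
mm2.properties quoted later in this thread (phase 2 requires restarting
the MM2 workers so the changed property is picked up):

```
# mm2.properties -- phase 1: bootstrap the mirror from the tail of the
# source topics, so the initial copy starts at the live end.
# src->dst.consumer.auto.offset.reset = latest

# mm2.properties -- phase 2: once mirroring is ongoing, swap to
# earliest and restart. Per Samuel's note, the MM2 consumers resume
# from their last committed offsets, so nothing is re-read from the
# beginning of the source topics.
src->dst.consumer.auto.offset.reset = earliest
```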
>>>>>>>
>>>>>>> On Wed, Mar 17, 2021 at 5:27 PM Ning Zhang <ning2008w...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Alan,
>>>>>>>>
>>>>>>>> I have probably seen a similar case. One quick validation you
>>>>>>>> could run is to test against a source cluster with a higher
>>>>>>>> Kafka version. If it still does not work, please email me and I
>>>>>>>> can introduce you to someone who may have hit a similar case
>>>>>>>> before.
>>>>>>>>
>>>>>>>> On 2021/03/15 21:59:03, Alan Ning <askl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all
>>>>>>>>> topics from one cluster to another while preserving consumer
>>>>>>>>> group offsets via `sync.group.offsets.enabled=true`. My source
>>>>>>>>> cluster is running Kafka 0.10, while the target cluster is
>>>>>>>>> running 2.6.1.
>>>>>>>>>
>>>>>>>>> While I can see data being replicated, the replicated Consumer
>>>>>>>>> Group in the target cluster looks wrong. The lag values of the
>>>>>>>>> replicated Consumer Group are large negative numbers, and the
>>>>>>>>> LOG-END-OFFSET values are mostly 0. I determined this from
>>>>>>>>> kafka-consumer-groups.sh.
>>>>>>>>>
>>>>>>>>> I checked the
>>>>>>>>> kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX
>>>>>>>>> metric in MM2, and the reported lag is zero for all partitions.
>>>>>>>>>
>>>>>>>>> By using `sync.group.offsets.enabled=true`, I envisioned that
>>>>>>>>> MM2 would automatically replicate and sync all Consumer Groups
>>>>>>>>> with meaningful offsets in the target cluster. Am I
>>>>>>>>> misunderstanding how MM2 is supposed to work?
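For context on those negative lag values: kafka-consumer-groups.sh
computes LAG as LOG-END-OFFSET minus CURRENT-OFFSET, so a group offset
synced from the source that lands on a near-empty target partition shows
up as a large negative number. Worked through for partition 57 in the
listing below:

```
LAG = LOG-END-OFFSET - CURRENT-OFFSET
    = 0 - 5305947
    = -5305947
```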
>>>>>>>>>
>>>>>>>>> Here is my mm2.properties and the CG details.
>>>>>>>>>
>>>>>>>>> # mm2.properties
>>>>>>>>> ```
>>>>>>>>> clusters = src, dst
>>>>>>>>> src.bootstrap.servers = 10.0.0.1:9092
>>>>>>>>> dst.bootstrap.servers = 10.0.0.2:9092
>>>>>>>>> src->dst.enabled = true
>>>>>>>>> src->dst.topics = compute.*
>>>>>>>>> src->dst.offset.flush.timeout.ms = 60000
>>>>>>>>> src->dst.buffer.memory = 10000
>>>>>>>>> dst->src.enabled = true
>>>>>>>>> dst->src.topics = .*
>>>>>>>>> replication.factor = 3
>>>>>>>>> src->dst.sync.group.offsets.enabled = true
>>>>>>>>> src->dst.emit.checkpoints.enabled = true
>>>>>>>>> src->dst.consumer.auto.offset.reset = latest
>>>>>>>>> consumer.auto.offset.reset = latest
>>>>>>>>> auto.offset.reset = latest
>>>>>>>>> replication.policy.class = com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
>>>>>>>>> checkpoints.topic.replication.factor = 3
>>>>>>>>> heartbeats.topic.replication.factor = 3
>>>>>>>>> offset-syncs.topic.replication.factor = 3
>>>>>>>>> offset.storage.replication.factor = 3
>>>>>>>>> status.storage.replication.factor = 3
>>>>>>>>> config.storage.replication.factor = 3
>>>>>>>>> sync.topic.acls.enabled = false
>>>>>>>>> sync.group.offsets.enabled = true
>>>>>>>>> emit.checkpoints.enabled = true
>>>>>>>>> tasks.max = 8
>>>>>>>>> dst.producer.offset.flush.timeout.ms = 60000
>>>>>>>>> dst.offset.flush.timeout.ms = 60000
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Consumer Group details:
>>>>>>>>> ```
>>>>>>>>> GROUP                         TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  57         5305947         0               -5305947  -            -     -
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  20         5164205         0               -5164205  -            -     -
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  53         4208527         0               -4208527  -            -     -
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  82         5247928         0               -5247928  -            -     -
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  65         5574520         0               -5574520  -            -     -
>>>>>>>>> kafka-group-Compute-Requests  Compute-Requests  11         5190708         209             -5190499  -            -     -
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> ... Alan
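The listing above matches the output of the stock consumer-groups tool
run against the target cluster. A sketch of the invocation, assuming the
dst bootstrap address from the mm2.properties quoted above:

```
# Describe the mirrored consumer group on the target (dst) cluster.
# LAG is reported as LOG-END-OFFSET minus CURRENT-OFFSET, which is why
# a synced offset ahead of the target log end shows up as negative.
bin/kafka-consumer-groups.sh \
  --bootstrap-server 10.0.0.2:9092 \
  --describe \
  --group kafka-group-Compute-Requests
```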