Vishal, you should be able to override the properties passed to the internal workers using properties like A->B.consumer.auto.offset.reset or A.consumer.auto.offset.reset in the mm2.properties file. Certain top-level properties like tasks.max are honored without the A->B or A prefix, but auto.offset.reset is not one of them.
Ryanne On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote: > Hey Ryanne, > > > How do I override auto.offset.reset = latest for consumers through > mm2.properties. I have tried straight up . auto.offset.reset and consumer. > auto.offset.reset but it defaults to earliest.. I do have a query in > another thread but though you might know off hand.. > > I would imagine there is some way in general of overriding consumer and > producer configs through mm2.properties in MM2 ? > > Regards. > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <vishal.santo...@gmail.com > > > wrote: > > > Thank you so much for all your help. Will keep you posted on tests I > do.. > > I hope this is helpful to other folks too.. > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <ryannedo...@gmail.com> > > wrote: > > > >> That's right. MM2 is at-least-once for now, same as legacy MirrorMaker. > >> You > >> can follow https://issues.apache.org/jira/browse/KAFKA-6080 for updates > >> on > >> exactly-once semantics in Connect. > >> > >> Ryanne > >> > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi < > >> vishal.santo...@gmail.com> > >> wrote: > >> > >> > >> You are correct. I'm working on a KIP and PoC to introduce > >> > transactions to > >> > >> Connect for this exact purpose :) > >> > > >> > That is awesome. Any time frame ? > >> > > >> > > >> > In the mean time the SLA as of now > >> > > >> > 1. It is conceivable that we flush the producer to the target cluster > >> but > >> > fail to offset commit. If there was a restart before the next > successful > >> > offset commit, there will be duplicates and a part of data is > >> replayed ( > >> > at least once ) ? > >> > > >> > 2. The same can be said about partial flushes, though am not sure > about > >> > how kafka addresses flush ( Is a flush either success or a failure, > and > >> > nothing in between ) > >> > > >> > Thanks.. > >> > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <ryannedo...@gmail.com> > >> > wrote: > >> > > >> > > Hey Vishal, glad to hear you're making progress. > >> > > > >> > > > 1. It seems though that flushing [...] the producer and setting > the > >> > > > offset to the compacting topic is not atomic OR do we use > >> > > > transactions here ? > >> > > > >> > > You are correct. I'm working on a KIP and PoC to introduce > >> transactions > >> > to > >> > > Connect for this exact purpose :) > >> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 > >> topics > >> > > with > >> > > > 1 partition each. Do I assume this right, as in there are 4 > consumer > >> > > groups > >> > > > ( on CG per thread ) ... > >> > > > >> > > Some details here: > >> > > - tasks.max controls the maximum number of tasks created per > Connector > >> > > instance. Both MirrorSourceConnector and MirrorCheckpointConnector > >> will > >> > > create multiple tasks (up to tasks.max), but > MirrorHeartbeatConnector > >> > only > >> > > ever creates a single task. Moreover, there cannot be more tasks > than > >> > > topic-partitions (for MirrorSourceConnector) or consumer groups (for > >> > > MirrorCheckpointConnector). So if you have two topics with one > >> partition > >> > > each and 1 consumer group total, you'll have two > MirrorSourceConnector > >> > > tasks, one MirrorHeartbeatConnector task, and one > >> > MirrorCheckpointConnector > >> > > tasks, for a total of four. And that's in one direction only: if you > >> have > >> > > multiple source->target herders enabled, each will create tasks > >> > > independently. > >> > > - There are no consumer groups in MM2, technically. The Connect > >> framework > >> > > uses the Coordinator API and internal topics to divide tasks among > >> > workers > >> > > -- not a consumer group per se. The MM2 connectors use the assign() > >> API, > >> > > not the subscribe() API, so there are no consumer groups there > >> either. In > >> > > fact, they don't commit() either. This is nice, as it eliminates a > >> lot of > >> > > the rebalancing problems legacy MirrorMaker has been plagued with. > >> With > >> > > MM2, rebalancing only occurs when the number of workers changes or > >> when > >> > the > >> > > assignments change (e.g. new topics are discovered). > >> > > > >> > > Ryanne > >> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi < > >> > > vishal.santo...@gmail.com> > >> > > wrote: > >> > > > >> > > > Hey Ryanne, > >> > > > > >> > > > > >> > > > The test was on topics that had a 7 day retention. > Which > >> > > > generally implies that the batch size for flush is pretty high ( > >> till > >> > the > >> > > > consumption becomes current ). The offset.flush.timeout.ms > >> defaults > >> > to > >> > > 5 > >> > > > seconds and the code will not send in the offsets if the flush is > >> not > >> > > > complete. Increasing that time out did solve the "not sending the > >> > offset > >> > > to > >> > > > topic" issue. > >> > > > > >> > > > Two questions ( I am being greedy here :) ) > >> > > > > >> > > > 1. It seems though that flushing the flushing the producer and > >> setting > >> > > the > >> > > > offset to the compacting topic is not atomic OR do we use > >> > > > transactions here ? > >> > > > > >> > > > 2. I see > >> > > > > >> > > > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 956435 > >> > > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} flushing 356251 > >> > > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0 > >> > > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 > >> > outstanding > >> > > > messages > >> > > > > >> > > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 > >> topics > >> > > with > >> > > > 1 partition each. Do I assume this right, as in there are 4 > consumer > >> > > groups > >> > > > ( on CG per thread ) ... > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > THANKS A LOT > >> > > > > >> > > > > >> > > > Vishal. > >> > > > > >> > > > > >> > > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan < > ryannedo...@gmail.com > >> > > >> > > > wrote: > >> > > > > >> > > > > > timed out > >> > > > > while waiting for producer to flush outstanding > >> > > > > > >> > > > > Yeah, that's what I'd expect to see if Connect was unable to > send > >> > > records > >> > > > > to the downstream remote topics, e.g. if min.in-sync.replicas > were > >> > > > > misconfigured. Given some data seems to arrive, it's possible > that > >> > > > > everything is configured correctly but with too much latency to > >> > > > > successfully commit within the default timeouts. You may want to > >> > > increase > >> > > > > the number of tasks substantially to achieve more parallelism > and > >> > > > > throughput. > >> > > > > > >> > > > > Ryanne > >> > > > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi < > >> > > vishal.santo...@gmail.com > >> > > > > > >> > > > > wrote: > >> > > > > > >> > > > > > Aah no.. this is more to it. Note sure if related to the > above. > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114 > >> > > > > > > >> > > > > > > >> > > > > > Is timing out based on > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133 > >> > > > > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, > >> timed > >> > > out > >> > > > > > while waiting for producer to flush outstanding 36478 messages > >> > > > > > (org.apache.kafka.connect.runtime.WorkerSourceTask:423) > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi < > >> > > > > vishal.santo...@gmail.com > >> > > > > > > > >> > > > > > wrote: > >> > > > > > > >> > > > > > > I think this might be it.. Could you confirm. It seems to be > >> on > >> > the > >> > > > > path > >> > > > > > > to commit the offsets.. but not sure... > >> > > > > > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for > >> > MirrorSourceConnector > >> > > > > > caught > >> > > > > > > exception in scheduled task: syncing topic ACLs > >> > > > > > > (org.apache.kafka.connect.mirror.Scheduler:102) > >> > > > > > > > >> > > > > > > java.util.concurrent.ExecutionException: > >> > > > > > > org.apache.kafka.common.errors.SecurityDisabledException: No > >> > > > Authorizer > >> > > > > > is > >> > > > > > > configured on the broker > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > >> > > > > > > > >> > > > > > > at > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > >> > > > > > > > >> > > > > > > at java.lang.Thread.run(Thread.java:748) > >> > > > > > > > >> > > > > > > Caused by: > >> > > org.apache.kafka.common.errors.SecurityDisabledException: > >> > > > No > >> > > > > > > Authorizer is configured on the broker > >> > > > > > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan < > >> > > ryannedo...@gmail.com > >> > > > > > >> > > > > > > wrote: > >> > > > > > > > >> > > > > > >> > I do not have a single record in the offsets topic > >> > > > > > >> > >> > > > > > >> That's definitely not normal. You are correct that without > >> > records > >> > > > in > >> > > > > > that > >> > > > > > >> topic, MM2 will restart from EARLIEST. The offsets should > be > >> > > stored > >> > > > > > >> periodically and whenever the connectors gracefully > shutdown > >> or > >> > > > > restart. > >> > > > > > >> > >> > > > > > >> Is it possible the topics don't have required ACLs or > >> something? > >> > > > Also > >> > > > > > >> note: > >> > > > > > >> Connect wants the offsets topic to have a large number of > >> > > partitions > >> > > > > and > >> > > > > > >> to > >> > > > > > >> be compacted. Though I can't imagine either would prevent > >> > commits > >> > > > from > >> > > > > > >> being sent. > >> > > > > > >> > >> > > > > > >> Ryanne > >> > > > > > >> > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi < > >> > > > > > >> vishal.santo...@gmail.com> > >> > > > > > >> wrote: > >> > > > > > >> > >> > > > > > >> > 2nd/restore issue ( I think I need to solve the offsets > >> topic > >> > > > issue > >> > > > > > >> > before I go with the scale up and down issue ) > >> > > > > > >> > > >> > > > > > >> > As you had indicated, I went ahead and created the > offsets > >> > > topic. > >> > > > > The > >> > > > > > >> > status of the cluster ( destination ) is thus > >> > > > > > >> > > >> > > > > > >> > opic# Partitions# BrokersBrokers Spread %Brokers Skew > >> %Brokers > >> > > > > Leader > >> > > > > > >> > Skew %# ReplicasUnder Replicated %Leader SizeProducer > >> > > > > > Message/SecSummed > >> > > > > > >> > Recent Offsets > >> > > > > > >> > s8k.checkpoints.internal > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > >> > > > > > >> > s8k.act_search_page > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842 > >> > > > > > >> > s8k.act_reach > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529 > >> > > > > > >> > mm2-status.s8k.internal > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal > >> > > > > > >> > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10 > >> > > > > > >> > mm2-offsets.s8k_test.internal > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > >> > > > > > >> > mm2-offsets.s8k.internal > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 > >> > > > > > >> > mm2-configs.s8k.internal > >> > > > > > >> > < > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal > >> > > > > > >> > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13 > >> > > > > > >> > > >> > > > > > >> > You can see . that we have the 5 ( I created bot the > >> offsets, > >> > > to > >> > > > be > >> > > > > > >> safe > >> > > > > > >> > for the below ) > >> > > > > > >> > > >> > > > > > >> > *clusters = s8k, s8k_test* > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > *s8k.bootstrap.servers = .....* > >> > > > > > >> > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......* > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > *# only allow replication dr1 -> dr2* > >> > > > > > >> > > >> > > > > > >> > *s8k->s8k_test.enabled = true* > >> > > > > > >> > > >> > > > > > >> > *s8k->s8k_test.topics = act_search_page|act_reach* > >> > > > > > >> > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled = false* > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > *s8k_test->s8k.enabled = false* > >> > > > > > >> > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled = false* > >> > > > > > >> > > >> > > > > > >> > *s8k_test.replication.factor = 3* > >> > > > > > >> > > >> > > > > > >> > *s8k.replication.factor = 3* > >> > > > > > >> > > >> > > > > > >> > *offsets.storage.replication.factor = 3* > >> > > > > > >> > > >> > > > > > >> > *replication.factor = 3* > >> > > > > > >> > > >> > > > > > >> > *replication.policy.separator = .* > >> > > > > > >> > > >> > > > > > >> > *tasks.max = 4* > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > What seems strange is that I do not have a single record > in > >> > the > >> > > > > > offsets > >> > > > > > >> > topic.. Is that normal ? I would imagine that without a > >> > > record, > >> > > > > > there > >> > > > > > >> is > >> > > > > > >> > no way that a restore would happen.... And that is > obvious > >> > when > >> > > I > >> > > > > > >> restart > >> > > > > > >> > the mm2 instance... Find the screenshot attached. In > >> essence > >> > the > >> > > > > > latency > >> > > > > > >> > avg lag is reset \when the mm2 instance is reset > >> indicating no > >> > > > > restore > >> > > > > > >> but > >> > > > > > >> > restart from EARLIEST... I must be missing some thing > >> simple > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan < > >> > > > ryannedo...@gmail.com > >> > > > > > > >> > > > > > >> > wrote: > >> > > > > > >> > > >> > > > > > >> >> Vishal, the first issue is easy: you must set tasks.max > to > >> > > > > something > >> > > > > > >> above > >> > > > > > >> >> 1 (the default) in order to achieve any parallelism. > This > >> > > > property > >> > > > > is > >> > > > > > >> >> passed along to the internal Connect workers. It's > >> > unfortunate > >> > > > that > >> > > > > > >> >> Connect > >> > > > > > >> >> is not smart enough to default this property to the > >> number of > >> > > > > > workers. > >> > > > > > >> I > >> > > > > > >> >> suspect that will improve before long. > >> > > > > > >> >> > >> > > > > > >> >> For the second issue, is it possible you are missing the > >> > > offsets > >> > > > > > >> topic? It > >> > > > > > >> >> should exist alongside the config and status topics. > >> Connect > >> > > > should > >> > > > > > >> create > >> > > > > > >> >> this topic, but there are various reasons this can fail, > >> e.g. > >> > > if > >> > > > > the > >> > > > > > >> >> replication factor is misconfigured. You can try > creating > >> > this > >> > > > > topic > >> > > > > > >> >> manually or changing offsets.storage.replication.factor. > >> > > > > > >> >> > >> > > > > > >> >> Ryanne > >> > > > > > >> >> > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi < > >> > > > > > >> vishal.santo...@gmail.com> > >> > > > > > >> >> wrote: > >> > > > > > >> >> > >> > > > > > >> >> > Using > >> > > > https://github.com/apache/kafka/tree/trunk/connect/mirror > >> > > > > > as a > >> > > > > > >> >> > guide, > >> > > > > > >> >> > I have build from source the origin/KIP-382 of > >> > > > > > >> >> > https://github.com/apache/kafka.git. > >> > > > > > >> >> > > >> > > > > > >> >> > I am seeing 2 issues > >> > > > > > >> >> > > >> > > > > > >> >> > * I brought up 2 processes on 2 different nodes ( they > >> are > >> > > > > actually > >> > > > > > >> >> pods on > >> > > > > > >> >> > k8s but that should not matter ). They share the > >> > > mm2.properties > >> > > > > > file > >> > > > > > >> and > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics with 8 partitions > in > >> > > total. > >> > > > > > That > >> > > > > > >> >> seems > >> > > > > > >> >> > to be the way to create a standalone mm2 cluster. I do > >> not > >> > > > > however > >> > > > > > >> see( > >> > > > > > >> >> at > >> > > > > > >> >> > least the mbeans do not show ) any attempt to > rebalance. > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > >> > > > > > >> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process > >> > > > > > >> >> > mbeans > >> > > > > > >> >> > are all on a single node > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > * I restart the processes on the 2 nodes ( hard stop > ans > >> > > start > >> > > > ). > >> > > > > > The > >> > > > > > >> >> > offsets for replication seem to be reset to the > >> earliest, > >> > as > >> > > if > >> > > > > it > >> > > > > > >> is a > >> > > > > > >> >> > brand new mirroring. It is also obvious from the > >> > > > > > >> >> > "record-age-ms-avg|replication-latency-ms-avg" > >> > > > > > >> >> > which I track through the restart. > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > This implies that > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > 1. Load balancing by rebalancing is not working. I > >> cannot > >> > > scale > >> > > > > up > >> > > > > > or > >> > > > > > >> >> down > >> > > > > > >> >> > by adding nodes to the mm2 cluster or removing them. > >> > > > > > >> >> > > >> > > > > > >> >> > 2. Restore on a mirror is not working. If the MM2 > >> cluster > >> > is > >> > > > > > brought > >> > > > > > >> >> down, > >> > > > > > >> >> > it does not start mirroring from the last known > state. I > >> > see > >> > > > the, > >> > > > > > >> >> > state/config topics etc created as expected.. > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > The mm2.properties is pretty mimimal > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > *clusters = a , b* > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > *a.bootstrap.servers = k.....* > >> > > > > > >> >> > > >> > > > > > >> >> > *b.bootstrap.servers = k.....* > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > *# only allow replication dr1 -> dr2* > >> > > > > > >> >> > > >> > > > > > >> >> > *a->b.enabled = true* > >> > > > > > >> >> > > >> > > > > > >> >> > *a->b.topics = act_search_page* > >> > > > > > >> >> > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = false* > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > *b->a..enabled = false* > >> > > > > > >> >> > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = false* > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > What do you think is the issue ? > >> > > > > > >> >> > > >> > > > > > >> >> > > >> > > > > > >> >> > Thanks > >> > > > > > >> >> > > >> > > > > > >> >> > >> > > > > > >> > > >> > > > > > >> > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > > >