Hey Ryanne,
I see a definite issue. I am doing an intense test: I bring up 12 VMs (they are 12 pods with 8 CPUs each), replicating about 1200-plus topics (fairly heavy, 100 Mbps). They are acquired and staggered as they come up, yet I see a fraction of these nodes not assigned any replication, even though there is plenty to go around (more than a couple of thousand partitions). Is there something I am missing? In my current case 5 of the 12 VMs are idle.

Vishal

On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Oh sorry, a COUNTER... is more like it.
>
> On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>
>> Will do.
>> One more thing: the age/latency metrics seem to be analogous, as in they
>> seem to be calculated using similar routines. I would think a metric tracking
>> the number of flush failures (as a GAUGE), given offset.flush.timeout.ms,
>> would be highly beneficial.
>>
>> Regards.
>>
>> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>
>>> Ah, I see you are correct. Also I misspoke saying "workers" earlier, as the
>>> consumer is not created by the worker, but the task.
>>>
>>> I suppose the put() could be changed to putIfAbsent() here to enable this
>>> property to be changed. Maybe submit a PR?
>>>
>>> Ryanne
>>>
>>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>> > Hmm ( I did both )
>>> >
>>> > another->another_test.enabled = true
>>> > another->another_test.topics = act_post
>>> > another->another_test.emit.heartbeats.enabled = false
>>> > another->another_test.consumer.auto.offset.reset = latest
>>> > another->another_test.sync.topic.acls.enabled = false
>>> > another.consumer.auto.offset.reset = latest
>>> >
>>> > When I grep for the ConsumerConfig (and there are 8 instances; this topic
>>> > has 4 partitions):
>>> >
>>> > [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>> >     allow.auto.create.topics = true
>>> >     auto.commit.interval.ms = 5000
>>> >     *auto.offset.reset* = earliest
>>> >
>>> > I am now using the 2.4 branch from kafka trunk
>>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
>>> >
>>> > This code change works and makes sense. I think all other settings will be
>>> > fine (as they can be overridden), except for the 2 below.
>>> >
>>> > *--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>> > *+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
>>> >      props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>> >      props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>> >      props.put("enable.auto.commit", "false");
>>> > -    props.put("auto.offset.reset", "earliest");
>>> > +    props.put("auto.offset.reset", "latest");
>>> >      return props;
>>> >  }
>>> >
>>> > Regards.
>>> >
>>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>> >
>>> > > Vishal, you should be able to override the properties passed to the
>>> > > internal workers using properties like A->B.consumer.auto.offset.reset or
>>> > > A.consumer.auto.offset.reset in the mm2.properties file.
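For reference, the put() -> putIfAbsent() change Ryanne suggests above might look roughly like the sketch below. It is based only on the method body visible in the quoted diff (around line 230 of MirrorConnectorConfig on the 2.4 branch) and is not an actual committed patch; the idea is that a user-supplied <cluster>.consumer.auto.offset.reset, already copied in by the prefixed putAll(), would then take precedence over the hard-coded default:

    // Sketch only: same lines as the quoted diff, but with putIfAbsent()
    // so a prefixed auto.offset.reset override survives.
    props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
    props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
    props.put("enable.auto.commit", "false");            // still forced off
    props.putIfAbsent("auto.offset.reset", "earliest");  // default only if not overridden
    return props;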
Certain >>> > top-level >>> > > properties like tasks.max are honored without the A->B or A prefix, >>> but >>> > > auto.offset.reset is not one of them. >>> > > >>> > > Ryanne >>> > > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi < >>> > vishal.santo...@gmail.com >>> > > > >>> > > wrote: >>> > > >>> > > > Hey Ryanne, >>> > > > >>> > > > >>> > > > How do I override auto.offset.reset = latest for consumers >>> through >>> > > > mm2.properties. I have tried straight up . auto.offset.reset and >>> > > consumer. >>> > > > auto.offset.reset but it defaults to earliest.. I do have a query >>> in >>> > > > another thread but though you might know off hand.. >>> > > > >>> > > > I would imagine there is some way in general of overriding >>> consumer and >>> > > > producer configs through mm2.properties in MM2 ? >>> > > > >>> > > > Regards. >>> > > > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi < >>> > > vishal.santo...@gmail.com >>> > > > > >>> > > > wrote: >>> > > > >>> > > > > Thank you so much for all your help. Will keep you posted on >>> tests I >>> > > > do.. >>> > > > > I hope this is helpful to other folks too.. >>> > > > > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan < >>> ryannedo...@gmail.com> >>> > > > > wrote: >>> > > > > >>> > > > >> That's right. MM2 is at-least-once for now, same as legacy >>> > > MirrorMaker. >>> > > > >> You >>> > > > >> can follow https://issues.apache.org/jira/browse/KAFKA-6080 for >>> > > updates >>> > > > >> on >>> > > > >> exactly-once semantics in Connect. >>> > > > >> >>> > > > >> Ryanne >>> > > > >> >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi < >>> > > > >> vishal.santo...@gmail.com> >>> > > > >> wrote: >>> > > > >> >>> > > > >> > >> You are correct. I'm working on a KIP and PoC to >>> introduce >>> > > > >> > transactions to >>> > > > >> > >> Connect for this exact purpose :) >>> > > > >> > >>> > > > >> > That is awesome. Any time frame ? >>> > > > >> > >>> > > > >> > >>> > > > >> > In the mean time the SLA as of now >>> > > > >> > >>> > > > >> > 1. It is conceivable that we flush the producer to the target >>> > > cluster >>> > > > >> but >>> > > > >> > fail to offset commit. If there was a restart before the next >>> > > > successful >>> > > > >> > offset commit, there will be duplicates and a part of data >>> is >>> > > > >> replayed ( >>> > > > >> > at least once ) ? >>> > > > >> > >>> > > > >> > 2. The same can be said about partial flushes, though am not >>> sure >>> > > > about >>> > > > >> > how kafka addresses flush ( Is a flush either success or a >>> > failure, >>> > > > and >>> > > > >> > nothing in between ) >>> > > > >> > >>> > > > >> > Thanks.. >>> > > > >> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan < >>> > > ryannedo...@gmail.com> >>> > > > >> > wrote: >>> > > > >> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress. >>> > > > >> > > >>> > > > >> > > > 1. It seems though that flushing [...] the producer and >>> > setting >>> > > > the >>> > > > >> > > > offset to the compacting topic is not atomic OR do we >>> use >>> > > > >> > > > transactions here ? >>> > > > >> > > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to introduce >>> > > > >> transactions >>> > > > >> > to >>> > > > >> > > Connect for this exact purpose :) >>> > > > >> > > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ), and I >>> have >>> > 2 >>> > > > >> topics >>> > > > >> > > with >>> > > > >> > > > 1 partition each. 
Do I assume this right, as in there are >>> 4 >>> > > > consumer >>> > > > >> > > groups >>> > > > >> > > > ( on CG per thread ) ... >>> > > > >> > > >>> > > > >> > > Some details here: >>> > > > >> > > - tasks.max controls the maximum number of tasks created per >>> > > > Connector >>> > > > >> > > instance. Both MirrorSourceConnector and >>> > MirrorCheckpointConnector >>> > > > >> will >>> > > > >> > > create multiple tasks (up to tasks.max), but >>> > > > MirrorHeartbeatConnector >>> > > > >> > only >>> > > > >> > > ever creates a single task. Moreover, there cannot be more >>> tasks >>> > > > than >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or consumer >>> groups >>> > > (for >>> > > > >> > > MirrorCheckpointConnector). So if you have two topics with >>> one >>> > > > >> partition >>> > > > >> > > each and 1 consumer group total, you'll have two >>> > > > MirrorSourceConnector >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one >>> > > > >> > MirrorCheckpointConnector >>> > > > >> > > tasks, for a total of four. And that's in one direction >>> only: if >>> > > you >>> > > > >> have >>> > > > >> > > multiple source->target herders enabled, each will create >>> tasks >>> > > > >> > > independently. >>> > > > >> > > - There are no consumer groups in MM2, technically. The >>> Connect >>> > > > >> framework >>> > > > >> > > uses the Coordinator API and internal topics to divide tasks >>> > among >>> > > > >> > workers >>> > > > >> > > -- not a consumer group per se. The MM2 connectors use the >>> > > assign() >>> > > > >> API, >>> > > > >> > > not the subscribe() API, so there are no consumer groups >>> there >>> > > > >> either. In >>> > > > >> > > fact, they don't commit() either. This is nice, as it >>> > eliminates a >>> > > > >> lot of >>> > > > >> > > the rebalancing problems legacy MirrorMaker has been plagued >>> > with. >>> > > > >> With >>> > > > >> > > MM2, rebalancing only occurs when the number of workers >>> changes >>> > or >>> > > > >> when >>> > > > >> > the >>> > > > >> > > assignments change (e.g. new topics are discovered). >>> > > > >> > > >>> > > > >> > > Ryanne >>> > > > >> > > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi < >>> > > > >> > > vishal.santo...@gmail.com> >>> > > > >> > > wrote: >>> > > > >> > > >>> > > > >> > > > Hey Ryanne, >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > The test was on topics that had a 7 day >>> retention. >>> > > > Which >>> > > > >> > > > generally implies that the batch size for flush is pretty >>> > high ( >>> > > > >> till >>> > > > >> > the >>> > > > >> > > > consumption becomes current ). The >>> offset.flush.timeout.ms >>> > > > >> defaults >>> > > > >> > to >>> > > > >> > > 5 >>> > > > >> > > > seconds and the code will not send in the offsets if the >>> flush >>> > > is >>> > > > >> not >>> > > > >> > > > complete. Increasing that time out did solve the "not >>> sending >>> > > the >>> > > > >> > offset >>> > > > >> > > to >>> > > > >> > > > topic" issue. >>> > > > >> > > > >>> > > > >> > > > Two questions ( I am being greedy here :) ) >>> > > > >> > > > >>> > > > >> > > > 1. It seems though that flushing the flushing the >>> producer and >>> > > > >> setting >>> > > > >> > > the >>> > > > >> > > > offset to the compacting topic is not atomic OR do we >>> use >>> > > > >> > > > transactions here ? >>> > > > >> > > > >>> > > > >> > > > 2. 
I see >>> > > > >> > > > >>> > > > >> > > > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing >>> > 956435 >>> > > > >> > > > >>> > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} flushing >>> 356251 >>> > > > >> > > > >>> > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-2} >>> flushing 0 >>> > > > >> > > > >>> > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-3} >>> flushing 0 >>> > > > >> > outstanding >>> > > > >> > > > messages >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ), and I >>> have >>> > 2 >>> > > > >> topics >>> > > > >> > > with >>> > > > >> > > > 1 partition each. Do I assume this right, as in there are >>> 4 >>> > > > consumer >>> > > > >> > > groups >>> > > > >> > > > ( on CG per thread ) ... >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > THANKS A LOT >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > Vishal. >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan < >>> > > > ryannedo...@gmail.com >>> > > > >> > >>> > > > >> > > > wrote: >>> > > > >> > > > >>> > > > >> > > > > > timed out >>> > > > >> > > > > while waiting for producer to flush outstanding >>> > > > >> > > > > >>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect was >>> unable to >>> > > > send >>> > > > >> > > records >>> > > > >> > > > > to the downstream remote topics, e.g. if >>> > min.in-sync.replicas >>> > > > were >>> > > > >> > > > > misconfigured. Given some data seems to arrive, it's >>> > possible >>> > > > that >>> > > > >> > > > > everything is configured correctly but with too much >>> latency >>> > > to >>> > > > >> > > > > successfully commit within the default timeouts. You may >>> > want >>> > > to >>> > > > >> > > increase >>> > > > >> > > > > the number of tasks substantially to achieve more >>> > parallelism >>> > > > and >>> > > > >> > > > > throughput. >>> > > > >> > > > > >>> > > > >> > > > > Ryanne >>> > > > >> > > > > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi < >>> > > > >> > > vishal.santo...@gmail.com >>> > > > >> > > > > >>> > > > >> > > > > wrote: >>> > > > >> > > > > >>> > > > >> > > > > > Aah no.. this is more to it. Note sure if related to >>> the >>> > > > above. 
>>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114 >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > Is timing out based on >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133 >>> > > > >> > > > > > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR >>> > > > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0} Failed to >>> > > flush, >>> > > > >> timed >>> > > > >> > > out >>> > > > >> > > > > > while waiting for producer to flush outstanding 36478 >>> > > messages >>> > > > >> > > > > > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423) >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi < >>> > > > >> > > > > vishal.santo...@gmail.com >>> > > > >> > > > > > > >>> > > > >> > > > > > wrote: >>> > > > >> > > > > > >>> > > > >> > > > > > > I think this might be it.. Could you confirm. It >>> seems >>> > to >>> > > be >>> > > > >> on >>> > > > >> > the >>> > > > >> > > > > path >>> > > > >> > > > > > > to commit the offsets.. but not sure... >>> > > > >> > > > > > > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for >>> > > > >> > MirrorSourceConnector >>> > > > >> > > > > > caught >>> > > > >> > > > > > > exception in scheduled task: syncing topic ACLs >>> > > > >> > > > > > > (org.apache.kafka.connect.mirror.Scheduler:102) >>> > > > >> > > > > > > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException: >>> > > > >> > > > > > > >>> > org.apache.kafka.common.errors.SecurityDisabledException: >>> > > No >>> > > > >> > > > Authorizer >>> > > > >> > > > > > is >>> > > > >> > > > > > > configured on the broker >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > 
> > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > >>> > > > >> >>> > > >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>> > > > >> > > > > > > >>> > > > >> > > > > > > at java.lang.Thread.run(Thread.java:748) >>> > > > >> > > > > > > >>> > > > >> > > > > > > Caused by: >>> > > > >> > > org.apache.kafka.common.errors.SecurityDisabledException: >>> > > > >> > > > No >>> > > > >> > > > > > > Authorizer is configured on the broker >>> > > > >> > > > > > > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan < >>> > > > >> > > ryannedo...@gmail.com >>> > > > >> > > > > >>> > > > >> > > > > > > wrote: >>> > > > >> > > > > > > >>> > > > >> > > > > > >> > I do not have a single record in the offsets >>> topic >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> That's definitely not normal. 
You are correct that >>> > > without >>> > > > >> > records >>> > > > >> > > > in >>> > > > >> > > > > > that >>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. The offsets >>> > should >>> > > > be >>> > > > >> > > stored >>> > > > >> > > > > > >> periodically and whenever the connectors gracefully >>> > > > shutdown >>> > > > >> or >>> > > > >> > > > > restart. >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> Is it possible the topics don't have required ACLs >>> or >>> > > > >> something? >>> > > > >> > > > Also >>> > > > >> > > > > > >> note: >>> > > > >> > > > > > >> Connect wants the offsets topic to have a large >>> number >>> > of >>> > > > >> > > partitions >>> > > > >> > > > > and >>> > > > >> > > > > > >> to >>> > > > >> > > > > > >> be compacted. Though I can't imagine either would >>> > prevent >>> > > > >> > commits >>> > > > >> > > > from >>> > > > >> > > > > > >> being sent. >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> Ryanne >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi < >>> > > > >> > > > > > >> vishal.santo...@gmail.com> >>> > > > >> > > > > > >> wrote: >>> > > > >> > > > > > >> >>> > > > >> > > > > > >> > 2nd/restore issue ( I think I need to solve the >>> > > offsets >>> > > > >> topic >>> > > > >> > > > issue >>> > > > >> > > > > > >> > before I go with the scale up and down issue ) >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead and created >>> the >>> > > > offsets >>> > > > >> > > topic. >>> > > > >> > > > > The >>> > > > >> > > > > > >> > status of the cluster ( destination ) is thus >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers Spread %Brokers >>> Skew >>> > > > >> %Brokers >>> > > > >> > > > > Leader >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated %Leader >>> SizeProducer >>> > > > >> > > > > > Message/SecSummed >>> > > > >> > > > > > >> > Recent Offsets >>> > > > >> > > > > > >> > s8k.checkpoints.internal >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > > > >> > > > > > >> > s8k.act_search_page >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842 >>> > > > >> > > > > > >> > s8k.act_reach >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529 >>> > > > >> > > > > > >> > mm2-status.s8k.internal >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal 
>>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10 >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0 >>> > > > >> > > > > > >> > mm2-configs.s8k.internal >>> > > > >> > > > > > >> > < >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13 >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > You can see . that we have the 5 ( I created >>> bot the >>> > > > >> offsets, >>> > > > >> > > to >>> > > > >> > > > be >>> > > > >> > > > > > >> safe >>> > > > >> > > > > > >> > for the below ) >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics = >>> act_search_page|act_reach* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled = false* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled = false* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor = 3* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *replication.factor = 3* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *replication.policy.separator = .* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > *tasks.max = 4* >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > What seems strange is that I do not have a single >>> > > record >>> > > > in >>> > > > >> > the >>> > > > >> > > > > > offsets >>> > > > >> > > > > > >> > topic.. Is that normal ? 
I would imagine that >>> > > without a >>> > > > >> > > record, >>> > > > >> > > > > > there >>> > > > >> > > > > > >> is >>> > > > >> > > > > > >> > no way that a restore would happen.... And that >>> is >>> > > > obvious >>> > > > >> > when >>> > > > >> > > I >>> > > > >> > > > > > >> restart >>> > > > >> > > > > > >> > the mm2 instance... Find the screenshot >>> attached. In >>> > > > >> essence >>> > > > >> > the >>> > > > >> > > > > > latency >>> > > > >> > > > > > >> > avg lag is reset \when the mm2 instance is reset >>> > > > >> indicating no >>> > > > >> > > > > restore >>> > > > >> > > > > > >> but >>> > > > >> > > > > > >> > restart from EARLIEST... I must be missing some >>> thing >>> > > > >> simple >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan < >>> > > > >> > > > ryannedo...@gmail.com >>> > > > >> > > > > > >>> > > > >> > > > > > >> > wrote: >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you must set >>> > > tasks.max >>> > > > to >>> > > > >> > > > > something >>> > > > >> > > > > > >> above >>> > > > >> > > > > > >> >> 1 (the default) in order to achieve any >>> parallelism. >>> > > > This >>> > > > >> > > > property >>> > > > >> > > > > is >>> > > > >> > > > > > >> >> passed along to the internal Connect workers. >>> It's >>> > > > >> > unfortunate >>> > > > >> > > > that >>> > > > >> > > > > > >> >> Connect >>> > > > >> > > > > > >> >> is not smart enough to default this property to >>> the >>> > > > >> number of >>> > > > >> > > > > > workers. >>> > > > >> > > > > > >> I >>> > > > >> > > > > > >> >> suspect that will improve before long. >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> >> For the second issue, is it possible you are >>> missing >>> > > the >>> > > > >> > > offsets >>> > > > >> > > > > > >> topic? It >>> > > > >> > > > > > >> >> should exist alongside the config and status >>> topics. >>> > > > >> Connect >>> > > > >> > > > should >>> > > > >> > > > > > >> create >>> > > > >> > > > > > >> >> this topic, but there are various reasons this >>> can >>> > > fail, >>> > > > >> e.g. >>> > > > >> > > if >>> > > > >> > > > > the >>> > > > >> > > > > > >> >> replication factor is misconfigured. You can try >>> > > > creating >>> > > > >> > this >>> > > > >> > > > > topic >>> > > > >> > > > > > >> >> manually or changing >>> > > offsets.storage.replication.factor. >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> >> Ryanne >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi < >>> > > > >> > > > > > >> vishal.santo...@gmail.com> >>> > > > >> > > > > > >> >> wrote: >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> >> > Using >>> > > > >> > > > https://github.com/apache/kafka/tree/trunk/connect/mirror >>> > > > >> > > > > > as a >>> > > > >> > > > > > >> >> > guide, >>> > > > >> > > > > > >> >> > I have build from source the origin/KIP-382 of >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git. 
>>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2 different >>> nodes ( >>> > > they >>> > > > >> are >>> > > > >> > > > > actually >>> > > > >> > > > > > >> >> pods on >>> > > > >> > > > > > >> >> > k8s but that should not matter ). They share >>> the >>> > > > >> > > mm2.properties >>> > > > >> > > > > > file >>> > > > >> > > > > > >> and >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics with 8 >>> > partitions >>> > > > in >>> > > > >> > > total. >>> > > > >> > > > > > That >>> > > > >> > > > > > >> >> seems >>> > > > >> > > > > > >> >> > to be the way to create a standalone mm2 >>> cluster. >>> > I >>> > > do >>> > > > >> not >>> > > > >> > > > > however >>> > > > >> > > > > > >> see( >>> > > > >> > > > > > >> >> at >>> > > > >> > > > > > >> >> > least the mbeans do not show ) any attempt to >>> > > > rebalance. >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > >>> > > >>> > >>> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process >>> > > > >> > > > > > >> >> > mbeans >>> > > > >> > > > > > >> >> > are all on a single node >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2 nodes ( >>> hard >>> > stop >>> > > > ans >>> > > > >> > > start >>> > > > >> > > > ). >>> > > > >> > > > > > The >>> > > > >> > > > > > >> >> > offsets for replication seem to be reset to >>> the >>> > > > >> earliest, >>> > > > >> > as >>> > > > >> > > if >>> > > > >> > > > > it >>> > > > >> > > > > > >> is a >>> > > > >> > > > > > >> >> > brand new mirroring. It is also obvious from >>> the >>> > > > >> > > > > > >> >> > "record-age-ms-avg|replication-latency-ms-avg" >>> > > > >> > > > > > >> >> > which I track through the restart. >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > This implies that >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is not >>> working. I >>> > > > >> cannot >>> > > > >> > > scale >>> > > > >> > > > > up >>> > > > >> > > > > > or >>> > > > >> > > > > > >> >> down >>> > > > >> > > > > > >> >> > by adding nodes to the mm2 cluster or removing >>> > them. >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not working. If the >>> MM2 >>> > > > >> cluster >>> > > > >> > is >>> > > > >> > > > > > brought >>> > > > >> > > > > > >> >> down, >>> > > > >> > > > > > >> >> > it does not start mirroring from the last >>> known >>> > > > state. I >>> > > > >> > see >>> > > > >> > > > the, >>> > > > >> > > > > > >> >> > state/config topics etc created as expected.. 
>>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty mimimal >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *clusters = a , b* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> dr2* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = false* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = false* >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > What do you think is the issue ? >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> > Thanks >>> > > > >> > > > > > >> >> > >>> > > > >> > > > > > >> >> >>> > > > >> > > > > > >> > >>> > > > >> > > > > > >> >>> > > > >> > > > > > > >>> > > > >> > > > > > >>> > > > >> > > > > >>> > > > >> > > > >>> > > > >> > > >>> > > > >> > >>> > > > >> >>> > > > > >>> > > > >>> > > >>> > >>> >>
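Pulling the configuration details from this thread together, a minimal mm2.properties along the lines below exercises the settings discussed above: tasks.max for parallelism, internal-topic replication factors, and the per-cluster consumer override. The cluster aliases, hostnames, topics and specific numbers are placeholders for illustration, not a recommendation.

    # hypothetical two-cluster, one-way setup
    clusters = a, b
    a.bootstrap.servers = a-kafka:9092
    b.bootstrap.servers = b-kafka:9092

    a->b.enabled = true
    a->b.topics = act_search_page|act_reach
    a->b.emit.heartbeats.enabled = false
    b->a.enabled = false

    # tasks.max defaults to 1, and MirrorSourceConnector creates at most
    # min(tasks.max, source topic-partitions) tasks, so a low value leaves
    # extra MM2 nodes with nothing assigned
    tasks.max = 24

    replication.factor = 3
    offsets.storage.replication.factor = 3

    # consumer override discussed above; before the put() -> putIfAbsent()
    # change it was silently forced back to earliest
    a.consumer.auto.offset.reset = latest

    # the thread also had to raise the Connect worker's offset.flush.timeout.ms
    # (default 5000 ms) to get past the "Failed to flush" errors on a large
    # backlog; verify how this needs to be set (and whether it needs a cluster
    # prefix) in your deployment
    # offset.flush.timeout.ms = 60000

If tasks.max is left at or near its default, the symptom in the newest message above (several of the 12 nodes sitting idle despite thousands of partitions) is one plausible outcome, since Connect then has fewer tasks to distribute than it has workers.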