Oh sorry, a COUNTER is more like it.

On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> Will do.
>
> One more thing: the age/latency metrics seem to be analogous, in that they seem to be calculated using similar routines. I would think a metric tracking the number of flush failures ( as a GAUGE ) given offset.flush.timeout.ms would be highly beneficial.
>
> Regards.
>
> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> Ah, I see you are correct. Also I misspoke saying "workers" earlier, as the consumer is not created by the worker, but the task.
>>
>> I suppose the put() could be changed to putIfAbsent() here to enable this property to be changed. Maybe submit a PR?
>>
>> Ryanne
>>
>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Hmm ( I did both )
>>>
>>> another->another_test.enabled = true
>>> another->another_test.topics = act_post
>>> another->another_test.emit.heartbeats.enabled = false
>>> another->another_test.consumer.auto.offset.reset = latest
>>> another->another_test.sync.topic.acls.enabled = false
>>> another.consumer.auto.offset.reset = latest
>>>
>>> When I grep for the ConsumerConfig ( and there are 8 instances, this topic has 4 partitions )
>>>
>>> [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>> allow.auto.create.topics = true
>>> auto.commit.interval.ms = 5000
>>> auto.offset.reset = earliest
>>>
>>> I am now using the 2.4 branch from kafka trunk
>>> https://github.com/apache/kafka/tree/2.4/connect/mirror
>>>
>>> This code change works and makes sense.. I think all other settings will be fine ( as they can be overridden ) but for the 2 below..
>>>
>>> --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
>>>          props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>>          props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>>          props.put("enable.auto.commit", "false");
>>> -        props.put("auto.offset.reset", "earliest");
>>> +        props.put("auto.offset.reset", "latest");
>>>          return props;
>>>      }
>>>
>>> Regards.
>>>
>>> On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>
>>>> Vishal, you should be able to override the properties passed to the internal workers using properties like A->B.consumer.auto.offset.reset or A.consumer.auto.offset.reset in the mm2.properties file. Certain top-level properties like tasks.max are honored without the A->B or A prefix, but auto.offset.reset is not one of them.
>>>>
>>>> Ryanne
>>>>
>>>> On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hey Ryanne,
>>>>>
>>>>> How do I override auto.offset.reset = latest for consumers through mm2.properties? I have tried straight up auto.offset.reset and consumer.auto.offset.reset but it defaults to earliest.. I do have a query in another thread but thought you might know offhand..
>>>>>
>>>>> I would imagine there is some way in general of overriding consumer and producer configs through mm2.properties in MM2 ?
>>>>>
>>>>> Regards.
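
For anyone following the auto.offset.reset question, here is a minimal sketch of why the override gets lost and how the putIfAbsent() idea raised in this thread would behave. It is a simplified stand-in, not the real MirrorConnectorConfig: the class name ConsumerDefaultsSketch and both method names are made up for illustration, and the user's prefixed overrides are modelled as a plain map.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the consumer-config assembly discussed in the thread.
// NOT the real MirrorConnectorConfig; all names here are hypothetical.
class ConsumerDefaultsSketch {

    // Mirrors the quoted code path: put() runs after the user's overrides
    // have been copied in, so the hard-coded value always wins.
    static Map<String, String> withPut(Map<String, String> userOverrides) {
        Map<String, String> props = new HashMap<>(userOverrides);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    // Suggested variant: putIfAbsent() only supplies a default when the
    // user has not already set the key.
    static Map<String, String> withPutIfAbsent(Map<String, String> userOverrides) {
        Map<String, String> props = new HashMap<>(userOverrides);
        props.put("enable.auto.commit", "false");
        props.putIfAbsent("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("auto.offset.reset", "latest");

        // With put() the user's "latest" is overwritten; with putIfAbsent() it survives.
        System.out.println("put():         " + withPut(overrides).get("auto.offset.reset"));
        System.out.println("putIfAbsent(): " + withPutIfAbsent(overrides).get("auto.offset.reset"));
    }
}
```

With no override supplied, both variants still fall back to "earliest", so existing deployments would presumably keep their current behaviour.
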
>>>>>
>>>>> On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Thank you so much for all your help. Will keep you posted on tests I do.. I hope this is helpful to other folks too..
>>>>>>
>>>>>> On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>
>>>>>>> That's right. MM2 is at-least-once for now, same as legacy MirrorMaker. You can follow https://issues.apache.org/jira/browse/KAFKA-6080 for updates on exactly-once semantics in Connect.
>>>>>>>
>>>>>>> Ryanne
>>>>>>>
>>>>>>> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> >> You are correct. I'm working on a KIP and PoC to introduce transactions to Connect for this exact purpose :)
>>>>>>>>
>>>>>>>> That is awesome. Any time frame ?
>>>>>>>>
>>>>>>>> In the mean time the SLA as of now
>>>>>>>>
>>>>>>>> 1. It is conceivable that we flush the producer to the target cluster but fail to offset commit. If there was a restart before the next successful offset commit, there will be duplicates and a part of data is replayed ( at least once ) ?
>>>>>>>>
>>>>>>>> 2. The same can be said about partial flushes, though am not sure about how kafka addresses flush ( Is a flush either success or a failure, and nothing in between )
>>>>>>>>
>>>>>>>> Thanks..
>>>>>>>>
>>>>>>>> On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Vishal, glad to hear you're making progress.
>>>>>>>>>
>>>>>>>>> > 1. It seems though that flushing [...] the producer and setting the offset to the compacting topic is not atomic OR do we use transactions here ?
>>>>>>>>>
>>>>>>>>> You are correct. I'm working on a KIP and PoC to introduce transactions to Connect for this exact purpose :)
>>>>>>>>>
>>>>>>>>> > I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 topics with 1 partition each. Do I assume this right, as in there are 4 consumer groups ( one CG per thread ) ...
>>>>>>>>>
>>>>>>>>> Some details here:
>>>>>>>>> - tasks.max controls the maximum number of tasks created per Connector instance. Both MirrorSourceConnector and MirrorCheckpointConnector will create multiple tasks (up to tasks.max), but MirrorHeartbeatConnector only ever creates a single task. Moreover, there cannot be more tasks than topic-partitions (for MirrorSourceConnector) or consumer groups (for MirrorCheckpointConnector). So if you have two topics with one partition each and 1 consumer group total, you'll have two MirrorSourceConnector tasks, one MirrorHeartbeatConnector task, and one MirrorCheckpointConnector task, for a total of four. And that's in one direction only: if you have multiple source->target herders enabled, each will create tasks independently.
>>>>>>>>> - There are no consumer groups in MM2, technically.
>>>>>>>>>   The Connect framework uses the Coordinator API and internal topics to divide tasks among workers -- not a consumer group per se. The MM2 connectors use the assign() API, not the subscribe() API, so there are no consumer groups there either. In fact, they don't commit() either. This is nice, as it eliminates a lot of the rebalancing problems legacy MirrorMaker has been plagued with. With MM2, rebalancing only occurs when the number of workers changes or when the assignments change (e.g. new topics are discovered).
>>>>>>>>>
>>>>>>>>> Ryanne
>>>>>>>>>
>>>>>>>>> On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Ryanne,
>>>>>>>>>>
>>>>>>>>>> The test was on topics that had a 7 day retention, which generally implies that the batch size for flush is pretty high ( till the consumption becomes current ). The offset.flush.timeout.ms defaults to 5 seconds and the code will not send in the offsets if the flush is not complete. Increasing that timeout did solve the "not sending the offset to topic" issue.
>>>>>>>>>>
>>>>>>>>>> Two questions ( I am being greedy here :) )
>>>>>>>>>>
>>>>>>>>>> 1. It seems though that flushing the producer and setting the offset to the compacting topic is not atomic OR do we use transactions here ?
>>>>>>>>>>
>>>>>>>>>> 2. I see
>>>>>>>>>>
>>>>>>>>>> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 956435
>>>>>>>>>> WorkerSourceTask{id=MirrorSourceConnector-1} flushing 356251
>>>>>>>>>> WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0
>>>>>>>>>> WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding messages
>>>>>>>>>>
>>>>>>>>>> I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 topics with 1 partition each. Do I assume this right, as in there are 4 consumer groups ( one CG per thread ) ...
>>>>>>>>>>
>>>>>>>>>> THANKS A LOT
>>>>>>>>>>
>>>>>>>>>> Vishal.
>>>>>>>>>>
>>>>>>>>>> On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> > timed out while waiting for producer to flush outstanding
>>>>>>>>>>>
>>>>>>>>>>> Yeah, that's what I'd expect to see if Connect was unable to send records to the downstream remote topics, e.g. if min.in-sync.replicas were misconfigured. Given some data seems to arrive, it's possible that everything is configured correctly but with too much latency to successfully commit within the default timeouts.
>>>>>>>>>>> You may want to increase the number of tasks substantially to achieve more parallelism and throughput.
>>>>>>>>>>>
>>>>>>>>>>> Ryanne
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Aah no.. there is more to it. Not sure if related to the above.
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>>>>>>>>>>>>
>>>>>>>>>>>> Is timing out based on
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>>>>>>>>>>>>
>>>>>>>>>>>> [2019-10-14 18:55:20,820] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 36478 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this might be it.. Could you confirm.
>>>>>>>>>>>>> It seems to be on the path to commit the offsets.. but not sure...
>>>>>>>>>>>>>
>>>>>>>>>>>>> [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
>>>>>>>>>>>>> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>>>>>>>>>>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>>>>>>>>>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>>>>>>>>>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>>>>>>>>>>>>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>>>>>>>>>>>>     at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>>>>>>>>>>>>>     at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>>>>>>>>>>>>>     at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>>>>>>>>>>>>>     at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>>>>>>>>>>>>>     at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>>>>>>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>>>>>>>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>>> Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> > I do not have a single record in the offsets topic
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That's definitely not normal. You are correct that without records in that topic, MM2 will restart from EARLIEST. The offsets should be stored periodically and whenever the connectors gracefully shutdown or restart.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is it possible the topics don't have required ACLs or something?
>>>>>>>>>>>>>> Also note: Connect wants the offsets topic to have a large number of partitions and to be compacted. Though I can't imagine either would prevent commits from being sent.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ryanne
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2nd/restore issue ( I think I need to solve the offsets topic issue before I go with the scale up and down issue )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As you had indicated, I went ahead and created the offsets topic.
>>>>>>>>>>>>>>> The status of the cluster ( destination ) is thus
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Topic | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size | Producer Message/Sec | Summed Recent Offsets
>>>>>>>>>>>>>>> s8k.checkpoints.internal <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
>>>>>>>>>>>>>>> s8k.act_search_page <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 6675.30 | 4,166,842
>>>>>>>>>>>>>>> s8k.act_reach <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 20657.92 | 11,579,529
>>>>>>>>>>>>>>> mm2-status.s8k.internal <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal> | 5 | 5 | 100 | 0 | 0 | 3 | 0 | | 0.00 | 10
>>>>>>>>>>>>>>> mm2-offsets.s8k_test.internal <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
>>>>>>>>>>>>>>> mm2-offsets.s8k.internal <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
>>>>>>>>>>>>>>> mm2-configs.s8k.internal <https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal> | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 13
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You can see that we have the 5 ( I created both the offsets, to be safe for the below )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> clusters = s8k, s8k_test
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> s8k.bootstrap.servers = .....
>>>>>>>>>>>>>>> s8k_test.bootstrap.servers = ......
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # only allow replication dr1 -> dr2
>>>>>>>>>>>>>>> s8k->s8k_test.enabled = true
>>>>>>>>>>>>>>> s8k->s8k_test.topics = act_search_page|act_reach
>>>>>>>>>>>>>>> s8k->s8k_test.emit.heartbeats.enabled = false
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> s8k_test->s8k.enabled = false
>>>>>>>>>>>>>>> s8k_test->s8k.emit.heartbeats.enabled = false
>>>>>>>>>>>>>>> s8k_test.replication.factor = 3
>>>>>>>>>>>>>>> s8k.replication.factor = 3
>>>>>>>>>>>>>>> offsets.storage.replication.factor = 3
>>>>>>>>>>>>>>> replication.factor = 3
>>>>>>>>>>>>>>> replication.policy.separator = .
>>>>>>>>>>>>>>> tasks.max = 4
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What seems strange is that I do not have a single record in the offsets topic.. Is that normal ?
>>>>>>>>>>>>>>> I would imagine that without a record, there is no way that a restore would happen.... And that is obvious when I restart the mm2 instance... Find the screenshot attached. In essence the latency avg lag is reset when the mm2 instance is reset, indicating no restore but restart from EARLIEST... I must be missing something simple
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Vishal, the first issue is easy: you must set tasks.max to something above 1 (the default) in order to achieve any parallelism. This property is passed along to the internal Connect workers. It's unfortunate that Connect is not smart enough to default this property to the number of workers. I suspect that will improve before long.
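
To make the tasks.max point concrete, here is a rough sketch of the task-count arithmetic Ryanne spells out in a later reply in this thread (tasks capped by tasks.max and by topic-partitions or consumer groups, plus a single heartbeat task). It only models the counting; TaskCountSketch and its methods are hypothetical names, not part of Kafka Connect.

```java
// Rough model of the task-count rules described in this thread.
// Names are hypothetical; this is not Kafka Connect's API.
class TaskCountSketch {

    // A connector gets at most tasks.max tasks, and never more tasks
    // than it has units of work (topic-partitions or consumer groups).
    static int tasksFor(int tasksMax, int unitsOfWork) {
        return Math.max(0, Math.min(tasksMax, unitsOfWork));
    }

    // Total for one source->target direction: source + checkpoint + heartbeat.
    static int totalTasks(int tasksMax, int topicPartitions, int consumerGroups) {
        int source = tasksFor(tasksMax, topicPartitions);
        int checkpoint = tasksFor(tasksMax, consumerGroups);
        int heartbeat = 1; // the heartbeat connector only ever creates one task
        return source + checkpoint + heartbeat;
    }

    public static void main(String[] args) {
        // Example from the thread: two single-partition topics, one consumer
        // group, tasks.max = 4  ->  2 + 1 + 1 = 4 tasks.
        System.out.println(totalTasks(4, 2, 1));

        // With the default tasks.max = 1, eight topic-partitions still
        // share a single source task.
        System.out.println(tasksFor(1, 8));
    }
}
```
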
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For the second issue, is it possible you are missing the offsets topic? It should exist alongside the config and status topics. Connect should create this topic, but there are various reasons this can fail, e.g. if the replication factor is misconfigured. You can try creating this topic manually or changing offsets.storage.replication.factor.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ryanne
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a guide, I have built from source the origin/KIP-382 of https://github.com/apache/kafka.git.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am seeing 2 issues
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * I brought up 2 processes on 2 different nodes ( they are actually pods on k8s but that should not matter ). They share the mm2.properties file and are replicating ( 1-way ) 3 topics with 8 partitions in total.
>>>>>>>>>>>>>>>>>   That seems to be the way to create a standalone mm2 cluster. I do not however see ( at least the mbeans do not show ) any attempt to rebalance.
>>>>>>>>>>>>>>>>>   https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>>>>>>>>>>>>>>>>   mbeans are all on a single node
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * I restart the processes on the 2 nodes ( hard stop and start ). The offsets for replication seem to be reset to the earliest, as if it is a brand new mirroring. It is also obvious from the "record-age-ms-avg|replication-latency-ms-avg" which I track through the restart.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This implies that
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Load balancing by rebalancing is not working. I cannot scale up or down by adding nodes to the mm2 cluster or removing them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. Restore on a mirror is not working. If the MM2 cluster is brought down, it does not start mirroring from the last known state. I see the state/config topics etc created as expected..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The mm2.properties is pretty minimal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> clusters = a , b
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> a.bootstrap.servers = k.....
>>>>>>>>>>>>>>>>> b.bootstrap.servers = k.....
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> # only allow replication dr1 -> dr2
>>>>>>>>>>>>>>>>> a->b.enabled = true
>>>>>>>>>>>>>>>>> a->b.topics = act_search_page
>>>>>>>>>>>>>>>>> a->b.emit.heartbeats.enabled = false
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> b->a.enabled = false
>>>>>>>>>>>>>>>>> b->a.emit.heartbeats.enabled = false
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think is the issue ?
>> > > > >> > > > > > >> >> > >> > > > >> > > > > > >> >> > >> > > > >> > > > > > >> >> > Thanks >> > > > >> > > > > > >> >> > >> > > > >> > > > > > >> >> >> > > > >> > > > > > >> > >> > > > >> > > > > > >> >> > > > >> > > > > > > >> > > > >> > > > > > >> > > > >> > > > > >> > > > >> > > > >> > > > >> > > >> > > > >> > >> > > > >> >> > > > > >> > > > >> > > >> > >> >
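
As a footnote to the restore question in this first message, the behaviour described in the replies (no record in the offsets topic means a restart from EARLIEST) can be sketched roughly as below. This is an illustration under assumptions, not Connect's actual code: ResumeOffsetSketch, ResetPolicy and resumeOffset are hypothetical names, and a partition's log is reduced to a start and end offset.

```java
import java.util.OptionalLong;

// Toy model of where a restarted mirror resumes reading a source partition.
// Hypothetical names; not the Kafka Connect implementation.
class ResumeOffsetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // If an offset was stored before the restart, resume from it.
    // Otherwise fall back to the reset policy (auto.offset.reset).
    static long resumeOffset(OptionalLong storedOffset, ResetPolicy policy,
                             long logStartOffset, long logEndOffset) {
        if (storedOffset.isPresent()) {
            return storedOffset.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // Empty offsets topic: the whole retained log is replayed.
        System.out.println(resumeOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 1000L));

        // Offsets were flushed before the restart: mirroring picks up from there.
        System.out.println(resumeOffset(OptionalLong.of(420L), ResetPolicy.EARLIEST, 0L, 1000L));
    }
}
```

This is why an empty offsets topic and a reset of the age/latency metrics on restart tend to show up together.
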