Aah no.. there is more to it. Not sure if it is related to the above.
https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
is timing out based on
https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133

[2019-10-14 18:55:20,820] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 36478 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
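If the producer is merely slow rather than stuck, widening that flush window may be enough. A sketch for mm2.properties, assuming the worker-level offset.flush.timeout.ms setting (default 5000 ms, per that WorkerConfig line) is passed through to the internal Connect workers:

    # sketch: allow the producer more than the default 5s to drain
    # outstanding records before the offset flush gives up
    offset.flush.timeout.ms = 60000

If the producer is genuinely wedged, though, raising the timeout will only postpone the same error.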
On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> I think this might be it.. Could you confirm? It seems to be on the path
> to committing the offsets.. but I am not sure...
>
> [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught
> exception in scheduled task: syncing topic ACLs
> (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
> configured on the broker
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>     at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>     at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>     at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>     at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No
> Authorizer is configured on the broker
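Side note on that stack trace: the scheduled syncTopicAcls task queries ACLs on a cluster that has no authorizer, so it will fail on every run regardless of the offsets problem. A sketch of silencing it in mm2.properties; the property name below comes from the MM2 connector config, and whether the KIP-382 branch honors it is an assumption worth verifying:

    # assumption: sync.topic.acls.enabled exists on this branch; disables the
    # periodic ACL sync that trips SecurityDisabledException
    sync.topic.acls.enabled = false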
> On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> > I do not have a single record in the offsets topic
>>
>> That's definitely not normal. You are correct that without records in that
>> topic, MM2 will restart from EARLIEST. The offsets should be stored
>> periodically and whenever the connectors gracefully shut down or restart.
>>
>> Is it possible the topics don't have the required ACLs or something? Also
>> note: Connect wants the offsets topic to have a large number of partitions
>> and to be compacted. Though I can't imagine either would prevent commits
>> from being sent.
>>
>> Ryanne
>>
>> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>> > 2nd/restore issue ( I think I need to solve the offsets topic issue
>> > before I go with the scale up and down issue ).
>> >
>> > As you had indicated, I went ahead and created the offsets topic. The
>> > status of the ( destination ) cluster is thus:
>> >
>> > Topic                           Partitions  Brokers  Spread%  Skew%  LeaderSkew%  Replicas  UnderRepl%  Msgs/Sec  Summed Recent Offsets
>> > s8k.checkpoints.internal        1           3        60       0      0            3         0           0.00      0
>> > s8k.act_search_page             1           3        60       0      0            3         0           6675.30   4,166,842
>> > s8k.act_reach                   1           3        60       0      0            3         0           20657.92  11,579,529
>> > mm2-status.s8k.internal         5           5        100      0      0            3         0           0.00      10
>> > mm2-offsets.s8k_test.internal   1           3        60       0      0            3         0           0.00      0
>> > mm2-offsets.s8k.internal        1           3        60       0      0            3         0           0.00      0
>> > mm2-configs.s8k.internal        1           3        60       0      0            3         0           0.00      13
>> >
>> > You can see that we have the 5 internal topics ( I created both offsets
>> > topics, to be safe, for the config below ):
>> >
>> > clusters = s8k, s8k_test
>> > s8k.bootstrap.servers = .....
>> > s8k_test.bootstrap.servers = ......
>> >
>> > # only allow replication dr1 -> dr2
>> > s8k->s8k_test.enabled = true
>> > s8k->s8k_test.topics = act_search_page|act_reach
>> > s8k->s8k_test.emit.heartbeats.enabled = false
>> >
>> > s8k_test->s8k.enabled = false
>> > s8k_test->s8k.emit.heartbeats.enabled = false
>> >
>> > s8k_test.replication.factor = 3
>> > s8k.replication.factor = 3
>> > offsets.storage.replication.factor = 3
>> > replication.factor = 3
>> > replication.policy.separator = .
>> > tasks.max = 4
>> >
>> > What seems strange is that I do not have a single record in the offsets
>> > topic.. Is that normal ? I would imagine that without a record, there is
>> > no way that a restore would happen... And that is obvious when I restart
>> > the mm2 instance... Find the screenshot attached. In essence, the average
>> > replication latency / lag is reset when the mm2 instance is restarted,
>> > indicating no restore but a restart from EARLIEST... I must be missing
>> > something simple.
>> >
>> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>> >
>> >> Vishal, the first issue is easy: you must set tasks.max to something
>> >> above 1 (the default) in order to achieve any parallelism. This property
>> >> is passed along to the internal Connect workers. It's unfortunate that
>> >> Connect is not smart enough to default this property to the number of
>> >> workers. I suspect that will improve before long.
>> >>
>> >> For the second issue, is it possible you are missing the offsets topic?
>> >> It should exist alongside the config and status topics. Connect should
>> >> create this topic, but there are various reasons this can fail, e.g. if
>> >> the replication factor is misconfigured. You can try creating this topic
>> >> manually or changing offsets.storage.replication.factor.
>> >>
>> >> Ryanne
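For reference, a sketch of the manual creation Ryanne describes. The partition count is an assumption (25 matches Connect's usual offset.storage.partitions default), and compaction matters because Connect replays the topic as a keyed log on startup:

    bin/kafka-topics.sh --create \
      --bootstrap-server <s8k_test broker> \
      --topic mm2-offsets.s8k.internal \
      --partitions 25 \
      --replication-factor 3 \
      --config cleanup.policy=compact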
>> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>> >>
>> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a
>> >> > guide, I have built from source the origin/KIP-382 branch of
>> >> > https://github.com/apache/kafka.git.
>> >> >
>> >> > I am seeing 2 issues:
>> >> >
>> >> > * I brought up 2 processes on 2 different nodes ( they are actually
>> >> > pods on k8s, but that should not matter ). They share the
>> >> > mm2.properties file and are replicating ( 1-way ) 3 topics with 8
>> >> > partitions in total. That seems to be the way to create a standalone
>> >> > mm2 cluster. I do not, however, see ( at least the mbeans do not
>> >> > show ) any attempt to rebalance. Per
>> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>> >> > the mbeans are all on a single node.
>> >> >
>> >> > * I restart the processes on the 2 nodes ( hard stop and start ). The
>> >> > offsets for replication seem to be reset to the earliest, as if it is
>> >> > a brand new mirroring. That is also obvious from the
>> >> > "record-age-ms-avg|replication-latency-ms-avg" metrics, which I track
>> >> > through the restart.
>> >> >
>> >> > This implies that
>> >> >
>> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or
>> >> > down by adding nodes to the mm2 cluster or removing them.
>> >> >
>> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought
>> >> > down, it does not start mirroring from the last known state. I see the
>> >> > state/config topics etc. created as expected.
>> >> >
>> >> > The mm2.properties is pretty minimal:
>> >> >
>> >> > clusters = a, b
>> >> > a.bootstrap.servers = k.....
>> >> > b.bootstrap.servers = k.....
>> >> >
>> >> > # only allow replication dr1 -> dr2
>> >> > a->b.enabled = true
>> >> > a->b.topics = act_search_page
>> >> > a->b.emit.heartbeats.enabled = false
>> >> >
>> >> > b->a.enabled = false
>> >> > b->a.emit.heartbeats.enabled = false
>> >> >
>> >> > What do you think is the issue ?
>> >> >
>> >> > Thanks
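A quick way to verify whether offset commits are landing at all is to tail the offsets topic directly; with the setup above, records should show up roughly every offset.flush.interval.ms (60 s by default). A sketch, using the mm2-offsets.s8k.internal name from the destination cluster listing above:

    bin/kafka-console-consumer.sh \
      --bootstrap-server <destination broker> \
      --topic mm2-offsets.s8k.internal \
      --from-beginning \
      --property print.key=true

An empty result after a few flush intervals, as in this thread, points at the commit path ( e.g. the flush timeout above ) rather than at topic creation.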