> timed out while waiting for producer to flush outstanding

Yeah, that's what I'd expect to see if Connect was unable to send records
to the downstream remote topics, e.g. if min.insync.replicas were
misconfigured. Given that some data seems to arrive, it's possible that
everything is configured correctly but with too much latency to
successfully commit within the default timeouts. You may want to increase
the number of tasks substantially to achieve more parallelism and
throughput.
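
As a minimal sketch of those two knobs in mm2.properties (the values are
illustrative assumptions, not tested recommendations, and it is also an
assumption that MM2 passes the top-level offset.flush.timeout.ms property
through to its internal Connect workers):

```properties
# Hypothetical value: more tasks allow more parallelism
# (the config quoted in this thread used tasks.max = 4).
tasks.max = 8

# Connect worker setting: how long an offset commit waits for the producer
# to flush outstanding records before giving up (Connect default: 5000 ms).
# Hypothetical value; raise it only as far as your latency requires.
offset.flush.timeout.ms = 30000
```

If the "Failed to flush, timed out" errors stop after a change like this,
latency rather than misconfiguration was probably the cause.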

Ryanne

On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Aah no.. there is more to it. Not sure if it is related to the above.
>
>
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>
>
> Is timing out based on
>
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>
> [2019-10-14 18:55:20,820] ERROR
> WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out
> while waiting for producer to flush outstanding 36478 messages
> (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>
>
>
> On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <vishal.santo...@gmail.com
> >
> wrote:
>
> > I think this might be it.. Could you confirm. It seems to be on the path
> > to commit the offsets.. but not sure...
> >
> > [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
> >
> > java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
> >         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> >         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> >         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> >         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
> >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
> >         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
> >         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
> >         at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
> >
> > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com>
> > wrote:
> >
> >> > I do not have a single record in the offsets topic
> >>
> >> That's definitely not normal. You are correct that without records in
> >> that topic, MM2 will restart from EARLIEST. The offsets should be stored
> >> periodically and whenever the connectors gracefully shut down or restart.
> >>
> >> Is it possible the topics don't have required ACLs or something? Also
> >> note: Connect wants the offsets topic to have a large number of
> >> partitions and to be compacted. Though I can't imagine either would
> >> prevent commits from being sent.
> >>
> >> Ryanne
> >>
> >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <
> >> vishal.santo...@gmail.com>
> >> wrote:
> >>
> >> > 2nd/restore issue  ( I think I need to solve the offsets topic issue
> >> > before I go with the scale up and down issue )
> >> >
> >> > As you had indicated, I went ahead and created the offsets topic. The
> >> > status of the cluster  ( destination ) is thus
> >> >
> >> > Topic                         | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size | Producer Message/Sec | Summed Recent Offsets
> >> > s8k.checkpoints.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
> >> > s8k.act_search_page           | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 6675.30              | 4,166,842
> >> > s8k.act_reach                 | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 20657.92             | 11,579,529
> >> > mm2-status.s8k.internal       | 5            | 5         | 100              | 0              | 0                     | 3          | 0                  |             | 0.00                 | 10
> >> > mm2-offsets.s8k_test.internal | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
> >> > mm2-offsets.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
> >> > mm2-configs.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 13
> >> >
> >> > ( Each topic name links to
> >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/<topic name> )
> >> >
> >> > You can see that we have the 5 ( I created both the offsets topics, to
> >> > be safe, for the config below )
> >> >
> >> > clusters = s8k, s8k_test
> >> >
> >> > s8k.bootstrap.servers = .....
> >> > s8k_test.bootstrap.servers = ......
> >> >
> >> > # only allow replication dr1 -> dr2
> >> > s8k->s8k_test.enabled = true
> >> > s8k->s8k_test.topics = act_search_page|act_reach
> >> > s8k->s8k_test.emit.heartbeats.enabled = false
> >> >
> >> > s8k_test->s8k.enabled = false
> >> > s8k_test->s8k.emit.heartbeats.enabled = false
> >> > s8k_test.replication.factor = 3
> >> > s8k.replication.factor = 3
> >> > offsets.storage.replication.factor = 3
> >> > replication.factor = 3
> >> > replication.policy.separator = .
> >> > tasks.max = 4
> >> >
> >> >
> >> >
> >> >
> >> > What seems strange is that I do not have a single record in the offsets
> >> > topic.. Is that normal ? I would imagine that without a record, there is
> >> > no way that a restore would happen.... And that is obvious when I restart
> >> > the mm2 instance... Find the screenshot attached. In essence the latency
> >> > avg lag is reset when the mm2 instance is reset, indicating no restore but
> >> > a restart from EARLIEST... I must be missing something simple
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com>
> >> > wrote:
> >> >
> >> >> Vishal, the first issue is easy: you must set tasks.max to something above
> >> >> 1 (the default) in order to achieve any parallelism. This property is
> >> >> passed along to the internal Connect workers. It's unfortunate that
> >> >> Connect is not smart enough to default this property to the number of
> >> >> workers. I suspect that will improve before long.
> >> >>
> >> >> For the second issue, is it possible you are missing the offsets topic? It
> >> >> should exist alongside the config and status topics. Connect should create
> >> >> this topic, but there are various reasons this can fail, e.g. if the
> >> >> replication factor is misconfigured. You can try creating this topic
> >> >> manually or changing offsets.storage.replication.factor.
> >> >>
> >> >> Ryanne
> >> >>
> >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <
> >> vishal.santo...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a
> >> >> > guide, I have built from source the origin/KIP-382 branch of
> >> >> > https://github.com/apache/kafka.git.
> >> >> >
> >> >> > I am seeing 2 issues
> >> >> >
> >> >> > * I brought up 2 processes on 2 different nodes ( they are actually pods
> >> >> > on k8s but that should not matter ). They share the mm2.properties file
> >> >> > and are replicating ( 1-way ) 3 topics with 8 partitions in total. That
> >> >> > seems to be the way to create a standalone mm2 cluster. I do not however
> >> >> > see ( at least the mbeans do not show ) any attempt to rebalance.
> >> >> >
> >> >> >
> >> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
> >> >> > mbeans are all on a single node
> >> >> >
> >> >> >
> >> >> >
> >> >> > * I restart the processes on the 2 nodes ( hard stop and start ). The
> >> >> > offsets for replication seem to be reset to the earliest, as if it is a
> >> >> > brand new mirroring. It is also obvious from the
> >> >> > "record-age-ms-avg|replication-latency-ms-avg"
> >> >> > which I track through the restart.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > This implies that
> >> >> >
> >> >> >
> >> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or
> >> >> > down by adding nodes to the mm2 cluster or removing them.
> >> >> >
> >> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought
> >> >> > down, it does not start mirroring from the last known state. I see the
> >> >> > state/config topics etc. created as expected..
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > The mm2.properties is pretty minimal
> >> >> >
> >> >> >
> >> >> > clusters = a , b
> >> >> >
> >> >> > a.bootstrap.servers = k.....
> >> >> > b.bootstrap.servers = k.....
> >> >> >
> >> >> > # only allow replication dr1 -> dr2
> >> >> > a->b.enabled = true
> >> >> > a->b.topics = act_search_page
> >> >> > a->b.emit.heartbeats.enabled = false
> >> >> >
> >> >> > b->a..enabled = false
> >> >> > b->a.emit.heartbeats.enabled = false
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> > What do you think is the issue ?
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >>
> >> >
> >>
> >
>
