Hey Ryanne,

The test was on topics with a 7-day retention, which generally implies
that the batch size for each flush is pretty high ( until consumption
catches up to current ). offset.flush.timeout.ms defaults to 5 seconds,
and the code will not send the offsets if the flush does not complete in
that time. Increasing that timeout did solve the "not sending the offsets
to the topic" issue.
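
A minimal sketch of such a change, for reference. The 60000 value is purely
illustrative, and putting the property at the top level of mm2.properties is
an assumption; depending on the build it may need to be set per cluster or on
the Connect worker instead.

```properties
# Hypothetical excerpt; the values are illustrative, not the ones from the test.
# Connect worker setting: how long an offset commit waits for the producer to
# flush outstanding records before that commit attempt is abandoned.
# Default is 5000 ms.
offset.flush.timeout.ms = 60000
# Connect worker setting: how often offset commits are attempted.
# Default is 60000 ms; shown here only for context.
offset.flush.interval.ms = 60000
```

The trade-off is that a longer timeout lets a large backlog finish flushing,
but each commit attempt can then block for longer before giving up.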

Two questions ( I am being greedy here :) )

1. It seems, though, that flushing the producer and writing the offsets to
the compacted topic is not atomic. Or do we use transactions here ?

2. I see

 WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 956435

 WorkerSourceTask{id=MirrorSourceConnector-1} flushing 356251

 WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0

 WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding messages


I think these are 4 threads ( because tasks.max=4 ), and I have 2 topics
with 1 partition each. Am I right to assume that there are 4 consumer
groups ( one CG per thread ) ?





THANKS A LOT


Vishal.



On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:

> >  timed out
> while waiting for producer to flush outstanding
>
> Yeah, that's what I'd expect to see if Connect was unable to send records
> to the downstream remote topics, e.g. if min.in-sync.replicas were
> misconfigured. Given some data seems to arrive, it's possible that
> everything is configured correctly but with too much latency to
> successfully commit within the default timeouts. You may want to increase
> the number of tasks substantially to achieve more parallelism and
> throughput.
>
> Ryanne
>
> On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
> > Aah no.. there is more to it. Not sure if it is related to the above.
> >
> >
> >
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
> >
> >
> > Is timing out based on
> >
> >
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
> >
> > [2019-10-14 18:55:20,820] ERROR
> > WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out
> > while waiting for producer to flush outstanding 36478 messages
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> >
> >
> >
> > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <
> vishal.santo...@gmail.com
> > >
> > wrote:
> >
> > > I think this might be it.. Could you confirm. It seems to be on the
> path
> > > to commit the offsets.. but not sure...
> > >
> > > [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector
> > caught
> > > exception in scheduled task: syncing topic ACLs
> > > (org.apache.kafka.connect.mirror.Scheduler:102)
> > >
> > > java.util.concurrent.ExecutionException:
> > > org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer
> > is
> > > configured on the broker
> > >
> > >         at
> > >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> > >
> > >         at
> > >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> > >
> > >         at
> > >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> > >
> > >         at
> > >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> > >
> > >         at
> > >
> >
> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
> > >
> > >         at
> > >
> >
> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
> > >
> > >         at
> > > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
> > >
> > >         at
> > >
> >
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
> > >
> > >         at
> > >
> >
> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
> > >
> > >         at
> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >
> > >         at
> > > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > >
> > >         at
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > >
> > >         at
> > >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > >
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >
> > >         at
> > >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >
> > >         at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No
> > > Authorizer is configured on the broker
> > >
> > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > >> > I do not have a single record in the offsets topic
> > >>
> > >> That's definitely not normal. You are correct that without records in
> > that
> > >> topic, MM2 will restart from EARLIEST. The offsets should be stored
> > >> periodically and whenever the connectors gracefully shutdown or
> restart.
> > >>
> > >> Is it possible the topics don't have required ACLs or something? Also
> > >> note:
> > >> Connect wants the offsets topic to have a large number of partitions
> and
> > >> to
> > >> be compacted. Though I can't imagine either would prevent commits from
> > >> being sent.
> > >>
> > >> Ryanne
> > >>
> > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <
> > >> vishal.santo...@gmail.com>
> > >> wrote:
> > >>
> > >> > 2nd/restore issue  ( I think I need to solve the offsets topic issue
> > >> > before I go with the scale up and down issue )
> > >> >
> > >> > As you had indicated, I went ahead and created the offsets topic.
> The
> > >> > status of the cluster  ( destination ) is thus
> > >> >
> > >> > Topic | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % |
> > >> > Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size |
> > >> > Producer Message/Sec | Summed Recent Offsets
> > >> > s8k.checkpoints.internal
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal
> > >> >
> > >> > 1 3 60 0 0 3 0 0.00 0
> > >> > s8k.act_search_page
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page
> > >> >
> > >> > 1 3 60 0 0 3 0 6675.30 4,166,842
> > >> > s8k.act_reach
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach
> > >> >
> > >> > 1 3 60 0 0 3 0 20657.92 11,579,529
> > >> > mm2-status.s8k.internal
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal
> > >> >
> > >> > 5 5 100 0 0 3 0 0.00 10
> > >> > mm2-offsets.s8k_test.internal
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal
> > >> >
> > >> > 1 3 60 0 0 3 0 0.00 0
> > >> > mm2-offsets.s8k.internal
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal
> > >> >
> > >> > 1 3 60 0 0 3 0 0.00 0
> > >> > mm2-configs.s8k.internal
> > >> > <
> > >>
> >
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal
> > >> >
> > >> > 1 3 60 0 0 3 0 0.00 13
> > >> >
> > >> > You can see that we have the 5 ( I created both the offsets topics,
> > >> > to be safe, for the below )
> > >> >
> > >> > *clusters = s8k, s8k_test*
> > >> >
> > >> >
> > >> >
> > >> > *s8k.bootstrap.servers = .....*
> > >> >
> > >> > *s8k_test.bootstrap.servers = ......*
> > >> >
> > >> >
> > >> > *# only allow replication dr1 -> dr2*
> > >> >
> > >> > *s8k->s8k_test.enabled = true*
> > >> >
> > >> > *s8k->s8k_test.topics = act_search_page|act_reach*
> > >> >
> > >> > *s8k->s8k_test.emit.heartbeats.enabled = false*
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > *s8k_test->s8k.enabled = false*
> > >> >
> > >> > *s8k_test->s8k.emit.heartbeats.enabled = false*
> > >> >
> > >> > *s8k_test.replication.factor = 3*
> > >> >
> > >> > *s8k.replication.factor = 3*
> > >> >
> > >> > *offsets.storage.replication.factor = 3*
> > >> >
> > >> > *replication.factor = 3*
> > >> >
> > >> > *replication.policy.separator = .*
> > >> >
> > >> > *tasks.max = 4*
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > What seems strange is that I do not have a single record in the
> > offsets
> > >> > topic.. Is that normal ?   I would imagine that without a record,
> > there
> > >> is
> > >> > no way that a restore would happen.... And that is obvious when I
> > >> restart
> > >> > the mm2 instance... Find the screenshot attached. In essence the
> > latency
> > >> > avg lag is reset when the mm2 instance is reset indicating no
> restore
> > >> but
> > >> > restart from EARLIEST... I must be missing some thing simple
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com
> >
> > >> > wrote:
> > >> >
> > >> >> Vishal, the first issue is easy: you must set tasks.max to
> something
> > >> above
> > >> >> 1 (the default) in order to achieve any parallelism. This property
> is
> > >> >> passed along to the internal Connect workers. It's unfortunate that
> > >> >> Connect
> > >> >> is not smart enough to default this property to the number of
> > workers.
> > >> I
> > >> >> suspect that will improve before long.
> > >> >>
> > >> >> For the second issue, is it possible you are missing the offsets
> > >> topic? It
> > >> >> should exist alongside the config and status topics. Connect should
> > >> create
> > >> >> this topic, but there are various reasons this can fail, e.g. if
> the
> > >> >> replication factor is misconfigured. You can try creating this
> topic
> > >> >> manually or changing offsets.storage.replication.factor.
> > >> >>
> > >> >> Ryanne
> > >> >>
> > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <
> > >> vishal.santo...@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror
> > as a
> > >> >> > guide,
> > >> >> > I have built from source the origin/KIP-382 branch of
> > >> >> > https://github.com/apache/kafka.git.
> > >> >> >
> > >> >> > I am seeing 2 issues
> > >> >> >
> > >> >> > * I brought up 2 processes on 2 different nodes ( they are
> actually
> > >> >> pods on
> > >> >> > k8s but that should not matter ). They share the mm2.properties
> > file
> > >> and
> > >> >> > are replicating ( 1-way ) 3 topics with 8 partitions in total.
> > That
> > >> >> seems
> > >> >> > to be the way to create a standalone mm2 cluster. I do not
> however
> > >> see(
> > >> >> at
> > >> >> > least the mbeans do not show ) any attempt to rebalance.
> > >> >> >
> > >> >> >
> > >> >>
> > >>
> >
> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
> > >> >> > mbeans
> > >> >> > are all on a single node
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > * I restart the processes on the 2 nodes ( hard stop and start ).
> > The
> > >> >> > offsets for replication seem to be reset to the earliest, as if
> it
> > >> is a
> > >> >> > brand new mirroring. It is also obvious from the
> > >> >> > "record-age-ms-avg|replication-latency-ms-avg"
> > >> >> > which I track through the restart.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > This implies that
> > >> >> >
> > >> >> >
> > >> >> > 1. Load balancing by rebalancing is not working. I cannot scale
> up
> > or
> > >> >> down
> > >> >> > by adding nodes to the mm2 cluster or removing them.
> > >> >> >
> > >> >> > 2. Restore on a mirror is not working. If the MM2 cluster is
> > brought
> > >> >> down,
> > >> >> > it does not start mirroring from the last known state. I see the
> > >> >> > state/config topics etc. created as expected.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > The mm2.properties is pretty minimal
> > >> >> >
> > >> >> >
> > >> >> > *clusters = a , b*
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > *a.bootstrap.servers = k.....*
> > >> >> >
> > >> >> > *b.bootstrap.servers = k.....*
> > >> >> >
> > >> >> >
> > >> >> > *# only allow replication dr1 -> dr2*
> > >> >> >
> > >> >> > *a->b.enabled = true*
> > >> >> >
> > >> >> > *a->b.topics = act_search_page*
> > >> >> >
> > >> >> > *a->b.emit.heartbeats.enabled = false*
> > >> >> >
> > >> >> >
> > >> >> > *b->a..enabled = false*
> > >> >> >
> > >> >> > *b->a.emit.heartbeats.enabled = false*
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> > What do you think is the issue ?
> > >> >> >
> > >> >> >
> > >> >> > Thanks
> > >> >> >
> > >> >>
> > >> >
> > >>
> > >
> >
>
