Hey Vishal, glad to hear you're making progress.

> 1. It seems though that flushing [...] the producer and setting the
> offset to the compacting topic is not atomic  OR  do we use
> transactions here  ?

You are correct. I'm working on a KIP and PoC to introduce transactions to
Connect for this exact purpose :)

> I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 topics
> with 1 partition each. Do I assume this right, as in there are 4 consumer
> groups ( one CG per thread ) ...

Some details here:
- tasks.max controls the maximum number of tasks created per Connector
instance. Both MirrorSourceConnector and MirrorCheckpointConnector will
create multiple tasks (up to tasks.max), but MirrorHeartbeatConnector only
ever creates a single task. Moreover, there cannot be more tasks than
topic-partitions (for MirrorSourceConnector) or consumer groups (for
MirrorCheckpointConnector). So if you have two topics with one partition
each and 1 consumer group total, you'll have two MirrorSourceConnector
tasks, one MirrorHeartbeatConnector task, and one MirrorCheckpointConnector
tasks, for a total of four. And that's in one direction only: if you have
multiple source->target herders enabled, each will create tasks
independently.
- There are no consumer groups in MM2, technically. The Connect framework
uses the Coordinator API and internal topics to divide tasks among workers
-- not a consumer group per se. The MM2 connectors use the assign() API,
not the subscribe() API, so there are no consumer groups there either. In
fact, they don't commit() either. This is nice, as it eliminates a lot of
the rebalancing problems legacy MirrorMaker has been plagued with. With
MM2, rebalancing only occurs when the number of workers changes or when the
assignments change (e.g. new topics are discovered).
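
To make the task arithmetic above concrete, here is a rough sketch in Java. It
is not MM2 code: the class and method names (MirrorTaskCount, cappedTasks,
tasksPerConnector, totalTasks) are made up for illustration, and it assumes
tasks.max is at least 1 and that only one source->target direction is enabled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: estimates how many tasks each
// MM2 connector ends up with for one source->target direction.
public class MirrorTaskCount {

    // A connector never gets more tasks than tasks.max, and never more
    // tasks than it has units of work to hand out.
    static int cappedTasks(int tasksMax, int unitsOfWork) {
        return Math.max(0, Math.min(tasksMax, unitsOfWork));
    }

    static Map<String, Integer> tasksPerConnector(int tasksMax,
                                                  int topicPartitions,
                                                  int consumerGroups) {
        Map<String, Integer> tasks = new LinkedHashMap<>();
        // MirrorSourceConnector divides topic-partitions among its tasks.
        tasks.put("MirrorSourceConnector", cappedTasks(tasksMax, topicPartitions));
        // MirrorCheckpointConnector divides consumer groups among its tasks.
        tasks.put("MirrorCheckpointConnector", cappedTasks(tasksMax, consumerGroups));
        // MirrorHeartbeatConnector only ever creates a single task.
        tasks.put("MirrorHeartbeatConnector", 1);
        return tasks;
    }

    static int totalTasks(Map<String, Integer> tasks) {
        int total = 0;
        for (int n : tasks.values()) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        // tasks.max = 4, two topics with one partition each, one consumer group.
        Map<String, Integer> tasks = tasksPerConnector(4, 2, 1);
        tasks.forEach((name, n) -> System.out.println(name + ": " + n));
        System.out.println("total: " + totalTasks(tasks)); // prints "total: 4"
    }
}
```

With tasks.max = 4, two single-partition topics and one consumer group, this
gives 2 + 1 + 1 = 4 tasks, which matches the four WorkerSourceTask lines in
your log.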

Ryanne

On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Hey Ryanne,
>
>
>            The test was on topics that had a 7 day retention. Which
> generally implies that the batch size for flush is pretty high ( till the
> consumption becomes current ). The  offset.flush.timeout.ms defaults to 5
> seconds and the code will not send in the offsets if the flush is not
> complete. Increasing that time out did solve the "not sending the offset to
> topic" issue.
>
> Two questions ( I am being greedy here :) )
>
> 1. It seems though that flushing the producer and setting the
> offset to the compacting topic is not atomic  OR  do we use
> transactions here  ?
>
> 2. I see
>
>  WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 956435
>
>  WorkerSourceTask{id=MirrorSourceConnector-1} flushing 356251
>
>  WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0
>
>  WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
> messages
>
>
> I think these are 4 threads ( b'coz num.tasks=4 ), and I have 2 topics with
> 1 partition each. Do I assume this right, as in there are 4 consumer groups
> ( one CG per thread ) ...
>
>
>
>
>
> THANKS A LOT
>
>
> Vishal.
>
>
>
> On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > > timed out while waiting for producer to flush outstanding
> >
> > Yeah, that's what I'd expect to see if Connect was unable to send records
> > to the downstream remote topics, e.g. if min.in-sync.replicas were
> > misconfigured. Given some data seems to arrive, it's possible that
> > everything is configured correctly but with too much latency to
> > successfully commit within the default timeouts. You may want to increase
> > the number of tasks substantially to achieve more parallelism and
> > throughput.
> >
> > Ryanne
> >
> > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <vishal.santo...@gmail.com>
> > wrote:
> >
> > > Aah no.. there is more to it. Not sure if it is related to the above.
> > >
> > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
> > >
> > > Is timing out based on
> > >
> > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
> > >
> > > [2019-10-14 18:55:20,820] ERROR
> > > WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out
> > > while waiting for producer to flush outstanding 36478 messages
> > > (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> > >
> > >
> > >
> > > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <vishal.santo...@gmail.com>
> > > wrote:
> > >
> > > > > I think this might be it.. Could you confirm. It seems to be on the
> > > > > path to commit the offsets.. but not sure...
> > > > >
> > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector
> > > > > caught exception in scheduled task: syncing topic ACLs
> > > > > (org.apache.kafka.connect.mirror.Scheduler:102)
> > > > >
> > > > > java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
> > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
> > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
> > > > >         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
> > > > >         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
> > > > >         at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
> > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
> > > >
> > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > > wrote:
> > > >
> > > >> > I do not have a single record in the offsets topic
> > > >>
> > > > >> That's definitely not normal. You are correct that without records in
> > > > >> that topic, MM2 will restart from EARLIEST. The offsets should be stored
> > > > >> periodically and whenever the connectors gracefully shutdown or restart.
> > > > >>
> > > > >> Is it possible the topics don't have required ACLs or something? Also
> > > > >> note: Connect wants the offsets topic to have a large number of
> > > > >> partitions and to be compacted. Though I can't imagine either would
> > > > >> prevent commits from being sent.
> > > >>
> > > >> Ryanne
> > > >>
> > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <vishal.santo...@gmail.com>
> > > > >> wrote:
> > > >>
> > > > >> > 2nd/restore issue  ( I think I need to solve the offsets topic issue
> > > > >> > before I go with the scale up and down issue )
> > > > >> >
> > > > >> > As you had indicated, I went ahead and created the offsets topic. The
> > > > >> > status of the cluster  ( destination ) is thus
> > > >> >
> > > > >> > Topic | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size | Producer Message/Sec | Summed Recent Offsets
> > > > >> > s8k.checkpoints.internal | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
> > > > >> > s8k.act_search_page | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 6675.30 | 4,166,842
> > > > >> > s8k.act_reach | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 20657.92 | 11,579,529
> > > > >> > mm2-status.s8k.internal | 5 | 5 | 100 | 0 | 0 | 3 | 0 | | 0.00 | 10
> > > > >> > mm2-offsets.s8k_test.internal | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
> > > > >> > mm2-offsets.s8k.internal | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 0
> > > > >> > mm2-configs.s8k.internal | 1 | 3 | 60 | 0 | 0 | 3 | 0 | | 0.00 | 13
> > > > >> >
> > > > >> > ( each topic name links to
> > > > >> > https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/ followed
> > > > >> > by the topic name )
> > > >> >
> > > > >> > You can see that we have the 5 ( I created both the offsets topics,
> > > > >> > to be safe for the below )
> > > >> >
> > > > >> > clusters = s8k, s8k_test
> > > > >> >
> > > > >> > s8k.bootstrap.servers = .....
> > > > >> > s8k_test.bootstrap.servers = ......
> > > > >> >
> > > > >> > # only allow replication dr1 -> dr2
> > > > >> > s8k->s8k_test.enabled = true
> > > > >> > s8k->s8k_test.topics = act_search_page|act_reach
> > > > >> > s8k->s8k_test.emit.heartbeats.enabled = false
> > > > >> >
> > > > >> > s8k_test->s8k.enabled = false
> > > > >> > s8k_test->s8k.emit.heartbeats.enabled = false
> > > > >> >
> > > > >> > s8k_test.replication.factor = 3
> > > > >> > s8k.replication.factor = 3
> > > > >> > offsets.storage.replication.factor = 3
> > > > >> > replication.factor = 3
> > > > >> > replication.policy.separator = .
> > > > >> > tasks.max = 4
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > > >> > What seems strange is that I do not have a single record in the
> > > > >> > offsets topic.. Is that normal ?   I would imagine that without a
> > > > >> > record, there is no way that a restore would happen.... And that is
> > > > >> > obvious when I restart the mm2 instance... Find the screenshot
> > > > >> > attached. In essence the latency avg lag is reset when the mm2
> > > > >> > instance is reset, indicating no restore but a restart from
> > > > >> > EARLIEST... I must be missing something simple
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > >> > wrote:
> > > >> >
> > > > >> >> Vishal, the first issue is easy: you must set tasks.max to something
> > > > >> >> above 1 (the default) in order to achieve any parallelism. This
> > > > >> >> property is passed along to the internal Connect workers. It's
> > > > >> >> unfortunate that Connect is not smart enough to default this property
> > > > >> >> to the number of workers. I suspect that will improve before long.
> > > > >> >>
> > > > >> >> For the second issue, is it possible you are missing the offsets
> > > > >> >> topic? It should exist alongside the config and status topics. Connect
> > > > >> >> should create this topic, but there are various reasons this can fail,
> > > > >> >> e.g. if the replication factor is misconfigured. You can try creating
> > > > >> >> this topic manually or changing offsets.storage.replication.factor.
> > > >> >>
> > > >> >> Ryanne
> > > >> >>
> > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <vishal.santo...@gmail.com>
> > > > >> >> wrote:
> > > >> >>
> > > > >> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror
> > > > >> >> > as a guide, I have built from source the origin/KIP-382 branch of
> > > > >> >> > https://github.com/apache/kafka.git.
> > > >> >> >
> > > >> >> > I am seeing 2 issues
> > > >> >> >
> > > > >> >> > * I brought up 2 processes on 2 different nodes ( they are actually
> > > > >> >> > pods on k8s but that should not matter ). They share the
> > > > >> >> > mm2.properties file and are replicating ( 1-way ) 3 topics with 8
> > > > >> >> > partitions in total. That seems to be the way to create a standalone
> > > > >> >> > mm2 cluster. I do not however see ( at least the mbeans do not show )
> > > > >> >> > any attempt to rebalance. The
> > > > >> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
> > > > >> >> > mbeans are all on a single node.
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > > >> >> > * I restart the processes on the 2 nodes ( hard stop and start ). The
> > > > >> >> > offsets for replication seem to be reset to the earliest, as if it is
> > > > >> >> > a brand new mirroring. It is also obvious from the
> > > > >> >> > "record-age-ms-avg|replication-latency-ms-avg"
> > > > >> >> > which I track through the restart.
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> > This implies that
> > > >> >> >
> > > >> >> >
> > > > >> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or
> > > > >> >> > down by adding nodes to the mm2 cluster or removing them.
> > > > >> >> >
> > > > >> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought
> > > > >> >> > down, it does not start mirroring from the last known state. I see
> > > > >> >> > the state/config topics etc created as expected..
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > > >> >> > The mm2.properties is pretty minimal
> > > >> >> >
> > > >> >> >
> > > > >> >> > clusters = a , b
> > > > >> >> >
> > > > >> >> > a.bootstrap.servers = k.....
> > > > >> >> > b.bootstrap.servers = k.....
> > > > >> >> >
> > > > >> >> > # only allow replication dr1 -> dr2
> > > > >> >> > a->b.enabled = true
> > > > >> >> > a->b.topics = act_search_page
> > > > >> >> > a->b.emit.heartbeats.enabled = false
> > > > >> >> >
> > > > >> >> > b->a..enabled = false
> > > > >> >> > b->a.emit.heartbeats.enabled = false
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> >
> > > >> >> > What do you think is the issue ?
> > > >> >> >
> > > >> >> >
> > > >> >> > Thanks
> > > >> >> >
> > > >> >>
> > > >> >
> > > >>
> > > >
> > >
> >
>
