Vishal, the number of tasks created per source->target herder is determined
by both tasks.max and the total number of topic-partitions being
replicated. In order to use all 12 worker nodes, you'd need tasks.max >= 12
and number of topic-partitions >= 12. From previous emails it sounds like
you have a small number of topic-partitions total (i.e. a small number of
topics with a small number of partitions per topic), so I'm guessing that's
the reason you aren't seeing more tasks being created.

Ryanne

On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Here is what I see
>
> * The max tasks are a a cap  on a Connector across the cluster. If have 8
> VMs but 8 max tasks my assumption  that there would be 8 * 8 = 72 task
> threads was
> wring. The logs showed that the partitions were consumed by  8 threads on
> the 8 VMs ( 1 per VM ) which was highly un optimal.  When I scaled the
> VMs to 12, it did not matter, as the max tasks still prevented any further
> distribution.
>
> *  If I cancel/resume the cluster with a max task of 48 ( keeping the same
> job name and thus connector definition   the max tasks does not change, as
> in
>  it seems to keep the same number of max task  threads limit ( as in 8 )
>
> * I can bring down a VM and see the task migrate to a free VM but the
> overall count of task threads remain the same.
>
>
> In essence, the num of tasks is a cap on threads in the cluster per
> connector, A connector is a source->sink pair that spans a cluster. Thus if
> we have a
> A->B DAG and  max tasks of 8, then there will be no more that 8 Source
> Tasks  ( threads ) no matter how big the cluster is, It thus makes sense to
> over provision ( within limits of a single VM ) on the max tasks to allow
> for adding more VMs for scale up.....
>
>
>
> On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi <vishal.santo...@gmail.com
> >
> wrote:
>
> > I misspoke
> >
> > >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the the
> > 8  VMs. I then upscaled to 12 VMs and the tasks *have not *migrated as I
> > would expect .
> >
> >
> >
> >
> > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi <
> vishal.santo...@gmail.com>
> > wrote:
> >
> >> OK, You will have to explain :)
> >>
> >> I had 12 VMs with 8 cpus and 8 max tasks.  I thought let me give a CPU
> to
> >> each task, which I presumed is a java thread ( even though I know the
> >> thread would be mostly ip bound ). . I saw the issue I pointed up.
> >> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the the
> >> 8  VMs. I then upscaled to 12 VMs and the tasks migrated as I would
> expect
> >> .*
> >>
> >>  I know that a VM will have MirrorSourceConnector and
> >> MirrorHeartbeatConnector tasks up till  tasks.max.  So a few questions
> >>
> >>
> >>
> >> * When we say there are 48 max tasks, are we saying there are  48
> threads
> >> ( in fact 96, each for the 2 groups above,  worst case  + 2 ) ?
> >> * When we talk about Connector, are we talking about a JVM process, as
> in
> >> a Connector is a JVM process ?
> >> * Why larger number of tasks.max help the spread  ?  As in I would
> assume
> >> there are up till 8 tasks ( or 16 )  per VM but how that should not have
> >> prevented  re assignment  on a scale up ( as it clearly did ) ?
> >>
> >> The reason I ask is that I plan to run mm2 cluster on  k8s and I want to
> >> make sure that I use the version of JVM that is more docker friendly
> vis a
> >> vis, how many cpus it believes it has  and as explained here
> >>
> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54
> >>
> >>
> >>
> >>
> >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <ryannedo...@gmail.com>
> >> wrote:
> >>
> >>> What is tasks.max? Consider bumping to something like 48 if you're
> >>> running
> >>> on a dozen nodes.
> >>>
> >>> Ryanne
> >>>
> >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi <
> vishal.santo...@gmail.com
> >>> >
> >>> wrote:
> >>>
> >>> > Hey Ryanne,
> >>> >
> >>> >
> >>> >             I see a definite issue. I am doing an intense test and I
> >>> bring
> >>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating about
> 1200
> >>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and are
> >>> > staggered as they come up..I see a fraction of these nodes not
> >>> assigned any
> >>> > replication....There is plenty to go around. ( more then a couple of
> >>> > thousand partitions ) .   is there something I am missing.... As in
> my
> >>> > current case 5 of the 12 VMs are idle..
> >>> >
> >>> > Vishal
> >>> >
> >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <
> >>> vishal.santo...@gmail.com
> >>> > >
> >>> > wrote:
> >>> >
> >>> > > Oh sorry a. COUNTER... is more like it....
> >>> > >
> >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <
> >>> vishal.santo...@gmail.com
> >>> > >
> >>> > > wrote:
> >>> > >
> >>> > >> Will do
> >>> > >>     One more thing the age/latency metrics seem to be analogous as
> >>> in
> >>> > >> they seem to be calculated using similar routines.  I would think
> a
> >>> > metric
> >>> > >> tracking
> >>> > >> the number of flush failures ( as a GAUGE )  given
> >>> > >> offset.flush.timeout.ms would be highly beneficial.
> >>> > >>
> >>> > >> Regards..
> >>> > >>
> >>> > >>
> >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <
> >>> ryannedo...@gmail.com>
> >>> > >> wrote:
> >>> > >>
> >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers"
> >>> earlier, as
> >>> > >>> the
> >>> > >>> consumer is not created by the worker, but the task.
> >>> > >>>
> >>> > >>> I suppose the put() could be changed to putIfAbsent() here to
> >>> enable
> >>> > this
> >>> > >>> property to be changed. Maybe submit a PR?
> >>> > >>>
> >>> > >>> Ryanne
> >>> > >>>
> >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <
> >>> > >>> vishal.santo...@gmail.com>
> >>> > >>> wrote:
> >>> > >>>
> >>> > >>> > Hmm  ( I did both )
> >>> > >>> >
> >>> > >>> > another->another_test.enabled = true
> >>> > >>> >
> >>> > >>> > another->another_test.topics = act_post
> >>> > >>> >
> >>> > >>> > another->another_test.emit.heartbeats.enabled = false
> >>> > >>> >
> >>> > >>> > another->another_test.consumer.auto.offset.reset = latest
> >>> > >>> >
> >>> > >>> > another->another_test.sync.topic.acls.enabled = false
> >>> > >>> >
> >>> > >>> > another.consumer.auto.offset.reset = latest
> >>> > >>> >
> >>> > >>> >
> >>> > >>> >
> >>> > >>> > When I grep for the ConsumerConfig ( and there are 8 instances,
> >>> this
> >>> > >>> topic
> >>> > >>> > has 4 partitions )
> >>> > >>> >
> >>> > >>> >
> >>> > >>> >  [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
> >>> > >>> >
> >>> > >>> > allow.auto.create.topics = true
> >>> > >>> >
> >>> > >>> > auto.commit.interval.ms = 5000
> >>> > >>> >
> >>> > >>> > *auto.offset.reset* = earliest
> >>> > >>> >
> >>> > >>> >
> >>> > >>> > I am now using the 2.4 branch from kafka trunk
> >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
> >>> > >>> >
> >>> > >>> >
> >>> > >>> > This code change works and makes sense.. I think all other
> >>> settings
> >>> > >>> will be
> >>> > >>> > fine ( as can be overridden )  but for the 2 below..
> >>> > >>> >
> >>> > >>> >
> >>> > >>> > *---
> >>> > >>> >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
> >>> > >>> >
> >>> > >>> > *+++
> >>> > >>> >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
> >>> > >>> >
> >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends
> >>> > >>> > AbstractConfig {
> >>> > >>> >
> >>> > >>> >
> >>> > >>> >
> >>> >
> props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
> >>> > >>> >
> >>> > >>> >
> >>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
> >>> > >>> >
> >>> > >>> >          props.put("enable.auto.commit", "false");
> >>> > >>> >
> >>> > >>> > -        props.put("auto.offset.reset", "earliest");
> >>> > >>> >
> >>> > >>> > +        props.put("auto.offset.reset", "latest");
> >>> > >>> >
> >>> > >>> >          return props;
> >>> > >>> >
> >>> > >>> >      }
> >>> > >>> >
> >>> > >>> >
> >>> > >>> > Regards.
> >>> > >>> >
> >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <
> >>> ryannedo...@gmail.com>
> >>> > >>> > wrote:
> >>> > >>> >
> >>> > >>> > > Vishal, you should be able to override the properties passed
> >>> to the
> >>> > >>> > > internal workers using properties like
> >>> > >>> A->B.consumer.auto.offset.reset or
> >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file.
> >>> Certain
> >>> > >>> > top-level
> >>> > >>> > > properties like tasks.max are honored without the A->B or A
> >>> prefix,
> >>> > >>> but
> >>> > >>> > > auto.offset.reset is not one of them.
> >>> > >>> > >
> >>> > >>> > > Ryanne
> >>> > >>> > >
> >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <
> >>> > >>> > vishal.santo...@gmail.com
> >>> > >>> > > >
> >>> > >>> > > wrote:
> >>> > >>> > >
> >>> > >>> > > > Hey Ryanne,
> >>> > >>> > > >
> >>> > >>> > > >
> >>> > >>> > > >     How do I override auto.offset.reset = latest for
> >>> consumers
> >>> > >>> through
> >>> > >>> > > > mm2.properties. I have tried straight up .
> auto.offset.reset
> >>> and
> >>> > >>> > > consumer.
> >>> > >>> > > > auto.offset.reset  but it defaults to earliest.. I do have
> a
> >>> > query
> >>> > >>> in
> >>> > >>> > > > another thread but though you might know off hand..
> >>> > >>> > > >
> >>> > >>> > > > I would imagine there is some way in general of overriding
> >>> > >>> consumer and
> >>> > >>> > > > producer configs through mm2.properties in MM2 ?
> >>> > >>> > > >
> >>> > >>> > > > Regards.
> >>> > >>> > > >
> >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <
> >>> > >>> > > vishal.santo...@gmail.com
> >>> > >>> > > > >
> >>> > >>> > > > wrote:
> >>> > >>> > > >
> >>> > >>> > > > > Thank you so much for all your help.  Will keep you
> posted
> >>> on
> >>> > >>> tests I
> >>> > >>> > > > do..
> >>> > >>> > > > > I hope this is helpful to other folks too..
> >>> > >>> > > > >
> >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <
> >>> > >>> ryannedo...@gmail.com>
> >>> > >>> > > > > wrote:
> >>> > >>> > > > >
> >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as
> legacy
> >>> > >>> > > MirrorMaker.
> >>> > >>> > > > >> You
> >>> > >>> > > > >> can follow
> >>> https://issues.apache.org/jira/browse/KAFKA-6080
> >>> > for
> >>> > >>> > > updates
> >>> > >>> > > > >> on
> >>> > >>> > > > >> exactly-once semantics in Connect.
> >>> > >>> > > > >>
> >>> > >>> > > > >> Ryanne
> >>> > >>> > > > >>
> >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <
> >>> > >>> > > > >> vishal.santo...@gmail.com>
> >>> > >>> > > > >> wrote:
> >>> > >>> > > > >>
> >>> > >>> > > > >> >   >> You are correct. I'm working on a KIP and PoC to
> >>> > >>> introduce
> >>> > >>> > > > >> > transactions to
> >>> > >>> > > > >> > >> Connect for this exact purpose :)
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > That is awesome. Any time frame ?
> >>> > >>> > > > >> >
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > In the mean time the SLA as of now
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > 1. It is conceivable that we flush the producer to the
> >>> > target
> >>> > >>> > > cluster
> >>> > >>> > > > >> but
> >>> > >>> > > > >> > fail to offset commit. If there was a restart before
> the
> >>> > next
> >>> > >>> > > > successful
> >>> > >>> > > > >> > offset commit, there will be  duplicates  and a part
> of
> >>> data
> >>> > >>> is
> >>> > >>> > > > >> replayed (
> >>> > >>> > > > >> > at least once ) ?
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > 2. The same can be said about  partial flushes, though
> >>> am
> >>> > not
> >>> > >>> sure
> >>> > >>> > > > about
> >>> > >>> > > > >> > how kafka addresses flush ( Is a flush either success
> >>> or a
> >>> > >>> > failure,
> >>> > >>> > > > and
> >>> > >>> > > > >> > nothing in between )
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > Thanks..
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <
> >>> > >>> > > ryannedo...@gmail.com>
> >>> > >>> > > > >> > wrote:
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress.
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the
> producer
> >>> and
> >>> > >>> > setting
> >>> > >>> > > > the
> >>> > >>> > > > >> > > > offset to the compacting topic is not atomic  OR
> >>> do we
> >>> > >>> use
> >>> > >>> > > > >> > > > transactions here  ?
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to
> >>> introduce
> >>> > >>> > > > >> transactions
> >>> > >>> > > > >> > to
> >>> > >>> > > > >> > > Connect for this exact purpose :)
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ),
> >>> and I
> >>> > >>> have
> >>> > >>> > 2
> >>> > >>> > > > >> topics
> >>> > >>> > > > >> > > with
> >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in
> >>> there
> >>> > are
> >>> > >>> 4
> >>> > >>> > > > consumer
> >>> > >>> > > > >> > > groups
> >>> > >>> > > > >> > > > ( on CG per thread ) ...
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > Some details here:
> >>> > >>> > > > >> > > - tasks.max controls the maximum number of tasks
> >>> created
> >>> > per
> >>> > >>> > > > Connector
> >>> > >>> > > > >> > > instance. Both MirrorSourceConnector and
> >>> > >>> > MirrorCheckpointConnector
> >>> > >>> > > > >> will
> >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but
> >>> > >>> > > > MirrorHeartbeatConnector
> >>> > >>> > > > >> > only
> >>> > >>> > > > >> > > ever creates a single task. Moreover, there cannot
> be
> >>> more
> >>> > >>> tasks
> >>> > >>> > > > than
> >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or
> >>> consumer
> >>> > >>> groups
> >>> > >>> > > (for
> >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two
> topics
> >>> with
> >>> > >>> one
> >>> > >>> > > > >> partition
> >>> > >>> > > > >> > > each and 1 consumer group total, you'll have two
> >>> > >>> > > > MirrorSourceConnector
> >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one
> >>> > >>> > > > >> > MirrorCheckpointConnector
> >>> > >>> > > > >> > > tasks, for a total of four. And that's in one
> >>> direction
> >>> > >>> only: if
> >>> > >>> > > you
> >>> > >>> > > > >> have
> >>> > >>> > > > >> > > multiple source->target herders enabled, each will
> >>> create
> >>> > >>> tasks
> >>> > >>> > > > >> > > independently.
> >>> > >>> > > > >> > > - There are no consumer groups in MM2, technically.
> >>> The
> >>> > >>> Connect
> >>> > >>> > > > >> framework
> >>> > >>> > > > >> > > uses the Coordinator API and internal topics to
> divide
> >>> > tasks
> >>> > >>> > among
> >>> > >>> > > > >> > workers
> >>> > >>> > > > >> > > -- not a consumer group per se. The MM2 connectors
> >>> use the
> >>> > >>> > > assign()
> >>> > >>> > > > >> API,
> >>> > >>> > > > >> > > not the subscribe() API, so there are no consumer
> >>> groups
> >>> > >>> there
> >>> > >>> > > > >> either. In
> >>> > >>> > > > >> > > fact, they don't commit() either. This is nice, as
> it
> >>> > >>> > eliminates a
> >>> > >>> > > > >> lot of
> >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker has been
> >>> > plagued
> >>> > >>> > with.
> >>> > >>> > > > >> With
> >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of
> >>> workers
> >>> > >>> changes
> >>> > >>> > or
> >>> > >>> > > > >> when
> >>> > >>> > > > >> > the
> >>> > >>> > > > >> > > assignments change (e.g. new topics are discovered).
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > Ryanne
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi <
> >>> > >>> > > > >> > > vishal.santo...@gmail.com>
> >>> > >>> > > > >> > > wrote:
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> > > > Hey Ryanne,
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >            The test was on topics that had a 7 day
> >>> > >>> retention.
> >>> > >>> > > > Which
> >>> > >>> > > > >> > > > generally implies that the batch size for flush is
> >>> > pretty
> >>> > >>> > high (
> >>> > >>> > > > >> till
> >>> > >>> > > > >> > the
> >>> > >>> > > > >> > > > consumption becomes current ). The
> >>> > >>> offset.flush.timeout.ms
> >>> > >>> > > > >> defaults
> >>> > >>> > > > >> > to
> >>> > >>> > > > >> > > 5
> >>> > >>> > > > >> > > > seconds and the code will not send in the offsets
> >>> if the
> >>> > >>> flush
> >>> > >>> > > is
> >>> > >>> > > > >> not
> >>> > >>> > > > >> > > > complete. Increasing that time out did solve the
> >>> "not
> >>> > >>> sending
> >>> > >>> > > the
> >>> > >>> > > > >> > offset
> >>> > >>> > > > >> > > to
> >>> > >>> > > > >> > > > topic" issue.
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) )
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > 1. It seems though that flushing the flushing the
> >>> > >>> producer and
> >>> > >>> > > > >> setting
> >>> > >>> > > > >> > > the
> >>> > >>> > > > >> > > > offset to the compacting topic is not atomic  OR
> >>> do we
> >>> > >>> use
> >>> > >>> > > > >> > > > transactions here  ?
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > 2. I see
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorHeartbeatConnector-0}
> >>> > flushing
> >>> > >>> > 956435
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorSourceConnector-1}
> >>> flushing
> >>> > >>> 356251
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-2}
> >>> > >>> flushing 0
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-3}
> >>> > >>> flushing 0
> >>> > >>> > > > >> > outstanding
> >>> > >>> > > > >> > > > messages
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ),
> >>> and I
> >>> > >>> have
> >>> > >>> > 2
> >>> > >>> > > > >> topics
> >>> > >>> > > > >> > > with
> >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in
> >>> there
> >>> > are
> >>> > >>> 4
> >>> > >>> > > > consumer
> >>> > >>> > > > >> > > groups
> >>> > >>> > > > >> > > > ( on CG per thread ) ...
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > THANKS A LOT
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > Vishal.
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <
> >>> > >>> > > > ryannedo...@gmail.com
> >>> > >>> > > > >> >
> >>> > >>> > > > >> > > > wrote:
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > > > > >  timed out
> >>> > >>> > > > >> > > > > while waiting for producer to flush outstanding
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect
> was
> >>> > >>> unable to
> >>> > >>> > > > send
> >>> > >>> > > > >> > > records
> >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if
> >>> > >>> > min.in-sync.replicas
> >>> > >>> > > > were
> >>> > >>> > > > >> > > > > misconfigured. Given some data seems to arrive,
> >>> it's
> >>> > >>> > possible
> >>> > >>> > > > that
> >>> > >>> > > > >> > > > > everything is configured correctly but with too
> >>> much
> >>> > >>> latency
> >>> > >>> > > to
> >>> > >>> > > > >> > > > > successfully commit within the default timeouts.
> >>> You
> >>> > may
> >>> > >>> > want
> >>> > >>> > > to
> >>> > >>> > > > >> > > increase
> >>> > >>> > > > >> > > > > the number of tasks substantially to achieve
> more
> >>> > >>> > parallelism
> >>> > >>> > > > and
> >>> > >>> > > > >> > > > > throughput.
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > Ryanne
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <
> >>> > >>> > > > >> > > vishal.santo...@gmail.com
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > wrote:
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > > Aah no.. this is more to  it. Note sure if
> >>> related
> >>> > to
> >>> > >>> the
> >>> > >>> > > > above.
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > > Is timing out based on
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR
> >>> > >>> > > > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0}
> >>> Failed
> >>> > to
> >>> > >>> > > flush,
> >>> > >>> > > > >> timed
> >>> > >>> > > > >> > > out
> >>> > >>> > > > >> > > > > > while waiting for producer to flush
> outstanding
> >>> > 36478
> >>> > >>> > > messages
> >>> > >>> > > > >> > > > > >
> >>> > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal
> Santoshi
> >>> <
> >>> > >>> > > > >> > > > > vishal.santo...@gmail.com
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > wrote:
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > > > I think this might be it.. Could you
> confirm.
> >>> It
> >>> > >>> seems
> >>> > >>> > to
> >>> > >>> > > be
> >>> > >>> > > > >> on
> >>> > >>> > > > >> > the
> >>> > >>> > > > >> > > > > path
> >>> > >>> > > > >> > > > > > > to commit the offsets.. but not sure...
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler
> for
> >>> > >>> > > > >> > MirrorSourceConnector
> >>> > >>> > > > >> > > > > > caught
> >>> > >>> > > > >> > > > > > > exception in scheduled task: syncing topic
> >>> ACLs
> >>> > >>> > > > >> > > > > > >
> >>> (org.apache.kafka.connect.mirror.Scheduler:102)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException:
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > org.apache.kafka.common.errors.SecurityDisabledException:
> >>> > >>> > > No
> >>> > >>> > > > >> > > > Authorizer
> >>> > >>> > > > >> > > > > > is
> >>> > >>> > > > >> > > > > > > configured on the broker
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >>
> >>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >>
> >>> > >>> > >
> >>> > >>>
> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >>
> >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >         at
> >>> java.lang.Thread.run(Thread.java:748)
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > > Caused by:
> >>> > >>> > > > >> > >
> >>> org.apache.kafka.common.errors.SecurityDisabledException:
> >>> > >>> > > > >> > > > No
> >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne
> Dolan
> >>> <
> >>> > >>> > > > >> > > ryannedo...@gmail.com
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > > > > > wrote:
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > > >> > I do not have a single record in the
> >>> offsets
> >>> > >>> topic
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are
> correct
> >>> > that
> >>> > >>> > > without
> >>> > >>> > > > >> > records
> >>> > >>> > > > >> > > > in
> >>> > >>> > > > >> > > > > > that
> >>> > >>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. The
> >>> > offsets
> >>> > >>> > should
> >>> > >>> > > > be
> >>> > >>> > > > >> > > stored
> >>> > >>> > > > >> > > > > > >> periodically and whenever the connectors
> >>> > gracefully
> >>> > >>> > > > shutdown
> >>> > >>> > > > >> or
> >>> > >>> > > > >> > > > > restart.
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >> Is it possible the topics don't have
> required
> >>> > ACLs
> >>> > >>> or
> >>> > >>> > > > >> something?
> >>> > >>> > > > >> > > > Also
> >>> > >>> > > > >> > > > > > >> note:
> >>> > >>> > > > >> > > > > > >> Connect wants the offsets topic to have a
> >>> large
> >>> > >>> number
> >>> > >>> > of
> >>> > >>> > > > >> > > partitions
> >>> > >>> > > > >> > > > > and
> >>> > >>> > > > >> > > > > > >> to
> >>> > >>> > > > >> > > > > > >> be compacted. Though I can't imagine either
> >>> would
> >>> > >>> > prevent
> >>> > >>> > > > >> > commits
> >>> > >>> > > > >> > > > from
> >>> > >>> > > > >> > > > > > >> being sent.
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >> Ryanne
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal
> >>> Santoshi
> >>> > <
> >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
> >>> > >>> > > > >> > > > > > >> wrote:
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >> > 2nd/restore issue  ( I think I need to
> >>> solve
> >>> > the
> >>> > >>> > > offsets
> >>> > >>> > > > >> topic
> >>> > >>> > > > >> > > > issue
> >>> > >>> > > > >> > > > > > >> > before I go with the scale up and down
> >>> issue )
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead and
> >>> created
> >>> > >>> the
> >>> > >>> > > > offsets
> >>> > >>> > > > >> > > topic.
> >>> > >>> > > > >> > > > > The
> >>> > >>> > > > >> > > > > > >> > status of the cluster  ( destination ) is
> >>> thus
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers Spread
> >>> > %Brokers
> >>> > >>> Skew
> >>> > >>> > > > >> %Brokers
> >>> > >>> > > > >> > > > > Leader
> >>> > >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated %Leader
> >>> > >>> SizeProducer
> >>> > >>> > > > >> > > > > > Message/SecSummed
> >>> > >>> > > > >> > > > > > >> > Recent Offsets
> >>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
> >>> > >>> > > > >> > > > > > >> > s8k.act_search_page
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842
> >>> > >>> > > > >> > > > > > >> > s8k.act_reach
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529
> >>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10
> >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
> >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
> >>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal
> >>> > >>> > > > >> > > > > > >> > <
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > You can see . that we have the  5 ( I
> >>> created
> >>> > >>> bot the
> >>> > >>> > > > >> offsets,
> >>> > >>> > > > >> > > to
> >>> > >>> > > > >> > > > be
> >>> > >>> > > > >> > > > > > >> safe
> >>> > >>> > > > >> > > > > > >> > for the below )
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics =
> >>> > >>> act_search_page|act_reach*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled =
> >>> false*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled =
> >>> false*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor = 3*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *replication.factor = 3*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *replication.policy.separator = .*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > *tasks.max = 4*
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > What seems strange is that I do not have
> a
> >>> > single
> >>> > >>> > > record
> >>> > >>> > > > in
> >>> > >>> > > > >> > the
> >>> > >>> > > > >> > > > > > offsets
> >>> > >>> > > > >> > > > > > >> > topic.. Is that normal ?   I would
> imagine
> >>> that
> >>> > >>> > > without a
> >>> > >>> > > > >> > > record,
> >>> > >>> > > > >> > > > > > there
> >>> > >>> > > > >> > > > > > >> is
> >>> > >>> > > > >> > > > > > >> > no way that a restore would happen....
> And
> >>> that
> >>> > >>> is
> >>> > >>> > > > obvious
> >>> > >>> > > > >> > when
> >>> > >>> > > > >> > > I
> >>> > >>> > > > >> > > > > > >> restart
> >>> > >>> > > > >> > > > > > >> > the mm2 instance... Find the screenshot
> >>> > >>> attached. In
> >>> > >>> > > > >> essence
> >>> > >>> > > > >> > the
> >>> > >>> > > > >> > > > > > latency
> >>> > >>> > > > >> > > > > > >> > avg lag is reset \when the mm2 instance
> is
> >>> > reset
> >>> > >>> > > > >> indicating no
> >>> > >>> > > > >> > > > > restore
> >>> > >>> > > > >> > > > > > >> but
> >>> > >>> > > > >> > > > > > >> > restart from EARLIEST... I must be
> missing
> >>> some
> >>> > >>> thing
> >>> > >>> > > > >> simple
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne
> >>> Dolan <
> >>> > >>> > > > >> > > > ryannedo...@gmail.com
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > > > >> > wrote:
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you
> must
> >>> set
> >>> > >>> > > tasks.max
> >>> > >>> > > > to
> >>> > >>> > > > >> > > > > something
> >>> > >>> > > > >> > > > > > >> above
> >>> > >>> > > > >> > > > > > >> >> 1 (the default) in order to achieve any
> >>> > >>> parallelism.
> >>> > >>> > > > This
> >>> > >>> > > > >> > > > property
> >>> > >>> > > > >> > > > > is
> >>> > >>> > > > >> > > > > > >> >> passed along to the internal Connect
> >>> workers.
> >>> > >>> It's
> >>> > >>> > > > >> > unfortunate
> >>> > >>> > > > >> > > > that
> >>> > >>> > > > >> > > > > > >> >> Connect
> >>> > >>> > > > >> > > > > > >> >> is not smart enough to default this
> >>> property
> >>> > to
> >>> > >>> the
> >>> > >>> > > > >> number of
> >>> > >>> > > > >> > > > > > workers.
> >>> > >>> > > > >> > > > > > >> I
> >>> > >>> > > > >> > > > > > >> >> suspect that will improve before long.
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >> >> For the second issue, is it possible you
> >>> are
> >>> > >>> missing
> >>> > >>> > > the
> >>> > >>> > > > >> > > offsets
> >>> > >>> > > > >> > > > > > >> topic? It
> >>> > >>> > > > >> > > > > > >> >> should exist alongside the config and
> >>> status
> >>> > >>> topics.
> >>> > >>> > > > >> Connect
> >>> > >>> > > > >> > > > should
> >>> > >>> > > > >> > > > > > >> create
> >>> > >>> > > > >> > > > > > >> >> this topic, but there are various
> reasons
> >>> this
> >>> > >>> can
> >>> > >>> > > fail,
> >>> > >>> > > > >> e.g.
> >>> > >>> > > > >> > > if
> >>> > >>> > > > >> > > > > the
> >>> > >>> > > > >> > > > > > >> >> replication factor is misconfigured. You
> >>> can
> >>> > try
> >>> > >>> > > > creating
> >>> > >>> > > > >> > this
> >>> > >>> > > > >> > > > > topic
> >>> > >>> > > > >> > > > > > >> >> manually or changing
> >>> > >>> > > offsets.storage.replication.factor.
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >> >> Ryanne
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal
> >>> Santoshi
> >>> > <
> >>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
> >>> > >>> > > > >> > > > > > >> >> wrote:
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >> >> > Using
> >>> > >>> > > > >> > > >
> >>> > https://github.com/apache/kafka/tree/trunk/connect/mirror
> >>> > >>> > > > >> > > > > > as a
> >>> > >>> > > > >> > > > > > >> >> > guide,
> >>> > >>> > > > >> > > > > > >> >> > I have build from source the
> >>> origin/KIP-382
> >>> > of
> >>> > >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git.
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2
> >>> different
> >>> > >>> nodes (
> >>> > >>> > > they
> >>> > >>> > > > >> are
> >>> > >>> > > > >> > > > > actually
> >>> > >>> > > > >> > > > > > >> >> pods on
> >>> > >>> > > > >> > > > > > >> >> > k8s but that should not matter ). They
> >>> share
> >>> > >>> the
> >>> > >>> > > > >> > > mm2.properties
> >>> > >>> > > > >> > > > > > file
> >>> > >>> > > > >> > > > > > >> and
> >>> > >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics
> with
> >>> 8
> >>> > >>> > partitions
> >>> > >>> > > > in
> >>> > >>> > > > >> > > total.
> >>> > >>> > > > >> > > > > > That
> >>> > >>> > > > >> > > > > > >> >> seems
> >>> > >>> > > > >> > > > > > >> >> > to be the way to create a standalone
> mm2
> >>> > >>> cluster.
> >>> > >>> > I
> >>> > >>> > > do
> >>> > >>> > > > >> not
> >>> > >>> > > > >> > > > > however
> >>> > >>> > > > >> > > > > > >> see(
> >>> > >>> > > > >> > > > > > >> >> at
> >>> > >>> > > > >> > > > > > >> >> > least the mbeans do not show ) any
> >>> attempt
> >>> > to
> >>> > >>> > > > rebalance.
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
> >>> > >>> > > > >> > > > > > >> >> > mbeans
> >>> > >>> > > > >> > > > > > >> >> > are all on a single node
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2
> >>> nodes (
> >>> > >>> hard
> >>> > >>> > stop
> >>> > >>> > > > ans
> >>> > >>> > > > >> > > start
> >>> > >>> > > > >> > > > ).
> >>> > >>> > > > >> > > > > > The
> >>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to be
> >>> reset to
> >>> > >>> the
> >>> > >>> > > > >> earliest,
> >>> > >>> > > > >> > as
> >>> > >>> > > > >> > > if
> >>> > >>> > > > >> > > > > it
> >>> > >>> > > > >> > > > > > >> is a
> >>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also
> obvious
> >>> from
> >>> > >>> the
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > "record-age-ms-avg|replication-latency-ms-avg"
> >>> > >>> > > > >> > > > > > >> >> > which I track through the restart.
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > This implies that
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is
> not
> >>> > >>> working. I
> >>> > >>> > > > >> cannot
> >>> > >>> > > > >> > > scale
> >>> > >>> > > > >> > > > > up
> >>> > >>> > > > >> > > > > > or
> >>> > >>> > > > >> > > > > > >> >> down
> >>> > >>> > > > >> > > > > > >> >> > by adding nodes to the mm2 cluster or
> >>> > removing
> >>> > >>> > them.
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not working.
> >>> If
> >>> > the
> >>> > >>> MM2
> >>> > >>> > > > >> cluster
> >>> > >>> > > > >> > is
> >>> > >>> > > > >> > > > > > brought
> >>> > >>> > > > >> > > > > > >> >> down,
> >>> > >>> > > > >> > > > > > >> >> > it does not start mirroring from the
> >>> last
> >>> > >>> known
> >>> > >>> > > > state. I
> >>> > >>> > > > >> > see
> >>> > >>> > > > >> > > > the,
> >>> > >>> > > > >> > > > > > >> >> > state/config topics etc created as
> >>> > expected..
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty mimimal
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *clusters = a , b*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> dr2*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = false*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = false*
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ?
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >> > Thanks
> >>> > >>> > > > >> > > > > > >> >> >
> >>> > >>> > > > >> > > > > > >> >>
> >>> > >>> > > > >> > > > > > >> >
> >>> > >>> > > > >> > > > > > >>
> >>> > >>> > > > >> > > > > > >
> >>> > >>> > > > >> > > > > >
> >>> > >>> > > > >> > > > >
> >>> > >>> > > > >> > > >
> >>> > >>> > > > >> > >
> >>> > >>> > > > >> >
> >>> > >>> > > > >>
> >>> > >>> > > > >
> >>> > >>> > > >
> >>> > >>> > >
> >>> > >>> >
> >>> > >>>
> >>> > >>
> >>> >
> >>>
> >>
>

Reply via email to