Here is what I see

* The max tasks are a a cap  on a Connector across the cluster. If have 8
VMs but 8 max tasks my assumption  that there would be 8 * 8 = 72 task
threads was
wring. The logs showed that the partitions were consumed by  8 threads on
the 8 VMs ( 1 per VM ) which was highly un optimal.  When I scaled the
VMs to 12, it did not matter, as the max tasks still prevented any further
distribution.

*  If I cancel/resume the cluster with a max task of 48 ( keeping the same
job name and thus connector definition   the max tasks does not change, as
in
 it seems to keep the same number of max task  threads limit ( as in 8 )

* I can bring down a VM and see the task migrate to a free VM but the
overall count of task threads remain the same.


In essence, the num of tasks is a cap on threads in the cluster per
connector, A connector is a source->sink pair that spans a cluster. Thus if
we have a
A->B DAG and  max tasks of 8, then there will be no more that 8 Source
Tasks  ( threads ) no matter how big the cluster is, It thus makes sense to
over provision ( within limits of a single VM ) on the max tasks to allow
for adding more VMs for scale up.....



On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I misspoke
>
> >> I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the the
> 8  VMs. I then upscaled to 12 VMs and the tasks *have not *migrated as I
> would expect .
>
>
>
>
> On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> OK, You will have to explain :)
>>
>> I had 12 VMs with 8 cpus and 8 max tasks.  I thought let me give a CPU to
>> each task, which I presumed is a java thread ( even though I know the
>> thread would be mostly ip bound ). . I saw the issue I pointed up.
>> *I now have 8 VMs 8 cpus with 48 max tasks and it did spread to the the
>> 8  VMs. I then upscaled to 12 VMs and the tasks migrated as I would expect
>> .*
>>
>>  I know that a VM will have MirrorSourceConnector and
>> MirrorHeartbeatConnector tasks up till  tasks.max.  So a few questions
>>
>>
>>
>> * When we say there are 48 max tasks, are we saying there are  48 threads
>> ( in fact 96, each for the 2 groups above,  worst case  + 2 ) ?
>> * When we talk about Connector, are we talking about a JVM process, as in
>> a Connector is a JVM process ?
>> * Why larger number of tasks.max help the spread  ?  As in I would assume
>> there are up till 8 tasks ( or 16 )  per VM but how that should not have
>> prevented  re assignment  on a scale up ( as it clearly did ) ?
>>
>> The reason I ask is that I plan to run mm2 cluster on  k8s and I want to
>> make sure that I use the version of JVM that is more docker friendly vis a
>> vis, how many cpus it believes it has  and as explained here
>> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54
>>
>>
>>
>>
>> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <ryannedo...@gmail.com>
>> wrote:
>>
>>> What is tasks.max? Consider bumping to something like 48 if you're
>>> running
>>> on a dozen nodes.
>>>
>>> Ryanne
>>>
>>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi <vishal.santo...@gmail.com
>>> >
>>> wrote:
>>>
>>> > Hey Ryanne,
>>> >
>>> >
>>> >             I see a definite issue. I am doing an intense test and I
>>> bring
>>> > up 12 VMs ( they are 12 pods with 8 cpus each ), replicating about 1200
>>> > plus topics ( fairly heavy 100mbps ) ... They are acquired and are
>>> > staggered as they come up..I see a fraction of these nodes not
>>> assigned any
>>> > replication....There is plenty to go around. ( more then a couple of
>>> > thousand partitions ) .   is there something I am missing.... As in my
>>> > current case 5 of the 12 VMs are idle..
>>> >
>>> > Vishal
>>> >
>>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com
>>> > >
>>> > wrote:
>>> >
>>> > > Oh sorry a. COUNTER... is more like it....
>>> > >
>>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com
>>> > >
>>> > > wrote:
>>> > >
>>> > >> Will do
>>> > >>     One more thing the age/latency metrics seem to be analogous as
>>> in
>>> > >> they seem to be calculated using similar routines.  I would think a
>>> > metric
>>> > >> tracking
>>> > >> the number of flush failures ( as a GAUGE )  given
>>> > >> offset.flush.timeout.ms would be highly beneficial.
>>> > >>
>>> > >> Regards..
>>> > >>
>>> > >>
>>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <
>>> ryannedo...@gmail.com>
>>> > >> wrote:
>>> > >>
>>> > >>> Ah, I see you are correct. Also I misspoke saying "workers"
>>> earlier, as
>>> > >>> the
>>> > >>> consumer is not created by the worker, but the task.
>>> > >>>
>>> > >>> I suppose the put() could be changed to putIfAbsent() here to
>>> enable
>>> > this
>>> > >>> property to be changed. Maybe submit a PR?
>>> > >>>
>>> > >>> Ryanne
>>> > >>>
>>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <
>>> > >>> vishal.santo...@gmail.com>
>>> > >>> wrote:
>>> > >>>
>>> > >>> > Hmm  ( I did both )
>>> > >>> >
>>> > >>> > another->another_test.enabled = true
>>> > >>> >
>>> > >>> > another->another_test.topics = act_post
>>> > >>> >
>>> > >>> > another->another_test.emit.heartbeats.enabled = false
>>> > >>> >
>>> > >>> > another->another_test.consumer.auto.offset.reset = latest
>>> > >>> >
>>> > >>> > another->another_test.sync.topic.acls.enabled = false
>>> > >>> >
>>> > >>> > another.consumer.auto.offset.reset = latest
>>> > >>> >
>>> > >>> >
>>> > >>> >
>>> > >>> > When I grep for the ConsumerConfig ( and there are 8 instances,
>>> this
>>> > >>> topic
>>> > >>> > has 4 partitions )
>>> > >>> >
>>> > >>> >
>>> > >>> >  [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>> > >>> >
>>> > >>> > allow.auto.create.topics = true
>>> > >>> >
>>> > >>> > auto.commit.interval.ms = 5000
>>> > >>> >
>>> > >>> > *auto.offset.reset* = earliest
>>> > >>> >
>>> > >>> >
>>> > >>> > I am now using the 2.4 branch from kafka trunk
>>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
>>> > >>> >
>>> > >>> >
>>> > >>> > This code change works and makes sense.. I think all other
>>> settings
>>> > >>> will be
>>> > >>> > fine ( as can be overridden )  but for the 2 below..
>>> > >>> >
>>> > >>> >
>>> > >>> > *---
>>> > >>> >
>>> > >>> >
>>> > >>>
>>> >
>>> a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>> > >>> >
>>> > >>> > *+++
>>> > >>> >
>>> > >>> >
>>> > >>>
>>> >
>>> b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java*
>>> > >>> >
>>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends
>>> > >>> > AbstractConfig {
>>> > >>> >
>>> > >>> >
>>> > >>> >
>>> > props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>> > >>> >
>>> > >>> >
>>> props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>> > >>> >
>>> > >>> >          props.put("enable.auto.commit", "false");
>>> > >>> >
>>> > >>> > -        props.put("auto.offset.reset", "earliest");
>>> > >>> >
>>> > >>> > +        props.put("auto.offset.reset", "latest");
>>> > >>> >
>>> > >>> >          return props;
>>> > >>> >
>>> > >>> >      }
>>> > >>> >
>>> > >>> >
>>> > >>> > Regards.
>>> > >>> >
>>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <
>>> ryannedo...@gmail.com>
>>> > >>> > wrote:
>>> > >>> >
>>> > >>> > > Vishal, you should be able to override the properties passed
>>> to the
>>> > >>> > > internal workers using properties like
>>> > >>> A->B.consumer.auto.offset.reset or
>>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file.
>>> Certain
>>> > >>> > top-level
>>> > >>> > > properties like tasks.max are honored without the A->B or A
>>> prefix,
>>> > >>> but
>>> > >>> > > auto.offset.reset is not one of them.
>>> > >>> > >
>>> > >>> > > Ryanne
>>> > >>> > >
>>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <
>>> > >>> > vishal.santo...@gmail.com
>>> > >>> > > >
>>> > >>> > > wrote:
>>> > >>> > >
>>> > >>> > > > Hey Ryanne,
>>> > >>> > > >
>>> > >>> > > >
>>> > >>> > > >     How do I override auto.offset.reset = latest for
>>> consumers
>>> > >>> through
>>> > >>> > > > mm2.properties. I have tried straight up . auto.offset.reset
>>> and
>>> > >>> > > consumer.
>>> > >>> > > > auto.offset.reset  but it defaults to earliest.. I do have a
>>> > query
>>> > >>> in
>>> > >>> > > > another thread but though you might know off hand..
>>> > >>> > > >
>>> > >>> > > > I would imagine there is some way in general of overriding
>>> > >>> consumer and
>>> > >>> > > > producer configs through mm2.properties in MM2 ?
>>> > >>> > > >
>>> > >>> > > > Regards.
>>> > >>> > > >
>>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <
>>> > >>> > > vishal.santo...@gmail.com
>>> > >>> > > > >
>>> > >>> > > > wrote:
>>> > >>> > > >
>>> > >>> > > > > Thank you so much for all your help.  Will keep you posted
>>> on
>>> > >>> tests I
>>> > >>> > > > do..
>>> > >>> > > > > I hope this is helpful to other folks too..
>>> > >>> > > > >
>>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <
>>> > >>> ryannedo...@gmail.com>
>>> > >>> > > > > wrote:
>>> > >>> > > > >
>>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as legacy
>>> > >>> > > MirrorMaker.
>>> > >>> > > > >> You
>>> > >>> > > > >> can follow
>>> https://issues.apache.org/jira/browse/KAFKA-6080
>>> > for
>>> > >>> > > updates
>>> > >>> > > > >> on
>>> > >>> > > > >> exactly-once semantics in Connect.
>>> > >>> > > > >>
>>> > >>> > > > >> Ryanne
>>> > >>> > > > >>
>>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <
>>> > >>> > > > >> vishal.santo...@gmail.com>
>>> > >>> > > > >> wrote:
>>> > >>> > > > >>
>>> > >>> > > > >> >   >> You are correct. I'm working on a KIP and PoC to
>>> > >>> introduce
>>> > >>> > > > >> > transactions to
>>> > >>> > > > >> > >> Connect for this exact purpose :)
>>> > >>> > > > >> >
>>> > >>> > > > >> > That is awesome. Any time frame ?
>>> > >>> > > > >> >
>>> > >>> > > > >> >
>>> > >>> > > > >> > In the mean time the SLA as of now
>>> > >>> > > > >> >
>>> > >>> > > > >> > 1. It is conceivable that we flush the producer to the
>>> > target
>>> > >>> > > cluster
>>> > >>> > > > >> but
>>> > >>> > > > >> > fail to offset commit. If there was a restart before the
>>> > next
>>> > >>> > > > successful
>>> > >>> > > > >> > offset commit, there will be  duplicates  and a part of
>>> data
>>> > >>> is
>>> > >>> > > > >> replayed (
>>> > >>> > > > >> > at least once ) ?
>>> > >>> > > > >> >
>>> > >>> > > > >> > 2. The same can be said about  partial flushes, though
>>> am
>>> > not
>>> > >>> sure
>>> > >>> > > > about
>>> > >>> > > > >> > how kafka addresses flush ( Is a flush either success
>>> or a
>>> > >>> > failure,
>>> > >>> > > > and
>>> > >>> > > > >> > nothing in between )
>>> > >>> > > > >> >
>>> > >>> > > > >> > Thanks..
>>> > >>> > > > >> >
>>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <
>>> > >>> > > ryannedo...@gmail.com>
>>> > >>> > > > >> > wrote:
>>> > >>> > > > >> >
>>> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress.
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > > 1. It seems though that flushing [...] the producer
>>> and
>>> > >>> > setting
>>> > >>> > > > the
>>> > >>> > > > >> > > > offset to the compacting topic is not atomic  OR
>>> do we
>>> > >>> use
>>> > >>> > > > >> > > > transactions here  ?
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to
>>> introduce
>>> > >>> > > > >> transactions
>>> > >>> > > > >> > to
>>> > >>> > > > >> > > Connect for this exact purpose :)
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ),
>>> and I
>>> > >>> have
>>> > >>> > 2
>>> > >>> > > > >> topics
>>> > >>> > > > >> > > with
>>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in
>>> there
>>> > are
>>> > >>> 4
>>> > >>> > > > consumer
>>> > >>> > > > >> > > groups
>>> > >>> > > > >> > > > ( on CG per thread ) ...
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > Some details here:
>>> > >>> > > > >> > > - tasks.max controls the maximum number of tasks
>>> created
>>> > per
>>> > >>> > > > Connector
>>> > >>> > > > >> > > instance. Both MirrorSourceConnector and
>>> > >>> > MirrorCheckpointConnector
>>> > >>> > > > >> will
>>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but
>>> > >>> > > > MirrorHeartbeatConnector
>>> > >>> > > > >> > only
>>> > >>> > > > >> > > ever creates a single task. Moreover, there cannot be
>>> more
>>> > >>> tasks
>>> > >>> > > > than
>>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or
>>> consumer
>>> > >>> groups
>>> > >>> > > (for
>>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two topics
>>> with
>>> > >>> one
>>> > >>> > > > >> partition
>>> > >>> > > > >> > > each and 1 consumer group total, you'll have two
>>> > >>> > > > MirrorSourceConnector
>>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one
>>> > >>> > > > >> > MirrorCheckpointConnector
>>> > >>> > > > >> > > tasks, for a total of four. And that's in one
>>> direction
>>> > >>> only: if
>>> > >>> > > you
>>> > >>> > > > >> have
>>> > >>> > > > >> > > multiple source->target herders enabled, each will
>>> create
>>> > >>> tasks
>>> > >>> > > > >> > > independently.
>>> > >>> > > > >> > > - There are no consumer groups in MM2, technically.
>>> The
>>> > >>> Connect
>>> > >>> > > > >> framework
>>> > >>> > > > >> > > uses the Coordinator API and internal topics to divide
>>> > tasks
>>> > >>> > among
>>> > >>> > > > >> > workers
>>> > >>> > > > >> > > -- not a consumer group per se. The MM2 connectors
>>> use the
>>> > >>> > > assign()
>>> > >>> > > > >> API,
>>> > >>> > > > >> > > not the subscribe() API, so there are no consumer
>>> groups
>>> > >>> there
>>> > >>> > > > >> either. In
>>> > >>> > > > >> > > fact, they don't commit() either. This is nice, as it
>>> > >>> > eliminates a
>>> > >>> > > > >> lot of
>>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker has been
>>> > plagued
>>> > >>> > with.
>>> > >>> > > > >> With
>>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of
>>> workers
>>> > >>> changes
>>> > >>> > or
>>> > >>> > > > >> when
>>> > >>> > > > >> > the
>>> > >>> > > > >> > > assignments change (e.g. new topics are discovered).
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > Ryanne
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi <
>>> > >>> > > > >> > > vishal.santo...@gmail.com>
>>> > >>> > > > >> > > wrote:
>>> > >>> > > > >> > >
>>> > >>> > > > >> > > > Hey Ryanne,
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >            The test was on topics that had a 7 day
>>> > >>> retention.
>>> > >>> > > > Which
>>> > >>> > > > >> > > > generally implies that the batch size for flush is
>>> > pretty
>>> > >>> > high (
>>> > >>> > > > >> till
>>> > >>> > > > >> > the
>>> > >>> > > > >> > > > consumption becomes current ). The
>>> > >>> offset.flush.timeout.ms
>>> > >>> > > > >> defaults
>>> > >>> > > > >> > to
>>> > >>> > > > >> > > 5
>>> > >>> > > > >> > > > seconds and the code will not send in the offsets
>>> if the
>>> > >>> flush
>>> > >>> > > is
>>> > >>> > > > >> not
>>> > >>> > > > >> > > > complete. Increasing that time out did solve the
>>> "not
>>> > >>> sending
>>> > >>> > > the
>>> > >>> > > > >> > offset
>>> > >>> > > > >> > > to
>>> > >>> > > > >> > > > topic" issue.
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > Two questions ( I am being greedy here :) )
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > 1. It seems though that flushing the flushing the
>>> > >>> producer and
>>> > >>> > > > >> setting
>>> > >>> > > > >> > > the
>>> > >>> > > > >> > > > offset to the compacting topic is not atomic  OR
>>> do we
>>> > >>> use
>>> > >>> > > > >> > > > transactions here  ?
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > 2. I see
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorHeartbeatConnector-0}
>>> > flushing
>>> > >>> > 956435
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorSourceConnector-1}
>>> flushing
>>> > >>> 356251
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-2}
>>> > >>> flushing 0
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-3}
>>> > >>> flushing 0
>>> > >>> > > > >> > outstanding
>>> > >>> > > > >> > > > messages
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ),
>>> and I
>>> > >>> have
>>> > >>> > 2
>>> > >>> > > > >> topics
>>> > >>> > > > >> > > with
>>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in
>>> there
>>> > are
>>> > >>> 4
>>> > >>> > > > consumer
>>> > >>> > > > >> > > groups
>>> > >>> > > > >> > > > ( on CG per thread ) ...
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > THANKS A LOT
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > Vishal.
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <
>>> > >>> > > > ryannedo...@gmail.com
>>> > >>> > > > >> >
>>> > >>> > > > >> > > > wrote:
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > > > > >  timed out
>>> > >>> > > > >> > > > > while waiting for producer to flush outstanding
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect was
>>> > >>> unable to
>>> > >>> > > > send
>>> > >>> > > > >> > > records
>>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if
>>> > >>> > min.in-sync.replicas
>>> > >>> > > > were
>>> > >>> > > > >> > > > > misconfigured. Given some data seems to arrive,
>>> it's
>>> > >>> > possible
>>> > >>> > > > that
>>> > >>> > > > >> > > > > everything is configured correctly but with too
>>> much
>>> > >>> latency
>>> > >>> > > to
>>> > >>> > > > >> > > > > successfully commit within the default timeouts.
>>> You
>>> > may
>>> > >>> > want
>>> > >>> > > to
>>> > >>> > > > >> > > increase
>>> > >>> > > > >> > > > > the number of tasks substantially to achieve more
>>> > >>> > parallelism
>>> > >>> > > > and
>>> > >>> > > > >> > > > > throughput.
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > Ryanne
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <
>>> > >>> > > > >> > > vishal.santo...@gmail.com
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > wrote:
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > > Aah no.. this is more to  it. Note sure if
>>> related
>>> > to
>>> > >>> the
>>> > >>> > > > above.
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > > Is timing out based on
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR
>>> > >>> > > > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0}
>>> Failed
>>> > to
>>> > >>> > > flush,
>>> > >>> > > > >> timed
>>> > >>> > > > >> > > out
>>> > >>> > > > >> > > > > > while waiting for producer to flush outstanding
>>> > 36478
>>> > >>> > > messages
>>> > >>> > > > >> > > > > >
>>> > >>> (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi
>>> <
>>> > >>> > > > >> > > > > vishal.santo...@gmail.com
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > wrote:
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > > > I think this might be it.. Could you confirm.
>>> It
>>> > >>> seems
>>> > >>> > to
>>> > >>> > > be
>>> > >>> > > > >> on
>>> > >>> > > > >> > the
>>> > >>> > > > >> > > > > path
>>> > >>> > > > >> > > > > > > to commit the offsets.. but not sure...
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for
>>> > >>> > > > >> > MirrorSourceConnector
>>> > >>> > > > >> > > > > > caught
>>> > >>> > > > >> > > > > > > exception in scheduled task: syncing topic
>>> ACLs
>>> > >>> > > > >> > > > > > >
>>> (org.apache.kafka.connect.mirror.Scheduler:102)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException:
>>> > >>> > > > >> > > > > > >
>>> > >>> > org.apache.kafka.common.errors.SecurityDisabledException:
>>> > >>> > > No
>>> > >>> > > > >> > > > Authorizer
>>> > >>> > > > >> > > > > > is
>>> > >>> > > > >> > > > > > > configured on the broker
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >>
>>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >>
>>> > >>> > >
>>> > >>>
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >>
>>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >         at
>>> java.lang.Thread.run(Thread.java:748)
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > > Caused by:
>>> > >>> > > > >> > >
>>> org.apache.kafka.common.errors.SecurityDisabledException:
>>> > >>> > > > >> > > > No
>>> > >>> > > > >> > > > > > > Authorizer is configured on the broker
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan
>>> <
>>> > >>> > > > >> > > ryannedo...@gmail.com
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > > > > > wrote:
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > > >> > I do not have a single record in the
>>> offsets
>>> > >>> topic
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >> That's definitely not normal. You are correct
>>> > that
>>> > >>> > > without
>>> > >>> > > > >> > records
>>> > >>> > > > >> > > > in
>>> > >>> > > > >> > > > > > that
>>> > >>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. The
>>> > offsets
>>> > >>> > should
>>> > >>> > > > be
>>> > >>> > > > >> > > stored
>>> > >>> > > > >> > > > > > >> periodically and whenever the connectors
>>> > gracefully
>>> > >>> > > > shutdown
>>> > >>> > > > >> or
>>> > >>> > > > >> > > > > restart.
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >> Is it possible the topics don't have required
>>> > ACLs
>>> > >>> or
>>> > >>> > > > >> something?
>>> > >>> > > > >> > > > Also
>>> > >>> > > > >> > > > > > >> note:
>>> > >>> > > > >> > > > > > >> Connect wants the offsets topic to have a
>>> large
>>> > >>> number
>>> > >>> > of
>>> > >>> > > > >> > > partitions
>>> > >>> > > > >> > > > > and
>>> > >>> > > > >> > > > > > >> to
>>> > >>> > > > >> > > > > > >> be compacted. Though I can't imagine either
>>> would
>>> > >>> > prevent
>>> > >>> > > > >> > commits
>>> > >>> > > > >> > > > from
>>> > >>> > > > >> > > > > > >> being sent.
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >> Ryanne
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal
>>> Santoshi
>>> > <
>>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>> > >>> > > > >> > > > > > >> wrote:
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >> > 2nd/restore issue  ( I think I need to
>>> solve
>>> > the
>>> > >>> > > offsets
>>> > >>> > > > >> topic
>>> > >>> > > > >> > > > issue
>>> > >>> > > > >> > > > > > >> > before I go with the scale up and down
>>> issue )
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead and
>>> created
>>> > >>> the
>>> > >>> > > > offsets
>>> > >>> > > > >> > > topic.
>>> > >>> > > > >> > > > > The
>>> > >>> > > > >> > > > > > >> > status of the cluster  ( destination ) is
>>> thus
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > opic# Partitions# BrokersBrokers Spread
>>> > %Brokers
>>> > >>> Skew
>>> > >>> > > > >> %Brokers
>>> > >>> > > > >> > > > > Leader
>>> > >>> > > > >> > > > > > >> > Skew %# ReplicasUnder Replicated %Leader
>>> > >>> SizeProducer
>>> > >>> > > > >> > > > > > Message/SecSummed
>>> > >>> > > > >> > > > > > >> > Recent Offsets
>>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.checkpoints.internal
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>> > >>> > > > >> > > > > > >> > s8k.act_search_page
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_search_page
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 6675.30 4,166,842
>>> > >>> > > > >> > > > > > >> > s8k.act_reach
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/s8k.act_reach
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 20657.92 11,579,529
>>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-status.s8k.internal
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 5 5 100 0 0 3 0 0.00 10
>>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k_test.internal
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-offsets.s8k.internal
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 0
>>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal
>>> > >>> > > > >> > > > > > >> > <
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/mm2-configs.s8k.internal
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > 1 3 60 0 0 3 0 0.00 13
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > You can see . that we have the  5 ( I
>>> created
>>> > >>> bot the
>>> > >>> > > > >> offsets,
>>> > >>> > > > >> > > to
>>> > >>> > > > >> > > > be
>>> > >>> > > > >> > > > > > >> safe
>>> > >>> > > > >> > > > > > >> > for the below )
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *clusters = s8k, s8k_test*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.topics =
>>> > >>> act_search_page|act_reach*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled =
>>> false*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled =
>>> false*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *s8k.replication.factor = 3*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *offsets.storage.replication.factor = 3*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *replication.factor = 3*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *replication.policy.separator = .*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > *tasks.max = 4*
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > What seems strange is that I do not have a
>>> > single
>>> > >>> > > record
>>> > >>> > > > in
>>> > >>> > > > >> > the
>>> > >>> > > > >> > > > > > offsets
>>> > >>> > > > >> > > > > > >> > topic.. Is that normal ?   I would imagine
>>> that
>>> > >>> > > without a
>>> > >>> > > > >> > > record,
>>> > >>> > > > >> > > > > > there
>>> > >>> > > > >> > > > > > >> is
>>> > >>> > > > >> > > > > > >> > no way that a restore would happen.... And
>>> that
>>> > >>> is
>>> > >>> > > > obvious
>>> > >>> > > > >> > when
>>> > >>> > > > >> > > I
>>> > >>> > > > >> > > > > > >> restart
>>> > >>> > > > >> > > > > > >> > the mm2 instance... Find the screenshot
>>> > >>> attached. In
>>> > >>> > > > >> essence
>>> > >>> > > > >> > the
>>> > >>> > > > >> > > > > > latency
>>> > >>> > > > >> > > > > > >> > avg lag is reset \when the mm2 instance is
>>> > reset
>>> > >>> > > > >> indicating no
>>> > >>> > > > >> > > > > restore
>>> > >>> > > > >> > > > > > >> but
>>> > >>> > > > >> > > > > > >> > restart from EARLIEST... I must be missing
>>> some
>>> > >>> thing
>>> > >>> > > > >> simple
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne
>>> Dolan <
>>> > >>> > > > >> > > > ryannedo...@gmail.com
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > > > >> > wrote:
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you must
>>> set
>>> > >>> > > tasks.max
>>> > >>> > > > to
>>> > >>> > > > >> > > > > something
>>> > >>> > > > >> > > > > > >> above
>>> > >>> > > > >> > > > > > >> >> 1 (the default) in order to achieve any
>>> > >>> parallelism.
>>> > >>> > > > This
>>> > >>> > > > >> > > > property
>>> > >>> > > > >> > > > > is
>>> > >>> > > > >> > > > > > >> >> passed along to the internal Connect
>>> workers.
>>> > >>> It's
>>> > >>> > > > >> > unfortunate
>>> > >>> > > > >> > > > that
>>> > >>> > > > >> > > > > > >> >> Connect
>>> > >>> > > > >> > > > > > >> >> is not smart enough to default this
>>> property
>>> > to
>>> > >>> the
>>> > >>> > > > >> number of
>>> > >>> > > > >> > > > > > workers.
>>> > >>> > > > >> > > > > > >> I
>>> > >>> > > > >> > > > > > >> >> suspect that will improve before long.
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >> >> For the second issue, is it possible you
>>> are
>>> > >>> missing
>>> > >>> > > the
>>> > >>> > > > >> > > offsets
>>> > >>> > > > >> > > > > > >> topic? It
>>> > >>> > > > >> > > > > > >> >> should exist alongside the config and
>>> status
>>> > >>> topics.
>>> > >>> > > > >> Connect
>>> > >>> > > > >> > > > should
>>> > >>> > > > >> > > > > > >> create
>>> > >>> > > > >> > > > > > >> >> this topic, but there are various reasons
>>> this
>>> > >>> can
>>> > >>> > > fail,
>>> > >>> > > > >> e.g.
>>> > >>> > > > >> > > if
>>> > >>> > > > >> > > > > the
>>> > >>> > > > >> > > > > > >> >> replication factor is misconfigured. You
>>> can
>>> > try
>>> > >>> > > > creating
>>> > >>> > > > >> > this
>>> > >>> > > > >> > > > > topic
>>> > >>> > > > >> > > > > > >> >> manually or changing
>>> > >>> > > offsets.storage.replication.factor.
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >> >> Ryanne
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal
>>> Santoshi
>>> > <
>>> > >>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>> > >>> > > > >> > > > > > >> >> wrote:
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >> >> > Using
>>> > >>> > > > >> > > >
>>> > https://github.com/apache/kafka/tree/trunk/connect/mirror
>>> > >>> > > > >> > > > > > as a
>>> > >>> > > > >> > > > > > >> >> > guide,
>>> > >>> > > > >> > > > > > >> >> > I have build from source the
>>> origin/KIP-382
>>> > of
>>> > >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git.
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2
>>> different
>>> > >>> nodes (
>>> > >>> > > they
>>> > >>> > > > >> are
>>> > >>> > > > >> > > > > actually
>>> > >>> > > > >> > > > > > >> >> pods on
>>> > >>> > > > >> > > > > > >> >> > k8s but that should not matter ). They
>>> share
>>> > >>> the
>>> > >>> > > > >> > > mm2.properties
>>> > >>> > > > >> > > > > > file
>>> > >>> > > > >> > > > > > >> and
>>> > >>> > > > >> > > > > > >> >> > are replicating ( 1-way ) 3 topics with
>>> 8
>>> > >>> > partitions
>>> > >>> > > > in
>>> > >>> > > > >> > > total.
>>> > >>> > > > >> > > > > > That
>>> > >>> > > > >> > > > > > >> >> seems
>>> > >>> > > > >> > > > > > >> >> > to be the way to create a standalone mm2
>>> > >>> cluster.
>>> > >>> > I
>>> > >>> > > do
>>> > >>> > > > >> not
>>> > >>> > > > >> > > > > however
>>> > >>> > > > >> > > > > > >> see(
>>> > >>> > > > >> > > > > > >> >> at
>>> > >>> > > > >> > > > > > >> >> > least the mbeans do not show ) any
>>> attempt
>>> > to
>>> > >>> > > > rebalance.
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> >
>>> https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>> > >>> > > > >> > > > > > >> >> > mbeans
>>> > >>> > > > >> > > > > > >> >> > are all on a single node
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2
>>> nodes (
>>> > >>> hard
>>> > >>> > stop
>>> > >>> > > > ans
>>> > >>> > > > >> > > start
>>> > >>> > > > >> > > > ).
>>> > >>> > > > >> > > > > > The
>>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to be
>>> reset to
>>> > >>> the
>>> > >>> > > > >> earliest,
>>> > >>> > > > >> > as
>>> > >>> > > > >> > > if
>>> > >>> > > > >> > > > > it
>>> > >>> > > > >> > > > > > >> is a
>>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also obvious
>>> from
>>> > >>> the
>>> > >>> > > > >> > > > > > >> >> >
>>> > "record-age-ms-avg|replication-latency-ms-avg"
>>> > >>> > > > >> > > > > > >> >> > which I track through the restart.
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > This implies that
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is not
>>> > >>> working. I
>>> > >>> > > > >> cannot
>>> > >>> > > > >> > > scale
>>> > >>> > > > >> > > > > up
>>> > >>> > > > >> > > > > > or
>>> > >>> > > > >> > > > > > >> >> down
>>> > >>> > > > >> > > > > > >> >> > by adding nodes to the mm2 cluster or
>>> > removing
>>> > >>> > them.
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not working.
>>> If
>>> > the
>>> > >>> MM2
>>> > >>> > > > >> cluster
>>> > >>> > > > >> > is
>>> > >>> > > > >> > > > > > brought
>>> > >>> > > > >> > > > > > >> >> down,
>>> > >>> > > > >> > > > > > >> >> > it does not start mirroring from the
>>> last
>>> > >>> known
>>> > >>> > > > state. I
>>> > >>> > > > >> > see
>>> > >>> > > > >> > > > the,
>>> > >>> > > > >> > > > > > >> >> > state/config topics etc created as
>>> > expected..
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty mimimal
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *clusters = a , b*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> dr2*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *a->b.enabled = true*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = false*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *b->a..enabled = false*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = false*
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ?
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >> > Thanks
>>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > > > >> > > > > > >> >>
>>> > >>> > > > >> > > > > > >> >
>>> > >>> > > > >> > > > > > >>
>>> > >>> > > > >> > > > > > >
>>> > >>> > > > >> > > > > >
>>> > >>> > > > >> > > > >
>>> > >>> > > > >> > > >
>>> > >>> > > > >> > >
>>> > >>> > > > >> >
>>> > >>> > > > >>
>>> > >>> > > > >
>>> > >>> > > >
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> > >>
>>> >
>>>
>>

Reply via email to