Hey Ryanne,

I see a definite issue. I am doing an intense test: I bring up 12 VMs
(12 pods with 8 CPUs each), replicating about 1200-plus topics (fairly
heavy, 100 Mbps). They are acquired and staggered as they come up. I see a
fraction of these nodes not assigned any replication, even though there is
plenty to go around (more than a couple of thousand partitions). Is there
something I am missing? In my current case 5 of the 12 VMs are idle.

Vishal

On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Oh sorry, a COUNTER... is more like it.
>
> On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Will do
>>     One more thing: the age/latency metrics seem to be analogous, as in
>> they seem to be calculated using similar routines. I would think a metric
>> tracking the number of flush failures (as a GAUGE), given
>> offset.flush.timeout.ms, would be highly beneficial.
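>>
>> Something along these lines with the common metrics API, say (purely a
>> sketch of the idea, not existing MM2 code; CumulativeCount being available
>> here is an assumption on my part):
>>
>> import org.apache.kafka.common.metrics.Metrics;
>> import org.apache.kafka.common.metrics.Sensor;
>> import org.apache.kafka.common.metrics.stats.CumulativeCount;
>>
>> Metrics metrics = new Metrics();
>> Sensor flushFailures = metrics.sensor("offset-flush-failures");
>> flushFailures.add(
>>     metrics.metricName("offset-flush-failure-total", "mirror-metrics",
>>         "flushes that exceeded offset.flush.timeout.ms"),
>>     new CumulativeCount());
>> // record() with no arguments adds 1, so call it on every flush timeout:
>> flushFailures.record();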
>>
>> Regards..
>>
>>
>> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <ryannedo...@gmail.com>
>> wrote:
>>
>>> Ah, I see you are correct. Also I misspoke saying "workers" earlier, as
>>> the
>>> consumer is not created by the worker, but the task.
>>>
>>> I suppose the put() could be changed to putIfAbsent() here to enable this
>>> property to be changed. Maybe submit a PR?
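>>>
>>> Roughly something like this (an untested sketch, not a real patch), so that
>>> anything supplied via the consumer.* prefix wins and the hard-coded values
>>> only act as defaults:
>>>
>>>         props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>>         props.putIfAbsent("enable.auto.commit", "false");
>>>         props.putIfAbsent("auto.offset.reset", "earliest");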
>>>
>>> Ryanne
>>>
>>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> > Hmm  ( I did both )
>>> >
>>> > another->another_test.enabled = true
>>> >
>>> > another->another_test.topics = act_post
>>> >
>>> > another->another_test.emit.heartbeats.enabled = false
>>> >
>>> > another->another_test.consumer.auto.offset.reset = latest
>>> >
>>> > another->another_test.sync.topic.acls.enabled = false
>>> >
>>> > another.consumer.auto.offset.reset = latest
>>> >
>>> >
>>> >
>>> > When I grep for the ConsumerConfig (there are 8 instances; this topic
>>> > has 4 partitions), I see:
>>> >
>>> >
>>> >  [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>> >
>>> > allow.auto.create.topics = true
>>> >
>>> > auto.commit.interval.ms = 5000
>>> >
>>> > *auto.offset.reset* = earliest
>>> >
>>> >
>>> > I am now using the 2.4 branch from kafka trunk
>>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
>>> >
>>> >
>>> > This code change works and makes sense. I think all other settings will
>>> > be fine (as they can be overridden) except for the two below:
>>> >
>>> >
>>> > --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> > +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
>>> >
>>> >          props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>> >          props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>> >          props.put("enable.auto.commit", "false");
>>> > -        props.put("auto.offset.reset", "earliest");
>>> > +        props.put("auto.offset.reset", "latest");
>>> >          return props;
>>> >      }
>>> >
>>> >
>>> > Regards.
>>> >
>>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <ryannedo...@gmail.com>
>>> > wrote:
>>> >
>>> > > Vishal, you should be able to override the properties passed to the
>>> > > internal workers using properties like
>>> A->B.consumer.auto.offset.reset or
>>> > > A.consumer.auto.offset.reset in the mm2.properties file. Certain
>>> > top-level
>>> > > properties like tasks.max are honored without the A->B or A prefix,
>>> but
>>> > > auto.offset.reset is not one of them.
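>>> > >
>>> > > For example, something along these lines in mm2.properties (A and B are
>>> > > placeholder cluster aliases here):
>>> > >
>>> > > A->B.consumer.auto.offset.reset = latest
>>> > > A.consumer.auto.offset.reset = latest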
>>> > >
>>> > > Ryanne
>>> > >
>>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <
>>> > vishal.santo...@gmail.com
>>> > > >
>>> > > wrote:
>>> > >
>>> > > > Hey Ryanne,
>>> > > >
>>> > > >
>>> > > >     How do I override auto.offset.reset = latest for consumers through
>>> > > > mm2.properties? I have tried straight-up auto.offset.reset and
>>> > > > consumer.auto.offset.reset, but it defaults to earliest. I do have a query
>>> > > > in another thread but thought you might know offhand.
>>> > > >
>>> > > > I would imagine there is some general way of overriding consumer and
>>> > > > producer configs through mm2.properties in MM2?
>>> > > >
>>> > > > Regards.
>>> > > >
>>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <
>>> > > vishal.santo...@gmail.com
>>> > > > >
>>> > > > wrote:
>>> > > >
>>> > > > > Thank you so much for all your help. Will keep you posted on the tests I
>>> > > > > do. I hope this is helpful to other folks too.
>>> > > > >
>>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <
>>> ryannedo...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > >> That's right. MM2 is at-least-once for now, same as legacy
>>> > > MirrorMaker.
>>> > > > >> You
>>> > > > >> can follow https://issues.apache.org/jira/browse/KAFKA-6080 for
>>> > > updates
>>> > > > >> on
>>> > > > >> exactly-once semantics in Connect.
>>> > > > >>
>>> > > > >> Ryanne
>>> > > > >>
>>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <
>>> > > > >> vishal.santo...@gmail.com>
>>> > > > >> wrote:
>>> > > > >>
>>> > > > >> >   >> You are correct. I'm working on a KIP and PoC to
>>> introduce
>>> > > > >> > transactions to
>>> > > > >> > >> Connect for this exact purpose :)
>>> > > > >> >
>>> > > > >> > That is awesome. Any time frame ?
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > In the meantime, the SLA as of now:
>>> > > > >> >
>>> > > > >> > 1. It is conceivable that we flush the producer to the target cluster but
>>> > > > >> > fail to commit offsets. If there is a restart before the next successful
>>> > > > >> > offset commit, there will be duplicates and a part of the data is
>>> > > > >> > replayed (at least once)?
>>> > > > >> >
>>> > > > >> > 2. The same can be said about partial flushes, though I am not sure how
>>> > > > >> > Kafka handles a flush (is a flush either a success or a failure, with
>>> > > > >> > nothing in between?).
>>> > > > >> >
>>> > > > >> > Thanks..
>>> > > > >> >
>>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <
>>> > > ryannedo...@gmail.com>
>>> > > > >> > wrote:
>>> > > > >> >
>>> > > > >> > > Hey Vishal, glad to hear you're making progress.
>>> > > > >> > >
>>> > > > >> > > > 1. It seems though that flushing [...] the producer and
>>> > setting
>>> > > > the
>>> > > > >> > > > offset to the compacting topic is not atomic  OR  do we
>>> use
>>> > > > >> > > > transactions here  ?
>>> > > > >> > >
>>> > > > >> > > You are correct. I'm working on a KIP and PoC to introduce
>>> > > > >> transactions
>>> > > > >> > to
>>> > > > >> > > Connect for this exact purpose :)
>>> > > > >> > >
>>> > > > >> > > > I think these are 4 threads ( b'coz num.tasks=4 ), and I
>>> have
>>> > 2
>>> > > > >> topics
>>> > > > >> > > with
>>> > > > >> > > > 1 partition each. Do I assume this right, as in there are
>>> 4
>>> > > > consumer
>>> > > > >> > > groups
>>> > > > >> > > > ( on CG per thread ) ...
>>> > > > >> > >
>>> > > > >> > > Some details here:
>>> > > > >> > > - tasks.max controls the maximum number of tasks created per
>>> > > > Connector
>>> > > > >> > > instance. Both MirrorSourceConnector and
>>> > MirrorCheckpointConnector
>>> > > > >> will
>>> > > > >> > > create multiple tasks (up to tasks.max), but
>>> > > > MirrorHeartbeatConnector
>>> > > > >> > only
>>> > > > >> > > ever creates a single task. Moreover, there cannot be more
>>> tasks
>>> > > > than
>>> > > > >> > > topic-partitions (for MirrorSourceConnector) or consumer
>>> groups
>>> > > (for
>>> > > > >> > > MirrorCheckpointConnector). So if you have two topics with
>>> one
>>> > > > >> partition
>>> > > > >> > > each and 1 consumer group total, you'll have two
>>> > > > MirrorSourceConnector
>>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one
>>> > > > >> > MirrorCheckpointConnector
>>> > > > >> > > task, for a total of four. And that's in one direction
>>> only: if
>>> > > you
>>> > > > >> have
>>> > > > >> > > multiple source->target herders enabled, each will create
>>> tasks
>>> > > > >> > > independently.
>>> > > > >> > > - There are no consumer groups in MM2, technically. The
>>> Connect
>>> > > > >> framework
>>> > > > >> > > uses the Coordinator API and internal topics to divide tasks
>>> > among
>>> > > > >> > workers
>>> > > > >> > > -- not a consumer group per se. The MM2 connectors use the
>>> > > assign()
>>> > > > >> API,
>>> > > > >> > > not the subscribe() API, so there are no consumer groups
>>> there
>>> > > > >> either. In
>>> > > > >> > > fact, they don't commit() either. This is nice, as it
>>> > eliminates a
>>> > > > >> lot of
>>> > > > >> > > the rebalancing problems legacy MirrorMaker has been plagued
>>> > with.
>>> > > > >> With
>>> > > > >> > > MM2, rebalancing only occurs when the number of workers
>>> changes
>>> > or
>>> > > > >> when
>>> > > > >> > the
>>> > > > >> > > assignments change (e.g. new topics are discovered).
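>>> > > > >> > >
>>> > > > >> > > For illustration only (a plain-consumer sketch, not the actual MM2
>>> > > > >> > > source; the topic and partition here are made up):
>>> > > > >> > >
>>> > > > >> > > Properties p = new Properties(); // bootstrap.servers, deserializers, etc. assumed
>>> > > > >> > > KafkaConsumer<byte[], byte[]> c = new KafkaConsumer<>(p);
>>> > > > >> > > // subscribe() joins a consumer group and is subject to rebalancing:
>>> > > > >> > > // c.subscribe(Collections.singletonList("act_search_page"));
>>> > > > >> > > // assign() pins partitions directly -- no group membership, no commit() needed:
>>> > > > >> > > c.assign(Collections.singletonList(new TopicPartition("act_search_page", 0)));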
>>> > > > >> > >
>>> > > > >> > > Ryanne
>>> > > > >> > >
>>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal Santoshi <
>>> > > > >> > > vishal.santo...@gmail.com>
>>> > > > >> > > wrote:
>>> > > > >> > >
>>> > > > >> > > > Hey Ryanne,
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >            The test was on topics that had a 7-day retention, which
>>> > > > >> > > > generally implies that the batch size for a flush is pretty high (until
>>> > > > >> > > > the consumption becomes current). offset.flush.timeout.ms defaults to 5
>>> > > > >> > > > seconds, and the code will not send the offsets if the flush is not
>>> > > > >> > > > complete. Increasing that timeout did solve the "not sending the offset
>>> > > > >> > > > to the topic" issue.
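>>> > > > >> > > >
>>> > > > >> > > > For reference, the knob I bumped is the Connect worker setting below (the
>>> > > > >> > > > value is just an example, not a recommendation):
>>> > > > >> > > >
>>> > > > >> > > > offset.flush.timeout.ms = 60000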
>>> > > > >> > > >
>>> > > > >> > > > Two questions ( I am being greedy here :) )
>>> > > > >> > > >
>>> > > > >> > > > 1. It seems though that flushing the producer and setting the offset to
>>> > > > >> > > > the compacting topic is not atomic, OR do we use transactions here?
>>> > > > >> > > >
>>> > > > >> > > > 2. I see
>>> > > > >> > > >
>>> > > > >> > > >  WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing
>>> > 956435
>>> > > > >> > > >
>>> > > > >> > > >  WorkerSourceTask{id=MirrorSourceConnector-1} flushing
>>> 356251
>>> > > > >> > > >
>>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-2}
>>> flushing 0
>>> > > > >> > > >
>>> > > > >> > > >  WorkerSourceTask{id=MirrorCheckpointConnector-3}
>>> flushing 0
>>> > > > >> > outstanding
>>> > > > >> > > > messages
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > I think these are 4 threads (b'coz tasks.max=4), and I have 2 topics
>>> > > > >> > > > with 1 partition each. Do I assume this right, as in there are 4
>>> > > > >> > > > consumer groups (one CG per thread)?
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > THANKS A LOT
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > Vishal.
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <
>>> > > > ryannedo...@gmail.com
>>> > > > >> >
>>> > > > >> > > > wrote:
>>> > > > >> > > >
>>> > > > >> > > > > >  timed out
>>> > > > >> > > > > while waiting for producer to flush outstanding
>>> > > > >> > > > >
>>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect was
>>> unable to
>>> > > > send
>>> > > > >> > > records
>>> > > > >> > > > > to the downstream remote topics, e.g. if
>>> > min.insync.replicas
>>> > > > were
>>> > > > >> > > > > misconfigured. Given some data seems to arrive, it's
>>> > possible
>>> > > > that
>>> > > > >> > > > > everything is configured correctly but with too much
>>> latency
>>> > > to
>>> > > > >> > > > > successfully commit within the default timeouts. You may
>>> > want
>>> > > to
>>> > > > >> > > increase
>>> > > > >> > > > > the number of tasks substantially to achieve more
>>> > parallelism
>>> > > > and
>>> > > > >> > > > > throughput.
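>>> > > > >> > > > >
>>> > > > >> > > > > e.g. something like this in mm2.properties (16 is only an example; pick
>>> > > > >> > > > > a value based on your partition counts):
>>> > > > >> > > > >
>>> > > > >> > > > > tasks.max = 16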
>>> > > > >> > > > >
>>> > > > >> > > > > Ryanne
>>> > > > >> > > > >
>>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal Santoshi <
>>> > > > >> > > vishal.santo...@gmail.com
>>> > > > >> > > > >
>>> > > > >> > > > > wrote:
>>> > > > >> > > > >
>>> > > > >> > > > > > Aah no.. there is more to it. Not sure if related to the above.
>>> > > > >> > > > > >
>>> > > > >> > > > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>>> > > > >> > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > > > Is timing out based on
>>> > > > >> > > > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>>> > > > >> > > > > >
>>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 36478 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>>> > > > >> > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal Santoshi <
>>> > > > >> > > > > vishal.santo...@gmail.com
>>> > > > >> > > > > > >
>>> > > > >> > > > > > wrote:
>>> > > > >> > > > > >
>>> > > > >> > > > > > > I think this might be it.. Could you confirm. It
>>> seems
>>> > to
>>> > > be
>>> > > > >> on
>>> > > > >> > the
>>> > > > >> > > > > path
>>> > > > >> > > > > > > to commit the offsets.. but not sure...
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught exception in scheduled task: syncing topic ACLs (org.apache.kafka.connect.mirror.Scheduler:102)
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>>> > > > >> > > > > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> > > > >> > > > > > >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> > > > >> > > > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> > > > >> > > > > > >         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> > > > >> > > > > > >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> > > > >> > > > > > >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> > > > >> > > > > > >         at java.lang.Thread.run(Thread.java:748)
>>> > > > >> > > > > > > Caused by: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker
>>> > > > >> > > > > > >
>>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <
>>> > > > >> > > ryannedo...@gmail.com
>>> > > > >> > > > >
>>> > > > >> > > > > > > wrote:
>>> > > > >> > > > > > >
>>> > > > >> > > > > > >> > I do not have a single record in the offsets
>>> topic
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> That's definitely not normal. You are correct that
>>> > > without
>>> > > > >> > records
>>> > > > >> > > > in
>>> > > > >> > > > > > that
>>> > > > >> > > > > > >> topic, MM2 will restart from EARLIEST. The offsets
>>> > should
>>> > > > be
>>> > > > >> > > stored
>>> > > > >> > > > > > >> periodically and whenever the connectors gracefully
>>> > > > shutdown
>>> > > > >> or
>>> > > > >> > > > > restart.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> Is it possible the topics don't have required ACLs
>>> or
>>> > > > >> something?
>>> > > > >> > > > Also
>>> > > > >> > > > > > >> note:
>>> > > > >> > > > > > >> Connect wants the offsets topic to have a large
>>> number
>>> > of
>>> > > > >> > > partitions
>>> > > > >> > > > > and
>>> > > > >> > > > > > >> to
>>> > > > >> > > > > > >> be compacted. Though I can't imagine either would
>>> > prevent
>>> > > > >> > commits
>>> > > > >> > > > from
>>> > > > >> > > > > > >> being sent.
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> Ryanne
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <
>>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>> > > > >> > > > > > >> wrote:
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >> > 2nd/restore issue (I think I need to solve the offsets topic issue
>>> > > > >> > > > > > >> > before I go with the scale up and down issue)
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > As you had indicated, I went ahead and created
>>> the
>>> > > > offsets
>>> > > > >> > > topic.
>>> > > > >> > > > > The
>>> > > > >> > > > > > >> > status of the cluster  ( destination ) is thus
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > Topic | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Producer Message/Sec | Summed Recent Offsets
>>> > > > >> > > > > > >> > s8k.checkpoints.internal        1  3   60  0  0  3  0      0.00           0
>>> > > > >> > > > > > >> > s8k.act_search_page             1  3   60  0  0  3  0   6675.30   4,166,842
>>> > > > >> > > > > > >> > s8k.act_reach                   1  3   60  0  0  3  0  20657.92  11,579,529
>>> > > > >> > > > > > >> > mm2-status.s8k.internal         5  5  100  0  0  3  0      0.00          10
>>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal   1  3   60  0  0  3  0      0.00           0
>>> > > > >> > > > > > >> > mm2-offsets.s8k.internal        1  3   60  0  0  3  0      0.00           0
>>> > > > >> > > > > > >> > mm2-configs.s8k.internal        1  3   60  0  0  3  0      0.00          13
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > You can see that we have the 5 internal topics (I created both offsets
>>> > > > >> > > > > > >> > topics, to be safe, given the config below).
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *clusters = s8k, s8k_test*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k.bootstrap.servers = .....*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k_test.bootstrap.servers = ......*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *# only allow replication dr1 -> dr2*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k->s8k_test.enabled = true*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k->s8k_test.topics =
>>> act_search_page|act_reach*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k->s8k_test.emit.heartbeats.enabled = false*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k_test->s8k.enabled = false*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k_test->s8k.emit.heartbeats.enabled = false*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k_test.replication.factor = 3*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *s8k.replication.factor = 3*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *offsets.storage.replication.factor = 3*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *replication.factor = 3*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *replication.policy.separator = .*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > *tasks.max = 4*
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > What seems strange is that I do not have a single record in the offsets
>>> > > > >> > > > > > >> > topic. Is that normal? I would imagine that without a record, there is no
>>> > > > >> > > > > > >> > way a restore would happen, and that is obvious when I restart the mm2
>>> > > > >> > > > > > >> > instance... find the screenshot attached. In essence, the latency avg lag
>>> > > > >> > > > > > >> > is reset when the mm2 instance is restarted, indicating no restore but a
>>> > > > >> > > > > > >> > restart from EARLIEST. I must be missing something simple.
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <
>>> > > > >> > > > ryannedo...@gmail.com
>>> > > > >> > > > > >
>>> > > > >> > > > > > >> > wrote:
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you must set
>>> > > tasks.max
>>> > > > to
>>> > > > >> > > > > something
>>> > > > >> > > > > > >> above
>>> > > > >> > > > > > >> >> 1 (the default) in order to achieve any
>>> parallelism.
>>> > > > This
>>> > > > >> > > > property
>>> > > > >> > > > > is
>>> > > > >> > > > > > >> >> passed along to the internal Connect workers.
>>> It's
>>> > > > >> > unfortunate
>>> > > > >> > > > that
>>> > > > >> > > > > > >> >> Connect
>>> > > > >> > > > > > >> >> is not smart enough to default this property to
>>> the
>>> > > > >> number of
>>> > > > >> > > > > > workers.
>>> > > > >> > > > > > >> I
>>> > > > >> > > > > > >> >> suspect that will improve before long.
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> For the second issue, is it possible you are
>>> missing
>>> > > the
>>> > > > >> > > offsets
>>> > > > >> > > > > > >> topic? It
>>> > > > >> > > > > > >> >> should exist alongside the config and status
>>> topics.
>>> > > > >> Connect
>>> > > > >> > > > should
>>> > > > >> > > > > > >> create
>>> > > > >> > > > > > >> >> this topic, but there are various reasons this
>>> can
>>> > > fail,
>>> > > > >> e.g.
>>> > > > >> > > if
>>> > > > >> > > > > the
>>> > > > >> > > > > > >> >> replication factor is misconfigured. You can try
>>> > > > creating
>>> > > > >> > this
>>> > > > >> > > > > topic
>>> > > > >> > > > > > >> >> manually or changing
>>> > > offsets.storage.replication.factor.
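>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> For example, creating it by hand via the AdminClient would look roughly
>>> > > > >> > > > > > >> >> like this (a sketch only; the topic name, partition count, replication
>>> > > > >> > > > > > >> >> factor and bootstrap servers are assumptions to adjust for your setup):
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> Properties p = new Properties();
>>> > > > >> > > > > > >> >> p.put("bootstrap.servers", "target-broker:9092"); // assumed
>>> > > > >> > > > > > >> >> try (AdminClient admin = AdminClient.create(p)) {
>>> > > > >> > > > > > >> >>     NewTopic offsets = new NewTopic("mm2-offsets.a.internal", 25, (short) 3)
>>> > > > >> > > > > > >> >>             .configs(Collections.singletonMap("cleanup.policy", "compact"));
>>> > > > >> > > > > > >> >>     admin.createTopics(Collections.singletonList(offsets)).all().get();
>>> > > > >> > > > > > >> >> }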
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> Ryanne
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <
>>> > > > >> > > > > > >> vishal.santo...@gmail.com>
>>> > > > >> > > > > > >> >> wrote:
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >> > Using
>>> > > > >> > > > https://github.com/apache/kafka/tree/trunk/connect/mirror
>>> > > > >> > > > > > as a
>>> > > > >> > > > > > >> >> > guide,
>>> > > > >> > > > > > >> >> > I have built from source the origin/KIP-382 of
>>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git.
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > I am seeing 2 issues
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2 different nodes (they are actually pods
>>> > > > >> > > > > > >> >> > on k8s but that should not matter). They share the mm2.properties file
>>> > > > >> > > > > > >> >> > and are replicating (1-way) 3 topics with 8 partitions in total. That
>>> > > > >> > > > > > >> >> > seems to be the way to create a standalone mm2 cluster. I do not,
>>> > > > >> > > > > > >> >> > however, see (at least the mbeans do not show) any attempt to rebalance.
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>> > > > >> > > > > > >> >> > mbeans are all on a single node
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > * I restart the processes on the 2 nodes (hard stop and start). The
>>> > > > >> > > > > > >> >> > offsets for replication seem to be reset to the earliest, as if it is a
>>> > > > >> > > > > > >> >> > brand new mirroring. It is also obvious from
>>> > > > >> > > > > > >> >> > "record-age-ms-avg|replication-latency-ms-avg", which I track through
>>> > > > >> > > > > > >> >> > the restart.
>>> > > > >> > > > > > >> >> > which I track through the restart.
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > This implies that
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or
>>> > > > >> > > > > > >> >> > down by adding nodes to the mm2 cluster or removing them.
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought
>>> > > > >> > > > > > >> >> > down, it does not start mirroring from the last known state. I see the
>>> > > > >> > > > > > >> >> > state/config topics etc. created as expected.
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > The mm2.properties is pretty minimal:
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *clusters = a , b*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *a.bootstrap.servers = k.....*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *b.bootstrap.servers = k.....*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *# only allow replication dr1 -> dr2*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *a->b.enabled = true*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *a->b.topics = act_search_page*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *a->b.emit.heartbeats.enabled = false*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *b->a.enabled = false*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > *b->a.emit.heartbeats.enabled = false*
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > What do you think is the issue ?
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >> > Thanks
>>> > > > >> > > > > > >> >> >
>>> > > > >> > > > > > >> >>
>>> > > > >> > > > > > >> >
>>> > > > >> > > > > > >>
>>> > > > >> > > > > > >
>>> > > > >> > > > > >
>>> > > > >> > > > >
>>> > > > >> > > >
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
