I might have created a build from trunk rather than the 2.4 branch,
but will confirm.

On Thu, Oct 24, 2019 at 4:44 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> The above may not be an issue, as I think it just uses the returned class
> loader to resolve the Connector. What is not obvious is why it does
> not go ahead and consume:
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,571] INFO refreshing known
> target topics took 15 ms (org.apache.kafka.connect.mirror.Scheduler:95)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Started
> MirrorSourceConnector with 120 topic-partitions.
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:121)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,574] INFO Starting
> MirrorSourceConnector took 160 ms.
> (org.apache.kafka.connect.mirror.MirrorSourceConnector:122)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,577] INFO Finished
> creating connector MirrorSourceConnector
> (org.apache.kafka.connect.runtime.Worker:272)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,578] ERROR Plugin class
> loader for connector:
> 'org.apache.kafka.connect.mirror.MirrorSourceConnector' was not found.
> Returning:
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,579] INFO
> SourceConnectorConfig values:
>
> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart
>
> [mm2-dev-749469cf68-vpm2l] connector.class =
> org.apache.kafka.connect.mirror.MirrorSourceConnector
>
> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false
>
> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false
>
> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000
>
> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0
>
> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none
>
> [mm2-dev-749469cf68-vpm2l] header.converter = null
>
> [mm2-dev-749469cf68-vpm2l] key.converter = null
>
> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector
>
> [mm2-dev-749469cf68-vpm2l] tasks.max = 48
>
> [mm2-dev-749469cf68-vpm2l] transforms = []
>
> [mm2-dev-749469cf68-vpm2l] value.converter = null
>
> [mm2-dev-749469cf68-vpm2l]
> (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
>
> [mm2-dev-749469cf68-vpm2l] [2019-10-24 20:26:07,580] INFO
> EnrichedConnectorConfig values:
>
> [mm2-dev-749469cf68-vpm2l] config.action.reload = restart
>
> [mm2-dev-749469cf68-vpm2l] connector.class =
> org.apache.kafka.connect.mirror.MirrorSourceConnector
>
> [mm2-dev-749469cf68-vpm2l] errors.log.enable = false
>
> [mm2-dev-749469cf68-vpm2l] errors.log.include.messages = false
>
> [mm2-dev-749469cf68-vpm2l] errors.retry.delay.max.ms = 60000
>
> [mm2-dev-749469cf68-vpm2l] errors.retry.timeout = 0
>
> [mm2-dev-749469cf68-vpm2l] errors.tolerance = none
>
> [mm2-dev-749469cf68-vpm2l] header.converter = null
>
> [mm2-dev-749469cf68-vpm2l] key.converter = null
>
> [mm2-dev-749469cf68-vpm2l] name = MirrorSourceConnector
>
> [mm2-dev-749469cf68-vpm2l] tasks.max = 48
>
> [mm2-dev-749469cf68-vpm2l] transforms = []
>
> [mm2-dev-749469cf68-vpm2l] value.converter = null
>
> [mm2-dev-749469cf68-vpm2l]
> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
>
>
>
> And from there on, nothing.
>
>
>
>
> On Thu, Oct 24, 2019 at 3:02 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Hey Ryanne,
>>
>> I am seeing the ERROR below in the logs, and then it seems the
>> process does not consume (it does not exit with any errors). This is
>> intermittent, as in relaunch enough times and it shows up :) Is this
>> a known bug?
>>
>>          [mm2-dev-58bf5df684-ln9k2] [2019-10-24 18:41:03,067] ERROR
>> Plugin class loader for connector:
>> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found.
>> Returning:
>> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2262b621
>> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
>>
>>
>>
>>
>>
>>
>> On Mon, Oct 21, 2019 at 5:16 PM Ryanne Dolan <ryannedo...@gmail.com>
>> wrote:
>>
>>> Vishal, the number of tasks created per source->target herder is determined
>>> by both tasks.max and the total number of topic-partitions being
>>> replicated. In order to use all 12 worker nodes, you'd need tasks.max >= 12
>>> and number of topic-partitions >= 12. From previous emails it sounds like
>>> you have a small number of topic-partitions total (i.e. a small number of
>>> topics with a small number of partitions per topic), so I'm guessing that's
>>> the reason you aren't seeing more tasks being created.
>>>
>>> Ryanne
>>>
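A minimal sketch of the arithmetic in the reply above, assuming the number of source tasks is capped by both tasks.max and the topic-partition count, and that a worker can only be busy if it holds at least one task. The class and method names are hypothetical and not part of Kafka; the actual assignment is done by the Connect framework.

```java
// Hypothetical helper, not part of Kafka: models the cap described above.
final class TaskSpread {

    // Assumption: a connector never gets more tasks than tasks.max or than
    // the number of topic-partitions it replicates.
    static int sourceTasks(int tasksMax, int topicPartitions) {
        return Math.min(tasksMax, topicPartitions);
    }

    // A worker with no task assigned sits idle, so at most one worker per
    // task can be busy.
    static int maxBusyWorkers(int workers, int tasksMax, int topicPartitions) {
        return Math.min(workers, sourceTasks(tasksMax, topicPartitions));
    }

    public static void main(String[] args) {
        // 12 workers, tasks.max = 8, a couple of thousand partitions.
        System.out.println(maxBusyWorkers(12, 8, 2000));  // 8
        // Same cluster with tasks.max = 48.
        System.out.println(maxBusyWorkers(12, 48, 2000)); // 12
    }
}
```

Under these assumptions, raising tasks.max above the worker count is what leaves room for newly added workers to pick up tasks.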
>>> On Sat, Oct 19, 2019 at 1:28 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com>
>>> wrote:
>>>
>>> > Here is what I see
>>> >
>>> > * The max tasks setting is a cap on a Connector across the cluster. If I
>>> > have 8 VMs and 8 max tasks, my assumption that there would be 8 * 8 = 64
>>> > task threads was wrong. The logs showed that the partitions were consumed
>>> > by 8 threads on the 8 VMs (1 per VM), which was highly suboptimal. When I
>>> > scaled the VMs to 12, it did not matter, as the max tasks still prevented
>>> > any further distribution.
>>> >
>>> > * If I cancel/resume the cluster with a max tasks of 48 (keeping the same
>>> > job name and thus connector definition), the max tasks does not change;
>>> > it seems to keep the same limit on task threads (8).
>>> >
>>> > * I can bring down a VM and see the task migrate to a free VM, but the
>>> > overall count of task threads remains the same.
>>> >
>>> >
>>> > In essence, the number of tasks is a cap on threads in the cluster per
>>> > connector. A connector is a source->sink pair that spans a cluster. Thus,
>>> > if we have an A->B DAG and max tasks of 8, then there will be no more than
>>> > 8 Source Tasks (threads) no matter how big the cluster is. It thus makes
>>> > sense to over-provision max tasks (within the limits of a single VM) to
>>> > allow for adding more VMs for scale-up.
>>> >
>>> >
>>> >
>>> > On Fri, Oct 18, 2019 at 8:04 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com
>>> > >
>>> > wrote:
>>> >
>>> > > I misspoke
>>> > >
>>> > > >> I now have 8 VMs with 8 CPUs and 48 max tasks, and it did spread to
>>> > > >> the 8 VMs. I then upscaled to 12 VMs and the tasks *have not* migrated
>>> > > >> as I would expect.
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > On Fri, Oct 18, 2019 at 8:00 PM Vishal Santoshi <
>>> > vishal.santo...@gmail.com>
>>> > > wrote:
>>> > >
>>> > >> OK, You will have to explain :)
>>> > >>
>>> > >> I had 12 VMs with 8 CPUs and 8 max tasks. I thought I would give a CPU
>>> > >> to each task, which I presumed is a Java thread (even though I know the
>>> > >> thread would be mostly I/O bound). I saw the issue I pointed out.
>>> > >> *I now have 8 VMs with 8 CPUs and 48 max tasks, and it did spread to the
>>> > >> 8 VMs. I then upscaled to 12 VMs and the tasks migrated as I would
>>> > >> expect.*
>>> > >>
>>> > >> I know that a VM will have MirrorSourceConnector and
>>> > >> MirrorHeartbeatConnector tasks up to tasks.max. So, a few questions:
>>> > >>
>>> > >>
>>> > >>
>>> > >> * When we say there are 48 max tasks, are we saying there are 48
>>> > >> threads (in fact 96, one set for each of the 2 groups above, worst case + 2)?
>>> > >> * When we talk about a Connector, are we talking about a JVM process, as
>>> > >> in, is a Connector a JVM process?
>>> > >> * Why does a larger tasks.max help the spread? I would assume there are
>>> > >> up to 8 tasks (or 16) per VM, but that should not have prevented
>>> > >> reassignment on a scale-up (as it clearly did)?
>>> > >>
>>> > >> The reason I ask is that I plan to run an mm2 cluster on k8s and I want
>>> > >> to make sure that I use a version of the JVM that is more Docker-friendly
>>> > >> with respect to how many CPUs it believes it has, as explained here:
>>> > >>
>>> >
>>> https://blog.softwaremill.com/docker-support-in-new-java-8-finally-fd595df0ca54
>>> > >>
>>> > >>
>>> > >>
>>> > >>
>>> > >> On Fri, Oct 18, 2019 at 4:15 PM Ryanne Dolan <ryannedo...@gmail.com
>>> >
>>> > >> wrote:
>>> > >>
>>> > >>> What is tasks.max? Consider bumping to something like 48 if you're
>>> > >>> running on a dozen nodes.
>>> > >>>
>>> > >>> Ryanne
>>> > >>>
>>> > >>> On Fri, Oct 18, 2019, 1:43 PM Vishal Santoshi <
>>> > vishal.santo...@gmail.com
>>> > >>> >
>>> > >>> wrote:
>>> > >>>
>>> > >>> > Hey Ryanne,
>>> > >>> >
>>> > >>> >
>>> > >>> > I see a definite issue. I am doing an intense test and I bring
>>> > >>> > up 12 VMs (they are 12 pods with 8 CPUs each), replicating about
>>> > >>> > 1200 plus topics (fairly heavy, 100mbps). They are acquired and are
>>> > >>> > staggered as they come up. I see a fraction of these nodes not
>>> > >>> > assigned any replication. There is plenty to go around (more than a
>>> > >>> > couple of thousand partitions). Is there something I am missing? In
>>> > >>> > my current case, 5 of the 12 VMs are idle.
>>> > >>> >
>>> > >>> > Vishal
>>> > >>> >
>>> > >>> > On Fri, Oct 18, 2019 at 7:05 AM Vishal Santoshi <
>>> > >>> vishal.santo...@gmail.com
>>> > >>> > >
>>> > >>> > wrote:
>>> > >>> >
>>> > >>> > > Oh sorry, a COUNTER is more like it.
>>> > >>> > >
>>> > >>> > > On Fri, Oct 18, 2019, 6:58 AM Vishal Santoshi <
>>> > >>> vishal.santo...@gmail.com
>>> > >>> > >
>>> > >>> > > wrote:
>>> > >>> > >
>>> > >>> > >> Will do.
>>> > >>> > >>     One more thing: the age/latency metrics seem to be analogous, as in
>>> > >>> > >> they seem to be calculated using similar routines. I would think a metric
>>> > >>> > >> tracking the number of flush failures (as a GAUGE) given
>>> > >>> > >> offset.flush.timeout.ms would be highly beneficial.
>>> > >>> > >>
>>> > >>> > >> Regards..
>>> > >>> > >>
>>> > >>> > >>
>>> > >>> > >> On Thu, Oct 17, 2019 at 11:53 PM Ryanne Dolan <
>>> > >>> ryannedo...@gmail.com>
>>> > >>> > >> wrote:
>>> > >>> > >>
>>> > >>> > >>> Ah, I see you are correct. Also I misspoke saying "workers" earlier, as the
>>> > >>> > >>> consumer is not created by the worker, but the task.
>>> > >>> > >>>
>>> > >>> > >>> I suppose the put() could be changed to putIfAbsent() here to enable this
>>> > >>> > >>> property to be changed. Maybe submit a PR?
>>> > >>> > >>>
>>> > >>> > >>> Ryanne
>>> > >>> > >>>
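A minimal sketch of the put() versus putIfAbsent() difference suggested above, assuming the consumer properties are assembled roughly as in the MirrorConnectorConfig diff quoted further down this thread (user overrides copied in first, fixed values applied afterwards). The class and method names are hypothetical stand-ins, not the actual Kafka code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the consumer-config assembly discussed above;
// it is not the actual MirrorConnectorConfig code.
final class ConsumerDefaults {

    // put() runs after the user overrides are copied in, so the hard-coded
    // value always wins.
    static Map<String, String> withPut(Map<String, String> overrides) {
        Map<String, String> props = new HashMap<>(overrides);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    // putIfAbsent() only supplies a default, so a user-provided
    // auto.offset.reset survives.
    static Map<String, String> withPutIfAbsent(Map<String, String> overrides) {
        Map<String, String> props = new HashMap<>(overrides);
        props.put("enable.auto.commit", "false");
        props.putIfAbsent("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("auto.offset.reset", "latest");
        System.out.println(withPut(overrides).get("auto.offset.reset"));         // earliest
        System.out.println(withPutIfAbsent(overrides).get("auto.offset.reset")); // latest
    }
}
```

With no override supplied, both variants fall back to "earliest", so the default behaviour would stay the same.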
>>> > >>> > >>> On Thu, Oct 17, 2019 at 10:00 AM Vishal Santoshi <
>>> > >>> > >>> vishal.santo...@gmail.com>
>>> > >>> > >>> wrote:
>>> > >>> > >>>
>>> > >>> > >>> > Hmm  ( I did both )
>>> > >>> > >>> >
>>> > >>> > >>> > another->another_test.enabled = true
>>> > >>> > >>> >
>>> > >>> > >>> > another->another_test.topics = act_post
>>> > >>> > >>> >
>>> > >>> > >>> > another->another_test.emit.heartbeats.enabled = false
>>> > >>> > >>> >
>>> > >>> > >>> > another->another_test.consumer.auto.offset.reset = latest
>>> > >>> > >>> >
>>> > >>> > >>> > another->another_test.sync.topic.acls.enabled = false
>>> > >>> > >>> >
>>> > >>> > >>> > another.consumer.auto.offset.reset = latest
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> > When I grep for the ConsumerConfig (there are 8 instances; this topic
>>> > >>> > >>> > has 4 partitions):
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> >  [2019-10-17 14:01:21,879] INFO ConsumerConfig values:
>>> > >>> > >>> >
>>> > >>> > >>> > allow.auto.create.topics = true
>>> > >>> > >>> >
>>> > >>> > >>> > auto.commit.interval.ms = 5000
>>> > >>> > >>> >
>>> > >>> > >>> > *auto.offset.reset* = earliest
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> > I am now using the 2.4 branch from kafka trunk
>>> > >>> > >>> > https://github.com/apache/kafka/tree/2.4/connect/mirror
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> > This code change works and makes sense. I think all other settings will be
>>> > >>> > >>> > fine (as they can be overridden), except for the 2 below.
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> > --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> > >>> > >>> > +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
>>> > >>> > >>> > @@ -230,7 +230,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
>>> > >>> > >>> >          props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
>>> > >>> > >>> >          props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
>>> > >>> > >>> >          props.put("enable.auto.commit", "false");
>>> > >>> > >>> > -        props.put("auto.offset.reset", "earliest");
>>> > >>> > >>> > +        props.put("auto.offset.reset", "latest");
>>> > >>> > >>> >          return props;
>>> > >>> > >>> >      }
>>> > >>> > >>> >
>>> > >>> > >>> >
>>> > >>> > >>> > Regards.
>>> > >>> > >>> >
>>> > >>> > >>> > On Wed, Oct 16, 2019 at 3:36 PM Ryanne Dolan <
>>> > >>> ryannedo...@gmail.com>
>>> > >>> > >>> > wrote:
>>> > >>> > >>> >
>>> > >>> > >>> > > Vishal, you should be able to override the properties passed to the
>>> > >>> > >>> > > internal workers using properties like A->B.consumer.auto.offset.reset or
>>> > >>> > >>> > > A.consumer.auto.offset.reset in the mm2.properties file. Certain top-level
>>> > >>> > >>> > > properties like tasks.max are honored without the A->B or A prefix, but
>>> > >>> > >>> > > auto.offset.reset is not one of them.
>>> > >>> > >>> > >
>>> > >>> > >>> > > Ryanne
>>> > >>> > >>> > >
>>> > >>> > >>> > > On Wed, Oct 16, 2019 at 9:13 AM Vishal Santoshi <
>>> > >>> > >>> > vishal.santo...@gmail.com
>>> > >>> > >>> > > >
>>> > >>> > >>> > > wrote:
>>> > >>> > >>> > >
>>> > >>> > >>> > > > Hey Ryanne,
>>> > >>> > >>> > > >
>>> > >>> > >>> > > >
>>> > >>> > >>> > > > How do I override auto.offset.reset = latest for consumers through
>>> > >>> > >>> > > > mm2.properties? I have tried plain auto.offset.reset and
>>> > >>> > >>> > > > consumer.auto.offset.reset, but it defaults to earliest. I do have a query in
>>> > >>> > >>> > > > another thread but thought you might know offhand.
>>> > >>> > >>> > > >
>>> > >>> > >>> > > > I would imagine there is some general way of overriding consumer and
>>> > >>> > >>> > > > producer configs through mm2.properties in MM2?
>>> > >>> > >>> > > >
>>> > >>> > >>> > > > Regards.
>>> > >>> > >>> > > >
>>> > >>> > >>> > > > On Tue, Oct 15, 2019 at 3:44 PM Vishal Santoshi <
>>> > >>> > >>> > > vishal.santo...@gmail.com
>>> > >>> > >>> > > > >
>>> > >>> > >>> > > > wrote:
>>> > >>> > >>> > > >
>>> > >>> > >>> > > > > Thank you so much for all your help. Will keep you posted on tests I do.
>>> > >>> > >>> > > > > I hope this is helpful to other folks too.
>>> > >>> > >>> > > > >
>>> > >>> > >>> > > > > On Tue, Oct 15, 2019 at 2:44 PM Ryanne Dolan <
>>> > >>> > >>> ryannedo...@gmail.com>
>>> > >>> > >>> > > > > wrote:
>>> > >>> > >>> > > > >
>>> > >>> > >>> > > > >> That's right. MM2 is at-least-once for now, same as legacy MirrorMaker. You
>>> > >>> > >>> > > > >> can follow https://issues.apache.org/jira/browse/KAFKA-6080 for updates on
>>> > >>> > >>> > > > >> exactly-once semantics in Connect.
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > > >> Ryanne
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > > >> On Tue, Oct 15, 2019 at 1:24 PM Vishal Santoshi <
>>> > >>> > >>> > > > >> vishal.santo...@gmail.com>
>>> > >>> > >>> > > > >> wrote:
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > > >> > >> You are correct. I'm working on a KIP and PoC to introduce transactions
>>> > >>> > >>> > > > >> > >> to Connect for this exact purpose :)
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > That is awesome. Any time frame ?
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > In the meantime, the SLA as of now:
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > 1. It is conceivable that we flush the producer to the target cluster but
>>> > >>> > >>> > > > >> > fail to commit the offset. If there was a restart before the next successful
>>> > >>> > >>> > > > >> > offset commit, there will be duplicates and a part of the data is replayed
>>> > >>> > >>> > > > >> > (at least once)?
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > 2. The same can be said about partial flushes, though I am not sure about
>>> > >>> > >>> > > > >> > how Kafka addresses flush (is a flush either a success or a failure, and
>>> > >>> > >>> > > > >> > nothing in between?)
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > Thanks..
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > On Tue, Oct 15, 2019 at 12:34 PM Ryanne Dolan <
>>> > >>> > >>> > > ryannedo...@gmail.com>
>>> > >>> > >>> > > > >> > wrote:
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > > Hey Vishal, glad to hear you're making progress.
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing [...] the producer and setting the
>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic OR do we use
>>> > >>> > >>> > > > >> > > > transactions here?
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > You are correct. I'm working on a KIP and PoC to introduce transactions to
>>> > >>> > >>> > > > >> > > Connect for this exact purpose :)
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > > I think these are 4 threads (b'coz num.tasks=4), and I have 2 topics with
>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in there are 4 consumer groups
>>> > >>> > >>> > > > >> > > > (one CG per thread)?
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > Some details here:
>>> > >>> > >>> > > > >> > > - tasks.max controls the maximum number of tasks created per Connector
>>> > >>> > >>> > > > >> > > instance. Both MirrorSourceConnector and MirrorCheckpointConnector will
>>> > >>> > >>> > > > >> > > create multiple tasks (up to tasks.max), but MirrorHeartbeatConnector only
>>> > >>> > >>> > > > >> > > ever creates a single task. Moreover, there cannot be more tasks than
>>> > >>> > >>> > > > >> > > topic-partitions (for MirrorSourceConnector) or consumer groups (for
>>> > >>> > >>> > > > >> > > MirrorCheckpointConnector). So if you have two topics with one partition
>>> > >>> > >>> > > > >> > > each and 1 consumer group total, you'll have two MirrorSourceConnector
>>> > >>> > >>> > > > >> > > tasks, one MirrorHeartbeatConnector task, and one MirrorCheckpointConnector
>>> > >>> > >>> > > > >> > > task, for a total of four. And that's in one direction only: if you have
>>> > >>> > >>> > > > >> > > multiple source->target herders enabled, each will create tasks
>>> > >>> > >>> > > > >> > > independently.
>>> > >>> > >>> > > > >> > > - There are no consumer groups in MM2, technically. The Connect framework
>>> > >>> > >>> > > > >> > > uses the Coordinator API and internal topics to divide tasks among workers
>>> > >>> > >>> > > > >> > > -- not a consumer group per se. The MM2 connectors use the assign() API,
>>> > >>> > >>> > > > >> > > not the subscribe() API, so there are no consumer groups there either. In
>>> > >>> > >>> > > > >> > > fact, they don't commit() either. This is nice, as it eliminates a lot of
>>> > >>> > >>> > > > >> > > the rebalancing problems legacy MirrorMaker has been plagued with. With
>>> > >>> > >>> > > > >> > > MM2, rebalancing only occurs when the number of workers changes or when the
>>> > >>> > >>> > > > >> > > assignments change (e.g. new topics are discovered).
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > Ryanne
>>> > >>> > >>> > > > >> > >
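The task counts described above can be sketched as follows, assuming each connector's task count is simply the smaller of tasks.max and the thing it divides work over, with the heartbeat connector fixed at one task. The class and method names are hypothetical and model a single source->target direction only; they are not Kafka APIs.

```java
// Hypothetical model of the per-connector task counts described above;
// not Kafka code.
final class Mm2TaskCount {

    // MirrorSourceConnector: capped by tasks.max and by topic-partitions.
    static int sourceTasks(int tasksMax, int topicPartitions) {
        return Math.min(tasksMax, topicPartitions);
    }

    // MirrorCheckpointConnector: capped by tasks.max and by consumer groups.
    static int checkpointTasks(int tasksMax, int consumerGroups) {
        return Math.min(tasksMax, consumerGroups);
    }

    // MirrorHeartbeatConnector only ever creates a single task.
    static int heartbeatTasks() {
        return 1;
    }

    // Total for one source->target direction.
    static int totalTasks(int tasksMax, int topicPartitions, int consumerGroups) {
        return sourceTasks(tasksMax, topicPartitions)
                + checkpointTasks(tasksMax, consumerGroups)
                + heartbeatTasks();
    }

    public static void main(String[] args) {
        // Two topics with one partition each, one consumer group, tasks.max = 4.
        System.out.println(totalTasks(4, 2, 1)); // 4
    }
}
```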
>>> > >>> > >>> > > > >> > > On Tue, Oct 15, 2019 at 10:23 AM Vishal
>>> Santoshi <
>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com>
>>> > >>> > >>> > > > >> > > wrote:
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> > > > Hey Ryanne,
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > The test was on topics that had a 7 day retention, which
>>> > >>> > >>> > > > >> > > > generally implies that the batch size for flush is pretty high (till the
>>> > >>> > >>> > > > >> > > > consumption becomes current). The offset.flush.timeout.ms defaults to 5
>>> > >>> > >>> > > > >> > > > seconds and the code will not send in the offsets if the flush is not
>>> > >>> > >>> > > > >> > > > complete. Increasing that timeout did solve the "not sending the offset to
>>> > >>> > >>> > > > >> > > > topic" issue.
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > Two questions ( I am being greedy here :) )
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > 1. It seems though that flushing the producer and setting the
>>> > >>> > >>> > > > >> > > > offset to the compacting topic is not atomic, OR do we use
>>> > >>> > >>> > > > >> > > > transactions here?
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > 2. I see
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 956435
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorSourceConnector-1} flushing 356251
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
>>> > >>> > >>> > > > >> > > > messages
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > I think these are 4 threads (b'coz num.tasks=4), and I have 2 topics with
>>> > >>> > >>> > > > >> > > > 1 partition each. Do I assume this right, as in there are 4 consumer groups
>>> > >>> > >>> > > > >> > > > (one CG per thread)?
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > THANKS A LOT
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > Vishal.
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > On Mon, Oct 14, 2019 at 3:42 PM Ryanne Dolan <
>>> > >>> > >>> > > > ryannedo...@gmail.com
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >> > > > wrote:
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > > > > > timed out while waiting for producer to flush outstanding
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > > > Yeah, that's what I'd expect to see if Connect was unable to send records
>>> > >>> > >>> > > > >> > > > > to the downstream remote topics, e.g. if min.insync.replicas were
>>> > >>> > >>> > > > >> > > > > misconfigured. Given some data seems to arrive, it's possible that
>>> > >>> > >>> > > > >> > > > > everything is configured correctly but with too much latency to
>>> > >>> > >>> > > > >> > > > > successfully commit within the default timeouts. You may want to increase
>>> > >>> > >>> > > > >> > > > > the number of tasks substantially to achieve more parallelism and
>>> > >>> > >>> > > > >> > > > > throughput.
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > > > Ryanne
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > > > On Mon, Oct 14, 2019, 2:30 PM Vishal
>>> Santoshi <
>>> > >>> > >>> > > > >> > > vishal.santo...@gmail.com
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > > > wrote:
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > > > > Aah no.. there is more to it. Not sure if it is related to the above.
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L114
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > Is timing out based on
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L133
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > [2019-10-14 18:55:20,820] ERROR
>>> > >>> > >>> > > > >> > > > > > WorkerSourceTask{id=MirrorSourceConnector-0} Failed to flush, timed out
>>> > >>> > >>> > > > >> > > > > > while waiting for producer to flush outstanding 36478 messages
>>> > >>> > >>> > > > >> > > > > > (org.apache.kafka.connect.runtime.WorkerSourceTask:423)
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > On Mon, Oct 14, 2019 at 3:15 PM Vishal
>>> > Santoshi
>>> > >>> <
>>> > >>> > >>> > > > >> > > > > vishal.santo...@gmail.com
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > wrote:
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > > > > I think this might be it. Could you confirm? It seems to be on the path
>>> > >>> > >>> > > > >> > > > > > > to commit the offsets, but I am not sure.
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > > [2019-10-14 15:29:14,531] ERROR Scheduler for MirrorSourceConnector caught
>>> > >>> > >>> > > > >> > > > > > > exception in scheduled task: syncing topic ACLs
>>> > >>> > >>> > > > >> > > > > > > (org.apache.kafka.connect.mirror.Scheduler:102)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > > java.util.concurrent.ExecutionException:
>>> > >>> > >>> > > > >> > > > > > > org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
>>> > >>> > >>> > > > >> > > > > > > configured on the broker
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.listTopicAclBindings(MirrorSourceConnector.java:273)
>>> > >>> > >>> > > > >> > > > > > >         at org.apache.kafka.connect.mirror.MirrorSourceConnector.syncTopicAcls(MirrorSourceConnector.java:214)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >>
>>> > >>> > org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> org.apache.kafka.connect.mirror.Scheduler.lambda$scheduleRepeating$0(Scheduler.java:50)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > >
>>> > >>> > >>>
>>> > >>>
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >>
>>> > >>> > java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > >
>>> > >>> > >>> > > > >> > > > >
>>> > >>> > >>> > > > >> > > >
>>> > >>> > >>> > > > >> > >
>>> > >>> > >>> > > > >> >
>>> > >>> > >>> > > > >>
>>> > >>> > >>> > > >
>>> > >>> > >>> > >
>>> > >>> > >>> >
>>> > >>> > >>>
>>> > >>> >
>>> > >>>
>>> >
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >         at
>>> > >>> java.lang.Thread.run(Thread.java:748)
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > > Caused by:
>>> > >>> > >>> > > > >> > >
>>> > >>> org.apache.kafka.common.errors.SecurityDisabledException:
>>> > >>> > >>> > > > >> > > > No
>>> > >>> > >>> > > > >> > > > > > > Authorizer is configured on the broker
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > > On Mon, Oct 14, 2019 at 12:30 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>> > >>> > >>> > > > >> > > > > > >
>>> > >>> > >>> > > > >> > > > > > >> > I do not have a single record in the offsets topic
>>> > >>> > >>> > > > >> > > > > > >>
>>> > >>> > >>> > > > >> > > > > > >> That's definitely not normal. You are correct that without records in
>>> > >>> > >>> > > > >> > > > > > >> that topic, MM2 will restart from EARLIEST. The offsets should be stored
>>> > >>> > >>> > > > >> > > > > > >> periodically and whenever the connectors shut down gracefully or restart.
>>> > >>> > >>> > > > >> > > > > > >>
>>> > >>> > >>> > > > >> > > > > > >> Is it possible the topics don't have required ACLs or something? Also
>>> > >>> > >>> > > > >> > > > > > >> note: Connect wants the offsets topic to have a large number of
>>> > >>> > >>> > > > >> > > > > > >> partitions and to be compacted. Though I can't imagine either would
>>> > >>> > >>> > > > >> > > > > > >> prevent commits from being sent.
>>> > >>> > >>> > > > >> > > > > > >>
>>> > >>> > >>> > > > >> > > > > > >> Ryanne
>>> > >>> > >>> > > > >> > > > > > >>
>>> > >>> > >>> > > > >> > > > > > >> On Mon, Oct 14, 2019 at 10:46 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > >>> > >>> > > > >> > > > > > >>
>>> > >>> > >>> > > > >> > > > > > >> > 2nd/restore issue (I think I need to solve the offsets topic issue
>>> > >>> > >>> > > > >> > > > > > >> > before I go with the scale up and down issue)
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > As you had indicated, I went ahead and created the offsets topic. The
>>> > >>> > >>> > > > >> > > > > > >> > status of the cluster (destination) is thus
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > Topic                         | # Partitions | # Brokers | Brokers Spread % | Brokers Skew % | Brokers Leader Skew % | # Replicas | Under Replicated % | Leader Size | Producer Message/Sec | Summed Recent Offsets
>>> > >>> > >>> > > > >> > > > > > >> > s8k.checkpoints.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>>> > >>> > >>> > > > >> > > > > > >> > s8k.act_search_page           | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 6675.30              | 4,166,842
>>> > >>> > >>> > > > >> > > > > > >> > s8k.act_reach                 | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 20657.92             | 11,579,529
>>> > >>> > >>> > > > >> > > > > > >> > mm2-status.s8k.internal       | 5            | 5         | 100              | 0              | 0                     | 3          | 0                  |             | 0.00                 | 10
>>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k_test.internal | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>>> > >>> > >>> > > > >> > > > > > >> > mm2-offsets.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 0
>>> > >>> > >>> > > > >> > > > > > >> > mm2-configs.s8k.internal      | 1            | 3         | 60               | 0              | 0                     | 3          | 0                  |             | 0.00                 | 13
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > Each topic name links to https://kafka-manager.bf2.tumblr.net/clusters/grete_test/topics/<topic name>
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > You can see that we have the 5 (I created both offsets topics, to be
>>> > >>> > >>> > > > >> > > > > > >> > safe, for the below)
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > clusters = s8k, s8k_test
>>> > >>> > >>> > > > >> > > > > > >> > s8k.bootstrap.servers = .....
>>> > >>> > >>> > > > >> > > > > > >> > s8k_test.bootstrap.servers = ......
>>> > >>> > >>> > > > >> > > > > > >> > # only allow replication dr1 -> dr2
>>> > >>> > >>> > > > >> > > > > > >> > s8k->s8k_test.enabled = true
>>> > >>> > >>> > > > >> > > > > > >> > s8k->s8k_test.topics = act_search_page|act_reach
>>> > >>> > >>> > > > >> > > > > > >> > s8k->s8k_test.emit.heartbeats.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> > s8k_test->s8k.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> > s8k_test->s8k.emit.heartbeats.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> > s8k_test.replication.factor = 3
>>> > >>> > >>> > > > >> > > > > > >> > s8k.replication.factor = 3
>>> > >>> > >>> > > > >> > > > > > >> > offsets.storage.replication.factor = 3
>>> > >>> > >>> > > > >> > > > > > >> > replication.factor = 3
>>> > >>> > >>> > > > >> > > > > > >> > replication.policy.separator = .
>>> > >>> > >>> > > > >> > > > > > >> > tasks.max = 4
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > What seems strange is that I do not have a single record in the
>>> > >>> > >>> > > > >> > > > > > >> > offsets topic. Is that normal? I would imagine that without a record,
>>> > >>> > >>> > > > >> > > > > > >> > there is no way that a restore would happen, and that is obvious when I
>>> > >>> > >>> > > > >> > > > > > >> > restart the mm2 instance. Find the screenshot attached. In essence, the
>>> > >>> > >>> > > > >> > > > > > >> > average latency lag is reset when the mm2 instance is reset, indicating
>>> > >>> > >>> > > > >> > > > > > >> > no restore but a restart from EARLIEST. I must be missing something
>>> > >>> > >>> > > > >> > > > > > >> > simple.
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> > On Sun, Oct 13, 2019 at 7:41 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>>> > >>> > >>> > > > >> > > > > > >> >
>>> > >>> > >>> > > > >> > > > > > >> >> Vishal, the first issue is easy: you must set tasks.max to something
>>> > >>> > >>> > > > >> > > > > > >> >> above 1 (the default) in order to achieve any parallelism. This
>>> > >>> > >>> > > > >> > > > > > >> >> property is passed along to the internal Connect workers. It's
>>> > >>> > >>> > > > >> > > > > > >> >> unfortunate that Connect is not smart enough to default this property
>>> > >>> > >>> > > > >> > > > > > >> >> to the number of workers. I suspect that will improve before long.
>>> > >>> > >>> > > > >> > > > > > >> >>
>>> > >>> > >>> > > > >> > > > > > >> >> For the second issue, is it possible you are missing the offsets
>>> > >>> > >>> > > > >> > > > > > >> >> topic? It should exist alongside the config and status topics. Connect
>>> > >>> > >>> > > > >> > > > > > >> >> should create this topic, but there are various reasons this can fail,
>>> > >>> > >>> > > > >> > > > > > >> >> e.g. if the replication factor is misconfigured. You can try creating
>>> > >>> > >>> > > > >> > > > > > >> >> this topic manually or changing offsets.storage.replication.factor.
>>> > >>> > >>> > > > >> > > > > > >> >>
>>> > >>> > >>> > > > >> > > > > > >> >> Ryanne
>>> > >>> > >>> > > > >> > > > > > >> >>
>>> > >>> > >>> > > > >> > > > > > >> >> On Sun, Oct 13, 2019, 5:13 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>> > >>> > >>> > > > >> > > > > > >> >>
>>> > >>> > >>> > > > >> > > > > > >> >> > Using https://github.com/apache/kafka/tree/trunk/connect/mirror as a
>>> > >>> > >>> > > > >> > > > > > >> >> > guide, I have built from source the origin/KIP-382 branch of
>>> > >>> > >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka.git.
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > I am seeing 2 issues
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > * I brought up 2 processes on 2 different nodes (they are actually
>>> > >>> > >>> > > > >> > > > > > >> >> > pods on k8s but that should not matter). They share the mm2.properties
>>> > >>> > >>> > > > >> > > > > > >> >> > file and are replicating (1-way) 3 topics with 8 partitions in total.
>>> > >>> > >>> > > > >> > > > > > >> >> > That seems to be the way to create a standalone mm2 cluster. I do not,
>>> > >>> > >>> > > > >> > > > > > >> >> > however, see (at least the mbeans do not show) any attempt to
>>> > >>> > >>> > > > >> > > > > > >> >> > rebalance. The mbeans described at
>>> > >>> > >>> > > > >> > > > > > >> >> > https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process
>>> > >>> > >>> > > > >> > > > > > >> >> > are all on a single node.
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > * I restart the processes on the 2 nodes (hard stop and start). The
>>> > >>> > >>> > > > >> > > > > > >> >> > offsets for replication seem to be reset to the earliest, as if it is a
>>> > >>> > >>> > > > >> > > > > > >> >> > brand new mirroring. It is also obvious from the
>>> > >>> > >>> > > > >> > > > > > >> >> > "record-age-ms-avg|replication-latency-ms-avg" metrics, which I track
>>> > >>> > >>> > > > >> > > > > > >> >> > through the restart.
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > This implies that
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > 1. Load balancing by rebalancing is not working. I cannot scale up or
>>> > >>> > >>> > > > >> > > > > > >> >> > down by adding nodes to the mm2 cluster or removing them.
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > 2. Restore on a mirror is not working. If the MM2 cluster is brought
>>> > >>> > >>> > > > >> > > > > > >> >> > down, it does not start mirroring from the last known state. I see the
>>> > >>> > >>> > > > >> > > > > > >> >> > state/config topics etc. created as expected.
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > The mm2.properties is pretty minimal
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > clusters = a, b
>>> > >>> > >>> > > > >> > > > > > >> >> > a.bootstrap.servers = k.....
>>> > >>> > >>> > > > >> > > > > > >> >> > b.bootstrap.servers = k.....
>>> > >>> > >>> > > > >> > > > > > >> >> > # only allow replication dr1 -> dr2
>>> > >>> > >>> > > > >> > > > > > >> >> > a->b.enabled = true
>>> > >>> > >>> > > > >> > > > > > >> >> > a->b.topics = act_search_page
>>> > >>> > >>> > > > >> > > > > > >> >> > a->b.emit.heartbeats.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> >> > b->a.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> >> > b->a.emit.heartbeats.enabled = false
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > What do you think is the issue ?
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>> > >>> > >>> > > > >> > > > > > >> >> > Thanks
>>> > >>> > >>> > > > >> > > > > > >> >> >
>>
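
For reference, the configuration advice in the quoted thread can be collected into a short mm2.properties fragment. This is only a sketch under stated assumptions: tasks.max and offsets.storage.replication.factor use the values Vishal posted, while sync.topic.acls.enabled is not mentioned anywhere in the thread. It is included on the assumption that ACL syncing is not needed on brokers with no authorizer configured, which is what the SecurityDisabledException in the quoted log points to. Nothing in the thread establishes that any of these settings fixes the empty offsets topic.

```properties
# Above 1 (the default) so the work can be split across tasks;
# 4 is the value used in the quoted mm2.properties.
tasks.max = 4

# Replication factor for the Connect offsets topic; 3 matches the
# quoted config and should not exceed the number of brokers.
offsets.storage.replication.factor = 3

# Assumption, not from the thread: turn off the periodic topic ACL sync
# that fails with "No Authorizer is configured on the broker".
sync.topic.acls.enabled = false
```

The ACL sync setting may also be scoped to a single flow (for example with the s8k->s8k_test. prefix) if only one direction needs it.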
