Hi Robin

Thanks for the clarification.

As you said, task allocation between the workers is nondeterministic. I
have shared this within my team, but there are other parties with whom I
need to share it as an explanation for the issue they raised, and I cannot
show this mail as a reference.

It would be great if you could share a link or discussion reference on
this.
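
For anyone I forward this to, the skew can be checked directly from the status output with a small script. This is only a minimal sketch: it assumes the JSON shape of the status results quoted further down in this thread, and the helper names (tasks_per_worker, summarize, is_spread) and the sample variables are my own, not part of Kafka Connect.

```python
from collections import Counter

# Sample status payloads copied from the results quoted in this thread:
# the same connector after first registration and after restarting a worker.
BEFORE = {
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    ],
    "type": "sink",
}
AFTER = {
    "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"},
    ],
    "type": "sink",
}
WORKERS = ["10.0.0.4:8078", "10.0.0.5:8080"]


def tasks_per_worker(status):
    """Count how many tasks of one connector run on each worker."""
    return Counter(task["worker_id"] for task in status["tasks"])


def summarize(statuses):
    """Map each connector name to a {worker_id: task count} dict."""
    return {s["name"]: dict(tasks_per_worker(s)) for s in statuses}


def is_spread(status, workers):
    """True if every listed worker runs at least one task of the connector."""
    counts = tasks_per_worker(status)
    return all(counts.get(worker, 0) > 0 for worker in workers)


if __name__ == "__main__":
    print(summarize([BEFORE]))
    print(summarize([AFTER]))
    print(is_spread(BEFORE, WORKERS), is_spread(AFTER, WORKERS))
```

Running it over the status of all connectors would show at a glance which connectors have every task on a single worker.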

Regards and Thanks
Deepak Raghav



On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io> wrote:

> I don't think you're right to assert that this is "expected behaviour":
>
> >  the tasks are divided in below pattern when they are first time
> registered
>
> Kafka Connect task allocation is non-deterministic.
>
> I'm still not clear if you're solving for a theoretical problem or an
> actual one. If this is an actual problem that you're encountering and need
> a solution to then since the task allocation is not deterministic it sounds
> like you need to deploy separate worker clusters based on the workload
> patterns that you are seeing and machine resources available.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakragha...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > I had gone through the link you provided; it is not helpful in my case.
> > Apart from this, I am not getting why the tasks are divided in below
> > pattern when they are first time registered, which is expected behavior.
> > Is there any parameter which we can pass in the worker property file to
> > control the task assignment strategy, like the range assigner or round
> > robin in a consumer group?
> >
> > Connector REST status API result after first registration:
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.5:8080"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:8078"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:8080"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.4:8078"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:8078"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:8080"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> >
> > But when I stop the second worker process, wait for the
> > scheduled.rebalance.max.delay.ms period (i.e. 5 min) to elapse, and start
> > the process again, the result is different.
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.5:8080"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:8080"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.5:8080"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": {
> >     "state": "RUNNING",
> >     "worker_id": "10.0.0.4:8078"
> >   },
> >   "tasks": [
> >     {
> >       "id": 0,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:8078"
> >     },
> >     {
> >       "id": 1,
> >       "state": "RUNNING",
> >       "worker_id": "10.0.0.4:8078"
> >     }
> >   ],
> >   "type": "sink"
> > }
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io>
> wrote:
> >
> > > Thanks for the clarification. If this is an actual problem that you're
> > > encountering and need a solution to then since the task allocation is
> > > not deterministic it sounds like you need to deploy separate worker
> > > clusters based on the workload patterns that you are seeing and machine
> > > resources available.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io |
> @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > Replying to your query, i.e.
> > > >
> > > > One thing I'd ask at this point is though if it makes any difference
> > > > where the tasks execute?
> > > >
> > > > It actually makes a difference to us. We have 16 connectors and, with
> > > > the task division I described earlier, the first 8 connectors' tasks
> > > > are assigned to the first worker process and the other connectors'
> > > > tasks to the other worker process. All 16 are sink connectors, and
> > > > each consumes messages from a different topic. There may be a case
> > > > when messages arrive only on the first 8 connectors' topics; because
> > > > all the tasks of these connectors are assigned to the first worker,
> > > > the load on it would be high while the connectors on the other
> > > > worker sit idle.
> > > >
> > > > Instead, if the tasks had been divided evenly, this case would have
> > > > been avoided, because tasks of each connector would be present in
> > > > both worker processes, like below:
> > > >
> > > > *W1*                       *W2*
> > > >  C1T1                    C1T2
> > > >  C2T1                    C2T2
> > > >
> > > > I hope this answers your question.
> > > >
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > > wrote:
> > > >
> > > > > OK, I understand better now.
> > > > >
> > > > > You can read more about the guts of the rebalancing protocol that
> > > > > Kafka Connect uses from Apache Kafka 2.3 onwards here:
> > > > >
> > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > >
> > > > > One thing I'd ask at this point is though if it makes any difference
> > > > > where the tasks execute? The point of a cluster is that Kafka Connect
> > > > > manages the workload allocation. If you need workload separation and
> > > > > guaranteed execution locality I would suggest separate Kafka Connect
> > > > > distributed clusters.
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io |
> > > @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <
> > deepakragha...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Robin
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > We have two workers on different IPs. The example I gave you was
> > > > > > just an example. We are using Kafka version 2.3.1.
> > > > > >
> > > > > > Let me tell you again with a simple example.
> > > > > >
> > > > > > Suppose we have two EC2 nodes, N1 and N2, running worker processes
> > > > > > W1 and W2 in distributed mode with the same groupId (i.e. in the
> > > > > > same cluster), and two connectors with two tasks each, i.e.
> > > > > >
> > > > > > Node N1: W1 is running
> > > > > > Node N2 : W2 is running
> > > > > >
> > > > > > First Connector (C1): task 1 with id C1T1 and task 2 with id C1T2
> > > > > > Second Connector (C2): task 1 with id C2T1 and task 2 with id C2T2
> > > > > >
> > > > > > Now suppose both worker processes W1 and W2 are running and I
> > > > > > register connectors C1 and C2 one after another, i.e. sequentially,
> > > > > > on either worker process. The tasks are divided between the worker
> > > > > > nodes as below, which is expected.
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > > C1T1                    C1T2
> > > > > > C2T1                    C2T2
> > > > > >
> > > > > > Now suppose I stop one worker process, e.g. W2, and start it again
> > > > > > after some time. The task division changes as below, i.e. the first
> > > > > > connector's tasks move to W1 and the second connector's tasks move
> > > > > > to W2:
> > > > > >
> > > > > > *W1*                       *W2*
> > > > > > C1T1                    C2T1
> > > > > > C1T2                    C2T2
> > > > > >
> > > > > >
> > > > > > Please let me know whether this is clear.
> > > > > >
> > > > > > Note: in production, we are going to have 16 connectors with 10
> > > > > > tasks each and two worker nodes. With the above scenario, the first
> > > > > > 8 connectors' tasks move to W1 and the next 8 connectors' tasks move
> > > > > > to W2, which is not expected.
> > > > > >
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <
> ro...@confluent.io>
> > > > > wrote:
> > > > > >
> > > > > > > So you're running two workers on the same machine (10.0.0.4),
> > > > > > > is that correct? Normally you'd run one worker per machine unless
> > > > > > > there was a particular reason otherwise.
> > > > > > > What version of Apache Kafka are you using?
> > > > > > > I'm not clear from your question if the distribution of tasks is
> > > > > > > presenting a problem to you (if so please describe why), or if
> > > > > > > you're just interested in the theory behind the rebalancing
> > > > > > > protocol?
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io
> |
> > > > > @rmoff
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <
> > > > deepakragha...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi
> > > > > > > >
> > > > > > > > Please, can anybody help me with this?
> > > > > > > >
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <
> > > > > > deepakragha...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Team
> > > > > > > > >
> > > > > > > > > We have two worker nodes in a cluster and 2 connectors with 10
> > > > > > > > > tasks each.
> > > > > > > > >
> > > > > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080)
> > > > > > > > > and W2 (port 8078), already started in distributed mode, and then
> > > > > > > > > register the connectors. The 10 tasks of each connector are
> > > > > > > > > divided equally between the two workers: the first task of
> > > > > > > > > connector A goes to worker node W1 and the second task of
> > > > > > > > > connector A to worker node W2; similarly, the first task of
> > > > > > > > > connector B goes to W1 and the second task of connector B to W2.
> > > > > > > > >
> > > > > > > > > e.g
> > > > > > > > > *#First Connector:*
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > *#Second Connector:*
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > But I have seen a strange behavior: when I just shut down worker
> > > > > > > > > node W2 and start it again, the tasks are divided in a different
> > > > > > > > > way, i.e. all the tasks of connector A go to node W1 and the
> > > > > > > > > tasks of connector B go to node W2.
> > > > > > > > >
> > > > > > > > > Can you please have a look at this?
> > > > > > > > >
> > > > > > > > > *#First Connector*
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > *#Second Connector:*
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": {
> > > > > > > > >     "state": "RUNNING",
> > > > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > > > >   },
> > > > > > > > >   "tasks": [
> > > > > > > > >     {
> > > > > > > > >       "id": 0,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 1,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 2,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 3,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 4,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 5,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 6,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 7,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 8,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     },
> > > > > > > > >     {
> > > > > > > > >       "id": 9,
> > > > > > > > >       "state": "RUNNING",
> > > > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > > > >     }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Regards and Thanks
> > > > > > > > > Deepak Raghav
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
