Hi Robin,

Thanks for the clarification.
As you suggested, the task allocation between the workers is non-deterministic. I have
shared this information within my team, but there are some other parties to whom I need
to explain the issue they raised, and I cannot use this mail as a reference. It would be
great if you could share a link or discussion reference on the topic that I can point
them to.

Regards and Thanks
Deepak Raghav
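P.S. For anyone else following the thread: the status dumps quoted below come from the
Connect REST API (GET /connectors/<name>/status against each worker's REST port). On the
configuration side, the only rebalance-related settings I am aware of in the worker
properties file are the ones sketched below; the values are just an illustration of our
own setup, not a recommendation. As far as I can tell there is no Connect equivalent of
the consumer's partition.assignment.strategy (RangeAssignor / RoundRobinAssignor) that
would let us pin particular tasks to particular workers.

  # connect-distributed.properties (illustrative values only)
  # Workers that share the same group.id form one Connect cluster.
  group.id=connect-cluster
  # Default on 2.3.x; enables incremental cooperative rebalancing.
  connect.protocol=compatible
  # 5 minutes: how long the cluster waits for a departed worker to return
  # before its connectors and tasks are reassigned to the remaining workers.
  scheduled.rebalance.max.delay.ms=300000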
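Also, on the suggestion of separate worker clusters: just to confirm my understanding,
that would mean giving each set of workers its own group.id and its own internal topics,
roughly like the sketch below (the names here are made up purely for illustration), and
then registering each connector only with the cluster that should run it?

  # worker-a.properties (cluster A, illustrative names)
  group.id=connect-cluster-a
  config.storage.topic=connect-configs-a
  offset.storage.topic=connect-offsets-a
  status.storage.topic=connect-status-a

  # worker-b.properties (cluster B, illustrative names)
  group.id=connect-cluster-b
  config.storage.topic=connect-configs-b
  offset.storage.topic=connect-offsets-b
  status.storage.topic=connect-status-b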
On Thu, May 21, 2020 at 2:12 PM Robin Moffatt <ro...@confluent.io> wrote:

> I don't think you're right to assert that this is "expected behaviour":
>
> > the tasks are divided in below pattern when they are first time registered
>
> Kafka Connect task allocation is non-deterministic.
>
> I'm still not clear if you're solving for a theoretical problem or an
> actual one. If this is an actual problem that you're encountering and need
> a solution to, then since the task allocation is not deterministic it
> sounds like you need to deploy separate worker clusters based on the
> workload patterns that you are seeing and the machine resources available.
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Wed, 20 May 2020 at 21:29, Deepak Raghav <deepakragha...@gmail.com> wrote:
>
> > Hi Robin
> >
> > I had gone through the link you provided; it is not helpful in my case.
> > Apart from this, I am not getting why the tasks are divided in the below
> > pattern only when they are first registered, which is the behavior we
> > expect. Is there any parameter we can pass in the worker property file to
> > control the task assignment strategy, like the range or round-robin
> > assignors we have for consumer groups?
> >
> > Connector REST status API result after first registration:
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.5:8080" },
> >   "tasks": [
> >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080" }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> >   "tasks": [
> >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080" }
> >   ],
> >   "type": "sink"
> > }
> >
> > But when I stop the second worker process, wait for the
> > scheduled.rebalance.max.delay.ms time (i.e. 5 min) to pass, and then start
> > the process again, the result is different:
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.5:8080" },
> >   "tasks": [
> >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.5:8080" },
> >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080" }
> >   ],
> >   "type": "sink"
> > }
> >
> > and
> >
> > {
> >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> >   "tasks": [
> >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078" }
> >   ],
> >   "type": "sink"
> > }
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> > On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:
> >
> > > Thanks for the clarification. If this is an actual problem that you're
> > > encountering and need a solution to, then since the task allocation is
> > > not deterministic it sounds like you need to deploy separate worker
> > > clusters based on the workload patterns that you are seeing and the
> > > machine resources available.
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com> wrote:
> > >
> > > > Hi Robin
> > > >
> > > > Replying to your query, i.e. "One thing I'd ask at this point is though
> > > > if it makes any difference where the tasks execute?"
> > > >
> > > > It actually makes a difference to us. We have 16 connectors and, as I
> > > > stated in the task division earlier, the first 8 connectors' tasks are
> > > > assigned to the first worker process and the other connectors' tasks to
> > > > the other worker process; just to mention, these 16 connectors are sink
> > > > connectors. Each sink connector consumes messages from a different
> > > > topic. There may be a case where messages are coming in only for the
> > > > first 8 connectors' topics, and because all the tasks of these
> > > > connectors are assigned to the first worker, the load on it would be
> > > > high while the set of connectors on the other worker would be idle.
> > > >
> > > > Instead, if the tasks had been divided evenly, this case would have been
> > > > avoided, because tasks of each connector would be present on both worker
> > > > processes, like below:
> > > >
> > > > W1      W2
> > > > C1T1    C1T2
> > > > C2T1    C2T2
> > > >
> > > > I hope that answers your question.
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io> wrote:
> > > >
> > > > > OK, I understand better now.
> > > > >
> > > > > You can read more about the guts of the rebalancing protocol that
> > > > > Kafka Connect uses as of Apache Kafka 2.3 and onwards here:
> > > > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > > > >
> > > > > One thing I'd ask at this point is though if it makes any difference
> > > > > where the tasks execute? The point of a cluster is that Kafka Connect
> > > > > manages the workload allocation. If you need workload separation and
> > > > > guaranteed execution locality I would suggest separate Kafka Connect
> > > > > distributed clusters.
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <deepakragha...@gmail.com> wrote:
> > > > >
> > > > > > Hi Robin
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > We are running the two workers on different IPs; the example I gave
> > > > > > you was just an example. We are using Kafka version 2.3.1.
> > > > > >
> > > > > > Let me explain again with a simple example.
> > > > > >
> > > > > > Suppose we have two EC2 nodes, N1 and N2, with worker processes W1
> > > > > > and W2 running in distributed mode with the same group id (i.e. in
> > > > > > the same cluster), and two connectors with two tasks each, i.e.
> > > > > >
> > > > > > Node N1: W1 is running
> > > > > > Node N2: W2 is running
> > > > > >
> > > > > > First Connector (C1): task 1 with id C1T1 and task 2 with id C1T2
> > > > > > Second Connector (C2): task 1 with id C2T1 and task 2 with id C2T2
> > > > > >
> > > > > > Now suppose both the W1 and W2 worker processes are running and I
> > > > > > register connectors C1 and C2 one after another, i.e. sequentially,
> > > > > > on either of the worker processes. The task division between the
> > > > > > worker nodes happens like below, which is what we expect:
> > > > > >
> > > > > > W1      W2
> > > > > > C1T1    C1T2
> > > > > > C2T1    C2T2
> > > > > >
> > > > > > Now, suppose I stop one worker process, e.g. W2, and start it again
> > > > > > after some time. The task division changes like below, i.e. the
> > > > > > first connector's tasks move to W1 and the second connector's tasks
> > > > > > move to W2:
> > > > > >
> > > > > > W1      W2
> > > > > > C1T1    C2T1
> > > > > > C1T2    C2T2
> > > > > >
> > > > > > Please let me know if this is understandable or not.
> > > > > >
> > > > > > Note: in production we are going to have 16 connectors with 10 tasks
> > > > > > each and two worker nodes. With the above scenario, the first 8
> > > > > > connectors' tasks move to W1 and the next 8 connectors' tasks move
> > > > > > to W2, which is not what we expect.
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io> wrote:
> > > > > >
> > > > > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > > > > that correct? Normally you'd run one worker per machine unless
> > > > > > > there was a particular reason otherwise.
> > > > > > > What version of Apache Kafka are you using?
> > > > > > > I'm not clear from your question if the distribution of tasks is
> > > > > > > presenting a problem to you (if so please describe why), or if
> > > > > > > you're just interested in the theory behind the rebalancing
> > > > > > > protocol?
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > > > > >
> > > > > > >
> > > > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <deepakragha...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi
> > > > > > > >
> > > > > > > > Please, can anybody help me with this?
> > > > > > > > Regards and Thanks
> > > > > > > > Deepak Raghav
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <deepakragha...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Team
> > > > > > > > >
> > > > > > > > > We have two worker nodes in a cluster and 2 connectors with 10 tasks each.
> > > > > > > > >
> > > > > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080) and
> > > > > > > > > W2 (port 8078), already started in distributed mode, and we then
> > > > > > > > > register the connectors. The tasks of one connector, i.e. 10 tasks,
> > > > > > > > > are divided equally between the two workers, i.e. the first task of
> > > > > > > > > connector A goes to worker node W1 and the second task of connector A
> > > > > > > > > to worker node W2; similarly, the first task of connector B goes to
> > > > > > > > > node W1 and the second task of connector B to node W2.
> > > > > > > > >
> > > > > > > > > e.g.
> > > > > > > > >
> > > > > > > > > #First Connector:
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >   "tasks": [
> > > > > > > > >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8080" }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > #Second Connector:
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >   "tasks": [
> > > > > > > > >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8080" }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > But I have seen a strange behavior: when I just shut down the W2
> > > > > > > > > worker node and start it again, the tasks are divided, but in a
> > > > > > > > > different way, i.e. all the tasks of connector A end up on the W1
> > > > > > > > > node and the tasks of connector B on the W2 node.
> > > > > > > > >
> > > > > > > > > Can you please have a look at this?
> > > > > > > > > #First Connector:
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > > > >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >   "tasks": [
> > > > > > > > >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8080" },
> > > > > > > > >     { "id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8080" }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > #Second Connector:
> > > > > > > > >
> > > > > > > > > {
> > > > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > > > >   "connector": { "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >   "tasks": [
> > > > > > > > >     { "id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 1, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 2, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 3, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 4, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 5, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 6, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 7, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 8, "state": "RUNNING", "worker_id": "10.0.0.4:8078" },
> > > > > > > > >     { "id": 9, "state": "RUNNING", "worker_id": "10.0.0.4:8078" }
> > > > > > > > >   ],
> > > > > > > > >   "type": "sink"
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > Regards and Thanks
> > > > > > > > > Deepak Raghav